ref: 77c74d43d5540fd3bb3915d45d786986827ef0f8
parent: e841ef7c45f5ed30368c5070d52e1cf5912a1a41
author: Ori Bernstein <[email protected]>
date: Tue Sep 15 20:11:40 EDT 2015
Add mutex implementation.
--- a/lib/thread/atomic-impl+x64.s
+++ b/lib/thread/atomic-impl+x64.s
@@ -34,11 +34,19 @@
thread$xcas32:
movl %esi, %eax
lock cmpxchgl %edx, (%rdi)
- sete %al
ret
.globl thread$xcas64
thread$xcas64:
movq %rsi, %rax
lock cmpxchgq %rdx, (%rdi)
- sete %al
+ ret
+.globl thread$xchg32
+thread$xchg32:
+ movl %esi, %eax
+ lock xchgl (%rdi), %eax
+ ret
+.globl thread$xchg64
+thread$xchg64:
+ movq %rsi, %rax
+ lock xchgq (%rdi), %rax
ret
--- a/lib/thread/atomic.myr
+++ b/lib/thread/atomic.myr
@@ -6,7 +6,8 @@
xset : (p : @a#, v : @a -> void)
xadd : (p : @a#, v : @a -> @a)
xsub : (p : @a#, v : @a -> @a)
- xcas : (p : @a#, old : @a, new : @a -> bool)
+ xcas : (p : @a#, old : @a, new : @a -> @a)
+ xchg : (p : @a#, new : @a -> @a)
;;
impl atomic int32
@@ -20,7 +21,8 @@
xset = {p, v; xset32(p castto(uint32#), v castto(uint32))}
xadd = {p, v; -> xadd32(p castto(uint32#), v castto(uint32)) castto(int32)}
xsub = {p, v; -> xsub32(p castto(uint32#), v castto(uint32)) castto(int32)}
- xcas = {p, old, new; -> xcas32(p castto(uint32#), old castto(uint32), new castto(uint32))}
+ xcas = {p, old, new; -> xcas32(p castto(uint32#), old castto(uint32), new castto(uint32)) castto(int32)}
+ xchg = {p, v; -> xchg32(p castto(uint32#), v castto(uint32)) castto(int32)}
;;
@@ -29,7 +31,8 @@
xset = {p, v; xset64(p castto(uint64#), v castto(uint64))}
xadd = {p, v; -> xadd64(p castto(uint64#), v castto(uint64)) castto(int64)}
xsub = {p, v; -> xsub64(p castto(uint64#), v castto(uint64)) castto(int64)}
- xcas = {p, old, new; -> xcas64(p castto(uint64#), old castto(uint64), new castto(uint64))}
+ xcas = {p, old, new; -> xcas64(p castto(uint64#), old castto(uint64), new castto(uint64)) castto(int64)}
+ xchg = {p, v; -> xchg64(p castto(uint64#), v castto(uint64)) castto(int64)}
;;
impl atomic uint32 =
@@ -38,6 +41,7 @@
xadd = {p, v; -> xadd32(p, v)}
xsub = {p, v; -> xsub32(p, v)}
xcas = {p, old, new; -> xcas32(p, old, new)}
+ xchg = {p, v; -> xchg32(p, v)}
;;
@@ -47,6 +51,7 @@
xadd = {p, v; -> xadd64(p, v)}
xsub = {p, v; -> xsub64(p, v)}
xcas = {p, old, new; -> xcas64(p, old, new)}
+ xchg = {p, v; -> xchg64(p, v)}
;;
extern const xget32 : (p : uint32# -> uint32)
@@ -61,5 +66,8 @@
extern const xsub32 : (p : uint32#, v : uint32 -> uint32)
extern const xsub64 : (p : uint64#, v : uint64 -> uint64)
-extern const xcas32 : (p : uint32#, old: uint32, new : uint32 -> bool)
-extern const xcas64 : (p : uint64#, old: uint64, new : uint64 -> bool)
+extern const xcas32 : (p : uint32#, old: uint32, new : uint32 -> uint32)
+extern const xcas64 : (p : uint64#, old: uint64, new : uint64 -> uint64)
+
+extern const xchg32 : (p : uint32#, v : uint32 -> uint32)
+extern const xchg64 : (p : uint64#, v : uint64 -> uint64)
--- a/lib/thread/bld.proj
+++ b/lib/thread/bld.proj
@@ -1,4 +1,5 @@
lib thread =
+ mutex+linux.myr
spawn+linux.myr
atomic-impl+x64.s
atomic.myr
--- /dev/null
+++ b/lib/thread/mutex+linux.myr
@@ -1,0 +1,79 @@
+use std
+use sys
+
+use "atomic.use"
+
+pkg thread =
+ type mutex = struct
+ _state : int32
+ ;;
+
+ const mkmtx : (-> mutex)
+ const mtxlock : (mtx : mutex# -> void)
+ const mtxtrylock : (mtx : mutex# -> bool)
+ const mtxunlock : (mtx : mutex# -> void)
+;;
+
+const Unlocked = 0
+const Locked = 1
+const Sleep = 2
+generic Zptr = 0 castto(@a#)
+var nspin = 1 /* FIXME: pick a sane number, based on CPU count */
+
+const mkmtx = {
+ -> [._state = Unlocked]
+}
+
+const mtxlock = {mtx
+ var c
+
+ /* uncontended case: we get an unlocked mutex, and we lock it */
+ for var i = 0; i < nspin; i++
+ c = xcas(&mtx._state, Unlocked, Locked)
+ if c == Unlocked
+ ->
+ ;;
+ relax()
+ ;;
+
+ /* contended: we set the lock _state to sleep */
+ if c == Locked
+ c = xchg(&mtx._state, Sleep)
+ ;;
+
+ while c != Unlocked
+ sys.futex(&mtx._state, sys.Futexwait | sys.Futexpriv, Sleep, Zptr, Zptr, 0)
+ c = xchg(&mtx._state, 2)
+ ;;
+}
+
+const mtxtrylock = {mtx
+ -> xcas(&mtx._state, Unlocked, Locked) == Unlocked
+}
+
+const mtxunlock = {mtx
+ var r
+
+ /* uncontended sleep means we can just unlock and move on */
+ if mtx._state == Sleep
+ mtx._state = Unlocked
+ elif xchg(&mtx._state, Unlocked) == Locked
+ ->
+ ;;
+
+ for var i = 0; i < nspin; i++
+ if mtx._state != Unlocked
+ /* there might have been waiters, but we set the _state to unlocked */
+ if xcas(&mtx._state, Locked, Sleep) == Sleep
+ ->
+ ;;
+ ;;
+ relax()
+ ;;
+
+ r = sys.futex(&mtx._state, sys.Futexwake | sys.Futexpriv, Locked, Zptr, Zptr, 0)
+
+}
+
+const relax = {
+}
--- a/lib/thread/smoketest.myr
+++ b/lib/thread/smoketest.myr
@@ -4,8 +4,9 @@
var val : uint64 = 0
var done : uint32 = 0
+var mtx : thread.mutex
-const setvar = {
+const atomicincvar = {
var i
for i = 0; i < 10_000_000; i++
@@ -15,10 +16,10 @@
thread.xset(&done, 1)
}
-const main = {
+const atomictest = {
var i
- match thread.spawn(setvar)
+ match thread.spawn(atomicincvar)
| `std.Ok tid:
for i = 0; i < 100_000; i++
thread.xadd(&val, 1)
@@ -26,8 +27,49 @@
while thread.xget(&done) == 0
/* nothing */
;;
- std.assert(val == 10_100_000, "atomics are broken")
+ std.assert(val == 10_100_000, "atomics are broken\n")
| `std.Fail err:
std.fatal("errno = {}\n", err)
;;
+}
+
+const mtxincvar = {
+ var i
+
+ for i = 0; i < 1_000_000; i++
+ thread.mtxlock(&mtx)
+ val++
+ thread.mtxunlock(&mtx)
+ ;;
+ std.write(1, "done\n")
+ thread.xset(&done, 1)
+}
+
+const mtxtest = {
+ var i
+
+ mtx = thread.mkmtx()
+ match thread.spawn(mtxincvar)
+ | `std.Ok tid:
+ for i = 0; i < 1_000_000; i++
+ thread.mtxlock(&mtx)
+ val++
+ thread.mtxunlock(&mtx)
+ ;;
+ while thread.xget(&done) == 0
+ /* nothing */
+ ;;
+ if val != 2_000_000
+ std.fatal("mutexes are broken, got {}\n", val)
+ ;;
+ | `std.Fail err:
+ std.fatal("errno = {}\n", err)
+ ;;
+}
+
+const main = {
+ val = 0
+ atomictest()
+ val = 0
+ mtxtest()
}