ref: d2a89e73c70057b57f3ad7e158645744df1f29b1
parent: 2fca1caa1935baf0473efc367e24dab57c52517d
author: iriri <[email protected]>
date: Sat Aug 18 12:29:08 EDT 2018
Add rwlocks. Updating the first three patches. Changes: - Copied all of the relevant timespec, futex, etc. changes to `/support/syscall-gen/`. - Added comment to FreeBSD and OS X condvar implementations about reacquiring the lock after being signalled. - Simplified the fallback condvar implementation. - The `ftxwait` timeout parameter is now a `std.time`. Negative values mean no timeout. - Replaced the fallback rwlock implementation with a much simpler one. - Added comments to the rwlock implementations and made the requested changes to the function names.
--- a/lib/thread/bld.sub
+++ b/lib/thread/bld.sub
@@ -6,11 +6,13 @@
condvar.myr
mutex.myr
ncpu.myr
+ rwlock.myr
sem.myr
waitgrp.myr
# futex-based impls
mutex+futex.myr
+ rwlock+futex.myr
sem+futex.myr
waitgrp+futex.myr
--- /dev/null
+++ b/lib/thread/rwlock+futex.myr
@@ -1,0 +1,135 @@
+use std
+
+use "atomic"
+use "futex"
+
+pkg thread =
+ /*
+ Futex-backed reader-writer lock. The 31 low bits of `_state` hold
+ either the number of readers holding the lock, or `0x7fffffff` if
+ the lock is held by a single writer.
+
+ The high bit is set if there are any waiting readers or writers.
+ */
+ type rwlock = struct
+ _state : ftxtag
+ ;;
+
+ const mkrwlock : (-> rwlock) /* returns an unlocked lock */
+ const rdlock : (rw : rwlock# -> void) /* acquire for reading; waits while a writer holds the lock */
+ const wrlock : (rw : rwlock# -> void) /* acquire for writing; waits until `_state` can go from 0 to the writer value */
+ const tryrdlock : (rw : rwlock# -> bool) /* rdlock without waiting; false if a writer holds the lock */
+ const trywrlock : (rw : rwlock# -> bool) /* wrlock without waiting; false unless `_state` was 0 */
+ const rdunlock : (rw : rwlock# -> void) /* release one read hold */
+ const wrunlock : (rw : rwlock# -> void) /* release the write hold */
+;;
+
+const Nrmask = 0x7fffffff /* low 31 bits: reader count, or all ones while a writer holds the lock */
+const Waitbit = 0x80000000 /* high bit: set when readers or writers are waiting */
+
+const mkrwlock = {
+ -> [._state = 0] /* 0: no readers, no writer, no waiters */
+}
+
+const rdlock = {rw /* acquire `rw` for reading, sleeping while a writer holds it */
+ for ; ;
+ var s = xget(&rw._state)
+ match s & Nrmask
+ | Nrmask - 1: std.die("error: rwlock overflowed\n")
+ | Nrmask:
+ /*
+ The lock is held by a writer so we CAS in the wait bit
+ and sleep until woken; the negative timeout means "no
+ timeout", so we never spin on an expired wait. If the CAS
+ fails, the state has changed so we simply retry.
+ */
+ if xcas(&rw._state, s, Nrmask | Waitbit) == s
+ ftxwait(&rw._state, Nrmask | Waitbit, -1)
+ ;;
+ | _:
+ /*
+ Otherwise the lock is either unlocked or held by some
+ number of readers. Either way we simply increment the
+ reader count via CAS, keeping the wait bit as it was.
+ */
+ if xcas(&rw._state, s, s + 1) == s
+ -> void
+ ;;
+ ;;
+ ;;
+}
+
+const wrlock = {rw /* acquire `rw` for writing, sleeping until no reader or writer holds it */
+ for ; ;
+ /*
+ `_state` must be 0 for a writer to acquire the lock. Anything
+ else means the lock is either held or in the process of being
+ released by the last reader.
+ */
+ var s = xcas(&rw._state, 0, Nrmask)
+ if s == 0
+ -> void
+ ;;
+
+ /*
+ If we fail to acquire the lock, attempt to CAS in the wait bit
+ and sleep with no timeout (negative value). If the CAS fails,
+ the state of the lock has changed so we retry the acquire.
+ */
+ if xcas(&rw._state, s, s | Waitbit) == s
+ ftxwait(&rw._state, s | Waitbit, -1)
+ ;;
+ ;;
+}
+
+const tryrdlock = {rw /* take a read hold if no writer has the lock; never sleeps on the futex */
+ for ; ;
+ var cur = xget(&rw._state)
+ var nr = cur & Nrmask
+ if nr == Nrmask - 1
+ std.die("error: rwlock overflowed\n")
+ elif nr == Nrmask
+ -> false /* a writer holds the lock */
+ elif xcas(&rw._state, cur, cur + 1) == cur
+ -> true /* reader count bumped; a failed CAS just goes round again */
+ ;;
+ ;;
+ -> false /* Unreachable */
+}
+
+const trywrlock = {rw
+ -> xcas(&rw._state, 0, Nrmask) == 0 /* true only if `_state` was 0 and now holds the writer value */
+}
+
+const rdunlock = {rw /* release one read hold on `rw` */
+ /*
+ Only the last reader needs to potentially wake writers so all other
+ readers can get away with simply decrementing the reader count via FAA.
+ */
+ var prev = xadd(&rw._state, -1)
+ std.assert(prev & Nrmask != 0, "error: rwlock underflowed\n")
+ if prev & Nrmask == 1 && prev & Waitbit != 0
+ /*
+ If no other reader has acquired the lock since the FAA, `_state`
+ is exactly `Waitbit` and the CAS clears it. Clearing the bit means
+ the next `wrunlock` will not wake anyone, so every sleeping writer
+ must be woken now; the losers set the bit again and go back to sleep.
+ */
+ if xcas(&rw._state, Waitbit, 0) == Waitbit
+ ftxwakeall(&rw._state)
+ ;;
+ ;;
+}
+
+const wrunlock = {rw
+ /*
+ Swap `_state` back to 0. If the wait bit was set then readers,
+ writers, or both are asleep. One wake would do when only writers
+ are waiting, but we cannot tell which case we are in, so everyone
+ is woken.
+ */
+ var prev = xchg(&rw._state, 0)
+ if prev & Waitbit != 0
+ ftxwakeall(&rw._state)
+ ;;
+}
--- /dev/null
+++ b/lib/thread/rwlock.myr
@@ -1,0 +1,78 @@
+use "mutex"
+
+pkg thread =
+ /*
+ `_lock` grants exclusive access to either n readers or one writer.
+
+ `_rlock` protects `_rcount` and allows the first reader to attempt to
+ lock `_lock` without letting other readers in.
+ */
+ type rwlock = struct
+ _lock : mutex /* held by the writer, or by the readers as a group */
+ _rlock : mutex /* guards `_rcount` */
+ _rcount : uint32 /* readers currently counted as holding the lock */
+ ;;
+
+ const mkrwlock : (-> rwlock) /* returns a lock built from two fresh mutexes */
+ const rdlock : (rw : rwlock# -> void) /* acquire for reading */
+ const wrlock : (rw : rwlock# -> void) /* acquire for writing */
+ const tryrdlock : (rw : rwlock# -> bool) /* rdlock that uses `mtxtrylock` on `_lock`; false if that fails */
+ const trywrlock : (rw : rwlock# -> bool) /* wrlock via `mtxtrylock`; false if `_lock` is taken */
+ const rdunlock : (rw : rwlock# -> void) /* release one read hold */
+ const wrunlock : (rw : rwlock# -> void) /* release the write hold */
+;;
+
+const mkrwlock = {
+ -> [._lock = mkmtx(), ._rlock = mkmtx(), ._rcount = 0] /* both mutexes fresh, no readers counted */
+}
+
+const rdlock = {rw /* take a read hold, waiting while a writer owns `_lock` */
+ mtxlock(&rw._rlock)
+ /*
+ A reader that finds the count at zero takes `_lock` on behalf of all
+ readers. If a writer holds it, that reader sleeps with `_rlock` still
+ held, so later readers queue behind it until the writer lets go.
+ */
+ var first = (rw._rcount == 0)
+ rw._rcount++
+ if first
+ mtxlock(&rw._lock)
+ ;;
+ mtxunlock(&rw._rlock)
+}
+
+const wrlock = {rw
+ mtxlock(&rw._lock) /* a writer simply owns `_lock`; `_rlock` and `_rcount` are untouched */
+}
+
+const tryrdlock = {rw
+ var ok = true
+ /* NOTE(review): `_rlock` is taken with a blocking lock, so this waits while `rdlock` holds it. */
+ mtxlock(&rw._rlock)
+ if rw._rcount == 0 && !mtxtrylock(&rw._lock)
+ ok = false /* `_lock` is taken; the count stays at zero */
+ else
+ rw._rcount++
+ ;;
+ mtxunlock(&rw._rlock)
+ -> ok
+}
+
+const trywrlock = {rw
+ -> mtxtrylock(&rw._lock) /* result of the try on `_lock` is the result of the call */
+}
+
+const rdunlock = {rw /* drop one read hold */
+ mtxlock(&rw._rlock)
+
+ /* The reader that brings the count to zero hands `_lock` back. */
+ rw._rcount--
+ if rw._rcount == 0
+ mtxunlock(&rw._lock)
+ ;;
+ mtxunlock(&rw._rlock)
+}
+
+const wrunlock = {rw
+ mtxunlock(&rw._lock) /* releases the mutex taken by `wrlock` or a successful `trywrlock` */
+}
--- a/lib/thread/test/mutex.myr
+++ b/lib/thread/test/mutex.myr
@@ -6,19 +6,16 @@
const Nherd = 20
var val : uint64 = 0
-var done : uint32 = 0
var mtx : thread.mutex
+var done
const main = {
- done = 0
- val = 0
-
mtx = thread.mkmtx()
+ done = thread.mkwg(Nherd)
+
thrtestutil.mkherd(Nherd, incvar)
- while thread.xget(&done) != Nherd
- /* nothing */
- ;;
- if val != 1000 * 20
+ thread.wgwait(&done)
+ if val != 1000 * (Nherd : uint64)
std.fatal("mutexes are broken, got {}\n", val)
;;
}
@@ -29,5 +26,5 @@
val++
thread.mtxunlock(&mtx)
;;
- thread.xadd(&done, 1)
+ thread.wgpost(&done)
}
--- /dev/null
+++ b/lib/thread/test/rwlock.myr
@@ -1,0 +1,49 @@
+use std
+use thread
+
+use thrtestutil
+
+const Nherd = 20 /* size of each herd: one herd runs `read`, one runs `incvar` */
+const Nloops = 100_000 /* increments performed by each `incvar` thread */
+
+var val : uint64 /* counter incremented under the write lock */
+var nreaders : uint32 /* threads currently between rdlock and rdunlock */
+var nwriters : uint32 /* threads currently between wrlock and wrunlock */
+var rw /* the lock under test */
+var done /* wait group created with `Nherd` and posted once by each `incvar` thread */
+
+const main = {
+ var want : uint64 = Nloops * (Nherd : uint64)
+ rw = thread.mkrwlock()
+ done = thread.mkwg(Nherd)
+ thrtestutil.mkherd(Nherd, read)
+ thrtestutil.mkherd(Nherd, incvar)
+ thread.wgwait(&done) /* waits on the group that the `incvar` threads post to */
+ if val != want
+ std.fatal("rwlocks are broken, got {}\n", val)
+ ;;
+}
+
+const incvar = { /* writer thread body: `Nloops` increments of `val`, each under the write lock */
+ for var i = 0; i < Nloops; i++
+ thread.wrlock(&rw)
+ thread.xadd(&nwriters, 1)
+ std.assert(thread.xget(&nreaders) == 0, "incvar: rwlocks are broken\n") /* no reader may be inside while we hold the write lock */
+ val++ /* plain, non-atomic increment: relies on the write lock for exclusion */
+ thread.xadd(&nwriters, -1)
+ thread.wrunlock(&rw)
+ ;;
+ thread.wgpost(&done) /* report this writer as finished */
+}
+
+const read = { /* reader thread body: checks that no writer is inside while a read lock is held */
+ /* Linux does not seem to end the process while threads are still running, so readers poll the wait group and exit on their own. */
+ while thread.xget(&done._val) != 0
+ thread.rdlock(&rw)
+ thread.xadd(&nreaders, 1)
+ std.assert(thread.xget(&nwriters) == 0, "read: rwlocks are broken\n") /* no writer may be inside while we hold a read lock */
+ thread.xadd(&nreaders, -1)
+ thread.rdunlock(&rw)
+ std.usleep(1000)
+ ;;
+}