ref: 4efbb423950483a2deec1384c732a5d34fe639c7
parent: 03220ea343818819d6c155bd83e609ca2282a6e4
author: iriri <[email protected]>
date: Mon Jul 23 19:20:20 EDT 2018
Subject: [PATCH 2/2] Add/fix condvar implementations.
--- a/lib/thread/bld.sub
+++ b/lib/thread/bld.sub
@@ -1,18 +1,21 @@
lib thread =
common.myr
hookstd.myr # install thread hooks
+
+ # generic fallbacks
+ condvar.myr
+ mutex.myr
+ ncpu.myr
+ sem.myr
+ waitgrp.myr
+
+ # futex-based impls
mutex+futex.myr
sem+futex.myr
waitgrp+futex.myr
- mutex.myr # fallback, for unimplemented platforms
- sem.myr # fallback, for unimplemented platforms
- waitgrp.myr # fallback, for unimplemented platforms
- #generic fallbacks
- ncpu.myr
-
# linux impl of basic thread primitives
- #condvar+linux.myr
+ condvar+linux.myr
exit+linux-x64.s
futex+linux.myr
ncpu+linux.myr
@@ -19,7 +22,7 @@
spawn+linux.myr
# freebsd impl of thread primitives
- #condvar+freebsd.myr
+ condvar+freebsd.myr
exit+freebsd-x64.s
futex+freebsd.myr
ncpu+freebsd.myr
@@ -33,7 +36,7 @@
#exit+netbsd-x64.s
# osx impl of thread primitives
- #condvar+osx.myr
+ condvar+osx.myr
futex+osx.myr
spawn+osx.myr
start+osx-x64.s
@@ -47,6 +50,7 @@
spawn+plan9.myr
# openbsd impl of thread primitives
+ condvar+openbsd:6.2.myr
exit+openbsd-x64.s
futex+openbsd:6.2.myr
ncpu+openbsd.myr
--- a/lib/thread/condvar+freebsd.myr
+++ b/lib/thread/condvar+freebsd.myr
@@ -1,14 +1,14 @@
use std
-use sys
use "atomic"
use "common"
use "mutex"
+use "futex"
pkg thread =
type cond = struct
_mtx : mutex#
- _seq : uint32
+ _seq : ftxtag
;;
const mkcond : (mtx : mutex# -> cond)
@@ -27,33 +27,28 @@
mtx = cond._mtx
seq = cond._seq
-
mtxunlock(mtx)
- sys.umtx_op((&cond._seq : void#), \
- sys.Umtxwaituintpriv, \
- (seq : uint64), \
- Zptr, Zptr)
/*
- We need to atomically set the mutex to contended. This allows us to
- pass responsibility for waking up the potential other waiters on to the
- unlocker of the mutex.
+ FIXME?: `ftxwait` can be interrupted but `condwait` should always be
+ done in a loop anyway.
*/
- while xchg(&mtx._state, Contended) != Unlocked
- sys.umtx_op((&mtx._state : void#), \
- sys.Umtxwaituintpriv, \
- (Contended : uint64), \
- Zptr, Zptr)
- ;;
+ ftxwait(&cond._seq, seq, Zptr)
+
+ mtxlock(mtx)
}
const condsignal = {cond : cond#
xadd(&cond._seq, 1)
- sys.umtx_op((&cond._seq : void#), sys.Umtxwakepriv, 1, Zptr, Zptr)
+ ftxwake(&cond._seq) /* wake on the same sequence word that condwait passes to ftxwait */
}
+/*
+Wake every waiter at once. `umtx_op` can support condvars efficiently, but only
+if they are implemented in the specific way it expects, so for now this bumps
+the sequence and calls `ftxwakeall`, accepting the thundering herd.
+*/
const condbroadcast = {cond : cond#
xadd(&cond._seq, 1)
- sys.umtx_op((&cond._seq : void#), sys.Umtxwakepriv, 0x7ffffff, Zptr, Zptr)
+ ftxwakeall(&cond._seq)
}
-
--- a/lib/thread/condvar+linux.myr
+++ b/lib/thread/condvar+linux.myr
@@ -27,8 +27,12 @@
mtx = cond._mtx
seq = cond._seq
-
mtxunlock(mtx)
+
+ /*
+ FIXME?: `futex` can be interrupted but `condwait` should always be done
+ in a loop anyway.
+ */
sys.futex(&cond._seq, sys.Futexwait | sys.Futexpriv, seq, Zptr, Zptr, 0)
/*
@@ -36,10 +40,7 @@
pass responsibility for waking up the potential other waiters on to the
unlocker of the mutex.
*/
- while xchg(&mtx._state, Contended) != Unlocked
- sys.futex(&mtx._state, sys.Futexwait | sys.Futexpriv, \
- Contended, Zptr, Zptr, 0)
- ;;
+ mtxcontended(mtx)
}
const condsignal = {cond : cond#
@@ -54,8 +55,7 @@
used for the number of threads to move, and is not ignored when
requeueing
*/
- sys.futex(&cond._seq, sys.Futexcmprequeue | sys.Futexpriv, \
- 1, (0x7fffffff : sys.timespec#), \
- &cond._mtx._state, cond._seq)
+ sys.futex(&cond._seq, sys.Futexrequeue | sys.Futexpriv,
+ 1, (0x7fffffff : sys.timespec#),
+ (&cond._mtx._state : int32#), 0)
}
-
--- /dev/null
+++ b/lib/thread/condvar+openbsd:6.2.myr
@@ -1,0 +1,56 @@
+use std
+use sys
+
+use "atomic"
+use "common"
+use "mutex"
+
+pkg thread =
+ type cond = struct
+ _mtx : mutex# /* mutex that condwait unlocks before sleeping and retakes before returning */
+ _seq : uint32 /* futex word waiters sleep on; incremented by every signal and broadcast */
+ ;;
+
+ const mkcond : (mtx : mutex# -> cond)
+ const condwait : (cond : cond# -> void)
+ const condsignal : (cond : cond# -> void)
+ const condbroadcast : (cond : cond# -> void)
+;;
+
+const mkcond = {mtx /* build a condvar bound to `mtx` */
+ -> [._mtx = mtx, ._seq = 0] /* the sequence counter starts at zero */
+}
+
+const condwait = {cond /* unlocks cond._mtx, sleeps on the sequence word, then retakes the mutex */
+ var seq
+ var mtx
+
+ mtx = cond._mtx
+ seq = cond._seq /* snapshot taken before the mutex is released; passed to the futex wait below */
+ mtxunlock(mtx)
+
+ /*
+ FIXME?: `futex` can be interrupted, so this wait may end without a signal.
+ Callers should re-check their predicate in a loop around `condwait`.
+ */
+ sys.futex(&cond._seq, sys.Futexwait, seq, Zptr, Zptr)
+
+ /*
+ Retake the mutex by atomically setting it to contended. This passes the
+ responsibility for waking up any other potential waiters on to the
+ next unlocker of the mutex.
+ */
+ mtxcontended(mtx)
+}
+
+const condsignal = {cond : cond#
+ xadd(&cond._seq, 1) /* change the word so it no longer equals a waiter's earlier snapshot */
+ sys.futex(&cond._seq, sys.Futexwake, 1, Zptr, Zptr) /* wake count of 1 */
+}
+
+const condbroadcast = {cond : cond#
+ xadd(&cond._seq, 1)
+ sys.futex(&cond._seq, sys.Futexrequeue, 1, /* wake count of 1; the rest are requeued */
+ (0x7fffffff : sys.timespec#), /* requeue limit, passed in the timeout slot as a cast integer */
+ (&cond._mtx._state : uint32#)) /* requeue target: the mutex's state word */
+}
--- /dev/null
+++ b/lib/thread/condvar+osx.myr
@@ -1,0 +1,53 @@
+use std
+
+use "atomic"
+use "common"
+use "mutex"
+use "futex"
+
+pkg thread =
+ type cond = struct
+ _mtx : mutex# /* mutex that condwait unlocks before sleeping and relocks before returning */
+ _seq : ftxtag /* word waiters sleep on; incremented by every signal and broadcast */
+ ;;
+
+ const mkcond : (mtx : mutex# -> cond)
+ const condwait : (cond : cond# -> void)
+ const condsignal : (cond : cond# -> void)
+ const condbroadcast : (cond : cond# -> void)
+;;
+
+const mkcond = {mtx /* build a condvar bound to `mtx` */
+ -> [._mtx = mtx, ._seq = 0] /* the sequence counter starts at zero */
+}
+
+const condwait = {cond /* unlocks cond._mtx, sleeps on the sequence word, then relocks the mutex */
+ var seq
+ var mtx
+
+ mtx = cond._mtx
+ seq = cond._seq /* snapshot taken before the mutex is released; handed to ftxwait below */
+ mtxunlock(mtx)
+
+ /*
+ FIXME?: `ftxwait` can be interrupted, so this wait may end without a
+ signal. Callers should re-check their predicate in a loop.
+ */
+ ftxwait(&cond._seq, seq, Zptr)
+
+ mtxlock(mtx) /* ordinary relock; nothing is requeued onto the mutex in this implementation */
+}
+
+const condsignal = {cond : cond#
+ xadd(&cond._seq, 1) /* change the word so it no longer equals a waiter's earlier snapshot */
+ ftxwake(&cond._seq) /* wake on the same word condwait sleeps on */
+}
+
+/*
+Wakes every waiter at once. That invites the thundering herd, but OS X offers
+no requeue operation that could move the waiters onto the mutex instead.
+*/
+const condbroadcast = {cond : cond#
+ xadd(&cond._seq, 1)
+ ftxwakeall(&cond._seq)
+}
--- /dev/null
+++ b/lib/thread/condvar.myr
@@ -1,0 +1,87 @@
+use "atomic"
+use "common"
+use "mutex"
+use "sem"
+
+pkg thread =
+ type cond = struct
+ _mtx : mutex# /* user mutex that condwait unlocks while waiting and relocks afterwards */
+ _waitq : condwaiter# /* head of the waiter queue; compared against Zptr to detect an empty queue */
+ _lock : mutex /* held around every read or update of _waitq */
+ ;;
+
+ const mkcond : (mtx : mutex# -> cond)
+ const condwait : (cond : cond# -> void)
+ const condsignal : (cond : cond# -> void)
+ const condbroadcast : (cond : cond# -> void)
+;;
+
+/*
+One queued waiter. The wait queue is doubly linked so that, once timeout
+support is added, a waiter can be removed from anywhere in the list.
+
+`cond._waitq.prev` (the head's `prev`) is the tail of the queue.
+*/
+type condwaiter = struct
+ next : condwaiter# /* following waiter, towards the tail */
+ prev : condwaiter# /* preceding waiter; on the head this points at the tail instead */
+ sem : sem /* posted by condsignal/condbroadcast to release this waiter */
+;;
+
+const mkcond = {mtx /* build a condvar bound to `mtx` with its own internal queue lock */
+ -> [._mtx = mtx, ._lock = mkmtx()] /* NOTE(review): _waitq is not set here; presumably starts as Zptr - confirm */
+}
+
+const condwait = {cond /* enqueue this thread, unlock cond._mtx, block on a semaphore, relock */
+ var mtx = cond._mtx
+ var lock = &cond._lock
+ var waiter = [.sem = mksem(0)] /* NOTE(review): `next` is never assigned; presumably zeroed - confirm */
+
+ mtxlock(lock)
+ match cond._waitq
+ | Zptr:
+ waiter.prev = &waiter /* a sole entry is both head and tail */
+ cond._waitq = &waiter
+ | q:
+ waiter.prev = q.prev /* the old tail becomes our predecessor */
+ waiter.prev.next = &waiter
+ q.prev = &waiter /* the head's prev now names the new tail */
+ ;;
+
+ mtxunlock(lock)
+ mtxunlock(mtx) /* released only after enqueueing, so a later signaller finds this waiter */
+ semwait(&waiter.sem)
+
+ mtxlock(mtx)
+}
+
+const condsignal = {cond /* release the waiter at the head of the queue, if there is one */
+ var lock = &cond._lock
+
+ mtxlock(lock)
+ var head = cond._waitq
+ if head != Zptr
+ if head.next != Zptr
+ head.next.prev = head.prev /* the new head inherits the tail pointer */
+ ;;
+ cond._waitq = head.next /* unlink before posting */
+ sempost(&head.sem)
+ ;;
+ mtxunlock(lock)
+}
+
+/*
+Releases every queued waiter. Yes, this invites the thundering herd, but
+that's the price of not supporting futexes at all.
+*/
+const condbroadcast = {cond
+ var lock = &cond._lock
+ var head = Zptr
+
+ mtxlock(lock)
+ while (head = cond._waitq) != Zptr
+ cond._waitq = head.next /* pop the head before posting its semaphore */
+ sempost(&head.sem)
+ ;;
+ mtxunlock(lock)
+}
--- a/lib/thread/mutex+futex.myr
+++ b/lib/thread/mutex+futex.myr
@@ -12,11 +12,13 @@
const mtxtrylock : (mtx : mutex# -> bool)
const mtxunlock : (mtx : mutex# -> void)
- pkglocal const Unlocked = 0
- pkglocal const Locked = 1
- pkglocal const Contended = 2
+ pkglocal const mtxcontended : (mtx : mutex# -> void)
;;
+const Unlocked = 0
+const Locked = 1
+const Contended = 2
+
var nspin = 10 /* FIXME: pick a sane number, based on CPU count */
const mkmtx = {
@@ -38,9 +40,9 @@
;;
/*
- Contended case: we set the lock state to Contended. This indicates that there
- the lock is locked, and we potentially have threads waiting on it, which means
- that we will need to wake them up.
+ Contended case: we set the lock state to Contended. This indicates that
+ the lock is locked, and we potentially have threads waiting on it,
+ which means that we will need to wake them up.
*/
if c == Locked
c = xchg(&mtx._state, Contended)
@@ -70,4 +72,10 @@
/* wake one thread */
ftxwake(&mtx._state)
+}
+
+const mtxcontended = {mtx /* acquire `mtx`, always leaving its state set to Contended */
+ while xchg(&mtx._state, Contended) != Unlocked /* done once the previous state was Unlocked */
+ ftxwait(&mtx._state, Contended, Zptr) /* wait on the state word, with Contended as the expected value */
+ ;;
}
--- a/lib/thread/test/condvar.myr
+++ b/lib/thread/test/condvar.myr
@@ -1,55 +1,44 @@
use std
use thread
-use "util"
+use thrtestutil
const Nwakes = 1000
var cv
+var cv1
var mtx
var val
-var done : int32
+var ready
var nwoken : int32
-var nready : int32
-var locked : int32
const main = {
- done = 0
+ ready = thread.mkwg(2) /* cvwait and cvwake each post once when finished */
val = 123
mtx = thread.mkmtx()
cv = thread.mkcond(&mtx)
+ cv1 = thread.mkcond(&mtx) /* second condvar: cvwait signals it, cvwake waits on it */
thread.spawn(cvwait)
thread.spawn(cvwake)
- while done == 0
- /* nothing */
- ;;
+ thread.wgwait(&ready) /* block until both threads have posted */
std.assert(nwoken == Nwakes, "wrong number of wakes")
- std.assert(val == 123, "wrong val after all are done")
+ ready = thread.mkwg(100) /* cvwaitonce posts this while holding mtx, before waiting */
nwoken = 0
- nready = 0
- mkherd(100, cvwaitonce)
+ thrtestutil.mkherd(100, cvwaitonce)
- /* wait until the herd is ready */
- while nready != 100 /* 0 to 99 */
- /* nothing */
- ;;
- while locked == 0
- /* nothing */
- ;;
- thread.condbroadcast(&cv)
+ thread.wgwait(&ready)
while nwoken != 100
- /* nothing */
+ thread.condbroadcast(&cv) /* repeated: a thread posts `ready` before it enters condwait */
;;
- std.assert(nwoken == 100, "wrong thread count woken")
-
}
const cvwait = {
for var i = 0; i < Nwakes; i++
thread.mtxlock(&mtx)
+ thread.condsignal(&cv1)
thread.condwait(&cv)
std.assert(val == 456, "wrong val after signal\n")
val = 123
@@ -57,9 +46,8 @@
thread.xadd(&nwoken, 1)
;;
- val = 123
- thread.xadd(&done, 1)
-
+ thread.condsignal(&cv1)
+ thread.wgpost(&ready)
}
const cvwake = {
@@ -66,20 +54,20 @@
while true
thread.mtxlock(&mtx)
val = 456
+ thread.condsignal(&cv)
+ thread.condwait(&cv1)
thread.mtxunlock(&mtx)
- thread.condsignal(&cv)
if nwoken >= Nwakes
break
;;
;;
+ thread.wgpost(&ready)
}
const cvwaitonce = {
- thread.xadd(&nready, 1)
-
thread.mtxlock(&mtx)
- thread.xadd(&locked, 1)
+ thread.wgpost(&ready)
thread.condwait(&cv)
thread.mtxunlock(&mtx)