ref: b92c74cab80c4c1fa912d76422d5c83fcad308de
parent: 1b5b7ee4b7a10b39835369ac12462ef2231b6f2b
author: Ori Bernstein <[email protected]>
date: Fri Sep 18 15:34:46 EDT 2015
Add futex-based future implementation.
--- a/lib/thread/bld.proj
+++ b/lib/thread/bld.proj
@@ -1,10 +1,11 @@
lib thread =
- future.myr
+ common.myr
# linux impl of basic thread primitives
condvar+linux.myr
mutex+linux.myr
spawn+linux.myr
+ future+linux.myr
atomic-impl+x64.s
atomic.myr
--- /dev/null
+++ b/lib/thread/common.myr
@@ -1,0 +1,5 @@
+use std
+
+pkg thread =
+ generic Zptr = 0 castto(@a#) /* 0 cast to a pointer of any type: a typed null pointer */
+;;
--- a/lib/thread/condvar+linux.myr
+++ b/lib/thread/condvar+linux.myr
@@ -2,6 +2,7 @@
use sys
use "atomic.use"
+use "common.use"
use "mutex.use"
pkg thread =
@@ -15,8 +16,6 @@
const condsignal : (cond : cond# -> void)
const condbroadcast : (cond : cond# -> void)
;;
-
-generic Zptr = 0 castto(@a#)
const mkcond = {mtx
-> [._mtx = mtx, ._seq = 0]
--- /dev/null
+++ b/lib/thread/future+linux.myr
@@ -1,0 +1,78 @@
+/*
+  Write-once futures for Linux, built directly on the futex syscall.
+  A future starts out empty, is given a value at most once with futset,
+  and can then be read any number of times with futget or futtryget.
+*/
+use std
+use sys
+
+use "atomic.use"
+use "common.use"
+
+pkg thread =
+	type future(@a) = struct
+		_state	: int32	/* Clear, Setting or Set; also the word handed to futex */
+		_val	: @a	/* written by futset before _state becomes Set */
+	;;
+
+	generic mkfut	: (-> future(@a))
+	generic futset	: (fut : future(@a)#, val : @a -> bool)
+	generic futget	: (fut : future(@a)# -> @a)
+	generic futtryget	: (fut : future(@a)# -> std.option(@a))
+;;
+
+/* values of future._state; the only transitions made are Clear -> Setting -> Set */
+const Clear = 0
+const Setting = 1
+const Set = 2
+
+/* Returns an empty future: only _state is initialized, to Clear. */
+generic mkfut = {
+	-> [._state = Clear ]
+}
+
+/*
+  Stores val in fut and wakes the threads blocked in futget.
+  Returns true if this call supplied the value, and false if the future
+  was already set or another futset is in progress.
+*/
+generic futset = {fut, val
+	/* If we don't get a clear, we failed to set the value */
+	if xcas(&fut._state, Clear, Setting) != Clear
+		-> false
+	;;
+	fut._val = val
+	/* publish Set only after _val is in place: readers use _val once they see Set */
+	fut._state = Set
+	/* 0x7fffffff is the wake count: wake every waiter, not just one */
+	sys.futex(&fut._state, sys.Futexwake | sys.Futexpriv, 0x7fffffff, Zptr, Zptr, 0)
+	-> true
+}
+
+/*
+  Non-blocking read. Returns `std.Some of the value once the future is
+  Set, and `std.None while it is still Clear or Setting.
+*/
+generic futtryget = {fut
+	if fut._state != Set
+		-> `std.None
+	;;
+	-> `std.Some fut._val
+}
+
+/* Blocks until fut has been set, then returns its value. */
+generic futget = {fut
+	var st
+
+	/*
+	  The state can move from Clear to Setting (and on to Set) between
+	  reading it and waiting on it. Futexwait only sleeps while _state
+	  still equals st, so a stale st costs one more trip around the loop.
+	*/
+	st = fut._state
+	while st != Set
+		sys.futex(&fut._state, sys.Futexwait | sys.Futexpriv, st, Zptr, Zptr, 0)
+		st = fut._state
+	;;
+	-> fut._val
+}
--- /dev/null
+++ b/lib/thread/future.myr
@@ -1,0 +1,91 @@
+/*
+  Mutex-based write-once future. The mutex is the gate: mkfut returns
+  it locked, futset unlocks it once the value is stored, and readers
+  pass through it to wait for the value.
+*/
+use std
+
+use "mutex.use"
+
+pkg thread =
+	type future(@a) = struct
+		mtx	: mutex	/* gate: locked from mkfut until futset */
+		set	: bool	/* true once val has been stored */
+		val	: @a
+	;;
+
+	generic mkfut	: (-> future(@a))
+	generic futset	: (fut : future(@a)#, val : @a -> bool)
+	generic futget	: (fut : future(@a)# -> @a)
+	generic futtryget	: (fut : future(@a)# -> std.option(@a))
+	generic futclear	: (fut : future(@a)# -> void)
+;;
+
+/* Returns an empty future whose gate mutex is already locked. */
+generic mkfut = {
+	var fut
+
+	fut = [.mtx = mkmtx() ]
+	mtxlock(&fut.mtx)
+	-> fut
+}
+
+/*
+  Stores val and opens the gate. Returns false, changing nothing, if
+  the future is already set.
+  NOTE(review): fut.set is tested without holding a lock, so two
+  racing setters can both get past the check -- confirm callers
+  only ever set from one thread.
+*/
+generic futset = {fut, val
+	if fut.set
+		-> false
+	;;
+	/* relies on these stores staying in program order: val before set */
+	fut.val = val
+	fut.set = true
+	mtxunlock(&fut.mtx)
+	-> true
+}
+
+/*
+  Non-blocking read: `std.None if the future is not set yet, otherwise
+  `std.Some of the value, copied while holding the gate mutex.
+*/
+generic futtryget = {fut
+	var val
+
+	if !fut.set
+		-> `std.None
+	;;
+	mtxlock(&fut.mtx)
+	val = fut.val
+	mtxunlock(&fut.mtx)
+	-> `std.Some val
+}
+
+/*
+  Blocks until the future is set, then returns the value. The gate is
+  released again right away so other readers can follow.
+*/
+generic futget = {fut
+	var val
+
+	mtxlock(&fut.mtx)
+	val = fut.val
+	mtxunlock(&fut.mtx)
+	-> val
+}
+
+/*
+  Empties a set future so that it can be set again: the gate is locked
+  once more and the flag dropped. Clearing an unset future is a no-op,
+  since its gate is still locked from mkfut.
+  NOTE(review): must not race with futset on the same future.
+*/
+generic futclear = {fut
+	if fut.set
+		mtxlock(&fut.mtx)
+		fut.set = false
+	;;
+}
--- a/lib/thread/mutex+linux.myr
+++ b/lib/thread/mutex+linux.myr
@@ -2,6 +2,7 @@
use sys
use "atomic.use"
+use "common.use"
pkg thread =
type mutex = struct
@@ -18,7 +19,6 @@
pkglocal const Contended = 2
;;
-generic Zptr = 0 castto(@a#)
var nspin = 10 /* FIXME: pick a sane number, based on CPU count */
const mkmtx = {
--- /dev/null
+++ b/lib/thread/test/future.myr
@@ -1,0 +1,58 @@
+use std
+use sys
+use thread
+
+use "test/util.use"
+
+var fut
+var nready : int32
+var ndone : int32
+
+/*
+  Checks thread.future in two scenarios: the value is set while a herd
+  of readers is already waiting, and the value is set before any reader
+  starts. In both, a second futset on the same future must be refused.
+*/
+const main = {
+	nready = 0
+	ndone = 0
+	fut = thread.mkfut()
+	/* set after we have some waiters */
+	mkherd(100, getfuture)
+	while nready != 100
+		/* spin */
+	;;
+	std.put("done waiting for ready\n")
+	std.assert(ndone == 0, "thread proceeded too soon\n")
+	std.assert(thread.futset(&fut, 666), "failed to set future\n")
+	std.assert(thread.futset(&fut, 1) == false, "double set future\n")
+	while ndone != 100
+		/* spin */
+	;;
+	std.put("double set future ok\n")
+	/* start up a few more to make sure we can still read */
+	mkherd(50, getfuture)
+	while ndone != 150
+		/* spin */
+	;;
+
+	/* set ahead of time */
+	ndone = 0
+	fut = thread.mkfut()
+	std.assert(thread.futset(&fut, 666), "failed to set future\n")
+	std.assert(thread.futset(&fut, 666) == false, "double set future\n")
+	mkherd(100, getfuture)
+	while ndone != 100
+		/* spin */
+	;;
+}
+
+/*
+  Function handed to mkherd: bumps nready, reads the shared future
+  (which must yield 666), then bumps ndone.
+*/
+const getfuture = {
+	thread.xadd(&nready, 1)
+	std.assert(thread.futget(&fut) == 666, "wrong value gotten from future")
+	thread.xadd(&ndone, 1)
+}