shithub: mc

Download patch

ref: b92c74cab80c4c1fa912d76422d5c83fcad308de
parent: 1b5b7ee4b7a10b39835369ac12462ef2231b6f2b
author: Ori Bernstein <[email protected]>
date: Fri Sep 18 15:34:46 EDT 2015

Add futex-based future implementation.

--- a/lib/thread/bld.proj
+++ b/lib/thread/bld.proj
@@ -1,10 +1,11 @@
 lib thread =
-	future.myr
+	common.myr
 
 	# linux impl of basic thread primitives
 	condvar+linux.myr
 	mutex+linux.myr
 	spawn+linux.myr
+	future+linux.myr
 
 	atomic-impl+x64.s
 	atomic.myr
--- /dev/null
+++ b/lib/thread/common.myr
@@ -1,0 +1,5 @@
+use std
+
+pkg thread = 
+	generic Zptr = 0 castto(@a#)
+;;
--- a/lib/thread/condvar+linux.myr
+++ b/lib/thread/condvar+linux.myr
@@ -2,6 +2,7 @@
 use sys
 
 use "atomic.use"
+use "common.use"
 use "mutex.use"
 
 pkg thread =
@@ -15,8 +16,6 @@
 	const condsignal	: (cond : cond# -> void)
 	const condbroadcast	: (cond : cond# -> void)
 ;;
-
-generic Zptr = 0 castto(@a#)
 
 const mkcond = {mtx
 	-> [._mtx = mtx, ._seq = 0]
--- /dev/null
+++ b/lib/thread/future+linux.myr
@@ -1,0 +1,48 @@
+use std
+use sys
+
+use "atomic.use"
+use "common.use"
+
+pkg thread =
+	type future(@a) = struct
+		_state	: int32
+		_val	: @a
+	;;
+
+	generic mkfut	: (-> future(@a))
+	generic futset	: (fut : future(@a)#, val : @a -> bool)
+	generic futget	: (fut : future(@a)# -> @a)
+	generic futtryget	: (fut : future(@a)# -> std.option(@a))
+;;
+
+const Clear = 0
+const Setting = 1
+const Set = 2
+
+generic mkfut = {
+	-> [._state = Clear ]
+}
+
+generic futset = {fut, val
+	/* If we don't get a clear, we failed to set the value */
+	if xcas(&fut._state, Clear, Setting) != Clear
+		-> false
+	;;
+	fut._val = val
+	fut._state = Set
+	sys.futex(&fut._state, sys.Futexwake | sys.Futexpriv, 0x7fffffff, Zptr, Zptr, 0)
+	-> true
+}
+
+generic futget = {fut
+	var st
+
+	/* we can transition from Set to Setting. */
+	st = fut._state
+	while st != Set
+		sys.futex(&fut._state, sys.Futexwait | sys.Futexpriv, st, Zptr, Zptr, 0)
+		st = fut._state
+	;;
+	-> fut._val
+}
--- /dev/null
+++ b/lib/thread/future.myr
@@ -1,0 +1,63 @@
+use std
+
+use "mutex.use"
+
+pkg thread =
+	type future(@a) = struct
+		mtx	: mutex
+		set	: bool
+		val	: @a
+	;;
+
+	generic mkfut	: (-> future(@a))
+	generic futset	: (fut : future(@a)#, val : @a -> bool)
+	generic futget	: (fut : future(@a)# -> @a)
+	generic futtryget	: (fut : future(@a)# -> std.option(@a))
+	generic futclear	: (fut : future(@a)# -> void)
+;;
+
+const Unset = 0
+const Waiting = 1
+const Set = 2
+
+generic mkfut = {
+	var fut
+
+	fut = [.mtx = mkmtx() ]
+	mtxlock(&fut.mtx)
+	-> fut
+}
+
+generic futset = {fut, val
+	if fut.set
+		-> false
+	;;
+	/* compiler doesn't reorder shit */
+	fut.val = val
+	fut.set = true
+	mtxunlock(&fut.mtx)
+	-> true
+}
+
+generic futtryget = {fut
+	var val
+
+	if !fut.set
+		-> `std.None
+	;;
+	mtxlock(&fut.mtx)
+	val = fut.val
+	mtxunlock(&fut.mtx)
+	-> `std.Some val
+}
+
+generic futget = {fut
+	var val
+
+	mtxlock(&fut.mtx)
+	val = fut.val
+	mtxunlock(&fut.mtx)
+	-> val
+}
+
+
--- a/lib/thread/mutex+linux.myr
+++ b/lib/thread/mutex+linux.myr
@@ -2,6 +2,7 @@
 use sys
 
 use "atomic.use"
+use "common.use"
 
 pkg thread =
 	type mutex = struct
@@ -18,7 +19,6 @@
 	pkglocal const Contended = 2
 ;;
 
-generic Zptr = 0 castto(@a#)
 var nspin = 10	/* FIXME: pick a sane number, based on CPU count */
 
 const mkmtx = {
--- /dev/null
+++ b/lib/thread/test/future.myr
@@ -1,0 +1,51 @@
+use std
+use sys
+use thread
+
+use "test/util.use"
+
+var fut
+var nready : int32
+var ndone : int32
+
+const main = {
+	nready = 0
+	ndone = 0
+	fut = thread.mkfut()
+	/* set after we have some waiters */
+	mkherd(100, getfuture)
+	while nready != 100
+		/* spin */
+	;;
+	std.put("done waiting for ready\n")
+	std.assert(ndone == 0, "thread proceeded too soon\n")
+	thread.futset(&fut, 666)
+	std.assert(thread.futset(&fut, 1) == false, "double set future\n")
+	while ndone != 100
+		std.put("ndone: {}\n", ndone)
+		/* spin */
+	;;
+	std.put("double set future ok")
+	/* start up a few more to make sure we can still read */
+	mkherd(50, getfuture)
+	while ndone != 150
+		/* spin */
+	;;
+
+	
+	/* set ahead of time */
+	ndone = 0
+	fut = thread.mkfut()
+	thread.futset(&fut, 666)
+	std.assert(thread.futset(&fut, 666) == false, "double set future\n")
+	mkherd(100, getfuture)
+	while ndone != 100
+		/* spin */
+	;;
+}
+
+const getfuture = {
+	thread.xadd(&nready, 1)
+	std.assert(thread.futget(&fut) == 666, "wrong value gotten from future")
+	thread.xadd(&ndone, 1)
+}