shithub: mc

Download patch

ref: 9892228031c0ffcfa95a1443fc565b031525bb3a
parent: e54af86927ced66fb7fa8ad744cfb609390f04d8
author: Ori Bernstein <[email protected]>
date: Tue Aug 28 21:29:07 EDT 2018

Implement futures + thread.do

--- a/lib/thread/atomic-impl+plan9-x64.s
+++ b/lib/thread/atomic-impl+plan9-x64.s
@@ -1,3 +1,7 @@
+// get variants
+TEXT thread$xget8+0(SB),1,$0
+	MOVB	(DI), AX
+	RET
 TEXT thread$xget32+0(SB),1,$0
 	MOVL	(DI), AX
 	RET
@@ -8,6 +12,10 @@
 	MOVQ	(DI), AX
 	RET
 
+// set variants
+TEXT thread$xset8+0(SB),1,$0
+	MOVB	SI, (DI)
+	RET
 TEXT thread$xset32+0(SB),1,$0
 	MOVL	SI, (DI)
 	RET
@@ -18,6 +26,11 @@
 	MOVQ	SI, (DI)
 	RET
 
+// add variants
+TEXT thread$xadd8+0(SB),1,$0
+	LOCK; XADDB	SI, (DI)
+	MOVL	SI, AX
+	RET
 TEXT thread$xadd32+0(SB),1,$0
 	LOCK; XADDL	SI, (DI)
 	MOVL	SI, AX
@@ -31,6 +44,11 @@
 	MOVQ	SI, AX
 	RET
 
+// cas variants
+TEXT thread$xcas8+0(SB),1,$0
+	MOVL	SI, AX
+	LOCK; CMPXCHGB	DX, (DI)
+	RET
 TEXT thread$xcas32+0(SB),1,$0
 	MOVL	SI, AX
 	LOCK; CMPXCHGL	DX, (DI)
@@ -44,6 +62,11 @@
 	LOCK; CMPXCHGQ	DX, (DI)
 	RET
 
+// xchg variants
+TEXT thread$xchg8+0(SB),1,$0
+	MOVL	SI, AX
+	LOCK; XCHGB	(DI), AX
+	RET
 TEXT thread$xchg32+0(SB),1,$0
 	MOVL	SI, AX
 	LOCK; XCHGL	(DI), AX
--- a/lib/thread/atomic-impl+x64.s
+++ b/lib/thread/atomic-impl+x64.s
@@ -1,3 +1,10 @@
+# get variants
+.globl thread$xget8
+.globl _thread$xget8
+thread$xget8:
+_thread$xget8:
+	movb	(%rdi), %al
+	ret
 .globl thread$xget32
 .globl _thread$xget32
 thread$xget32:
@@ -15,6 +22,13 @@
 	movq	(%rdi), %rax
 	ret
 
+# set variants
+.globl thread$xset8
+.globl _thread$xset8
+thread$xset8:
+_thread$xset8:
+	movb	%sil, (%rdi)	# 8-bit store of v to *p; a 32-bit movl would also overwrite the 3 bytes after *p
+	ret
 .globl thread$xset32
 .globl _thread$xset32
 thread$xset32:
@@ -32,6 +46,14 @@
 	movq	%rsi, (%rdi)
 	ret
 
+# add variants
+.globl thread$xadd8
+.globl _thread$xadd8
+thread$xadd8:
+_thread$xadd8:
+	lock xaddb	%sil, (%rdi)
+	movb %sil,%al
+	ret
 .globl thread$xadd32
 .globl _thread$xadd32
 thread$xadd32:
@@ -51,6 +73,14 @@
 	movq %rsi,%rax
 	ret
 
+# cas variants 
+.globl thread$xcas8
+.globl _thread$xcas8
+thread$xcas8:
+_thread$xcas8:
+	movb	%sil, %al
+	lock cmpxchgb	%dl, (%rdi)
+	ret
 .globl thread$xcas32
 .globl _thread$xcas32
 thread$xcas32:
@@ -66,10 +96,18 @@
 thread$xcasp:
 _thread$xcas64:
 _thread$xcasp:
-	movq	%rsi, %rax
+	movq		%rsi, %rax
 	lock cmpxchgq	%rdx, (%rdi)
 	ret
 
+# xchg variants
+.globl thread$xchg8
+.globl _thread$xchg8
+thread$xchg8:
+_thread$xchg8:
+	movb		%sil, %al
+	lock xchgb	(%rdi), %al
+	ret
 .globl thread$xchg32
 .globl _thread$xchg32
 thread$xchg32:
--- a/lib/thread/atomic.myr
+++ b/lib/thread/atomic.myr
@@ -10,6 +10,7 @@
 		xchg	: (p : @a#, new : @a -> @a)
 	;;
 
+	impl atomic bool
 	impl atomic int32
 	impl atomic int64
 	impl atomic uint32
@@ -20,25 +21,38 @@
 	generic xcasptr : (p : @a##, old : std.option(@a#), new : std.option(@a#) -> std.option(@a#))
 	generic xchgptr : (p : @a##, new : std.option(@a#) -> std.option(@a#))
 
+	pkglocal extern const xget8	: (p : uint8# -> uint8)
 	pkglocal extern const xget32	: (p : uint32# -> uint32)
 	pkglocal extern const xget64	: (p : uint64# -> uint64)
 	pkglocal extern const xgetp	: (p : std.intptr# -> std.intptr)
 
+	pkglocal extern const xset8	: (p : uint8#, v : uint8 -> void)
 	pkglocal extern const xset32	: (p : uint32#, v : uint32 -> void)
 	pkglocal extern const xset64	: (p : uint64#, v : uint64 -> void)
 	pkglocal extern const xsetp	: (p : std.intptr#, v : std.intptr -> void)
 
+	pkglocal extern const xadd8	: (p : uint8#, v : uint8 -> uint8)
 	pkglocal extern const xadd32	: (p : uint32#, v : uint32 -> uint32)
 	pkglocal extern const xadd64	: (p : uint64#, v : uint64 -> uint64)
 	pkglocal extern const xaddp	: (p : std.intptr#, v : std.intptr -> std.intptr)
 
+	pkglocal extern const xcas8	: (p : uint8#, old: uint8, new : uint8 -> uint8)
 	pkglocal extern const xcas32	: (p : uint32#, old: uint32, new : uint32 -> uint32)
 	pkglocal extern const xcas64	: (p : uint64#, old: uint64, new : uint64 -> uint64)
 	pkglocal extern const xcasp	: (p : std.intptr#, old: std.intptr, new : std.intptr -> std.intptr)
 
+	pkglocal extern const xchg8	: (p : uint8#, v : uint8 -> uint8)
 	pkglocal extern const xchg32	: (p : uint32#, v : uint32 -> uint32)
 	pkglocal extern const xchg64	: (p : uint64#, v : uint64 -> uint64)
 	pkglocal extern const xchgp	: (p : std.intptr#, v : std.intptr -> std.intptr)
+;;
+
+impl atomic bool =
+	xget	= {p; -> (xget8((p : uint8#)) : bool)}
+	xset	= {p, v; xset8((p : uint8#), (v : uint8))}
+	xadd	= {p, v; -> (xadd8((p : uint8#), (v : uint8)) : bool)}
+	xcas	= {p, old, new; -> (xcas8((p : uint8#), (old : uint8), (new : uint8)) : bool)}
+	xchg	= {p, v; -> (xchg8((p : uint8#), (v : uint8)) : bool)}
 ;;
 
 impl atomic int32 =
--- a/lib/thread/bld.sub
+++ b/lib/thread/bld.sub
@@ -2,6 +2,10 @@
 	common.myr
 	hookstd.myr	# install thread hooks
 
+	# higher level apis
+	future.myr
+	do.myr
+
 	# generic fallbacks
 	condvar.myr
 	mutex.myr
--- /dev/null
+++ b/lib/thread/do.myr
@@ -1,0 +1,17 @@
+use std
+
+use "future"
+use "spawn"
+
+pkg thread =
+	generic do	: (fn : (-> @a) -> future(@a)#)
+;;
+
+generic do = {fn	/* run fn on a spawned thread; the returned future receives fn's result */
+	var r
+
+	r = mkfut()
+	spawn({; futput(r, fn()) })	/* NOTE(review): spawn's result is not checked; if it can fail, futget(r) would block forever -- confirm */
+	-> r
+}
+
--- a/lib/thread/future.myr
+++ b/lib/thread/future.myr
@@ -1,63 +1,55 @@
 use std
 
 use "mutex"
+use "condvar"
+use "atomic"
 
 pkg thread =
 	type future(@a) = struct
 		mtx	: mutex
+		cv	: cond
 		set	: bool
 		val	: @a
 	;;
 
-	generic mkfut	: (-> future(@a))
-	generic futset	: (fut : future(@a)#, val : @a -> bool)
+	generic mkfut	: (-> future(@a)#)
+	generic futput	: (fut : future(@a)#, val : @a -> void)
 	generic futget	: (fut : future(@a)# -> @a)
-	generic futtryget	: (fut : future(@a)# -> std.option(@a))
-	generic futclear	: (fut : future(@a)# -> void)
+	generic futpeek	: (fut : future(@a)# -> @a)
 ;;
 
-const Unset = 0
-const Waiting = 1
-const Set = 2
-
 generic mkfut = {
 	var fut
 
-	fut = [.mtx = mkmtx() ]
-	mtxlock(&fut.mtx)
+	fut = std.alloc()
+	fut.mtx = mkmtx()
+	fut.cv = mkcond(&fut.mtx)
+	fut.set = false
 	-> fut
 }
 
-generic futset = {fut, val
-	if fut.set
-		-> false
-	;;
-	/* compiler doesn't reorder shit */
+generic futput = {fut, val	/* store val, mark the future set, and wake every thread blocked on it */
 	fut.val = val
-	fut.set = true
-	mtxunlock(&fut.mtx)
-	-> true
+	xset(&fut.set, true)
+	condbroadcast(&fut.cv)	/* wake all waiters, not just one. NOTE(review): fut.mtx is not held here; confirm a waiter between its check and condwait cannot miss this */
 }
 
-generic futtryget = {fut
-	var val
-
-	if !fut.set
-		-> `std.None
-	;;
-	mtxlock(&fut.mtx)
-	val = fut.val
-	mtxunlock(&fut.mtx)
-	-> `std.Some val
-}
-
 generic futget = {fut
 	var val
-
-	mtxlock(&fut.mtx)
-	val = fut.val
-	mtxunlock(&fut.mtx)
+	
+	val = futpeek(fut)
+	std.free(fut)
 	-> val
 }
 
+generic futpeek = {fut	/* block until the future is set, then return its value; unlike futget, does not free fut */
+	if !xget(&fut.set)	/* fast path: skip the lock entirely once the value is published */
+		mtxlock(&fut.mtx)
+		while !xget(&fut.set)	/* loop, not if: re-check the flag after every wakeup so a spurious one cannot return an unset value */
+			condwait(&fut.cv)
+		;;
+		mtxunlock(&fut.mtx)
+	;;
+	-> fut.val
+}
 
--- /dev/null
+++ b/lib/thread/test/do.myr
@@ -1,0 +1,30 @@
+use std
+use thread
+
+const main = {
+	match std.espork(["echo", "hello"][:])
+	| `std.Err e:
+		std.fatal("could not spork\n")
+	| `std.Ok (pid, in, out, err):
+		std.close(in)
+		var w = thread.do({;-> std.wait(pid)})
+		var o = thread.do({;-> std.fslurp(out)})
+		var e = thread.do({;-> std.fslurp(err)})
+
+		match thread.futget(w)
+		| `std.Wsuccess:	/* ok */
+		| bad:	std.fatal("bad wait: {}\n", bad)
+		;;
+
+		match thread.futget(o)
+		| `std.Ok "hello\n":	/* ok */
+		| bad:	std.fatal("bad out: {}\n", bad)
+		;;
+
+		match thread.futget(e)
+		| `std.Ok "":	/* ok */
+		| bad:	std.fatal("bad err: {}\n", bad)
+		;;
+	;;
+}
+
--- a/lib/thread/test/future.myr
+++ b/lib/thread/test/future.myr
@@ -2,49 +2,18 @@
 use sys
 use thread
 
-use "util"
-
-var fut
-var nready : int32
-var ndone : int32
-
 const main = {
-	nready = 0
-	ndone = 0
-	fut = thread.mkfut()
-	/* set after we have some waiters */
-	mkherd(100, getfuture)
-	while nready != 100
-		/* spin */
-	;;
-	std.put("done waiting for ready\n")
-	std.assert(ndone == 0, "thread proceeded too soon\n")
-	thread.futset(&fut, 666)
-	std.assert(thread.futset(&fut, 1) == false, "double set future\n")
-	while ndone != 100
-		/* spin */
-	;;
-	std.put("double set future ok")
-	/* start up a few more to make sure we can still read */
-	mkherd(50, getfuture)
-	while ndone != 150
-		/* spin */
-	;;
+	var f
 
-	
-	/* set ahead of time */
-	ndone = 0
-	fut = thread.mkfut()
-	thread.futset(&fut, 666)
-	std.assert(thread.futset(&fut, 666) == false, "double set future\n")
-	mkherd(100, getfuture)
-	while ndone != 100
-		/* spin */
-	;;
-}
+	f = thread.mkfut()
+	thread.futput(f, 123)
+	std.assert(thread.futget(f) == 123, "the future is broken\n")
 
-const getfuture = {
-	thread.xadd(&nready, 1)
-	std.assert(thread.futget(&fut) == 666, "wrong value gotten from future")
-	thread.xadd(&ndone, 1)
+	f = thread.mkfut()
+	thread.spawn({
+		std.usleep(10_000)
+		thread.futput(f, 321)
+	})
+	std.assert(thread.futget(f) == 321, "the future is broken\n")
 }
+