shithub: mq

Download patch

ref: 111d42550f50f79b9b357d130b8bcdbafc5165ee
parent: 23227ed96374185ecc9fd4410c7ba3ed6491c0f9
author: Ori Bernstein <[email protected]>
date: Sat Dec 31 13:31:05 EST 2022

mq: fix subscription ids.

--- a/mq.c
+++ b/mq.c
@@ -5,6 +5,7 @@
 #include <9p.h>
 
 typedef struct Mq Mq;
+typedef struct Rd Rd;
 typedef struct Msg Msg;
 typedef struct Aux Aux;
 
@@ -21,9 +22,18 @@
 	Ref;
 	Msg	*next;
 	int	count;
-	char	data[];
+	char	*data;
+	char	buf[];
 };
 
+struct Rd {
+	int	id;
+	int	off;
+	Msg	*hd;
+	Msg	*tl;
+	Req	*wait;
+};
+
 struct Mq {
 	Qid	qid;
 	int	count;
@@ -34,10 +44,7 @@
 	Msg	*tl;
 
 	int	nrd;
-	int	*rd;
-	Msg	**rhd;
-	Msg	**rtl;
-	Req	**wait;
+	Rd	*rd;
 
 	char	*name;
 	char	*user;
@@ -125,21 +132,28 @@
 subscribe(Mq *q)
 {
 	Msg *m;
+	Rd *rd;
 	int i;
 
-	for(i = 0; i < q->nrd; i++)
-		if(q->rd[i] != -1)
-			return i;
-	q->rd = erealloc(q->rd, (q->nrd+1)*sizeof(*q->rd));
-	q->wait = erealloc(q->rhd, (q->nrd+1)*sizeof(*q->wait));
-	q->rhd = erealloc(q->rhd, (q->nrd+1)*sizeof(*q->rhd));
-	q->rtl = erealloc(q->rtl, (q->nrd+1)*sizeof(*q->rtl));
-	q->wait[q->nrd] = nil;
-	q->rhd[q->nrd] = q->loghd;
-	q->rtl[q->nrd] = q->logtl;
+	rd = nil;
+	for(i = 0; i < q->nrd; i++){
+		if(q->rd[i].id == -1){
+			rd = &q->rd[i];
+			break;
+		}
+	}
+	if(rd == nil){
+		q->rd = erealloc(q->rd, (++q->nrd)*sizeof(*q->rd));
+		rd = &q->rd[q->nrd - 1];
+	}
+	rd->id = i;
+	rd->wait = nil;
+	rd->off = 0;
+	rd->hd = q->loghd;
+	rd->tl = q->logtl;
 	for(m = q->loghd; m != nil; m = m->next)
 		msgref(m);
-	return q->nrd++;
+	return rd->id;
 }
 
 Mq*
@@ -243,10 +257,10 @@
 	Aux *a;
 
 	if((a = r->oldreq->fid->aux) != nil){
-		w = a->q->wait[a->id];
+		w = a->q->rd[a->id].wait;
 		if(w != nil)
 			respond(w, "interrupted");
-		a->q->wait[a->id] = nil;
+		a->q->rd[a->id].wait = nil;
 	}
 	respond(r, nil);
 }
@@ -273,24 +287,32 @@
 	}
 	q = a->q;
 	m = emalloc(sizeof(Msg) + r->ifcall.count);
+	m->data = m->buf;
 	m->count = r->ifcall.count;
-	memcpy(m->data, r->ifcall.data, m->count);
+	memmove(m->data, r->ifcall.data, m->count);
 	m->next = nil;
 	for(i = 0; i < q->nrd; i++){
-		rr = q->wait[i];
+		if(q->rd[i].id == -1)
+			continue;
+		rr = q->rd[i].wait;
+		q->rd[i].wait = nil;
 		if(rr != nil){
-			rr->ofcall.data = r->ifcall.data;
-			rr->ofcall.count = r->ifcall.count;
+			rr->ofcall.data = m->data;
+			rr->ofcall.count = m->count;
+			if(rr->ifcall.count > m->count)
+				rr->ofcall.count = m->count;
 			respond(rr, nil);
-			q->wait[i] = nil;
-		}else{
-			if(q->rhd[i] == nil)
-				q->rhd[i] = m;
-			if(q->rtl[i] != nil)
-				q->rtl[i]->next = m;
-			q->rtl[i] = m;
-			msgref(m);
+			if(rr->ofcall.count == m->count)
+				continue;
+			m->count -= rr->ofcall.count;
+			m->data += rr->ofcall.count;
 		}
+		if(q->rd[i].hd == nil)
+			q->rd[i].hd = m;
+		if(q->rd[i].tl != nil)
+			q->rd[i].tl->next = m;
+		q->rd[i].tl = m;
+		msgref(m);
 	}
 	if(q->loghd == nil)
 		q->loghd = m;
@@ -311,6 +333,7 @@
 {
 	Aux *a;
 	Msg *m;
+	Rd *rd;
 	Mq *q;
 
 	if(r->fid->qid.path == Qroot){
@@ -323,18 +346,32 @@
 		return;
 	}
 	q = a->q;
-	if(q->rhd[a->id] != nil){
-		m = q->rhd[a->id];
-		r->ofcall.data = m->data;
+
+	/* no messages: enqueue until next one comes */
+	if(q->rd[a->id].hd == nil){
+		q->rd[a->id].wait = r;
+		return;
+	}
+
+	/* queued messages: pop data off */
+	rd = &q->rd[a->id];
+	m = rd->hd;
+	r->ofcall.data = m->data + rd->off;
+	r->ofcall.count = r->ifcall.count + rd->off;
+	if(r->ofcall.count > m->count)
 		r->ofcall.count = m->count;
-		respond(r, nil);
-		q->rhd[a->id] = m->next;
-		if(q->rhd[a->id] == nil)
-			q->rtl[a->id] = nil;
+	respond(r, nil);
+
+	/* adjust offsets */
+	if(m->count > r->ifcall.count)
+		rd->off += r->ifcall.count;
+	else{
+		rd->off = 0;
+		rd->hd = m->next;
+		if(rd->hd == nil)
+			rd->tl = nil;
 		msgunref(m);
-		return;
 	}
-	q->wait[a->id] = r;
 }
 
 void
@@ -403,7 +440,7 @@
 	if(m != OREAD && m != ORDWR && m != OEXEC)
 		return;
 	if(a != nil)
-		a->q->rd[a->id] = -1;
+		a->q->rd[a->id].id = -1;
 }
 
 void
@@ -441,7 +478,7 @@
 {
 	char *srvname, *mntpt;
 
-	srvname = nil;
+	srvname = "mq";
 	mntpt = "/mnt/mq";
 	ARGBEGIN{
 	case 'd':