ref: 111d42550f50f79b9b357d130b8bcdbafc5165ee
parent: 23227ed96374185ecc9fd4410c7ba3ed6491c0f9
author: Ori Bernstein <[email protected]>
date: Sat Dec 31 13:31:05 EST 2022
mq: fix subscription ids.
--- a/mq.c
+++ b/mq.c
@@ -5,6 +5,7 @@
#include <9p.h>
typedef struct Mq Mq;
+typedef struct Rd Rd;
typedef struct Msg Msg;
typedef struct Aux Aux;
@@ -21,9 +22,18 @@
Ref;
Msg *next;
int count;
- char data[];
+ char *data;
+ char buf[];
};
+struct Rd {	/* per-reader subscription state; Mq.rd is an array of these */
+ int id;	/* slot index handed out by subscribe(); -1 marks the slot free for reuse */
+ int off;	/* bytes of the head message already returned to this reader */
+ Msg *hd;	/* first message still queued for this reader; nil when none are pending */
+ Msg *tl;	/* last queued message, where new writes are appended */
+ Req *wait;	/* read request parked while hd is nil; answered by the next write or a flush */
+};
+
struct Mq {
Qid qid;
int count;
@@ -34,10 +44,7 @@
Msg *tl;
int nrd;
- int *rd;
- Msg **rhd;
- Msg **rtl;
- Req **wait;
+ Rd *rd;
char *name;
char *user;
@@ -125,21 +132,28 @@
subscribe(Mq *q)
{
Msg *m;
+ Rd *rd;
int i;
- for(i = 0; i < q->nrd; i++)
- if(q->rd[i] != -1)
- return i;
- q->rd = erealloc(q->rd, (q->nrd+1)*sizeof(*q->rd));
- q->wait = erealloc(q->rhd, (q->nrd+1)*sizeof(*q->wait));
- q->rhd = erealloc(q->rhd, (q->nrd+1)*sizeof(*q->rhd));
- q->rtl = erealloc(q->rtl, (q->nrd+1)*sizeof(*q->rtl));
- q->wait[q->nrd] = nil;
- q->rhd[q->nrd] = q->loghd;
- q->rtl[q->nrd] = q->logtl;
+ rd = nil;
+ for(i = 0; i < q->nrd; i++){
+ if(q->rd[i].id == -1){
+ rd = &q->rd[i];
+ break;
+ }
+ }
+ if(rd == nil){
+ q->rd = erealloc(q->rd, (++q->nrd)*sizeof(*q->rd));
+ rd = &q->rd[q->nrd - 1];
+ }
+ rd->id = i;
+ rd->wait = nil;
+ rd->off = 0;
+ rd->hd = q->loghd;
+ rd->tl = q->logtl;
for(m = q->loghd; m != nil; m = m->next)
msgref(m);
- return q->nrd++;
+ return rd->id;
}
Mq*
@@ -243,10 +257,10 @@
Aux *a;
if((a = r->oldreq->fid->aux) != nil){
- w = a->q->wait[a->id];
+ w = a->q->rd[a->id].wait;
if(w != nil)
respond(w, "interrupted");
- a->q->wait[a->id] = nil;
+ a->q->rd[a->id].wait = nil;
}
respond(r, nil);
}
@@ -273,24 +287,32 @@
}
q = a->q;
m = emalloc(sizeof(Msg) + r->ifcall.count);
+ m->data = m->buf;
m->count = r->ifcall.count;
- memcpy(m->data, r->ifcall.data, m->count);
+ memmove(m->data, r->ifcall.data, m->count);
m->next = nil;
for(i = 0; i < q->nrd; i++){
- rr = q->wait[i];
+ if(q->rd[i].id == -1)
+ continue;
+ rr = q->rd[i].wait;
+ q->rd[i].wait = nil;
if(rr != nil){
- rr->ofcall.data = r->ifcall.data;
- rr->ofcall.count = r->ifcall.count;
+ rr->ofcall.data = m->data;
+ rr->ofcall.count = m->count;
+ if(rr->ifcall.count > m->count)
+ rr->ofcall.count = m->count;
respond(rr, nil);
- q->wait[i] = nil;
- }else{
- if(q->rhd[i] == nil)
- q->rhd[i] = m;
- if(q->rtl[i] != nil)
- q->rtl[i]->next = m;
- q->rtl[i] = m;
- msgref(m);
+ if(rr->ofcall.count == m->count)
+ continue;
+ m->count -= rr->ofcall.count;
+ m->data += rr->ofcall.count;
}
+ if(q->rd[i].hd == nil)
+ q->rd[i].hd = m;
+ if(q->rd[i].tl != nil)
+ q->rd[i].tl->next = m;
+ q->rd[i].tl = m;
+ msgref(m);
}
if(q->loghd == nil)
q->loghd = m;
@@ -311,6 +333,7 @@
{
Aux *a;
Msg *m;
+ Rd *rd;
Mq *q;
if(r->fid->qid.path == Qroot){
@@ -323,18 +346,32 @@
return;
}
q = a->q;
- if(q->rhd[a->id] != nil){
- m = q->rhd[a->id];
- r->ofcall.data = m->data;
+
+ /* no messages: enqueue until next one comes */
+ if(q->rd[a->id].hd == nil){
+ q->rd[a->id].wait = r;
+ return;
+ }
+
+ /* queued messages: pop data off */
+ rd = &q->rd[a->id];
+ m = rd->hd;
+ r->ofcall.data = m->data + rd->off;
+ r->ofcall.count = r->ifcall.count + rd->off;
+ if(r->ofcall.count > m->count)
r->ofcall.count = m->count;
- respond(r, nil);
- q->rhd[a->id] = m->next;
- if(q->rhd[a->id] == nil)
- q->rtl[a->id] = nil;
+ respond(r, nil);
+
+ /* adjust offsets */
+ if(m->count > r->ifcall.count)
+ rd->off += r->ifcall.count;
+ else{
+ rd->off = 0;
+ rd->hd = m->next;
+ if(rd->hd == nil)
+ rd->tl = nil;
msgunref(m);
- return;
}
- q->wait[a->id] = r;
}
void
@@ -403,7 +440,7 @@
if(m != OREAD && m != ORDWR && m != OEXEC)
return;
if(a != nil)
- a->q->rd[a->id] = -1;
+ a->q->rd[a->id].id = -1;
}
void
@@ -441,7 +478,7 @@
{
char *srvname, *mntpt;
- srvname = nil;
+ srvname = "mq";
mntpt = "/mnt/mq";
ARGBEGIN{
case 'd':