ref: f0874a14c3685a08e98c3fcb1ebd81514b1e3ded
parent: 2d6c8ab83106892e1215dd0c94d9f24d6525a3fc
author: Ori Bernstein <[email protected]>
date: Fri Apr 12 16:02:06 EDT 2024
mq: coalesce messages, add tailing attach spec
--- a/mq.c
+++ b/mq.c
@@ -23,6 +23,7 @@
struct Aux {
Mq *q;
int id;
+ int ntail;
};
struct Msg {
@@ -64,6 +65,7 @@
int nqueues;
vlong maxlog = -1;
vlong queueid = 0;
+int coalesce;
char Ebaduse[] = "invalid use of fd";
char Einuse[] = "fid in use";
@@ -131,10 +133,11 @@
}
int
-subscribe(Mq *q)
+subscribe(Mq *q, vlong ntail)
{
Msg *m;
Rd *rd;
+ vlong sz;
int i;
rd = nil;
@@ -151,10 +154,17 @@
rd->id = i;
rd->wait = nil;
rd->off = 0;
- rd->hd = q->loghd;
- rd->tl = q->logtl;
- for(m = q->loghd; m != nil; m = m->next)
+ rd->hd = nil;
+ rd->tl = nil;
+ sz = q->logsz;
+ m = q->loghd;
+ if(ntail != -1)
+ for(; m != nil && sz > ntail; m = m->next)
+ sz -= m->count;
+ rd->hd = m;
+ for(; m != nil; m = m->next)
incref(m);
+ rd->tl = m;
return rd->id;
}
@@ -170,7 +180,7 @@
}
void
-qstat(Dir *d, Mq *q)
+qstat(Dir *d, Mq *q, Aux *a)
{
d->name = estrdup9p(q->name);
d->uid = estrdup9p("glenda");
@@ -180,14 +190,17 @@
d->mtime = 0;
d->atime = 0;
d->mode = q->mode;
+ d->length = q->logsz;
+ if(a->ntail < d->length)
+ d->length = a->ntail;
}
int
-rootgen(int i, Dir *d, void *)
+rootgen(int i, Dir *d, void *a)
{
if(i >= nqueues)
return -1;
- qstat(d, queues[i]);
+ qstat(d, queues[i], a);
return 0;
}
@@ -198,12 +211,13 @@
Aux *o, *n;
o = old->aux;
- if(o != nil){
- n = emalloc(sizeof(Aux));
+ n = emalloc(sizeof(Aux));
+ if(o->q != nil){
n->q = o->q;
- n->id = subscribe(o->q);
- new->aux = n;
+ n->id = subscribe(o->q, o->ntail);
}
+ n->ntail = o->ntail;
+ new->aux = n;
return nil;
}
@@ -252,7 +266,7 @@
default:
r->d.mode = 0644;
incref(queues[QIDX(p)]);
- qstat(&r->d, queues[QIDX(p)]);
+ qstat(&r->d, queues[QIDX(p)], r->fid->aux);
decref(queues[QIDX(p)]);
}
respond(r, nil);
@@ -356,13 +370,15 @@
void
mqread(Req *r)
{
+ char *p, *e, *b;
Aux *a;
Msg *m;
Rd *rd;
Mq *q;
+ int n;
if(QTYPE(r->fid->qid.path) == Qroot){
- dirread9p(r, rootgen, nil);
+ dirread9p(r, rootgen, r->fid->aux);
respond(r, nil);
return;
}
@@ -381,23 +397,34 @@
/* queued messages: pop data off */
rd = &q->rd[a->id];
m = rd->hd;
- r->ofcall.data = m->data + rd->off;
- r->ofcall.count = r->ifcall.count + rd->off;
- if(r->ofcall.count > m->count)
- r->ofcall.count = m->count;
- respond(r, nil);
-
- /* adjust offsets */
- if(m->count > r->ifcall.count)
- rd->off += r->ifcall.count;
- else{
- rd->off = 0;
+ p = emalloc(r->ifcall.count);
+ e = p + r->ifcall.count;
+ r->ofcall.data = p;
+ while(1){
+ assert(rd->off >= 0 && rd->off < m->count);
+ b = m->data + rd->off;
+ if(e - p >= m->count - rd->off){
+ n = m->count - rd->off;
+ rd->hd = m->next;
+ rd->off = 0;
+ if(rd->hd == nil)
+ rd->tl = nil;
+ if(decref(m) == 0)
+ free(m);
+ }else{
+ n = e - p;
+ rd->off += (e - p);
+ }
+ memcpy(p, b, n);
+ p += n;
rd->hd = m->next;
- if(rd->hd == nil)
- rd->tl = nil;
- if(decref(m) == 0)
- free(m);
+ m = rd->hd;
+ if(!coalesce || m == nil || e - p < m->count)
+ break;
+
}
+ r->ofcall.count = p - r->ofcall.data;
+ respond(r, nil);
}
void
@@ -420,10 +447,11 @@
q->qid.type = QTFILE;
queueid++;
- a = emalloc(sizeof(Aux));
+ a = r->fid->aux;
+ assert(a->q == nil);
a->q = q;
if(m == OREAD || m == ORDWR || m == OEXEC)
- a->id = subscribe(q);
+ a->id = subscribe(q, a->ntail);
r->ofcall.qid = q->qid;
r->fid->qid = q->qid;
r->fid->aux = a;
@@ -440,19 +468,18 @@
vlong p;
int m;
- if(r->fid->aux != nil){
+ a = r->fid->aux;
+ if(a->q != nil){
respond(r, Einuse);
return;
}
m = r->ifcall.mode & OMASK;
p = r->fid->qid.path;
- a = emalloc(sizeof(Aux));
if(QTYPE(p) != Qroot){
incref(queues[QIDX(p)]);
a->q = queues[QIDX(p)];
if(m == OREAD || m == ORDWR || m == OMASK)
- a->id = subscribe(a->q);
- r->fid->aux = a;
+ a->id = subscribe(a->q, a->ntail);
}
r->ofcall.qid = r->fid->qid;
respond(r, nil);
@@ -468,7 +495,7 @@
m = f->omode & OMASK;
if(m != OREAD && m != ORDWR && m != OEXEC)
return;
- if(a != nil)
+ if(a != nil && a->q != nil)
a->q->rd[a->id].id = -1;
free(a);
}
@@ -476,9 +503,26 @@
void
mqattach(Req *r)
{
+ Aux *a;
+ char *n, *e;
+
+ n = r->ifcall.aname;
+ a = emalloc(sizeof(Aux));
r->ofcall.qid = (Qid){Qroot, 0, QTDIR};
r->fid->qid = r->ofcall.qid;
- r->fid->aux = nil;
+ r->fid->aux = a;
+ a->ntail = -1;
+ if(n != nil && strncmp(n, "tail:", 5) == 0){
+ a->ntail = strtol(n+5, &e, 0);
+ while(*e){
+ switch(*e++){
+ case 'g': a->ntail *= 1024*1024*1024;
+ case 'm': a->ntail *= 1024*1024;
+ case 'k': a->ntail *= 1024;
+ default: respond(r, "bad scale");
+ }
+ }
+ }
respond(r, nil);
}
@@ -534,6 +578,9 @@
default: sysfatal("unknown suffix %c", *e);
}
}
+ break;
+ case 'c':
+ coalesce = 1;
break;
default:
usage();