ref: 23227ed96374185ecc9fd4410c7ba3ed6491c0f9
author: Ori Bernstein <[email protected]>
date: Tue Dec 6 18:28:07 EST 2022
initial commit
--- /dev/null
+++ b/mkfile
@@ -1,0 +1,6 @@
+</$objtype/mkfile
+
+TARG=mq
+OFILES=mq.$O
+
+</sys/src/cmd/mkone
--- /dev/null
+++ b/mq.c
@@ -1,0 +1,464 @@
+#include <u.h>
+#include <libc.h>
+#include <fcall.h>
+#include <thread.h>
+#include <9p.h>
+
+typedef struct Mq Mq;
+typedef struct Msg Msg;
+typedef struct Aux Aux;
+
+enum {
+ Qroot,
+};
+
+struct Aux {
+ Mq *q;
+ int id;
+};
+
+struct Msg {
+ Ref;
+ Msg *next;
+ int count;
+ char data[];
+};
+
+struct Mq {
+ Qid qid;
+ int count;
+ usize logsz;
+ Msg *loghd;
+ Msg *logtl;
+ Msg *hd;
+ Msg *tl;
+
+ int nrd;
+ int *rd;
+ Msg **rhd;
+ Msg **rtl;
+ Req **wait;
+
+ char *name;
+ char *user;
+ char *group;
+ int mode;
+};
+
+Mq **queues;
+int nqueues;
+vlong maxlog = -1;
+vlong queueid = Qroot + 1;
+
+char Ebaduse[] = "invalid use of fd";
+char Einuse[] = "fid in use";
+char Eexist[] = "file already exists";
+char Enoexist[] = "file does not exists";
+
+void *
+emalloc(ulong n)
+{
+ void *v;
+
+ v = mallocz(n, 1);
+ if(v == nil)
+ sysfatal("malloc: %r");
+ setmalloctag(v, getcallerpc(&n));
+ return v;
+}
+
+void *
+erealloc(void *p, ulong n)
+{
+ void *v;
+
+ v = realloc(p, n);
+ if(v == nil)
+ sysfatal("realloc: %r");
+ setmalloctag(v, getcallerpc(&p));
+ return v;
+}
+
+char*
+estrdup(char *s)
+{
+ s = strdup(s);
+ if(s == nil)
+ sysfatal("strdup: %r");
+ setmalloctag(s, getcallerpc(&s));
+ return s;
+}
+
+Msg*
+msgref(Msg *m)
+{
+ incref(m);
+ return m;
+}
+
+void
+msgunref(Msg *m)
+{
+ if(decref(m) == 0)
+ free(m);
+}
+
+void
+trimlog(Mq *q)
+{
+ Msg *m, *n;
+
+ if(maxlog < 0)
+ return;
+ n = nil;
+ for(m = q->loghd; m != nil && q->logsz >= maxlog; m = n){
+ n = m->next;
+ q->logsz -= m->count;
+ msgunref(m);
+ }
+ q->loghd = n;
+ if(m == nil)
+ q->logtl = nil;
+}
+
+int
+subscribe(Mq *q)
+{
+ Msg *m;
+ int i;
+
+ for(i = 0; i < q->nrd; i++)
+ if(q->rd[i] != -1)
+ return i;
+ q->rd = erealloc(q->rd, (q->nrd+1)*sizeof(*q->rd));
+ q->wait = erealloc(q->rhd, (q->nrd+1)*sizeof(*q->wait));
+ q->rhd = erealloc(q->rhd, (q->nrd+1)*sizeof(*q->rhd));
+ q->rtl = erealloc(q->rtl, (q->nrd+1)*sizeof(*q->rtl));
+ q->wait[q->nrd] = nil;
+ q->rhd[q->nrd] = q->loghd;
+ q->rtl[q->nrd] = q->logtl;
+ for(m = q->loghd; m != nil; m = m->next)
+ msgref(m);
+ return q->nrd++;
+}
+
+Mq*
+lookup(char *name)
+{
+ int i;
+
+ for(i = 0; i < nqueues; i++)
+ if(strcmp(queues[i]->name, name) == 0)
+ return queues[i];
+ return nil;
+}
+
+void
+qstat(Dir *d, Mq *q)
+{
+ d->name = estrdup9p(q->name);
+ d->uid = estrdup9p("glenda");
+ d->gid = estrdup9p("glenda");
+ d->muid = estrdup9p("glenda");
+ d->qid = q->qid;
+ d->mtime = 0;
+ d->atime = 0;
+ d->mode = q->mode;
+}
+
+int
+rootgen(int i, Dir *d, void *)
+{
+ if(i >= nqueues)
+ return -1;
+ qstat(d, queues[i]);
+ return 0;
+}
+
+
+char*
+mqclone(Fid *old, Fid *new)
+{
+ Aux *o, *n;
+
+ o = old->aux;
+ if(o != nil){
+ n = emalloc(sizeof(Aux));
+ n->q = o->q;
+ n->id = subscribe(o->q);
+ new->aux = n;
+ }
+ return nil;
+}
+
+char*
+mqwalk1(Fid *f, char *name, Qid *qid)
+{
+ Mq *q;
+
+ switch(f->qid.path){
+ case Qroot:
+ if(strcmp(name, "..") == 0){
+ *qid = f->qid;
+ return nil;
+ }
+ if((q = lookup(name)) == nil)
+ return Enoexist;
+ f->qid = q->qid;
+ *qid = f->qid;
+ return nil;
+ default:
+ if(strcmp(name, "..") == 0){
+ f->qid = (Qid){Qroot, 0, QTDIR};
+ *qid = f->qid;
+ return nil;
+ }
+ return "not a dir";
+ }
+}
+
+void
+mqstat(Req *r)
+{
+ switch(r->fid->qid.path){
+ case Qroot:
+ r->d.uid = estrdup9p("glenda");
+ r->d.gid = estrdup9p("glenda");
+ r->d.muid = estrdup9p("glenda");
+ r->d.qid = r->fid->qid;
+ r->d.mtime = 0;
+ r->d.atime = 0;
+ r->d.mode = 0755;
+ break;
+ default:
+ qstat(&r->d, ((Aux*)r->fid->aux)->q);
+ }
+ respond(r, nil);
+}
+
+void
+mqflush(Req *r)
+{
+ Req *w;
+ Aux *a;
+
+ if((a = r->oldreq->fid->aux) != nil){
+ w = a->q->wait[a->id];
+ if(w != nil)
+ respond(w, "interrupted");
+ a->q->wait[a->id] = nil;
+ }
+ respond(r, nil);
+}
+
+void
+mqremove(Req *r)
+{
+ USED(r);
+ abort();
+}
+
+void
+mqwrite(Req *r)
+{
+ Msg *m;
+ Req *rr;
+ Aux *a;
+ Mq *q;
+ int i;
+
+ if((a = r->fid->aux) == nil){
+ respond(r, Ebaduse);
+ return;
+ }
+ q = a->q;
+ m = emalloc(sizeof(Msg) + r->ifcall.count);
+ m->count = r->ifcall.count;
+ memcpy(m->data, r->ifcall.data, m->count);
+ m->next = nil;
+ for(i = 0; i < q->nrd; i++){
+ rr = q->wait[i];
+ if(rr != nil){
+ rr->ofcall.data = r->ifcall.data;
+ rr->ofcall.count = r->ifcall.count;
+ respond(rr, nil);
+ q->wait[i] = nil;
+ }else{
+ if(q->rhd[i] == nil)
+ q->rhd[i] = m;
+ if(q->rtl[i] != nil)
+ q->rtl[i]->next = m;
+ q->rtl[i] = m;
+ msgref(m);
+ }
+ }
+ if(q->loghd == nil)
+ q->loghd = m;
+ if(q->logtl != nil)
+ q->logtl->next = m;
+ q->logtl = msgref(m);
+ q->logsz += m->count;
+ q->logtl = m;
+
+ trimlog(q);
+ q->qid.vers++;
+ r->ofcall.count = r->ifcall.count;
+ respond(r, nil);
+}
+
+void
+mqread(Req *r)
+{
+ Aux *a;
+ Msg *m;
+ Mq *q;
+
+ if(r->fid->qid.path == Qroot){
+ dirread9p(r, rootgen, nil);
+ respond(r, nil);
+ return;
+ }
+ if((a = r->fid->aux) == nil){
+ respond(r, Ebaduse);
+ return;
+ }
+ q = a->q;
+ if(q->rhd[a->id] != nil){
+ m = q->rhd[a->id];
+ r->ofcall.data = m->data;
+ r->ofcall.count = m->count;
+ respond(r, nil);
+ q->rhd[a->id] = m->next;
+ if(q->rhd[a->id] == nil)
+ q->rtl[a->id] = nil;
+ msgunref(m);
+ return;
+ }
+ q->wait[a->id] = r;
+}
+
+void
+mqcreate(Req *r)
+{
+ Aux *a;
+ Mq *q;
+ int m;
+
+ if(lookup(r->ifcall.name) != nil){
+ respond(r, Eexist);
+ return;
+ }
+ m = r->ifcall.mode & OMASK;
+ q = emalloc(sizeof(Mq));
+ q->name = estrdup(r->ifcall.name);
+ q->mode = r->ifcall.mode;
+ q->qid.path = queueid++;
+ q->qid.vers = 0;
+ q->qid.type = QTFILE;
+ a = emalloc(sizeof(Aux));
+ a->q = q;
+ if(m == OREAD || m == ORDWR || m == OEXEC)
+ a->id = subscribe(q);
+ r->ofcall.qid = q->qid;
+ r->fid->qid = q->qid;
+ r->fid->aux = a;
+ queues = erealloc(queues, ++nqueues*sizeof(Mq*));
+ queues[nqueues-1] = q;
+
+ respond(r, nil);
+}
+
+void
+mqopen(Req *r)
+{
+ Aux *a;
+ vlong p;
+ int m;
+
+ if(r->fid->aux != nil){
+ respond(r, Einuse);
+ return;
+ }
+ m = r->ifcall.mode & OMASK;
+ p = r->fid->qid.path;
+ a = emalloc(sizeof(Aux));
+ if(p != Qroot){
+ a->q = queues[p-1];
+ if(m == OREAD || m == ORDWR || m == OMASK)
+ a->id = subscribe(a->q);
+ r->fid->aux = a;
+ }
+ r->ofcall.qid = r->fid->qid;
+ respond(r, nil);
+}
+
+void
+destroyfid(Fid *f)
+{
+ Aux *a;
+ int m;
+
+ a = f->aux;
+ m = f->omode & OMASK;
+ if(m != OREAD && m != ORDWR && m != OEXEC)
+ return;
+ if(a != nil)
+ a->q->rd[a->id] = -1;
+}
+
+void
+mqattach(Req *r)
+{
+ r->ofcall.qid = (Qid){Qroot, 0, QTDIR};
+ r->fid->qid = r->ofcall.qid;
+ r->fid->aux = nil;
+ respond(r, nil);
+}
+
+Srv mq = {
+ .attach=mqattach,
+ .open=mqopen,
+ .create=mqcreate,
+ .read=mqread,
+ .write=mqwrite,
+ .remove=mqremove,
+ .flush=mqflush,
+ .stat=mqstat,
+ .walk1=mqwalk1,
+ .clone=mqclone,
+ .destroyfid=destroyfid,
+};
+
+void
+usage(void)
+{
+ fprint(2, "usage: %s [-s srv] [-m mtpt]\n", argv0);
+ exits("usage");
+}
+
+void
+main(int argc, char **argv)
+{
+ char *srvname, *mntpt;
+
+ srvname = nil;
+ mntpt = "/mnt/mq";
+ ARGBEGIN{
+ case 'd':
+ chatty9p++;
+ break;
+ case 's':
+ srvname=EARGF(usage());
+ break;
+ case 'm':
+ mntpt = EARGF(usage());
+ break;
+ case 'r':
+ maxlog = atoi(EARGF(usage()));
+ break;
+ default:
+ usage();
+ }ARGEND;
+
+ postmountsrv(&mq, srvname, mntpt, MCREATE|MREPL);
+}