ref: 111d42550f50f79b9b357d130b8bcdbafc5165ee
dir: /mq.c/
#include <u.h> #include <libc.h> #include <fcall.h> #include <thread.h> #include <9p.h> typedef struct Mq Mq; typedef struct Rd Rd; typedef struct Msg Msg; typedef struct Aux Aux; enum { Qroot, }; struct Aux { Mq *q; int id; }; struct Msg { Ref; Msg *next; int count; char *data; char buf[]; }; struct Rd { int id; int off; Msg *hd; Msg *tl; Req *wait; }; struct Mq { Qid qid; int count; usize logsz; Msg *loghd; Msg *logtl; Msg *hd; Msg *tl; int nrd; Rd *rd; char *name; char *user; char *group; int mode; }; Mq **queues; int nqueues; vlong maxlog = -1; vlong queueid = Qroot + 1; char Ebaduse[] = "invalid use of fd"; char Einuse[] = "fid in use"; char Eexist[] = "file already exists"; char Enoexist[] = "file does not exists"; void * emalloc(ulong n) { void *v; v = mallocz(n, 1); if(v == nil) sysfatal("malloc: %r"); setmalloctag(v, getcallerpc(&n)); return v; } void * erealloc(void *p, ulong n) { void *v; v = realloc(p, n); if(v == nil) sysfatal("realloc: %r"); setmalloctag(v, getcallerpc(&p)); return v; } char* estrdup(char *s) { s = strdup(s); if(s == nil) sysfatal("strdup: %r"); setmalloctag(s, getcallerpc(&s)); return s; } Msg* msgref(Msg *m) { incref(m); return m; } void msgunref(Msg *m) { if(decref(m) == 0) free(m); } void trimlog(Mq *q) { Msg *m, *n; if(maxlog < 0) return; n = nil; for(m = q->loghd; m != nil && q->logsz >= maxlog; m = n){ n = m->next; q->logsz -= m->count; msgunref(m); } q->loghd = n; if(m == nil) q->logtl = nil; } int subscribe(Mq *q) { Msg *m; Rd *rd; int i; rd = nil; for(i = 0; i < q->nrd; i++){ if(q->rd[i].id == -1){ rd = &q->rd[i]; break; } } if(rd == nil){ q->rd = erealloc(q->rd, (++q->nrd)*sizeof(*q->rd)); rd = &q->rd[q->nrd - 1]; } rd->id = i; rd->wait = nil; rd->off = 0; rd->hd = q->loghd; rd->tl = q->logtl; for(m = q->loghd; m != nil; m = m->next) msgref(m); return rd->id; } Mq* lookup(char *name) { int i; for(i = 0; i < nqueues; i++) if(strcmp(queues[i]->name, name) == 0) return queues[i]; return nil; } void qstat(Dir *d, Mq *q) { d->name = estrdup9p(q->name); d->uid = estrdup9p("glenda"); d->gid = estrdup9p("glenda"); d->muid = estrdup9p("glenda"); d->qid = q->qid; d->mtime = 0; d->atime = 0; d->mode = q->mode; } int rootgen(int i, Dir *d, void *) { if(i >= nqueues) return -1; qstat(d, queues[i]); return 0; } char* mqclone(Fid *old, Fid *new) { Aux *o, *n; o = old->aux; if(o != nil){ n = emalloc(sizeof(Aux)); n->q = o->q; n->id = subscribe(o->q); new->aux = n; } return nil; } char* mqwalk1(Fid *f, char *name, Qid *qid) { Mq *q; switch(f->qid.path){ case Qroot: if(strcmp(name, "..") == 0){ *qid = f->qid; return nil; } if((q = lookup(name)) == nil) return Enoexist; f->qid = q->qid; *qid = f->qid; return nil; default: if(strcmp(name, "..") == 0){ f->qid = (Qid){Qroot, 0, QTDIR}; *qid = f->qid; return nil; } return "not a dir"; } } void mqstat(Req *r) { switch(r->fid->qid.path){ case Qroot: r->d.uid = estrdup9p("glenda"); r->d.gid = estrdup9p("glenda"); r->d.muid = estrdup9p("glenda"); r->d.qid = r->fid->qid; r->d.mtime = 0; r->d.atime = 0; r->d.mode = 0755; break; default: qstat(&r->d, ((Aux*)r->fid->aux)->q); } respond(r, nil); } void mqflush(Req *r) { Req *w; Aux *a; if((a = r->oldreq->fid->aux) != nil){ w = a->q->rd[a->id].wait; if(w != nil) respond(w, "interrupted"); a->q->rd[a->id].wait = nil; } respond(r, nil); } void mqremove(Req *r) { USED(r); abort(); } void mqwrite(Req *r) { Msg *m; Req *rr; Aux *a; Mq *q; int i; if((a = r->fid->aux) == nil){ respond(r, Ebaduse); return; } q = a->q; m = emalloc(sizeof(Msg) + r->ifcall.count); m->data = m->buf; m->count = r->ifcall.count; memmove(m->data, r->ifcall.data, m->count); m->next = nil; for(i = 0; i < q->nrd; i++){ if(q->rd[i].id == -1) continue; rr = q->rd[i].wait; q->rd[i].wait = nil; if(rr != nil){ rr->ofcall.data = m->data; rr->ofcall.count = m->count; if(rr->ifcall.count > m->count) rr->ofcall.count = m->count; respond(rr, nil); if(rr->ofcall.count == m->count) continue; m->count -= rr->ofcall.count; m->data += rr->ofcall.count; } if(q->rd[i].hd == nil) q->rd[i].hd = m; if(q->rd[i].tl != nil) q->rd[i].tl->next = m; q->rd[i].tl = m; msgref(m); } if(q->loghd == nil) q->loghd = m; if(q->logtl != nil) q->logtl->next = m; q->logtl = msgref(m); q->logsz += m->count; q->logtl = m; trimlog(q); q->qid.vers++; r->ofcall.count = r->ifcall.count; respond(r, nil); } void mqread(Req *r) { Aux *a; Msg *m; Rd *rd; Mq *q; if(r->fid->qid.path == Qroot){ dirread9p(r, rootgen, nil); respond(r, nil); return; } if((a = r->fid->aux) == nil){ respond(r, Ebaduse); return; } q = a->q; /* no messages: enqueue until next one comes */ if(q->rd[a->id].hd == nil){ q->rd[a->id].wait = r; return; } /* queued messages: pop data off */ rd = &q->rd[a->id]; m = rd->hd; r->ofcall.data = m->data + rd->off; r->ofcall.count = r->ifcall.count + rd->off; if(r->ofcall.count > m->count) r->ofcall.count = m->count; respond(r, nil); /* adjust offsets */ if(m->count > r->ifcall.count) rd->off += r->ifcall.count; else{ rd->off = 0; rd->hd = m->next; if(rd->hd == nil) rd->tl = nil; msgunref(m); } } void mqcreate(Req *r) { Aux *a; Mq *q; int m; if(lookup(r->ifcall.name) != nil){ respond(r, Eexist); return; } m = r->ifcall.mode & OMASK; q = emalloc(sizeof(Mq)); q->name = estrdup(r->ifcall.name); q->mode = r->ifcall.mode; q->qid.path = queueid++; q->qid.vers = 0; q->qid.type = QTFILE; a = emalloc(sizeof(Aux)); a->q = q; if(m == OREAD || m == ORDWR || m == OEXEC) a->id = subscribe(q); r->ofcall.qid = q->qid; r->fid->qid = q->qid; r->fid->aux = a; queues = erealloc(queues, ++nqueues*sizeof(Mq*)); queues[nqueues-1] = q; respond(r, nil); } void mqopen(Req *r) { Aux *a; vlong p; int m; if(r->fid->aux != nil){ respond(r, Einuse); return; } m = r->ifcall.mode & OMASK; p = r->fid->qid.path; a = emalloc(sizeof(Aux)); if(p != Qroot){ a->q = queues[p-1]; if(m == OREAD || m == ORDWR || m == OMASK) a->id = subscribe(a->q); r->fid->aux = a; } r->ofcall.qid = r->fid->qid; respond(r, nil); } void destroyfid(Fid *f) { Aux *a; int m; a = f->aux; m = f->omode & OMASK; if(m != OREAD && m != ORDWR && m != OEXEC) return; if(a != nil) a->q->rd[a->id].id = -1; } void mqattach(Req *r) { r->ofcall.qid = (Qid){Qroot, 0, QTDIR}; r->fid->qid = r->ofcall.qid; r->fid->aux = nil; respond(r, nil); } Srv mq = { .attach=mqattach, .open=mqopen, .create=mqcreate, .read=mqread, .write=mqwrite, .remove=mqremove, .flush=mqflush, .stat=mqstat, .walk1=mqwalk1, .clone=mqclone, .destroyfid=destroyfid, }; void usage(void) { fprint(2, "usage: %s [-s srv] [-m mtpt]\n", argv0); exits("usage"); } void main(int argc, char **argv) { char *srvname, *mntpt; srvname = "mq"; mntpt = "/mnt/mq"; ARGBEGIN{ case 'd': chatty9p++; break; case 's': srvname=EARGF(usage()); break; case 'm': mntpt = EARGF(usage()); break; case 'r': maxlog = atoi(EARGF(usage())); break; default: usage(); }ARGEND; postmountsrv(&mq, srvname, mntpt, MCREATE|MREPL); }