shithub: mq

ref: 111d42550f50f79b9b357d130b8bcdbafc5165ee
dir: /mq.c/

View raw version
#include <u.h>
#include <libc.h>
#include <fcall.h>
#include <thread.h>
#include <9p.h>

typedef struct Mq Mq;
typedef struct Rd Rd;
typedef struct Msg Msg;
typedef struct Aux Aux;

enum {
	Qroot,
};

struct Aux {
	Mq	*q;
	int	id;
};

struct Msg {
	Ref;
	Msg	*next;
	int	count;
	char	*data;
	char	buf[];
};

struct Rd {
	int	id;
	int	off;
	Msg	*hd;
	Msg	*tl;
	Req	*wait;
};

struct Mq {
	Qid	qid;
	int	count;
	usize	logsz;
	Msg	*loghd;
	Msg	*logtl;
	Msg	*hd;
	Msg	*tl;

	int	nrd;
	Rd	*rd;

	char	*name;
	char	*user;
	char	*group;
	int	mode;
};

Mq	**queues;
int	nqueues;
vlong	maxlog = -1;
vlong	queueid = Qroot + 1;

char Ebaduse[] = "invalid use of fd";
char Einuse[] = "fid in use";
char Eexist[] = "file already exists";
char Enoexist[] = "file does not exists";

void *
emalloc(ulong n)
{
	void *v;
	
	v = mallocz(n, 1);
	if(v == nil)
		sysfatal("malloc: %r");
	setmalloctag(v, getcallerpc(&n));
	return v;
}

void *
erealloc(void *p, ulong n)
{
	void *v;
	
	v = realloc(p, n);
	if(v == nil)
		sysfatal("realloc: %r");
	setmalloctag(v, getcallerpc(&p));
	return v;
}

char*
estrdup(char *s)
{
	s = strdup(s);
	if(s == nil)
		sysfatal("strdup: %r");
	setmalloctag(s, getcallerpc(&s));
	return s;
}

Msg*
msgref(Msg *m)
{
	incref(m);
	return m;
}

void
msgunref(Msg *m)
{
	if(decref(m) == 0)
		free(m);
}

void
trimlog(Mq *q)
{
	Msg *m, *n;

	if(maxlog < 0)
		return;
	n = nil;
	for(m = q->loghd; m != nil && q->logsz >= maxlog; m = n){
		n = m->next;
		q->logsz -= m->count;
		msgunref(m);
	}
	q->loghd = n;
	if(m == nil)
		q->logtl = nil;
}

int
subscribe(Mq *q)
{
	Msg *m;
	Rd *rd;
	int i;

	rd = nil;
	for(i = 0; i < q->nrd; i++){
		if(q->rd[i].id == -1){
			rd = &q->rd[i];
			break;
		}
	}
	if(rd == nil){
		q->rd = erealloc(q->rd, (++q->nrd)*sizeof(*q->rd));
		rd = &q->rd[q->nrd - 1];
	}
	rd->id = i;
	rd->wait = nil;
	rd->off = 0;
	rd->hd = q->loghd;
	rd->tl = q->logtl;
	for(m = q->loghd; m != nil; m = m->next)
		msgref(m);
	return rd->id;
}

Mq*
lookup(char *name)
{
	int i;

	for(i = 0; i < nqueues; i++)
		if(strcmp(queues[i]->name, name) == 0)
			return queues[i];
	return nil;
}

void
qstat(Dir *d, Mq *q)
{
	d->name = estrdup9p(q->name);
	d->uid = estrdup9p("glenda");
	d->gid = estrdup9p("glenda");
	d->muid = estrdup9p("glenda");
	d->qid = q->qid;
	d->mtime = 0;
	d->atime = 0;
	d->mode = q->mode;
}

int
rootgen(int i, Dir *d, void *)
{
	if(i >= nqueues)
		return -1;
	qstat(d, queues[i]);
	return 0;
}
	

char*
mqclone(Fid *old, Fid *new)
{
	Aux *o, *n;

	o = old->aux;
	if(o != nil){
		n = emalloc(sizeof(Aux));
		n->q = o->q;
		n->id = subscribe(o->q);
		new->aux = n;
	}
	return nil;
}

char*
mqwalk1(Fid *f, char *name, Qid *qid)
{
	Mq *q;

	switch(f->qid.path){
	case Qroot:
		if(strcmp(name, "..") == 0){
			*qid = f->qid;
			return nil;
		}
		if((q = lookup(name)) == nil)
			return Enoexist;
		f->qid = q->qid;
		*qid = f->qid;
		return nil;
	default:
		if(strcmp(name, "..") == 0){
			f->qid = (Qid){Qroot, 0, QTDIR};
			*qid = f->qid;
			return nil;
		}
		return "not a dir";
	}	
}

void
mqstat(Req *r)
{
	switch(r->fid->qid.path){
	case Qroot:
		r->d.uid = estrdup9p("glenda");
		r->d.gid = estrdup9p("glenda");
		r->d.muid = estrdup9p("glenda");
		r->d.qid = r->fid->qid;
		r->d.mtime = 0;
		r->d.atime = 0;
		r->d.mode = 0755;
		break;
	default:
		qstat(&r->d, ((Aux*)r->fid->aux)->q);
	}
	respond(r, nil);
}

void
mqflush(Req *r)
{
	Req *w;
	Aux *a;

	if((a = r->oldreq->fid->aux) != nil){
		w = a->q->rd[a->id].wait;
		if(w != nil)
			respond(w, "interrupted");
		a->q->rd[a->id].wait = nil;
	}
	respond(r, nil);
}

void
mqremove(Req *r)
{
	USED(r);
	abort();
}

void
mqwrite(Req *r)
{
	Msg *m;
	Req *rr;
	Aux *a;
	Mq *q;
	int i;

	if((a = r->fid->aux) == nil){
		respond(r, Ebaduse);
		return;
	}
	q = a->q;
	m = emalloc(sizeof(Msg) + r->ifcall.count);
	m->data = m->buf;
	m->count = r->ifcall.count;
	memmove(m->data, r->ifcall.data, m->count);
	m->next = nil;
	for(i = 0; i < q->nrd; i++){
		if(q->rd[i].id == -1)
			continue;
		rr = q->rd[i].wait;
		q->rd[i].wait = nil;
		if(rr != nil){
			rr->ofcall.data = m->data;
			rr->ofcall.count = m->count;
			if(rr->ifcall.count > m->count)
				rr->ofcall.count = m->count;
			respond(rr, nil);
			if(rr->ofcall.count == m->count)
				continue;
			m->count -= rr->ofcall.count;
			m->data += rr->ofcall.count;
		}
		if(q->rd[i].hd == nil)
			q->rd[i].hd = m;
		if(q->rd[i].tl != nil)
			q->rd[i].tl->next = m;
		q->rd[i].tl = m;
		msgref(m);
	}
	if(q->loghd == nil)
		q->loghd = m;
	if(q->logtl != nil)
		q->logtl->next = m;
	q->logtl = msgref(m);
	q->logsz += m->count;
	q->logtl = m;

	trimlog(q);
	q->qid.vers++;
	r->ofcall.count = r->ifcall.count;
	respond(r, nil);
}

void
mqread(Req *r)
{
	Aux *a;
	Msg *m;
	Rd *rd;
	Mq *q;

	if(r->fid->qid.path == Qroot){
		dirread9p(r, rootgen, nil);
		respond(r, nil);
		return;
	}
	if((a = r->fid->aux) == nil){
		respond(r, Ebaduse);
		return;
	}
	q = a->q;

	/* no messages: enqueue until next one comes */
	if(q->rd[a->id].hd == nil){
		q->rd[a->id].wait = r;
		return;
	}

	/* queued messages: pop data off */
	rd = &q->rd[a->id];
	m = rd->hd;
	r->ofcall.data = m->data + rd->off;
	r->ofcall.count = r->ifcall.count + rd->off;
	if(r->ofcall.count > m->count)
		r->ofcall.count = m->count;
	respond(r, nil);

	/* adjust offsets */
	if(m->count > r->ifcall.count)
		rd->off += r->ifcall.count;
	else{
		rd->off = 0;
		rd->hd = m->next;
		if(rd->hd == nil)
			rd->tl = nil;
		msgunref(m);
	}
}

void
mqcreate(Req *r)
{
	Aux *a;
	Mq *q;
	int m;

	if(lookup(r->ifcall.name) != nil){
		respond(r, Eexist);
		return;
	}
	m = r->ifcall.mode & OMASK;
	q = emalloc(sizeof(Mq));
	q->name = estrdup(r->ifcall.name);
	q->mode = r->ifcall.mode;
	q->qid.path = queueid++;
	q->qid.vers = 0;
	q->qid.type = QTFILE;
	a = emalloc(sizeof(Aux));
	a->q = q;
	if(m == OREAD || m == ORDWR || m == OEXEC)
		a->id = subscribe(q);
	r->ofcall.qid = q->qid;
	r->fid->qid = q->qid;
	r->fid->aux = a;
	queues = erealloc(queues, ++nqueues*sizeof(Mq*));
	queues[nqueues-1] = q;

	respond(r, nil);
}

void
mqopen(Req *r)
{
	Aux *a;
	vlong p;
	int m;

	if(r->fid->aux != nil){
		respond(r, Einuse);
		return;
	}
	m = r->ifcall.mode & OMASK;
	p = r->fid->qid.path;
	a = emalloc(sizeof(Aux));
	if(p != Qroot){
		a->q = queues[p-1];
		if(m == OREAD || m == ORDWR || m == OMASK)
			a->id = subscribe(a->q);
		r->fid->aux = a;
	}
	r->ofcall.qid = r->fid->qid;
	respond(r, nil);
}

void
destroyfid(Fid *f)
{
	Aux *a;
	int m;

	a = f->aux;
	m = f->omode & OMASK;
	if(m != OREAD && m != ORDWR && m != OEXEC)
		return;
	if(a != nil)
		a->q->rd[a->id].id = -1;
}

void
mqattach(Req *r)
{
	r->ofcall.qid = (Qid){Qroot, 0, QTDIR};
	r->fid->qid = r->ofcall.qid;
	r->fid->aux = nil;
	respond(r, nil);
}

Srv mq  = {
	.attach=mqattach,
	.open=mqopen,
	.create=mqcreate,
	.read=mqread,
	.write=mqwrite,
	.remove=mqremove,
	.flush=mqflush,
	.stat=mqstat,
	.walk1=mqwalk1,
	.clone=mqclone,
	.destroyfid=destroyfid,
};

void
usage(void)
{
	fprint(2, "usage: %s [-s srv] [-m mtpt]\n", argv0);
	exits("usage");
}

void
main(int argc, char **argv)
{
	char *srvname, *mntpt;

	srvname = "mq";
	mntpt = "/mnt/mq";
	ARGBEGIN{
	case 'd':
		chatty9p++;
		break;
	case 's':
		srvname=EARGF(usage());
		break;
	case 'm':
		mntpt = EARGF(usage());
		break;
	case 'r':
		maxlog = atoi(EARGF(usage()));
		break;
	default:
		usage();
	}ARGEND;

	postmountsrv(&mq, srvname, mntpt, MCREATE|MREPL);
}