shithub: gefs

Download patch

ref: 2354024d13b1d899cca1f0bce8f953e8b1a093c1
parent: 8edbe1a13c00f0c2f3460e182aa9664292963d32
author: Ori Bernstein <[email protected]>
date: Tue Sep 21 22:36:14 EDT 2021

fs: handle truncation a bit better, accumulate messages correctly

--- a/blk.c
+++ b/blk.c
@@ -148,7 +148,6 @@
 	if(r == nil || off + len > r->off + r->len)
 		abort();
 
-	print("\tmerge (%llx,%llx) (%llx,%llx)\n", off, len, r->off, r->len);
 	if(off == r->off){
 		r->off += len;
 		r->len -= len;
@@ -260,7 +259,6 @@
 	bp = a->log;
 
 Nextblk:
-	dprint("block: %llx\n", bp);
 	if((b = readblk(bp, 0)) == nil)
 		return -1;
 	p = b->data;
@@ -333,9 +331,6 @@
 	Blk *hd, *ab, *b;
 	char *p;
 
-showfree("precompress");
-fprint(2, "compress start\n");
-
 	/*
 	 * Sync the current log to disk, and
 	 * set up a new block log tail.  While
@@ -375,7 +370,6 @@
 		}
 	}
 	a->logtl = b;
-print("\tnew log block: %llx\n", b->off);
 
 	/*
 	 * Prepare what we're writing back.
@@ -444,7 +438,6 @@
 					break;
 				}
 			}
-			fprint(2, "\tpostscan: freeing %llx\n", bp);
 			if(blkdealloc(bp) == -1)
 				return -1;
 		}
@@ -488,7 +481,6 @@
 		avldelete(t, r);
 		free(r);
 	}
-fprint(2, "\talloc %llx\n", b);
 	return b;
 }
 
@@ -560,8 +552,9 @@
 
 	if((bp = blkalloc(-1)) == -1)
 		return nil;
-	if((b = mallocz(sizeof(Blk), 1)) == nil)
-		return nil;
+	if((b = lookupblk(bp)) == nil)
+		if((b = mallocz(sizeof(Blk), 1)) == nil)
+			return nil;
 	b->type = t;
 	b->flag = Bdirty;
 	b->off = bp;
@@ -600,7 +593,6 @@
 	/* FIXME: better hash. */
 	assert(b->off != 0);
 	h = ihash(b->off);
-//	dprint("cache %lld (h=%xm, bkt=%d) => %p\n", b->off, h%fs->cmax, h, b);
 	ainc(&b->ref);
 	bkt = &fs->cache[h % fs->cmax];
 	lock(bkt);
@@ -702,12 +694,12 @@
 void
 enqueue(Blk *b)
 {
-	print("sync %llx\n", b->off);
 	assert(b->flag&Bdirty);
 	finalize(b);
 	if(syncblk(b) == -1){
 		ainc(&fs->broken);
 		fprint(2, "write: %r");
+		abort();	/* a failed syncblk is now fatal: stop right after counting it in fs->broken */
 	}
 }
 
@@ -777,11 +769,12 @@
 	if((b = lookupblk(bp)) == nil){
 		if((b = readblk(bp, flg)) == nil)
 			return nil;
-		if(siphash(b->buf, Blksz) != bh){
-			werrstr("corrupt block %llx", bp);
+		if(blkhash(b) != bh){
+			werrstr("corrupt block %llx: %llx != %llx", bp, blkhash(b), bh);
 			return nil;
 		}
 	}
+	assert(b->off == bp);
 	return cacheblk(b);
 }
 
@@ -819,8 +812,8 @@
 {
 	if(b == nil)
 		return;
-	assert((b->flag & Bqueued) || !(b->flag & Bdirty));
 	if(adec(&b->ref) == 0){
+		assert((b->flag & Bqueued) || !(b->flag & Bdirty));
 		cachedel(b->off);
 		free(b);
 	}
@@ -846,7 +839,6 @@
 	int i, r;
 	Blk *b;
 
-	dprint("syncing\n");
 	r = 0;
 	for(i = 0; i < fs->narena; i++){
 		b = fs->arenas[i].logtl;
@@ -856,7 +848,6 @@
 	}
 	/* FIXME: hit it with a big hammer -- flush the whole cache */
 	for(b = fs->chead; b != nil; b = b->cnext){
-//		dprint("sync %p\n", b);
 		if(!(b->flag & Bdirty))
 			continue;
 		if(syncblk(b) == -1)
--- a/check.c
+++ b/check.c
@@ -151,12 +151,12 @@
 	if(b->type == Tpivot){
 		for(i = 0; i < b->nbuf; i++){
 			getmsg(b, i, &m);
-			fprint(fd, "%.*s|%M\n", 4*indent, spc, &m);
+			fprint(fd, "%.*s[%03d]|%M\n", 4*indent, spc, i, &m);
 		}
 	}
 	for(i = 0; i < b->nval; i++){
 		getval(b, i, &kv);
-		fprint(fd, "%.*s|%P\n", 4*indent, spc, &kv);
+		fprint(fd, "%.*s[%03d]|%P\n", 4*indent, spc, i, &kv);
 		if(b->type == Tpivot){
 			if((c = getblk(kv.bp, kv.bh, 0)) == nil)
 				sysfatal("failed load: %r");
--- a/dat.h
+++ b/dat.h
@@ -40,9 +40,10 @@
 	 * maximally filled tree.
 	 */
 	Loghdsz	= 8,			/* log hash */
-	Keymax	= 32,			/* key data limit */
-	Inlmax	= 128,			/* inline data limit */
+	Keymax	= 128,			/* key data limit */
+	Inlmax	= 256,			/* inline data limit */
 	Ptrsz	= 18,			/* off, hash, fill */
+	Offsz	= 17,			/* type, qid, off */
 	Kvmax	= Keymax + Inlmax,	/* Key and value */
 	Kpmax	= Keymax + Ptrsz,	/* Key and pointer */
 	
@@ -396,9 +397,11 @@
 struct Scan {
 	vlong	offset;	/* last read offset */
 	Tree	root;
+	Dir	dir;
 
 	int	done;
 	int	overflow;
+	int	present;
 	Kvp	kv;
 	Key	pfx;
 	char	kvbuf[Kvmax];
--- a/fns.h
+++ b/fns.h
@@ -60,7 +60,7 @@
 
 #define dprint(...) \
 	do{ \
-		if(1) fprint(2, __VA_ARGS__); \
+		if(debug) fprint(2, __VA_ARGS__); /* prints to fd 2 only when debug is nonzero */ \
 	}while(0)
 
 char	*pack8(int*, char*, char*, uchar);
--- a/fs.c
+++ b/fs.c
@@ -3,6 +3,7 @@
 #include <fcall.h>
 #include <avl.h>
 #include <bio.h>
+#include <pool.h>
 
 #include "dat.h"
 #include "fns.h"
@@ -131,12 +132,15 @@
 	if(!debug)
 		return;
 	fprint(2, "fids:---\n");
+	lock(&fs->fidtablk);
 	for(i = 0; i < Nfidtab; i++)
 		for(f = fs->fidtab[i]; f != nil; f = f->next){
 			rlock(f->dent);
 			fprint(2, "\tfid[%d]: %d [refs=%ld, k=%K]\n", i, f->fid, f->dent->ref, &f->dent->Key);
 			runlock(f->dent);
-		}		
+		}
+	unlock(&fs->fidtablk);
+
 }
 
 Fid*
@@ -422,8 +426,6 @@
 		rerror(m, Enomem);
 		return;
 	}
-checkfs();
-showfids();
 	r.qid = d.qid;
 	respond(m, &r);
 	return;
@@ -632,6 +634,10 @@
 	f->mode = m->mode;
 	f->qpath = d.qid.path;
 	f->dent = dent;
+	wlock(f->dent);
+//	freeb(dent, 0, dent->length);
+	dent->length = 0;
+	wunlock(f->dent);
 	unlock(f);
 
 	r.type = Rcreate;
@@ -648,10 +654,6 @@
 	Msg mb;
 	Fid *f;
 
-	if(okname(m->name) == -1){
-		rerror(m, Ename);
-		return;
-	}
 	if((f = getfid(m->fid)) == nil){
 		rerror(m, "no such fid");
 		return;
@@ -730,6 +732,12 @@
 //		refblk(fs->root.bp);
 		unlock(&fs->root.lk);
 	}
+	if(f->mode & OTRUNC){
+		wlock(f->dent);
+//		freeb(f->dent, 0, dent->length);
+		f->dent->length = 0;
+		wunlock(f->dent);
+	}
 	unlock(f);
 	respond(m, &r);
 }
@@ -773,7 +781,7 @@
 	p = r->data;
 	n = m->count;
 	if(s->overflow){
-		if((ns = kv2statbuf(&s->kv, p, n)) == -1)
+		if((ns = convD2M(&s->dir, (uchar*)p, n)) <= BIT16SZ)
 			return Edscan;
 		s->overflow = 0;
 		p += ns;
@@ -784,9 +792,8 @@
 			return e;
 		if(done)
 			break;
-		if((ns = kv2statbuf(&s->kv, p, n)) == -1){
+		if((ns = convD2M(&s->dir, (uchar*)p, n)) <= BIT16SZ){
 			s->overflow = 1;
-			fprint(2, "** could not fill buf: %r\n");
 			break;
 		}
 		fprint(2, "*** nscan: %d\n", ns);
@@ -801,7 +808,7 @@
 readb(Fid *f, char *d, vlong o, vlong n, int sz)
 {
 	char *e, buf[17];
-	vlong bp, bh, bo;
+	vlong fb, fo, bp, bh;
 	Blk *b;
 	Key k;
 	Kvp kv;
@@ -809,14 +816,14 @@
 	if(o >= sz)
 		return 0;
 
-	bp = o & ~(Blksz-1);
-	bo = o & (Blksz-1);
+	fb = o & ~(Blksz-1);
+	fo = o & (Blksz-1);
 
 	k.k = buf;
 	k.nk = sizeof(buf);
 	k.k[0] = Kdat;
 	PBIT64(k.k+1, f->qpath);
-	PBIT64(k.k+9, bp);
+	PBIT64(k.k+9, fb);
 
 	e = fslookup(f, &k, &kv, &b, 0);
 	if(e != nil && e != Eexist){
@@ -831,12 +838,11 @@
 
 	if((b = getblk(bp, bh, GBraw)) == nil)
 		return -1;
-	fprint(2, "\treading(%lld+%d) from %llx (%llx) %s %s\n", o, n, bp, b->off, b->buf, b->data);
-	if(bo+n > Blksz)
-		n = Blksz-bo;
+	if(fo+n > Blksz)
+		n = Blksz-fo;
 	if(b != nil){
 		fprint(2, "\tcopying %lld to resp %p\n", n, d);
-		memcpy(d, b->buf+bo, n);
+		memcpy(d, b->buf+fo, n);
 		putblk(b);
 	}else
 		memset(d, 0, n);
@@ -867,11 +873,10 @@
 	o = m->offset;
 	if(m->offset + m->count > e->length)
 		c = e->length - m->offset;
-//showfs("pre-readb");
 	while(c != 0){
 		n = readb(f, p, o, c, e->length);
-print("after readb: p[%d]=%.*s\n", n, n, p);
 		if(n == -1){
+			fprint(2, "read: %r\n");
 			runlock(e);
 			return Efs;
 		}
@@ -921,7 +926,7 @@
 }
 
 int
-writeb(Fid *f, Msg *m, char *s, vlong o, vlong n, int sz)
+writeb(Fid *f, Msg *m, char *s, vlong o, vlong n, vlong sz)
 {
 	vlong fb, fo, bp, bh;
 	Blk *b, *t;
@@ -935,7 +940,6 @@
 	PBIT64(m->k+9, fb);
 
 
-print("%lld < %d && (%lld != 0 || %lld != %lld\n", fb, sz, fo, n, Blksz);
 	b = newblk(Traw);
 	if(b == nil)
 		return -1;
@@ -959,14 +963,14 @@
 	if(fo+n > Blksz)
 		n = Blksz-fo;
 	memcpy(b->buf+fo, s, n);
-print("blk contents{{%.*s}}\n", (int)(fo+n), b->data);
 	enqueue(b);
-	putblk(b);
-	fprint(2, "\twrote to new blk %llx at offset %lld\n", b->off, o);
+
 	bh = blkhash(b);
 	PBIT64(m->v+0, b->off);
 	PBIT64(m->v+8, bh);
-	fprint(2, "\tkv: %M", m);
+	putblk(b);
+	checkfs();
+	poolcheck(mainmem);
 	return n;
 }
 
@@ -973,7 +977,7 @@
 void
 fswrite(Fmsg *m)
 {
-	char sbuf[8], offbuf[4][13+16], *p;
+	char sbuf[8], offbuf[4][Ptrsz+Offsz], *p;
 	vlong n, o, c;
 	Msg kv[4];
 	Fcall r;
@@ -990,6 +994,7 @@
 		return;
 	}
 
+	wlock(f->dent);
 	p = m->data;
 	o = m->offset;
 	c = m->count;
@@ -996,11 +1001,10 @@
 	for(i = 0; i < nelem(kv)-1 && c != 0; i++){
 		kv[i].op = Oinsert;
 		kv[i].k = offbuf[i];
-		kv[i].nk = 17;
-		kv[i].v = offbuf[i]+17;
+		kv[i].nk = Offsz;
+		kv[i].v = offbuf[i]+Offsz;
 		kv[i].nv = 16;
 		n = writeb(f, &kv[i], p, o, c, f->dent->length);
-		btupsert(&fs->root, &kv[i], 1);
 		if(n == -1){
 			// badwrite(f, i);
 			// FIXME: free pages
@@ -1011,7 +1015,6 @@
 		c -= n;
 	}
 
-	wlock(f->dent);
 	kv[i].op = Owstat;
 	kv[i].k = f->dent->k;
 	kv[i].nk = f->dent->nk;
@@ -1023,8 +1026,7 @@
 		PBIT64(kv[i].v, m->offset+m->count);
 		f->dent->length = m->offset+m->count;
 	}
-	btupsert(&fs->root, &kv[i], 1);
-//	btupsert(&fs->root, kv, i+1);
+	btupsert(&fs->root, kv, i+1);
 	wunlock(f->dent);
 
 	r.type = Rwrite;
--- a/main.c
+++ b/main.c
@@ -206,7 +206,7 @@
 		fs->rdchan = mkchan(128);
 		fs->wrchan = mkchan(128);
 		srvfd = postfd(srvname, "");
-		ctlfd = postfd(srvname, ".ctl");
+		ctlfd = postfd(srvname, ".cmd");
 		loadfs(argv[0]);
 		launch(runctl, (void*)ctlfd, "ctl");
 		launch(runwrite, nil, "writeio");
--- a/pack.c
+++ b/pack.c
@@ -198,8 +198,11 @@
 	err = 0;
 	k = kv->k + 9;
 	ek = kv->k + kv->nk;
-dprint("unpacking... [%d %d]\n", k[0], k[1]);
 	k = unpackstr(&err, k, ek, &d->name);
+	if(err){
+		werrstr("key too small [%d]", kv->nk);
+		return -1;
+	}
 
 	v = kv->v;
 	ev = v + kv->nv;
@@ -214,8 +217,8 @@
 	v = unpackstr(&err, v, ev, &d->gid);
 	v = unpackstr(&err, v, ev, &d->muid);
 	if(err){
-		abort();
-		werrstr("kv too small");
+		print("fucked: %P\n", kv);
+		werrstr("val too small [%s]", d->name);
 		return -1;
 	}
 	if(k != ek){
--- a/tree.c
+++ b/tree.c
@@ -334,10 +334,10 @@
 }
 
 int
-filledbuf(Blk *b, int needed)
+filledbuf(Blk *b, int nmsg, int needed)	/* nonzero when nmsg more messages needing `needed` space in total would exceed Bufspc */
 {
 	assert(b->type == Tpivot);
-	return 2*(b->nbuf+1) + b->bufsz + needed > Bufspc;
+	return 2*(b->nbuf+nmsg) + b->bufsz + needed > Bufspc;	/* 2 per message slot, presumably its offset-table entry -- confirm */
 }
 
 
@@ -596,7 +596,6 @@
 			PBIT32(kv.v+25, v);
 		}
 		if(m->op & Owsize){
-			fprint(2, "wstat: incrementing size");
 			v = GBIT64(p);
 			p += 8;
 			PBIT64(kv.v+33, v);
@@ -607,7 +606,7 @@
 			PBIT32(kv.v+33, v);
 		}
 		if(m->op & Owname){
-			fprint(2, "renames not yet supported");
+			fprint(2, "renames not yet supported\n");
 			abort();
 		}
 		if(p != m->v + m->nv)
@@ -862,7 +861,7 @@
 	if(rb->type == Tleaf && !filledleaf(rb, sz))
 		for(i = 0; i < nmsg; i++)
 			apply(rb, &msg[i]);
-	else if(rb->type == Tpivot && !filledbuf(rb, sz))
+	else if(rb->type == Tpivot && !filledbuf(rb, nmsg, sz))
 		for(i = 0; i < nmsg; i++)
 			bufinsert(rb, &msg[i]);
 	else
@@ -945,11 +944,11 @@
 				goto error;
 			for(i = p[-1].lo; i < p[-1].hi; i++){
 				getmsg(p[-1].b, i, &m);
-				if(filledbuf(p->n, msgsz(&m)))
+				if(filledbuf(p->n, 1, msgsz(&m)))
 					break;
 				bufinsert(p->n, &m);
 			}
-			if(p == oldroot && !filledbuf(p->n, path[0].sz)){
+			if(p == oldroot && !filledbuf(p->n, nmsg, path[0].sz)){
 				r = p->n;
 				*redo = insertmsg(r, msg, nmsg, path[0].sz);
 			}
@@ -963,7 +962,7 @@
 				getmsg(p[-1].b, i, &m);
 				if(keycmp(&m, &mid) >= 0)
 					b = p->r;
-				if(filledbuf(b, msgsz(&m)))
+				if(filledbuf(b, 1, msgsz(&m)))
 					continue;
 				bufinsert(b, &m);
 			}
@@ -1018,7 +1017,6 @@
 	Msg m;
 
 	j = 0;
-	lo = 0;
 	maxsz = 0;
 	p->b = b;
 	/* 
@@ -1030,6 +1028,7 @@
 		if(i < b->nval)
 			getval(b, i, &kv);
 		cursz = 0;
+		lo = j;
 		for(; j < b->nbuf; j++){
 			getmsg(b, j, &m);
 			if(i < b->nval && keycmp(&m, &kv) >= 0)
@@ -1043,7 +1042,6 @@
 			p->hi = j;
 			p->sz = maxsz;
 			p->idx = i - 1;
-			lo = j;
 		}
 	}
 }
@@ -1088,7 +1086,7 @@
 
 	path[0].sz = sz;
 	while(b->type == Tpivot){
-		if(!filledbuf(b, path[npath - 1].sz))
+		if(!filledbuf(b, nmsg, path[npath - 1].sz))
 			break;
 		victim(b, &path[npath]);
 		getval(b, path[npath].idx, &sep);
@@ -1258,7 +1256,6 @@
 
 	lock(&t->lk);
 	s->root = *t;
-//dprint("height %d\n", s->root.ht);
 	unlock(&t->lk);
 	if((s->path = calloc(s->root.ht, sizeof(Scanp))) == nil){
 		free(s);
@@ -1284,14 +1281,61 @@
 			assert(i == s->root.ht-1);
 		}
 	}
-//dprint("inited\n");
-//for(i = 0; i < s->root.ht; i++){
-//dprint("\t%p", p[i].b);
-//dprint(" (%d %d)\n", p[i].vi, p[i].bi);
-//}
 	return nil;
 }
 
+/*
+ * Fold message m into the Dir kept in s->dir: insert/nop hand it to
+ * kv2dir and set s->present, delete clears s->present, wstat patches
+ * fields in place.  Always returns 0; bad or unsupported messages abort().
+ */
+int
+accum(Scan *s, Msg *m)
+{
+	char *p;
+	Dir *d;
+
+	d = &s->dir;
+	switch(m->op&0xf){	/* low nibble picks the op; the Ow* bits are tested below */
+	case Onop:
+	case Oinsert:
+		s->present = 1;
+		kv2dir(m, d);	/* NOTE(review): result ignored -- confirm kv2dir cannot fail here */
+		fprint(2, "name: %s\n", d->name);
+		break;
+	case Odelete:
+		s->present = 0;
+		break;
+	case Owstat:
+		p = m->v;	/* value holds the flagged fields back to back: mtime, size, mode */
+		d->qid.vers++;
+		if(m->op & Owmtime){
+			d->mtime = GBIT64(p);
+			p += 8;
+		}
+		if(m->op & Owsize){
+			d->length = GBIT64(p);
+			p += 8;
+		}
+		if(m->op & Owmode){
+			d->mode = GBIT32(p);
+			p += 4;
+		}
+		if(m->op & Owname){
+			fprint(2, "renames not yet supported\n");
+			abort();
+		}
+		if(p != m->v + m->nv){	/* every value byte must have been consumed */
+			fprint(2, "malformed wstat message\n");
+			abort();
+		}
+		break;
+	default:
+		abort();
+	}
+	return 0;
+}
+
 char *
 btnext(Scan *s, Kvp *r, int *done)
 {
@@ -1301,16 +1345,12 @@
 	Kvp kv;
 
 Again:
+	/* load up the correct blocks for the scan */
 	p = s->path;
 	h = s->root.ht;
 	*done = 0;
 	start = h;
 	for(i = h-1; i > 0; i--){
-//dprint("advancing (i=%d)\n", i);
-//for(j = 0; j < h; j++){
-//dprint("\t%p", p[j].b);
-//dprint(" (%d %d)\n", p[j].vi, p[j].bi);
-//}
 		if(p[i].vi < p[i].b->nval || p[i].bi < p[i].b->nbuf)
 			break;
 		if(i == 0){
@@ -1327,6 +1367,8 @@
 		if((p[i].b = getblk(kv.bp, kv.bh, 0)) == nil)
 			return "error reading block";
 	}
+
+	/* find the minimum key along the path up */
 	m.op = Onop;
 	getval(p[h-1].b, p[h-1].vi, &m);
 	for(i = h-2; i >= 0; i--){
@@ -1333,7 +1375,7 @@
 		if(p[i].bi == p[i].b->nbuf)
 			continue;
 		getmsg(p[i].b, p[i].bi, &n);
-		if(keycmp(&m, &n) >= 0)
+		if(keycmp(&n, &m) < 0)
 			m = n;
 	}
 	if(m.nk < s->pfx.nk || memcmp(m.k, s->pfx.k, s->pfx.nk) != 0){
@@ -1340,14 +1382,20 @@
 		*done = 1;
 		return nil;
 	}
+
+	/* scan all messages applying to the message */
 	getval(p[h-1].b, p[h-1].vi, &t);
-	if(keycmp(&m, &t) == 0)
+	if(keycmp(&m, &t) == 0){
+		t.op = Onop;
+		accum(s, &t);
 		p[h-1].vi++;
+	}
 	for(i = h-2; i >= 0; i--){
 		for(j = p[i].bi; j < p[i].b->nbuf; j++){
 			getmsg(p[i].b, j, &t);
 			if(keycmp(&m, &t) != 0)
 				break;
+			accum(s, &t);
 			p[i].bi++;
 			m = t;
 		}