ref: b73fa273322688000ae7c71d7582b019c0265671
parent: af48e0e4130a9c7a0340fa61814d4f6d98359959
author: Ori Bernstein <[email protected]>
date: Sun Nov 21 12:10:44 EST 2021
splitleaf: switch blocks in the same loop this cleans up the merge code later.
--- a/blk.c
+++ b/blk.c
@@ -2,7 +2,6 @@
#include <libc.h>
#include <fcall.h>
#include <avl.h>
-#include <pool.h>
#include "dat.h"
#include "fns.h"
@@ -584,7 +583,7 @@
b->cprev = nil;
b->hnext = nil;
- print("new block %B from %p, flag=%x\n", b->bp, getcallerpc(&t), b->flag);
+ dprint("new block %B from %p, flag=%x\n", b->bp, getcallerpc(&t), b->flag);
return cacheblk(b);
}
@@ -745,7 +744,7 @@
wlock(b);
b->flag |= Bzombie;
wunlock(b);
- fprint(2, "freeing block %B @ %ld, from 0x%p\n", b->bp, b->ref, getcallerpc(&b));
+ dprint("freeing block %B @ %ld, from 0x%p\n", b->bp, b->ref, getcallerpc(&b));
assert((b->flag & Bqueued) == 0);
a = getarena(b->bp.addr);
--- a/cache.c
+++ b/cache.c
@@ -74,7 +74,7 @@
}
static void
-cachedel_(vlong del)
+cachedel(vlong del)
{
Bucket *bkt;
Blk *b, **p;
--- a/check.c
+++ b/check.c
@@ -28,6 +28,7 @@
badblk(Blk *b, int h, Kvp *lo, Kvp *hi)
{
Kvp x, y;
+ Msg mx, my;
int i, r;
Blk *c;
int fail;
@@ -53,8 +54,11 @@
fail++;
}
getval(b, 0, &x);
- if(lo && keycmp(lo, &x) != 0)
+ if(lo && keycmp(lo, &x) > 0){
fprint(2, "out of range keys %P != %P\n", lo, &x);
+ showblk(b, "wut", 1);
+ fail++;
+ }
for(i = 1; i < b->nval; i++){
getval(b, i, &y);
if(hi && keycmp(&y, hi) >= 0){
@@ -94,6 +98,54 @@
}
x = y;
}
+ if(b->type == Tpivot){
+ getval(b, b->nval-1, &y);
+ if((c = getblk(x.bp, 0)) == nil){
+ fprint(2, "corrupt block: %r\n");
+ fail++;
+ }
+ if(c != nil && badblk(c, h - 1, &y, nil))
+ fail++;
+ }
+ if(b->type == Tpivot){
+ if(b->nbuf > 0){
+ getmsg(b, 0, &mx);
+ if(hi && keycmp(&mx, hi) >= 0){
+ fprint(2, "out of range messages %P != %M\n", hi, &mx);
+ fail++;
+ }
+ }
+ for(i = 1; i < b->nbuf; i++){
+ getmsg(b, i, &my);
+ switch(my.op){
+ case Oinsert: /* new kvp */
+ case Odelete: /* delete kvp */
+ case Oqdelete: /* delete kvp if exists */
+ break;
+ case Owstat: /* kvp dirent */
+ if((my.statop & ~(Owsize|Owname|Owmode|Owmtime)) != 0){
+ fprint(2, "invalid stat op %d\n", my.statop);
+ fail++;
+ }
+ break;
+ default:
+ fprint(2, "invalid message op %d\n", my.op);
+ fail++;
+ break;
+ }
+ if(hi && keycmp(&y, hi) > 0){
+ fprint(2, "out of range keys %P >= %P\n", &y, hi);
+ fail++;
+ }
+ if(keycmp(&mx, &my) == 1){
+ fprint(2, "misordered keys %P, %P\n", &x, &y);
+ fail++;
+ break;
+ }
+ mx = my;
+ }
+
+ }
return fail;
}
@@ -239,15 +291,18 @@
#define A(b) (b ? b->bp.addr : -1)
for(i = 0; i < np; i++){
print("\t[%d] ==>\n"
- "\t\t%s: b(%p)=%llx\n"
+ "\t\t%s: b(%p)=%llx [%s]\n"
"\t\tnl(%p)=%llx, nr(%p)=%llx\n"
- "\t\tidx=%d, midx=%d\n",
+ "\t\tidx=%d, midx=%d\n"
+ "\t\tpullsz=%d, npull=%d, \n"
+ "\t\tclear=(%d. %d)\n",
i, op[p[i].op],
- p[i].b, A(p[i].b),
+ p[i].b, A(p[i].b), (p[i].b == nil) ? "nil" : (p[i].b->type == Tleaf ? "leaf" : "pivot"),
p[i].nl, A(p[i].nl),
p[i].nr, A(p[i].nr),
- p[i].idx, p[i].midx);
- print("\t\tclear=(%d, %d):%d\n", p[i].lo, p[i].hi, p[i].sz);
+ p[i].idx, p[i].midx,
+ p[i].pullsz, p[i].npull,
+ p[i].lo, p[i].hi);
}
}
@@ -257,8 +312,6 @@
Arange *r;
int i;
- if(!debug)
- return;
print("=== %s\n", m);
for(i = 0; i < fs->narena; i++){
print("arena %d:\n", i);
--- a/cons.c
+++ b/cons.c
@@ -40,7 +40,7 @@
}
}else if(strcmp(arg[0], "check") == 0)
checkfs();
- else if(strcmp(arg[0], "dbg") && narg == 2)
+ else if(strcmp(arg[0], "dbg") == 0 && narg == 2)
debug = atoi(arg[1]);
else
fprint(fd, "unknown command %s\n", arg[0]);
--- a/dat.h
+++ b/dat.h
@@ -191,7 +191,7 @@
Onop,
Oinsert, /* new kvp */
Odelete, /* delete kvp */
- Odeletex, /* delete kvp if exists */
+ Oqdelete, /* delete kvp if exists */
Owstat, /* kvp dirent */
/* wstat flags */
Owsize = 1<<4,
@@ -335,6 +335,7 @@
struct Msg {
char op;
+ char statop;
Kvp;
};
@@ -378,7 +379,8 @@
struct Path {
/* Flowing down for flush */
- Blk *b; /* insertion */
+ Msg *ins; /* inserted values, bounded by lo..hi */
+ Blk *b; /* to shadow */
int idx; /* insert at */
int lo; /* key range */
int hi; /* key range */
@@ -390,6 +392,8 @@
Blk *nl; /* new left */
Blk *nr; /* new right, if we split or rotated */
int midx; /* modification index */
+ int npull; /* number of messages successfully pulled */
+ int pullsz; /* size of pulled messages */
};
struct Scanp {
@@ -401,7 +405,6 @@
struct Scan {
vlong offset; /* last read offset */
Tree root;
- Dir dir;
int done;
int overflow;
--- a/dump.c
+++ b/dump.c
@@ -51,7 +51,7 @@
n = fmtprint(fmt, "blk:%llx, hash:%llx", GBIT64(v->v), GBIT64(v->v+8));
break;
case Kent: /* pqid[8] name[n] => dir[n]: serialized Dir */
- switch(op&0xf){
+ switch(op){
case Onop:
case Oinsert:
if(kv2dir(v, &d) == -1)
@@ -63,6 +63,7 @@
break;
case Odelete:
n = fmtprint(fmt, "delete");
+ break;
case Owstat:
p = v->v;
if(op & Owmtime){
@@ -79,6 +80,7 @@
}
if(p != v->v + v->nv)
abort();
+ break;
}
break;
case Ksnap: /* name[n] => dent[16] ptr[16]: snapshot root */
@@ -110,6 +112,7 @@
char *opname[] = {
[Oinsert] "Oinsert",
[Odelete] "Odelete",
+ [Oqdelete] "Oqdelete",
[Owstat] "Owstat",
};
Msg *m;
@@ -116,10 +119,12 @@
int n;
m = va_arg(fmt->args, Msg*);
- n = fmtprint(fmt, "Msg(%s, ", opname[m->op&0xf]);
+ if(m == nil)
+ return fmtprint(fmt, "Msg{nil}");
+ n = fmtprint(fmt, "Msg(%s, ", opname[m->op]);
n += showkey(fmt, m);
n += fmtprint(fmt, ") => (");
- n += showval(fmt, m, m->op);
+ n += showval(fmt, m, m->statop);
n += fmtprint(fmt, ")");
return n;
}
@@ -131,6 +136,8 @@
int n;
kv = va_arg(fmt->args, Kvp*);
+ if(kv == nil)
+ return fmtprint(fmt, "Kvp{nil}");
n = fmtprint(fmt, "Kvp(");
n += showkey(fmt, kv);
n += fmtprint(fmt, ") => (");
@@ -149,6 +156,8 @@
int n;
k = va_arg(fmt->args, Key*);
+ if(k == nil)
+ return fmtprint(fmt, "Key{nil}");
n = fmtprint(fmt, "Key(");
n += showkey(fmt, k);
n += fmtprint(fmt, ")");
@@ -165,4 +174,13 @@
return fmtprint(fmt, "<Arange:nil>");
else
return fmtprint(fmt, "Arange(%lld+%lld)", r->off, r->len);
+}
+
+int
+Qconv(Fmt *fmt)
+{
+ Qid q;
+
+ q = va_arg(fmt->args, Qid);
+ return fmtprint(fmt, "(%llx %ld %d)", q.path, q.vers, q.type);
}
--- a/fns.h
+++ b/fns.h
@@ -5,6 +5,7 @@
#pragma varargck type "B" Bptr
#pragma varargck type "R" Arange*
#pragma varargck type "X" char*
+#pragma varargck type "Q" Qid
extern Gefs *fs;
extern int debug;
@@ -32,17 +33,19 @@
int loadarena(Arena*, vlong);
void loadfs(char*);
int sync(void);
-int loadlog(Arena *a);
+int loadlog(Arena*);
int endfs(void);
-int compresslog(Arena *a);
+int compresslog(Arena*);
+void setval(Blk*, int, Kvp*);
int btupsert(Tree*, Msg*, int);
-char *btlookup(Tree*, Key*, Kvp*, Blk**);
-char *btlookupat(Blk*, Key*, Kvp*, Blk**);
+char *btlookup(Tree*, Key*, Kvp*, char*, int);
+char *btlookupat(Blk*, Key*, Kvp*, char*, int);
char *btscan(Tree*, Scan*, char*, int);
char *btnext(Scan*, Kvp*, int*);
void btdone(Scan*);
+
void setflag(Blk *b, int);
int chkflag(Blk *b, int);
@@ -94,12 +97,12 @@
int Pconv(Fmt*);
int Rconv(Fmt*);
int Kconv(Fmt*);
+int Qconv(Fmt*);
/* scratch */
void setmsg(Blk *, int, Msg *);
void bufinsert(Blk *, Msg *);
void victim(Blk *b, Path *p);
-void blkinsert(Blk *b, Kvp *kv);
Chan *mkchan(int);
Fmsg *chrecv(Chan*);
--- a/fs.c
+++ b/fs.c
@@ -3,7 +3,6 @@
#include <fcall.h>
#include <avl.h>
#include <bio.h>
-#include <pool.h>
#include "dat.h"
#include "fns.h"
@@ -38,7 +37,7 @@
}
static char*
-fslookup(Fid *f, Key *k, Kvp *kv, Blk **bp, int lk)
+fslookup(Fid *f, Key *k, Kvp *kv, char *buf, int nbuf, int lk)
{
char *e;
Blk *b;
@@ -52,7 +51,7 @@
if(lk)
rlock(f->dent);
- e = btlookupat(b, k, kv, bp);
+ e = btlookupat(b, k, kv, buf, nbuf);
if(lk)
runlock(f->dent);
putblk(b);
@@ -71,7 +70,6 @@
lock(&fs->dtablk);
for(e = fs->dtab[h]; e != nil; e = e->next){
if(e->qid.path == d->qid.path && e->rootb == root){
- dprint("found %p [%K]\n", e, &e->Key);
ainc(&e->ref);
unlock(&fs->dtablk);
return e;
@@ -95,7 +93,6 @@
e->nk = ek - e->buf;
e->next = fs->dtab[h];
fs->dtab[h] = e;
- dprint("created %p [%K]\n", e, &e->Key);
unlock(&fs->dtablk);
return e;
@@ -135,7 +132,8 @@
for(i = 0; i < Nfidtab; i++)
for(f = fs->fidtab[i]; f != nil; f = f->next){
rlock(f->dent);
- fprint(fd, "\tfid[%d]: %d [refs=%ld, k=%K]\n", i, f->fid, f->dent->ref, &f->dent->Key);
+ fprint(fd, "\tfid[%d]: %d [refs=%ld, k=%K, qid=%Q]\n",
+ i, f->fid, f->dent->ref, &f->dent->Key, f->dent->qid);
runlock(f->dent);
}
unlock(&fs->fidtablk);
@@ -366,13 +364,12 @@
void
fsattach(Fmsg *m, int iounit)
{
- char *p, *ep, buf[128];
+ char *p, *ep, buf[Kvmax], kvbuf[Kvmax];
int err;
Dent *e;
Fcall r;
Kvp kv;
Key dk;
- Blk *b;
Fid f;
Dir d;
@@ -384,7 +381,7 @@
p = packstr(&err, p, ep, "");
dk.k = buf;
dk.nk = p - buf;
- if(btlookup(&fs->root, &dk, &kv, &b) != nil){
+ if(btlookup(&fs->root, &dk, &kv, kvbuf, sizeof(kvbuf)) != nil){
rerror(m, Efs);
return;
}
@@ -391,15 +388,12 @@
r.type = Rattach;
if(kv2dir(&kv, &d) == -1){
rerror(m, Efs);
- putblk(b);
return;
}
if((e = getdent(-1, -1, &d)) == nil){
rerror(m, Efs);
- putblk(b);
return;
}
- putblk(b);
/*
* A bit of a hack; we're duping a fid
@@ -429,13 +423,12 @@
void
fswalk(Fmsg *m)
{
- char *p, *e, *estr, kbuf[Maxent];
- int i, nwalk, err;
+ char *p, *e, *estr, kbuf[Maxent], kvbuf[Kvmax];
vlong up, prev;
+ int i, err;
Fid *o, *f;
Dent *dent;
Fcall r;
- Blk *b;
Kvp kv;
Key k;
Dir d;
@@ -450,11 +443,9 @@
}
err = 0;
estr = nil;
- nwalk = 0;
up = o->qpath;
prev = o->qpath;
r.type = Rwalk;
- r.nwqid = 0;
for(i = 0; i < m->nwname; i++){
up = prev;
p = kbuf;
@@ -468,22 +459,17 @@
}
k.k = kbuf;
k.nk = p - kbuf;
-//showfs("walking");
-//dprint("looking up %K\n", &k);
- if((estr = fslookup(o, &k, &kv, &b, 0)) != nil){
+ if((estr = fslookup(o, &k, &kv, kvbuf, sizeof(kvbuf), 0)) != nil){
break;
}
if(kv2dir(&kv, &d) == -1){
rerror(m, Efs);
- putblk(b);
return;
}
- nwalk = i;
prev = d.qid.path;
- putblk(b);
- r.wqid[r.nwqid] = d.qid;
- r.nwqid++;
+ r.wqid[i] = d.qid;
}
+ r.nwqid = i;
if(i == 0 && m->nwname != 0){
rerror(m, estr);
return;
@@ -496,8 +482,6 @@
}
}
if(i > 0){
- d.name = m->wname[nwalk];
- d.qid = m->wqid[nwalk];
dent = getdent(f->root.bp.addr, up, &d);
if(dent == nil){
if(m->fid != m->newfid)
@@ -516,11 +500,10 @@
void
fsstat(Fmsg *m)
{
- char *err, buf[STATMAX];
+ char *err, buf[STATMAX], kvbuf[Kvmax];
Fcall r;
Fid *f;
Kvp kv;
- Blk *b;
int n;
if((f = getfid(m->fid)) == nil){
@@ -527,8 +510,7 @@
rerror(m, "no such fid");
return;
}
- print("stat %K\n", &f->dent->Key);
- if((err = btlookup(&fs->root, f->dent, &kv, &b)) != nil){
+ if((err = btlookup(&fs->root, f->dent, &kv, kvbuf, sizeof(kvbuf))) != nil){
rerror(m, err);
return;
}
@@ -540,7 +522,6 @@
r.stat = (uchar*)buf;
r.nstat = n;
respond(m, &r);
- putblk(b);
}
void
@@ -615,6 +596,7 @@
d.gid = "glenda";
d.muid = "glenda";
mb.op = Oinsert;
+ mb.statop = 0;
if(dir2kv(f->qpath, &d, &mb, buf, sizeof(buf)) == -1){
rerror(m, "%r");
return;
@@ -693,11 +675,10 @@
void
fsopen(Fmsg *m)
{
+ char *e, buf[Kvmax];
Fcall r;
- char *e;
Dir d;
Fid *f;
- Blk *b;
Kvp kv;
if((f = getfid(m->fid)) == nil){
@@ -704,17 +685,16 @@
rerror(m, Efid);
return;
}
- if((e = fslookup(f, f->dent, &kv, &b, 0)) != nil){
+ if((e = fslookup(f, f->dent, &kv, buf, sizeof(buf), 0)) != nil){
rerror(m, e);
return;
}
if(kv2dir(&kv, &d) == -1){
rerror(m, Efs);
- putblk(b);
+ return;
}
if(fsaccess(&d, m->mode) == -1){
rerror(m, Eperm);
- putblk(b);
return;
}
wlock(f->dent);
@@ -723,7 +703,6 @@
r.type = Ropen;
r.qid = d.qid;
r.iounit = f->iounit;
- putblk(b);
lock(f);
if(f->mode != -1){
@@ -755,12 +734,12 @@
int n, ns, done;
Tree *t;
Scan *s;
+ Dir d;
s = f->scan;
if(s != nil && s->offset != 0 && s->offset != m->offset)
return Edscan;
if(s == nil || m->offset == 0){
- print("scan starting\n");
if((s = mallocz(sizeof(Scan), 1)) == nil)
return Enomem;
@@ -787,7 +766,8 @@
p = r->data;
n = m->count;
if(s->overflow){
- if((ns = convD2M(&s->dir, (uchar*)p, n)) <= BIT16SZ)
+ kv2dir(&s->kv, &d);
+ if((ns = convD2M(&d, (uchar*)p, n)) <= BIT16SZ)
return Edscan;
s->overflow = 0;
p += ns;
@@ -798,7 +778,8 @@
return e;
if(done)
break;
- if((ns = convD2M(&s->dir, (uchar*)p, n)) <= BIT16SZ){
+ kv2dir(&s->kv, &d);
+ if((ns = convD2M(&d, (uchar*)p, n)) <= BIT16SZ){
s->overflow = 1;
break;
}
@@ -812,7 +793,7 @@
int
readb(Fid *f, char *d, vlong o, vlong n, int sz)
{
- char *e, buf[17];
+ char *e, buf[17], kvbuf[17+32];
vlong fb, fo;
Bptr bp;
Blk *b;
@@ -831,22 +812,19 @@
PBIT64(k.k+1, f->qpath);
PBIT64(k.k+9, fb);
- e = fslookup(f, &k, &kv, &b, 0);
+ e = fslookup(f, &k, &kv, kvbuf, sizeof(kvbuf), 0);
if(e != nil && e != Eexist){
fprint(2, "!!! error: %s", e);
werrstr(e);
return -1;
}
- fprint(2, "\treadb: key=%K, val=%P\n", &k, &kv);
- bp = unpackbp(kv.v);
- putblk(b);
+ bp = unpackbp(kv.v);
if((b = getblk(bp, GBraw)) == nil)
return -1;
if(fo+n > Blksz)
n = Blksz-fo;
if(b != nil){
- fprint(2, "\tcopying %lld to resp %p\n", n, d);
memcpy(d, b->buf+fo, n);
putblk(b);
}else
@@ -907,7 +885,6 @@
rerror(m, Efid);
return;
}
- fprint(2, "\n");
r.type = Rread;
r.count = 0;
if((r.data = malloc(m->count)) == nil){
@@ -914,7 +891,6 @@
rerror(m, Emem);
return;
}
- fprint(2, "\nread{{{{\n");
if(f->dent->qid.type & QTDIR)
e = fsreaddir(m, f, &r);
else
@@ -926,12 +902,12 @@
respond(m, &r);
free(r.data);
}
- fprint(2, "\n}}}read\n");
}
int
writeb(Fid *f, Msg *m, char *s, vlong o, vlong n, vlong sz)
{
+ char buf[Kvmax];
vlong fb, fo;
Bptr bp;
Blk *b, *t;
@@ -949,16 +925,13 @@
if(b == nil)
return -1;
if(fb < sz && (fo != 0 || n != Blksz)){
- fprint(2, "\tappending to block %B\n", b->bp);
- if(fslookup(f, m, &kv, &t, 0) != nil){
- putblk(b);
+ dprint("\tappending to block %B\n", b->bp);
+ if(fslookup(f, m, &kv, buf, sizeof(buf), 0) != nil){
return -1;
}
bp = unpackbp(kv.v);
- putblk(t);
if((t = getblk(bp, GBraw)) == nil){
- putblk(b);
return -1;
}
memcpy(b->buf, t->buf, Blksz);
@@ -974,8 +947,6 @@
assert(b->flag & Bfinal);
packbp(m->v, &b->bp);
putblk(b);
- checkfs();
- poolcheck(mainmem);
return n;
}
@@ -1023,12 +994,13 @@
}
kv[i].op = Owstat;
+ kv[i].statop = 0;
kv[i].k = f->dent->k;
kv[i].nk = f->dent->nk;
kv[i].v = sbuf;
kv[i].nv = 0;
if(m->offset+m->count > f->dent->length){
- kv[i].op |= Owsize;
+ kv[i].statop = Owsize;
kv[i].nv += 8;
PBIT64(kv[i].v, m->offset+m->count);
f->dent->length = m->offset+m->count;
--- a/main.c
+++ b/main.c
@@ -111,6 +111,7 @@
fmtinstall('K', Kconv);
fmtinstall('R', Rconv);
fmtinstall('F', fcallfmt);
+ fmtinstall('Q', Qconv);
if(ream){
reamfs(argv[0]);
exits(nil);
--- a/pack.c
+++ b/pack.c
@@ -242,7 +242,6 @@
if(kv2dir(kv, &d) == -1)
return -1;
-print("\tpackname: %s\n", d.name);
dprint("have %d bytes to pack into\n", nbuf);
if((n = convD2M(&d, (uchar*)buf, nbuf)) <= BIT16SZ){
fprint(2, "here...failed convert??, needed %d\n", GBIT16(buf));
--- a/ream.c
+++ b/ream.c
@@ -15,6 +15,7 @@
Kvp kv;
Dir d;
+ /* nb: values must be inserted in key order */
memset(&d, 0, sizeof(Dir));
d.qid = (Qid){fs->nextqid++, 0, QTDIR};
d.mode = 0755;
@@ -27,7 +28,7 @@
d.muid = "glenda";
if(dir2kv(-1, &d, &kv, buf, sizeof(buf)) == -1)
sysfatal("ream: pack root: %r");
- blkinsert(r, &kv);
+ setval(r, 0, &kv);
kv.k = buf;
kv.nk = 9;
@@ -34,9 +35,9 @@
kv.v = buf+9;
kv.nv = 8;
buf[0] = Ksuper;
- PBIT64(buf+1, 0);
- PBIT64(buf+9, 0);
- blkinsert(r, &kv);
+ PBIT64(buf+1, 0ULL);
+ PBIT64(buf+9, 0ULL);
+ setval(r, 1, &kv);
}
static void
--- a/tree.c
+++ b/tree.c
@@ -37,6 +37,7 @@
cpmsg(Msg *dst, Msg *src, char *buf, int nbuf)
{
dst->op = src->op;
+ dst->statop = src->statop;
cpkvp(dst, src, buf, nbuf);
}
@@ -59,7 +60,8 @@
int
msgsz(Msg *m)
{
- return 1 + 2 + m->nk + 2 + m->nv;
+ /* disp + op + klen + key + vlen + v */
+ return 2+1+2+m->nk +2+ m->nv;
}
int
@@ -66,9 +68,9 @@
valsz(Kvp *kv)
{
if(kv->type == Vref)
- return 2+kv->nk + Ptrsz + Fillsz;
+ return 2 + 2+kv->nk + Ptrsz + Fillsz;
else
- return 2+kv->nk + 2+kv->nv;
+ return 2 + 2+kv->nk + 2+kv->nv;
}
void
@@ -93,42 +95,21 @@
}
}
-static void
-setval(Blk *b, int i, Kvp *kv, int replace)
+void
+setval(Blk *b, int i, Kvp *kv)
{
- int o, nk, nv, ksz, vsz, spc;
+ int o, nk, nv, spc;
char *p;
- assert(i >= 0 && i <= b->nval);
+ assert(i == b->nval);
spc = (b->type == Tleaf) ? Leafspc : Pivspc;
p = b->data + 2*i;
nk = 2 + kv->nk;
nv = (kv->type == Vref) ? Ptrsz+Fillsz : 2 + kv->nv;
- if (i < 0)
- i = 0;
- if(!replace || b->nval == i){
- memmove(p + 2, p, 2*(b->nval - i));
- b->nval++;
- b->valsz += nk + nv;
- o = spc - b->valsz;
- }else{
- /*
- * If we can't put it where it was before,
- * we need to allocate new space for the
- * key-value data. It'll get compacted when
- * we split or merge.
- */
- o = GBIT16(b->data + 2*i);
- ksz = 2 + GBIT16(b->data + o);
- if(b->type == Tleaf)
- vsz = 2 + GBIT16(b->data + o + ksz);
- else
- vsz = 16;
- if(ksz + vsz < nk + nv){
- b->valsz += nk + nv;
- o = spc - b->valsz;
- }
- }
+ memmove(p + 2, p, 2*(b->nval - i));
+ b->nval++;
+ b->valsz += nk + nv;
+ o = spc - b->valsz;
if(2*b->nval + b->valsz > spc){
dprint("2*%d + %d > %d [ksz: %d, vsz: %d]\n",
@@ -154,18 +135,6 @@
}
}
-static int
-delval(Blk *b, int i)
-{
- char *p;
-
- assert(i >= 0 && i <= b->nval);
- b->nval--;
- p = b->data + 2*i;
- memmove(p, p + 2, 2*(b->nval - i));
- return 0;
-}
-
void
setmsg(Blk *b, int i, Msg *m)
{
@@ -175,8 +144,9 @@
assert(b->type == Tpivot);
assert(i >= 0 && i <= b->nbuf);
b->nbuf++;
- b->bufsz += msgsz(m);
+ b->bufsz += msgsz(m)-2;
assert(2*b->nbuf + b->bufsz <= Bufspc);
+ assert(m->op >= 0 && m->op <= Owstat);
p = b->data + Pivspc;
o = Pivspc - b->bufsz;
@@ -185,6 +155,8 @@
p = b->data + Bufspc + o;
*p = m->op;
+ if(m->op == Owstat)
+ *p |= m->statop;
PBIT16(p + 1, m->nk);
memcpy(p + 3, m->k, m->nk);
PBIT16(p + 3 + m->nk, m->nv);
@@ -200,10 +172,10 @@
assert(b->type == Tpivot);
assert(i >= 0 && i < b->nbuf);
o = GBIT16(b->data + Pivspc + 2*i);
-
p = b->data + Pivspc + o;
m->type = Vinl;
- m->op = *p;
+ m->op = (*p & 0x0f);
+ m->statop = (*p & 0xf0);
m->nk = GBIT16(p + 1);
m->k = p + 3;
m->nv = GBIT16(p + 3 + m->nk);
@@ -210,51 +182,6 @@
m->v = p + 5 + m->nk;
}
-void
-blkinsert(Blk *b, Kvp *kv)
-{
- int lo, hi, mid, r;
- Kvp cmp;
-
- r = -1;
- lo = 0;
- hi = b->nval;
- while(lo < hi){
- mid = (hi + lo) / 2;
- getval(b, mid, &cmp);
- r = keycmp(kv, &cmp);
- if(r <= 0)
- hi = mid;
- else
- lo = mid + 1;
- }
- if(lo < b->nval){
- getval(b, lo, &cmp);
- r = keycmp(kv, &cmp);
- }
- setval(b, lo, kv, (r == 0));
-}
-
-void
-bufinsert(Blk *b, Msg *m)
-{
- int lo, hi, mid, r;
- Msg cmp;
-
- lo = 0;
- hi = b->nbuf;
- while(lo < hi){
- mid = (hi + lo) / 2;
- getmsg(b, mid, &cmp);
- r = keycmp(m, &cmp);
- if(r < 0)
- hi = mid;
- else
- lo = mid + 1;
- }
- setmsg(b, lo, m);
-}
-
static int
bufsearch(Blk *b, Key *k, Msg *m, int *same)
{
@@ -283,28 +210,6 @@
}
int
-blkdelete(Blk *b, Kvp *kv)
-{
- int lo, hi, mid, r;
- Kvp cmp;
-
- lo = 0;
- hi = b->nval - 1;
- while(lo <= hi){
- mid = (hi + lo) / 2;
- getval(b, mid, &cmp);
- r = keycmp(kv, &cmp);
- if(r == 0)
- delval(b, mid);
- if(r <= 0)
- hi = mid - 1;
- else
- lo = mid + 1;
- }
- return -1;
-}
-
-int
blksearch(Blk *b, Key *k, Kvp *rp, int *same)
{
int lo, hi, mid, r;
@@ -333,6 +238,13 @@
}
int
+buffill(Blk *b)
+{
+ assert(b->type == Tpivot);
+ return 2*b->nbuf + b->bufsz;
+}
+
+int
filledbuf(Blk *b, int nmsg, int needed)
{
assert(b->type == Tpivot);
@@ -339,7 +251,6 @@
return 2*(b->nbuf+nmsg) + b->bufsz + needed > Bufspc;
}
-
int
filledleaf(Blk *b, int needed)
{
@@ -363,6 +274,7 @@
copyup(Blk *n, int i, Path *pp, int *nbytes)
{
Kvp kv;
+ Msg m;
/*
* It's possible for the previous node to have
@@ -372,23 +284,31 @@
*/
if(pp->nl->nval > 0){
getval(pp->nl, 0, &kv);
+ if(pp->nl->nbuf > 0){
+ getmsg(pp->nl, 0, &m);
+ if(keycmp(&kv, &m) > 0)
+ kv.Key = m.Key;
+ }
kv.type = Vref;
kv.bp = pp->nl->bp;
kv.fill = blkfill(pp->nl);
- setval(n, i++, &kv, 1);
+ setval(n, i++, &kv);
if(nbytes != nil)
- *nbytes += 2 + valsz(&kv);
+ *nbytes += valsz(&kv);
}
- if(pp->nr != nil){
- if(pp->nr->nval > 0){
- getval(pp->nr, 0, &kv);
- kv.type = Vref;
- kv.bp = pp->nr->bp;
- kv.fill = blkfill(pp->nr);
- setval(n, i++, &kv, 0);
- if(nbytes != nil)
- *nbytes += 2 + valsz(&kv);
+ if(pp->nr != nil && pp->nr->nval > 0){
+ getval(pp->nr, 0, &kv);
+ if(pp->nr->nbuf > 0){
+ getmsg(pp->nr, 0, &m);
+ if(keycmp(&kv, &m) > 0)
+ kv.Key = m.Key;
}
+ kv.type = Vref;
+ kv.bp = pp->nr->bp;
+ kv.fill = blkfill(pp->nr);
+ setval(n, i++, &kv);
+ if(nbytes != nil)
+ *nbytes += valsz(&kv);
}
return i;
}
@@ -403,22 +323,22 @@
/* bump version */
v = GBIT32(kv->v+8);
PBIT32(kv->v+8, v+1);
- if(m->op & Owmtime){
+ if(m->statop & Owmtime){
v = GBIT64(p);
p += 8;
PBIT32(kv->v+25, v);
}
- if(m->op & Owsize){
+ if(m->statop & Owsize){
v = GBIT64(p);
p += 8;
PBIT64(kv->v+33, v);
}
- if(m->op & Owmode){
+ if(m->statop & Owmode){
v = GBIT32(p);
p += 4;
PBIT32(kv->v+33, v);
}
- if(m->op & Owname){
+ if(m->statop & Owname){
fprint(2, "renames not yet supported\n");
abort();
}
@@ -426,6 +346,39 @@
fprint(2, "malformed wstat message");
}
+int
+apply(Kvp *r, Msg *m, char *buf, int nbuf)
+{
+ switch(m->op){
+ case Odelete:
+ return 0;
+ case Oinsert:
+ cpkvp(r, m, buf, nbuf);
+ return 1;
+ case Owstat:
+ statupdate(r, m);
+ return 1;
+ }
+ abort();
+ return 0;
+}
+
+int
+pullmsg(Path *p, int i, Kvp *v, Msg *m, int *full, int spc)
+{
+ if(i < 0 || i >= p->hi || *full)
+ return -1;
+
+ if(p->ins != nil)
+ *m = p->ins[i];
+ else
+ getmsg(p->b, i, m);
+ if(msgsz(m) <= spc)
+ return (v == nil) ? 0 : keycmp(v, m);
+ *full = 1;
+ return -1;
+}
+
/*
* Creates a new block with the contents of the old
* block. When copying the contents, it repacks them
@@ -438,10 +391,10 @@
updateleaf(Path *up, Path *p)
{
char buf[Msgmax];
- int i, j, o, ok, slack;
+ int i, j, o, ok, full, spc;
Blk *b, *n;
- Kvp v;
Msg m;
+ Kvp v;
i = 0;
o = 0;
@@ -448,7 +401,7 @@
j = up->lo;
b = p->b;
/*
- * slack is the amount of room we have
+ * spc is the amount of room we have
* to copy data down from the parent; it's
* necessarily a bit conservative, because
* deletion messages don't take space -- but
@@ -455,18 +408,16 @@
* we don't know how what the types of all
* messages are.
*/
- slack = Blkspc - blkfill(b);
+ full = 0;
+ spc = Blkspc - blkfill(b);
if((n = newblk(b->type)) == nil)
return -1;
- while(i < b->nval && j < up->hi){
+ while(i < b->nval){
+ ok = 1;
getval(p->b, i, &v);
- getmsg(up->b, j, &m);
- if(msgsz(&m) >= slack)
- break;
-
- switch(keycmp(&v, &m)){
+ switch(pullmsg(up, j, &v, &m, &full, spc)){
case -1:
- setval(n, o++, &v, 0);
+ setval(n, o++, &v);
i++;
break;
case 1:
@@ -474,48 +425,48 @@
* new values must always start as
* an insertion, mutations come after.
*/
- assert(m.op == Oinsert);
- setval(n, o++, &m, 0);
- slack -= valsz(&m);
- j++;
- break;
+ if(m.op != Oinsert){
+ print("%d(/%d), %d: %M not insert\n", i, b->nval, j, &m);
+ abort();
+ }
+ spc -= valsz(&m);
+ goto Copy;
case 0:
- ok = 1;
+ i++;
while(j < up->hi){
- switch(m.op){
- case Oinsert:
- cpkvp(&v, &m, buf, sizeof(buf));
- ok = 1;
- break;
- case Odelete:
- ok = 0;
- break;
- case Owstat:
- statupdate(&v, &m);
- break;
- }
+ Copy:
+ ok = apply(&v, &m, buf, sizeof(buf));
j++;
- if(j < up->hi){
- getmsg(up->b, j, &m);
- if(keycmp(&v, &m) != 0)
- break;
- print("\tnext: %P, %M\n", &v, &m);
- }
+ p->pullsz += msgsz(&m);
+ if(j >= up->hi || pullmsg(up, j, &v, &m, &full, spc) != 0)
+ break;
}
if(ok)
- setval(n, o++, &v, 0);
- i++;
+ setval(n, o++, &v);
break;
}
}
- while(i < b->nval){
- getval(p->b, i++, &v);
- setval(n, o++, &v, 0);
- }
while(j < up->hi) {
- getmsg(up->b, j++, &m);
- setval(n, o++, &m, 0);
+ /* can't fail */
+ pullmsg(up, j++, nil, &m, &full, spc);
+ ok = 1;
+ cpkvp(&v, &m, buf, sizeof(buf));
+ p->pullsz += msgsz(&m);
+ if(m.op != Oinsert){
+ print("%d(/%d), %d: %M not insert\n", i, b->nval, j, &m);
+ showblk(up->b, "parent", 0);
+ showblk(p->b, "current", 0);
+ abort();
+ }
+ while(pullmsg(up, j, &v, &m, &full, spc) == 0){
+ ok = apply(&v, &m, buf, sizeof(buf));
+ p->pullsz += msgsz(&m);
+ j++;
+ }
+ if(ok)
+ setval(n, o++, &v);
}
+ p->npull = (j - up->lo);
p->nl = n;
return 0;
}
@@ -529,50 +480,69 @@
* When pidx != -1,
*/
int
-updatepiv(Path *p, Path *pp)
+updatepiv(Path *up, Path *p, Path *pp)
{
- int i, j;
+ char buf[Msgmax];
+ int i, j, o, sz, full, spc;
Blk *b, *n;
- Msg m;
+ Msg m, u;
- j = 0;
+ o = 0;
b = p->b;
if((n = newblk(b->type)) == nil)
return -1;
for(i = 0; i < b->nval; i++){
- if(i == p->midx){
- if(pp->nl->nval > 0){
- getval(pp->nl, 0, &m);
- m.type = Vref;
- m.bp = pp->nl->bp;
- m.fill = blkfill(pp->nl);
- setval(n, j++, &m, 0);
- }
- if(pp->nr != nil && pp->nr->nval > 0){
- getval(pp->nr, 0, &m);
- m.type = Vref;
- m.bp = pp->nr->bp;
- m.fill = blkfill(pp->nr);
- setval(n, j++, &m, 0);
- }
+ if(pp != nil && i == p->midx){
+ o = copyup(n, o, pp, nil);
if(pp->op == POrot || pp->op == POmerge)
i++;
}else{
getval(b, i, &m);
- setval(n, j++, &m, 0);
+ setval(n, o++, &m);
}
}
+ o = 0;
i = 0;
- j = 0;
+ j = up->lo;
+ sz = 0;
+ full = 0;
+ spc = Bufspc - buffill(b);
+ if(pp != nil)
+ spc += pp->pullsz;
while(i < b->nbuf){
if(i == p->lo)
- i = p->hi;
+ i += pp->npull;
if(i == b->nbuf)
break;
- getmsg(b, i++, &m);
- setmsg(n, j++, &m);
+ getmsg(b, i, &m);
+ switch(pullmsg(up, j, &m, &u, &full, spc - sz)){
+ case -1:
+ case 0:
+ setmsg(n, o++, &m);
+ i++;
+ break;
+ case 1:
+ cpkvp(&m, &u, buf, sizeof(buf));
+ while(pullmsg(up, j, &m, &u, &full, spc) == 0){
+ setmsg(n, o++, &u);
+ sz = msgsz(&u);
+ p->pullsz += sz;
+ spc -= sz;
+ j++;
+ }
+ }
}
- n->nbuf = j;
+ while(j < up->hi){
+ pullmsg(up, j, nil, &u, &full, spc);
+ if(full)
+ break;
+ setmsg(n, o++, &u);
+ sz = msgsz(&u);
+ p->pullsz += sz;
+ spc -= sz;
+ j++;
+ }
+ p->npull = (j - up->lo);
p->nl = n;
return 0;
}
@@ -583,11 +553,14 @@
* grow the total height of the
*/
int
-splitleaf(Path *p, Kvp *mid)
+splitleaf(Path *up, Path *p, Kvp *mid)
{
- int i, j, copied, halfsz;
- Blk *b, *l, *r;
- Kvp t;
+ char buf[Msgmax];
+ int full, copied, spc, ok, halfsz;
+ int i, j, o, c;
+ Blk *b, *d, *l, *r;
+ Msg m;
+ Kvp v;
/*
* If the block one entry up the
@@ -603,11 +576,18 @@
return -1;
}
- j = 0;
+ d = l;
+ o = 0;
+ i = 0;
+ j = up->lo;
+ full = 0;
copied = 0;
- halfsz = (2*b->nval + b->valsz)/2;
+ halfsz = (2*b->nval + b->valsz + up->sz) / 2;
+ if(halfsz > Blkspc/2)
+ halfsz = Blkspc/2;
+ spc = Blkspc - (halfsz + Msgmax);
assert(b->nval >= 4);
- for(i = 0; i < b->nval; i++){
+ while(i < b->nval){
/*
* We're trying to balance size,
* but we need at least 2 nodes
@@ -614,21 +594,50 @@
* in each half of the split if
* we want a valid tree.
*/
- if(i == b->nval-2)
+ if(d == l)
+ if((i == b->nval-2) || (i >= 2 && copied >= halfsz)){
+ o = 0;
+ d = r;
+ full = 0;
+ spc = Blkspc - (halfsz + Msgmax);
+ getval(b, i, mid);
+ }
+ ok = 1;
+ getval(b, i, &v);
+ c = pullmsg(up, j, &v, &m, &full, spc);
+ switch(c){
+ case -1:
+ setval(d, o++, &v);
+ copied += valsz(&v);
+ i++;
break;
- if(i >= 2 && copied >= halfsz)
+ case 1:
+ /*
+ * new values must always start as
+ * an insertion, mutations come after.
+ */
+ if(m.op != Oinsert){
+ print("%d(/%d), %d: %M not insert\n", i, b->nval, j, &m);
+ abort();
+ }
+ spc -= valsz(&m);
+ goto Copy;
+ case 0:
+ i++;
+ while(j < up->hi){
+ Copy:
+ ok = apply(&v, &m, buf, sizeof(buf));
+ p->pullsz += msgsz(&m);
+ j++;
+ if(j == up->hi || pullmsg(up, j, &v, &m, &full, spc) != 0)
+ break;
+ }
+ if(ok)
+ setval(d, o++, &v);
break;
-
- getval(b, i, &t);
- setval(l, j++, &t, 0);
- copied += 2 + valsz(&t);
+ }
}
- j = 0;
- getval(b, i, mid);
- for(; i < b->nval; i++){
- getval(b, i, &t);
- setval(r, j++, &t, 0);
- }
+ p->npull = (j - up->lo);
p->op = POsplit;
p->nl = l;
p->nr = r;
@@ -639,13 +648,14 @@
/*
* Splits a node, returning the block that msg
* would be inserted into. Split must never
- * grow the total height of the
+ * grow the total height of the tree by more
+ * than one.
*/
int
-splitpiv(Path *p, Path *pp, Kvp *mid)
+splitpiv(Path *, Path *p, Path *pp, Kvp *mid)
{
- int i, j, copied, halfsz;
- Blk *b, *l, *r;
+ int i, o, copied, halfsz;
+ Blk *b, *d, *l, *r;
Kvp t;
Msg m;
@@ -662,7 +672,8 @@
freeblk(r);
return -1;
}
- j = 0;
+ o = 0;
+ d = l;
copied = 0;
halfsz = (2*b->nval + b->valsz)/2;
assert(b->nval >= 4);
@@ -673,48 +684,34 @@
* in each half of the split if
* we want a valid tree.
*/
- if(i == b->nval-2)
- break;
- if(i >= 2 && copied >= halfsz)
- break;
-
- if(i == p->idx){
- j = copyup(l, j, pp, &copied);
- continue;
+ if(d == l)
+ if((i == b->nval-2) || (i >= 2 && copied >= halfsz)){
+ o = 0;
+ d = r;
+ getval(b, i, mid);
}
- getval(b, i, &t);
- setval(l, j++, &t, 0);
- copied += 2 + valsz(&t);
- }
- j = 0;
- getval(b, i, mid);
- for(; i < b->nval; i++){
if(i == p->idx){
- j = copyup(r, j, pp, nil);
+ o = copyup(d, o, pp, &copied);
continue;
}
getval(b, i, &t);
- setval(r, j++, &t, 0);
+ setval(d, o++, &t);
+ copied += valsz(&t);
}
- i = 0;
- for(j = 0; i < b->nbuf; i++, j++){
+ o = 0;
+ d = l;
+ for(i = 0; i < b->nbuf; i++){
if(i == p->lo)
- i = p->hi;
+ i += pp->npull;
if(i == b->nbuf)
break;
getmsg(b, i, &m);
- if(keycmp(&m, mid) >= 0)
- break;
- setmsg(l, j, &m);
+ if(d == l && keycmp(&m, mid) >= 0){
+ o = 0;
+ d = r;
+ }
+ setmsg(d, o++, &m);
}
- for(j = 0; i < b->nbuf; i++, j++){
- if(i == p->lo)
- i = p->hi;
- if(i == b->nbuf)
- break;
- getmsg(b, i, &m);
- setmsg(r, j, &m);
- }
p->op = POsplit;
p->nl = l;
p->nr = r;
@@ -723,34 +720,6 @@
}
int
-apply(Blk *b, Msg *m)
-{
- int idx, same;
- Kvp kv;
-
- assert(b->type == Tleaf);
- switch(m->op&0xf){
- case Oinsert:
- blkinsert(b, m);
- break;
- case Odelete:
- blkdelete(b, m);
- break;
- case Owstat:
- idx = blksearch(b, m, &kv, &same);
- if(idx == -1 || !same){
- fprint(2, "missing keyval %P [idx=%d, same=%d]\n", &kv, idx, same);
- abort();
- }
- statupdate(&kv, m);
- break;
- default:
- abort();
- }
- return 0;
-}
-
-int
merge(Path *p, Path *pp, int idx, Blk *a, Blk *b)
{
int i, o;
@@ -757,19 +726,16 @@
Msg m;
Blk *d;
-print("!!!! merge\n");
-showblk(a, "premerge left", 0);
-showblk(b, "premerge right", 0);
if((d = newblk(a->type)) == nil)
return -1;
o = 0;
for(i = 0; i < a->nval; i++){
getval(a, i, &m);
- setval(d, o++, &m, 0);
+ setval(d, o++, &m);
}
for(i = 0; i < b->nval; i++){
getval(b, i, &m);
- setval(d, o++, &m, 0);
+ setval(d, o++, &m);
}
if(a->type == Tpivot){
o = 0;
@@ -783,10 +749,9 @@
}
}
enqueue(d);
-showblk(d, "postmerge", 0);
p->midx = idx;
- pp->op = POmerge;
pp->nl = d;
+ pp->op = POmerge;
pp->nr = nil;
return 0;
}
@@ -803,15 +768,13 @@
Msg n;
used = 2*d->nbuf + d->bufsz;
- for(i = 0; i < b->nbuf; i++){
+ for(i = *idx; i < b->nbuf; i++){
getmsg(b, i, &n);
- print("check %P before %P: %d\n", &n.Kvp, &m->Kvp, keycmp(&n, m));
if(keycmp(m, &n) <= 0){
*idx = i + o;
return 0;
}
- print("would copy %P before %P: %d\n", &n.Kvp, &m->Kvp, keycmp(&n, m));
- used += 2 + msgsz(&n);
+ used += msgsz(&n);
if(used > Bufspc)
return 1;
}
@@ -844,9 +807,6 @@
Blk *d, *l, *r;
Msg m;
-showfs(2, "!!!! rotate\n");
-showblk(a, "prerotate left", 0);
-showblk(b, "prerotate right", 0);
l = newblk(a->type);
r = newblk(a->type);
if(l == nil || r == nil){
@@ -861,13 +821,12 @@
idx = 0;
for(i = 0; i < a->nval; i++){
getval(a, i, &m);
-
if(d == l && (cp >= halfpiv || spillsbuf(d, a, b, &m, &idx))){
sp = idx;
d = r;
o = 0;
}
- setval(d, o++, &m, 0);
+ setval(d, o++, &m);
cp += valsz(&m);
}
for(i = 0; i < b->nval; i++){
@@ -877,7 +836,7 @@
d = r;
o = 0;
}
- setval(d, o++, &m, 0);
+ setval(d, o++, &m);
cp += valsz(&m);
}
if(a->type == Tpivot){
@@ -902,8 +861,6 @@
}
enqueue(l);
enqueue(r);
-showblk(l, "postrotate left", 0);
-showblk(r, "postrotate right", 0);
p->midx = midx;
pp->op = POrot;
pp->nl = l;
@@ -931,16 +888,12 @@
if(imbalance < 0)
imbalance *= -1;
/* works for leaf, because 0 always < Bufspc */
- if(na + nb < Pivspc && ma + mb < Bufspc){
-print("!!!!!!! merging needed\n");
+ if(na + nb < (Pivspc - 4*Msgmax) && ma + mb < Bufspc)
return merge(p, pp, idx, a, b);
- }else if(imbalance > 4*Msgmax){
-print("!!!!!!! rotating needed\n");
+ else if(imbalance > 4*Msgmax)
return rotate(p, pp, idx, a, b, (na + nb)/2);
- }else{
-print("!!!!!!! no balancing needed\n");
+ else
return 0;
- }
}
int
@@ -955,6 +908,8 @@
ret = -1;
if(p->idx == -1 || pp == nil || pp->nl == nil)
return 0;
+ if(pp->op != POmod || pp->op != POmerge)
+ return 0;
m = refblk(pp->nl);
if(idx-1 >= 0){
@@ -961,10 +916,10 @@
getval(p->b, idx-1, &kl);
if(kl.fill + blkfill(m) < Blkspc){
if((l = getblk(kl.bp, 0)) == nil)
- goto out;
+ goto Out;
if(rotmerge(p, pp, idx-1, l, m) == -1)
- goto out;
- goto done;
+ goto Out;
+ goto Done;
}
}
if(idx+1 < p->b->nval){
@@ -971,15 +926,15 @@
getval(p->b, idx+1, &kr);
if(kr.fill + blkfill(m) < Blkspc){
if((r = getblk(kr.bp, 0)) == nil)
- goto out;
+ goto Out;
if(rotmerge(p, pp, idx, m, r) == -1)
- goto out;
- goto done;
+ goto Out;
+ goto Done;
}
}
-done:
+Done:
ret = 0;
-out:
+Out:
putblk(m);
putblk(l);
putblk(r);
@@ -986,31 +941,12 @@
return ret;
}
-int
-insertmsg(Blk *rb, Msg *msg, int nmsg, int sz)
-{
- int i;
-
- if(rb->type == Tleaf && !filledleaf(rb, sz))
- for(i = 0; i < nmsg; i++)
- apply(rb, &msg[i]);
- else if(rb->type == Tpivot && !filledbuf(rb, nmsg, sz))
- for(i = 0; i < nmsg; i++)
- bufinsert(rb, &msg[i]);
- else
- return 1;
- return 0;
-}
-
static Blk*
-flush(Path *path, int npath, Msg *msg, int nmsg, int *redo)
+flush(Path *path, int npath, int *redo)
{
- Path *p, *pp;
- Blk *b, *r;
+ Path *up, *p, *pp, *rp;
Kvp mid;
- Msg m;
- int i;
/*
* The path must contain at minimum two elements:
@@ -1019,94 +955,66 @@
* we put the new root into if the root gets split.
*/
assert(npath >= 2);
- r = nil;
+ rp = nil;
pp = nil;
p = &path[npath - 1];
- *redo = 1;
+ up = &path[npath - 2];
+ *redo = 0;
if(p->b->type == Tleaf){
- if(!filledleaf(p->b, p[-1].sz)){
+ if(!filledleaf(p->b, up->sz)){
if(updateleaf(p-1, p) == -1)
- goto error;
- if(p == &path[1]){
- r = p->nl;
- *redo = insertmsg(r, msg, nmsg, path[0].sz);
- }
+ goto Error;
enqueue(p->nl);
+ rp = p;
}else{
- if(splitleaf(p, &mid) == -1)
- goto error;
- b = p->nl;
- for(i = p[-1].lo; i < p[-1].hi; i++){
- getmsg(p[-1].b, i, &m);
- if(keycmp(&m, &mid) >= 0)
- b = p->nr;
- if(filledleaf(b, msgsz(&m)))
- continue;
- if(apply(b, &m) == -1)
- goto error;
- }
+
+ if(splitleaf(up, p, &mid) == -1)
+ goto Error;
enqueue(p->nl);
enqueue(p->nr);
}
- assert(p->nl || (p->nl && p->nr));
p->midx = -1;
pp = p;
+ up--;
p--;
}
- for(; p > path; p--){
+ while(p != path){
if(!filledpiv(p->b, 1)){
if(trybalance(p, pp, p->idx) == -1)
- goto error;
+ goto Error;
/* If we merged the root node, break out. */
- if(p == &path[1] && p->nl != nil && p->nr == nil && p->nl->nval == 2){
- *redo = insertmsg(p[1].nl, msg, nmsg, path[0].sz);
- freeblk(p[0].nl);
- putblk(p[0].nl);
- p[0].nl = nil;
- r = p[1].nl;
- break;
+ if(up == path && pp != nil && pp->op == POmerge && p->b->nval == 2){
+ rp = pp;
+ goto Out;
}
- if(updatepiv(p, pp) == -1)
- goto error;
- for(i = p[-1].lo; i < p[-1].hi; i++){
- getmsg(p[-1].b, i, &m);
- if(filledbuf(p->nl, 1, msgsz(&m)))
- break;
- bufinsert(p->nl, &m);
- }
- if(p == &path[1] && !filledbuf(p->nl, nmsg, path[0].sz)){
- r = p->nl;
- *redo = insertmsg(r, msg, nmsg, path[0].sz);
- }
+ if(updatepiv(up, p, pp) == -1)
+ goto Error;
enqueue(p->nl);
+ rp = p;
}else{
- dprint("-- split\n");
- if(splitpiv(p, pp, &mid) == -1)
- goto error;
- b = p->nl;
- for(i = p[-1].lo; i < p[-1].hi; i++){
- getmsg(p[-1].b, i, &m);
- if(keycmp(&m, &mid) >= 0)
- b = p->nr;
- if(filledbuf(b, 1, msgsz(&m)))
- continue;
- bufinsert(b, &m);
- }
+ if(splitpiv(up, p, pp, &mid) == -1)
+ goto Error;
enqueue(p->nl);
enqueue(p->nr);
}
pp = p;
+ up--;
+ p--;
}
- if(path[1].nl != nil && path[1].nr != nil){
- if((r = newblk(Tpivot)) == nil)
- goto error;
- path[0].nl = r;
- *redo = insertmsg(r, msg, nmsg, path[0].sz);
- copyup(path[0].nl, 0, &path[1], nil);
- enqueue(r);
+ if(pp->nl != nil && pp->nr != nil){
+ rp = &path[0];
+ rp->nl = newblk(Tpivot);
+ if(rp->nl == nil)
+ goto Error;
+ rp->npull = pp->npull;
+ rp->pullsz = pp->pullsz;
+ copyup(rp->nl, 0, pp, nil);
+ enqueue(rp->nl);
}
- return r;
-error:
+Out:
+ *redo = (rp->npull != (path[0].hi - path[0].lo));
+ return rp->nl;
+Error:
return nil;
}
@@ -1155,7 +1063,7 @@
if(i < b->nval && keycmp(&m, &kv) >= 0)
break;
/* 2 bytes for offset, plus message size in buffer */
- cursz += 2 + msgsz(&m);
+ cursz += msgsz(&m);
}
if(cursz > maxsz){
maxsz = cursz;
@@ -1165,11 +1073,19 @@
p->sz = maxsz;
p->idx = i - 1;
p->midx = i - 1;
+ p->npull = 0;
+ p->pullsz = 0;
}
}
}
int
+msgcmp(void *a, void *b)
+{
+ return keycmp((Msg*)a, (Msg*)b);
+}
+
+int
btupsert(Tree *t, Msg *msg, int nmsg)
{
int i, npath, redo, dh, sz, height;
@@ -1178,6 +1094,7 @@
Kvp sep;
sz = 0;
+ qsort(msg, nmsg, sizeof(Msg), msgcmp);
for(i = 0; i < nmsg; i++){
if(msg[i].nk + 2 > Keymax){
werrstr("overlong key");
@@ -1186,7 +1103,7 @@
sz += msgsz(&msg[i]);
}
-again:
+Again:
if((b = getroot(t, &height)) == nil){
werrstr("get root: %r");
return -1;
@@ -1207,6 +1124,9 @@
npath++;
path[0].sz = sz;
+ path[0].ins = msg;
+ path[0].lo = 0;
+ path[0].hi = nmsg;
while(b->type == Tpivot){
if(!filledbuf(b, nmsg, path[npath - 1].sz))
break;
@@ -1214,22 +1134,22 @@
getval(b, path[npath].idx, &sep);
b = getblk(sep.bp, 0);
if(b == nil)
- goto error;
+ goto Error;
npath++;
}
path[npath].b = b;
path[npath].idx = -1;
path[npath].midx = -1;
- path[npath].lo = 0;
- path[npath].hi = 0;
+ path[npath].lo = -1;
+ path[npath].hi = -1;
+ path[npath].npull = 0;
+ path[npath].pullsz = 0;
npath++;
dh = -1;
- rb = flush(path, npath, msg, nmsg, &redo);
- if(redo)
- goto again;
+ rb = flush(path, npath, &redo);
if(rb == nil)
- goto error;
+ goto Error;
if(path[0].nl != nil)
dh = 1;
@@ -1247,7 +1167,6 @@
t->bp = rb->bp;
fs->nextgen++;
unlock(&t->lk);
-
freepath(path, npath);
free(path);
if(!checkfs()){
@@ -1255,11 +1174,11 @@
showpath(path, npath);
abort();
}
- if(redo)
- goto again;
snapshot();
+ if(redo)
+ goto Again;
return 0;
-error:
+Error:
freepath(path, npath);
free(path);
return -1;
@@ -1280,7 +1199,7 @@
}
static char*
-collect(Blk *b, Key *k, Kvp *r, int *done)
+collect(Blk *b, Key *k, Kvp *r, char *buf, int nbuf, int *done)
{
int i, idx, same;
char *err;
@@ -1295,9 +1214,9 @@
getmsg(b, i, &m);
if(keycmp(&m, k) != 0)
break;
- switch(m.op&0xf){
+ switch(m.op){
case Oinsert:
- *r = m.Kvp;
+ cpkvp(r, &m, buf, nbuf);
*done = 1;
err = nil;
break;
@@ -1306,6 +1225,7 @@
err = Eexist;
break;
case Owstat:
+// statupdate(r, &m);
break;
default:
return Efs;
@@ -1315,22 +1235,19 @@
}
char*
-btlookupat(Blk *b0, Key *k, Kvp *r, Blk **bp)
+btlookupat(Blk *b0, Key *k, Kvp *r, char *buf, int nbuf)
{
int idx, same, done, r0;
- char *ret;
+ char *ret, *err;
Blk *b, *c;
- *bp = nil;
r0 = b0->ref;
b = refblk(b0);
assert(k != r);
while(b->type == Tpivot){
- ret = collect(b, k, r, &done);
- if(done){
- *bp = b;
+ ret = collect(b, k, r, buf, nbuf, &done);
+ if(done)
return ret;
- }
idx = blksearch(b, k, r, nil);
if(idx == -1){
assert(b0->ref == r0 + (b == b0) ? 1 : 0);
@@ -1343,27 +1260,25 @@
b = c;
}
assert(b->type == Tleaf);
+ err = Eexist;
blksearch(b, k, r, &same);
- if(!same){
- assert(b0->ref == r0 + (b == b0) ? 1 : 0);
- putblk(b);
- return Eexist;
+ if(same){
+ cpkvp(r, r, buf, nbuf);
+ err = nil;
}
- assert(b0->ref == r0 + (b == b0) ? 1 : 0);
- *bp = b;
- return nil;
+ putblk(b);
+ return err;
}
char*
-btlookup(Tree *t, Key *k, Kvp *r, Blk **bp)
+btlookup(Tree *t, Key *k, Kvp *r, char *buf, int nbuf)
{
char *ret;
Blk *b;
- *bp = nil;
if((b = getroot(t, nil)) == nil)
return Efs;
- ret = btlookupat(b, k, r, bp);
+ ret = btlookupat(b, k, r, buf, nbuf);
putblk(b);
return ret;
@@ -1418,101 +1333,62 @@
return nil;
}
-int
-accum(Scan *s, Msg *m)
-{
- vlong v;
- char *p;
- Dir *d;
-
- d = &s->dir;
- switch(m->op&0xf){
- case Onop:
- case Oinsert:
- s->present = 1;
- kv2dir(m, d);
- break;
- case Odelete:
- s->present = 0;
- break;
- case Owstat:
- p = m->v;
- d->qid.vers++;
- if(m->op & Owmtime){
- v = GBIT64(p);
- p += 8;
- d->mtime = v;
- }
- if(m->op & Owsize){
- v = GBIT64(p);
- p += 8;
- d->length = v;
- }
- if(m->op & Owmode){
- v = GBIT32(p);
- p += 4;
- d->mode = v;
- }
- if(m->op & Owname){
- fprint(2, "renames not yet supported\n");
- abort();
- }
- if(p != m->v + m->nv){
- fprint(2, "malformed wstat message");
- abort();
- }
- break;
- default:
- abort();
- }
- return 0;
-
-}
-
char *
btnext(Scan *s, Kvp *r, int *done)
{
Scanp *p;
- int i, j, h, start;
- Msg m, n, t;
+ int i, j, h, ok, start, srcbuf;
+ Msg m, n;
Kvp kv;
-Again:
/* load up the correct blocks for the scan */
+Again:
p = s->path;
h = s->root.ht;
*done = 0;
start = h;
-
- for(i = h-1; i > 0; i--){
- if(p[i].vi < p[i].b->nval || p[i].bi < p[i].b->nbuf)
+ srcbuf = -1;
+ for(i = h-1; i >= 0; i--){
+ if(p[i].b != nil
+ &&(p[i].vi < p[i].b->nval || p[i].bi < p[i].b->nbuf))
break;
if(i == 0){
*done = 1;
return nil;
}
- start = i;
- p[i-1].vi++;
- putblk(p[i].b);
+ if(p[i].b != nil)
+ putblk(p[i].b);
+ p[i].b = nil;
p[i].vi = 0;
p[i].bi = 0;
- p[i].b = nil;
+ p[i-1].vi++;
+ start = i;
}
- for(i = start; i < h; i++){
- getval(p[i-1].b, p[i-1].vi, &kv);
- if((p[i].b = getblk(kv.bp, 0)) == nil)
- return "error reading block";
+
+ if(p[start-1].vi < p[start-1].b->nval){
+ for(i = start; i < h; i++){
+ getval(p[i-1].b, p[i-1].vi, &kv);
+ if((p[i].b = getblk(kv.bp, 0)) == nil)
+ return "error reading block";
+ }
+
+ /* find the minimum key along the path up */
+ m.op = Onop;
+ getval(p[h-1].b, p[h-1].vi, &m);
+ }else{
+ getmsg(p[start-1].b, p[start-1].bi, &m);
+ assert(m.op == Oinsert);
+ srcbuf = start-1;
}
- /* find the minimum key along the path up */
- m.op = Onop;
- getval(p[h-1].b, p[h-1].vi, &m);
for(i = h-2; i >= 0; i--){
if(p[i].bi == p[i].b->nbuf)
continue;
getmsg(p[i].b, p[i].bi, &n);
- if(keycmp(&n, &m) < 0)
+ if(keycmp(&n, &m) < 0){
+ srcbuf = i;
m = n;
+ }
}
if(m.nk < s->pfx.nk || memcmp(m.k, s->pfx.k, s->pfx.nk) != 0){
*done = 1;
@@ -1520,25 +1396,23 @@
}
/* scan all messages applying to the message */
- getval(p[h-1].b, p[h-1].vi, &t);
- if(keycmp(&m, &t) == 0){
- t.op = Onop;
- accum(s, &t);
+ ok = 1;
+ cpkvp(r, &m, s->kvbuf, sizeof(s->kvbuf));
+ if(srcbuf == -1)
p[h-1].vi++;
- }
+ else
+ p[srcbuf].bi++;
for(i = h-2; i >= 0; i--){
for(j = p[i].bi; j < p[i].b->nbuf; j++){
- getmsg(p[i].b, j, &t);
- if(keycmp(&m, &t) != 0)
+ getmsg(p[i].b, j, &m);
+ if(keycmp(r, &m) != 0)
break;
- accum(s, &t);
+ ok = apply(r, &m, s->kvbuf, sizeof(s->kvbuf));
p[i].bi++;
- m = t;
}
}
- if(m.op == Odelete)
+ if(!ok)
goto Again;
- cpkvp(r, &m, s->kvbuf, sizeof(s->kvbuf));
return nil;
}