shithub: gefs

Download patch

ref: 7119fdec0b6fd8b3d1b0c6034950644d108400a3
parent: 729a62cf4172100f394553f2c4d69f77148c8ce5
author: Ori Bernstein <[email protected]>
date: Thu Oct 19 19:36:32 EDT 2023

fs: sweep blocks in background

--- a/cons.c
+++ b/cons.c
@@ -26,14 +26,11 @@
 static void
 sendsync(int fd, int halt)
 {
-	Fmsg *m;
 	Amsg *a;
 
-	m = mallocz(sizeof(Fmsg), 1);
 	a = mallocz(sizeof(Amsg), 1);
-	if(m == nil || a == nil){
+	if(a == nil){
 		fprint(fd, "alloc sync msg: %r\n");
-		free(m);
 		free(a);
 		return;
 	}
@@ -40,8 +37,7 @@
 	a->op = AOsync;
 	a->halt = halt;
 	a->fd = fd;
-	m->a = a;
-	chsend(fs->wrchan, m);		
+	chsend(fs->admchan, a);		
 }
 
 static void
@@ -61,12 +57,10 @@
 static void
 snapfs(int fd, char **ap, int)
 {
-	Fmsg *m;
 	Amsg *a;
 
-	m = mallocz(sizeof(Fmsg), 1);
 	a = mallocz(sizeof(Amsg), 1);
-	if(m == nil || a == nil){
+	if(a == nil){
 		fprint(fd, "alloc sync msg: %r\n");
 		goto Error;
 	}
@@ -83,12 +77,10 @@
 	}
 	a->op = AOsnap;
 	a->fd = fd;
-	m->a = a;
 	sendsync(fd, 0);
-	chsend(fs->wrchan, m);
+	chsend(fs->admchan, a);
 	return;	
 Error:
-	free(m);
 	free(a);
 	return;
 }
--- a/dat.h
+++ b/dat.h
@@ -313,6 +313,7 @@
 	AOnone,
 	AOsnap,
 	AOsync,
+	AOclear,
 };
 
 struct Bptr {
@@ -375,6 +376,11 @@
 		struct {	/* AOsync */
 			int	halt;
 		};
+		struct {	/* AOclear */
+			Mount	*mnt;
+			vlong	qpath;
+			vlong	length;
+		};
 	};
 };
 
@@ -382,7 +388,6 @@
 	Fcall;
 	Conn	*conn;
 	int	sz;	/* the size of the message buf */
-	Amsg	*a;	/* admin messages */
 	uchar	buf[];
 };
 
@@ -494,8 +499,10 @@
 	Conn	*conns;
 
 	Chan	*wrchan;
+	Chan	*admchan;
 	Chan	**rdchan;
 
+	QLock	mutlk;
 	int	nworker;
 	long	epoch;
 	long	lepoch[32];
--- a/fns.h
+++ b/fns.h
@@ -187,3 +187,4 @@
 void	runcons(int, void*);
 void	runtasks(int, void*);
 void	runsync(int, void*);
+void	runsweep(int, void*);
--- a/fs.c
+++ b/fs.c
@@ -106,13 +106,6 @@
 	d->muid = -1;
 }
 
-static void
-freemsg(Fmsg *m)
-{
-	free(m->a);
-	free(m);
-}
-
 static int
 okname(char *name)
 {
@@ -669,7 +662,6 @@
 	}
 	m->conn = c;
 	m->sz = sz;
-	m->a = nil;
 	PBIT32(m->buf, sz);
 	*pm = m;
 	return 0;
@@ -1600,7 +1592,7 @@
 }
 
 static void
-fsremove(Fmsg *m, int id)
+fsremove(Fmsg *m, Amsg **ao)
 {
 	char upbuf[Upksz];
 	Fcall r;
@@ -1639,9 +1631,16 @@
 	if((e = upsert(f->mnt, mb, 1)) != nil)
 		goto Error;
 	if(f->dent->qid.type == QTFILE){
-		e = clearb(id, f->mnt, f->qpath, 0, f->dent->length);
-		if(e != nil)
+		/* upsert succeeded, so e is nil: set it before taking the Error path */
+		if((*ao = malloc(sizeof(Amsg))) == nil){
+			e = Enomem;
 			goto Error;
+		}
+		aincl(&f->mnt->ref, 1);
+		(*ao)->op = AOclear;
+		(*ao)->mnt = f->mnt;
+		(*ao)->qpath = f->qpath;
+		(*ao)->length = f->dent->length;
 	}
 	f->dent->gone = 1;
 	wunlock(f->dent);
@@ -1658,7 +1657,7 @@
 }
 
 static void
-fsopen(Fmsg *m, int id)
+fsopen(Fmsg *m, Amsg **ao)
 {
 	char *p, *e, buf[Kvmax];
 	int mbits;
@@ -1738,8 +1737,21 @@
 		mb.nk = f->dent->nk;
 		mb.v = buf;
 		mb.nv = p - buf;
-		clearb(id, f->mnt, f->qpath, 0, f->dent->length);
+		if((*ao = malloc(sizeof(Amsg))) == nil){
+			e = Enomem;
+			goto Error;
+		}
+		aincl(&f->mnt->ref, 1);
+		(*ao)->op = AOclear;
+		(*ao)->mnt = f->mnt;
+		(*ao)->qpath = f->qpath;
+		(*ao)->length = f->dent->length;
 		if((e = upsert(f->mnt, &mb, 1)) != nil){
+			/* truncation failed: don't hand the clear to the sweeper */
+			clunkmount((*ao)->mnt);
+			free(*ao);
+			*ao = nil;
+Error:
 			wunlock(f->dent);
 			rerror(m, e);
 			putfid(f);
@@ -2157,47 +2169,38 @@
 void
 runwrite(int id, void *)
 {
-	Mount *mnt;
 	Fmsg *m;
-	int ao;
+	Amsg *a;
 
 	while(1){
+		a = nil;
 		m = chrecv(fs->wrchan);
+		if(fs->rdonly){
+			rerror(m, Erdonly);
+			continue;
+		}
+		if(fs->broken){
+			rerror(m, Efs);
+			continue;
+		}
+
 		epochstart(id);
-		ao = (m->a == nil) ? AOnone : m->a->op;
-		switch(ao){
-		case AOnone:
-			if(fs->rdonly){
-				rerror(m, Erdonly);
-				continue;
-			}
-			if(fs->broken){
-				rerror(m, Efs);
-				continue;
-			}
-			switch(m->type){
-			case Tcreate:	fscreate(m);	break;
-			case Twrite:	fswrite(m);	break;
-			case Twstat:	fswstat(m);	break;
-			case Tremove:	fsremove(m,id);	break;
-			case Topen:	fsopen(m, id);	break;
-			}
-			break;
-		case AOsync:
-			if(m->a->halt)
-				ainc(&fs->rdonly);
-			for(mnt = fs->mounts; mnt != nil; mnt = mnt->next)
-				updatesnap(&mnt->root, mnt->root, mnt->name);
-			sync();
-			freemsg(m);
-			break;
-		case AOsnap:
-			snapfs(m->a->fd, m->a->old, m->a->new);
-			freemsg(m);
-			break;
+		qlock(&fs->mutlk);
+		switch(m->type){
+		case Tcreate:	fscreate(m);	break;
+		case Twrite:	fswrite(m);	break;
+		case Twstat:	fswstat(m);	break;
+		case Tremove:	fsremove(m,&a);	break;
+		case Topen:	fsopen(m, &a);	break;
+		default:	abort();	break;
 		}
 		epochend(id);
 		epochclean();
+		qunlock(&fs->mutlk);
+
+		/* sent only after mutlk is released; runsweep takes mutlk itself */
+		if(a != nil)
+			chsend(fs->admchan, a);
 	}
 }
 
@@ -2214,7 +2217,7 @@
 		case Twalk:	fswalk(m);	break;
 		case Tread:	fsread(m);	break;
 		case Tstat:	fsstat(m);	break;
-		case Topen:	fsopen(m, id);	break;
+		case Topen:	fsopen(m, nil);	break;
 		}
 		epochend(id);
 	}
@@ -2221,19 +2224,86 @@
 }
 
+/*
+ * Sweeper proc: receives Amsgs from fs->admchan and handles
+ * each one under fs->mutlk, then frees the message.
+ *   AOsync:  bump fs->rdonly if a->halt is set, updatesnap()
+ *            every mount on fs->mounts, then sync()
+ *   AOsnap:  snapfs(a->fd, a->old, a->new)
+ *   AOclear: upsert one Oclearb per Blksz step of offsets in
+ *            [0, a->length) for a->qpath, then clunkmount()
+ *            the mount that fsremove/fsopen ref'd with aincl
+ * NOTE(review): AOclear runs some time after the request that
+ * queued it; whether a later write to a truncated file can be
+ * swept along with the old blocks should be confirmed.
+ */
 void
+runsweep(int id, void*)
+{
+	char *e, buf[Offksz];
+	Mount *mnt;
+	vlong off;
+	Amsg *a;
+	Msg m;
+
+	while(1){
+		a = chrecv(fs->admchan);
+		switch(a->op){
+		case AOsync:
+			qlock(&fs->mutlk);
+			if(a->halt)
+				ainc(&fs->rdonly);
+			epochstart(id);
+			for(mnt = fs->mounts; mnt != nil; mnt = mnt->next)
+				updatesnap(&mnt->root, mnt->root, mnt->name);
+			sync();
+			epochend(id);
+			qunlock(&fs->mutlk);
+			break;
+		case AOsnap:
+			qlock(&fs->mutlk);
+			epochstart(id);
+			snapfs(a->fd, a->old, a->new);
+			epochend(id);
+			qunlock(&fs->mutlk);
+			break;
+		case AOclear:
+			qlock(&fs->mutlk);
+			for(off = 0; off < a->length; off += Blksz){
+				epochstart(id);
+				m.k = buf;
+				m.nk = sizeof(buf);
+				m.op = Oclearb;
+				m.k[0] = Kdat;
+				PACK64(m.k+1, a->qpath);
+				PACK64(m.k+9, off);
+				m.v = nil;
+				m.nv = 0;
+				/* a failed upsert marks the fs broken; the sweep goes on */
+				if((e = upsert(a->mnt, &m, 1)) != nil){
+					fprint(2, "sweep: %s\n", e);
+					fs->broken++;
+				}
+				epochend(id);
+			}
+			qunlock(&fs->mutlk);
+			clunkmount(a->mnt);
+			break;
+		}
+		free(a);
+	}
+}
+
+void
 runtasks(int id, void *)
 {
 	int i, c;
-	Fmsg *m;
 	Amsg *a;
 
 	while(1){
 		sleep(5000);
-		m = mallocz(sizeof(Fmsg), 1);
 		a = mallocz(sizeof(Amsg), 1);
-		if(m == nil || a == nil){
+		if(a == nil){
 			fprint(2, "alloc sync msg: %r\n");
-			free(m);
 			free(a);
 			return;
 		}
@@ -2240,8 +2310,7 @@
 		a->op = AOsync;
 		a->halt = 0;
 		a->fd = -1;
-		m->a = a;
-		chsend(fs->wrchan, m);
+		chsend(fs->admchan, a);
 
 		/*
 		 * compresslog is designed to be concurrent with allocation,
--- a/main.c
+++ b/main.c
@@ -226,6 +226,7 @@
 	if(check && !checkfs(2))
 		sysfatal("fishy");
 	fs->wrchan = mkchan(32);
+	fs->admchan = mkchan(32);
 	fs->nsyncers = nproc/2;
 	fs->nreaders = 1;
 	if(fs->nsyncers > fs->narena)
@@ -242,6 +243,7 @@
 	ctlfd = postfd(srvname, ".cmd");
 	launch(runcons, (void*)ctlfd, "ctl");
 	launch(runwrite, nil, "mutate");
+	launch(runsweep, nil, "sweep");
 	launch(runtasks, nil, "tasks");
 	for(i = 0; i < fs->nreaders; i++)
 		launch(runread, fs->rdchan[i], "readio");