ref: 151eed2c3aaa1975b3c7a0331bb3ee4730552e8d
parent: 89612b6c2f225ba7e172974abecd97d8a4703761
author: kvik <[email protected]>
date: Thu Nov 1 00:21:04 EDT 2018
stop the Channel abuse, adopt mischief's WaitGroups
--- a/clone.c
+++ b/clone.c
@@ -7,16 +7,19 @@
Nblkprocs = 16,
Blksz = 128*1024,
- Blkdone = 1,
-
- End = 1,
};
typedef struct {
+ Rendez;
+ QLock;
+ Ref;
+} WaitGroup;
+
+typedef struct {
Dir;
+ WaitGroup wg;
char *src, *dst;
int sfd, dfd;
- Channel *c;
} File;
typedef struct {
@@ -33,15 +36,21 @@
int fileprocs = Nfileprocs;
int blkprocs = Nblkprocs;
Dir *skipdir;
+WaitGroup filewg;
Channel *filechan; /* chan(File*) */
Channel *blkchan; /* chan(Blk*) */
-Channel *endchan; /* chan(ulong) */
void usage(void);
void *emalloc(ulong);
char *estrdup(char*);
+extern int cas(long *p, long ov, long nv);
+void wginit(WaitGroup*, long);
+void wgadd(WaitGroup*, long);
+void wgdone(WaitGroup*);
+void wgwait(WaitGroup*);
+
char *filename(char*);
Dir *mkdir(char*, Dir*, int);
int same(Dir*, Dir*);
@@ -84,6 +93,45 @@
return p;
}
+void
+wginit(WaitGroup *wg, long count)
+{
+ memset(wg, 0, sizeof(*wg));
+ wg->l = &wg->QLock;
+ if(cas(&wg->ref, 0, count) == 0)
+ sysfatal("wginit: cas failed");
+}
+
+void
+wgadd(WaitGroup *wg, long n)
+{
+ long v;
+
+ v = wg->ref;
+ while(cas(&wg->ref, v, v+n) == 0)
+ v = wg->ref;
+}
+
+void
+wgdone(WaitGroup *wg)
+{
+ if(decref(wg) < 0)
+ sysfatal("wgdone: negative WaitGroup count");
+ qlock(wg);
+ rwakeupall(wg);
+ qunlock(wg);
+}
+
+void
+wgwait(WaitGroup *wg)
+{
+ qlock(wg);
+ while(!(wg->ref == 0))
+ rsleep(wg);
+ qunlock(wg);
+}
+
+
char *
filename(char *s)
{
@@ -147,7 +195,6 @@
f->dst = estrdup(dst);
f->sfd = -1;
f->dfd = -1;
- f->c = nil;
return f;
}
@@ -211,6 +258,7 @@
dst = smprint("%s/%s", dst, filename(src));
f = filenew(src, dst, sd);
sendp(filechan, f);
+ wgadd(&filewg, 1);
return;
}
@@ -253,6 +301,7 @@
}else{
f = filenew(sn, dn, d);
sendp(filechan, f);
+ wgadd(&filewg, 1);
}
free(sn);
free(dn);
@@ -287,34 +336,16 @@
void
clonefile(File *f)
{
- vlong n, done;
+ vlong n;
Blk *blks, *b, *be;
- enum {Anext, Adone, Aend};
- Alt alts[] = {
- [Anext] {blkchan, &b, CHANSND},
- [Adone] {f->c, nil, CHANRCV},
- [Aend] {nil, nil, CHANEND},
- };
-
n = blklist(f, &blks);
if(n == 0)
return;
- b = blks;
- be = blks + n;
- done = 0;
- while(done < n){
- switch(alt(alts)){
- case Anext:
- ++b;
- if(b == be)
- alts[Anext].op = CHANNOP;
- break;
- case Adone:
- ++done;
- break;
- }
- }
+ wgadd(&f->wg, n);
+ for(b = blks, be = b + n; b != be; b++)
+ sendp(blkchan, b);
+ wgwait(&f->wg);
free(blks);
}
@@ -341,8 +372,7 @@
if(n > 0)
if(pwrite(dfd, buf, n, off) < n)
sysfatal("blkproc: write error: %r");
-
- sendul(b->f->c, Blkdone);
+ wgdone(&b->f->wg);
}
}
@@ -349,18 +379,15 @@
void
fileproc(void *)
{
- Channel *c;
File *f;
-
- c = chancreate(sizeof(ulong), blkprocs);
+
for(;;){
f = recvp(filechan);
- if(f == nil){
- sendul(endchan, End);
+ if(f == nil)
return;
- }
- f->c = c;
+ wginit(&f->wg, 0);
+
f->sfd = open(f->src, OREAD);
if(f->sfd < 0)
sysfatal("fileproc: can't open: %r");
@@ -371,6 +398,7 @@
clonefile(f);
cloneattr(f->dfd, f);
filefree(f);
+ wgdone(&filewg);
}
}
@@ -407,19 +435,15 @@
filechan = chancreate(sizeof(File*), fileprocs);
blkchan = chancreate(sizeof(Blk*), blkprocs);
- endchan = chancreate(sizeof(ulong), 0);
for(i = 0; i < fileprocs; i++)
proccreate(fileproc, nil, mainstacksize);
for(i = 0; i < blkprocs; i++)
proccreate(blkproc, nil, mainstacksize);
+ wginit(&filewg, 0);
for(i = 0; i < argc -1; i++)
clone(argv[i], dst);
-
- for(i = 0; i < fileprocs; i++){
- sendp(filechan, nil);
- recvul(endchan);
- }
+ wgwait(&filewg);
threadexitsall(nil);
}