ref: 02ac617541ca1a7bf82b1615fb5a58235469b5d3
dir: /appl/lib/styxconv/new2old.b/
implement Styxconv; include "sys.m"; sys: Sys; include "osys.m"; include "nsys.m"; include "draw.m"; include "styx.m"; nstyx: Styx; Tmsg, Rmsg: import nstyx; include "ostyx.m"; ostyx: OStyx; OTmsg, ORmsg: import ostyx; include "styxconv.m"; # todo: map fids > ffff into 16 bits DEBUG: con 1; Fid: adt { fid: int; isdir: int; n: int; # size of last new client dirread request. soff: int; # dir offset on old server. coff: int; # dir offset on new client. next: cyclic ref Fid; }; Req: adt { tag: int; oldtag: int; # if it's a flush. rp: ref Reqproc; next: cyclic ref Req; flushes: list of ref Rmsg.Flush; # flushes awaiting req finish. }; Reqproc: adt { newtmsg: chan of ref Tmsg; # convproc -> reqproc, once per req. newrmsg: chan of ref Rmsg; # reqproc -> convproc, once per req oldtmsg: chan of ref OTmsg; # reqproc -> convproc oldrmsg: chan of ref ORmsg; # convproc -> reqproc flushable: int; new: fn(): ref Reqproc; rpc: fn(rp: self ref Reqproc, otm: ref OTmsg): ref ORmsg; }; tags: ref Req; avail: chan of ref Reqproc; fids: ref Fid; nprocs := 0; init() { sys = load Sys Sys->PATH; if(sys == nil) nomod("Sys", Sys->PATH); nstyx = load Styx Styx->PATH; if(nstyx == nil) nomod("Styx", Styx->PATH); ostyx = load OStyx OStyx->PATH; if(ostyx == nil) nomod("OStyx", OStyx->PATH); ostyx->init(); nstyx->init(); avail = chan of ref Reqproc; } styxconv(newclient: ref Sys->FD, oldsrv: ref Sys->FD) { newtmsg := chan of ref Tmsg; oldrmsg := chan of ref ORmsg; killpids := chan[2] of int; spawn readnewtmsgs(killpids, newclient, newtmsg); spawn readoldrmsgs(killpids, oldsrv, oldrmsg); converting: for(;;)alt{ ntm := <-newtmsg => if(DEBUG) sys->fprint(sys->fildes(2), "-> %s\n", ntm.text()); if(ntm == nil) break converting; ns2os(ntm, newclient, oldsrv); orm := <-oldrmsg => if(DEBUG) sys->fprint(sys->fildes(2), " <- %s\n", ostyx->rmsg2s(orm)); if(orm == nil) break converting; t := looktag(orm.tag); if(t == nil){ warning("reply by old-server to non-existent tag"); break; } pick rm := orm { Flush => ot := looktag(t.oldtag); # if it's an Rflush of a request-in-progress, # we send it to the reqproc, which # can then clean up as it likes. if(ot != nil){ if(ot.rp != nil){ if(ot.rp.flushable){ ot.rp.oldrmsg <-= rm; # reqproc is bound to finish after a flush reqreply(ot, newclient, oldsrv); }else { # hold flush reply for later ot.flushes = ref Rmsg.Flush(rm.tag) :: ot.flushes; } break; } deletetag(t.oldtag); } NRsend(newclient, ref Rmsg.Flush(rm.tag)); deletetag(rm.tag); * => if(t.rp != nil){ t.rp.oldrmsg <-= orm; reqreply(t, newclient, oldsrv); }else{ os2ns(orm, newclient); deletetag(orm.tag); } } } # kill off active reqprocs for(; tags != nil; tags = tags.next){ if(tags.rp != nil){ tags.rp.oldrmsg <-= nil; nprocs--; } } # kill off idle reqprocs while(nprocs > 0){ rp := <-avail; rp.newtmsg <-= nil; nprocs--; } # kill off message readers kill(<-killpids); kill(<-killpids); } # process one response from the request proc. # request proc can respond by sending a new tmsg to the old server # or by sending an rmsg to the new client, in which case # it implicitly signals that it has finished processing the request. # the actual reply might be an Rflush, signifying that # the request has been aborted. reqreply(t: ref Req, newclient: ref Sys->FD, oldsrv: ref Sys->FD) { rp := t.rp; alt{ nrm := <-rp.newrmsg => # request is done when process sends rmsg pick rm := nrm { Flush => deletetag(t.tag); } deletetag(nrm.tag); NRsend(newclient, nrm); for(; t.flushes != nil; t.flushes = tl t.flushes) NRsend(newclient, hd t.flushes); otm := <-rp.oldtmsg => OTsend(oldsrv, otm); } } # T messages: forward on, reply immediately, or start processing. ns2os(tm0: ref Tmsg, newclient, oldsrv: ref Sys->FD) { otm: ref OTmsg; t := ref Req(tm0.tag, -1, nil, nil, nil); pick tm := tm0{ Readerror => exit; Version => (s, v) := nstyx->compatible(tm, nstyx->MAXRPC, nil); NRsend(newclient, ref Rmsg.Version(tm.tag, s, v)); return; Auth => NRsend(newclient, ref Rmsg.Error(tm.tag, "authorization not required")); return; Walk => storetag(t); t.rp = Reqproc.new(); t.rp.newtmsg <-= tm; reqreply(t, newclient, oldsrv); return; Attach => otm = ref OTmsg.Attach(tm.tag, tm.fid, tm.uname, tm.aname); Flush => t.oldtag = tm.oldtag; otm = ref OTmsg.Flush(tm.tag, tm.oldtag); Open => otm = ref OTmsg.Open(tm.tag, tm.fid, tm.mode); Create => otm = ref OTmsg.Create(tm.tag, tm.fid, tm.perm, tm.mode, tm.name); Read => fp := findfid(tm.fid); count := tm.count; offset := tm.offset; if(fp != nil && fp.isdir){ fp.n = count; count = (count/OStyx->DIRLEN)*OStyx->DIRLEN; if(int offset != fp.coff){ NRsend(newclient, ref Rmsg.Error(tm.tag, "unexpected offset in dirread")); return; } offset = big fp.soff; } otm = ref OTmsg.Read(tm.tag, tm.fid, count, offset); Write => otm = ref OTmsg.Write(tm.tag, tm.fid, tm.offset, tm.data); Clunk => otm = ref OTmsg.Clunk(tm.tag, tm.fid); Remove => otm = ref OTmsg.Remove(tm.tag, tm.fid); Stat => otm = ref OTmsg.Stat(tm.tag, tm.fid); Wstat => otm = ref OTmsg.Wstat(tm.tag, tm.fid, nd2od(tm.stat)); * => fatal("bad T message"); } storetag(t); OTsend(oldsrv, otm); } # R messages: old to new os2ns(orm0: ref ORmsg, newclient: ref Sys->FD) { rm: ref Rmsg; rm = nil; pick orm := orm0 { Error => rm = ref Rmsg.Error(orm.tag, orm.err); Flush => rm = ref Rmsg.Flush(orm.tag); Clone => rm = ref Rmsg.Walk(orm.tag, nil); Walk => fatal("walk rmsgs should be dealt with be walkreqproc"); Open => setfid(orm.fid, orm.qid); rm = ref Rmsg.Open(orm.tag, oq2nq(orm.qid), 0); Create => setfid(orm.fid, orm.qid); rm = ref Rmsg.Create(orm.tag, oq2nq(orm.qid), 0); Read => fp := findfid(orm.fid); data := orm.data; if(fp != nil && fp.isdir) data = ods2nds(data, fp.n); fp.coff += len data; fp.soff += len orm.data; rm = ref Rmsg.Read(orm.tag, data); Write => rm = ref Rmsg.Write(orm.tag, orm.count); Clunk => rm = ref Rmsg.Clunk(orm.tag); deletefid(orm.fid); Remove => rm = ref Rmsg.Remove(orm.tag); deletefid(orm.fid); Stat => rm = ref Rmsg.Stat(orm.tag, od2nd(orm.stat)); Wstat => rm = ref Rmsg.Wstat(orm.tag); Attach => newfid(orm.fid, orm.qid.path & OSys->CHDIR); rm = ref Rmsg.Attach(orm.tag, oq2nq(orm.qid)); * => fatal("bad R message"); } NRsend(newclient, rm); } Reqproc.rpc(rp: self ref Reqproc, otm: ref OTmsg): ref ORmsg { rp.oldtmsg <-= otm; m := <-rp.oldrmsg; if(m == nil) exit; return m; } Reqproc.new(): ref Reqproc { alt{ rp := <-avail => return rp; * => rp := ref Reqproc( chan of ref Tmsg, chan of ref Rmsg, chan of ref OTmsg, chan of ref ORmsg, 1); spawn reqproc(rp); nprocs++; return rp; } } reqproc(rp: ref Reqproc) { for(;;){ tm := <-rp.newtmsg; if(tm == nil) return; rm: ref Rmsg; pick m := tm { Walk => rm = walkreq(m, rp); * => fatal("non-walk req passed to reqproc"); } rp.flushable = 1; rp.newrmsg <-= rm; avail <-= rp; } } # note that although this is in a separate process, # whenever it's not in Reqproc.rpc, the styxconv # process is blocked, so although state is shared, # there are no race conditions. walkreq(tm: ref Tmsg.Walk, rp: ref Reqproc): ref Rmsg { cloned := 0; n := len tm.names; if(tm.newfid != tm.fid){ cloned = 1; pick rm := rp.rpc(ref OTmsg.Clone(tm.tag, tm.fid, tm.newfid)) { Clone => ; Error => return ref Rmsg.Error(tm.tag, rm.err); Flush => return ref Rmsg.Flush(rm.tag); * => fatal("unexpected reply to OTmsg.Clone"); } cloned = 1; } qids := array[n] of NSys->Qid; finalqid: OSys->Qid; # make sure we don't get flushed in an unwindable state. rp.flushable = n == 1 || cloned; for(i := 0; i < n; i++){ pick rm := rp.rpc(ref OTmsg.Walk(tm.tag, tm.newfid, tm.names[i])) { Walk => qids[i] = oq2nq(rm.qid); finalqid = rm.qid; Flush => if(cloned){ rp.flushable = 0; rp.rpc(ref OTmsg.Clunk(tm.tag, tm.newfid)); } return ref Rmsg.Flush(rm.tag); Error => if(cloned){ rp.flushable = 0; rp.rpc(ref OTmsg.Clunk(tm.tag, tm.newfid)); } if(i == 0) return ref Rmsg.Error(tm.tag, rm.err); return ref Rmsg.Walk(tm.tag, qids[0:i]); } } if(cloned) clonefid(tm.fid, tm.newfid); if(n > 0) setfid(tm.newfid, finalqid); return ref Rmsg.Walk(tm.tag, qids); } storetag(t: ref Req) { t.next = tags; tags = t; } looktag(tag: int): ref Req { for(t := tags; t != nil; t = t.next) if(t.tag == tag) return t; return nil; } deletetag(tag: int) { prev: ref Req; t := tags; while(t != nil){ if(t.tag == tag){ next := t.next; t.next = nil; if(prev != nil) prev.next = next; else tags = next; t = next; }else{ prev = t; t = t.next; } } } newfid(fid: int, isdir: int): ref Fid { f := ref Fid; f.fid = fid; f.isdir = isdir; f.n = f.soff = f.coff = 0; f.next = fids; fids = f; return f; } clonefid(ofid: int, fid: int): ref Fid { if((f := findfid(ofid)) != nil){ nf := newfid(fid, f.isdir); return nf; } warning("clone of non-existent fid"); return newfid(fid, 0); } deletefid(fid: int) { lf: ref Fid; for(f := fids; f != nil; f = f.next){ if(f.fid == fid){ if(lf == nil) fids = f.next; else lf.next = f.next; return; } lf = f; } } findfid(fid: int): ref Fid { for(f := fids; f != nil; f = f.next) if(f.fid == fid) return f; return nil; } setfid(fid: int, qid: OSys->Qid) { f := findfid(fid); if(f == nil){ warning(sys->sprint("cannot find fid %d", fid)); }else{ f.isdir = qid.path & OSys->CHDIR; } } om2nm(om: int): int { # DMDIR == CHDIR return om; } nm2om(m: int): int { # DMDIR == CHDIR return m&~(NSys->DMAPPEND|NSys->DMEXCL|NSys->DMAUTH); } oq2nq(oq: OSys->Qid): NSys->Qid { q: NSys->Qid; isdir := oq.path&OSys->CHDIR; q.path = big (oq.path&~OSys->CHDIR); q.vers = oq.vers; q.qtype = 0; if(isdir) q.qtype |= NSys->QTDIR; return q; } nq2oq(q: NSys->Qid): OSys->Qid { oq: OSys->Qid; isdir := q.qtype&NSys->QTDIR; oq.path = int q.path; oq.vers = q.vers; if(isdir) oq.path |= OSys->CHDIR; return oq; } od2nd(od: OSys->Dir): NSys->Dir { d: NSys->Dir; d.name = od.name; d.uid = od.uid; d.gid = od.gid; d.muid = od.uid; d.qid = oq2nq(od.qid); d.mode = om2nm(od.mode); d.atime = od.atime; d.mtime = od.mtime; d.length = big od.length; d.dtype = od.dtype; d.dev = od.dev; return d; } nd2od(d: NSys->Dir): OSys->Dir { od: OSys->Dir; od.name = d.name; od.uid = d.uid; od.gid = d.gid; od.qid = nq2oq(d.qid); od.mode = nm2om(d.mode); od.atime = d.atime; od.mtime = d.mtime; od.length = int d.length; od.dtype = d.dtype; od.dev = d.dev; return od; } ods2nds(ob: array of byte, max: int): array of byte { od: OSys->Dir; m := len ob; if(m % OStyx->DIRLEN != 0) fatal(sys->sprint("bad dir len %d", m)); m /= OStyx->DIRLEN; n := 0; p := ob; for(i := 0; i < m; i++){ (p, od) = ostyx->convM2D(p); d := od2nd(od); nn := nstyx->packdirsize(d); if(n+nn > max) # might just happen with long file names break; n += nn; } m = i; b := array[n] of byte; n = 0; p = ob; for(i = 0; i < m; i++){ (p, od) = ostyx->convM2D(p); d := od2nd(od); q := nstyx->packdir(d); nn := len q; b[n: ] = q[0: nn]; n += nn; } return b; } OTsend(fd: ref Sys->FD, otm: ref OTmsg): int { if(DEBUG) sys->fprint(sys->fildes(2), " -> %s\n", ostyx->tmsg2s(otm)); s := array[OStyx->MAXRPC] of byte; n := ostyx->tmsg2d(otm, s); if(n < 0) return -1; return sys->write(fd, s, n); } NRsend(fd: ref Sys->FD, rm: ref Rmsg): int { if(DEBUG) sys->fprint(sys->fildes(2), "<- %s\n", rm.text()); s := rm.pack(); if(s == nil) return -1; return sys->write(fd, s, len s); } readnewtmsgs(pidc: chan of int, newclient: ref Sys->FD, newtmsg: chan of ref Tmsg) { pidc <-= sys->pctl(0, nil); for(;;){ newtmsg <-= Tmsg.read(newclient, nstyx->MAXRPC); } } readoldrmsgs(pidc: chan of int, oldsrv: ref Sys->FD, oldrmsg: chan of ref ORmsg) { pidc <-= sys->pctl(0, nil); for(;;){ oldrmsg <-= ORmsg.read(oldsrv); } } warning(err: string) { sys->fprint(sys->fildes(2), "warning: %s\n", err); } fatal(err: string) { sys->fprint(sys->fildes(2), "%s\n", err); exit; } nomod(mod: string, path: string) { fatal(sys->sprint("can't load %s(%s): %r", mod, path)); } kill(pid: int) { sys->fprint(sys->open("#p/"+string pid+"/ctl", Sys->OWRITE), "kill"); }