ref: 122eaf84ccbf1c6694943bfcce99b47c8c19440b
dir: /appl/cmd/ctlfs.b/
include "dial.m"; dial: Dial; include "security.m"; auth: Auth; include "keyring.m"; keyring: Keyring; include "styx.m"; styx: Styx; Tmsg, Rmsg: import Styx; include "styxservers.m"; styxservers: Styxservers; nametree: Nametree; Tree: import nametree; Styxserver, Fid, Navigator, Navop, Eperm, Ecount, Eoffset: import styxservers; # Database features dbfeatures: list of string; # Initial fs files Qroot, Qctl, Qname, Qstatus: con big iota; # create ctlfs and the appropriate listeners run_ctlfs(cfg: Config, dbreg: ref DbRegistry, keyfile: string, algs: list of string) { sys->fprint(stderr, "setting up ctlfs\n"); dbfeatures = DBVER :: dbfeatures; styx = load Styx Styx->PATH; styxservers = load Styxservers Styxservers->PATH; nametree = load Nametree Nametree->PATH; if(debug) sys->fprint(stderr, "ctlfs: checking if modules are loaded\n"); if(styx == nil) error("ctlfs: styx module not found"); if(styxservers == nil) error("ctlfs: styxservers module not found"); if(nametree == nil) error("ctlfs: nametree module not found"); if(debug) sys->fprint(stderr, "ctlfs: initializing modules\n"); auth->init(); styx->init(); styxservers->init(styx); nametree->init(); # authinfo init authinfo: ref Keyring->Authinfo; if (keyfile == nil) keyfile = "/usr/" + user() + "/keyring/default"; if(debug) sys->fprint(stderr, "ctlfs: reading authinfo %s\n", keyfile); authinfo = keyring->readauthinfo(keyfile); if (authinfo == nil) error(sys->sprint("ctlfs: cannot read %s: %r", keyfile)); # announcing if(debug) sys->fprint(stderr, "ctlfs: announcing dddbctl\n"); # addr := dial->netmkaddr(cfg.addr, "tcp", "dddbctl"); c := dial->announce(cfg.addr); if(c == nil) error(sys->sprint("ctlfs: cannot listen on %s\n", cfg.addr)); # bootstrapping if(debug) sys->fprint(stderr, "ctlfs: bootstrapping\n"); sys->unmount(nil, "/mnt/keys"); sys->unmount(nil, "/mnt"); # nametree; this is shared across all attachees (tree, treeop) := nametree->start(); tree.create(Qroot, dir(".", 8r555|Sys->DMDIR, Qroot)); tree.create(Qroot, dir("ctl", 8r666, Qctl)); tree.create(Qroot, dir("status", 8r444, Qstatus)); sys->fprint(stderr, "ctlfs: finished setting up; starting\n"); # listener entrypoint ctlfs_listener(cfg, dbreg, c, treeop, authinfo, algs); tree.quit(); } # dddbctl listener loop ctlfs_listener(cfg: Config, dbreg: ref DbRegistry, c: ref Dial->Connection, treeop: chan of ref Styxservers->Navop, authinfo: ref Keyring->Authinfo, algs: list of string) { loop: for (;;) { nc := dial->listen(c); if (nc == nil) error(sys->sprint("listen failed: %r")); if (debug) sys->fprint(stderr, "ctlfs: got connection from %s", readfile(nc.dir + "/remote")); dfd := dial->accept(nc); if (dfd == nil) continue loop; if(nc.cfd != nil) sys->fprint(nc.cfd, "keepalive"); hostname: string; hostname = readfile(nc.dir + "/remote"); if(hostname != nil) hostname = hostname[0:len hostname - 1]; regchan := dbreg.changen(); spawn ctlfs_authenticator(cfg, regchan, dfd, treeop, authinfo, algs, hostname); } } # authenticate a connection and set the user id. ctlfs_authenticator(cfg: Config, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), dfd: ref Sys->FD, treeop: chan of ref Styxservers->Navop, authinfo: ref Keyring->Authinfo, algs: list of string, hostname: string) { # authenticate and change user id appropriately (fd, err) := auth->server(algs, authinfo, dfd, 1); if (fd == nil) { if (debug) sys->fprint(stderr, "ctlfs: authentication failed: %s\n", err); return; } if (debug) sys->fprint(stderr, "ctlfs: client authenticated as %s\n", err); spawn ctlfs_loop(cfg, regchan, fd, treeop, hostname); } # filesystem loop; nb: hostname will be later used for stats ctlfs_loop(cfg: Config, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), fd: ref Sys->FD, treeop: chan of ref Styxservers->Navop, nil: string) { (tc, srv) := Styxserver.new(fd, Navigator.new(treeop), big Qroot); # registry rx/tx (tx, rx) := regchan; # Primary server loop loop: while((tm := <-tc) != nil) { # Switch on operations being performed on a given Fid pick t := tm { # Open operation Open => (f, mode, d, err) := srv.canopen(t); if(f == nil){ srv.reply(ref Rmsg.Error(t.tag, err)); continue loop; } f.open(mode, d.qid); case f.path { # Qroot Qroot => if(t.mode != Sys->OREAD) { srv.reply(ref Rmsg.Error(t.tag, Eperm)); continue loop; } srv.reply(ref Rmsg.Open(t.tag, d.qid, srv.iounit())); # Qctl Qctl => if((t.mode & (Sys->OTRUNC | Sys->ORCLOSE | Sys->OEXCL)) != 0) { srv.reply(ref Rmsg.Error(t.tag, Eperm)); continue loop; } srv.reply(ref Rmsg.Open(t.tag, d.qid, srv.iounit())); # Qname Qname => if((t.mode & (Sys->OTRUNC | Sys->ORCLOSE | Sys->OEXCL)) != 0) { srv.reply(ref Rmsg.Error(t.tag, Eperm)); continue loop; } srv.reply(ref Rmsg.Open(t.tag, d.qid, srv.iounit())); # Qstatus Qstatus => if(t.mode != Sys->OREAD) { srv.reply(ref Rmsg.Error(t.tag, Eperm)); continue loop; } srv.reply(ref Rmsg.Open(t.tag, d.qid, srv.iounit())); # Default reply * => srv.default(t); } # Read operation Read => (f, err) := srv.canread(t); if(f == nil) { srv.reply(ref Rmsg.Error(t.tag, err)); break; } if(f.qtype & Sys->QTDIR){ srv.read(t); continue loop; } case f.path { # Qctl Qctl => ctlmsg := joinstr(dbfeatures, "\n") + "\n"; ctlmsgbuf := array of byte ctlmsg; rend := int t.offset + t.count; if(rend > len ctlmsg) rend = len ctlmsg; srv.reply(ref Rmsg.Read(t.tag, ctlmsgbuf[(int t.offset):rend])); # Qname Qname => namemsg := cfg.name + "\n"; namemsgbuf := array of byte namemsg; rend := int t.offset + t.count; if(rend > len namemsg) rend = len namemsg; srv.reply(ref Rmsg.Read(t.tag, namemsgbuf[(int t.offset):rend])); # Qstatus Qstatus => info: list of string; info = "name " + cfg.name :: info; info = "sysname " + cfg.sysn :: info; info = "addr " + cfg.addr :: info; info = "storage " + cfg.storage :: info; info = "fsworkers " + sys->sprint("%d", cfg.fswrks) :: info; info = "" :: info; info = "nodes" :: info; tx <-= ref RegTMsg.GetNodes(); reply := <- rx; pick r := reply { Error => srv.reply(ref Rmsg.Error(t.tag, r.err)); NodeList => names := lists->reverse(r.names); while(len names != 0) { node := hd names; sline := ""; tx <-= ref RegTMsg.Check(node); crep := <- rx; pick cr := crep { Error => sline = cr.err; Status => up := cr.count; ps := cr.poolsize; sline = sys->sprint("%d %d", up, ps); * => sline = "unsupported message"; } info = node + " " + sline :: info; names = tl names; } * => srv.reply(ref Rmsg.Error(t.tag, "unsupported version")); } statusmsg := joinstr(lists->reverse(info), "\n") + "\n"; statusmsgbuf := array of byte statusmsg; rend := int t.offset + t.count; if(rend > len statusmsg) rend = len statusmsg; srv.reply(ref Rmsg.Read(t.tag, statusmsgbuf[(int t.offset):rend])); # Default reply * => srv.default(t); } # Write operation Write => srv.default(t); # Default action * => srv.default(t); } } }