shithub: dddb

ref: 122eaf84ccbf1c6694943bfcce99b47c8c19440b
dir: /appl/cmd/ctlfs.b/

View raw version
include "dial.m";
	dial: Dial;

include "security.m";
	auth: Auth;

include "keyring.m";
	keyring: Keyring;

include "styx.m";
	styx: Styx;
	Tmsg, Rmsg: import Styx;

include "styxservers.m";
	styxservers: Styxservers;
	nametree: Nametree;
	Tree: import nametree;
	Styxserver, Fid, Navigator, Navop,
	Eperm, Ecount, Eoffset: import styxservers;

# Feature strings advertised to clients: run_ctlfs prepends DBVER to this
# list, and a read of the ctl file returns the entries one per line.
dbfeatures: list of string;

# Qid paths of the files served by ctlfs (root directory, ctl, name and
# status), numbered consecutively by iota.
Qroot, Qctl, Qname, Qstatus: con big iota;

# Set up ctlfs and run its listener.
#
# Loads and initialises the Styx modules, reads the server's authentication
# information, announces cfg.addr, builds the name tree shared by every
# attached client and then hands control to ctlfs_listener.
#
#	cfg	server configuration; cfg.addr is the address announced
#	dbreg	database registry, passed through to the listener
#	keyfile	file holding the server's authinfo; when nil,
#		/usr/<user>/keyring/default is used
#	algs	algorithm list passed on to the authentication exchange
#
# The dial, auth and keyring module handles are used but not loaded here.
# NOTE(review): presumably they are loaded by the including module - confirm.
# Failures are reported through error().
run_ctlfs(cfg: Config, dbreg: ref DbRegistry, keyfile: string, algs: list of string)
{
	sys->fprint(stderr, "setting up ctlfs\n");

	dbfeatures = DBVER :: dbfeatures;

	styx = load Styx Styx->PATH;
	styxservers = load Styxservers Styxservers->PATH;
	nametree = load Nametree Nametree->PATH;

	if(debug)
		sys->fprint(stderr, "ctlfs: checking if modules are loaded\n");

	if(styx == nil)
		error("ctlfs: styx module not found");
	if(styxservers == nil)
		error("ctlfs: styxservers module not found");
	if(nametree == nil)
		error("ctlfs: nametree module not found");

	if(debug)
		sys->fprint(stderr, "ctlfs: initializing modules\n");

	# Auth->init returns a diagnostic string on failure; do not carry on
	# with a half-initialised authentication module.
	aerr := auth->init();
	if(aerr != nil)
		error("ctlfs: cannot initialize auth: " + aerr);

	styx->init();
	styxservers->init(styx);

	nametree->init();

	# authinfo init

	authinfo: ref Keyring->Authinfo;
	if (keyfile == nil)
		keyfile = "/usr/" + user() + "/keyring/default";
	if(debug)
		sys->fprint(stderr, "ctlfs: reading authinfo %s\n", keyfile);
	authinfo = keyring->readauthinfo(keyfile);
	if (authinfo == nil)
		error(sys->sprint("ctlfs: cannot read %s: %r", keyfile));

	# announcing
	if(debug)
		sys->fprint(stderr, "ctlfs: announcing dddbctl\n");
	# addr := dial->netmkaddr(cfg.addr, "tcp", "dddbctl");
	c := dial->announce(cfg.addr);
	if(c == nil)
		error(sys->sprint("ctlfs: cannot listen on %s: %r", cfg.addr));

	# bootstrapping
	if(debug)
		sys->fprint(stderr, "ctlfs: bootstrapping\n");
	sys->unmount(nil, "/mnt/keys");
	sys->unmount(nil, "/mnt");

	# nametree; this is shared across all attachees
	(tree, treeop) := nametree->start();
	tree.create(Qroot, dir(".", 8r555|Sys->DMDIR, Qroot));
	tree.create(Qroot, dir("ctl", 8r666, Qctl));
	# ctlfs_loop serves opens and reads of Qname, so the file has to exist
	# in the tree for clients to be able to walk to it.
	tree.create(Qroot, dir("name", 8r444, Qname));
	tree.create(Qroot, dir("status", 8r444, Qstatus));

	sys->fprint(stderr, "ctlfs: finished setting up; starting\n");

	# listener entrypoint
	ctlfs_listener(cfg, dbreg, c, treeop, authinfo, algs);
	tree.quit();
}

# dddbctl listener loop.
#
# Waits for incoming calls on the announced connection c, accepts each one
# and spawns ctlfs_authenticator for it with a fresh pair of registry
# channels obtained from dbreg.changen().  The loop only ends through
# error() when listen itself fails; a failed accept just drops that call.
#
#	cfg, treeop, authinfo, algs	passed through to the authenticator
#	dbreg	registry used to obtain per-connection channels
#	c	connection returned by dial->announce
ctlfs_listener(cfg: Config, dbreg: ref DbRegistry, c: ref Dial->Connection, treeop: chan of ref Styxservers->Navop, authinfo: ref Keyring->Authinfo, algs: list of string)
{
	loop: for (;;) {
		nc := dial->listen(c);
		if (nc == nil)
			error(sys->sprint("listen failed: %r"));

		# Read the peer address once and drop the trailing newline only
		# when there actually is one, so that an unterminated address does
		# not lose its last character.
		hostname := readfile(nc.dir + "/remote");
		if (hostname != nil && hostname[len hostname - 1] == '\n')
			hostname = hostname[0:len hostname - 1];
		if (debug)
			sys->fprint(stderr, "ctlfs: got connection from %s\n", hostname);

		dfd := dial->accept(nc);
		if (dfd == nil) {
			if (debug)
				sys->fprint(stderr, "ctlfs: accept failed: %r\n");
			continue loop;
		}

		# best effort: ask for keepalives on the connection's control file
		if(nc.cfd != nil)
			sys->fprint(nc.cfd, "keepalive");

		regchan := dbreg.changen();
		spawn ctlfs_authenticator(cfg, regchan, dfd, treeop, authinfo, algs, hostname);
	}
}

# Run the server side of the authentication exchange on a newly accepted
# connection.  When it succeeds, a ctlfs_loop process is spawned on the
# descriptor returned by auth->server; when it fails, the connection is
# simply abandoned (with a diagnostic if debug is set).
ctlfs_authenticator(cfg: Config, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), dfd: ref Sys->FD, treeop: chan of ref Styxservers->Navop, authinfo: ref Keyring->Authinfo,
		algs: list of string, hostname: string)
{
	# the final argument (1) asks auth->server to change the user id
	(authfd, info) := auth->server(algs, authinfo, dfd, 1);

	if (authfd != nil) {
		# on success the string result is reported as the client's identity
		if (debug)
			sys->fprint(stderr, "ctlfs: client authenticated as %s\n", info);
		spawn ctlfs_loop(cfg, regchan, authfd, treeop, hostname);
		return;
	}

	# on failure the string result carries the reason
	if (debug)
		sys->fprint(stderr, "ctlfs: authentication failed: %s\n", info);
}

# Filesystem loop: serve Styx requests for one authenticated connection
# until the message channel yields nil.
#
#	cfg	server configuration, reported through the name and status files
#	regchan	(tx, rx) channel pair used to query the database registry
#	fd	authenticated connection to the client
#	treeop	navigation channel of the shared name tree
#	(last)	hostname; unused for now, will later be used for stats
#
# Opens and reads of ctl, name and status are handled here; every other
# request, including all writes, goes to the Styxserver default handler.
ctlfs_loop(cfg: Config, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), fd: ref Sys->FD, treeop: chan of ref Styxservers->Navop, nil: string)
{
	(tc, srv) := Styxserver.new(fd, Navigator.new(treeop), big Qroot);

	# registry rx/tx
	(tx, rx) := regchan;

	# Primary server loop
	loop:
	while((tm := <-tc) != nil) {
		# Switch on operations being performed on a given Fid
		pick t := tm {
		# Open operation
		Open =>
			(f, mode, d, err) := srv.canopen(t);
			if(f == nil){
				srv.reply(ref Rmsg.Error(t.tag, err));
				continue loop;
			}

			# Validate the requested mode BEFORE marking the fid open, so
			# that a refused open does not leave the fid open on the server.
			case f.path {

			# Qroot and Qstatus are read-only
			Qroot or Qstatus =>
				if(t.mode != Sys->OREAD) {
					srv.reply(ref Rmsg.Error(t.tag, Eperm));
					continue loop;
				}

			# Qctl and Qname refuse truncate, remove-on-close and exclusive use
			Qctl or Qname =>
				if((t.mode & (Sys->OTRUNC | Sys->ORCLOSE | Sys->OEXCL)) != 0) {
					srv.reply(ref Rmsg.Error(t.tag, Eperm));
					continue loop;
				}

			# Anything else: let the default handler do the whole open; the
			# fid must not have been opened here first.
			* =>
				srv.default(t);
				continue loop;
			}

			f.open(mode, d.qid);
			srv.reply(ref Rmsg.Open(t.tag, d.qid, srv.iounit()));

		# Read operation
		Read =>
			(f, err) := srv.canread(t);
			if(f == nil) {
				srv.reply(ref Rmsg.Error(t.tag, err));
				continue loop;
			}
			if(f.qtype & Sys->QTDIR){
				srv.read(t);
				continue loop;
			}

			# readstr converts the text to bytes and clips the requested
			# range to the data, so an offset at or past the end yields an
			# empty read instead of a bounds exception.
			case f.path {

			# Qctl: the feature list, one entry per line
			Qctl =>
				srv.reply(styxservers->readstr(t, joinstr(dbfeatures, "\n") + "\n"));

			# Qname: the configured name
			Qname =>
				srv.reply(styxservers->readstr(t, cfg.name + "\n"));

			# Qstatus: configuration summary followed by one line per node
			Qstatus =>
				# info is built in reverse and flipped once at the end
				info: list of string;
				info = "name		" + cfg.name :: info;
				info = "sysname		" + cfg.sysn :: info;
				info = "addr		" + cfg.addr :: info;
				info = "storage		" + cfg.storage :: info;
				info = "fsworkers	" + sys->sprint("%d", cfg.fswrks) :: info;
				info = "" :: info;
				info = "nodes" :: info;

				tx <-= ref RegTMsg.GetNodes();
				reply := <- rx;

				pick r := reply {
					Error =>
						# exactly one reply per request: stop here
						srv.reply(ref Rmsg.Error(t.tag, r.err));
						continue loop;
					NodeList =>
						names := lists->reverse(r.names);
						while(names != nil) {
							node := hd names;
							sline := "";

							# ask the registry about this node
							tx <-= ref RegTMsg.Check(node);
							crep := <- rx;
							pick cr := crep {
								Error => sline = cr.err;
								Status =>
									up := cr.count;
									ps := cr.poolsize;
									sline = sys->sprint("%d	%d", up, ps);
								* => sline = "unsupported message";
							}

							info = node + "		" +  sline :: info;
							names = tl names;
						}
					* =>
						srv.reply(ref Rmsg.Error(t.tag, "unsupported version"));
						continue loop;
				}

				statusmsg := joinstr(lists->reverse(info), "\n") + "\n";
				srv.reply(styxservers->readstr(t, statusmsg));

			# Default reply
			* => srv.default(t);
			}
		# Write operation
		Write =>
			srv.default(t);

		# Default action
		* => srv.default(t);
		}
	}
}