shithub: rtmp

Download patch

ref: 354c3b1554f91af178c28cf6e34ac1585df5b757
parent: e95674db1eb82210f12fb429cc848613e24e704f
author: Sigrid Solveig Haflínudóttir <[email protected]>
date: Mon Aug 2 14:05:12 EDT 2021

stream publish

--- a/main.c
+++ b/main.c
@@ -58,8 +58,17 @@
 	if((r = rtmpdial(argv[0])) == nil)
 		sysfatal("%r");
 	ulong sid;
-	if(rtmpstream(r, &sid) == 0)
+	fprint(2, "asking for a stream\n");
+
+	if(rtmpstream(r, &sid) == 0){
 		fprint(2, "stream: %lud\n", sid);
+		if(rtmppublish(r, sid, PubLive, "live") == 0)
+			fprint(2, "stream published\n");
+		else
+			fprint(2, "stream publish failed: %r\n");
+	}else{
+		fprint(2, "stream failed\n");
+	}
 
 	while(1)
 		sleep(100);
--- a/rtmp.c
+++ b/rtmp.c
@@ -65,6 +65,7 @@
 	Bufsz = 64*1024,
 };
 
+typedef struct Buffer Buffer;
 typedef struct Command Command;
 typedef struct Message Message;
 
@@ -90,20 +91,25 @@
 	Command cmd;
 };
 
+struct Buffer {
+	Message msg;
+	u8int *b, *p, *e;
+	int bsz;
+};
+
 struct RTMP {
 	Biobufhdr;
 	QLock;
+	Buffer i;
+	Buffer o;
 	Channel *c;
 	char *app;
 	char *path;
 	char *tcurl;
-	Message msg;
-	u8int *b, *p, *e;
 	int chunkin;
 	int chunkout;
 	int mode;
-	int bsz;
-	int i;
+	int fd;
 	int winacksz;
 	int bw;
 	u8int bwlimit;
@@ -114,24 +120,24 @@
 	u8int biobuf[Biobufsz];
 };
 
-#define putnull() do{ r->p = a₀null(r->p, r->e); }while(0)
-#define puti16(i) do{ r->p = a₀i16(r->p, r->e, i); }while(0)
-#define puti24(i) do{ r->p = a₀i24(r->p, r->e, i); }while(0)
-#define puti32(i) do{ r->p = a₀i32(r->p, r->e, i); }while(0)
-#define putnum(v) do{ r->p = a₀num(r->p, r->e, v); }while(0)
-#define putstr(s) do{ r->p = a₀str(r->p, r->e, s); }while(0)
-#define putarr() do{ r->p = a₀arr(r->p, r->e); }while(0)
-#define putobj() do{ r->p = a₀obj(r->p, r->e); }while(0)
-#define putend() do{ r->p = a₀end(r->p, r->e); }while(0)
-#define putkvnum(name, v) do{ r->p = a₀kvnum(r->p, r->e, name, v); }while(0)
-#define putkvstr(name, s) do{ r->p = a₀kvstr(r->p, r->e, name, s); }while(0)
-#define putkvbool(name, s) do{ r->p = a₀kvbool(r->p, r->e, name, s); }while(0)
+#define putnull() do{ r->o.p = a₀null(r->o.p, r->o.e); }while(0)
+#define puti16(i) do{ r->o.p = a₀i16(r->o.p, r->o.e, i); }while(0)
+#define puti24(i) do{ r->o.p = a₀i24(r->o.p, r->o.e, i); }while(0)
+#define puti32(i) do{ r->o.p = a₀i32(r->o.p, r->o.e, i); }while(0)
+#define putnum(v) do{ r->o.p = a₀num(r->o.p, r->o.e, v); }while(0)
+#define putstr(s) do{ r->o.p = a₀str(r->o.p, r->o.e, s); }while(0)
+#define putarr() do{ r->o.p = a₀arr(r->o.p, r->o.e); }while(0)
+#define putobj() do{ r->o.p = a₀obj(r->o.p, r->o.e); }while(0)
+#define putend() do{ r->o.p = a₀end(r->o.p, r->o.e); }while(0)
+#define putkvnum(name, v) do{ r->o.p = a₀kvnum(r->o.p, r->o.e, name, v); }while(0)
+#define putkvstr(name, s) do{ r->o.p = a₀kvstr(r->o.p, r->o.e, name, s); }while(0)
+#define putkvbool(name, s) do{ r->o.p = a₀kvbool(r->o.p, r->o.e, name, s); }while(0)
 
 #define putcommand(name, cb_) do { \
 	putstr(name); \
-	putnum(r->msg.cmd.tid); \
+	putnum(r->o.msg.cmd.tid); \
 	putobj(); \
-	r->msg.cmd.cb = cb_; \
+	r->o.msg.cmd.cb = cb_; \
 }while(0)
 
 static int szs[] = {
@@ -175,34 +181,48 @@
 	[LimitDynamic] = "dynamic",
 };
 
+static char *pubtype2s[] = {
+	[PubLive] = "live",
+	[PubAppend] = "append",
+	[PubRecord] = "record",
+};
+
 extern int debug;
 
 static void
 newmsg(RTMP *r, int type, int fmt, int cs)
 {
-	memset(&r->msg, 0, sizeof(r->msg));
+	memset(&r->o.msg, 0, sizeof(r->o.msg));
 
-	r->msg.type = type;
-	r->msg.fmt = fmt;
-	r->msg.cs = cs;
-	r->p = r->b;
+	r->o.msg.type = type;
+	r->o.msg.fmt = fmt;
+	r->o.msg.cs = cs;
+	r->o.p = r->o.b;
 	if(type == AMF0Command)
-		r->msg.cmd.tid = ++r->cmds.tid;
+		r->o.msg.cmd.tid = ++r->cmds.tid;
+	else
+		r->o.msg.cmd.tid = 0;
 }
 
 static void
-bextend(RTMP *r, int bsz)
+notransaction(RTMP *r)
 {
+	r->o.msg.cmd.tid = 0;
+}
+
+static void
+bextend(Buffer *b, int bsz)
+{
 	u8int *ob;
 
-	if(r->bsz >= bsz)
+	if(b->bsz >= bsz)
 		return;
-	ob = r->b;
-	r->b = erealloc(r->b, bsz*2);
+	ob = b->b;
+	b->b = erealloc(b->b, bsz*2);
 	if(ob != nil)
-		r->p = r->b + (intptr)(ob - r->p);
-	r->bsz = bsz*2;
-	r->e = r->b + r->bsz;
+		b->p = b->b + (intptr)(ob - b->p);
+	b->bsz = bsz*2;
+	b->e = b->b + b->bsz;
 }
 
 static int
@@ -212,30 +232,34 @@
 	u8int *h, *e, byte;
 	u32int ts;
 
-	memset(&r->msg, 0, sizeof(r->msg));
+	memset(&r->i.msg, 0, sizeof(r->i.msg));
 
-	r->p = r->b;
-	if(readn(r->i, r->p, 1) != 1)
-		goto eof;
-	r->msg.fmt = (r->p[0] & 0xc0)>>6;
-	r->msg.cs = r->p[0] & 0x3f;
-	n = r->msg.cs + 1;
+	r->i.p = r->i.b;
+	if((n = readn(r->fd, &byte, 1)) != 1){
+		if(n == 0)
+			werrstr("eof");
+		goto err;
+	}
+
+	r->i.msg.fmt = (byte & 0xc0)>>6;
+	r->i.msg.cs = byte & 0x3f;
+	n = r->i.msg.cs + 1;
 	if(n <= 2){
-		if(readn(r->i, r->p, n) != n)
-			goto eof;
-		r->msg.cs = 64 + r->p[0];
+		if(readn(r->fd, r->i.p, n) != n)
+			goto err;
+		r->i.msg.cs = 64 + r->i.p[0];
 		if(n == 2)
-			r->msg.cs += 256 * r->p[1];
+			r->i.msg.cs += 256 * r->i.p[1];
 	}
 
-	hsz = szs[r->msg.fmt];
-	if(readn(r->i, r->p, hsz) != hsz)
-		goto eof;
+	hsz = szs[r->i.msg.fmt];
+	if(readn(r->fd, r->i.p, hsz) != hsz)
+		goto err;
 
-	h = r->p;
-	e = r->p + hsz;
+	h = r->i.p;
+	e = r->i.p + hsz;
 
-	r->msg.type = -1;
+	r->i.msg.type = -1;
 	msid = 0;
 	ts = 0;
 	len = 0;
@@ -244,7 +268,7 @@
 		if(hsz >= szs[Type1]){
 			h = a₀i24get(h, e, &len);
 			h = a₀byteget(h, e, &byte);
-			r->msg.type = byte;
+			r->i.msg.type = byte;
 			if(hsz >= szs[Type0])
 				h = a₀i32leget(h, e, &msid);
 		}
@@ -251,26 +275,26 @@
 	}
 
 	if(ts == 0xffffff){ /* exntended timestamp */
-		if(readn(r->i, h, 4) != 4)
+		if(readn(r->fd, h, 4) != 4)
 			goto err;
 		h = a₀i32get(h, h+4, (s32int*)&ts);
 	}
 
 	/* FIXME do all consecutive chunks use Type3? */
-	bextend(r, len);
-	r->msg.data = h;
-	r->msg.sz = len;
+	bextend(&r->i, len);
+	r->i.msg.data = h;
+	r->i.msg.sz = len;
 	for(;;){
 		n = min(len, r->chunkin);
-		if(readn(r->i, h, n) != n)
-			goto eof;
+		if(readn(r->fd, h, n) != n)
+			goto err;
 		len -= n;
 		h += n;
 		if(len < 1)
 			break;
-		if(readn(r->i, h, 1) != 1)
-			goto eof;
-		if((r->msg.cs | Type3<<6) != *h){
+		if(readn(r->fd, h, 1) != 1)
+			goto err;
+		if((r->i.msg.cs | Type3<<6) != *h){
 			werrstr("cs/fmt does not match: %02x", *h);
 			goto err;
 		}
@@ -277,8 +301,6 @@
 	}
 
 	return 0;
-eof:
-	werrstr("eof");
 err:
 	werrstr("rtmprecv: %r");
 	return -1;
@@ -290,24 +312,26 @@
 	u8int *p, *h, *e, hdata[24];
 	int len, n, hsz;
 	Command *c;
+	Message *m;
 
-	if(r->p == nil)
+	if(r->o.p == nil)
 		goto err;
 
-	r->msg.data = r->b;
-	r->msg.sz = r->p - r->b;
+	m = &r->o.msg;
+	m->data = r->o.b;
+	m->sz = r->o.p - r->o.b;
 
 	h = hdata;
-	*h++ = r->msg.fmt<<6 | r->msg.cs;
-	hsz = szs[r->msg.fmt];
+	*h++ = m->fmt<<6 | m->cs;
+	hsz = szs[m->fmt];
 	e = h + hsz;
 	if(hsz >= szs[Type2]){
 		h = a₀i24(h, e, 0); /* FIXME put actual timestamps? */
 		if(hsz >= szs[Type1]){
-			h = a₀i24(h, e, r->msg.sz);
-			h = a₀byte(h, e, r->msg.type);
+			h = a₀i24(h, e, m->sz);
+			h = a₀byte(h, e, m->type);
 			if(hsz >= szs[Type0])
-				h = a₀i32(h, e, r->msg.sid);
+				h = a₀i32(h, e, m->sid);
 		}
 	}
 	assert(h != nil);
@@ -315,7 +339,7 @@
 	if(Bwrite(r, hdata, h-hdata) < 0)
 		goto err;
 
-	for(p = r->msg.data, len = r->msg.sz; len > 0;){
+	for(p = m->data, len = m->sz; len > 0;){
 		n = min(len, r->chunkout);
 		if(Bwrite(r, p, n) < 0)
 			goto err;
@@ -322,7 +346,7 @@
 		p += n;
 		len -= n;
 		if(len > 0){
-			*h = r->msg.cs | Type3<<6;
+			*h = m->cs | Type3<<6;
 			Bputc(r, *h);
 		}
 	}
@@ -331,13 +355,13 @@
 		goto err;
 
 	if(debug){
-		fprint(2, "← %M", &r->msg);
-		if(r->msg.type == AMF0Command){
+		fprint(2, "← %M", m);
+		if(m->type == AMF0Command){
 			A₀ *a;
 			u8int *s, *e;
 			fprint(2, ":");
-			s = r->msg.data;
-			e = s + r->msg.sz;
+			s = m->data;
+			e = s + m->sz;
 			for(; s != nil && s != e;){
 				if((s = a₀parse(&a, s, e)) != nil)
 					fprint(2, " %A", a);
@@ -349,9 +373,9 @@
 		fprint(2, "\n");
 	}
 
-	if(r->msg.type == AMF0Command){
+	if(m->type == AMF0Command){
 		c = emalloc(sizeof(*c));
-		*c = r->msg.cmd;
+		*c = m->cmd;
 		assert(c->cb != nil);
 		if((c->next = r->cmds.w) != nil)
 			c->next->prev = c;
@@ -368,7 +392,8 @@
 rtmpfree(RTMP *r)
 {
 	free(r->app);
-	free(r->b);
+	free(r->i.b);
+	free(r->o.b);
 	free(r->path);
 	free(r->tcurl);
 	if(r->c != nil){
@@ -414,7 +439,7 @@
 	RTMP *r;
 
 	r = aux;
-	m = &r->msg;
+	m = &r->i.msg;
 	res = 0;
 	memset(a, 0, sizeof(a));
 	for(;;){
@@ -422,8 +447,6 @@
 			a₀free(a[n]);
 		memset(a, 0, sizeof(a));
 
-		qlock(r);
-
 		if(res != 0 || (res = rtmprecv(r)) != 0){
 			if(debug)
 				fprint(2, "rtmp loop: %r\n");
@@ -432,11 +455,13 @@
 			break;
 		}
 
-		s = r->msg.data;
-		e = s + r->msg.sz;
+		s = m->data;
+		e = s + m->sz;
 
+		qlock(r);
+
 		if(debug)
-			fprint(2, "→ %M", &r->msg);
+			fprint(2, "→ %M", m);
 
 		switch(m->type){
 		case AMF0Command:
@@ -461,23 +486,23 @@
 						goto err;
 					}
 					for(c = r->cmds.w; c != nil && c->tid != a[n]->num; c = c->next);
-					if(c == nil){
-						werrstr("response to non-existent transaction %d", (int)a[n]->num);
-						goto err;
-					}
+					if(c == nil)
+						fprint(2, "response to non-existent transaction %d", (int)a[n]->num);
 					break;
 				}
 			}
 			if(debug)
 				fprint(2, " tid=%A: %A %A %A\n", a[CbTransID], a[CbCommand], a[CbObject], a[CbResponse]);
-			if(c->prev != nil)
-				c->prev->next = c->next;
-			if(c->next != nil)
-				c->next->prev = c->prev;
-			if(r->cmds.w == c)
-				r->cmds.w = c->next;
-			c->cb(r, ok, a, c->aux);
-			free(c);
+			if(c != nil){
+				if(c->prev != nil)
+					c->prev->next = c->next;
+				if(c->next != nil)
+					c->next->prev = c->prev;
+				if(r->cmds.w == c)
+					r->cmds.w = c->next;
+				c->cb(r, ok, a, c->aux);
+				free(c);
+			}
 			break;
 
 		case SetChunkSize:
@@ -555,7 +580,6 @@
 		qunlock(r);
 	}
 
-	qunlock(r);
 	rtmpfree(r);
 
 	threadexitsall(res == 0 ? nil : "error");
@@ -617,16 +641,17 @@
 	Channel *c;
 	int n;
 
+	c = chancreate(sizeof(ulong), 0);
+
 	qlock(r);
 
 	newmsg(r, AMF0Command, Type0, CSCtl);
 	putstr("createStream");
-	putnum(r->msg.cmd.tid);
+	putnum(r->o.msg.cmd.tid);
 	putnull();
 
-	c = chancreate(sizeof(ulong), 0);
-	r->msg.cmd.cb = streamcreated;
-	r->msg.cmd.aux = c;
+	r->o.msg.cmd.cb = streamcreated;
+	r->o.msg.cmd.aux = c;
 	n = rtmpsend(r);
 
 	qunlock(r);
@@ -638,15 +663,82 @@
 }
 
 static void
+streampublished(RTMP *, int ok, A₀ *a[NumCb], void *aux)
+{
+	Channel *err;
+
+	err = aux;
+	if(strcmp(a[CbCommand]->str, "onStatus") != 0)
+		fprint(2, "streampublished: expected 'onStatus', got %#q\n", a[CbCommand]->str);
+	else if(a[CbResponse]->type != Tobj)
+		fprint(2, "streampublished: expected object, got something else\n");
+	else if(ok)
+		sendp(err, nil);
+
+	chanclose(err);
+}
+
+int
+rtmppublish(RTMP *r, ulong sid, int type, char *name)
+{
+	Channel *c;
+	char *e;
+	int n;
+
+	if(type < 0 || type >= nelem(pubtype2s)){
+		werrstr("invalid publish type %d", type);
+		return -1;
+	}
+	if(name == nil)
+		name = "";
+
+	c = chancreate(sizeof(char*), 0);
+
+	qlock(r);
+
+	newmsg(r, AMF0Command, Type0, CSCtl);
+	notransaction(r);
+	putstr("publish");
+	putnum(0);
+	putnull();
+	putstr(name);
+	putstr(pubtype2s[type]);
+
+	r->o.msg.cmd.cb = streampublished;
+	r->o.msg.cmd.aux = c;
+	r->o.msg.sid = sid;
+	n = rtmpsend(r);
+
+	qunlock(r);
+
+	e = nil;
+	n = (n == 0 && recv(c, &e) == 1) ? 0 : -1;
+	chanfree(c);
+
+	if(e != nil){
+		werrstr("%s", e);
+		free(c);
+	}
+
+	return (n == 0 && e == nil) ? 0 : -1;
+}
+
+static void
 connected(RTMP *r, int ok, A₀ *a[NumCb], void *)
 {
-	if(strcmp(a[CbCommand]->str, "_result") != 0)
-		sendp(r->c, smprint("expected '_result', got %#q", a[CbCommand]->str));
-	else{
-		sendp(r->c, ok ? nil : smprint("%A", a[CbResponse]));
-		if(ok)
+	char *s;
+
+	s = nil;
+	if(ok){
+		if(strcmp(a[CbCommand]->str, "_result") != 0)
+			s = smprint("expected '_result', got %#q", a[CbCommand]->str);
+		else
 			setchunksz(r, ChunkDesired);
+	}else{
+		s = smprint("%A", a[CbResponse]);
 	}
+
+	sendp(r->c, s);
 }
 
 static int
@@ -746,6 +838,7 @@
 		*s = 0;
 		path = s+1;
 	}else{
+		*s = 0;
 		path = nil;
 	}
 
@@ -753,7 +846,7 @@
 		goto err;
 
 	r = ecalloc(1, sizeof(*r));
-	r->i = f;
+	r->fd = f;
 	r->chunkin = ChunkDefault;
 	r->chunkout = ChunkDefault;
 	r->tcurl = url;
@@ -761,7 +854,8 @@
 	r->c = chancreate(sizeof(void*), 0);
 	r->app = estrdup(app);
 	r->path = path == nil ? nil : estrdup(path);
-	bextend(r, Bufsz);
+	bextend(&r->i, Bufsz);
+	bextend(&r->o, Bufsz);
 	Binits(r, f, OWRITE, r->biobuf, sizeof(r->biobuf));
 
 	if(connect(r) != 0)
@@ -794,8 +888,8 @@
 {
 	if(r == nil)
 		return;
-	if(r->i >= 0)
-		close(r->i);
+	if(r->fd >= 0)
+		close(r->fd);
 	if(r->c != nil)
 		chanclose(r->c);
 }
--- a/rtmp.h
+++ b/rtmp.h
@@ -4,5 +4,12 @@
 
 int rtmpstream(RTMP *r, ulong *sid);
 
+enum {
+	PubLive,
+	PubAppend,
+	PubRecord,
+};
+int rtmppublish(RTMP *r, ulong sid, int type, char *name);
+
 RTMP *rtmpdial(char *url);
 void rtmpclose(RTMP *r);