shithub: rtmp

Download patch

ref: c296529d65687f301af4f36aa4db5a0978a6f059
parent: 0ec0e4fde657c442431548180c91018078c2bf3c
author: Sigrid Solveig Haflínudóttir <[email protected]>
date: Thu Jul 22 10:28:45 EDT 2021

more rtmp logic

--- a/amf.c
+++ b/amf.c
@@ -4,23 +4,46 @@
 
 enum {
 	Anum,
+	Abool,
+	Astr,
+	Aobj,
 	Aarr = 8,
 	Aend,
 	Alstr = 12,
 };
 
+#define atleast(x) do{ /* return nil unless p is non-nil and at least x bytes remain before e */ \
+	if(p == nil) \
+		return nil; \
+	if(e-p < (x)){ \
+		werrstr("buffer short"); \
+		return nil; \
+	} \
+}while(0)
+
 u8int *
+amfbool(u8int *p, u8int *e, int v) /* append an Abool marker byte and v as 0/1 at p; e is the buffer end */
+{
+	atleast(2); /* returns nil if p is nil or fewer than 2 bytes remain before e */
+	*p++ = Abool;
+	*p++ = !!v; /* any non-zero v is stored as 1 */
+	return p; /* position just past the bytes written */
+}
+
+u8int *
+amfbyte(u8int *p, u8int *e, u8int byte) /* append one raw byte at p; e is the buffer end */
+{
+	atleast(1); /* returns nil if p is nil or no room is left before e */
+	*p++ = byte;
+	return p; /* position just past the byte written */
+}
+
+u8int *
 amfi16(u8int *p, u8int *e, s16int i)
 {
-	if(p == nil)
-		return nil;
-	if(e-p < 2){
-		werrstr("buffer short");
-		return nil;
-	}
+	atleast(2);
 	*p++ = i >> 8;
 	*p++ = i;
-
 	return p;
 }
 
@@ -27,16 +50,10 @@
 u8int *
 amfi24(u8int *p, u8int *e, s32int i)
 {
-	if(p == nil)
-		return nil;
-	if(e-p < 3){
-		werrstr("buffer short");
-		return nil;
-	}
+	atleast(3);
 	*p++ = i >> 16;
 	*p++ = i >> 8;
 	*p++ = i;
-
 	return p;
 }
 
@@ -43,17 +60,11 @@
 u8int *
 amfi32(u8int *p, u8int *e, s32int i)
 {
-	if(p == nil)
-		return nil;
-	if(e-p < 4){
-		werrstr("buffer short");
-		return nil;
-	}
+	atleast(4);
 	*p++ = i >> 24;
 	*p++ = i >> 16;
 	*p++ = i >> 8;
 	*p++ = i;
-
 	return p;
 }
 
@@ -65,12 +76,8 @@
 		u64int u;
 	}x;
 
-	if(p == nil)
-		return nil;
-	if(p+8 > e){
-		werrstr("buffer short");
-		return nil;
-	}
+	atleast(1+8);
+	*p++ = Anum;
 	x.v = v;
 	*p++ = x.u >> 56;
 	*p++ = x.u >> 48;
@@ -85,59 +92,64 @@
 }
 
 u8int *
-amfkvnum(u8int *p, u8int *e, char *name, double v)
+amfstr(u8int *p, u8int *e, char *s)
 {
 	int n;
 
-	if(p == nil)
-		return nil;
-	if((n = strlen(name)) > 0xffff){
-		werrstr("string too long");
-		return nil;
-	}
-	if(p+2+n+8 > e){
-		werrstr("buffer short");
-		return nil;
-	}
-	p = amfi16(p, e, n);
-	p = (u8int*)memmove(p, name, n) + n;
+	n = strlen(s);
+	atleast(1+4+n);
+	*p++ = Alstr;
 
-	return amfnum(p, e, v);
+	return (u8int*)memmove(amfi32(p, e, n), s, n) + n;
 }
 
 u8int *
-amfstr(u8int *p, u8int *e, char *s)
+amfarr(u8int *p, u8int *e)
 {
+	return amfbyte(p, e, Aarr);
+}
+
+u8int *
+amfobj(u8int *p, u8int *e)
+{
+	return amfbyte(p, e, Aobj);
+}
+
+u8int *
+amfend(u8int *p, u8int *e)
+{
+	return amfi24(p, e, Aend);
+}
+
+static u8int *
+amfkv(u8int *p, u8int *e, char *name)
+{
 	int n;
 
-	if(p == nil)
-		return nil;
-	n = strlen(s);
-	if(p+1+4+n > e){
+	if((n = strlen(name)) > 0xffff){
 		werrstr("string too long");
 		return nil;
 	}
-	*p++ = Alstr;
+	atleast(2+n);
+	p = amfi16(p, e, n);
 
-	return (u8int*)memmove(amfi32(p, e, n), s, n) + n;
+	return (u8int*)memmove(p, name, n) + n;
 }
 
 u8int *
-amfarr(u8int *p, u8int *e)
+amfkvbool(u8int *p, u8int *e, char *name, int v)
 {
-	if(p == nil)
-		return nil;
-	if(p == e){
-		werrstr("buffer short");
-		return nil;
-	}
-	*p++ = Aarr;
+	return amfbool(amfkv(p, e, name), e, v);
+}
 
-	return p;
+u8int *
+amfkvnum(u8int *p, u8int *e, char *name, double v)
+{
+	return amfnum(amfkv(p, e, name), e, v);
 }
 
 u8int *
-amfend(u8int *p, u8int *e)
+amfkvstr(u8int *p, u8int *e, char *name, char *v)
 {
-	return amfi24(p, e, Aend);
+	return amfstr(amfkv(p, e, name), e, v);
 }
--- a/amf.h
+++ b/amf.h
@@ -1,8 +1,13 @@
 u8int *amfi16(u8int *p, u8int *e, s16int i);
+u8int *amfbool(u8int *p, u8int *e, int v);
+u8int *amfbyte(u8int *p, u8int *e, u8int byte);
 u8int *amfi24(u8int *p, u8int *e, s32int i);
 u8int *amfi32(u8int *p, u8int *e, s32int i);
 u8int *amfnum(u8int *p, u8int *e, double v);
-u8int *amfkvnum(u8int *p, u8int *e, char *name, double v);
 u8int *amfstr(u8int *p, u8int *e, char *s);
 u8int *amfarr(u8int *p, u8int *e);
+u8int *amfobj(u8int *p, u8int *e);
 u8int *amfend(u8int *p, u8int *e);
+u8int *amfkvnum(u8int *p, u8int *e, char *name, double v);
+u8int *amfkvstr(u8int *p, u8int *e, char *name, char *v);
+u8int *amfkvbool(u8int *p, u8int *e, char *name, int v);
--- a/main.c
+++ b/main.c
@@ -18,8 +18,8 @@
 void
 threadmain(int argc, char **argv)
 {
-	Biobuf *a, *v, o;
 	u8int *b, *p, *e;
+	Biobuf *a, *v;
 	int bufsz;
 	u64int ns;
 	IVFrame f;
@@ -50,7 +50,7 @@
 	if(strcmp(ivf.type, "AVC1") != 0)
 		sysfatal("not H.264");
 	srand(time(nil));
-	if((r = rtmpdial(argv[0])) < 0 || Binit(&o, fd, OWRITE) < 0)
+	if((r = rtmpdial(argv[0], ivf.w, ivf.h, a != nil)) == nil)
 		sysfatal("%r");
 
 	bufsz = 65536;
@@ -58,7 +58,7 @@
 		sysfatal("memory");
 	e = b + bufsz;
 
-	if((p = flvscript(b, e, ivf.w, ivf.h, 0)) == nil || Bwrite(&o, b, p-b) < 0)
+	if((p = flvscript(b, e, ivf.w, ivf.h, a != nil)) == nil)
 		sysfatal("%r");
 
 	memset(&f, 0, sizeof(f));
@@ -77,9 +77,6 @@
 		ns = ivfns(&ivf, f.ts);
 		if((p = flvdata(b, e, ns, ns, f.buf, f.sz, Fvideo, FlHdr)) == nil)
 			sysfatal("video: flvdata: %r");
-		if(Bwrite(&o, b, p-b) < 0)
-			sysfatal("%r");
-		Bflush(&o);
 	}
 
 	threadexitsall(nil);
--- a/rtmp.c
+++ b/rtmp.c
@@ -11,33 +11,76 @@
 	Port = 1935,
 
 	Sigsz = 1536,
+	Chunk = 128,
 
 	ChanCtl = 3,
 
-	SzLarge,
-	SzMedium,
-	SzSmall,
-	SzTiny,
+	SzTiny = 1,
+	SzSmall = 4,
+	SzMedium = 8,
+	SzLarge = 12,
 
 	PktInvoke = 20,
-};
 
-typedef struct RTMPacket RTMPacket;
-
-struct RTMP {
-	int f;
-	int invokes;
+	Biobufsz = 64*1024, /* FIXME don't know if it helps with anything */
+	Bufsz = 64*1024,
 };
 
-struct RTMPacket {
+typedef struct Packet Packet;
+
+struct Packet {
+	int type;
+	int hsz;
 	int chan;
-	int sztype;
-	int pktype;
 	u32int ts;
 	u8int *data;
 	int sz;
 };
 
+struct RTMP {
+	Biobufhdr;
+	char *app;
+	char *path;
+	char *tcurl;
+	Packet pk;
+	u8int *b, *p, *e;
+	int mode;
+	int bsz;
+	int invokes;
+	int i;
+	u8int biobuf[Biobufsz];
+};
+
+#define puti16(i) do{ r->p = amfi16(r->p, r->e, i); }while(0)
+#define puti24(i) do{ r->p = amfi24(r->p, r->e, i); }while(0)
+#define puti32(i) do{ r->p = amfi32(r->p, r->e, i); }while(0)
+#define putnum(v) do{ r->p = amfnum(r->p, r->e, v); }while(0)
+#define putstr(s) do{ r->p = amfstr(r->p, r->e, s); }while(0)
+#define putarr() do{ r->p = amfarr(r->p, r->e); }while(0)
+#define putobj() do{ r->p = amfobj(r->p, r->e); }while(0)
+#define putend() do{ r->p = amfend(r->p, r->e); }while(0)
+#define putkvnum(name, v) do{ r->p = amfkvnum(r->p, r->e, name, v); }while(0)
+#define putkvstr(name, s) do{ r->p = amfkvstr(r->p, r->e, name, s); }while(0)
+#define putkvbool(name, s) do{ r->p = amfkvbool(r->p, r->e, name, s); }while(0)
+
+static void
+newpacket(RTMP *r, int type, int hsz, int chan) /* reset r->pk and start a new packet in r->b */
+{
+	memset(&r->pk, 0, sizeof(r->pk));
+
+	r->pk.type = type;
+	r->pk.hsz = hsz;
+	r->pk.chan = chan;
+	r->p = r->b + hsz; /* leave hsz bytes at the front of the buffer; writes via r->p start after them */
+	r->b[0] = chan;
+
+	switch(hsz){ /* OR a per-size code into the top two bits of b[0]; SzLarge leaves them 0 */
+	case SzTiny: r->b[0] |= 3<<6; break;
+	case SzSmall: r->b[0] |= 2<<6; break;
+	case SzMedium: r->b[0] |= 1<<6; break;
+	}
+}
+
 static int
 handshake(int f)
 {
@@ -71,32 +114,67 @@
 }
 
 static int
-connect(int f, char *path)
+rtmpsend(RTMP *r) /* fill in the header bytes in front of the body built since newpacket(); 0 on success, -1 with errstr set */
 {
-	RTMPacket p;
+	u8int *h, *e;
+	int bodysz;
 
-	memset(&p, 0, sizeof(p));
-	p.chan = ChanCtl;
-	p.sztype = SzLarge;
-	p.pktype = PktInvoke;
+	bodysz = r->p - r->b - r->pk.hsz; /* bytes written after the hsz-byte header area */
+	/* FIXME special case when bodysz is 0 */
+	h = r->b + 1; /* b[0] was already set by newpacket() */
+	e = r->b + r->pk.hsz; /* hsz includes b[0]; the body starts at b+hsz and must not be touched */
+	if(r->pk.hsz >= SzSmall){
+		h = amfi24(h, e, 0); /* FIXME proper timestamps? */
+		if(r->pk.hsz >= SzMedium){
+			h = amfi24(h, e, bodysz);
+			h = amfbyte(h, e, r->pk.type);
+			if(r->pk.hsz >= SzLarge)
+				h = amfi32(h, e, 0); /* FIXME seems to be always 0? */
+		}
+	}
+	if(h == nil) /* the amf helpers pass a nil p through, so one check covers every write above */
+		goto err;
+	memset(h, 0, e-h); /* zero whatever is left of the header area */
 
+	return 0; /* nothing is written to the connection in this function as shown */
+err:
+	werrstr("rtmpsend: %r");
 	return -1;
 }
 
 static int
-rtmpsend(RTMP *r, RTMPacket *p)
+connect(RTMP *r)
 {
-	return -1;
+	newpacket(r, PktInvoke, SzLarge, ChanCtl);
+	putstr("connect");
+		putnum(++r->invokes);
+	putobj();
+		putkvstr("app", r->app);
+		putkvstr("tcUrl", r->tcurl);
+		if(r->mode & OWRITE)
+			putkvstr("type", "nonprivate");
+		else{
+			putkvbool("fpad", 0);
+			putkvnum("capabilities", 15);
+			putkvnum("audioCodecs", 3191);
+			putkvnum("videoCodecs", 252);
+			putkvnum("videoFunction", 1);
+		}
+	putend();
+
+	return rtmpsend(r);
 }
 
 RTMP *
-rtmpdial(char *url)
+rtmpdial(char *url, int w, int h, int withaudio)
 {
-	char *s, *e, *path;
+	char *s, *e, *path, *app;
 	int f, port, ctl;
 	RTMP *r;
 
+	r = nil;
 	f = -1;
+	url = strdup(url); /* since we're changing it in-place */
 	if(memcmp(url, "rtmp://", 7) != 0){
 		werrstr("invalid url");
 		goto err;
@@ -115,6 +193,7 @@
 	}else{
 		path = e;
 	}
+	while(*(++path) == '/');
 
 	s = smprint("tcp!%.*s!%d", (int)(e-s), s, port);
 	f = dial(s, nil, nil, &ctl);
@@ -122,18 +201,57 @@
 	if(f < 0)
 		goto err;
 
-	if(handshake(f) != 0 || connect(f, path) == 0)
+	app = path;
+	if((s = strchr(path, '/')) == nil){
+		werrstr("no path");
 		goto err;
+	}
+	if((e = strchr(s+1, '/')) != nil){
+		/* at this point it can be app instance if there is another slash following */
+		if((s = strchr(e+1, '/')) == nil){
+			/* no, just path leftovers */
+			s = e;
+		}
+		*s = 0;
+		path = s+1;
+	}else{
+		path = nil;
+	}
 
-	if((r = calloc(1, sizeof(*r))) == nil)
+	if(handshake(f) != 0)
+		goto err;
+	if((r = calloc(1, sizeof(*r))) == nil || (r->b = malloc(Bufsz)) == nil)
 		sysfatal("memory");
-	r->f = f;
+	if((r->app = strdup(app)) == nil || (path != nil && (r->path = strdup(path)) == nil))
+		sysfatal("memory");
+	r->tcurl = url;
+	r->bsz = Bufsz;
+	r->e = r->b + r->bsz;
+	Binits(r, f, OWRITE, r->biobuf, sizeof(r->biobuf));
+	r->i = f;
+	if(connect(r) != 0)
+		goto err;
 
 	return r;
 
 err:
 	werrstr("rtmpdial: %r");
-	if(f >= 0)
+	if(r != nil)
+		rtmpclose(r);
+	else if(f >= 0)
 		close(f);
+	free(url);
 	return nil;
+}
+
+void
+rtmpclose(RTMP *r) /* release the buffers and descriptor held by r, then r itself; nil is ignored */
+{
+	if(r == nil)
+		return;
+	free(r->path);
+	free(r->b); /* NOTE(review): r->app and r->tcurl are assigned in rtmpdial but not released here - confirm ownership */
+	Bterm(r); /* flush buffered output while the descriptor is still open */
+	close(r->i);
+	free(r);
 }
--- a/rtmp.h
+++ b/rtmp.h
@@ -2,4 +2,5 @@
 
 #pragma incomplete RTMP
 
-RTMP *rtmpdial(char *url);
+RTMP *rtmpdial(char *url, int w, int h, int withaudio);
+void rtmpclose(RTMP *r);