shithub: rtmp

Download patch

ref: c6821e9c478e8c1e4effdba02cfe7d56a78c3cd9
parent: f6d84677e9cd1cd4725391a577295ff561e09443
author: Sigrid Solveig Haflínudóttir <[email protected]>
date: Tue Jul 27 09:11:57 EDT 2021

rework send/recv

--- a/rtmp.c
+++ b/rtmp.c
@@ -17,12 +17,24 @@
 
 	ChanCtl = 3,
 
-	SzTiny = 1,
-	SzSmall = 4,
-	SzMedium = 8,
-	SzLarge = 12,
+	SzLarge = 0,
+	SzMedium,
+	SzSmall,
+	SzTiny,
 
+	PktChunkSz = 1,
+	PktBytesReadReport,
+	PktControl,
+	PktServerBW,
+	PktClientBW,
+	PktAudio = 8,
+	PktVideo,
+	PktFlexStreamSend = 15,
+	PktFlexSharedObj,
+	PktFlexInfo,
+	PktSharedObj,
 	PktInvoke = 20,
+	PktFlashVideo = 22,
 
 	Biobufsz = 64*1024, /* FIXME don't know if it helps with anything */
 	Bufsz = 64*1024,
@@ -32,7 +44,7 @@
 
 struct Packet {
 	int type;
-	int hsz;
+	int ht;
 	int chan;
 	u32int ts;
 	u8int *data;
@@ -69,9 +81,58 @@
 #define putkvstr(name, s) do{ r->p = amfkvstr(r->p, r->e, name, s); }while(0)
 #define putkvbool(name, s) do{ r->p = amfkvbool(r->p, r->e, name, s); }while(0)
 
+static szs[] = {
+	[SzTiny] = 1,
+	[SzSmall] = 4,
+	[SzMedium] = 8,
+	[SzLarge] = 12,
+};
+
+static char *pktype2s[] = {
+	[PktChunkSz] = "ChunkSz",
+	[PktBytesReadReport] = "BytesReadReport",
+	[PktControl] = "Control",
+	[PktServerBW] = "ServerBW",
+	[PktClientBW] = "ClientBW",
+	[PktAudio] = "Audio",
+	[PktVideo] = "Video",
+	[PktFlexStreamSend] = "FlexStreamSend",
+	[PktFlexSharedObj] = "FlexSharedObj",
+	[PktFlexInfo] = "FlexInfo",
+	[PktSharedObj] = "SharedObj",
+	[PktInvoke] = "Invoke",
+	[PktFlashVideo] = "FlashVideo",
+};
+
 int rtmpdump = 0;
 extern int debug;
 
+#pragma varargck type "T" int
+static int
+pktypefmt(Fmt *f)
+{
+	char *s;
+	int t;
+
+	if((t = va_arg(f->args, int)) >= 0 &&
+	   t < nelem(pktype2s) &&
+	   (s = pktype2s[t]) != nil)
+		return fmtprint(f, "%s", s);
+
+	return fmtprint(f, "%d", t);
+}
+
+#pragma varargck type "P" Packet*
+static int
+pkfmt(Fmt *f)
+{
+	Packet *p;
+
+	p = va_arg(f->args, Packet*);
+
+	return fmtprint(f, "type=%T chan=%d ts=%ud sz=%d", p->type, p->chan, p->ts, p->sz);
+}
+
 static Packet *
 pk4chan(RTMP *r, int chan)
 {
@@ -105,21 +166,14 @@
 }
 
 static void
-newpacket(RTMP *r, int type, int hsz, int chan)
+newpacket(RTMP *r, int type, int ht, int chan)
 {
 	memset(&r->pk, 0, sizeof(r->pk));
 
 	r->pk.type = type;
-	r->pk.hsz = hsz;
+	r->pk.ht = ht;
 	r->pk.chan = chan;
-	r->p = r->b + hsz;
-	r->b[0] = chan;
-
-	switch(hsz){
-	case SzTiny: r->b[0] |= 3<<6; break;
-	case SzSmall: r->b[0] |= 2<<6; break;
-	case SzMedium: r->b[0] |= 1<<6; break;
-	}
+	r->p = r->b;
 }
 
 static void
@@ -134,7 +188,7 @@
 		sysfatal("memory");
 	if(ob != nil)
 		r->p = r->b + (intptr)(ob - r->p);
-	r->bsz *= 2;
+	r->bsz = bsz*2;
 	r->e = r->b + r->bsz;
 }
 
@@ -173,36 +227,35 @@
 static int
 rtmprecv(RTMP *r)
 {
+	int hsz, bodysz, dummy, n;
 	u8int *h, *e, type;
-	int hsz, chan, bodysz, dummy;
 	u32int ts;
 
+	memset(&r->pk, 0, sizeof(r->pk));
+
 	r->p = r->b;
 	if(readn(r->i, r->p, 1) != 1)
 		goto err;
-	hsz = (r->p[0] & 0xc0)>>6;
-	chan = r->p[0] & 0x3f;
-	if(debug)
-		fprint(2, "hsz=%d chan=%d\n", hsz, chan);
+	r->pk.ht = (r->p[0] & 0xc0)>>6;
+	r->pk.chan = r->p[0] & 0x3f;
+	hsz = szs[r->pk.ht];
 	if(readn(r->i, r->p+1, hsz-1) != hsz-1)
 		goto err;
 
-	memset(&r->pk, 0, sizeof(r->pk));
-	r->pk.hsz = hsz;
-	r->pk.chan = chan;
-
 	h = r->p + 1;
 	e = r->p + hsz;
 	r->pk.type = -1;
 	bodysz = 0;
-	if(hsz >= SzSmall){
+	if(hsz >= szs[SzSmall]){
 		h = amfi24get(h, e, (s32int*)&ts); /* FIXME proper timestamps? */
-		if(hsz >= SzMedium){
+		if(hsz >= szs[SzMedium]){
 			h = amfi24get(h, e, &bodysz);
 			h = amfbyteget(h, e, &type);
 			r->pk.type = type;
-			if(hsz >= SzLarge)
+			if(hsz >= szs[SzLarge]){
+				dummy = 0;
 				h = amfi32get(h, e, &dummy); /* FIXME seems to be always 0? */
+			}
 		}
 	}
 
@@ -212,11 +265,31 @@
 		h = amfi32get(h, h+4, (s32int*)&ts);
 	}
 
+	/* FIXME do all consecutive chunks use Tiny? */
 	bextend(r, bodysz);
-	if(readn(r->i, h, bodysz) != bodysz)
-		goto err;
-	h += bodysz;
+	r->pk.data = h;
+	r->pk.sz = bodysz;
+	for(;;){
+		n = min(bodysz, Chunk);
+		if(readn(r->i, h, n) != n)
+			goto err;
+		bodysz -= n;
+		h += n;
+		if(bodysz < 1)
+			break;
+		if(readn(r->i, h, 1) != 1)
+			goto err;
+		if((r->pk.chan | SzTiny<<6) != *h){
+			werrstr("chan/size does not match: %02x", *h);
+			goto err;
+		}
+	}
+	if(rtmpdump)
+		write(1, r->pk.data, r->pk.sz);
 
+	if(debug)
+		fprint(2, "→ %P\n", &r->pk);
+
 	return 0;
 
 err:
@@ -228,48 +301,53 @@
 rtmpsend(RTMP *r)
 {
 	int bodysz, n, hsz;
-	u8int *p, *h, *e;
+	u8int *p, *h, *e, hdata[32];
 
-	assert(r->p != nil);
-
-	bodysz = r->p - r->b - r->pk.hsz;
+	r->pk.data = r->b;
+	r->pk.sz = r->p - r->b;
 	/* FIXME special case when bodysz is 0 */
-	h = r->b;
-	e = h + r->pk.hsz;
-	h++;
-	if(r->pk.hsz >= SzSmall){
+
+	hsz = szs[r->pk.ht];
+	h = hdata;
+	e = h + hsz;
+	*h++ = r->pk.ht<<6 | r->pk.chan;
+	if(hsz >= szs[SzSmall]){
 		h = amfi24(h, e, 0); /* FIXME proper timestamps? */
-		if(r->pk.hsz >= SzMedium){
-			h = amfi24(h, e, bodysz);
+		if(hsz >= szs[SzMedium]){
+			h = amfi24(h, e, r->pk.sz);
 			h = amfbyte(h, e, r->pk.type);
-			if(r->pk.hsz >= SzLarge)
+			if(hsz >= szs[SzLarge])
 				h = amfi32(h, e, 0); /* FIXME seems to be always 0? */
 		}
 	}
-	if(h == nil)
-		goto err;
+	assert(h != nil);
 	memset(h, 0, e-h);
+	if(Bwrite(r, hdata, h-hdata) < 0)
+		goto err;
+	if(rtmpdump)
+		write(1, hdata, h-hdata);
 
-	p = r->b;
-	hsz = e - r->b;
-	for(; hsz+bodysz > 0;){
+	for(p = r->pk.data, bodysz = r->pk.sz; bodysz > 0;){
 		n = min(bodysz, Chunk);
-		if(Bwrite(r, p, hsz+n) < 0)
+		if(Bwrite(r, p, n) < 0)
 			goto err;
 		if(rtmpdump)
-			write(1, p, hsz+n);
+			write(1, p, n);
+		p += n;
 		bodysz -= n;
-		p += hsz+n;
-		hsz = 0;
 		if(bodysz > 0){
-			*(--p) = 0xc0 | r->b[0];
-			hsz = 1;
+			*h = r->pk.chan | SzTiny<<6;
+			Bputc(r, *h);
+			if(rtmpdump)
+				write(1, h, 1);
 		}
 	}
 
-	r->p = nil;
 	Bflush(r);
 
+	if(debug)
+		fprint(2, "← %P\n", &r->pk);
+
 	return 0;
 err:
 	werrstr("rtmpsend: %r");
@@ -305,6 +383,9 @@
 	char *s, *e, *path, *app;
 	int f, port, ctl;
 	RTMP *r;
+
+	fmtinstall('T', pktypefmt);
+	fmtinstall('P', pkfmt);
 
 	r = nil;
 	f = -1;