shithub: rtmp

Download patch

ref: d36842808af48e19a4335fb513801cd9c7ea6464
parent: 8fb3df30a16cc34dcb10c3c068ddca913fb71047
author: Sigrid Solveig Haflínudóttir <[email protected]>
date: Wed Jul 28 09:34:14 EDT 2021

implement proper amf parsing & pretty printing; rpc

--- a/amf.c
+++ b/amf.c
@@ -1,6 +1,7 @@
 #include <u.h>
 #include <libc.h>
 #include "amf.h"
+#include "util.h"
 
 enum {
 	Anum,
@@ -12,11 +13,11 @@
 	Alstr = 12,
 };
 
-#define atleast(x) do{ \
+#define atleast(what, x) do{ \
 	if(p == nil) \
 		return nil; \
 	if(e-p < x){ \
-		werrstr("buffer short"); \
+		werrstr(what ": buffer short: expected %d, have %d", x, (int)(e-p)); \
 		return nil; \
 	} \
 }while(0)
@@ -24,7 +25,7 @@
 u8int *
 amfbool(u8int *p, u8int *e, int v)
 {
-	atleast(2);
+	atleast("bool", 2);
 	*p++ = Abool;
 	*p++ = !!v;
 	return p;
@@ -33,7 +34,7 @@
 u8int *
 amfbyte(u8int *p, u8int *e, u8int byte)
 {
-	atleast(1);
+	atleast("byte", 1);
 	*p++ = byte;
 	return p;
 }
@@ -41,7 +42,7 @@
 u8int *
 amfi16(u8int *p, u8int *e, s16int i)
 {
-	atleast(2);
+	atleast("i16", 2);
 	*p++ = i >> 8;
 	*p++ = i;
 	return p;
@@ -50,7 +51,7 @@
 u8int *
 amfi24(u8int *p, u8int *e, s32int i)
 {
-	atleast(3);
+	atleast("i24", 3);
 	*p++ = i >> 16;
 	*p++ = i >> 8;
 	*p++ = i;
@@ -60,7 +61,7 @@
 u8int *
 amfi32(u8int *p, u8int *e, s32int i)
 {
-	atleast(4);
+	atleast("i32", 4);
 	*p++ = i >> 24;
 	*p++ = i >> 16;
 	*p++ = i >> 8;
@@ -76,7 +77,7 @@
 		u64int u;
 	}x;
 
-	atleast(1+8);
+	atleast("num", 1+8);
 	*p++ = Anum;
 	x.v = v;
 	*p++ = x.u >> 56;
@@ -98,12 +99,12 @@
 
 	n = strlen(s);
 	if(n <= 0xffff){
-		atleast(1+2+n);
+		atleast("str", 1+2+n);
 		*p++ = Astr;
 		return (u8int*)memmove(amfi16(p, e, n), s, n) + n;
 	}
 
-	atleast(1+4+n);
+	atleast("lstr", 1+4+n);
 	*p++ = Alstr;
 	return (u8int*)memmove(amfi32(p, e, n), s, n) + n;
 }
@@ -135,7 +136,7 @@
 		werrstr("string too long");
 		return nil;
 	}
-	atleast(2+n);
+	atleast("kv", 2+n);
 	p = amfi16(p, e, n);
 
 	return (u8int*)memmove(p, name, n) + n;
@@ -162,7 +163,7 @@
 u8int *
 amfbyteget(u8int *p, u8int *e, u8int *byte)
 {
-	atleast(1);
+	atleast("byte", 1);
 	*byte = *p;
 	return p+1;
 }
@@ -170,7 +171,7 @@
 u8int *
 amfi16get(u8int *p, u8int *e, s16int *i)
 {
-	atleast(2);
+	atleast("i16", 2);
 	*i = p[0]<<8 | p[1];
 	return p+2;
 }
@@ -178,7 +179,7 @@
 u8int *
 amfi24get(u8int *p, u8int *e, s32int *i)
 {
-	atleast(3);
+	atleast("i24", 3);
 	*i = p[0]<<16 | p[1]<<8 | p[2];
 	return p+3;
 }
@@ -186,92 +187,191 @@
 u8int *
 amfi32get(u8int *p, u8int *e, s32int *i)
 {
-	atleast(4);
-	*i = p[0]<<16 | p[1]<<16 | p[2]<<8 | p[3];
+	atleast("i32", 4);
+	/* big-endian: p[0] is the top byte, so it shifts by 24 (the old code used 16 twice) */
+	*i = (u32int)p[0]<<24 | p[1]<<16 | p[2]<<8 | p[3];
 	return p+4;
 }
 
+/*
+ * amffree releases a and everything it owns: the string of a Tstr,
+ * the elements and the element array of a Tarr, and the keys, values
+ * and both arrays of a Tobj.  amffree(nil) does nothing.
+ */
+void
+amffree(Amf *a)
+{
+	int i;
+
+	if(a == nil)
+		return;
+
+	switch(a->type){
+	case Tstr:
+		free(a->str);
+		break;
+	case Tobj:
+		for(i = 0; i < a->obj.n; i++){
+			free(a->obj.k[i]);
+			amffree(a->obj.v[i]);
+		}
+		/* the arrays grown by amfparse are heap memory too */
+		free(a->obj.k);
+		free(a->obj.v);
+		break;
+	case Tarr:
+		for(i = 0; i < a->arr.n; i++)
+			amffree(a->arr.v[i]);
+		free(a->arr.v);
+		break;	/* was missing; it only worked by falling into the cases below */
+	case Tnum:
+	case Tbool:
+		break;
+	default:
+		sysfatal("unknown amf type %d", a->type);
+	}
+
+	free(a);
+}
+
 u8int *
-amffmt(Fmt *f, u8int *p, u8int *e, int one)
+amfparse(Amf **amf, u8int *p, u8int *e)
 {
+	s16int s16;
 	union {
 		double v;
 		u64int u;
 	}x;
-	s16int s16;
 	int n;
+	Amf *a;
 
-	for(; p != e;){
-		atleast(1);
+	atleast("type", 1);
 
-		switch(*p++){
-		case Anum:
-			atleast(8);
-			x.u = (uvlong)p[0]<<56 | (uvlong)p[1]<<48 | (uvlong)p[2]<<40 | (uvlong)p[3]<<32;
-			x.u |= p[4]<<24 | p[5]<<16 | p[6]<<8 | p[7];
-			fmtprint(f, "%g", x.v);
-			p += 8;
-			break;
-		case Abool:
-			atleast(1);
-			fmtprint(f, *p ? "true" : "false");
-			p++;
-			break;
-		case Astr:
+	a = ecalloc(1, sizeof(Amf));
+	*amf = nil;
+
+	switch(*p++){
+	case Anum:
+		atleast("num", 8);
+		for(n = 0, x.u = 0; n < 8; n++)
+			x.u = x.u<<8 | *p++;
+		a->type = Tnum;
+		a->num = x.v;
+		break;
+	case Abool:
+		atleast("bool", 1);
+		a->type = Tbool;
+		a->bool = *p++;
+		break;
+	case Alstr:
+		if((p = amfi32get(p, e, &n)) == nil)
+			return nil;
+		if(0){
+	case Astr:
 			if((p = amfi16get(p, e, &s16)) == nil)
 				return nil;
 			n = s16;
-String:
-			atleast(n);
-			/* FIXME this isn't correct - UTF-8 */
-			fmtprint(f, "%.*#q", n, (char*)p);
-			p += n;
-			break;
-		case Aobj:
-			fmtprint(f, "O{");
-			for(;;){
-				if((p = amfi16get(p, e, &s16)) == nil)
-					return nil;
-				if(s16 == 0){
-					atleast(1);
-					if(*p != Aend){
-						werrstr("object doesn't end well");
-						return nil;
-					}
-					p++;
-					fmtprint(f, "}");
+		}
+		atleast("str", n);
+		a->str = estrndup(p, n);
+		p += n;
+		break;
+	case Aobj:
+		atleast("obj", 3); /* Aend should be there anyway */
+		a->type = Tobj;
+		for(a->obj.n = 0;;){
+			if((p = amfi16get(p, e, &s16)) == nil)
+				break;
+			if(s16 == 0){
+				atleast("obj end?", 1);
+				if(*p != Aend){
+					werrstr("object doesn't end well");
+					p = nil;
 					break;
 				}
-				n = s16;
-				atleast(n);
-				fmtprint(f, "%.*s=", n, (char*)p);
-				p += n;
-				p = amffmt(f, p, e, 1);
-				fmtprint(f, " ");
+				p++;
+				break;
 			}
-			break;
-		case Aarr:
-			fmtprint(f, "A{");
-			atleast(4);
-			p += 4;
-			break;
-		case Aend:
-			fmtprint(f, "}");
-			break;
-		case Alstr:
-			if((p = amfi32get(p, e, &n)) == nil)
-				return nil;
-			goto String;
-		default:
-			fmtprint(f, "?%d?", p[-1]);
-			break;
+			n = s16;
+			atleast("obj key", n);
+			a->obj.n++;
+			a->obj.k = erealloc(a->obj.k, sizeof(*a->obj.k)*a->obj.n);
+			a->obj.v = erealloc(a->obj.v, sizeof(*a->obj.v)*a->obj.n);
+			a->obj.k[a->obj.n-1] = estrndup(p, n);
+			p += n;
+			if((p = amfparse(&a->obj.v[a->obj.n-1], p, e)) == nil){
+				werrstr("obj val: %r");
+				break;
+			}
 		}
-
-		if(one)
+		break;
+	case Aarr:
+		a->type = Tarr;
+		if((p = amfi32get(p, e, &a->arr.n)) == nil)
 			break;
-		if(p != e)
-			fmtprint(f, " ");
+		a->arr.v = emalloc(sizeof(*a->arr.v)*a->arr.n);
+		for(n = 0; n < a->arr.n; n++){
+			if((p = amfparse(&a->arr.v[n], p, e)) == nil){
+				werrstr("arr el: %r");
+				break;
+			}
+		}
+		if((p = amfi24get(p, e, &n)) == nil || n != Aend){
+			p = nil;
+			werrstr("array doesn't end with Aend");
+		}
+		break;
+	case Aend:
+		p = nil;
+		werrstr("unexpected Aend");
+		break;
+	default:
+		werrstr("unexpected type %d", p[-1]);
+		p = nil;
+		break;
 	}
 
+	if(p == nil)
+		amffree(a);
+	else
+		*amf = a;
+
 	return p;
+}
+
+/* %A verb: prints the Amf* argument; strings quoted, arrays as [..], objects as {key:value, ..}; returns 0 */
+int
+amffmt(Fmt *f)
+{
+	Amf *v;
+	char *sep;
+	int k;
+
+	v = va_arg(f->args, Amf*);
+	switch(v->type){
+	case Tnum:
+		fmtprint(f, "%g", v->num);
+		break;
+	case Tbool:
+		fmtprint(f, v->bool ? "true" : "false");
+		break;
+	case Tstr:
+		fmtprint(f, "%#q", v->str);
+		break;
+	case Tarr:
+		fmtprint(f, "[");
+		for(k = 0, sep = ""; k < v->arr.n; k++, sep = ", ")
+			fmtprint(f, "%s%A", sep, v->arr.v[k]);
+		fmtprint(f, "]");
+		break;
+	case Tobj:
+		fmtprint(f, "{");
+		for(k = 0, sep = ""; k < v->obj.n; k++, sep = ", ")
+			fmtprint(f, "%s%q:%A", sep, v->obj.k[k], v->obj.v[k]);
+		fmtprint(f, "}");
+		break;
+	default:
+		sysfatal("unknown amf type %d", v->type);
+	}
+	return 0;
 }
--- a/amf.h
+++ b/amf.h
@@ -1,3 +1,31 @@
+enum {	/* values of Amf.type */
+	Tstr,	/* must stay 0: amfparse does not set the type for strings, it relies on ecalloc's zeroing */
+	Tnum,
+	Tbool,
+	Tarr,
+	Tobj,
+};
+
+typedef struct Amf Amf;
+
+struct Amf {
+	int type;	/* one of Tstr, Tnum, Tbool, Tarr, Tobj; selects the union member below */
+	union {
+		char *str;	/* Tstr: NUL-terminated copy made by amfparse with estrndup; freed by amffree */
+		double num;	/* Tnum */
+		u8int bool;	/* Tbool: the raw byte from the wire, not normalized to 0/1 */
+		struct {
+			Amf **v;	/* the n elements */
+			int n;
+		}arr;	/* Tarr */
+		struct {
+			char **k;	/* the n keys; k[i] names v[i] */
+			Amf **v;
+			int n;
+		}obj;	/* Tobj */
+	};
+};
+
 u8int *amfi16(u8int *p, u8int *e, s16int i);
 u8int *amfbool(u8int *p, u8int *e, int v);
 u8int *amfbyte(u8int *p, u8int *e, u8int byte);
@@ -17,4 +45,8 @@
 u8int *amfi24get(u8int *p, u8int *e, s32int *i);
 u8int *amfi32get(u8int *p, u8int *e, s32int *i);
 
-u8int *amffmt(Fmt *f, u8int *p, u8int *e, int one);
+u8int *amfparse(Amf **a, u8int *p, u8int *e);
+void amffree(Amf *a);
+
+#pragma varargck type "A" Amf*
+int amffmt(Fmt *f);
--- a/main.c
+++ b/main.c
@@ -5,6 +5,7 @@
 #include "ivf.h"
 #include "flv.h"
 #include "rtmp.h"
+#include "util.h"
 
 int mainstacksize = 65536;
 int debug = 0;
@@ -33,9 +34,6 @@
 	case 'd':
 		debug++;
 		break;
-	case 'D':
-		rtmpdump++;
-		break;
 	case 'a':
 		if((a = Bopen(EARGF(usage()), OREAD)) == nil)
 			sysfatal("%r");
@@ -63,8 +61,7 @@
 	threadexitsall(nil);
 
 	bufsz = 65536;
-	if((b = malloc(bufsz)) == nil)
-		sysfatal("memory");
+	b = emalloc(bufsz);
 	e = b + bufsz;
 
 	if((p = flvscript(b, e, ivf.w, ivf.h, a != nil)) == nil)
@@ -79,8 +76,7 @@
 		if(bufsz < f.sz+64){
 			free(b);
 			bufsz *= 2;
-			if((b = malloc(bufsz)) == nil)
-				sysfatal("memory");
+			b = emalloc(bufsz);
 			e = b + bufsz;
 		}
 		ns = ivfns(&ivf, f.ts);
--- a/mkfile
+++ b/mkfile
@@ -15,6 +15,7 @@
 	ivf.$O\
 	main.$O\
 	rtmp.$O\
+	util.$O\
 
 default:V: all
 
--- a/rtmp.c
+++ b/rtmp.c
@@ -6,6 +6,7 @@
 #include "amf.h"
 #include "ivf.h"
 #include "rtmp.h"
+#include "util.h"
 
 #define min(a,b) ((a)<(b)?(a):(b))
 
@@ -21,17 +22,24 @@
 
 	ChanCtl = 3,
 
+	CbWhat = 0,
+	CbInvoke,
+	CbData,
+	CbStatus,
+	NumCbA,
+
 	PktChunkSz = 1,
-	PktBytesReadReport,
-	PktControl,
-	PktServerBW,
+	PktBytesReadReport = 3,
+	PktControl = 4,
+	PktServerBW = 5,
 	PktClientBW,
 	PktAudio = 8,
 	PktVideo,
 	PktFlexStreamSend = 15,
 	PktFlexSharedObj,
+	PkgFlexMessage,
 	PktFlexInfo,
-	PktSharedObj,
+	PktSharedObj = 19,
 	PktInvoke = 20,
 	PktFlashVideo = 22,
 
@@ -39,8 +47,17 @@
 	Bufsz = 64*1024,
 };
 
+typedef struct Invoke Invoke;
 typedef struct Packet Packet;
 
+struct Invoke {	/* one invoke that was sent and is waiting for its reply */
+	void (*cb)(RTMP *r, int ok, Amf *a[NumCbA], void *aux);	/* called by loop() with the parsed reply; ok is 1 for "_result", 0 for "_error" */
+	void *aux;	/* handed back to cb as its last argument */
+	int n;	/* invoke number: set by newpacket from ++r->invokes.n, matched against the number in the reply */
+
+	Invoke *prev, *next;	/* links in the r->invokes.w list */
+};
+
 struct Packet {
 	int type;
 	int ht;
@@ -48,23 +65,25 @@
 	u32int ts;
 	u8int *data;
 	int sz;
-	int left;
-	Packet *prev;
-	Packet *next;
+	Invoke invoke;
 };
 
 struct RTMP {
 	Biobufhdr;
+	Channel *c;
 	char *app;
 	char *path;
 	char *tcurl;
 	Packet pk;
-	Packet *ch;
 	u8int *b, *p, *e;
+	int chunk;
 	int mode;
 	int bsz;
-	int invokes;
 	int i;
+	struct {
+		int n;
+		Invoke *w;
+	}invokes;
 	u8int biobuf[Biobufsz];
 };
 
@@ -80,6 +99,12 @@
 #define putkvstr(name, s) do{ r->p = amfkvstr(r->p, r->e, name, s); }while(0)
 #define putkvbool(name, s) do{ r->p = amfkvbool(r->p, r->e, name, s); }while(0)
 
+#define putinvoke(name) do { \
+	putstr(name); \
+	putnum(r->pk.invoke.n); \
+	putobj(); \
+}while(0)
+
 static szs[] = {
 	[SzTiny] = 1,
 	[SzSmall] = 4,
@@ -103,7 +128,6 @@
 	[PktFlashVideo] = "FlashVideo",
 };
 
-int rtmpdump = 0;
 extern int debug;
 
 #pragma varargck type "T" int
@@ -125,51 +149,28 @@
 static int
 pkfmt(Fmt *f)
 {
+	u8int *s, *e;
 	Packet *p;
+	Amf *a;
 
 	p = va_arg(f->args, Packet*);
 
-	fmtprint(f, "type=%T chan=%d ts=%ud sz=%d: ", p->type, p->chan, p->ts, p->sz);
+	fmtprint(f, "type=%T chan=%d ts=%ud sz=%d", p->type, p->chan, p->ts, p->sz);	/* header fields are printed for every packet type */
+	if(p->type == PktInvoke){	/* only invoke bodies are decoded as AMF */
+		fmtprint(f, ":");
+		for(s = p->data, e = s + p->sz; s != nil && s != e;){	/* one AMF value per iteration; stops at the end or at the first parse error */
+			if((s = amfparse(&a, s, e)) != nil)
+				fmtprint(f, " %A", a);
+			else
+				fmtprint(f, " %r");	/* show the parse error in place of the value */
+			amffree(a);	/* a is nil after a failed parse, which amffree accepts */
+		}
+	}
 
-	if(p->type == PktInvoke)
-		amffmt(f, p->data, p->data+p->sz, 0);
-
 	return 0;
 }
 
-static Packet *
-pk4chan(RTMP *r, int chan)
-{
-	Packet *p;
-
-	for(p = r->ch; p != nil && p->chan != chan; p = p->next);
-
-	if(p == nil){
-		if((p = calloc(1, sizeof(*p))) == nil)
-			sysfatal("memory");
-		p->type = -1;
-		p->chan = chan;
-		if((p->next = r->ch) != nil)
-			r->ch->prev = p;
-	}
-
-	return p;
-}
-
 static void
-pkfree(RTMP *r, Packet *p)
-{
-	if(p->prev != nil)
-		p->prev->next = p->next;
-	if(p->next != nil)
-		p->next->prev = p->prev;
-	if(r->ch == p)
-		r->ch = p->next;
-
-	free(p->data);
-}
-
-static void
 newpacket(RTMP *r, int type, int ht, int chan)
 {
 	memset(&r->pk, 0, sizeof(r->pk));
@@ -178,6 +179,8 @@
 	r->pk.ht = ht;
 	r->pk.chan = chan;
 	r->p = r->b;
+	if(type == PktInvoke)
+		r->pk.invoke.n = ++r->invokes.n;
 }
 
 static void
@@ -188,8 +191,7 @@
 	if(r->bsz >= bsz)
 		return;
 	ob = r->b;
-	if((r->b = realloc(r->b, bsz*2)) == nil)
-		sysfatal("memory");
+	r->b = erealloc(r->b, bsz*2);
 	if(ob != nil)
 		r->p = r->b + (intptr)(ob - r->p);
 	r->bsz = bsz*2;
@@ -274,7 +276,7 @@
 	r->pk.data = h;
 	r->pk.sz = bodysz;
 	for(;;){
-		n = min(bodysz, Chunk);
+		n = min(bodysz, r->chunk);
 		if(readn(r->i, h, n) != n)
 			goto err;
 		bodysz -= n;
@@ -288,8 +290,6 @@
 			goto err;
 		}
 	}
-	if(rtmpdump)
-		write(1, r->pk.data, r->pk.sz);
 
 	if(debug)
 		fprint(2, "→ %P\n", &r->pk);
@@ -304,9 +304,13 @@
 static int
 rtmpsend(RTMP *r)
 {
-	int bodysz, n, hsz;
 	u8int *p, *h, *e, hdata[32];
+	int bodysz, n, hsz;
+	Invoke *i;
 
+	if(r->p == nil)
+		goto err;
+
 	r->pk.data = r->b;
 	r->pk.sz = r->p - r->b;
 	/* FIXME special case when bodysz is 0 */
@@ -328,22 +332,16 @@
 	memset(h, 0, e-h);
 	if(Bwrite(r, hdata, h-hdata) < 0)
 		goto err;
-	if(rtmpdump)
-		write(1, hdata, h-hdata);
 
 	for(p = r->pk.data, bodysz = r->pk.sz; bodysz > 0;){
-		n = min(bodysz, Chunk);
+		n = min(bodysz, r->chunk);
 		if(Bwrite(r, p, n) < 0)
 			goto err;
-		if(rtmpdump)
-			write(1, p, n);
 		p += n;
 		bodysz -= n;
 		if(bodysz > 0){
 			*h = r->pk.chan | SzTiny<<6;
 			Bputc(r, *h);
-			if(rtmpdump)
-				write(1, h, 1);
 		}
 	}
 
@@ -352,6 +350,15 @@
 	if(debug)
 		fprint(2, "← %P\n", &r->pk);
 
+	if(r->pk.type == PktInvoke){
+		i = emalloc(sizeof(*i));
+		*i = r->pk.invoke;
+		assert(i->cb != nil);
+		if((i->next = r->invokes.w) != nil)
+			i->next->prev = i;
+		r->invokes.w = i;
+	}
+
 	return 0;
 err:
 	werrstr("rtmpsend: %r");
@@ -358,13 +365,17 @@
 	return -1;
 }
 
+static void
+connected(RTMP *r, int ok, Amf *a[NumCbA], void *)	/* reply callback installed by connect() */
+{
+	sendp(r->c, ok ? nil : smprint("%A", a[CbStatus]));	/* nil means success; otherwise a heap string that rtmpdial receives and frees. NOTE(review): a nil from a failed smprint would read as success - confirm */
+}
+
 static int
 connect(RTMP *r)
 {
 	newpacket(r, PktInvoke, SzLarge, ChanCtl);
-	putstr("connect");
-		putnum(++r->invokes);
-	putobj();
+	putinvoke("connect");
 		putkvstr("app", r->app);
 		putkvstr("tcUrl", r->tcurl);
 		if(r->mode & OWRITE)
@@ -377,11 +388,127 @@
 			putkvnum("videoFunction", 1);
 		}
 	putend();
+	r->pk.invoke.cb = connected;
 
 	return rtmpsend(r);
 }
 
-RTMP *
+static void
+rtmpfree(RTMP *r)	/* frees r and the memory it owns; it does not close r->i */
+{
+	free(r->app);
+	free(r->b);
+	free(r->path);
+	free(r->tcurl);
+	if(r->c != nil){
+		sendp(r->c, "done");	/* NOTE(review): r->c is created unbuffered in rtmpdial, so this presumably blocks until received or until rtmpclose closes the channel; rtmpdial free()s what it receives and "done" is not heap memory - confirm */
+		chanfree(r->c);
+	}
+	Bterm(r);
+	free(r);	/* NOTE(review): Invoke entries still linked on r->invokes.w are not freed here - confirm whether that is intended */
+}
+
+static void
+loop(void *aux)	/* receive loop started by rtmpdial with proccreate; aux is the RTMP* */
+{
+	int res, n, ok;
+	Amf *a[NumCbA];
+	u8int *s, *e;
+	Packet *p;
+	Invoke *i;
+	RTMP *r;
+
+	r = aux;
+	p = &r->pk;
+	res = 0;
+	memset(a, 0, sizeof(a));
+	for(;;){
+		for(n = 0; n < nelem(a); n++)	/* drop the values parsed for the previous packet */
+			amffree(a[n]);
+		memset(a, 0, sizeof(a));
+
+		if(res != 0 || (res = rtmprecv(r)) != 0){	/* res != 0 here means the previous packet ended at err: below */
+			if(debug)
+				fprint(2, "rtmp loop: %r\n");
+			for(n = 0; n < nelem(a); n++)	/* a[] was just cleared above, so these are all nil */
+				amffree(a[n]);
+			break;
+		}
+
+		switch(p->type){
+		case PktInvoke:
+			i = nil;
+			ok = 0;
+			s = r->pk.data;
+			e = s + r->pk.sz;
+			for(n = 0; n < NumCbA; n++){	/* a reply must carry all NumCbA values; fewer is a parse error */
+				if((s = amfparse(&a[n], s, e)) == nil)
+					goto err;
+				switch(n){
+				case CbWhat:
+					if(a[n]->type != Tstr){
+						werrstr("invoke a[%d] not a string", n);
+						goto err;
+					}
+					if(strcmp(a[n]->str, "_result") == 0)
+						ok = 1;
+					else if(strcmp(a[n]->str, "_error") == 0)
+						ok = 0;
+					else{
+						werrstr("unexpected a[%d]: %#q", n, a[n]->str);
+						goto err;
+					}
+					break;
+				case CbInvoke:
+					if(a[n]->type != Tnum){
+						werrstr("invoke a[%d] not a number", n);
+						goto err;
+					}
+					for(i = r->invokes.w; i != nil && i->n != a[n]->num; i = i->next);	/* find the pending invoke this reply answers */
+					if(i == nil){
+						werrstr("did not expect invoke %d result", (int)a[n]->num);
+						goto err;
+					}
+					break;
+				}
+			}
+			if(i->prev != nil)	/* unlink i from r->invokes.w before running its callback */
+				i->prev->next = i->next;
+			if(i->next != nil)
+				i->next->prev = i->prev;
+			if(r->invokes.w == i)
+				r->invokes.w = i->next;
+			i->cb(r, ok, a, i->aux);	/* a[] stays owned by this loop and is freed at the top of the next iteration */
+			free(i);
+			break;
+
+		case PktChunkSz:
+			if(amfi32get(r->pk.data, r->pk.data+r->pk.sz, &r->chunk) == nil)	/* the peer's new chunk size, used by rtmprecv and rtmpsend */
+				goto err;
+			break;
+		case PktBytesReadReport:	/* the packet types below are received and ignored */
+		case PktControl:
+		case PktServerBW:
+		case PktClientBW:
+		case PktAudio:
+		case PktVideo:
+		case PktFlexStreamSend:
+		case PktFlexSharedObj:
+		case PktFlexInfo:
+		case PktSharedObj:
+		case PktFlashVideo:
+			break;
+err:	/* reached only by goto; the next iteration sees res != 0 and leaves the loop */
+			res = -1;
+			break;
+		}
+	}
+
+	rtmpfree(r);
+	threadexitsall(res == 0 ? nil : "error");	/* the loop is only left with res != 0, so the status here is always "error" */
+}
+
+RTMP *
 rtmpdial(char *url, int w, int h, int withaudio)
 {
 	char *s, *e, *path, *app;
@@ -388,6 +515,7 @@
 	int f, port, ctl;
 	RTMP *r;
 
+	fmtinstall('A', amffmt);
 	fmtinstall('T', pktypefmt);
 	fmtinstall('P', pkfmt);
 	quotefmtinstall();
@@ -394,7 +522,7 @@
 
 	r = nil;
 	f = -1;
-	url = strdup(url); /* since we're changing it in-place */
+	url = estrdup(url); /* since we're changing it in-place */
 	if(memcmp(url, "rtmp://", 7) != 0){
 		werrstr("invalid url");
 		goto err;
@@ -440,24 +568,38 @@
 
 	if(handshake(f) != 0)
 		goto err;
-	if((r = calloc(1, sizeof(*r))) == nil)
-		sysfatal("memory");
-	if((r->app = strdup(app)) == nil || (path != nil && (r->path = strdup(path)) == nil))
-		sysfatal("memory");
-	bextend(r, Bufsz);
+
+	r = ecalloc(1, sizeof(*r));
+	r->i = f;
+	r->chunk = Chunk;
 	r->tcurl = url;
+	url = nil;
+	r->c = chancreate(sizeof(void*), 0);
+	r->app = estrdup(app);
+	r->path = path == nil ? nil : estrdup(path);
+	bextend(r, Bufsz);
 	Binits(r, f, OWRITE, r->biobuf, sizeof(r->biobuf));
-	r->i = f;
-	if(connect(r) != 0 || rtmprecv(r) != 0)
+
+	if(connect(r) != 0)
 		goto err;
+	if(proccreate(loop, r, mainstacksize) < 0)
+		goto err;
 
+	/* wait for the connect call to finish */
+	if((s = recvp(r->c)) != nil){
+		rtmpclose(r);
+		werrstr("rtmpdial: %s", s);
+		free(s);
+		r = nil;
+	}
+
 	return r;
 
 err:
 	werrstr("rtmpdial: %r");
 	if(r != nil)
-		rtmpclose(r);
-	else if(f >= 0)
+		rtmpfree(r);
+	if(f >= 0)
 		close(f);
 	free(url);
 	return nil;
@@ -468,9 +610,8 @@
 {
 	if(r == nil)
 		return;
-	free(r->path);
-	free(r->b);
-	close(r->i);
-	Bterm(r);
-	free(r);
+	if(r->i >= 0)
+		close(r->i);
+	if(r->c != nil)
+		chanclose(r->c);
 }
--- a/rtmp.h
+++ b/rtmp.h
@@ -2,7 +2,5 @@
 
 #pragma incomplete RTMP
 
-extern int rtmpdump;
-
 RTMP *rtmpdial(char *url, int w, int h, int withaudio);
 void rtmpclose(RTMP *r);
--- /dev/null
+++ b/util.c
@@ -1,0 +1,62 @@
+#include <u.h>
+#include <libc.h>
+#include "util.h"
+
+void *
+emalloc(usize sz)	/* malloc that asserts instead of returning nil */
+{
+	void *p;
+
+	assert((p = malloc(sz)) != nil);	/* the allocation itself happens inside the assert expression */
+	setmalloctag(p, getcallerpc(&sz));	/* tag the block with the pc of emalloc's caller rather than emalloc itself */
+
+	return p;
+}
+
+void *
+ecalloc(ulong nel, usize elsz)	/* calloc that asserts instead of returning nil */
+{
+	void *p;
+
+	assert((p = calloc(nel, elsz)) != nil);	/* the allocation itself happens inside the assert expression */
+	setmalloctag(p, getcallerpc(&nel));	/* tag the block with the pc of ecalloc's caller */
+
+	return p;
+}
+
+void *
+erealloc(void *p, usize sz)	/* realloc that asserts on failure; a nil result is accepted only when sz is 0 */
+{
+	void *np;
+
+	np = realloc(p, sz);
+	assert((sz == 0 && np == nil) || np != nil);
+	if(np != nil)
+		setrealloctag(np, getcallerpc(&p));	/* tag the block with the pc of erealloc's caller */
+
+	return np;
+}
+
+char *
+estrndup(void *d, int n)	/* returns a new NUL-terminated copy of the n bytes at d; asserts n >= 0 and on allocation failure */
+{
+	char *s;
+
+	assert(n >= 0);	/* a negative n would reach memmove as a huge size and make s[n] write outside the block */
+	s = emalloc(n+1);
+	memmove(s, d, n);
+	s[n] = 0;
+	return s;
+}
+
+char *
+estrdup(char *s)	/* strdup that asserts instead of returning nil */
+{
+	char *ns;
+
+	ns = strdup(s);
+	assert(ns != nil);
+	setmalloctag(ns, getcallerpc(&s));	/* tag the copy with the pc of estrdup's caller */
+
+	return ns;
+}
--- /dev/null
+++ b/util.h
@@ -1,0 +1,5 @@
+void *emalloc(usize sz);
+void *ecalloc(ulong nel, usize elsz);
+void *erealloc(void *p, usize sz);
+char *estrndup(void *d, int n);
+char *estrdup(char *s);