shithub: rtmp

Download patch

ref: 5a2a972aa72c4b827ea784d91c15fd02adee3f71
parent: c6ee10356946d9bc7b34974cc737acec166f0e6b
author: Sigrid Solveig Haflínudóttir <[email protected]>
date: Thu Jul 29 04:56:27 EDT 2021

rewrite according to the rtmp spec

--- a/LICENSE
+++ b/LICENSE
@@ -1,1 +1,1 @@
-This is partially based on librtmp which is LGPLv2.1, same goes for this project.
+Public domain.
--- a/amf.c
+++ /dev/null
@@ -1,366 +1,0 @@
-#include <u.h>
-#include <libc.h>
-#include "amf.h"
-#include "util.h"
-
-enum {
-	Anum,
-	Abool,
-	Astr,
-	Aobj,
-	Aarr = 8,
-	Aend,
-	Alstr = 12,
-};
-
-#define atleast(what, x) do{ \
-	if(p == nil) \
-		return nil; \
-	if(e-p < x){ \
-		werrstr(what ": buffer short: expected %d, have %d", x, (int)(e-p)); \
-		return nil; \
-	} \
-}while(0)
-
-u8int *
-amfbool(u8int *p, u8int *e, int v)
-{
-	atleast("bool", 2);
-	*p++ = Abool;
-	*p++ = !!v;
-	return p;
-}
-
-u8int *
-amfbyte(u8int *p, u8int *e, u8int byte)
-{
-	atleast("byte", 1);
-	*p++ = byte;
-	return p;
-}
-
-u8int *
-amfi16(u8int *p, u8int *e, s16int i)
-{
-	atleast("i16", 2);
-	*p++ = i >> 8;
-	*p++ = i;
-	return p;
-}
-
-u8int *
-amfi24(u8int *p, u8int *e, s32int i)
-{
-	atleast("i24", 3);
-	*p++ = i >> 16;
-	*p++ = i >> 8;
-	*p++ = i;
-	return p;
-}
-
-u8int *
-amfi32(u8int *p, u8int *e, s32int i)
-{
-	atleast("i32", 4);
-	*p++ = i >> 24;
-	*p++ = i >> 16;
-	*p++ = i >> 8;
-	*p++ = i;
-	return p;
-}
-
-u8int *
-amfnum(u8int *p, u8int *e, double v)
-{
-	union {
-		double v;
-		u64int u;
-	}x;
-
-	atleast("num", 1+8);
-	*p++ = Anum;
-	x.v = v;
-	*p++ = x.u >> 56;
-	*p++ = x.u >> 48;
-	*p++ = x.u >> 40;
-	*p++ = x.u >> 32;
-	*p++ = x.u >> 24;
-	*p++ = x.u >> 16;
-	*p++ = x.u >> 8;
-	*p++ = x.u;
-
-	return p;
-}
-
-u8int *
-amfstr(u8int *p, u8int *e, char *s)
-{
-	int n;
-
-	n = strlen(s);
-	if(n <= 0xffff){
-		atleast("str", 1+2+n);
-		*p++ = Astr;
-		return (u8int*)memmove(amfi16(p, e, n), s, n) + n;
-	}
-
-	atleast("lstr", 1+4+n);
-	*p++ = Alstr;
-	return (u8int*)memmove(amfi32(p, e, n), s, n) + n;
-}
-
-u8int *
-amfarr(u8int *p, u8int *e)
-{
-	return amfbyte(p, e, Aarr);
-}
-
-u8int *
-amfobj(u8int *p, u8int *e)
-{
-	return amfbyte(p, e, Aobj);
-}
-
-u8int *
-amfend(u8int *p, u8int *e)
-{
-	return amfi24(p, e, Aend);
-}
-
-static u8int *
-amfkv(u8int *p, u8int *e, char *name)
-{
-	int n;
-
-	if((n = strlen(name)) > 0xffff){
-		werrstr("string too long");
-		return nil;
-	}
-	atleast("kv", 2+n);
-	p = amfi16(p, e, n);
-
-	return (u8int*)memmove(p, name, n) + n;
-}
-
-u8int *
-amfkvbool(u8int *p, u8int *e, char *name, int v)
-{
-	return amfbool(amfkv(p, e, name), e, v);
-}
-
-u8int *
-amfkvnum(u8int *p, u8int *e, char *name, double v)
-{
-	return amfnum(amfkv(p, e, name), e, v);
-}
-
-u8int *
-amfkvstr(u8int *p, u8int *e, char *name, char *v)
-{
-	return amfstr(amfkv(p, e, name), e, v);
-}
-
-u8int *
-amfbyteget(u8int *p, u8int *e, u8int *byte)
-{
-	atleast("byte", 1);
-	*byte = *p;
-	return p+1;
-}
-
-u8int *
-amfi16get(u8int *p, u8int *e, s16int *i)
-{
-	atleast("i16", 2);
-	*i = p[0]<<8 | p[1];
-	return p+2;
-}
-
-u8int *
-amfi24get(u8int *p, u8int *e, s32int *i)
-{
-	atleast("i24", 3);
-	*i = p[0]<<16 | p[1]<<8 | p[2];
-	return p+3;
-}
-
-u8int *
-amfi32get(u8int *p, u8int *e, s32int *i)
-{
-	atleast("i32", 4);
-	*i = p[0]<<16 | p[1]<<16 | p[2]<<8 | p[3];
-	return p+4;
-}
-
-void
-amffree(Amf *a)
-{
-	int i;
-
-	if(a == nil)
-		return;
-
-	switch(a->type){
-	case Tstr:
-		free(a->str);
-		break;
-	case Tobj:
-		for(i = 0; i < a->obj.n; i++){
-			free(a->obj.k[i]);
-			amffree(a->obj.v[i]);
-		}
-		break;
-	case Tarr:
-		for(i = 0; i < a->arr.n; i++)
-			amffree(a->arr.v[i]);
-	case Tnum:
-	case Tbool:
-		break;
-	default:
-		sysfatal("unknown amf type %d", a->type);
-	}
-
-	free(a);
-}
-
-u8int *
-amfparse(Amf **amf, u8int *p, u8int *e)
-{
-	s16int s16;
-	union {
-		double v;
-		u64int u;
-	}x;
-	int n;
-	Amf *a;
-
-	atleast("type", 1);
-
-	a = ecalloc(1, sizeof(Amf));
-	*amf = nil;
-
-	switch(*p++){
-	case Anum:
-		atleast("num", 8);
-		for(n = 0, x.u = 0; n < 8; n++)
-			x.u = x.u<<8 | *p++;
-		a->type = Tnum;
-		a->num = x.v;
-		break;
-	case Abool:
-		atleast("bool", 1);
-		a->type = Tbool;
-		a->bool = *p++;
-		break;
-	case Alstr:
-		if((p = amfi32get(p, e, &n)) == nil)
-			return nil;
-		if(0){
-	case Astr:
-			if((p = amfi16get(p, e, &s16)) == nil)
-				return nil;
-			n = s16;
-		}
-		atleast("str", n);
-		a->str = estrndup(p, n);
-		p += n;
-		break;
-	case Aobj:
-		atleast("obj", 3); /* Aend should be there anyway */
-		a->type = Tobj;
-		for(a->obj.n = 0;;){
-			if((p = amfi16get(p, e, &s16)) == nil)
-				break;
-			if(s16 == 0){
-				atleast("obj end?", 1);
-				if(*p != Aend){
-					werrstr("object doesn't end well");
-					p = nil;
-					break;
-				}
-				p++;
-				break;
-			}
-			n = s16;
-			atleast("obj key", n);
-			a->obj.n++;
-			a->obj.k = erealloc(a->obj.k, sizeof(*a->obj.k)*a->obj.n);
-			a->obj.v = erealloc(a->obj.v, sizeof(*a->obj.v)*a->obj.n);
-			a->obj.k[a->obj.n-1] = estrndup(p, n);
-			p += n;
-			if((p = amfparse(&a->obj.v[a->obj.n-1], p, e)) == nil){
-				werrstr("obj val: %r");
-				break;
-			}
-		}
-		break;
-	case Aarr:
-		a->type = Tarr;
-		if((p = amfi32get(p, e, &a->arr.n)) == nil)
-			break;
-		a->arr.v = emalloc(sizeof(*a->arr.v)*a->arr.n);
-		for(n = 0; n < a->arr.n; n++){
-			if((p = amfparse(&a->arr.v[n], p, e)) == nil){
-				werrstr("arr el: %r");
-				break;
-			}
-		}
-		if((p = amfi24get(p, e, &n)) == nil || n != Aend){
-			p = nil;
-			werrstr("array doesn't end with Aend");
-		}
-		break;
-	case Aend:
-		p = nil;
-		werrstr("unexpected Aend");
-		break;
-	default:
-		werrstr("unexpected type %d", p[-1]);
-		p = nil;
-		break;
-	}
-
-	if(p == nil)
-		amffree(a);
-	else
-		*amf = a;
-
-	return p;
-}
-
-int
-amffmt(Fmt *f)
-{
-	Amf *a;
-	int i;
-
-	a = va_arg(f->args, Amf*);
-
-	switch(a->type){
-	case Tstr:
-		fmtprint(f, "%#q", a->str);
-		break;
-	case Tobj:
-		fmtprint(f, "{");
-		for(i = 0; i < a->obj.n; i++)
-			fmtprint(f, "%s%q:%A", i > 0 ? ", " : "", a->obj.k[i], a->obj.v[i]);
-		fmtprint(f, "}");
-		break;
-	case Tarr:
-		fmtprint(f, "[");
-		for(i = 0; i < a->arr.n; i++)
-			fmtprint(f, "%s%A", i > 0 ? ", " : "", a->arr.v[i]);
-		fmtprint(f, "]");
-		break;
-	case Tnum:
-		fmtprint(f, "%g", a->num);
-		break;
-	case Tbool:
-		fmtprint(f, a->bool ? "true" : "false");
-		break;
-	default:
-		sysfatal("unknown amf type %d", a->type);
-	}
-
-	return 0;
-}
--- a/amf.h
+++ /dev/null
@@ -1,52 +1,0 @@
-enum {
-	Tstr,
-	Tnum,
-	Tbool,
-	Tarr,
-	Tobj,
-};
-
-typedef struct Amf Amf;
-
-struct Amf {
-	int type;
-	union {
-		char *str;
-		double num;
-		u8int bool;
-		struct {
-			Amf **v;
-			int n;
-		}arr;
-		struct {
-			char **k;
-			Amf **v;
-			int n;
-		}obj;
-	};
-};
-
-u8int *amfi16(u8int *p, u8int *e, s16int i);
-u8int *amfbool(u8int *p, u8int *e, int v);
-u8int *amfbyte(u8int *p, u8int *e, u8int byte);
-u8int *amfi24(u8int *p, u8int *e, s32int i);
-u8int *amfi32(u8int *p, u8int *e, s32int i);
-u8int *amfnum(u8int *p, u8int *e, double v);
-u8int *amfstr(u8int *p, u8int *e, char *s);
-u8int *amfarr(u8int *p, u8int *e);
-u8int *amfobj(u8int *p, u8int *e);
-u8int *amfend(u8int *p, u8int *e);
-u8int *amfkvnum(u8int *p, u8int *e, char *name, double v);
-u8int *amfkvstr(u8int *p, u8int *e, char *name, char *v);
-u8int *amfkvbool(u8int *p, u8int *e, char *name, int v);
-
-u8int *amfbyteget(u8int *p, u8int *e, u8int *byte);
-u8int *amfi16get(u8int *p, u8int *e, s16int *i);
-u8int *amfi24get(u8int *p, u8int *e, s32int *i);
-u8int *amfi32get(u8int *p, u8int *e, s32int *i);
-
-u8int *amfparse(Amf **a, u8int *p, u8int *e);
-void amffree(Amf *a);
-
-#pragma varargck type "A" Amf*
-int amffmt(Fmt *f);
--- a/flv.c
+++ b/flv.c
@@ -1,6 +1,6 @@
 #include <u.h>
 #include <libc.h>
-#include "amf.h"
+#include "amf0.h"
 #include "flv.h"
 
 enum {
@@ -27,25 +27,25 @@
 	p0 = p;
 	*p++ = Fvideo;
 	psz = p;
-	p = amfi24(p, e, 0); /* sz set later */
-	p = amfi24(p, e, ts);
+	p = amf0i24(p, e, 0); /* sz set later */
+	p = amf0i24(p, e, ts);
 	*p++ = ts>>24;
-	p = amfi24(p, e, stream);
+	p = amf0i24(p, e, stream);
 
 	d = p;
-	p = amfstr(p, e, "onMetaData");
-	p = amfarr(p, e);
-	p = amfi32(p, e, audio ? 5 : 4);
-	p = amfkvnum(p, e, "duration", 0.0);
-	p = amfkvnum(p, e, "width", w);
-	p = amfkvnum(p, e, "height", h);
-	p = amfkvnum(p, e, "videocodecid", EncH264);
+	p = amf0str(p, e, "onMetaData");
+	p = amf0arr(p, e);
+	p = amf0i32(p, e, audio ? 5 : 4);
+	p = amf0kvnum(p, e, "duration", 0.0);
+	p = amf0kvnum(p, e, "width", w);
+	p = amf0kvnum(p, e, "height", h);
+	p = amf0kvnum(p, e, "videocodecid", EncH264);
 	if(audio)
-		p = amfkvnum(p, e, "audiocodecid", EncAAC);
-	p = amfend(p, e);
-	amfi24(psz, e, p-d);
+		p = amf0kvnum(p, e, "audiocodecid", EncAAC);
+	p = amf0end(p, e);
+	amf0i24(psz, e, p-d);
 
-	return amfi32(p, e, p-p0);
+	return amf0i32(p, e, p-p0);
 }
 
 u8int *
@@ -61,10 +61,10 @@
 	p0 = p;
 	*p++ = type;
 	psz = p;
-	p = amfi24(p, e, 0); /* size to be set later */
-	p = amfi24(p, e, dts);
+	p = amf0i24(p, e, 0); /* size to be set later */
+	p = amf0i24(p, e, dts);
 	*p++ = dts >> 24;
-	p = amfi24(p, e, stream);
+	p = amf0i24(p, e, stream);
 
 	d = p;
 	if(type == Faudio){
@@ -75,12 +75,12 @@
 		*p++ = ((fl & FlKey) ? 0x10 : 0x20) | EncH264;
 		*p++ = (fl & FlHdr) ? 0 : 1;
 		pts = ((fl & FlHdr) || pts < dts) ? 0 : (pts - dts);
-		p = amfi24(p, e, pts);
+		p = amf0i24(p, e, pts);
 		if((fl & FlHdr) == 0)
-			p = amfi32(p, e, sz);
+			p = amf0i32(p, e, sz);
 	}
 	p = (u8int*)memmove(p, data, sz) + sz;
-	amfi24(psz, e, p-d);
+	amf0i24(psz, e, p-d);
 
-	return amfi32(p, e, p-p0);
+	return amf0i32(p, e, p-p0);
 }
--- a/mkfile
+++ b/mkfile
@@ -4,13 +4,14 @@
 TARG=rtmp
 
 HFILES=\
-	amf.h\
+	amf0.h\
 	flv.h\
 	ivf.h\
 	rtmp.h\
+	util.h\
 
 OFILES=\
-	amf.$O\
+	amf0.$O\
 	flv.$O\
 	ivf.$O\
 	main.$O\
--- a/rtmp.c
+++ b/rtmp.c
@@ -3,7 +3,7 @@
 #include <thread.h>
 #include <bio.h>
 #include <libsec.h>
-#include "amf.h"
+#include "amf0.h"
 #include "ivf.h"
 #include "rtmp.h"
 #include "util.h"
@@ -11,69 +11,80 @@
 #define min(a,b) ((a)<(b)?(a):(b))
 
 enum {
-	SzLarge,
-	SzMedium,
-	SzSmall,
-	SzTiny,
-
 	Port = 1935,
-	Sigsz = 1536,
+	CSsz = 1536,
 	Chunk = 128,
 
-	ChanCtl = 3,
+	Type0 = 0,
+	Type1,
+	Type2,
+	Type3,
 
-	CbWhat = 0,
-	CbInvoke,
-	CbData,
-	CbStatus,
-	NumCbA,
+	CSCtl = 3,
 
+	CbCommand = 0,
+	CbTransID,
+	CbObject,
+	CbResponse,
+	NumCb,
+
+	/* UserControl */
 	CtlStreamBegin = 0,
-	CtlStreamEnd,
+	CtlStreamEOF,
 	CtlStreamDry,
-	CtlStreamRecorded,
-	CtlPing = 6,
-	CtlBufferEmpty = 31,
-	CtlBufferReady,
+	CtlSetBufferLen,
+	CtlStreamIsRecorded,
+	CtlPingRequest = 6,
+	CtlPingResponse,
 
-	PktChunkSz = 1,
-	PktBytesReadReport = 3,
-	PktControl = 4,
-	PktServerBW = 5,
-	PktClientBW,
-	PktAudio = 8,
-	PktVideo,
-	PktFlexStreamSend = 15,
-	PktFlexSharedObj,
-	PkgFlexMessage,
-	PktFlexInfo,
-	PktSharedObj = 19,
-	PktInvoke = 20,
-	PktFlashVideo = 22,
+	/* Message.type */
+	SetChunkSize = 1,
+	Abort,
+	Ack,
+	UserControl,
+	WindowAckSize,
+	SetBandwidth,
+	Audio = 8,
+	Video,
+	AMF3Metadata = 15,
+	AMF3SharedObject,
+	AMF3Command,
+	AMF0Metadata,
+	AMF0SharedObject,
+	AMF0Command,
+	Aggregate = 22,
 
+	/* RTMP.bwlimit */
+	LimitHard = 0,
+	LimitSoft,
+	LimitDynamic,
+
 	Biobufsz = 64*1024, /* FIXME don't know if it helps with anything */
 	Bufsz = 64*1024,
 };
 
-typedef struct Invoke Invoke;
-typedef struct Packet Packet;
+typedef struct Command Command;
+typedef struct Message Message;
 
-struct Invoke {
-	void (*cb)(RTMP *r, int ok, Amf *a[NumCbA], void *aux);
+#pragma varargck type "T" int
+#pragma varargck type "M" Message*
+
+struct Command {
+	void (*cb)(RTMP *r, int ok, Amf0 *a[NumCb], void *aux);
 	void *aux;
-	int n;
+	int tid;
 
-	Invoke *prev, *next;
+	Command *prev, *next;
 };
 
-struct Packet {
+struct Message {
 	int type;
-	int ht;
-	int chan;
+	int fmt;
+	int cs;
 	u32int ts;
 	u8int *data;
 	int sz;
-	Invoke invoke;
+	Command cmd;
 };
 
 struct RTMP {
@@ -82,130 +93,89 @@
 	char *app;
 	char *path;
 	char *tcurl;
-	Packet pk;
+	Message msg;
 	u8int *b, *p, *e;
 	int chunk;
 	int mode;
 	int bsz;
 	int i;
-	int svbw;
-	int clbw;
-	u8int clbw2;
+	int winacksz;
+	int bw;
+	u8int bwlimit;
 	struct {
-		int n;
-		Invoke *w;
-	}invokes;
+		int tid;
+		Command *w;
+	}cmds;
 	u8int biobuf[Biobufsz];
 };
 
-#define puti16(i) do{ r->p = amfi16(r->p, r->e, i); }while(0)
-#define puti24(i) do{ r->p = amfi24(r->p, r->e, i); }while(0)
-#define puti32(i) do{ r->p = amfi32(r->p, r->e, i); }while(0)
-#define putnum(v) do{ r->p = amfnum(r->p, r->e, v); }while(0)
-#define putstr(s) do{ r->p = amfstr(r->p, r->e, s); }while(0)
-#define putarr() do{ r->p = amfarr(r->p, r->e); }while(0)
-#define putobj() do{ r->p = amfobj(r->p, r->e); }while(0)
-#define putend() do{ r->p = amfend(r->p, r->e); }while(0)
-#define putkvnum(name, v) do{ r->p = amfkvnum(r->p, r->e, name, v); }while(0)
-#define putkvstr(name, s) do{ r->p = amfkvstr(r->p, r->e, name, s); }while(0)
-#define putkvbool(name, s) do{ r->p = amfkvbool(r->p, r->e, name, s); }while(0)
+#define puti16(i) do{ r->p = amf0i16(r->p, r->e, i); }while(0) /* put*: pass r->p/r->e to the amf0 helper, keep its result in r->p; needs an RTMP *r in scope */
+#define puti24(i) do{ r->p = amf0i24(r->p, r->e, i); }while(0)
+#define puti32(i) do{ r->p = amf0i32(r->p, r->e, i); }while(0)
+#define putnum(v) do{ r->p = amf0num(r->p, r->e, v); }while(0)
+#define putstr(s) do{ r->p = amf0str(r->p, r->e, s); }while(0)
+#define putarr() do{ r->p = amf0arr(r->p, r->e); }while(0)
+#define putobj() do{ r->p = amf0obj(r->p, r->e); }while(0)
+#define putend() do{ r->p = amf0end(r->p, r->e); }while(0)
+#define putkvnum(name, v) do{ r->p = amf0kvnum(r->p, r->e, name, v); }while(0)
+#define putkvstr(name, s) do{ r->p = amf0kvstr(r->p, r->e, name, s); }while(0)
+#define putkvbool(name, s) do{ r->p = amf0kvbool(r->p, r->e, name, s); }while(0)
 
-#define putinvoke(name) do { \
+#define putcommand(name, cb_) do { \
 	putstr(name); \
-	putnum(r->pk.invoke.n); \
+	putnum(r->msg.cmd.tid); \
 	putobj(); \
+	r->msg.cmd.cb = cb_; \
 }while(0)
 
 static int szs[] = {
-	[SzTiny] = 1,
-	[SzSmall] = 4,
-	[SzMedium] = 8,
-	[SzLarge] = 12,
+	[Type3] = 0,
+	[Type2] = 3,
+	[Type1] = 7,
+	[Type0] = 11,
 };
 
-static char *pktype2s[] = {
-	[PktChunkSz] = "ChunkSz",
-	[PktBytesReadReport] = "BytesReadReport",
-	[PktControl] = "Control",
-	[PktServerBW] = "ServerBW",
-	[PktClientBW] = "ClientBW",
-	[PktAudio] = "Audio",
-	[PktVideo] = "Video",
-	[PktFlexStreamSend] = "FlexStreamSend",
-	[PktFlexSharedObj] = "FlexSharedObj",
-	[PktFlexInfo] = "FlexInfo",
-	[PktSharedObj] = "SharedObj",
-	[PktInvoke] = "Invoke",
-	[PktFlashVideo] = "FlashVideo",
+static char *msgtype2s[] = { /* Message.type value -> printable name; indices not listed stay nil */
+	[SetChunkSize] = "SetChunkSize",
+	[Abort] = "Abort",
+	[Ack] = "Ack",
+	[UserControl] = "UserControl",
+	[WindowAckSize] = "WindowAckSize",
+	[SetBandwidth] = "SetBandwidth",
+	[Audio] = "Audio",
+	[Video] = "Video",
+	[AMF3Metadata] = "AMF3Metadata",
+	[AMF3SharedObject] = "AMF3SharedObject",
+	[AMF3Command] = "AMF3Command",
+	[AMF0Metadata] = "AMF0Metadata",
+	[AMF0SharedObject] = "AMF0SharedObject",
+	[AMF0Command] = "AMF0Command",
+	[Aggregate] = "Aggregate",
+};
 
 static char *ctl2s[] = {
 	[CtlStreamBegin] = "StreamBegin",
-	[CtlStreamEnd] = "StreamEnd",
+	[CtlStreamEOF] = "StreamEOF",
 	[CtlStreamDry] = "StreamDry",
-	[CtlStreamRecorded] = "StreamRecorded",
-	[CtlPing] = "Ping",
-	[CtlBufferEmpty] = "BufferEmpty",
-	[CtlBufferReady] = "BufferReady",
+	[CtlSetBufferLen] = "SetBufferLen",
+	[CtlStreamIsRecorded] = "StreamIsRecorded",
+	[CtlPingRequest] = "PingRequest",
+	[CtlPingResponse] = "PingResponse",
 };
 
 extern int debug;
 
-#pragma varargck type "T" int
-static int
-pktypefmt(Fmt *f)
-{
-	char *s;
-	int t;
-
-	if((t = va_arg(f->args, int)) >= 0 &&
-	   t < nelem(pktype2s) &&
-	   (s = pktype2s[t]) != nil)
-		return fmtprint(f, "%s", s);
-
-	return fmtprint(f, "%d", t);
-}
-
-#pragma varargck type "P" Packet*
-static int
-pkfmt(Fmt *f)
-{
-	u8int *s, *e;
-	Packet *p;
-	Amf *a;
-
-	p = va_arg(f->args, Packet*);
-
-	fmtprint(f, "type=%T chan=%d ts=%ud sz=%d", p->type, p->chan, p->ts, p->sz);
-
-	s = p->data;
-	e = s + p->sz;
-
-	if(p->type == PktInvoke){
-		fmtprint(f, ":");
-		for(; s != nil && s != e;){
-			if((s = amfparse(&a, s, e)) != nil)
-				fmtprint(f, " %A", a);
-			else
-				fmtprint(f, " %r");
-			amffree(a);
-		}
-	}
-
-	return 0;
-}
-
 static void
-newpacket(RTMP *r, int type, int ht, int chan)
+newmsg(RTMP *r, int type, int fmt, int cs)
 {
-	memset(&r->pk, 0, sizeof(r->pk));
+	memset(&r->msg, 0, sizeof(r->msg));
 
-	r->pk.type = type;
-	r->pk.ht = ht;
-	r->pk.chan = chan;
+	r->msg.type = type;
+	r->msg.fmt = fmt;
+	r->msg.cs = cs;
 	r->p = r->b;
-	if(type == PktInvoke)
-		r->pk.invoke.n = ++r->invokes.n;
+	if(type == AMF0Command)
+		r->msg.cmd.tid = ++r->cmds.tid;
 }
 
 static void
@@ -224,69 +194,47 @@
 }
 
 static int
-handshake(int f)
-{
-	u8int cl[1+Sigsz], sv[1+Sigsz];
-
-	cl[0] = 3; /* no encryption */
-	memset(cl+1, 0, 8);
-	prng(cl+1+8, Sigsz-8);
-	if(write(f, cl, sizeof(cl)) != sizeof(cl))
-		goto err;
-	if(readn(f, sv, sizeof(sv)) != sizeof(sv))
-		goto err;
-	if(cl[0] != sv[0]){
-		werrstr("expected %d (no encryption), got %d", cl[0], sv[0]);
-		goto err;
-	}
-	if(write(f, sv+1, Sigsz) != Sigsz)
-		goto err;
-	if(readn(f, sv+1, Sigsz) != Sigsz)
-		goto err;
-	if(memcmp(cl, sv, sizeof(cl)) != 0){
-		werrstr("signature mismatch");
-		goto err;
-	}
-
-	return 0;
-
-err:
-	werrstr("handshake: %r");
-	return -1;
-}
-
-static int
 rtmprecv(RTMP *r)
 {
-	int hsz, bodysz, dummy, n;
-	u8int *h, *e, type;
+	int hsz, len, n, msid;
+	u8int *h, *e, byte;
 	u32int ts;
 
-	memset(&r->pk, 0, sizeof(r->pk));
+	memset(&r->msg, 0, sizeof(r->msg));
 
 	r->p = r->b;
 	if(readn(r->i, r->p, 1) != 1)
 		goto err;
-	r->pk.ht = (r->p[0] & 0xc0)>>6;
-	r->pk.chan = r->p[0] & 0x3f;
-	hsz = szs[r->pk.ht];
-	if(readn(r->i, r->p+1, hsz-1) != hsz-1)
+	r->msg.fmt = (r->p[0] & 0xc0)>>6;
+	r->msg.cs = r->p[0] & 0x3f;
+	n = r->msg.cs + 1;
+	if(n <= 2){
+		if(readn(r->i, r->p, n) != n)
+			goto err;
+		r->msg.cs = 64 + r->p[0];
+		if(n == 2)
+			r->msg.cs += 256 * r->p[1];
+	}
+
+	hsz = szs[r->msg.fmt];
+	if(readn(r->i, r->p, hsz) != hsz)
 		goto err;
 
-	h = r->p + 1;
+	h = r->p;
 	e = r->p + hsz;
-	r->pk.type = -1;
-	bodysz = 0;
-	if(hsz >= szs[SzSmall]){
-		h = amfi24get(h, e, (s32int*)&ts); /* FIXME proper timestamps? */
-		if(hsz >= szs[SzMedium]){
-			h = amfi24get(h, e, &bodysz);
-			h = amfbyteget(h, e, &type);
-			r->pk.type = type;
-			if(hsz >= szs[SzLarge]){
-				dummy = 0;
-				h = amfi32get(h, e, &dummy); /* FIXME seems to be always 0? */
-			}
+
+	r->msg.type = -1;
+	msid = 0;
+	ts = 0;
+	len = 0;
+	if(hsz >= szs[Type2]){
+		h = amf0i24get(h, e, (s32int*)&ts); /* FIXME proper timestamps? */
+		if(hsz >= szs[Type1]){
+			h = amf0i24get(h, e, &len);
+			h = amf0byteget(h, e, &byte);
+			r->msg.type = byte;
+			if(hsz >= szs[Type0])
+				h = amf0i32leget(h, e, &msid);
 		}
 	}
 
@@ -293,31 +241,31 @@
 	if(ts == 0xffffff){ /* exntended timestamp */
 		if(readn(r->i, h, 4) != 4)
 			goto err;
-		h = amfi32get(h, h+4, (s32int*)&ts);
+		h = amf0i32get(h, h+4, (s32int*)&ts);
 	}
 
-	/* FIXME do all consecutive chunks use Tiny? */
-	bextend(r, bodysz);
-	r->pk.data = h;
-	r->pk.sz = bodysz;
+	/* FIXME do all consecutive chunks use Type3? */
+	bextend(r, len);
+	r->msg.data = h;
+	r->msg.sz = len;
 	for(;;){
-		n = min(bodysz, r->chunk);
+		n = min(len, r->chunk);
 		if(readn(r->i, h, n) != n)
 			goto err;
-		bodysz -= n;
+		len -= n;
 		h += n;
-		if(bodysz < 1)
+		if(len < 1)
 			break;
 		if(readn(r->i, h, 1) != 1)
 			goto err;
-		if((r->pk.chan | SzTiny<<6) != *h){
-			werrstr("chan/size does not match: %02x", *h);
+		if((r->msg.cs | Type3<<6) != *h){
+			werrstr("cs/fmt does not match: %02x", *h);
 			goto err;
 		}
 	}
 
 	if(debug)
-		fprint(2, "→ %P\n", &r->pk);
+		fprint(2, "→ %M\n", &r->msg);
 
 	return 0;
 
@@ -329,28 +277,27 @@
 static int
 rtmpsend(RTMP *r)
 {
-	u8int *p, *h, *e, hdata[32];
-	int bodysz, n, hsz;
-	Invoke *i;
+	u8int *p, *h, *e, hdata[24];
+	int len, n, hsz;
+	Command *c;
 
 	if(r->p == nil)
 		goto err;
 
-	r->pk.data = r->b;
-	r->pk.sz = r->p - r->b;
-	/* FIXME special case when bodysz is 0 */
+	r->msg.data = r->b;
+	r->msg.sz = r->p - r->b;
 
-	hsz = szs[r->pk.ht];
 	h = hdata;
+	*h++ = r->msg.fmt<<6 | r->msg.cs;
+	hsz = szs[r->msg.fmt];
 	e = h + hsz;
-	*h++ = r->pk.ht<<6 | r->pk.chan;
-	if(hsz >= szs[SzSmall]){
-		h = amfi24(h, e, 0); /* FIXME proper timestamps? */
-		if(hsz >= szs[SzMedium]){
-			h = amfi24(h, e, r->pk.sz);
-			h = amfbyte(h, e, r->pk.type);
-			if(hsz >= szs[SzLarge])
-				h = amfi32(h, e, 0); /* FIXME seems to be always 0? */
+	if(hsz >= szs[Type2]){
+		h = amf0i24(h, e, 0); /* FIXME put actual timestamps? */
+		if(hsz >= szs[Type1]){
+			h = amf0i24(h, e, r->msg.sz);
+			h = amf0byte(h, e, r->msg.type);
+			if(hsz >= szs[Type0])
+				h = amf0i32(h, e, 0); /* FIXME message stream id */
 		}
 	}
 	assert(h != nil);
@@ -358,14 +305,14 @@
 	if(Bwrite(r, hdata, h-hdata) < 0)
 		goto err;
 
-	for(p = r->pk.data, bodysz = r->pk.sz; bodysz > 0;){
-		n = min(bodysz, r->chunk);
+	for(p = r->msg.data, len = r->msg.sz; len > 0;){
+		n = min(len, r->chunk);
 		if(Bwrite(r, p, n) < 0)
 			goto err;
 		p += n;
-		bodysz -= n;
-		if(bodysz > 0){
-			*h = r->pk.chan | SzTiny<<6;
+		len -= n;
+		if(len > 0){
+			*h = r->msg.cs | Type3<<6;
 			Bputc(r, *h);
 		}
 	}
@@ -373,15 +320,15 @@
 	Bflush(r);
 
 	if(debug)
-		fprint(2, "← %P\n", &r->pk);
+		fprint(2, "← %M\n", &r->msg);
 
-	if(r->pk.type == PktInvoke){
-		i = emalloc(sizeof(*i));
-		*i = r->pk.invoke;
-		assert(i->cb != nil);
-		if((i->next = r->invokes.w) != nil)
-			i->next->prev = i;
-		r->invokes.w = i;
+	if(r->msg.type == AMF0Command){
+		c = emalloc(sizeof(*c));
+		*c = r->msg.cmd;
+		assert(c->cb != nil);
+		if((c->next = r->cmds.w) != nil)
+			c->next->prev = c;
+		r->cmds.w = c;
 	}
 
 	return 0;
@@ -391,34 +338,6 @@
 }
 
 static void
-connected(RTMP *r, int ok, Amf *a[NumCbA], void *)
-{
-	sendp(r->c, ok ? nil : smprint("%A", a[CbStatus]));
-}
-
-static int
-connect(RTMP *r)
-{
-	newpacket(r, PktInvoke, SzLarge, ChanCtl);
-	putinvoke("connect");
-		putkvstr("app", r->app);
-		putkvstr("tcUrl", r->tcurl);
-		if(r->mode & OWRITE)
-			putkvstr("type", "nonprivate");
-		else{
-			putkvbool("fpad", 0);
-			putkvnum("capabilities", 15);
-			putkvnum("audioCodecs", 3191);
-			putkvnum("videoCodecs", 252);
-			putkvnum("videoFunction", 1);
-		}
-	putend();
-	r->pk.invoke.cb = connected;
-
-	return rtmpsend(r);
-}
-
-static void
 rtmpfree(RTMP *r)
 {
 	free(r->app);
@@ -437,20 +356,20 @@
 loop(void *aux)
 {
 	int res, n, ok;
-	Amf *a[NumCbA];
+	Amf0 *a[NumCb];
 	u8int *s, *e;
 	s16int s16;
-	Packet *p;
-	Invoke *i;
+	Message *m;
+	Command *c;
 	RTMP *r;
 
 	r = aux;
-	p = &r->pk;
+	m = &r->msg;
 	res = 0;
 	memset(a, 0, sizeof(a));
 	for(;;){
 		for(n = 0; n < nelem(a); n++)
-			amffree(a[n]);
+			amf0free(a[n]);
 		memset(a, 0, sizeof(a));
 
 		if(res != 0 || (res = rtmprecv(r)) != 0){
@@ -457,60 +376,55 @@
 			if(debug)
 				fprint(2, "rtmp loop: %r\n");
 			for(n = 0; n < nelem(a); n++)
-				amffree(a[n]);
+				amf0free(a[n]);
 			break;
 		}
 
-		s = r->pk.data;
-		e = s + r->pk.sz;
+		s = r->msg.data;
+		e = s + r->msg.sz;
 
-		switch(p->type){
-		case PktInvoke:
-			i = nil;
-			ok = 0;
-			for(n = 0; n < NumCbA; n++){
-				if((s = amfparse(&a[n], s, e)) == nil)
+		switch(m->type){
+		case AMF0Command:
+			c = nil;
+			ok = 1;
+			for(n = 0; n < NumCb; n++){
+				if((s = amf0parse(&a[n], s, e)) == nil)
 					goto err;
 				switch(n){
-				case CbWhat:
+				case CbCommand:
 					if(a[n]->type != Tstr){
-						werrstr("invoke a[%d] not a string", n);
+						werrstr("command name is not a string");
 						goto err;
 					}
-					if(strcmp(a[n]->str, "_result") == 0)
-						ok = 1;
-					else if(strcmp(a[n]->str, "_error") == 0)
+					if(strcmp(a[n]->str, "_error") == 0)
 						ok = 0;
-					else{
-						werrstr("unexpected a[%d]: %#q", n, a[n]->str);
-						goto err;
-					}
+					/* other values: "_result", etc */
 					break;
-				case CbInvoke:
+				case CbTransID:
 					if(a[n]->type != Tnum){
-						werrstr("invoke a[%d] not a number", n);
+						werrstr("transaction ID is not a number");
 						goto err;
 					}
-					for(i = r->invokes.w; i != nil && i->n != a[n]->num; i = i->next);
-					if(i == nil){
-						werrstr("did not expect invoke %d result", (int)a[n]->num);
+					for(c = r->cmds.w; c != nil && c->tid != a[n]->num; c = c->next);
+					if(c == nil){
+						werrstr("response to non-existent transaction %d", (int)a[n]->num);
 						goto err;
 					}
 					break;
 				}
 			}
-			if(i->prev != nil)
-				i->prev->next = i->next;
-			if(i->next != nil)
-				i->next->prev = i->prev;
-			if(r->invokes.w == i)
-				r->invokes.w = i->next;
-			i->cb(r, ok, a, i->aux);
-			free(i);
+			if(c->prev != nil)
+				c->prev->next = c->next;
+			if(c->next != nil)
+				c->next->prev = c->prev;
+			if(r->cmds.w == c)
+				r->cmds.w = c->next;
+			c->cb(r, ok, a, c->aux);
+			free(c);
 			break;
 
-		case PktChunkSz:
-			if(amfi32get(s, e, &r->chunk) == nil)
+		case SetChunkSize:
+			if(amf0i32get(s, e, &r->chunk) == nil)
 				goto err;
 			if(r->chunk < 2){
 				werrstr("invalid chunk size: %d", r->chunk);
@@ -520,52 +434,58 @@
 				fprint(2, "new chunk size: %d bytes\n", r->chunk);
 			break;
 
-		case PktBytesReadReport:
-		case PktControl:
-			if((s = amfi16get(s, e, &s16)) == nil)
+		case UserControl:
+			if((s = amf0i16get(s, e, &s16)) == nil)
 				goto err;
-			if((s = amfi32get(s, e, &n)) == nil)
+			if(amf0i32get(s, e, &n) == nil)
 				n = -1;
 			switch(s16){
 			case CtlStreamBegin:
-			case CtlStreamEnd:
+			case CtlStreamEOF:
 			case CtlStreamDry:
-			case CtlStreamRecorded:
-			case CtlBufferEmpty:
-			case CtlBufferReady:
+			case CtlSetBufferLen:
+			case CtlStreamIsRecorded:
 				if(0){
-			case CtlPing:
+			case CtlPingRequest:
 					/* FIXME pong */
 					USED(n);
 				}
 				if(debug)
-					fprint(2, "control packet: %s %d\n", ctl2s[s16], n);
+					fprint(2, "control message: %s %d\n", ctl2s[s16], n);
 				break;
 			default:
 				if(debug)
-					fprint(2, "unknown control packet %d (value %d)\n", s16, n);
+					fprint(2, "unknown control message %d (value %d)\n", s16, n);
 				break;
 			}
 			break;
 
-		case PktServerBW:
-			if((s = amfi32get(s, e, &r->svbw)) == nil)
+		case WindowAckSize:
+			if(amf0i32get(s, e, &r->winacksz) == nil)
 				goto err;
 			break;
 
-		case PktClientBW:
-			if((s = amfi32get(s, e, &r->clbw)) == nil || (s = amfbyteget(s, e, &r->clbw2)) == nil)
+		case SetBandwidth:
+			if((s = amf0i32get(s, e, &r->bw)) == nil || amf0byteget(s, e, &r->bwlimit) == nil)
 				goto err;
 			break;
 
-		case PktAudio:
-		case PktVideo:
-		case PktFlexStreamSend:
-		case PktFlexSharedObj:
-		case PktFlexInfo:
-		case PktSharedObj:
-		case PktFlashVideo:
+		/* FIXME */
+		case Aggregate:
+		case Abort:
+		case Ack:
+		case Audio:
+		case Video:
+		case AMF0Metadata:
+		case AMF0SharedObject:
 			break;
+
+		case AMF3Metadata:
+		case AMF3SharedObject:
+		case AMF3Command:
+			if(debug)
+				fprint(2, "AMF3 message, ignoring\n");
+			break;
 err:
 			res = -1;
 			break;
@@ -576,6 +496,102 @@
 	threadexitsall(res == 0 ? nil : "error");
 }
 
+static int
+handshake(int f)
+{
+	u8int c[1+CSsz], s[1+CSsz];
+	/* writes C0+C1, reads S0+S1, writes C2 (= S1), reads S2 over S1; returns 0, or -1 with errstr set */
+	c[0] = 3; /* rtmp v3 */
+	memset(c+1, 0, 4+4); /* timestamp + zero */
+	prng(c+1+8, CSsz-4-4);
+	if(write(f, c, sizeof(c)) != sizeof(c))
+		goto err;
+	if(readn(f, s, sizeof(s)) != sizeof(s))
+		goto err;
+	if(c[0] != s[0]){
+		werrstr("expected version %d, got %d", c[0], s[0]);
+		goto err;
+	}
+	if(write(f, s+1, CSsz) != CSsz)
+		goto err;
+	if(readn(f, s+1, CSsz) != CSsz)
+		goto err;
+	if(memcmp(c+1, s+1, 4) != 0 || memcmp(c+1+8, s+1+8, CSsz-8) != 0){ /* S2 echoes C1's time and random bytes; time2 (bytes 4-7) is the server's own, so skip it */
+		werrstr("C1 != S2");
+		goto err;
+	}
+
+	return 0;
+
+err:
+	werrstr("handshake: %r");
+	return -1;
+}
+
+static void
+connected(RTMP *r, int ok, Amf0 *a[NumCb], void *)
+{ /* callback registered by connect() for the "connect" command: reports the outcome on r->c */
+	sendp(r->c, ok ? nil : smprint("%A", a[CbResponse])); /* nil when ok, otherwise the %A-formatted response value */
+}
+
+static int
+connect(RTMP *r)
+{ /* builds the AMF0 "connect" command in r's buffer and sends it; returns rtmpsend's result */
+	newmsg(r, AMF0Command, Type0, CSCtl); /* resets r->msg; for AMF0Command newmsg also assigns the next transaction id */
+	putcommand("connect", connected); /* command name, transaction id, object start; connected() becomes the callback */
+		putkvstr("app", r->app);
+		putkvstr("tcUrl", r->tcurl);
+		putkvbool("fpad", 0); /* no proxy */
+		putkvnum("audioCodecs", 0x4 | 0x400); /* mp3 + aac */
+		putkvnum("videoCodecs", 0x80); /* h.264 */
+		putkvnum("videoFunction", 0); /* no frame-accurate seek */
+	putend();
+
+	return rtmpsend(r);
+}
+
+static int
+msgtypefmt(Fmt *f)
+{
+	char *name;
+	int type;
+
+	type = va_arg(f->args, int);
+	if(type < 0 || type >= nelem(msgtype2s) || msgtype2s[type] == nil)
+		return fmtprint(f, "%d", type); /* no name in msgtype2s: print the number */
+	name = msgtype2s[type];
+
+	return fmtprint(f, "%s", name);
+}
+
+static int
+msgfmt(Fmt *f)
+{
+	u8int *p, *end;
+	Message *msg;
+	Amf0 *val;
+
+	msg = va_arg(f->args, Message*);
+	fmtprint(f, "type=%T cs=%d ts=%ud sz=%d", msg->type, msg->cs, msg->ts, msg->sz);
+	if(msg->type != AMF0Command)
+		return 0; /* only AMF0 commands get their payload printed */
+
+	/* parse and print values one by one until the payload ends or parsing fails */
+	fmtprint(f, ":");
+	p = msg->data;
+	end = p + msg->sz;
+	while(p != nil && p != end){
+		p = amf0parse(&val, p, end);
+		if(p == nil)
+			fmtprint(f, " %r");
+		else
+			fmtprint(f, " %A", val);
+		amf0free(val);
+	}
+
+	return 0;
+}
+
 RTMP *
 rtmpdial(char *url, int w, int h, int withaudio)
 {
@@ -583,9 +599,9 @@
 	int f, port, ctl;
 	RTMP *r;
 
-	fmtinstall('A', amffmt);
-	fmtinstall('T', pktypefmt);
-	fmtinstall('P', pkfmt);
+	fmtinstall('A', amf0fmt);
+	fmtinstall('T', msgtypefmt);
+	fmtinstall('M', msgfmt);
 	quotefmtinstall();
 
 	r = nil;