ref: 5a2a972aa72c4b827ea784d91c15fd02adee3f71
parent: c6ee10356946d9bc7b34974cc737acec166f0e6b
author: Sigrid Solveig Haflínudóttir <[email protected]>
date: Thu Jul 29 04:56:27 EDT 2021
rewrite according to the rtmp spec
--- a/LICENSE
+++ b/LICENSE
@@ -1,1 +1,1 @@
-This is partially based on librtmp which is LGPLv2.1, same goes for this project.
+Public domain.
--- a/amf.c
+++ /dev/null
@@ -1,366 +1,0 @@
-#include <u.h>
-#include <libc.h>
-#include "amf.h"
-#include "util.h"
-
-enum {
- Anum,
- Abool,
- Astr,
- Aobj,
- Aarr = 8,
- Aend,
- Alstr = 12,
-};
-
-#define atleast(what, x) do{ \
- if(p == nil) \
- return nil; \
- if(e-p < x){ \
- werrstr(what ": buffer short: expected %d, have %d", x, (int)(e-p)); \
- return nil; \
- } \
-}while(0)
-
-u8int *
-amfbool(u8int *p, u8int *e, int v)
-{
- atleast("bool", 2);
- *p++ = Abool;
- *p++ = !!v;
- return p;
-}
-
-u8int *
-amfbyte(u8int *p, u8int *e, u8int byte)
-{
- atleast("byte", 1);
- *p++ = byte;
- return p;
-}
-
-u8int *
-amfi16(u8int *p, u8int *e, s16int i)
-{
- atleast("i16", 2);
- *p++ = i >> 8;
- *p++ = i;
- return p;
-}
-
-u8int *
-amfi24(u8int *p, u8int *e, s32int i)
-{
- atleast("i24", 3);
- *p++ = i >> 16;
- *p++ = i >> 8;
- *p++ = i;
- return p;
-}
-
-u8int *
-amfi32(u8int *p, u8int *e, s32int i)
-{
- atleast("i32", 4);
- *p++ = i >> 24;
- *p++ = i >> 16;
- *p++ = i >> 8;
- *p++ = i;
- return p;
-}
-
-u8int *
-amfnum(u8int *p, u8int *e, double v)
-{
- union {
- double v;
- u64int u;
- }x;
-
- atleast("num", 1+8);
- *p++ = Anum;
- x.v = v;
- *p++ = x.u >> 56;
- *p++ = x.u >> 48;
- *p++ = x.u >> 40;
- *p++ = x.u >> 32;
- *p++ = x.u >> 24;
- *p++ = x.u >> 16;
- *p++ = x.u >> 8;
- *p++ = x.u;
-
- return p;
-}
-
-u8int *
-amfstr(u8int *p, u8int *e, char *s)
-{
- int n;
-
- n = strlen(s);
- if(n <= 0xffff){
- atleast("str", 1+2+n);
- *p++ = Astr;
- return (u8int*)memmove(amfi16(p, e, n), s, n) + n;
- }
-
- atleast("lstr", 1+4+n);
- *p++ = Alstr;
- return (u8int*)memmove(amfi32(p, e, n), s, n) + n;
-}
-
-u8int *
-amfarr(u8int *p, u8int *e)
-{
- return amfbyte(p, e, Aarr);
-}
-
-u8int *
-amfobj(u8int *p, u8int *e)
-{
- return amfbyte(p, e, Aobj);
-}
-
-u8int *
-amfend(u8int *p, u8int *e)
-{
- return amfi24(p, e, Aend);
-}
-
-static u8int *
-amfkv(u8int *p, u8int *e, char *name)
-{
- int n;
-
- if((n = strlen(name)) > 0xffff){
- werrstr("string too long");
- return nil;
- }
- atleast("kv", 2+n);
- p = amfi16(p, e, n);
-
- return (u8int*)memmove(p, name, n) + n;
-}
-
-u8int *
-amfkvbool(u8int *p, u8int *e, char *name, int v)
-{
- return amfbool(amfkv(p, e, name), e, v);
-}
-
-u8int *
-amfkvnum(u8int *p, u8int *e, char *name, double v)
-{
- return amfnum(amfkv(p, e, name), e, v);
-}
-
-u8int *
-amfkvstr(u8int *p, u8int *e, char *name, char *v)
-{
- return amfstr(amfkv(p, e, name), e, v);
-}
-
-u8int *
-amfbyteget(u8int *p, u8int *e, u8int *byte)
-{
- atleast("byte", 1);
- *byte = *p;
- return p+1;
-}
-
-u8int *
-amfi16get(u8int *p, u8int *e, s16int *i)
-{
- atleast("i16", 2);
- *i = p[0]<<8 | p[1];
- return p+2;
-}
-
-u8int *
-amfi24get(u8int *p, u8int *e, s32int *i)
-{
- atleast("i24", 3);
- *i = p[0]<<16 | p[1]<<8 | p[2];
- return p+3;
-}
-
-u8int *
-amfi32get(u8int *p, u8int *e, s32int *i)
-{
- atleast("i32", 4);
- *i = p[0]<<16 | p[1]<<16 | p[2]<<8 | p[3];
- return p+4;
-}
-
-void
-amffree(Amf *a)
-{
- int i;
-
- if(a == nil)
- return;
-
- switch(a->type){
- case Tstr:
- free(a->str);
- break;
- case Tobj:
- for(i = 0; i < a->obj.n; i++){
- free(a->obj.k[i]);
- amffree(a->obj.v[i]);
- }
- break;
- case Tarr:
- for(i = 0; i < a->arr.n; i++)
- amffree(a->arr.v[i]);
- case Tnum:
- case Tbool:
- break;
- default:
- sysfatal("unknown amf type %d", a->type);
- }
-
- free(a);
-}
-
-u8int *
-amfparse(Amf **amf, u8int *p, u8int *e)
-{
- s16int s16;
- union {
- double v;
- u64int u;
- }x;
- int n;
- Amf *a;
-
- atleast("type", 1);
-
- a = ecalloc(1, sizeof(Amf));
- *amf = nil;
-
- switch(*p++){
- case Anum:
- atleast("num", 8);
- for(n = 0, x.u = 0; n < 8; n++)
- x.u = x.u<<8 | *p++;
- a->type = Tnum;
- a->num = x.v;
- break;
- case Abool:
- atleast("bool", 1);
- a->type = Tbool;
- a->bool = *p++;
- break;
- case Alstr:
- if((p = amfi32get(p, e, &n)) == nil)
- return nil;
- if(0){
- case Astr:
- if((p = amfi16get(p, e, &s16)) == nil)
- return nil;
- n = s16;
- }
- atleast("str", n);
- a->str = estrndup(p, n);
- p += n;
- break;
- case Aobj:
- atleast("obj", 3); /* Aend should be there anyway */
- a->type = Tobj;
- for(a->obj.n = 0;;){
- if((p = amfi16get(p, e, &s16)) == nil)
- break;
- if(s16 == 0){
- atleast("obj end?", 1);
- if(*p != Aend){
- werrstr("object doesn't end well");
- p = nil;
- break;
- }
- p++;
- break;
- }
- n = s16;
- atleast("obj key", n);
- a->obj.n++;
- a->obj.k = erealloc(a->obj.k, sizeof(*a->obj.k)*a->obj.n);
- a->obj.v = erealloc(a->obj.v, sizeof(*a->obj.v)*a->obj.n);
- a->obj.k[a->obj.n-1] = estrndup(p, n);
- p += n;
- if((p = amfparse(&a->obj.v[a->obj.n-1], p, e)) == nil){
- werrstr("obj val: %r");
- break;
- }
- }
- break;
- case Aarr:
- a->type = Tarr;
- if((p = amfi32get(p, e, &a->arr.n)) == nil)
- break;
- a->arr.v = emalloc(sizeof(*a->arr.v)*a->arr.n);
- for(n = 0; n < a->arr.n; n++){
- if((p = amfparse(&a->arr.v[n], p, e)) == nil){
- werrstr("arr el: %r");
- break;
- }
- }
- if((p = amfi24get(p, e, &n)) == nil || n != Aend){
- p = nil;
- werrstr("array doesn't end with Aend");
- }
- break;
- case Aend:
- p = nil;
- werrstr("unexpected Aend");
- break;
- default:
- werrstr("unexpected type %d", p[-1]);
- p = nil;
- break;
- }
-
- if(p == nil)
- amffree(a);
- else
- *amf = a;
-
- return p;
-}
-
-int
-amffmt(Fmt *f)
-{
- Amf *a;
- int i;
-
- a = va_arg(f->args, Amf*);
-
- switch(a->type){
- case Tstr:
- fmtprint(f, "%#q", a->str);
- break;
- case Tobj:
- fmtprint(f, "{");
- for(i = 0; i < a->obj.n; i++)
- fmtprint(f, "%s%q:%A", i > 0 ? ", " : "", a->obj.k[i], a->obj.v[i]);
- fmtprint(f, "}");
- break;
- case Tarr:
- fmtprint(f, "[");
- for(i = 0; i < a->arr.n; i++)
- fmtprint(f, "%s%A", i > 0 ? ", " : "", a->arr.v[i]);
- fmtprint(f, "]");
- break;
- case Tnum:
- fmtprint(f, "%g", a->num);
- break;
- case Tbool:
- fmtprint(f, a->bool ? "true" : "false");
- break;
- default:
- sysfatal("unknown amf type %d", a->type);
- }
-
- return 0;
-}
--- a/amf.h
+++ /dev/null
@@ -1,52 +1,0 @@
-enum {
- Tstr,
- Tnum,
- Tbool,
- Tarr,
- Tobj,
-};
-
-typedef struct Amf Amf;
-
-struct Amf {
- int type;
- union {
- char *str;
- double num;
- u8int bool;
- struct {
- Amf **v;
- int n;
- }arr;
- struct {
- char **k;
- Amf **v;
- int n;
- }obj;
- };
-};
-
-u8int *amfi16(u8int *p, u8int *e, s16int i);
-u8int *amfbool(u8int *p, u8int *e, int v);
-u8int *amfbyte(u8int *p, u8int *e, u8int byte);
-u8int *amfi24(u8int *p, u8int *e, s32int i);
-u8int *amfi32(u8int *p, u8int *e, s32int i);
-u8int *amfnum(u8int *p, u8int *e, double v);
-u8int *amfstr(u8int *p, u8int *e, char *s);
-u8int *amfarr(u8int *p, u8int *e);
-u8int *amfobj(u8int *p, u8int *e);
-u8int *amfend(u8int *p, u8int *e);
-u8int *amfkvnum(u8int *p, u8int *e, char *name, double v);
-u8int *amfkvstr(u8int *p, u8int *e, char *name, char *v);
-u8int *amfkvbool(u8int *p, u8int *e, char *name, int v);
-
-u8int *amfbyteget(u8int *p, u8int *e, u8int *byte);
-u8int *amfi16get(u8int *p, u8int *e, s16int *i);
-u8int *amfi24get(u8int *p, u8int *e, s32int *i);
-u8int *amfi32get(u8int *p, u8int *e, s32int *i);
-
-u8int *amfparse(Amf **a, u8int *p, u8int *e);
-void amffree(Amf *a);
-
-#pragma varargck type "A" Amf*
-int amffmt(Fmt *f);
--- a/flv.c
+++ b/flv.c
@@ -1,6 +1,6 @@
#include <u.h>
#include <libc.h>
-#include "amf.h"
+#include "amf0.h"
#include "flv.h"
enum {
@@ -27,25 +27,25 @@
p0 = p;
*p++ = Fvideo;
psz = p;
- p = amfi24(p, e, 0); /* sz set later */
- p = amfi24(p, e, ts);
+ p = amf0i24(p, e, 0); /* sz set later */
+ p = amf0i24(p, e, ts);
*p++ = ts>>24;
- p = amfi24(p, e, stream);
+ p = amf0i24(p, e, stream);
d = p;
- p = amfstr(p, e, "onMetaData");
- p = amfarr(p, e);
- p = amfi32(p, e, audio ? 5 : 4);
- p = amfkvnum(p, e, "duration", 0.0);
- p = amfkvnum(p, e, "width", w);
- p = amfkvnum(p, e, "height", h);
- p = amfkvnum(p, e, "videocodecid", EncH264);
+ p = amf0str(p, e, "onMetaData");
+ p = amf0arr(p, e);
+ p = amf0i32(p, e, audio ? 5 : 4);
+ p = amf0kvnum(p, e, "duration", 0.0);
+ p = amf0kvnum(p, e, "width", w);
+ p = amf0kvnum(p, e, "height", h);
+ p = amf0kvnum(p, e, "videocodecid", EncH264);
if(audio)
- p = amfkvnum(p, e, "audiocodecid", EncAAC);
- p = amfend(p, e);
- amfi24(psz, e, p-d);
+ p = amf0kvnum(p, e, "audiocodecid", EncAAC);
+ p = amf0end(p, e);
+ amf0i24(psz, e, p-d);
- return amfi32(p, e, p-p0);
+ return amf0i32(p, e, p-p0);
}
u8int *
@@ -61,10 +61,10 @@
p0 = p;
*p++ = type;
psz = p;
- p = amfi24(p, e, 0); /* size to be set later */
- p = amfi24(p, e, dts);
+ p = amf0i24(p, e, 0); /* size to be set later */
+ p = amf0i24(p, e, dts);
*p++ = dts >> 24;
- p = amfi24(p, e, stream);
+ p = amf0i24(p, e, stream);
d = p;
if(type == Faudio){
@@ -75,12 +75,12 @@
*p++ = ((fl & FlKey) ? 0x10 : 0x20) | EncH264;
*p++ = (fl & FlHdr) ? 0 : 1;
pts = ((fl & FlHdr) || pts < dts) ? 0 : (pts - dts);
- p = amfi24(p, e, pts);
+ p = amf0i24(p, e, pts);
if((fl & FlHdr) == 0)
- p = amfi32(p, e, sz);
+ p = amf0i32(p, e, sz);
}
p = (u8int*)memmove(p, data, sz) + sz;
- amfi24(psz, e, p-d);
+ amf0i24(psz, e, p-d);
- return amfi32(p, e, p-p0);
+ return amf0i32(p, e, p-p0);
}
--- a/mkfile
+++ b/mkfile
@@ -4,13 +4,14 @@
TARG=rtmp
HFILES=\
- amf.h\
+ amf0.h\
flv.h\
ivf.h\
rtmp.h\
+ util.h\
OFILES=\
- amf.$O\
+ amf0.$O\
flv.$O\
ivf.$O\
main.$O\
--- a/rtmp.c
+++ b/rtmp.c
@@ -3,7 +3,7 @@
#include <thread.h>
#include <bio.h>
#include <libsec.h>
-#include "amf.h"
+#include "amf0.h"
#include "ivf.h"
#include "rtmp.h"
#include "util.h"
@@ -11,69 +11,80 @@
#define min(a,b) ((a)<(b)?(a):(b))
enum {
- SzLarge,
- SzMedium,
- SzSmall,
- SzTiny,
-
Port = 1935,
- Sigsz = 1536,
+ CSsz = 1536,
Chunk = 128,
- ChanCtl = 3,
+ Type0 = 0,
+ Type1,
+ Type2,
+ Type3,
- CbWhat = 0,
- CbInvoke,
- CbData,
- CbStatus,
- NumCbA,
+ CSCtl = 3,
+ CbCommand = 0,
+ CbTransID,
+ CbObject,
+ CbResponse,
+ NumCb,
+
+ /* UserControl */
CtlStreamBegin = 0,
- CtlStreamEnd,
+ CtlStreamEOF,
CtlStreamDry,
- CtlStreamRecorded,
- CtlPing = 6,
- CtlBufferEmpty = 31,
- CtlBufferReady,
+ CtlSetBufferLen,
+ CtlStreamIsRecorded,
+ CtlPingRequest = 6,
+ CtlPingResponse,
- PktChunkSz = 1,
- PktBytesReadReport = 3,
- PktControl = 4,
- PktServerBW = 5,
- PktClientBW,
- PktAudio = 8,
- PktVideo,
- PktFlexStreamSend = 15,
- PktFlexSharedObj,
- PkgFlexMessage,
- PktFlexInfo,
- PktSharedObj = 19,
- PktInvoke = 20,
- PktFlashVideo = 22,
+ /* Message.type */
+ SetChunkSize = 1,
+ Abort,
+ Ack,
+ UserControl,
+ WindowAckSize,
+ SetBandwidth,
+ Audio = 8,
+ Video,
+ AMF3Metadata = 15,
+ AMF3SharedObject,
+ AMF3Command,
+ AMF0Metadata,
+ AMF0SharedObject,
+ AMF0Command,
+ Aggregate = 22,
+ /* RTMP.bwlimit */
+ LimitHard = 0,
+ LimitSoft,
+ LimitDynamic,
+
Biobufsz = 64*1024, /* FIXME don't know if it helps with anything */
Bufsz = 64*1024,
};
-typedef struct Invoke Invoke;
-typedef struct Packet Packet;
+typedef struct Command Command;
+typedef struct Message Message;
-struct Invoke {
- void (*cb)(RTMP *r, int ok, Amf *a[NumCbA], void *aux);
+#pragma varargck type "T" int
+#pragma varargck type "M" Message*
+
+struct Command {
+ void (*cb)(RTMP *r, int ok, Amf0 *a[NumCb], void *aux);
void *aux;
- int n;
+ int tid;
- Invoke *prev, *next;
+ Command *prev, *next;
};
-struct Packet {
+struct Message {
int type;
- int ht;
- int chan;
+ int fmt;
+ int cs;
u32int ts;
u8int *data;
int sz;
- Invoke invoke;
+ Command cmd;
};
struct RTMP {
@@ -82,130 +93,89 @@
char *app;
char *path;
char *tcurl;
- Packet pk;
+ Message msg;
u8int *b, *p, *e;
int chunk;
int mode;
int bsz;
int i;
- int svbw;
- int clbw;
- u8int clbw2;
+ int winacksz;
+ int bw;
+ u8int bwlimit;
struct {
- int n;
- Invoke *w;
- }invokes;
+ int tid;
+ Command *w;
+ }cmds;
u8int biobuf[Biobufsz];
};
-#define puti16(i) do{ r->p = amfi16(r->p, r->e, i); }while(0)
-#define puti24(i) do{ r->p = amfi24(r->p, r->e, i); }while(0)
-#define puti32(i) do{ r->p = amfi32(r->p, r->e, i); }while(0)
-#define putnum(v) do{ r->p = amfnum(r->p, r->e, v); }while(0)
-#define putstr(s) do{ r->p = amfstr(r->p, r->e, s); }while(0)
-#define putarr() do{ r->p = amfarr(r->p, r->e); }while(0)
-#define putobj() do{ r->p = amfobj(r->p, r->e); }while(0)
-#define putend() do{ r->p = amfend(r->p, r->e); }while(0)
-#define putkvnum(name, v) do{ r->p = amfkvnum(r->p, r->e, name, v); }while(0)
-#define putkvstr(name, s) do{ r->p = amfkvstr(r->p, r->e, name, s); }while(0)
-#define putkvbool(name, s) do{ r->p = amfkvbool(r->p, r->e, name, s); }while(0)
+#define puti16(i) do{ r->p = amf0i16(r->p, r->e, i); }while(0)
+#define puti24(i) do{ r->p = amf0i24(r->p, r->e, i); }while(0)
+#define puti32(i) do{ r->p = amf0i32(r->p, r->e, i); }while(0)
+#define putnum(v) do{ r->p = amf0num(r->p, r->e, v); }while(0)
+#define putstr(s) do{ r->p = amf0str(r->p, r->e, s); }while(0)
+#define putarr() do{ r->p = amf0arr(r->p, r->e); }while(0)
+#define putobj() do{ r->p = amf0obj(r->p, r->e); }while(0)
+#define putend() do{ r->p = amf0end(r->p, r->e); }while(0)
+#define putkvnum(name, v) do{ r->p = amf0kvnum(r->p, r->e, name, v); }while(0)
+#define putkvstr(name, s) do{ r->p = amf0kvstr(r->p, r->e, name, s); }while(0)
+#define putkvbool(name, s) do{ r->p = amf0kvbool(r->p, r->e, name, s); }while(0)
-#define putinvoke(name) do { \
+#define putcommand(name, cb_) do { \
putstr(name); \
- putnum(r->pk.invoke.n); \
+ putnum(r->msg.cmd.tid); \
putobj(); \
+ r->msg.cmd.cb = cb_; \
}while(0)
static int szs[] = {
- [SzTiny] = 1,
- [SzSmall] = 4,
- [SzMedium] = 8,
- [SzLarge] = 12,
+ [Type3] = 0,
+ [Type2] = 3,
+ [Type1] = 7,
+ [Type0] = 11,
};
-static char *pktype2s[] = {
- [PktChunkSz] = "ChunkSz",
- [PktBytesReadReport] = "BytesReadReport",
- [PktControl] = "Control",
- [PktServerBW] = "ServerBW",
- [PktClientBW] = "ClientBW",
- [PktAudio] = "Audio",
- [PktVideo] = "Video",
- [PktFlexStreamSend] = "FlexStreamSend",
- [PktFlexSharedObj] = "FlexSharedObj",
- [PktFlexInfo] = "FlexInfo",
- [PktSharedObj] = "SharedObj",
- [PktInvoke] = "Invoke",
- [PktFlashVideo] = "FlashVideo",
+static char *msgtype2s[] = {
+ [SetChunkSize] = "SetChunkSize",
+ [Abort] = "Abort",
+ [Ack] = "Ack",
+ [UserControl] = "UserControl",
+ [WindowAckSize] = "WindowAckSize",
+ [SetBandwidth] = "SetBandwidth",
+ [Audio] = "Audio",
+ [Video] = "Video",
+ [AMF3Metadata] = "AMF3Metadata",
+ [AMF3SharedObject] = "AMF3SharedObject",
+ [AMF3Command] = "AMF3Command",
+ [AMF0Metadata] = "AMF0Metadata",
+ [AMF0SharedObject] = "AMF0SharedObject",
+ [AMF0Command] = "AMF0Command",
+ [Aggregate] = "Aggregate",
};
static char *ctl2s[] = {
[CtlStreamBegin] = "StreamBegin",
- [CtlStreamEnd] = "StreamEnd",
+ [CtlStreamEOF] = "StreamEOF",
[CtlStreamDry] = "StreamDry",
- [CtlStreamRecorded] = "StreamRecorded",
- [CtlPing] = "Ping",
- [CtlBufferEmpty] = "BufferEmpty",
- [CtlBufferReady] = "BufferReady",
+ [CtlSetBufferLen] = "SetBufferLen",
+ [CtlStreamIsRecorded] = "StreamIsRecorded",
+ [CtlPingRequest] = "PingRequest",
+ [CtlPingResponse] = "PingResponse",
};
extern int debug;
-#pragma varargck type "T" int
-static int
-pktypefmt(Fmt *f)
-{
- char *s;
- int t;
-
- if((t = va_arg(f->args, int)) >= 0 &&
- t < nelem(pktype2s) &&
- (s = pktype2s[t]) != nil)
- return fmtprint(f, "%s", s);
-
- return fmtprint(f, "%d", t);
-}
-
-#pragma varargck type "P" Packet*
-static int
-pkfmt(Fmt *f)
-{
- u8int *s, *e;
- Packet *p;
- Amf *a;
-
- p = va_arg(f->args, Packet*);
-
- fmtprint(f, "type=%T chan=%d ts=%ud sz=%d", p->type, p->chan, p->ts, p->sz);
-
- s = p->data;
- e = s + p->sz;
-
- if(p->type == PktInvoke){
- fmtprint(f, ":");
- for(; s != nil && s != e;){
- if((s = amfparse(&a, s, e)) != nil)
- fmtprint(f, " %A", a);
- else
- fmtprint(f, " %r");
- amffree(a);
- }
- }
-
- return 0;
-}
-
static void
-newpacket(RTMP *r, int type, int ht, int chan)
+newmsg(RTMP *r, int type, int fmt, int cs)
{
- memset(&r->pk, 0, sizeof(r->pk));
+ memset(&r->msg, 0, sizeof(r->msg));
- r->pk.type = type;
- r->pk.ht = ht;
- r->pk.chan = chan;
+ r->msg.type = type;
+ r->msg.fmt = fmt;
+ r->msg.cs = cs;
r->p = r->b;
- if(type == PktInvoke)
- r->pk.invoke.n = ++r->invokes.n;
+ if(type == AMF0Command)
+ r->msg.cmd.tid = ++r->cmds.tid;
}
static void
@@ -224,69 +194,47 @@
}
static int
-handshake(int f)
-{
- u8int cl[1+Sigsz], sv[1+Sigsz];
-
- cl[0] = 3; /* no encryption */
- memset(cl+1, 0, 8);
- prng(cl+1+8, Sigsz-8);
- if(write(f, cl, sizeof(cl)) != sizeof(cl))
- goto err;
- if(readn(f, sv, sizeof(sv)) != sizeof(sv))
- goto err;
- if(cl[0] != sv[0]){
- werrstr("expected %d (no encryption), got %d", cl[0], sv[0]);
- goto err;
- }
- if(write(f, sv+1, Sigsz) != Sigsz)
- goto err;
- if(readn(f, sv+1, Sigsz) != Sigsz)
- goto err;
- if(memcmp(cl, sv, sizeof(cl)) != 0){
- werrstr("signature mismatch");
- goto err;
- }
-
- return 0;
-
-err:
- werrstr("handshake: %r");
- return -1;
-}
-
-static int
rtmprecv(RTMP *r)
{
- int hsz, bodysz, dummy, n;
- u8int *h, *e, type;
+ int hsz, len, n, msid;
+ u8int *h, *e, byte;
u32int ts;
- memset(&r->pk, 0, sizeof(r->pk));
+ memset(&r->msg, 0, sizeof(r->msg));
r->p = r->b;
if(readn(r->i, r->p, 1) != 1)
goto err;
- r->pk.ht = (r->p[0] & 0xc0)>>6;
- r->pk.chan = r->p[0] & 0x3f;
- hsz = szs[r->pk.ht];
- if(readn(r->i, r->p+1, hsz-1) != hsz-1)
+ r->msg.fmt = (r->p[0] & 0xc0)>>6;
+ r->msg.cs = r->p[0] & 0x3f;
+ n = r->msg.cs + 1;
+ if(n <= 2){
+ if(readn(r->i, r->p, n) != n)
+ goto err;
+ r->msg.cs = 64 + r->p[0];
+ if(n == 2)
+ r->msg.cs += 256 * r->p[1];
+ }
+
+ hsz = szs[r->msg.fmt];
+ if(readn(r->i, r->p, hsz) != hsz)
goto err;
- h = r->p + 1;
+ h = r->p;
e = r->p + hsz;
- r->pk.type = -1;
- bodysz = 0;
- if(hsz >= szs[SzSmall]){
- h = amfi24get(h, e, (s32int*)&ts); /* FIXME proper timestamps? */
- if(hsz >= szs[SzMedium]){
- h = amfi24get(h, e, &bodysz);
- h = amfbyteget(h, e, &type);
- r->pk.type = type;
- if(hsz >= szs[SzLarge]){
- dummy = 0;
- h = amfi32get(h, e, &dummy); /* FIXME seems to be always 0? */
- }
+
+ r->msg.type = -1;
+ msid = 0;
+ ts = 0;
+ len = 0;
+ if(hsz >= szs[Type2]){
+ h = amf0i24get(h, e, (s32int*)&ts); /* FIXME proper timestamps? */
+ if(hsz >= szs[Type1]){
+ h = amf0i24get(h, e, &len);
+ h = amf0byteget(h, e, &byte);
+ r->msg.type = byte;
+ if(hsz >= szs[Type0])
+ h = amf0i32leget(h, e, &msid);
}
}
@@ -293,31 +241,31 @@
if(ts == 0xffffff){ /* exntended timestamp */
if(readn(r->i, h, 4) != 4)
goto err;
- h = amfi32get(h, h+4, (s32int*)&ts);
+ h = amf0i32get(h, h+4, (s32int*)&ts);
}
- /* FIXME do all consecutive chunks use Tiny? */
- bextend(r, bodysz);
- r->pk.data = h;
- r->pk.sz = bodysz;
+ /* FIXME do all consecutive chunks use Type3? */
+ bextend(r, len);
+ r->msg.data = h;
+ r->msg.sz = len;
for(;;){
- n = min(bodysz, r->chunk);
+ n = min(len, r->chunk);
if(readn(r->i, h, n) != n)
goto err;
- bodysz -= n;
+ len -= n;
h += n;
- if(bodysz < 1)
+ if(len < 1)
break;
if(readn(r->i, h, 1) != 1)
goto err;
- if((r->pk.chan | SzTiny<<6) != *h){
- werrstr("chan/size does not match: %02x", *h);
+ if((r->msg.cs | Type3<<6) != *h){
+ werrstr("cs/fmt does not match: %02x", *h);
goto err;
}
}
if(debug)
- fprint(2, "→ %P\n", &r->pk);
+ fprint(2, "→ %M\n", &r->msg);
return 0;
@@ -329,28 +277,27 @@
static int
rtmpsend(RTMP *r)
{
- u8int *p, *h, *e, hdata[32];
- int bodysz, n, hsz;
- Invoke *i;
+ u8int *p, *h, *e, hdata[24];
+ int len, n, hsz;
+ Command *c;
if(r->p == nil)
goto err;
- r->pk.data = r->b;
- r->pk.sz = r->p - r->b;
- /* FIXME special case when bodysz is 0 */
+ r->msg.data = r->b;
+ r->msg.sz = r->p - r->b;
- hsz = szs[r->pk.ht];
h = hdata;
+ *h++ = r->msg.fmt<<6 | r->msg.cs;
+ hsz = szs[r->msg.fmt];
e = h + hsz;
- *h++ = r->pk.ht<<6 | r->pk.chan;
- if(hsz >= szs[SzSmall]){
- h = amfi24(h, e, 0); /* FIXME proper timestamps? */
- if(hsz >= szs[SzMedium]){
- h = amfi24(h, e, r->pk.sz);
- h = amfbyte(h, e, r->pk.type);
- if(hsz >= szs[SzLarge])
- h = amfi32(h, e, 0); /* FIXME seems to be always 0? */
+ if(hsz >= szs[Type2]){
+ h = amf0i24(h, e, 0); /* FIXME put actual timestamps? */
+ if(hsz >= szs[Type1]){
+ h = amf0i24(h, e, r->msg.sz);
+ h = amf0byte(h, e, r->msg.type);
+ if(hsz >= szs[Type0])
+ h = amf0i32(h, e, 0); /* FIXME message stream id */
}
}
assert(h != nil);
@@ -358,14 +305,14 @@
if(Bwrite(r, hdata, h-hdata) < 0)
goto err;
- for(p = r->pk.data, bodysz = r->pk.sz; bodysz > 0;){
- n = min(bodysz, r->chunk);
+ for(p = r->msg.data, len = r->msg.sz; len > 0;){
+ n = min(len, r->chunk);
if(Bwrite(r, p, n) < 0)
goto err;
p += n;
- bodysz -= n;
- if(bodysz > 0){
- *h = r->pk.chan | SzTiny<<6;
+ len -= n;
+ if(len > 0){
+ *h = r->msg.cs | Type3<<6;
Bputc(r, *h);
}
}
@@ -373,15 +320,15 @@
Bflush(r);
if(debug)
- fprint(2, "← %P\n", &r->pk);
+ fprint(2, "← %M\n", &r->msg);
- if(r->pk.type == PktInvoke){
- i = emalloc(sizeof(*i));
- *i = r->pk.invoke;
- assert(i->cb != nil);
- if((i->next = r->invokes.w) != nil)
- i->next->prev = i;
- r->invokes.w = i;
+ if(r->msg.type == AMF0Command){
+ c = emalloc(sizeof(*c));
+ *c = r->msg.cmd;
+ assert(c->cb != nil);
+ if((c->next = r->cmds.w) != nil)
+ c->next->prev = c;
+ r->cmds.w = c;
}
return 0;
@@ -391,34 +338,6 @@
}
static void
-connected(RTMP *r, int ok, Amf *a[NumCbA], void *)
-{
- sendp(r->c, ok ? nil : smprint("%A", a[CbStatus]));
-}
-
-static int
-connect(RTMP *r)
-{
- newpacket(r, PktInvoke, SzLarge, ChanCtl);
- putinvoke("connect");
- putkvstr("app", r->app);
- putkvstr("tcUrl", r->tcurl);
- if(r->mode & OWRITE)
- putkvstr("type", "nonprivate");
- else{
- putkvbool("fpad", 0);
- putkvnum("capabilities", 15);
- putkvnum("audioCodecs", 3191);
- putkvnum("videoCodecs", 252);
- putkvnum("videoFunction", 1);
- }
- putend();
- r->pk.invoke.cb = connected;
-
- return rtmpsend(r);
-}
-
-static void
rtmpfree(RTMP *r)
{
free(r->app);
@@ -437,20 +356,20 @@
loop(void *aux)
{
int res, n, ok;
- Amf *a[NumCbA];
+ Amf0 *a[NumCb];
u8int *s, *e;
s16int s16;
- Packet *p;
- Invoke *i;
+ Message *m;
+ Command *c;
RTMP *r;
r = aux;
- p = &r->pk;
+ m = &r->msg;
res = 0;
memset(a, 0, sizeof(a));
for(;;){
for(n = 0; n < nelem(a); n++)
- amffree(a[n]);
+ amf0free(a[n]);
memset(a, 0, sizeof(a));
if(res != 0 || (res = rtmprecv(r)) != 0){
@@ -457,60 +376,55 @@
if(debug)
fprint(2, "rtmp loop: %r\n");
for(n = 0; n < nelem(a); n++)
- amffree(a[n]);
+ amf0free(a[n]);
break;
}
- s = r->pk.data;
- e = s + r->pk.sz;
+ s = r->msg.data;
+ e = s + r->msg.sz;
- switch(p->type){
- case PktInvoke:
- i = nil;
- ok = 0;
- for(n = 0; n < NumCbA; n++){
- if((s = amfparse(&a[n], s, e)) == nil)
+ switch(m->type){
+ case AMF0Command:
+ c = nil;
+ ok = 1;
+ for(n = 0; n < NumCb; n++){
+ if((s = amf0parse(&a[n], s, e)) == nil)
goto err;
switch(n){
- case CbWhat:
+ case CbCommand:
if(a[n]->type != Tstr){
- werrstr("invoke a[%d] not a string", n);
+ werrstr("command name is not a string");
goto err;
}
- if(strcmp(a[n]->str, "_result") == 0)
- ok = 1;
- else if(strcmp(a[n]->str, "_error") == 0)
+ if(strcmp(a[n]->str, "_error") == 0)
ok = 0;
- else{
- werrstr("unexpected a[%d]: %#q", n, a[n]->str);
- goto err;
- }
+ /* other values: "_result", etc */
break;
- case CbInvoke:
+ case CbTransID:
if(a[n]->type != Tnum){
- werrstr("invoke a[%d] not a number", n);
+ werrstr("transaction ID is not a number");
goto err;
}
- for(i = r->invokes.w; i != nil && i->n != a[n]->num; i = i->next);
- if(i == nil){
- werrstr("did not expect invoke %d result", (int)a[n]->num);
+ for(c = r->cmds.w; c != nil && c->tid != a[n]->num; c = c->next);
+ if(c == nil){
+ werrstr("response to non-existent transaction %d", (int)a[n]->num);
goto err;
}
break;
}
}
- if(i->prev != nil)
- i->prev->next = i->next;
- if(i->next != nil)
- i->next->prev = i->prev;
- if(r->invokes.w == i)
- r->invokes.w = i->next;
- i->cb(r, ok, a, i->aux);
- free(i);
+ if(c->prev != nil)
+ c->prev->next = c->next;
+ if(c->next != nil)
+ c->next->prev = c->prev;
+ if(r->cmds.w == c)
+ r->cmds.w = c->next;
+ c->cb(r, ok, a, c->aux);
+ free(c);
break;
- case PktChunkSz:
- if(amfi32get(s, e, &r->chunk) == nil)
+ case SetChunkSize:
+ if(amf0i32get(s, e, &r->chunk) == nil)
goto err;
if(r->chunk < 2){
werrstr("invalid chunk size: %d", r->chunk);
@@ -520,52 +434,58 @@
fprint(2, "new chunk size: %d bytes\n", r->chunk);
break;
- case PktBytesReadReport:
- case PktControl:
- if((s = amfi16get(s, e, &s16)) == nil)
+ case UserControl:
+ if((s = amf0i16get(s, e, &s16)) == nil)
goto err;
- if((s = amfi32get(s, e, &n)) == nil)
+ if(amf0i32get(s, e, &n) == nil)
n = -1;
switch(s16){
case CtlStreamBegin:
- case CtlStreamEnd:
+ case CtlStreamEOF:
case CtlStreamDry:
- case CtlStreamRecorded:
- case CtlBufferEmpty:
- case CtlBufferReady:
+ case CtlSetBufferLen:
+ case CtlStreamIsRecorded:
if(0){
- case CtlPing:
+ case CtlPingRequest:
/* FIXME pong */
USED(n);
}
if(debug)
- fprint(2, "control packet: %s %d\n", ctl2s[s16], n);
+ fprint(2, "control message: %s %d\n", ctl2s[s16], n);
break;
default:
if(debug)
- fprint(2, "unknown control packet %d (value %d)\n", s16, n);
+ fprint(2, "unknown control message %d (value %d)\n", s16, n);
break;
}
break;
- case PktServerBW:
- if((s = amfi32get(s, e, &r->svbw)) == nil)
+ case WindowAckSize:
+ if(amf0i32get(s, e, &r->winacksz) == nil)
goto err;
break;
- case PktClientBW:
- if((s = amfi32get(s, e, &r->clbw)) == nil || (s = amfbyteget(s, e, &r->clbw2)) == nil)
+ case SetBandwidth:
+ if((s = amf0i32get(s, e, &r->bw)) == nil || amf0byteget(s, e, &r->bwlimit) == nil)
goto err;
break;
- case PktAudio:
- case PktVideo:
- case PktFlexStreamSend:
- case PktFlexSharedObj:
- case PktFlexInfo:
- case PktSharedObj:
- case PktFlashVideo:
+ /* FIXME */
+ case Aggregate:
+ case Abort:
+ case Ack:
+ case Audio:
+ case Video:
+ case AMF0Metadata:
+ case AMF0SharedObject:
break;
+
+ case AMF3Metadata:
+ case AMF3SharedObject:
+ case AMF3Command:
+ if(debug)
+ fprint(2, "AMF3 message, ignoring\n");
+ break;
err:
res = -1;
break;
@@ -576,6 +496,102 @@
threadexitsall(res == 0 ? nil : "error");
}
+static int /* RTMP handshake on fd f: 0 on success, -1 with errstr prefixed by "handshake: " */
+handshake(int f)
+{
+ u8int c[1+CSsz], s[1+CSsz]; /* c: version byte + C1 we send; s: version byte + S1, later overwritten by S2 */
+
+ c[0] = 3; /* rtmp v3 */
+ memset(c+1, 0, 4+4); /* timestamp + zero */
+ prng(c+1+8, CSsz-4-4); /* remainder of C1 is random filler */
+ if(write(f, c, sizeof(c)) != sizeof(c)) /* send version byte and C1 in one write */
+ goto err;
+ if(readn(f, s, sizeof(s)) != sizeof(s)) /* read server version byte and S1 */
+ goto err;
+ if(c[0] != s[0]){
+ werrstr("expected version %d, got %d", c[0], s[0]);
+ goto err;
+ }
+ if(write(f, s+1, CSsz) != CSsz) /* C2: echo S1 back unchanged */
+ goto err;
+ if(readn(f, s+1, CSsz) != CSsz) /* S2 replaces S1 in s; s[0] is still the version byte */
+ goto err;
+ if(memcmp(c, s, sizeof(c)) != 0){ /* NOTE(review): compares all of C1 with S2, including the 8 zeroed header bytes; the spec appears to let S2's second time field differ — confirm */
+ werrstr("C1 != S2");
+ goto err;
+ }
+
+ return 0;
+
+err:
+ werrstr("handshake: %r"); /* %r expands to the current error string, so the cause is kept */
+ return -1;
+}
+/* callback registered by connect(): sends nil on r->c when ok, otherwise the %A-formatted a[CbResponse] */
+static void
+connected(RTMP *r, int ok, Amf0 *a[NumCb], void *)
+{
+ sendp(r->c, ok ? nil : smprint("%A", a[CbResponse])); /* presumably the receiver frees the smprint result — confirm against the reader of r->c */
+}
+/* build the "connect" AMF0 command (Type0 header on chunk stream CSCtl) and send it; returns rtmpsend's result */
+static int
+connect(RTMP *r)
+{
+ newmsg(r, AMF0Command, Type0, CSCtl);
+ putcommand("connect", connected); /* emits name, transaction id and object start; sets connected as the reply callback */
+ putkvstr("app", r->app);
+ putkvstr("tcUrl", r->tcurl);
+ putkvbool("fpad", 0); /* no proxy */
+ putkvnum("audioCodecs", 0x4 | 0x400); /* mp3 + aac */
+ putkvnum("videoCodecs", 0x80); /* h.264 */
+ putkvnum("videoFunction", 0); /* no frame-accurate seek */
+ putend(); /* terminates the object opened by putcommand */
+
+ return rtmpsend(r);
+}
+/* %T verb: print an int message type by its name in msgtype2s, or as a decimal when it has no name */
+static int
+msgtypefmt(Fmt *f)
+{
+ char *s;
+ int t;
+
+ if((t = va_arg(f->args, int)) >= 0 &&
+ t < nelem(msgtype2s) &&
+ (s = msgtype2s[t]) != nil)
+ return fmtprint(f, "%s", s);
+
+ return fmtprint(f, "%d", t); /* out of range, or a gap in the table */
+}
+/* %M verb: print a Message* as type/cs/ts/sz; for AMF0Command also print each AMF0 value parsed from the payload */
+static int
+msgfmt(Fmt *f)
+{
+ u8int *s, *e;
+ Message *m;
+ Amf0 *a;
+
+ m = va_arg(f->args, Message*);
+
+ fmtprint(f, "type=%T cs=%d ts=%ud sz=%d", m->type, m->cs, m->ts, m->sz);
+
+ s = m->data;
+ e = s + m->sz;
+
+ if(m->type == AMF0Command){
+ fmtprint(f, ":");
+ for(; s != nil && s != e;){ /* stops at the end of the payload or after the first parse error */
+ if((s = amf0parse(&a, s, e)) != nil)
+ fmtprint(f, " %A", a);
+ else
+ fmtprint(f, " %r"); /* print the parse error in place of the value */
+ amf0free(a); /* NOTE(review): on failure this relies on amf0parse having set a to nil — confirm in amf0.c */
+ }
+ }
+
+ return 0; /* always 0; fmtprint results are not checked */
+}
+
RTMP *
rtmpdial(char *url, int w, int h, int withaudio)
{
@@ -583,9 +599,9 @@
int f, port, ctl;
RTMP *r;
- fmtinstall('A', amffmt);
- fmtinstall('T', pktypefmt);
- fmtinstall('P', pkfmt);
+ fmtinstall('A', amf0fmt);
+ fmtinstall('T', msgtypefmt);
+ fmtinstall('M', msgfmt);
quotefmtinstall();
r = nil;