ref: c6821e9c478e8c1e4effdba02cfe7d56a78c3cd9
parent: f6d84677e9cd1cd4725391a577295ff561e09443
author: Sigrid Solveig Haflínudóttir <[email protected]>
date: Tue Jul 27 09:11:57 EDT 2021
rework send/recv
--- a/rtmp.c
+++ b/rtmp.c
@@ -17,12 +17,24 @@
ChanCtl = 3,
- SzTiny = 1,
- SzSmall = 4,
- SzMedium = 8,
- SzLarge = 12,
+ SzLarge = 0,
+ SzMedium,
+ SzSmall,
+ SzTiny,
+ PktChunkSz = 1,
+ PktBytesReadReport,
+ PktControl,
+ PktServerBW,
+ PktClientBW,
+ PktAudio = 8,
+ PktVideo,
+ PktFlexStreamSend = 15,
+ PktFlexSharedObj,
+ PktFlexInfo,
+ PktSharedObj,
PktInvoke = 20,
+ PktFlashVideo = 22,
Biobufsz = 64*1024, /* FIXME don't know if it helps with anything */
Bufsz = 64*1024,
@@ -32,7 +44,7 @@
struct Packet {
int type;
- int hsz;
+ int ht;
int chan;
u32int ts;
u8int *data;
@@ -69,9 +81,58 @@
#define putkvstr(name, s) do{ r->p = amfkvstr(r->p, r->e, name, s); }while(0)
#define putkvbool(name, s) do{ r->p = amfkvbool(r->p, r->e, name, s); }while(0)
+static szs[] = {
+ [SzTiny] = 1,
+ [SzSmall] = 4,
+ [SzMedium] = 8,
+ [SzLarge] = 12,
+};
+
+static char *pktype2s[] = {
+ [PktChunkSz] = "ChunkSz",
+ [PktBytesReadReport] = "BytesReadReport",
+ [PktControl] = "Control",
+ [PktServerBW] = "ServerBW",
+ [PktClientBW] = "ClientBW",
+ [PktAudio] = "Audio",
+ [PktVideo] = "Video",
+ [PktFlexStreamSend] = "FlexStreamSend",
+ [PktFlexSharedObj] = "FlexSharedObj",
+ [PktFlexInfo] = "FlexInfo",
+ [PktSharedObj] = "SharedObj",
+ [PktInvoke] = "Invoke",
+ [PktFlashVideo] = "FlashVideo",
+};
+
int rtmpdump = 0;
extern int debug;
+#pragma varargck type "T" int
+static int
+pktypefmt(Fmt *f)
+{
+ char *s;
+ int t;
+
+ if((t = va_arg(f->args, int)) >= 0 &&
+ t < nelem(pktype2s) &&
+ (s = pktype2s[t]) != nil)
+ return fmtprint(f, "%s", s);
+
+ return fmtprint(f, "%d", t);
+}
+
+#pragma varargck type "P" Packet*
+static int
+pkfmt(Fmt *f)
+{
+ Packet *p;
+
+ p = va_arg(f->args, Packet*);
+
+ return fmtprint(f, "type=%T chan=%d ts=%ud sz=%d", p->type, p->chan, p->ts, p->sz);
+}
+
static Packet *
pk4chan(RTMP *r, int chan)
{
@@ -105,21 +166,14 @@
}
static void
-newpacket(RTMP *r, int type, int hsz, int chan)
+newpacket(RTMP *r, int type, int ht, int chan)
{
memset(&r->pk, 0, sizeof(r->pk));
r->pk.type = type;
- r->pk.hsz = hsz;
+ r->pk.ht = ht;
r->pk.chan = chan;
- r->p = r->b + hsz;
- r->b[0] = chan;
-
- switch(hsz){
- case SzTiny: r->b[0] |= 3<<6; break;
- case SzSmall: r->b[0] |= 2<<6; break;
- case SzMedium: r->b[0] |= 1<<6; break;
- }
+ r->p = r->b;
}
static void
@@ -134,7 +188,7 @@
sysfatal("memory");
if(ob != nil)
r->p = r->b + (intptr)(ob - r->p);
- r->bsz *= 2;
+ r->bsz = bsz*2;
r->e = r->b + r->bsz;
}
@@ -173,36 +227,35 @@
static int
rtmprecv(RTMP *r)
{
+ int hsz, bodysz, dummy, n;
u8int *h, *e, type;
- int hsz, chan, bodysz, dummy;
u32int ts;
+ memset(&r->pk, 0, sizeof(r->pk));
+
r->p = r->b;
if(readn(r->i, r->p, 1) != 1)
goto err;
- hsz = (r->p[0] & 0xc0)>>6;
- chan = r->p[0] & 0x3f;
- if(debug)
- fprint(2, "hsz=%d chan=%d\n", hsz, chan);
+ r->pk.ht = (r->p[0] & 0xc0)>>6;
+ r->pk.chan = r->p[0] & 0x3f;
+ hsz = szs[r->pk.ht];
if(readn(r->i, r->p+1, hsz-1) != hsz-1)
goto err;
- memset(&r->pk, 0, sizeof(r->pk));
- r->pk.hsz = hsz;
- r->pk.chan = chan;
-
h = r->p + 1;
e = r->p + hsz;
r->pk.type = -1;
bodysz = 0;
- if(hsz >= SzSmall){
+ if(hsz >= szs[SzSmall]){
h = amfi24get(h, e, (s32int*)&ts); /* FIXME proper timestamps? */
- if(hsz >= SzMedium){
+ if(hsz >= szs[SzMedium]){
h = amfi24get(h, e, &bodysz);
h = amfbyteget(h, e, &type);
r->pk.type = type;
- if(hsz >= SzLarge)
+ if(hsz >= szs[SzLarge]){
+ dummy = 0;
h = amfi32get(h, e, &dummy); /* FIXME seems to be always 0? */
+ }
}
}
@@ -212,11 +265,31 @@
h = amfi32get(h, h+4, (s32int*)&ts);
}
+ /* FIXME do all consecutive chunks use Tiny? */
bextend(r, bodysz);
- if(readn(r->i, h, bodysz) != bodysz)
- goto err;
- h += bodysz;
+ r->pk.data = h;
+ r->pk.sz = bodysz;
+ for(;;){
+ n = min(bodysz, Chunk);
+ if(readn(r->i, h, n) != n)
+ goto err;
+ bodysz -= n;
+ h += n;
+ if(bodysz < 1)
+ break;
+ if(readn(r->i, h, 1) != 1)
+ goto err;
+ if((r->pk.chan | SzTiny<<6) != *h){
+ werrstr("chan/size does not match: %02x", *h);
+ goto err;
+ }
+ }
+ if(rtmpdump)
+ write(1, r->pk.data, r->pk.sz);
+ if(debug)
+ fprint(2, "→ %P\n", &r->pk);
+
return 0;
err:
@@ -228,48 +301,53 @@
rtmpsend(RTMP *r)
{
int bodysz, n, hsz;
- u8int *p, *h, *e;
+ u8int *p, *h, *e, hdata[32];
- assert(r->p != nil);
-
- bodysz = r->p - r->b - r->pk.hsz;
+ r->pk.data = r->b;
+ r->pk.sz = r->p - r->b;
/* FIXME special case when bodysz is 0 */
- h = r->b;
- e = h + r->pk.hsz;
- h++;
- if(r->pk.hsz >= SzSmall){
+
+ hsz = szs[r->pk.ht];
+ h = hdata;
+ e = h + hsz;
+ *h++ = r->pk.ht<<6 | r->pk.chan;
+ if(hsz >= szs[SzSmall]){
h = amfi24(h, e, 0); /* FIXME proper timestamps? */
- if(r->pk.hsz >= SzMedium){
- h = amfi24(h, e, bodysz);
+ if(hsz >= szs[SzMedium]){
+ h = amfi24(h, e, r->pk.sz);
h = amfbyte(h, e, r->pk.type);
- if(r->pk.hsz >= SzLarge)
+ if(hsz >= szs[SzLarge])
h = amfi32(h, e, 0); /* FIXME seems to be always 0? */
}
}
- if(h == nil)
- goto err;
+ assert(h != nil);
memset(h, 0, e-h);
+ if(Bwrite(r, hdata, h-hdata) < 0)
+ goto err;
+ if(rtmpdump)
+ write(1, hdata, h-hdata);
- p = r->b;
- hsz = e - r->b;
- for(; hsz+bodysz > 0;){
+ for(p = r->pk.data, bodysz = r->pk.sz; bodysz > 0;){
n = min(bodysz, Chunk);
- if(Bwrite(r, p, hsz+n) < 0)
+ if(Bwrite(r, p, n) < 0)
goto err;
if(rtmpdump)
- write(1, p, hsz+n);
+ write(1, p, n);
+ p += n;
bodysz -= n;
- p += hsz+n;
- hsz = 0;
if(bodysz > 0){
- *(--p) = 0xc0 | r->b[0];
- hsz = 1;
+ *h = r->pk.chan | SzTiny<<6;
+ Bputc(r, *h);
+ if(rtmpdump)
+ write(1, h, 1);
}
}
- r->p = nil;
Bflush(r);
+ if(debug)
+ fprint(2, "← %P\n", &r->pk);
+
return 0;
err:
werrstr("rtmpsend: %r");
@@ -305,6 +383,9 @@
char *s, *e, *path, *app;
int f, port, ctl;
RTMP *r;
+
+ fmtinstall('T', pktypefmt);
+ fmtinstall('P', pkfmt);
r = nil;
f = -1;