ref: f6d84677e9cd1cd4725391a577295ff561e09443
dir: /rtmp.c/
#include <u.h> #include <libc.h> #include <thread.h> #include <bio.h> #include <libsec.h> #include "amf.h" #include "ivf.h" #include "rtmp.h" #define min(a,b) ((a)<(b)?(a):(b)) enum { Port = 1935, Sigsz = 1536, Chunk = 128, ChanCtl = 3, SzTiny = 1, SzSmall = 4, SzMedium = 8, SzLarge = 12, PktInvoke = 20, Biobufsz = 64*1024, /* FIXME don't know if it helps with anything */ Bufsz = 64*1024, }; typedef struct Packet Packet; struct Packet { int type; int hsz; int chan; u32int ts; u8int *data; int sz; int left; Packet *prev; Packet *next; }; struct RTMP { Biobufhdr; char *app; char *path; char *tcurl; Packet pk; Packet *ch; u8int *b, *p, *e; int mode; int bsz; int invokes; int i; u8int biobuf[Biobufsz]; }; #define puti16(i) do{ r->p = amfi16(r->p, r->e, i); }while(0) #define puti24(i) do{ r->p = amfi24(r->p, r->e, i); }while(0) #define puti32(i) do{ r->p = amfi32(r->p, r->e, i); }while(0) #define putnum(v) do{ r->p = amfnum(r->p, r->e, v); }while(0) #define putstr(s) do{ r->p = amfstr(r->p, r->e, s); }while(0) #define putarr() do{ r->p = amfarr(r->p, r->e); }while(0) #define putobj() do{ r->p = amfobj(r->p, r->e); }while(0) #define putend() do{ r->p = amfend(r->p, r->e); }while(0) #define putkvnum(name, v) do{ r->p = amfkvnum(r->p, r->e, name, v); }while(0) #define putkvstr(name, s) do{ r->p = amfkvstr(r->p, r->e, name, s); }while(0) #define putkvbool(name, s) do{ r->p = amfkvbool(r->p, r->e, name, s); }while(0) int rtmpdump = 0; extern int debug; static Packet * pk4chan(RTMP *r, int chan) { Packet *p; for(p = r->ch; p != nil && p->chan != chan; p = p->next); if(p == nil){ if((p = calloc(1, sizeof(*p))) == nil) sysfatal("memory"); p->type = -1; p->chan = chan; if((p->next = r->ch) != nil) r->ch->prev = p; } return p; } static void pkfree(RTMP *r, Packet *p) { if(p->prev != nil) p->prev->next = p->next; if(p->next != nil) p->next->prev = p->prev; if(r->ch == p) r->ch = p->next; free(p->data); } static void newpacket(RTMP *r, int type, int hsz, int chan) { memset(&r->pk, 0, sizeof(r->pk)); r->pk.type = type; r->pk.hsz = hsz; r->pk.chan = chan; r->p = r->b + hsz; r->b[0] = chan; switch(hsz){ case SzTiny: r->b[0] |= 3<<6; break; case SzSmall: r->b[0] |= 2<<6; break; case SzMedium: r->b[0] |= 1<<6; break; } } static void bextend(RTMP *r, int bsz) { u8int *ob; if(r->bsz >= bsz) return; ob = r->b; if((r->b = realloc(r->b, bsz*2)) == nil) sysfatal("memory"); if(ob != nil) r->p = r->b + (intptr)(ob - r->p); r->bsz *= 2; r->e = r->b + r->bsz; } static int handshake(int f) { u8int cl[1+Sigsz], sv[1+Sigsz]; cl[0] = 3; /* no encryption */ memset(cl+1, 0, 8); prng(cl+1+8, Sigsz-8); if(write(f, cl, sizeof(cl)) != sizeof(cl)) goto err; if(readn(f, sv, sizeof(sv)) != sizeof(sv)) goto err; if(cl[0] != sv[0]){ werrstr("expected %d (no encryption), got %d", cl[0], sv[0]); goto err; } if(write(f, sv+1, Sigsz) != Sigsz) goto err; if(readn(f, sv+1, Sigsz) != Sigsz) goto err; if(memcmp(cl, sv, sizeof(cl)) != 0){ werrstr("signature mismatch"); goto err; } return 0; err: werrstr("handshake: %r"); return -1; } static int rtmprecv(RTMP *r) { u8int *h, *e, type; int hsz, chan, bodysz, dummy; u32int ts; r->p = r->b; if(readn(r->i, r->p, 1) != 1) goto err; hsz = (r->p[0] & 0xc0)>>6; chan = r->p[0] & 0x3f; if(debug) fprint(2, "hsz=%d chan=%d\n", hsz, chan); if(readn(r->i, r->p+1, hsz-1) != hsz-1) goto err; memset(&r->pk, 0, sizeof(r->pk)); r->pk.hsz = hsz; r->pk.chan = chan; h = r->p + 1; e = r->p + hsz; r->pk.type = -1; bodysz = 0; if(hsz >= SzSmall){ h = amfi24get(h, e, (s32int*)&ts); /* FIXME proper timestamps? */ if(hsz >= SzMedium){ h = amfi24get(h, e, &bodysz); h = amfbyteget(h, e, &type); r->pk.type = type; if(hsz >= SzLarge) h = amfi32get(h, e, &dummy); /* FIXME seems to be always 0? */ } } if(ts == 0xffffff){ /* exntended timestamp */ if(readn(r->i, h, 4) != 4) goto err; h = amfi32get(h, h+4, (s32int*)&ts); } bextend(r, bodysz); if(readn(r->i, h, bodysz) != bodysz) goto err; h += bodysz; return 0; err: werrstr("rtmprecv: %r"); return -1; } static int rtmpsend(RTMP *r) { int bodysz, n, hsz; u8int *p, *h, *e; assert(r->p != nil); bodysz = r->p - r->b - r->pk.hsz; /* FIXME special case when bodysz is 0 */ h = r->b; e = h + r->pk.hsz; h++; if(r->pk.hsz >= SzSmall){ h = amfi24(h, e, 0); /* FIXME proper timestamps? */ if(r->pk.hsz >= SzMedium){ h = amfi24(h, e, bodysz); h = amfbyte(h, e, r->pk.type); if(r->pk.hsz >= SzLarge) h = amfi32(h, e, 0); /* FIXME seems to be always 0? */ } } if(h == nil) goto err; memset(h, 0, e-h); p = r->b; hsz = e - r->b; for(; hsz+bodysz > 0;){ n = min(bodysz, Chunk); if(Bwrite(r, p, hsz+n) < 0) goto err; if(rtmpdump) write(1, p, hsz+n); bodysz -= n; p += hsz+n; hsz = 0; if(bodysz > 0){ *(--p) = 0xc0 | r->b[0]; hsz = 1; } } r->p = nil; Bflush(r); return 0; err: werrstr("rtmpsend: %r"); return -1; } static int connect(RTMP *r) { newpacket(r, PktInvoke, SzLarge, ChanCtl); putstr("connect"); putnum(++r->invokes); putobj(); putkvstr("app", r->app); putkvstr("tcUrl", r->tcurl); if(r->mode & OWRITE) putkvstr("type", "nonprivate"); else{ putkvbool("fpad", 0); putkvnum("capabilities", 15); putkvnum("audioCodecs", 3191); putkvnum("videoCodecs", 252); putkvnum("videoFunction", 1); } putend(); return rtmpsend(r); } RTMP * rtmpdial(char *url, int w, int h, int withaudio) { char *s, *e, *path, *app; int f, port, ctl; RTMP *r; r = nil; f = -1; url = strdup(url); /* since we're changing it in-place */ if(memcmp(url, "rtmp://", 7) != 0){ werrstr("invalid url"); goto err; } s = url + 7; if((e = strpbrk(s, ":/")) == nil){ werrstr("no path"); goto err; } port = 1935; if(*e == ':'){ if((port = strtol(e+1, &path, 10)) < 1 || path == e+1 || *path != '/'){ werrstr("invalid port"); goto err; } }else{ path = e; } while(*(++path) == '/'); s = smprint("tcp!%.*s!%d", (int)(e-s), s, port); f = dial(s, nil, nil, &ctl); free(s); if(f < 0) goto err; app = path; if((s = strchr(path, '/')) == nil){ werrstr("no path"); goto err; } if((e = strchr(s+1, '/')) != nil){ /* at this point it can be app instance if there is another slash following */ if((s = strchr(e+1, '/')) == nil){ /* no, just path leftovers */ s = e; } *s = 0; path = s+1; }else{ path = nil; } if(handshake(f) != 0) goto err; if((r = calloc(1, sizeof(*r))) == nil) sysfatal("memory"); if((r->app = strdup(app)) == nil || (path != nil && (r->path = strdup(path)) == nil)) sysfatal("memory"); bextend(r, Bufsz); r->tcurl = url; Binits(r, f, OWRITE, r->biobuf, sizeof(r->biobuf)); r->i = f; if(connect(r) != 0 || rtmprecv(r) != 0) goto err; return r; err: werrstr("rtmpdial: %r"); if(r != nil) rtmpclose(r); else if(f >= 0) close(f); free(url); return nil; } void rtmpclose(RTMP *r) { if(r == nil) return; free(r->path); free(r->b); close(r->i); Bterm(r); free(r); }