shithub: rtmp

ref: f6d84677e9cd1cd4725391a577295ff561e09443
dir: /rtmp.c/

View raw version
#include <u.h>
#include <libc.h>
#include <thread.h>
#include <bio.h>
#include <libsec.h>
#include "amf.h"
#include "ivf.h"
#include "rtmp.h"

#define min(a,b) ((a)<(b)?(a):(b))

enum {
	Port = 1935,

	Sigsz = 1536,
	Chunk = 128,

	ChanCtl = 3,

	SzTiny = 1,
	SzSmall = 4,
	SzMedium = 8,
	SzLarge = 12,

	PktInvoke = 20,

	Biobufsz = 64*1024, /* FIXME don't know if it helps with anything */
	Bufsz = 64*1024,
};

typedef struct Packet Packet;

struct Packet {
	int type;
	int hsz;
	int chan;
	u32int ts;
	u8int *data;
	int sz;
	int left;
	Packet *prev;
	Packet *next;
};

struct RTMP {
	Biobufhdr;
	char *app;
	char *path;
	char *tcurl;
	Packet pk;
	Packet *ch;
	u8int *b, *p, *e;
	int mode;
	int bsz;
	int invokes;
	int i;
	u8int biobuf[Biobufsz];
};

#define puti16(i) do{ r->p = amfi16(r->p, r->e, i); }while(0)
#define puti24(i) do{ r->p = amfi24(r->p, r->e, i); }while(0)
#define puti32(i) do{ r->p = amfi32(r->p, r->e, i); }while(0)
#define putnum(v) do{ r->p = amfnum(r->p, r->e, v); }while(0)
#define putstr(s) do{ r->p = amfstr(r->p, r->e, s); }while(0)
#define putarr() do{ r->p = amfarr(r->p, r->e); }while(0)
#define putobj() do{ r->p = amfobj(r->p, r->e); }while(0)
#define putend() do{ r->p = amfend(r->p, r->e); }while(0)
#define putkvnum(name, v) do{ r->p = amfkvnum(r->p, r->e, name, v); }while(0)
#define putkvstr(name, s) do{ r->p = amfkvstr(r->p, r->e, name, s); }while(0)
#define putkvbool(name, s) do{ r->p = amfkvbool(r->p, r->e, name, s); }while(0)

int rtmpdump = 0;
extern int debug;

static Packet *
pk4chan(RTMP *r, int chan)
{
	Packet *p;

	for(p = r->ch; p != nil && p->chan != chan; p = p->next);

	if(p == nil){
		if((p = calloc(1, sizeof(*p))) == nil)
			sysfatal("memory");
		p->type = -1;
		p->chan = chan;
		if((p->next = r->ch) != nil)
			r->ch->prev = p;
	}

	return p;
}

static void
pkfree(RTMP *r, Packet *p)
{
	if(p->prev != nil)
		p->prev->next = p->next;
	if(p->next != nil)
		p->next->prev = p->prev;
	if(r->ch == p)
		r->ch = p->next;

	free(p->data);
}

static void
newpacket(RTMP *r, int type, int hsz, int chan)
{
	memset(&r->pk, 0, sizeof(r->pk));

	r->pk.type = type;
	r->pk.hsz = hsz;
	r->pk.chan = chan;
	r->p = r->b + hsz;
	r->b[0] = chan;

	switch(hsz){
	case SzTiny: r->b[0] |= 3<<6; break;
	case SzSmall: r->b[0] |= 2<<6; break;
	case SzMedium: r->b[0] |= 1<<6; break;
	}
}

static void
bextend(RTMP *r, int bsz)
{
	u8int *ob;

	if(r->bsz >= bsz)
		return;
	ob = r->b;
	if((r->b = realloc(r->b, bsz*2)) == nil)
		sysfatal("memory");
	if(ob != nil)
		r->p = r->b + (intptr)(ob - r->p);
	r->bsz *= 2;
	r->e = r->b + r->bsz;
}

static int
handshake(int f)
{
	u8int cl[1+Sigsz], sv[1+Sigsz];

	cl[0] = 3; /* no encryption */
	memset(cl+1, 0, 8);
	prng(cl+1+8, Sigsz-8);
	if(write(f, cl, sizeof(cl)) != sizeof(cl))
		goto err;
	if(readn(f, sv, sizeof(sv)) != sizeof(sv))
		goto err;
	if(cl[0] != sv[0]){
		werrstr("expected %d (no encryption), got %d", cl[0], sv[0]);
		goto err;
	}
	if(write(f, sv+1, Sigsz) != Sigsz)
		goto err;
	if(readn(f, sv+1, Sigsz) != Sigsz)
		goto err;
	if(memcmp(cl, sv, sizeof(cl)) != 0){
		werrstr("signature mismatch");
		goto err;
	}

	return 0;

err:
	werrstr("handshake: %r");
	return -1;
}

static int
rtmprecv(RTMP *r)
{
	u8int *h, *e, type;
	int hsz, chan, bodysz, dummy;
	u32int ts;

	r->p = r->b;
	if(readn(r->i, r->p, 1) != 1)
		goto err;
	hsz = (r->p[0] & 0xc0)>>6;
	chan = r->p[0] & 0x3f;
	if(debug)
		fprint(2, "hsz=%d chan=%d\n", hsz, chan);
	if(readn(r->i, r->p+1, hsz-1) != hsz-1)
		goto err;

	memset(&r->pk, 0, sizeof(r->pk));
	r->pk.hsz = hsz;
	r->pk.chan = chan;

	h = r->p + 1;
	e = r->p + hsz;
	r->pk.type = -1;
	bodysz = 0;
	if(hsz >= SzSmall){
		h = amfi24get(h, e, (s32int*)&ts); /* FIXME proper timestamps? */
		if(hsz >= SzMedium){
			h = amfi24get(h, e, &bodysz);
			h = amfbyteget(h, e, &type);
			r->pk.type = type;
			if(hsz >= SzLarge)
				h = amfi32get(h, e, &dummy); /* FIXME seems to be always 0? */
		}
	}

	if(ts == 0xffffff){ /* exntended timestamp */
		if(readn(r->i, h, 4) != 4)
			goto err;
		h = amfi32get(h, h+4, (s32int*)&ts);
	}

	bextend(r, bodysz);
	if(readn(r->i, h, bodysz) != bodysz)
		goto err;
	h += bodysz;

	return 0;

err:
	werrstr("rtmprecv: %r");
	return -1;
}

static int
rtmpsend(RTMP *r)
{
	int bodysz, n, hsz;
	u8int *p, *h, *e;

	assert(r->p != nil);

	bodysz = r->p - r->b - r->pk.hsz;
	/* FIXME special case when bodysz is 0 */
	h = r->b;
	e = h + r->pk.hsz;
	h++;
	if(r->pk.hsz >= SzSmall){
		h = amfi24(h, e, 0); /* FIXME proper timestamps? */
		if(r->pk.hsz >= SzMedium){
			h = amfi24(h, e, bodysz);
			h = amfbyte(h, e, r->pk.type);
			if(r->pk.hsz >= SzLarge)
				h = amfi32(h, e, 0); /* FIXME seems to be always 0? */
		}
	}
	if(h == nil)
		goto err;
	memset(h, 0, e-h);

	p = r->b;
	hsz = e - r->b;
	for(; hsz+bodysz > 0;){
		n = min(bodysz, Chunk);
		if(Bwrite(r, p, hsz+n) < 0)
			goto err;
		if(rtmpdump)
			write(1, p, hsz+n);
		bodysz -= n;
		p += hsz+n;
		hsz = 0;
		if(bodysz > 0){
			*(--p) = 0xc0 | r->b[0];
			hsz = 1;
		}
	}

	r->p = nil;
	Bflush(r);

	return 0;
err:
	werrstr("rtmpsend: %r");
	return -1;
}

static int
connect(RTMP *r)
{
	newpacket(r, PktInvoke, SzLarge, ChanCtl);
	putstr("connect");
		putnum(++r->invokes);
	putobj();
		putkvstr("app", r->app);
		putkvstr("tcUrl", r->tcurl);
		if(r->mode & OWRITE)
			putkvstr("type", "nonprivate");
		else{
			putkvbool("fpad", 0);
			putkvnum("capabilities", 15);
			putkvnum("audioCodecs", 3191);
			putkvnum("videoCodecs", 252);
			putkvnum("videoFunction", 1);
		}
	putend();

	return rtmpsend(r);
}

RTMP *
rtmpdial(char *url, int w, int h, int withaudio)
{
	char *s, *e, *path, *app;
	int f, port, ctl;
	RTMP *r;

	r = nil;
	f = -1;
	url = strdup(url); /* since we're changing it in-place */
	if(memcmp(url, "rtmp://", 7) != 0){
		werrstr("invalid url");
		goto err;
	}
	s = url + 7;
	if((e = strpbrk(s, ":/")) == nil){
		werrstr("no path");
		goto err;
	}
	port = 1935;
	if(*e == ':'){
		if((port = strtol(e+1, &path, 10)) < 1 || path == e+1 || *path != '/'){
			werrstr("invalid port");
			goto err;
		}
	}else{
		path = e;
	}
	while(*(++path) == '/');

	s = smprint("tcp!%.*s!%d", (int)(e-s), s, port);
	f = dial(s, nil, nil, &ctl);
	free(s);
	if(f < 0)
		goto err;

	app = path;
	if((s = strchr(path, '/')) == nil){
		werrstr("no path");
		goto err;
	}
	if((e = strchr(s+1, '/')) != nil){
		/* at this point it can be app instance if there is another slash following */
		if((s = strchr(e+1, '/')) == nil){
			/* no, just path leftovers */
			s = e;
		}
		*s = 0;
		path = s+1;
	}else{
		path = nil;
	}

	if(handshake(f) != 0)
		goto err;
	if((r = calloc(1, sizeof(*r))) == nil)
		sysfatal("memory");
	if((r->app = strdup(app)) == nil || (path != nil && (r->path = strdup(path)) == nil))
		sysfatal("memory");
	bextend(r, Bufsz);
	r->tcurl = url;
	Binits(r, f, OWRITE, r->biobuf, sizeof(r->biobuf));
	r->i = f;
	if(connect(r) != 0 || rtmprecv(r) != 0)
		goto err;

	return r;

err:
	werrstr("rtmpdial: %r");
	if(r != nil)
		rtmpclose(r);
	else if(f >= 0)
		close(f);
	free(url);
	return nil;
}

void
rtmpclose(RTMP *r)
{
	if(r == nil)
		return;
	free(r->path);
	free(r->b);
	close(r->i);
	Bterm(r);
	free(r);
}