ref: 354c3b1554f91af178c28cf6e34ac1585df5b757
parent: e95674db1eb82210f12fb429cc848613e24e704f
author: Sigrid Solveig Haflínudóttir <[email protected]>
date: Mon Aug 2 14:05:12 EDT 2021
stream publish
--- a/main.c
+++ b/main.c
@@ -58,8 +58,17 @@
if((r = rtmpdial(argv[0])) == nil)
sysfatal("%r");
ulong sid;
- if(rtmpstream(r, &sid) == 0)
+ fprint(2, "asking for a stream\n");
+
+ if(rtmpstream(r, &sid) == 0){
fprint(2, "stream: %lud\n", sid);
+ if(rtmppublish(r, sid, PubLive, "live") == 0)
+ fprint(2, "stream published\n");
+ else
+ fprint(2, "stream publish failed: %r\n");
+ }else{
+ fprint(2, "stream failed\n");
+ }
while(1)
sleep(100);
--- a/rtmp.c
+++ b/rtmp.c
@@ -65,6 +65,7 @@
Bufsz = 64*1024,
};
+typedef struct Buffer Buffer;
typedef struct Command Command;
typedef struct Message Message;
@@ -90,20 +91,25 @@
Command cmd;
};
+struct Buffer {
+ Message msg;
+ u8int *b, *p, *e;
+ int bsz;
+};
+
struct RTMP {
Biobufhdr;
QLock;
+ Buffer i;
+ Buffer o;
Channel *c;
char *app;
char *path;
char *tcurl;
- Message msg;
- u8int *b, *p, *e;
int chunkin;
int chunkout;
int mode;
- int bsz;
- int i;
+ int fd;
int winacksz;
int bw;
u8int bwlimit;
@@ -114,24 +120,24 @@
u8int biobuf[Biobufsz];
};
-#define putnull() do{ r->p = a₀null(r->p, r->e); }while(0)
-#define puti16(i) do{ r->p = a₀i16(r->p, r->e, i); }while(0)
-#define puti24(i) do{ r->p = a₀i24(r->p, r->e, i); }while(0)
-#define puti32(i) do{ r->p = a₀i32(r->p, r->e, i); }while(0)
-#define putnum(v) do{ r->p = a₀num(r->p, r->e, v); }while(0)
-#define putstr(s) do{ r->p = a₀str(r->p, r->e, s); }while(0)
-#define putarr() do{ r->p = a₀arr(r->p, r->e); }while(0)
-#define putobj() do{ r->p = a₀obj(r->p, r->e); }while(0)
-#define putend() do{ r->p = a₀end(r->p, r->e); }while(0)
-#define putkvnum(name, v) do{ r->p = a₀kvnum(r->p, r->e, name, v); }while(0)
-#define putkvstr(name, s) do{ r->p = a₀kvstr(r->p, r->e, name, s); }while(0)
-#define putkvbool(name, s) do{ r->p = a₀kvbool(r->p, r->e, name, s); }while(0)
+#define putnull() do{ r->o.p = a₀null(r->o.p, r->o.e); }while(0)
+#define puti16(i) do{ r->o.p = a₀i16(r->o.p, r->o.e, i); }while(0)
+#define puti24(i) do{ r->o.p = a₀i24(r->o.p, r->o.e, i); }while(0)
+#define puti32(i) do{ r->o.p = a₀i32(r->o.p, r->o.e, i); }while(0)
+#define putnum(v) do{ r->o.p = a₀num(r->o.p, r->o.e, v); }while(0)
+#define putstr(s) do{ r->o.p = a₀str(r->o.p, r->o.e, s); }while(0)
+#define putarr() do{ r->o.p = a₀arr(r->o.p, r->o.e); }while(0)
+#define putobj() do{ r->o.p = a₀obj(r->o.p, r->o.e); }while(0)
+#define putend() do{ r->o.p = a₀end(r->o.p, r->o.e); }while(0)
+#define putkvnum(name, v) do{ r->o.p = a₀kvnum(r->o.p, r->o.e, name, v); }while(0)
+#define putkvstr(name, s) do{ r->o.p = a₀kvstr(r->o.p, r->o.e, name, s); }while(0)
+#define putkvbool(name, s) do{ r->o.p = a₀kvbool(r->o.p, r->o.e, name, s); }while(0)
#define putcommand(name, cb_) do { \
putstr(name); \
- putnum(r->msg.cmd.tid); \
+ putnum(r->o.msg.cmd.tid); \
putobj(); \
- r->msg.cmd.cb = cb_; \
+ r->o.msg.cmd.cb = cb_; \
}while(0)
static int szs[] = {
@@ -175,34 +181,48 @@
[LimitDynamic] = "dynamic",
};
+static char *pubtype2s[] = {
+ [PubLive] = "live",
+ [PubAppend] = "append",
+ [PubRecord] = "record",
+};
+
extern int debug;
static void
newmsg(RTMP *r, int type, int fmt, int cs)
{
- memset(&r->msg, 0, sizeof(r->msg));
+ memset(&r->o.msg, 0, sizeof(r->o.msg));
- r->msg.type = type;
- r->msg.fmt = fmt;
- r->msg.cs = cs;
- r->p = r->b;
+ r->o.msg.type = type;
+ r->o.msg.fmt = fmt;
+ r->o.msg.cs = cs;
+ r->o.p = r->o.b;
if(type == AMF0Command)
- r->msg.cmd.tid = ++r->cmds.tid;
+ r->o.msg.cmd.tid = ++r->cmds.tid;
+ else
+ r->o.msg.cmd.tid = 0;
}
static void
-bextend(RTMP *r, int bsz)
+notransaction(RTMP *r)
{
+ r->o.msg.cmd.tid = 0;
+}
+
+static void
+bextend(Buffer *b, int bsz)	/* make sure b holds at least bsz bytes; grows to 2*bsz when it does not */
+{
u8int *ob;
- if(r->bsz >= bsz)
+ if(b->bsz >= bsz)
return;
- ob = r->b;
- r->b = erealloc(r->b, bsz*2);
+ ob = b->b;
+ b->b = erealloc(b->b, bsz*2);
if(ob != nil)
- r->p = r->b + (intptr)(ob - r->p);
- r->bsz = bsz*2;
- r->e = r->b + r->bsz;
+ b->p = b->b + (intptr)(b->p - ob);	/* rebase the cursor: same offset from the (possibly moved) start; was ob - p, i.e. negated */
+ b->bsz = bsz*2;
+ b->e = b->b + b->bsz;
}
static int
@@ -212,30 +232,34 @@
u8int *h, *e, byte;
u32int ts;
- memset(&r->msg, 0, sizeof(r->msg));
+ memset(&r->i.msg, 0, sizeof(r->i.msg));
- r->p = r->b;
- if(readn(r->i, r->p, 1) != 1)
- goto eof;
- r->msg.fmt = (r->p[0] & 0xc0)>>6;
- r->msg.cs = r->p[0] & 0x3f;
- n = r->msg.cs + 1;
+ r->i.p = r->i.b;
+ if((n = readn(r->fd, &byte, 1)) != 1){
+ if(n == 0)
+ werrstr("eof");
+ goto err;
+ }
+
+ r->i.msg.fmt = (byte & 0xc0)>>6;
+ r->i.msg.cs = byte & 0x3f;
+ n = r->i.msg.cs + 1;
if(n <= 2){
- if(readn(r->i, r->p, n) != n)
- goto eof;
- r->msg.cs = 64 + r->p[0];
+ if(readn(r->fd, r->i.p, n) != n)
+ goto err;
+ r->i.msg.cs = 64 + r->i.p[0];
if(n == 2)
- r->msg.cs += 256 * r->p[1];
+ r->i.msg.cs += 256 * r->i.p[1];
}
- hsz = szs[r->msg.fmt];
- if(readn(r->i, r->p, hsz) != hsz)
- goto eof;
+ hsz = szs[r->i.msg.fmt];
+ if(readn(r->fd, r->i.p, hsz) != hsz)
+ goto err;
- h = r->p;
- e = r->p + hsz;
+ h = r->i.p;
+ e = r->i.p + hsz;
- r->msg.type = -1;
+ r->i.msg.type = -1;
msid = 0;
ts = 0;
len = 0;
@@ -244,7 +268,7 @@
if(hsz >= szs[Type1]){
h = a₀i24get(h, e, &len);
h = a₀byteget(h, e, &byte);
- r->msg.type = byte;
+ r->i.msg.type = byte;
if(hsz >= szs[Type0])
h = a₀i32leget(h, e, &msid);
}
@@ -251,26 +275,26 @@
}
if(ts == 0xffffff){ /* exntended timestamp */
- if(readn(r->i, h, 4) != 4)
+ if(readn(r->fd, h, 4) != 4)
goto err;
h = a₀i32get(h, h+4, (s32int*)&ts);
}
/* FIXME do all consecutive chunks use Type3? */
- bextend(r, len);
- r->msg.data = h;
- r->msg.sz = len;
+ bextend(&r->i, len);
+ r->i.msg.data = h;
+ r->i.msg.sz = len;
for(;;){
n = min(len, r->chunkin);
- if(readn(r->i, h, n) != n)
- goto eof;
+ if(readn(r->fd, h, n) != n)
+ goto err;
len -= n;
h += n;
if(len < 1)
break;
- if(readn(r->i, h, 1) != 1)
- goto eof;
- if((r->msg.cs | Type3<<6) != *h){
+ if(readn(r->fd, h, 1) != 1)
+ goto err;
+ if((r->i.msg.cs | Type3<<6) != *h){
werrstr("cs/fmt does not match: %02x", *h);
goto err;
}
@@ -277,8 +301,6 @@
}
return 0;
-eof:
- werrstr("eof");
err:
werrstr("rtmprecv: %r");
return -1;
@@ -290,24 +312,26 @@
u8int *p, *h, *e, hdata[24];
int len, n, hsz;
Command *c;
+ Message *m;
- if(r->p == nil)
+ if(r->o.p == nil)
goto err;
- r->msg.data = r->b;
- r->msg.sz = r->p - r->b;
+ m = &r->o.msg;
+ m->data = r->o.b;
+ m->sz = r->o.p - r->o.b;
h = hdata;
- *h++ = r->msg.fmt<<6 | r->msg.cs;
- hsz = szs[r->msg.fmt];
+ *h++ = m->fmt<<6 | m->cs;
+ hsz = szs[m->fmt];
e = h + hsz;
if(hsz >= szs[Type2]){
h = a₀i24(h, e, 0); /* FIXME put actual timestamps? */
if(hsz >= szs[Type1]){
- h = a₀i24(h, e, r->msg.sz);
- h = a₀byte(h, e, r->msg.type);
+ h = a₀i24(h, e, m->sz);
+ h = a₀byte(h, e, m->type);
if(hsz >= szs[Type0])
- h = a₀i32(h, e, r->msg.sid);
+ h = a₀i32(h, e, m->sid);
}
}
assert(h != nil);
@@ -315,7 +339,7 @@
if(Bwrite(r, hdata, h-hdata) < 0)
goto err;
- for(p = r->msg.data, len = r->msg.sz; len > 0;){
+ for(p = m->data, len = m->sz; len > 0;){
n = min(len, r->chunkout);
if(Bwrite(r, p, n) < 0)
goto err;
@@ -322,7 +346,7 @@
p += n;
len -= n;
if(len > 0){
- *h = r->msg.cs | Type3<<6;
+ *h = m->cs | Type3<<6;
Bputc(r, *h);
}
}
@@ -331,13 +355,13 @@
goto err;
if(debug){
- fprint(2, "← %M", &r->msg);
- if(r->msg.type == AMF0Command){
+ fprint(2, "← %M", m);
+ if(m->type == AMF0Command){
A₀ *a;
u8int *s, *e;
fprint(2, ":");
- s = r->msg.data;
- e = s + r->msg.sz;
+ s = m->data;
+ e = s + m->sz;
for(; s != nil && s != e;){
if((s = a₀parse(&a, s, e)) != nil)
fprint(2, " %A", a);
@@ -349,9 +373,9 @@
fprint(2, "\n");
}
- if(r->msg.type == AMF0Command){
+ if(m->type == AMF0Command){
c = emalloc(sizeof(*c));
- *c = r->msg.cmd;
+ *c = m->cmd;
assert(c->cb != nil);
if((c->next = r->cmds.w) != nil)
c->next->prev = c;
@@ -368,7 +392,8 @@
rtmpfree(RTMP *r)
{
free(r->app);
- free(r->b);
+ free(r->i.b);
+ free(r->o.b);
free(r->path);
free(r->tcurl);
if(r->c != nil){
@@ -414,7 +439,7 @@
RTMP *r;
r = aux;
- m = &r->msg;
+ m = &r->i.msg;
res = 0;
memset(a, 0, sizeof(a));
for(;;){
@@ -422,8 +447,6 @@
a₀free(a[n]);
memset(a, 0, sizeof(a));
- qlock(r);
-
if(res != 0 || (res = rtmprecv(r)) != 0){
if(debug)
fprint(2, "rtmp loop: %r\n");
@@ -432,11 +455,13 @@
break;
}
- s = r->msg.data;
- e = s + r->msg.sz;
+ s = m->data;
+ e = s + m->sz;
+ qlock(r);
+
if(debug)
- fprint(2, "→ %M", &r->msg);
+ fprint(2, "→ %M", m);
switch(m->type){
case AMF0Command:
@@ -461,23 +486,23 @@
goto err;
}
for(c = r->cmds.w; c != nil && c->tid != a[n]->num; c = c->next);
- if(c == nil){
- werrstr("response to non-existent transaction %d", (int)a[n]->num);
- goto err;
- }
+ if(c == nil)
+ fprint(2, "response to non-existent transaction %d", (int)a[n]->num);
break;
}
}
if(debug)
fprint(2, " tid=%A: %A %A %A\n", a[CbTransID], a[CbCommand], a[CbObject], a[CbResponse]);
- if(c->prev != nil)
- c->prev->next = c->next;
- if(c->next != nil)
- c->next->prev = c->prev;
- if(r->cmds.w == c)
- r->cmds.w = c->next;
- c->cb(r, ok, a, c->aux);
- free(c);
+ if(c != nil){
+ if(c->prev != nil)
+ c->prev->next = c->next;
+ if(c->next != nil)
+ c->next->prev = c->prev;
+ if(r->cmds.w == c)
+ r->cmds.w = c->next;
+ c->cb(r, ok, a, c->aux);
+ free(c);
+ }
break;
case SetChunkSize:
@@ -555,7 +580,6 @@
qunlock(r);
}
- qunlock(r);
rtmpfree(r);
threadexitsall(res == 0 ? nil : "error");
@@ -617,16 +641,17 @@
Channel *c;
int n;
+ c = chancreate(sizeof(ulong), 0);
+
qlock(r);
newmsg(r, AMF0Command, Type0, CSCtl);
putstr("createStream");
- putnum(r->msg.cmd.tid);
+ putnum(r->o.msg.cmd.tid);
putnull();
- c = chancreate(sizeof(ulong), 0);
- r->msg.cmd.cb = streamcreated;
- r->msg.cmd.aux = c;
+ r->o.msg.cmd.cb = streamcreated;
+ r->o.msg.cmd.aux = c;
n = rtmpsend(r);
qunlock(r);
@@ -638,15 +663,82 @@
}
static void
+streampublished(RTMP *, int ok, A₀ *a[NumCb], void *aux)
+{
+ Channel *err;
+
+ err = aux;
+ if(strcmp(a[CbCommand]->str, "onStatus") != 0)
+ fprint(2, "streampublished: expected 'onStatus', got %#q\n", a[CbCommand]->str);
+ else if(a[CbResponse]->type != Tobj)
+ fprint(2, "streampublished: expected object, got something else\n");
+ else if(ok)
+ sendp(err, nil);
+
+ chanclose(err);
+}
+
+int
+rtmppublish(RTMP *r, ulong sid, int type, char *name)	/* send "publish" on stream sid; 0 when the callback reports success, -1 otherwise */
+{
+ Channel *c;
+ char *e;
+ int n;
+
+ if(type < 0 || type >= nelem(pubtype2s)){
+ werrstr("invalid publish type %d", type);
+ return -1;
+ }
+ if(name == nil)
+ name = "";
+
+ c = chancreate(sizeof(char*), 0);
+
+ qlock(r);
+
+ newmsg(r, AMF0Command, Type0, CSCtl);
+ notransaction(r);	/* publish goes out with transaction id 0 */
+ putstr("publish");
+ putnum(0);
+ putnull();
+ putstr(name);
+ putstr(pubtype2s[type]);
+
+ r->o.msg.cmd.cb = streampublished;
+ r->o.msg.cmd.aux = c;
+ r->o.msg.sid = sid;
+ n = rtmpsend(r);
+
+ qunlock(r);
+
+ e = nil;
+ n = (n == 0 && recv(c, &e) == 1) ? 0 : -1;	/* recv fails when the callback closes c without sending */
+ chanfree(c);
+
+ if(e != nil){
+ werrstr("%s", e);
+ free(e);	/* release the received error string; c was already released by chanfree above (was free(c): double free + leak) */
+ }
+
+ return (n == 0 && e == nil) ? 0 : -1;
+}
+
+static void
connected(RTMP *r, int ok, A₀ *a[NumCb], void *)
{
- if(strcmp(a[CbCommand]->str, "_result") != 0)
- sendp(r->c, smprint("expected '_result', got %#q", a[CbCommand]->str));
- else{
- sendp(r->c, ok ? nil : smprint("%A", a[CbResponse]));
- if(ok)
+ char *s;
+
+ s = nil;
+ if(ok){
+ if(strcmp(a[CbCommand]->str, "_result") != 0)
+ s = smprint("expected '_result', got %#q", a[CbCommand]->str);
+ else
setchunksz(r, ChunkDesired);
+ }else{
+ s = smprint("%A", a[CbResponse]);
}
+
+ sendp(r->c, s);
}
static int
@@ -746,6 +838,7 @@
*s = 0;
path = s+1;
}else{
+ *s = 0;
path = nil;
}
@@ -753,7 +846,7 @@
goto err;
r = ecalloc(1, sizeof(*r));
- r->i = f;
+ r->fd = f;
r->chunkin = ChunkDefault;
r->chunkout = ChunkDefault;
r->tcurl = url;
@@ -761,7 +854,8 @@
r->c = chancreate(sizeof(void*), 0);
r->app = estrdup(app);
r->path = path == nil ? nil : estrdup(path);
- bextend(r, Bufsz);
+ bextend(&r->i, Bufsz);
+ bextend(&r->o, Bufsz);
Binits(r, f, OWRITE, r->biobuf, sizeof(r->biobuf));
if(connect(r) != 0)
@@ -794,8 +888,8 @@
{
if(r == nil)
return;
- if(r->i >= 0)
- close(r->i);
+ if(r->fd >= 0)
+ close(r->fd);
if(r->c != nil)
chanclose(r->c);
}
--- a/rtmp.h
+++ b/rtmp.h
@@ -4,5 +4,12 @@
int rtmpstream(RTMP *r, ulong *sid);
+enum {
+ PubLive,
+ PubAppend,
+ PubRecord,
+};
+int rtmppublish(RTMP *r, ulong sid, int type, char *name);
+
RTMP *rtmpdial(char *url);
void rtmpclose(RTMP *r);