ref: 996f6e5c036260a18fd4c46405027ef4a23a7923
parent: f867fbf66e9843d74161ec57fe12980feb9aedce
author: Sigrid Solveig Haflínudóttir <[email protected]>
date: Thu Jul 29 09:18:50 EDT 2021
createStream
--- a/amf0.c
+++ b/amf0.c
@@ -228,6 +228,7 @@
case Tarr:
for(i = 0; i < a->arr.n; i++)
amf0free(a->arr.v[i]);
+ case Tnull:
case Tnum:
case Tbool:
break;
@@ -258,7 +259,6 @@
case Anull:
a->type = Tnull;
break;
-
case Anum:
atleast("num", 8);
for(n = 0, x.u = 0; n < 8; n++)
@@ -376,6 +376,9 @@
break;
case Tbool:
fmtprint(f, a->bool ? "true" : "false");
+ break;
+ case Tnull:
+ fmtprint(f, "null");
break;
default:
sysfatal("unknown amf0 type %d", a->type);
--- a/amf0.h
+++ b/amf0.h
@@ -1,10 +1,10 @@
enum {
- Tnull,
Tstr,
Tnum,
Tbool,
Tarr,
Tobj,
+ Tnull,
};
typedef struct Amf0 Amf0;
--- a/main.c
+++ b/main.c
@@ -57,6 +57,9 @@
srand(time(nil));
if((r = rtmpdial(argv[0])) == nil)
sysfatal("%r");
+ ulong sid;
+ if(rtmpstream(r, &sid) == 0)
+ fprint(2, "stream: %lud\n", sid);
while(1)
sleep(100);
--- a/rtmp.c
+++ b/rtmp.c
@@ -83,6 +83,7 @@
int type;
int fmt;
int cs;
+ int sid;
u32int ts;
u8int *data;
int sz;
@@ -91,6 +92,7 @@
struct RTMP {
Biobufhdr;
+ QLock;
Channel *c;
char *app;
char *path;
@@ -112,6 +114,7 @@
u8int biobuf[Biobufsz];
};
+#define putnull() do{ r->p = amf0null(r->p, r->e); }while(0)
#define puti16(i) do{ r->p = amf0i16(r->p, r->e, i); }while(0)
#define puti24(i) do{ r->p = amf0i24(r->p, r->e, i); }while(0)
#define puti32(i) do{ r->p = amf0i32(r->p, r->e, i); }while(0)
@@ -213,13 +216,13 @@
r->p = r->b;
if(readn(r->i, r->p, 1) != 1)
- goto err;
+ goto eof;
r->msg.fmt = (r->p[0] & 0xc0)>>6;
r->msg.cs = r->p[0] & 0x3f;
n = r->msg.cs + 1;
if(n <= 2){
if(readn(r->i, r->p, n) != n)
- goto err;
+ goto eof;
r->msg.cs = 64 + r->p[0];
if(n == 2)
r->msg.cs += 256 * r->p[1];
@@ -227,7 +230,7 @@
hsz = szs[r->msg.fmt];
if(readn(r->i, r->p, hsz) != hsz)
- goto err;
+ goto eof;
h = r->p;
e = r->p + hsz;
@@ -260,7 +263,7 @@
for(;;){
n = min(len, r->chunkin);
if(readn(r->i, h, n) != n)
- goto err;
+ goto eof;
len -= n;
h += n;
if(len < 1)
@@ -274,7 +277,8 @@
}
return 0;
-
+eof:
+ werrstr("eof");
err:
werrstr("rtmprecv: %r");
return -1;
@@ -303,7 +307,7 @@
h = amf0i24(h, e, r->msg.sz);
h = amf0byte(h, e, r->msg.type);
if(hsz >= szs[Type0])
- h = amf0i32(h, e, 0); /* FIXME message stream id */
+ h = amf0i32(h, e, r->msg.sid);
}
}
assert(h != nil);
@@ -418,6 +422,8 @@
amf0free(a[n]);
memset(a, 0, sizeof(a));
+ qlock(r);
+
if(res != 0 || (res = rtmprecv(r)) != 0){
if(debug)
fprint(2, "rtmp loop: %r\n");
@@ -442,7 +448,7 @@
switch(n){
case CbCommand:
if(a[n]->type != Tstr){
- werrstr("command name is not a string");
+ werrstr("command name is not a string: %A", a[n]);
goto err;
}
if(strcmp(a[n]->str, "_error") == 0)
@@ -545,9 +551,13 @@
res = -1;
break;
}
+
+ qunlock(r);
}
+ qunlock(r);
rtmpfree(r);
+
threadexitsall(res == 0 ? nil : "error");
}
@@ -581,6 +591,50 @@
err:
werrstr("handshake: %r");
return -1;
+}
+
+static void
+streamcreated(RTMP *, int ok, Amf0 *a[NumCb], void *aux)
+{
+ Channel *sid;
+
+ sid = aux;
+ if(strcmp(a[CbCommand]->str, "_result") != 0)
+ fprint(2, "createStream: expected '_result', got %#q\n", a[CbCommand]->str);
+ else if(a[CbResponse]->type != Tnum)
+ fprint(2, "createStream: expected stream ID, got NaN\n");
+ else if(!ok)
+ fprint(2, "createStream: %A\n", a[CbResponse]);
+ else
+ sendul(sid, (ulong)a[CbResponse]->num);
+
+ chanclose(sid);
+}
+
+int
+rtmpstream(RTMP *r, ulong *sid)
+{
+ Channel *c;
+ int n;
+
+ qlock(r);
+
+ newmsg(r, AMF0Command, Type0, CSCtl);
+ putstr("createStream");
+ putnum(r->msg.cmd.tid);
+ putnull();
+
+ c = chancreate(sizeof(ulong), 0);
+ r->msg.cmd.cb = streamcreated;
+ r->msg.cmd.aux = c;
+ n = rtmpsend(r);
+
+ qunlock(r);
+
+ n = (n == 0 && recv(c, sid) == 1) ? 0 : -1;
+ chanfree(c);
+
+ return n;
}
static void
--- a/rtmp.h
+++ b/rtmp.h
@@ -2,5 +2,7 @@
#pragma incomplete RTMP
+int rtmpstream(RTMP *r, ulong *sid);
+
RTMP *rtmpdial(char *url);
void rtmpclose(RTMP *r);