aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMistivia <i@mistivia.com>2025-12-20 05:09:26 +0800
committerMistivia <i@mistivia.com>2025-12-20 05:25:07 +0800
commit038c58b0ac053dbfe8aea3faab73863000d5fdc9 (patch)
tree5a7e477a5f75d76be7adc9dcaab82347bac82ae7
parentbb725cc3802b42992666e18d0758459e4332cdd7 (diff)
use srt instead of rtmp
-rw-r--r--.gitignore2
-rw-r--r--Makefile2
-rw-r--r--README.md8
-rw-r--r--amf.cpp429
-rw-r--r--amf.h167
-rw-r--r--main.c31
-rw-r--r--rtmp.h74
-rw-r--r--rtmpserver.cpp862
-rw-r--r--rtmputils.cpp67
-rw-r--r--rtmputils.h49
-rw-r--r--srtserver.c127
-rw-r--r--srtserver.h (renamed from rtmpserver.h)7
-rw-r--r--transcode_talker.c4
13 files changed, 167 insertions, 1662 deletions
diff --git a/.gitignore b/.gitignore
index 8dd1137..13d6daf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,3 +8,5 @@ config
rootfs
conf
*.tar.gz
+podman-start.sh
+config.back
diff --git a/Makefile b/Makefile
index 95e41c2..c47dc4c 100644
--- a/Makefile
+++ b/Makefile
@@ -2,7 +2,7 @@ CC := gcc
CXX := g++
CFLAGS := -g -Wall
CXXFLAGS := -g -Wall -std=c++14
-LDFLAGS := -g \
+LDFLAGS := -g -lsrt \
-lavformat -lavutil -lavcodec \
-laws-cpp-sdk-core -laws-cpp-sdk-s3
diff --git a/README.md b/README.md
index 5a0ac7a..36016aa 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
EZLive is a minimal self-hosted livestream solution built on top of S3-compatible object storage.
-It runs a local RTMP server, receive live video, turns it into HLS segments (.m3u8 + .ts) and serves them as static files through any S3-compatible object storage. No dedicated streaming server is required — everything runs serverlessly. Then you can easily setup a HTML5 HLS player to watch the stream.
+It runs a local SRT server, receive live video, turns it into HLS segments (.m3u8 + .ts) and serves them as static files through any S3-compatible object storage. No dedicated streaming server is required — everything runs serverlessly. Then you can easily setup a HTML5 HLS player to watch the stream.
# Build
@@ -24,7 +24,7 @@ Then create a config file `config`:
```
listening_addr=127.0.0.1
-listening_port=1935
+listening_port=61935
bucket=YOUR_BUCKET_NAME
endpoint=https://your-s3.com
s3_path=ezlive/
@@ -74,7 +74,7 @@ Start EZLive:
./ezlive
```
-Open OBS, streaming to `rtmp://127.0.0.1/live`, no streaming key needed. The streaming format must be H.264 + AAC.
+Open OBS, streaming to `srt://127.0.0.1:61935`, with streaming key. The streaming format must be H.264 + AAC.
Then use a HLS player to load `https://YOUR_BUCKET_NAME.your-s3.com/ezlive/stream.m3u8` to watch the stream.
@@ -123,6 +123,4 @@ To start using, unzip the windows tarball, create a `config` file in the same di
# Credits
-The built-in RTMP server is modified from [pine](https://github.com/deboot/pine).
-
Thank [@uonr](https://github.com/uonr) for making nix flake.
diff --git a/amf.cpp b/amf.cpp
deleted file mode 100644
index 2794419..0000000
--- a/amf.cpp
+++ /dev/null
@@ -1,429 +0,0 @@
-#include "amf.h"
-#include "rtmputils.h"
-
-#include <stdexcept>
-#include <string.h>
-#include <arpa/inet.h>
-
-namespace {
-
-uint8_t peek(const Decoder *dec)
-{
- if (dec->pos >= dec->buf.size()) {
- throw std::runtime_error("Not enough data");
- }
- return uint8_t(dec->buf[dec->pos]);
-}
-
-uint8_t get_byte(Decoder *dec)
-{
- if (dec->version == 0 && peek(dec) == AMF0_SWITCH_AMF3) {
- debug("entering AMF3 mode\n");
- dec->pos++;
- dec->version = 3;
- }
-
- if (dec->pos >= dec->buf.size()) {
- throw std::runtime_error("Not enough data");
- }
- return uint8_t(dec->buf[dec->pos++]);
-}
-
-}
-
-AMFValue::AMFValue(AMFType type) :
- m_type(type)
-{
- switch (m_type) {
- case AMF_OBJECT:
- case AMF_ECMA_ARRAY:
- m_value.object = new amf_object_t;
- break;
- case AMF_NUMBER:
- case AMF_INTEGER:
- case AMF_NULL:
- case AMF_UNDEFINED:
- break;
- default:
- assert(0);
- }
-}
-
-AMFValue::AMFValue(const std::string &s) :
- m_type(AMF_STRING)
-{
- m_value.string = new std::string(s);
-}
-
-AMFValue::AMFValue(double n) :
- m_type(AMF_NUMBER)
-{
- m_value.number = n;
-}
-
-AMFValue::AMFValue(int i) :
- m_type(AMF_INTEGER)
-{
- m_value.integer = i;
-}
-
-AMFValue::AMFValue(bool b) :
- m_type(AMF_BOOLEAN)
-{
- m_value.boolean = b;
-}
-
-AMFValue::AMFValue(const amf_object_t &object) :
- m_type(AMF_OBJECT)
-{
- m_value.object = new amf_object_t(object);
-}
-
-AMFValue::AMFValue(const AMFValue &from) :
- m_type(AMF_NULL)
-{
- *this = from;
-}
-
-AMFValue::~AMFValue()
-{
- destroy();
-}
-
-void AMFValue::destroy()
-{
- switch (m_type) {
- case AMF_STRING:
- delete m_value.string;
- break;
- case AMF_OBJECT:
- case AMF_ECMA_ARRAY:
- delete m_value.object;
- break;
- case AMF_NULL:
- case AMF_NUMBER:
- case AMF_INTEGER:
- case AMF_BOOLEAN:
- case AMF_UNDEFINED:
- break;
- }
-}
-
-void AMFValue::operator = (const AMFValue &from)
-{
- destroy();
- m_type = from.m_type;
- switch (m_type) {
- case AMF_STRING:
- m_value.string = new std::string(*from.m_value.string);
- break;
- case AMF_OBJECT:
- case AMF_ECMA_ARRAY:
- m_value.object = new amf_object_t(*from.m_value.object);
- break;
- case AMF_NUMBER:
- m_value.number = from.m_value.number;
- break;
- case AMF_INTEGER:
- m_value.integer = from.m_value.integer;
- break;
- case AMF_BOOLEAN:
- m_value.boolean = from.m_value.boolean;
- break;
- default:
- break;
- }
-}
-
-void amf_write(Encoder *enc, const std::string &s)
-{
- enc->buf += char(AMF0_STRING);
- uint16_t str_len = htons(s.size());
- enc->buf.append((char *) &str_len, 2);
- enc->buf += s;
-}
-
-void amf_write(Encoder *enc, int i)
-{
- throw std::runtime_error("AMF0 does not have integers");
-}
-
-void amf_write(Encoder *enc, double n)
-{
- enc->buf += char(AMF0_NUMBER);
- uint64_t encoded = 0;
-//#if defined(__i386__) || defined(__x86_64__)
- /* Flash uses same floating point format as x86 */
- memcpy(&encoded, &n, 8);
-//#endif
- uint32_t val = htonl(encoded >> 32);
- enc->buf.append((char *) &val, 4);
- val = htonl(encoded);
- enc->buf.append((char *) &val, 4);
-}
-
-void amf_write(Encoder *enc, bool b)
-{
- enc->buf += char(AMF0_BOOLEAN);
- enc->buf += char(b);
-}
-
-void amf_write_key(Encoder *enc, const std::string &s)
-{
- uint16_t str_len = htons(s.size());
- enc->buf.append((char *) &str_len, 2);
- enc->buf += s;
-}
-
-void amf_write(Encoder *enc, const amf_object_t &object)
-{
- enc->buf += char(AMF0_OBJECT);
- for (amf_object_t::const_iterator i = object.begin();
- i != object.end(); ++i) {
- amf_write_key(enc, i->first);
- amf_write(enc, i->second);
- }
- amf_write_key(enc, "");
- enc->buf += char(AMF0_OBJECT_END);
-}
-
-void amf_write_ecma(Encoder *enc, const amf_object_t &object)
-{
- enc->buf += char(AMF0_ECMA_ARRAY);
- uint32_t zero = 0;
- enc->buf.append((char *) &zero, 4);
- for (amf_object_t::const_iterator i = object.begin();
- i != object.end(); ++i) {
- amf_write_key(enc, i->first);
- amf_write(enc, i->second);
- }
- amf_write_key(enc, "");
- enc->buf += char(AMF0_OBJECT_END);
-}
-
-void amf_write_null(Encoder *enc)
-{
- enc->buf += char(AMF0_NULL);
-}
-
-void amf_write(Encoder *enc, const AMFValue &value)
-{
- switch (value.type()) {
- case AMF_STRING:
- amf_write(enc, value.as_string());
- break;
- case AMF_NUMBER:
- amf_write(enc, value.as_number());
- break;
- case AMF_INTEGER:
- amf_write(enc, value.as_integer());
- break;
- case AMF_BOOLEAN:
- amf_write(enc, value.as_boolean());
- break;
- case AMF_OBJECT:
- amf_write(enc, value.as_object());
- break;
- case AMF_ECMA_ARRAY:
- amf_write_ecma(enc, value.as_object());
- break;
- case AMF_NULL:
- amf_write_null(enc);
- break;
- default:
- break;
- }
-}
-
-unsigned int load_amf3_integer(Decoder *dec)
-{
- unsigned int value = 0;
- for (int i = 0; i < 4; ++i) {
- uint8_t b = get_byte(dec);
- if (i == 3) {
- /* use all bits from 4th byte */
- value = (value << 8) | b;
- break;
- }
- value = (value << 7) | (b & 0x7f);
- if ((b & 0x80) == 0)
- break;
- }
- return value;
-}
-
-std::string amf_load_string(Decoder *dec)
-{
- size_t str_len = 0;
- uint8_t type = get_byte(dec);
- if (dec->version == 3) {
- if (type != AMF3_STRING) {
- throw std::runtime_error("Expected a string");
- }
- str_len = load_amf3_integer(dec) / 2;
-
- } else {
- if (type != AMF0_STRING) {
- throw std::runtime_error("Expected a string");
- }
- if (dec->pos + 2 > dec->buf.size()) {
- throw std::runtime_error("Not enough data");
- }
- str_len = load_be16(&dec->buf[dec->pos]);
- dec->pos += 2;
- }
- if (dec->pos + str_len > dec->buf.size()) {
- throw std::runtime_error("Not enough data");
- }
- std::string s(dec->buf, dec->pos, str_len);
- dec->pos += str_len;
- return s;
-}
-
-double amf_load_number(Decoder *dec)
-{
- if (get_byte(dec) != AMF0_NUMBER) {
- throw std::runtime_error("Expected a string");
- }
- if (dec->pos + 8 > dec->buf.size()) {
- throw std::runtime_error("Not enough data");
- }
- uint64_t val = ((uint64_t) load_be32(&dec->buf[dec->pos]) << 32) |
- load_be32(&dec->buf[dec->pos + 4]);
- double n = 0;
-//#if defined(__i386__) || defined(__x86_64__)
- /* Flash uses same floating point format as x86 */
- memcpy(&n, &val, 8);
-//#endif
- dec->pos += 8;
- return n;
-}
-
-int amf_load_integer(Decoder *dec)
-{
- if (dec->version == 3) {
- return load_amf3_integer(dec);
- } else {
- return amf_load_number(dec);
- }
-}
-
-bool amf_load_boolean(Decoder *dec)
-{
- if (get_byte(dec) != AMF0_BOOLEAN) {
- throw std::runtime_error("Expected a boolean");
- }
- return get_byte(dec) != 0;
-}
-
-std::string amf_load_key(Decoder *dec)
-{
- if (dec->pos + 2 > dec->buf.size()) {
- throw std::runtime_error("Not enough data");
- }
- size_t str_len = load_be16(&dec->buf[dec->pos]);
- dec->pos += 2;
- if (dec->pos + str_len > dec->buf.size()) {
- throw std::runtime_error("Not enough data");
- }
- std::string s(dec->buf, dec->pos, str_len);
- dec->pos += str_len;
- return s;
-}
-
-amf_object_t amf_load_object(Decoder *dec)
-{
- amf_object_t object;
- if (get_byte(dec) != AMF0_OBJECT) {
- throw std::runtime_error("Expected an object");
- }
- while (1) {
- std::string key = amf_load_key(dec);
- if (key.empty())
- break;
- AMFValue value = amf_load(dec);
- object.insert(std::make_pair(key, value));
- }
- if (get_byte(dec) != AMF0_OBJECT_END) {
- throw std::runtime_error("expected object end");
- }
- return object;
-}
-
-amf_object_t amf_load_ecma(Decoder *dec)
-{
- /* ECMA array is the same as object, with 4 extra zero bytes */
- amf_object_t object;
- if (get_byte(dec) != AMF0_ECMA_ARRAY) {
- throw std::runtime_error("Expected an ECMA array");
- }
- if (dec->pos + 4 > dec->buf.size()) {
- throw std::runtime_error("Not enough data");
- }
- dec->pos += 4;
- while (1) {
- std::string key = amf_load_key(dec);
- if (key.empty())
- break;
- AMFValue value = amf_load(dec);
- object.insert(std::make_pair(key, value));
- }
- if (get_byte(dec) != AMF0_OBJECT_END) {
- throw std::runtime_error("expected object end");
- }
- return object;
-}
-
-AMFValue amf_load(Decoder *dec)
-{
- uint8_t type = peek(dec);
- if (dec->version == 3) {
- switch (type) {
- case AMF3_STRING:
- return AMFValue(amf_load_string(dec));
- case AMF3_NUMBER:
- return AMFValue(amf_load_number(dec));
- case AMF3_INTEGER:
- return AMFValue(amf_load_integer(dec));
- case AMF3_FALSE:
- dec->pos++;
- return AMFValue(false);
- case AMF3_TRUE:
- dec->pos++;
- return AMFValue(true);
- case AMF3_OBJECT:
- return AMFValue(amf_load_object(dec));
- case AMF3_ARRAY:
- return AMFValue(amf_load_ecma(dec));
- case AMF3_NULL:
- dec->pos++;
- return AMFValue(AMF_NULL);
- case AMF3_UNDEFINED:
- dec->pos++;
- return AMFValue(AMF_UNDEFINED);
- default:
- throw std::runtime_error(strf("Unsupported AMF3 type: %02x", type));
- }
- } else {
- switch (type) {
- case AMF0_STRING:
- return AMFValue(amf_load_string(dec));
- case AMF0_NUMBER:
- return AMFValue(amf_load_number(dec));
- case AMF0_BOOLEAN:
- return AMFValue(amf_load_boolean(dec));
- case AMF0_OBJECT:
- return AMFValue(amf_load_object(dec));
- case AMF0_ECMA_ARRAY:
- return AMFValue(amf_load_ecma(dec));
- case AMF0_NULL:
- dec->pos++;
- return AMFValue(AMF_NULL);
- case AMF0_UNDEFINED:
- dec->pos++;
- return AMFValue(AMF_UNDEFINED);
- default:
- throw std::runtime_error(strf("Unsupported AMF0 type: %02x", type));
- }
- }
-}
diff --git a/amf.h b/amf.h
deleted file mode 100644
index 6d5738e..0000000
--- a/amf.h
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * @file amf.h
- * @author Dean Zou (zoudingyuan@junjietech.com)
- * @brief
- * @version 1.0
- * @date 2019-06-27
- *
- * @copyright Copyright (c) - 2019 JunJie Intelligence(Shenzhen) Co.,LTD
- *
- */
-
-#ifndef AMF_H_
-#define AMF_H_
-
-#include <string>
-#include <map>
-#include <assert.h>
-
-enum AMFType {
- AMF_NUMBER,
- AMF_INTEGER,
- AMF_BOOLEAN,
- AMF_STRING,
- AMF_OBJECT,
- AMF_NULL,
- AMF_UNDEFINED,
- AMF_ECMA_ARRAY,
-};
-
-enum {
- AMF0_NUMBER,
- AMF0_BOOLEAN,
- AMF0_STRING,
- AMF0_OBJECT,
- AMF0_MOVIECLIP,
- AMF0_NULL,
- AMF0_UNDEFINED,
- AMF0_REFERENCE,
- AMF0_ECMA_ARRAY,
- AMF0_OBJECT_END,
- AMF0_STRICT_ARRAY,
- AMF0_DATE,
- AMF0_LONG_STRING,
- AMF0_UNSUPPORTED,
- AMF0_RECORD_SET,
- AMF0_XML_OBJECT,
- AMF0_TYPED_OBJECT,
- AMF0_SWITCH_AMF3,
-};
-
-enum {
- AMF3_UNDEFINED,
- AMF3_NULL,
- AMF3_FALSE,
- AMF3_TRUE,
- AMF3_INTEGER,
- AMF3_NUMBER,
- AMF3_STRING,
- AMF3_LEGACY_XML,
- AMF3_DATE,
- AMF3_ARRAY,
- AMF3_OBJECT,
- AMF3_XML,
- AMF3_BYTE_ARRAY,
-};
-
-struct Decoder {
- std::string buf;
- size_t pos;
- int version;
-};
-
-struct Encoder {
- std::string buf;
-};
-
-class AMFValue;
-
-typedef std::map<std::string, AMFValue> amf_object_t;
-
-class AMFValue {
-public:
- AMFValue(AMFType type = AMF_NULL);
- AMFValue(const std::string &s);
- AMFValue(double n);
- AMFValue(int i);
- AMFValue(bool b);
- AMFValue(const amf_object_t &object);
- AMFValue(const AMFValue &from);
- ~AMFValue();
-
- AMFType type() const { return m_type; }
-
- std::string as_string() const
- {
- assert(m_type == AMF_STRING);
- return *m_value.string;
- }
- double as_number() const
- {
- assert(m_type == AMF_NUMBER);
- return m_value.number;
- }
- double as_integer() const
- {
- assert(m_type == AMF_INTEGER);
- return m_value.integer;
- }
- bool as_boolean() const
- {
- assert(m_type == AMF_BOOLEAN);
- return m_value.boolean;
- }
- amf_object_t as_object() const
- {
- assert(m_type == AMF_OBJECT);
- return *m_value.object;
- }
-
- AMFValue get(const std::string &s) const
- {
- assert(m_type == AMF_OBJECT);
- amf_object_t::const_iterator i = m_value.object->find(s);
- if (i == m_value.object->end())
- return AMFValue();
- return i->second;
- }
-
- void set(const std::string &s, const AMFValue &val)
- {
- assert(m_type == AMF_OBJECT);
- m_value.object->insert(std::make_pair(s, val));
- }
-
- void operator = (const AMFValue &from);
-
-private:
- AMFType m_type;
- union {
- std::string *string;
- double number;
- int integer;
- bool boolean;
- amf_object_t *object;
- } m_value;
-
- void destroy();
-};
-
-void amf_write(Encoder *enc, const std::string &s);
-void amf_write(Encoder *enc, double n);
-void amf_write(Encoder *enc, bool b);
-void amf_write_key(Encoder *enc, const std::string &s);
-void amf_write(Encoder *enc, const amf_object_t &object);
-void amf_write_ecma(Encoder *enc, const amf_object_t &object);
-void amf_write_null(Encoder *enc);
-void amf_write(Encoder *enc, const AMFValue &value);
-
-std::string amf_load_string(Decoder *dec);
-double amf_load_number(Decoder *dec);
-bool amf_load_boolean(Decoder *dec);
-std::string amf_load_key(Decoder *dec);
-amf_object_t amf_load_object(Decoder *dec);
-amf_object_t amf_load_ecma(Decoder *dec);
-AMFValue amf_load(Decoder *dec);
-
-#endif
diff --git a/main.c b/main.c
index 2abe3d0..47c33d9 100644
--- a/main.c
+++ b/main.c
@@ -58,6 +58,25 @@ void on_rtmp_audio(void *ctx, int64_t timestamp, char *buf, size_t size) {
RingBuffer_write_word32be(rb, size + 11);
}
+void on_srt_start(void *ctx) {
+ MainCtx *main_ctx = ctx;
+ main_ctx->ringbuf = malloc(sizeof(RingBuffer));
+ RingBuffer_init(main_ctx->ringbuf, 4096);
+ RingBuffer *rb = main_ctx->ringbuf;
+ TranscodeTalker_new_stream(&main_ctx->transcode_talker, rb);
+}
+
+void on_srt_stop(void *ctx) {
+ MainCtx *main_ctx = ctx;
+ RingBuffer_end(main_ctx->ringbuf);
+}
+
+void on_srt_data(void *ctx, char *buf, size_t size) {
+ MainCtx *main_ctx = ctx;
+ RingBuffer *rb = main_ctx->ringbuf;
+ RingBuffer_write(rb, (const uint8_t*)buf, size);
+}
+
int main(int argc, char **argv) {
ezlive_config = malloc(sizeof(EZLiveConfig));
EZLiveConfig_init(ezlive_config);
@@ -76,11 +95,10 @@ int main(int argc, char **argv) {
}
srand((unsigned) time(NULL));
MainCtx main_ctx;
- RtmpCallbacks rtmp_cbs = {
- .on_audio = &on_rtmp_audio,
- .on_video = &on_rtmp_video,
- .on_start = &on_rtmp_start,
- .on_stop = &on_rtmp_stop,
+ SrtCallbacks srt_cbs = {
+ .on_data = &on_srt_data,
+ .on_stop = &on_srt_stop,
+ .on_start = &on_srt_start,
};
TranscodeTalker_init(&main_ctx.transcode_talker);
@@ -92,6 +110,7 @@ int main(int argc, char **argv) {
pthread_t s3worker_thread;
pthread_create(&s3worker_thread, NULL, &s3_worker_main, NULL);
- start_rtmpserver(rtmp_cbs, &main_ctx);
+ // start_rtmpserver(rtmp_cbs, &main_ctx);
+ start_srt_server(srt_cbs, &main_ctx);
return 0;
} \ No newline at end of file
diff --git a/rtmp.h b/rtmp.h
deleted file mode 100644
index bdc97cc..0000000
--- a/rtmp.h
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * @file rtmp.h
- * @author Dean Zou (zoudingyuan@junjietech.com)
- * @brief
- * @version 1.0
- * @date 2019-06-27
- *
- * @copyright Copyright (c) - 2019 JunJie Intelligence(Shenzhen) Co.,LTD
- *
- */
-
-#ifndef RTMP_H_
-#define RTMP_H_
-
-#include <stdint.h>
-
-#define PORT 1935
-
-#define DEFAULT_CHUNK_LEN 128
-
-#define PACKED __attribute__((packed))
-
-#define HANDSHAKE_PLAINTEXT 0x03
-
-#define RANDOM_LEN (1536 - 8)
-
-#define MSG_SET_CHUNK 0x01
-#define MSG_BYTES_READ 0x03
-#define MSG_USER_CONTROL 0x04
-#define MSG_RESPONSE 0x05
-#define MSG_REQUEST 0x06
-#define MSG_AUDIO 0x08
-#define MSG_VIDEO 0x09
-#define MSG_INVOKE3 0x11 /* AMF3 */
-#define MSG_NOTIFY 0x12
-#define MSG_OBJECT 0x13
-#define MSG_INVOKE 0x14 /* AMF0 */
-#define MSG_FLASH_VIDEO 0x16
-
-#define CONTROL_CLEAR_STREAM 0x00
-#define CONTROL_CLEAR_BUFFER 0x01
-#define CONTROL_STREAM_DRY 0x02
-#define CONTROL_BUFFER_TIME 0x03
-#define CONTROL_RESET_STREAM 0x04
-#define CONTROL_PING 0x06
-#define CONTROL_REQUEST_VERIFY 0x1a
-#define CONTROL_RESPOND_VERIFY 0x1b
-#define CONTROL_BUFFER_EMPTY 0x1f
-#define CONTROL_BUFFER_READY 0x20
-
-#define CONTROL_ID 0
-#define STREAM_ID 1337
-
-#define CHAN_CONTROL 2
-#define CHAN_RESULT 3
-#define CHAN_STREAM 4
-
-#define FLV_KEY_FRAME 0x01
-#define FLV_INTER_FRAME 0x02
-
-struct Handshake {
- uint8_t flags[8];
- uint8_t random[RANDOM_LEN];
-} PACKED;
-
-struct RTMP_Header {
- uint8_t flags;
- char timestamp[3];
- char msg_len[3];
- uint8_t msg_type;
- char endpoint[4]; /* Note, this is little-endian while others are BE */
-} PACKED;
-
-#endif
diff --git a/rtmpserver.cpp b/rtmpserver.cpp
deleted file mode 100644
index cb0766e..0000000
--- a/rtmpserver.cpp
+++ /dev/null
@@ -1,862 +0,0 @@
-#include "amf.h"
-#include "rtmputils.h"
-#include "rtmp.h"
-#include <vector>
-#include <stdexcept>
-#include <stdio.h>
-#include <string.h>
-#include <stdlib.h>
-#include <errno.h>
-#include <assert.h>
-#include <stdarg.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <sys/poll.h>
-#include <sys/time.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <arpa/inet.h>
-#include <time.h>
-
-#include "rtmpserver.h"
-
-extern "C" {
-#include "ezlive_config.h"
-}
-
-#define APP_NAME "live"
-
-RtmpCallbacks g_rtmp_server_cbs;
-void *g_rtmp_server_ctx;
-
-struct RTMP_Message {
- uint8_t type;
- size_t len;
- unsigned long timestamp;
- uint32_t endpoint;
- std::string buf;
-};
-
-struct Client {
- int fd;
- bool publisher;
- bool playing; /* Wants to receive the stream? */
- bool ready; /* Wants to receive and seen a keyframe */
- RTMP_Message messages[64];
- std::string buf;
- std::string send_queue;
- std::string path;
- size_t chunk_len;
- uint32_t written_seq;
- uint32_t read_seq;
- int64_t livets;
-};
-
-namespace {
-
-amf_object_t metadata;
-int listen_fd;
-std::vector<pollfd> poll_table;
-std::vector<Client *> clients;
-
-int set_nonblock(int fd, bool enabled)
-{
- int flags = fcntl(fd, F_GETFL) & ~O_NONBLOCK;
- if (enabled) {
- flags |= O_NONBLOCK;
- }
- return fcntl(fd, F_SETFL, flags);
-}
-
-size_t recv_all(int fd, void *buf, size_t len)
-{
- size_t pos = 0;
- while (pos < len) {
- ssize_t bytes = recv(fd, (char *) buf + pos, len - pos, 0);
- if (bytes < 0) {
- if (errno == EAGAIN || errno == EINTR)
- continue;
- throw std::runtime_error(strf("unable to recv: %s", strerror(errno)));
- }
- if (bytes == 0)
- break;
- pos += bytes;
- }
- return pos;
-}
-
-size_t send_all(int fd, const void *buf, size_t len)
-{
- size_t pos = 0;
- while (pos < len) {
- ssize_t written = send(fd, (const char *) buf + pos, len - pos, 0);
- if (written < 0) {
- if (errno == EAGAIN || errno == EINTR)
- continue;
- throw std::runtime_error(strf("unable to send: %s", strerror(errno)));
- }
- if (written == 0)
- break;
- pos += written;
- }
- return pos;
-}
-
-bool is_safe(uint8_t b)
-{
- return b >= ' ' && b < 128;
-}
-
-void hexdump(const void *buf, size_t len)
-{
- const uint8_t *data = (const uint8_t *) buf;
- for (size_t i = 0; i < len; i += 16) {
- for (int j = 0; j < 16; ++j) {
- if (i + j < len)
- debug("%.2x ", data[i + j]);
- else
- debug(" ");
- }
- for (int j = 0; j < 16; ++j) {
- if (i + j < len) {
- putc(is_safe(data[i + j]) ? data[i + j] : '.',
- stdout);
- } else {
- putc(' ', stdout);
- }
- }
- putc('\n', stdout);
- }
-}
-
-void try_to_send(void *data)
-{
- Client *client = (Client *)data;
- size_t len = client->send_queue.size();
- if (len > 4096)
- len = 4096;
-
- ssize_t written = send(client->fd, client->send_queue.data(), len, 0);
- if (written < 0) {
- if (errno == EAGAIN || errno == EINTR)
- return;
- throw std::runtime_error(strf("unable to write to a client: %s",
- strerror(errno)));
- }
-
- client->send_queue.erase(0, written);
-}
-
-void rtmp_send(Client *client, uint8_t type, uint32_t endpoint,
- const std::string &buf, unsigned long timestamp = 0,
- int channel_num = CHAN_CONTROL)
-{
- if (endpoint == STREAM_ID) {
- /*
- * For some unknown reason, stream-related msgs must be sent
- * on a specific channel.
- */
- channel_num = CHAN_STREAM;
- }
-
- RTMP_Header header;
- header.flags = (channel_num & 0x3f) | (0 << 6);
- header.msg_type = type;
- set_be24(header.timestamp, timestamp);
- set_be24(header.msg_len, buf.size());
- set_le32(header.endpoint, endpoint);
-
- client->send_queue.append((char *) &header, sizeof header);
- client->written_seq += sizeof header;
-
- size_t pos = 0;
- while (pos < buf.size()) {
- if (pos) {
- uint8_t flags = (channel_num & 0x3f) | (3 << 6);
- client->send_queue += char(flags);
-
- client->written_seq += 1;
- }
-
- size_t chunk = buf.size() - pos;
- if (chunk > client->chunk_len)
- chunk = client->chunk_len;
- client->send_queue.append(buf, pos, chunk);
-
- client->written_seq += chunk;
- pos += chunk;
- }
-
- try_to_send(client);
-}
-
-void send_reply(Client *client, double txid, const AMFValue &reply = AMFValue(),
- const AMFValue &status = AMFValue())
-{
- if (txid <= 0.0)
- return;
- Encoder invoke;
- amf_write(&invoke, std::string("_result"));
- amf_write(&invoke, txid);
- amf_write(&invoke, reply);
- amf_write(&invoke, status);
- rtmp_send(client, MSG_INVOKE, CONTROL_ID, invoke.buf, 0, CHAN_RESULT);
-}
-
-void handle_connect(Client *client, double txid, Decoder *dec)
-{
- amf_object_t params = amf_load_object(dec);
- std::string app = get(params, std::string("app")).as_string();
- std::string ver = "(unknown)";
-
- AMFValue flashver = get(params, std::string("flashVer"));
- if (flashver.type() == AMF_STRING) {
- ver = flashver.as_string();
- }
-
- /*
- if (app != APP_NAME) {
- throw std::runtime_error("Unsupported application: " + app);
- }
- */
-
- printf("connect: %s (version %s)\n", app.c_str(), ver.c_str());
-
- amf_object_t version;
- version.insert(std::make_pair("fmsVer", std::string("FMS/4,5,1,484")));
- version.insert(std::make_pair("capabilities", 255.0));
- version.insert(std::make_pair("mode", 1.0));
-
- amf_object_t status;
- status.insert(std::make_pair("code", std::string("NetConnection.Connect.Success")));
- /* report support for AMF3 */
- // status.insert(std::make_pair("objectEncoding", 3.0));
-
- send_reply(client, txid, version, status);
-
- uint32_t chunk_len = htonl(1024);
- std::string set_chunk((char *) &chunk_len, 4);
- rtmp_send(client, MSG_SET_CHUNK, CONTROL_ID, set_chunk);
-/*
- client->chunk_len = 1024;
-*/
-}
-
-int publishernum = 0;
-
-void handle_fcpublish(Client *client, double txid, Decoder *dec)
-{
- if (publishernum > 0) {
- throw std::runtime_error{"only one publisher."};
- }
- publishernum = 1;
- g_rtmp_server_cbs.on_start(g_rtmp_server_ctx);
- client->publisher = true;
- printf( "publisher connected.");
-
- amf_load(dec); /* NULL */
-
- std::string path = amf_load_string(dec);
- debug("fcpublish %s\n", path.c_str());
- printf("fcpublish %s\n", path.c_str());
- if (strlen(ezlive_config->key) > 0) {
- if (strcmp(ezlive_config->key, path.c_str()) != 0) {
- publishernum = 0;
- printf( "wrong live key.\n");
- throw std::runtime_error{"wrong live key."};
- }
- }
-
- client->path = path;
-
- amf_object_t status;
- status.insert(std::make_pair("code", std::string("NetStream.Publish.Start")));
- status.insert(std::make_pair("description", path));
-
- Encoder invoke;
- amf_write(&invoke, std::string("onFCPublish"));
- amf_write(&invoke, 0.0);
- amf_write_null(&invoke);
- amf_write(&invoke, status);
- rtmp_send(client, MSG_INVOKE, CONTROL_ID, invoke.buf);
-
- send_reply(client, txid);
-}
-
-void handle_createstream(Client *client, double txid, Decoder *dec)
-{
- send_reply(client, txid, AMFValue(), double(STREAM_ID));
-}
-
-void handle_publish(Client *client, double txid, Decoder *dec)
-{
- amf_load(dec); /* NULL */
-
- std::string path = amf_load_string(dec);
- debug("publish %s\n", path.c_str());
-
- client->path = path;
-
- amf_object_t status;
- status.insert(std::make_pair("level", std::string("status")));
- status.insert(std::make_pair("code", std::string("NetStream.Publish.Start")));
- status.insert(std::make_pair("description", std::string("Stream is now published.")));
- status.insert(std::make_pair("details", path));
-
- Encoder invoke;
- amf_write(&invoke, std::string("onStatus"));
- amf_write(&invoke, 0.0);
- amf_write_null(&invoke);
- amf_write(&invoke, status);
- rtmp_send(client, MSG_INVOKE, STREAM_ID, invoke.buf);
-
- send_reply(client, txid);
-}
-
-void start_playback(Client *client)
-{
- throw std::runtime_error{"playback not supported."};
-}
-
-void handle_play(Client *client, double txid, Decoder *dec)
-{
- amf_load(dec); /* NULL */
-
- std::string path = amf_load_string(dec);
-
- debug("play %s\n", path.c_str());
- client->path = path;
-
- start_playback(client);
-
- send_reply(client, txid);
-}
-
-void handle_play2(Client *client, double txid, Decoder *dec)
-{
- amf_load(dec); /* NULL */
-
- amf_object_t params = amf_load_object(dec);
- std::string path = get(params, std::string("streamName")).as_string();
-
- debug("play %s\n", path.c_str());
- client->path = path;
- start_playback(client);
-
- send_reply(client, txid);
-}
-
-void handle_pause(Client *client, double txid, Decoder *dec)
-{
- amf_load(dec); /* NULL */
-
- bool paused = amf_load_boolean(dec);
-
- if (paused) {
- debug("pausing\n");
-
- amf_object_t status;
- status.insert(std::make_pair("level", std::string("status")));
- status.insert(std::make_pair("code", std::string("NetStream.Pause.Notify")));
- status.insert(std::make_pair("description", std::string("Pausing.")));
-
- Encoder invoke;
- amf_write(&invoke, std::string("onStatus"));
- amf_write(&invoke, 0.0);
- amf_write_null(&invoke);
- amf_write(&invoke, status);
- rtmp_send(client, MSG_INVOKE, STREAM_ID, invoke.buf);
- client->playing = false;
- } else {
- start_playback(client);
- }
-
- send_reply(client, txid);
-}
-
-void handle_setdataframe(Client *client, Decoder *dec)
-{
- if(client->publisher == false) {
- printf("not a publisher");
- throw std::runtime_error("not a publisher");
- }
-
- std::string type = amf_load_string(dec);
- if (type != "onMetaData") {
- throw std::runtime_error("can only set metadata");
- }
-
- metadata = amf_load_ecma(dec);
-
- Encoder notify;
- amf_write(&notify, std::string("onMetaData"));
- amf_write_ecma(&notify, metadata);
-
- FOR_EACH(std::vector<Client *>, i, clients) {
- Client *receiver = *i;
- if (receiver != NULL && receiver->playing && receiver->path == client->path) {
- rtmp_send(client, MSG_NOTIFY, STREAM_ID, notify.buf);
- }
- }
-}
-
-void handle_invoke(Client *client, const RTMP_Message *msg, Decoder *dec)
-{
- std::string method = amf_load_string(dec);
- double txid = amf_load_number(dec);
-
- debug( "invoked %s txid %f\n", method.c_str(), txid);
-
- if (msg->endpoint == CONTROL_ID) {
- if (method == "connect") {
- handle_connect(client, txid, dec);
- } else if (method == "FCPublish") {
- handle_fcpublish(client, txid, dec);
- } else if (method == "createStream") {
- handle_createstream(client, txid, dec);
- }
-
- } else if (msg->endpoint == STREAM_ID) {
- if (method == "publish") {
- handle_publish(client, txid, dec);
- } else if (method == "play") {
- handle_play(client, txid, dec);
- } else if (method == "play2") {
- handle_play2(client, txid, dec);
- } else if (method == "pause") {
- handle_pause(client, txid, dec);
- }
- }
-}
-
-void handle_message(Client *client, RTMP_Message *msg)
-{
-
- debug("RTMP message %02x, len %zu, timestamp %ld\n", msg->type, msg->len,
- msg->timestamp);
-
- size_t pos = 0;
-
- switch (msg->type) {
- case MSG_BYTES_READ:
- if (pos + 4 > msg->buf.size()) {
- throw std::runtime_error("Not enough data");
- }
- client->read_seq = load_be32(&msg->buf[pos]);
- debug("%d in queue\n",
- int(client->written_seq - client->read_seq));
- break;
-
- case MSG_SET_CHUNK:
- if (pos + 4 > msg->buf.size()) {
- throw std::runtime_error("Not enough data");
- }
- client->chunk_len = load_be32(&msg->buf[pos]);
- debug("chunk size set to %zu\n", client->chunk_len);
- break;
-
- case MSG_INVOKE: {
- debug("handling message invoke 0 \n");
- Decoder dec;
- dec.version = 0;
- dec.buf = msg->buf;
- dec.pos = 0;
- handle_invoke(client, msg, &dec);
- }
- break;
-
- case MSG_INVOKE3: {
- debug("handling message invoke 3 \n");
- Decoder dec;
- dec.version = 0;
- dec.buf = msg->buf;
- dec.pos = 1;
- handle_invoke(client, msg, &dec);
- }
- break;
-
- case MSG_NOTIFY: {
- Decoder dec;
- dec.version = 0;
- dec.buf = msg->buf;
- dec.pos = 0;
- std::string type = amf_load_string(&dec);
- debug("notify %s\n", type.c_str());
- if (msg->endpoint == STREAM_ID) {
- if (type == "@setDataFrame") {
- handle_setdataframe(client, &dec);
- }
- }
- }
- break;
-
- case MSG_AUDIO:
- if(client->publisher == false) {
- throw std::runtime_error("not a publisher");
- }
- g_rtmp_server_cbs.on_audio(g_rtmp_server_ctx, msg->timestamp, (char*)msg->buf.c_str(), msg->buf.size());
- // FOR_EACH(std::vector<Client *>, i, clients) {
- // Client *receiver = *i;
- // if (receiver != NULL && receiver->ready) {
- // rtmp_send(receiver, MSG_AUDIO, STREAM_ID,
- // msg->buf, msg->timestamp);
- // }
- // }
- break;
- case MSG_VIDEO: {
- if(client->publisher == false){
- throw std::runtime_error("not a publisher");
- }
- // uint8_t flags = msg->buf[0];
- g_rtmp_server_cbs.on_video(g_rtmp_server_ctx, msg->timestamp, (char*)msg->buf.c_str(), msg->buf.size());
- // FOR_EACH(std::vector<Client *>, i, clients) {
- // Client *receiver = *i;
- // if (receiver != NULL && receiver->playing &&
- // client->path == receiver->path) {
- // if (flags >> 4 == FLV_KEY_FRAME &&
- // !receiver->ready) {
- // std::string control;
- // uint16_t type = htons(CONTROL_CLEAR_STREAM);
- // control.append((char *) &type, 2);
- // uint32_t stream = htonl(STREAM_ID);
- // control.append((char *) &stream, 4);
- // rtmp_send(receiver, MSG_USER_CONTROL, CONTROL_ID, control);
- // receiver->ready = true;
- // }
- // if (receiver->ready) {
- // static int flag = 0;
- // if(flag < 10) {
- // flag ++;
- // }
- // rtmp_send(receiver, MSG_VIDEO,
- // STREAM_ID, msg->buf,
- // msg->timestamp);
- // }
- // }
- // }
- }
- break;
-
- case MSG_FLASH_VIDEO:
- //printf( "streaming FLV not supported");
- throw std::runtime_error("streaming FLV not supported");
- break;
-
- default:
- debug("unhandled message: %02x\n", msg->type);
- hexdump(msg->buf.data(), msg->buf.size());
- break;
- }
-}
-
-/* TODO: Make this asynchronous */
-void do_handshake(Client *client)
-{
- Handshake serversig;
- Handshake clientsig;
-
- uint8_t c;
- if (recv_all(client->fd, &c, 1) < 1)
- return;
- if (c != HANDSHAKE_PLAINTEXT) {
- //printf( "only plaintext handshake supported");
- throw std::runtime_error("only plaintext handshake supported");
- }
-
- if (send_all(client->fd, &c, 1) < 1)
- return;
-
- memset(&serversig, 0, sizeof serversig);
- serversig.flags[0] = 0x03;
- for (int i = 0; i < RANDOM_LEN; ++i) {
- serversig.random[i] = rand();
- }
-
- if (send_all(client->fd, &serversig, sizeof serversig) < sizeof serversig)
- return;
-
- /* Echo client's signature back */
- if (recv_all(client->fd, &clientsig, sizeof serversig) < sizeof serversig)
- return;
- if (send_all(client->fd, &clientsig, sizeof serversig) < sizeof serversig)
- return;
-
- if (recv_all(client->fd, &clientsig, sizeof serversig) < sizeof serversig)
- return;
- if (memcmp(serversig.random, clientsig.random, RANDOM_LEN) != 0) {
- //printf( "invalid handshake");
- throw std::runtime_error("invalid handshake");
- }
-
- client->read_seq = 1 + sizeof serversig * 2;
- client->written_seq = 1 + sizeof serversig * 2;
-}
-
-void recv_from_client(void *data)
-{
- Client *client = (Client *)data;
-
- std::string chunk(4096, 0);
- ssize_t got = recv(client->fd, &chunk[0], chunk.size(), 0);
- if (got == 0) {
- //printf( "end of life from a client");
- throw std::runtime_error("EOF from a client");
- } else if (got < 0) {
- if (errno == EAGAIN || errno == EINTR)
- return;
- //printf( "unable to read from a client: %s",strerror(errno));
- throw std::runtime_error(strf("unable to read from a client: %s",strerror(errno)));
- }
- client->buf.append(chunk, 0, got);
-
- //printf( "got data and length is %d",client->buf.size());
-
- while (!client->buf.empty()) {
- uint8_t flags = client->buf[0];
-
- static const size_t HEADER_LENGTH[] = {12, 8, 4, 1};
- size_t header_len = HEADER_LENGTH[flags >> 6];
-
- if (client->buf.size() < header_len) {
- /* need more data */
- break;
- }
-
- RTMP_Header header;
- memcpy(&header, client->buf.data(), header_len);
-
- RTMP_Message *msg = &client->messages[flags & 0x3f];
-
- if (header_len >= 8) {
- msg->len = load_be24(header.msg_len);
- if (msg->len < msg->buf.size()) {
- throw std::runtime_error("invalid msg length");
- }
- msg->type = header.msg_type;
- }
- if (header_len >= 12) {
- msg->endpoint = load_le32(header.endpoint);
- }
-
- if (msg->len == 0) {
- throw std::runtime_error("message without a header");
- }
- size_t chunk = msg->len - msg->buf.size();
- if (chunk > client->chunk_len)
- chunk = client->chunk_len;
-
- if (client->buf.size() < header_len + chunk) {
- /* need more data */
- break;
- }
-
- if (header_len >= 4) {
- unsigned long ts = load_be24(header.timestamp);
- if (ts == 0xffffff) {
- throw std::runtime_error("ext timestamp not supported");
- }
- if (header_len < 12) {
- ts += msg->timestamp;
- }
- msg->timestamp = ts;
- }
-
- msg->buf.append(client->buf, header_len, chunk);
- client->buf.erase(0, header_len + chunk);
-
- if (msg->buf.size() == msg->len) {
- handle_message(client, msg);
- msg->buf.clear();
- }
- }
-}
-
-Client *new_client()
-{
- sockaddr_in sin;
- socklen_t addrlen = sizeof sin;
- int fd = accept(listen_fd, (sockaddr *) &sin, &addrlen);
- if (fd < 0) {
- printf("Unable to accept a client: %s\n", strerror(errno));
- return NULL;
- }
-
- Client *client = new Client;
- client->publisher = false;
- client->playing = false;
- client->ready = false;
- client->fd = fd;
- client->written_seq = 0;
- client->read_seq = 0;
- client->chunk_len = DEFAULT_CHUNK_LEN;
- struct timespec ts;
- clock_gettime(CLOCK_MONOTONIC, &ts);
- client->livets = ts.tv_sec;
- for (int i = 0; i < 64; ++i) {
- client->messages[i].timestamp = 0;
- client->messages[i].len = 0;
- }
-
- try {
- do_handshake(client);
- } catch (const std::runtime_error &e) {
- printf("handshake failed: %s\n", e.what());
- close(fd);
- delete client;
- return NULL;
- }
-
- set_nonblock(fd, true);
-
- pollfd entry;
- entry.events = POLLIN;
- entry.revents = 0;
- entry.fd = fd;
- poll_table.push_back(entry);
- clients.push_back(client);
-
- return client;
-}
-
-void close_client(Client *client, size_t i)
-{
- clients.erase(clients.begin() + i);
- poll_table.erase(poll_table.begin() + i);
- close(client->fd);
-
- if (client->publisher == true) {
- printf("publisher disconnected.\n");
- client->publisher = false;
- publishernum = 0;
- g_rtmp_server_cbs.on_stop(g_rtmp_server_ctx);
- FOR_EACH(std::vector<Client *>, i, clients) {
- Client *client = *i;
- if (client != NULL) {
- client->ready = false;
- }
- }
- }
- delete client;
-}
-
-void do_poll(void)
-{
- for (size_t i = 0; i < poll_table.size(); ++i) {
- Client *client = clients[i];
- if (client != NULL) {
- if (!client->send_queue.empty()) {
- //debug("waiting for pollout\n");
- poll_table[i].events = POLLIN | POLLOUT;
- } else {
- poll_table[i].events = POLLIN;
- }
- }
- }
-
- if (poll(&poll_table[0], poll_table.size(), -1) < 0) {
- if (errno == EAGAIN || errno == EINTR)
- return;
- throw std::runtime_error(strf("poll() failed: %s",
- strerror(errno)));
- }
-
- for (size_t i = 0; i < poll_table.size(); ++i) {
- Client *client = clients[i];
- if (poll_table[i].revents & POLLOUT) {
- try {
- try_to_send(client);
- } catch (const std::runtime_error &e) {
- printf("client error: %s\n", e.what());
- close_client(client, i);
- --i;
- continue;
- }
- struct timespec ts;
- clock_gettime(CLOCK_MONOTONIC, &ts);
- client->livets = ts.tv_sec;
- }
- if (poll_table[i].revents & POLLIN) {
- if (client == NULL) {
- new_client();
- } else {
- try {
- recv_from_client(client);
- } catch (const std::runtime_error &e) {
- printf("client error: %s\n", e.what());
- close_client(client, i);
- --i;
- continue;
- }
- struct timespec ts;
- clock_gettime(CLOCK_MONOTONIC, &ts);
- client->livets = ts.tv_sec;
- }
- }
- if ((poll_table[i].revents & POLLHUP)
- || (poll_table[i].revents & POLLERR)) {
- fprintf(stderr, "client error.\n");
- close_client(client, i);
- --i;
- continue;
- }
- struct timespec ts;
- clock_gettime(CLOCK_MONOTONIC, &ts);
- if (client != NULL && ts.tv_sec - client->livets > 60) {
- fprintf(stderr, "client timeout.\n");
- close_client(client, i);
- --i;
- }
- }
-}
-
-}
-
-int g_rtmp_server_quit = 0;
-
-void start_rtmpserver(RtmpCallbacks cbs, void *ctx) {
- g_rtmp_server_cbs = cbs;
- g_rtmp_server_ctx = ctx;
- try {
- listen_fd = socket(AF_INET, SOCK_STREAM, 0);
- if (listen_fd < 0)
- return;
- int opt = 1;
- if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
- perror("setsockopt SO_REUSEADDR");
- exit(EXIT_FAILURE);
- }
- sockaddr_in sin;
- sin.sin_family = AF_INET;
- sin.sin_port = htons(ezlive_config->listening_port);
- struct in_addr addr;
- if (inet_pton(AF_INET, ezlive_config->listening_addr, &addr) <= 0) {
- fprintf(stderr, "Invalid IP address\n");
- exit(-1);
- }
- sin.sin_addr.s_addr = addr.s_addr;
- if (bind(listen_fd, (sockaddr *) &sin, sizeof sin) < 0) {
- throw std::runtime_error(strf("Unable to listen: %s",strerror(errno)));
- return;
- }
-
- listen(listen_fd, 10);
-
- pollfd entry;
- entry.events = POLLIN;
- entry.revents = 0;
- entry.fd = listen_fd;
- poll_table.push_back(entry);
- clients.push_back(NULL);
-
- for (;;) {
- if (g_rtmp_server_quit) {
- return;
- }
- do_poll();
- }
- return;
- } catch (const std::runtime_error &e) {
- fprintf(stderr, "ERROR: %s\n", e.what());
- return;
- }
-}
diff --git a/rtmputils.cpp b/rtmputils.cpp
deleted file mode 100644
index 59088f1..0000000
--- a/rtmputils.cpp
+++ /dev/null
@@ -1,67 +0,0 @@
-#include "rtmputils.h"
-#include <string>
-#include <string.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <stdarg.h>
-#include <arpa/inet.h>
-
-/*
- * Used to do unaligned loads on archs that don't support them. GCC can mostly
- * optimize these away.
- */
-uint32_t load_be32(const void *p)
-{
- uint32_t val;
- memcpy(&val, p, sizeof val);
- return ntohl(val);
-}
-
-uint16_t load_be16(const void *p)
-{
- uint16_t val;
- memcpy(&val, p, sizeof val);
- return ntohs(val);
-}
-
-uint32_t load_le32(const void *p)
-{
- const uint8_t *data = (const uint8_t *) p;
- return data[0] | ((uint32_t) data[1] << 8) |
- ((uint32_t) data[2] << 16) | ((uint32_t) data[3] << 24);
-}
-
-uint32_t load_be24(const void *p)
-{
- const uint8_t *data = (const uint8_t *) p;
- return data[2] | ((uint32_t) data[1] << 8) | ((uint32_t) data[0] << 16);
-}
-
-void set_be24(void *p, uint32_t val)
-{
- uint8_t *data = (uint8_t *) p;
- data[0] = val >> 16;
- data[1] = val >> 8;
- data[2] = val;
-}
-
-void set_le32(void *p, uint32_t val)
-{
- uint8_t *data = (uint8_t *) p;
- data[0] = val;
- data[1] = val >> 8;
- data[2] = val >> 16;
- data[3] = val >> 24;
-}
-
-const std::string strf(const char *fmt, ...)
-{
- va_list vl;
- va_start(vl, fmt);
- char *buf = NULL;
- vasprintf(&buf, fmt, vl);
- va_end(vl);
- std::string s(buf);
- free(buf);
- return s;
-}
diff --git a/rtmputils.h b/rtmputils.h
deleted file mode 100644
index f873232..0000000
--- a/rtmputils.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * @file utils.h
- * @author Dean Zou (zoudingyuan@junjietech.com)
- * @brief
- * @version 1.0
- * @date 2019-06-27
- *
- * @copyright Copyright (c) - 2019 JunJie Intelligence(Shenzhen) Co.,LTD
- *
- */
-#ifndef RTMPUTILS_H_
-#define RTMPUTILS_H_
-
-#include <map>
-#include <string>
-#include <stdio.h>
-#include <stdint.h>
-
-#define FOR_EACH(type, i, where) \
- for (typename type::iterator i = (where).begin(); i != (where).end(); ++i)
-
-#define FOR_EACH_CONST(type, i, where) \
- for (typename type::const_iterator i = (where).begin(); \
- i != (where).end(); ++i)
-
-// #define debug(fmt...) fprintf(stderr, fmt)
-
-#define debug(fmt...)
-
-template<class Key, class Value>
-Value get(const std::map<Key, Value> &map, const Key &k,
- const Value &def = Value())
-{
- typename std::map<Key, Value>::const_iterator i = map.find(k);
- if (i == map.end())
- return def;
- return i->second;
-}
-
-uint32_t load_be32(const void *p);
-uint16_t load_be16(const void *p);
-uint32_t load_be24(const void *p);
-uint32_t load_le32(const void *p);
-void set_be24(void *p, uint32_t val);
-void set_le32(void *p, uint32_t val);
-
-const std::string strf(const char *fmt, ...);
-
-#endif
diff --git a/srtserver.c b/srtserver.c
new file mode 100644
index 0000000..97ce272
--- /dev/null
+++ b/srtserver.c
@@ -0,0 +1,127 @@
+#include "srtserver.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <stdarg.h>
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <sys/poll.h>
+#include <sys/time.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <arpa/inet.h>
+#include <time.h>
+
+#include <srt/srt.h>
+
+#include "ezlive_config.h"
+
+#define BUFFER_SIZE 1500
+
+int handshake_callback(void* opaq, SRTSOCKET ns, int hs_version, const struct sockaddr* peeraddr, const char* streamid) {
+ char addr_str[INET_ADDRSTRLEN];
+ struct sockaddr_in* sin = (struct sockaddr_in*)peeraddr;
+ inet_ntop(AF_INET, &(sin->sin_addr), addr_str, INET_ADDRSTRLEN);
+
+ printf("[Callback] Incoming connection from: %s:%d\n", addr_str, ntohs(sin->sin_port));
+ printf("[Callback] Client Stream ID: %s\n", streamid);
+
+ if (streamid == NULL || strlen(streamid) == 0) {
+ printf("[Callback] Rejected: No Stream ID provided.\n");
+ return -1;
+ }
+
+ if (strlen(ezlive_config->key) == 0) {
+ printf("[Callback] No key. Skip auth.\n");
+ return 0;
+ }
+ if (strcmp(streamid, ezlive_config->key) != 0) {
+ printf("[Callback] Rejected: Invalid Key. Expected '%s', got '%s'\n", ezlive_config->key, streamid);
+ return -1;
+ }
+
+ printf("[Callback] Key validated. Connection Accepted.\n");
+ return 0;
+}
+
+void start_srt_server(SrtCallbacks srtcb, void *ctx) {
+ if (srt_startup() != 0) {
+ fprintf(stderr, "SRT startup failed.\n");
+ return;
+ }
+
+ SRTSOCKET bind_sock = srt_create_socket();
+ if (bind_sock == SRT_INVALID_SOCK) {
+ fprintf(stderr, "Error creating socket: %s\n", srt_getlasterror_str());
+ return;
+ }
+
+ int yes = 1;
+ srt_setsockopt(bind_sock, 0, SRTO_RCVSYN, &yes, sizeof(yes));
+
+ struct sockaddr_in sa;
+ memset(&sa, 0, sizeof(sa));
+ sa.sin_family = AF_INET;
+ sa.sin_port = htons(ezlive_config->listening_port);
+ if (inet_pton(AF_INET, ezlive_config->listening_addr, &sa.sin_addr.s_addr) <= 0) {
+ fprintf(stderr, "Invalid IP address\n");
+ exit(-1);
+ }
+
+ if (srt_bind(bind_sock, (struct sockaddr*)&sa, sizeof(sa)) == SRT_ERROR) {
+ fprintf(stderr, "Bind error: %s\n", srt_getlasterror_str());
+ srt_close(bind_sock);
+ srt_cleanup();
+ return;
+ }
+ srt_listen_callback(bind_sock, handshake_callback, NULL);
+ printf("SRT Server listening on port %d...\n", ezlive_config->listening_port);
+ if (srt_listen(bind_sock, 5) == SRT_ERROR) {
+ fprintf(stderr, "Listen error: %s\n", srt_getlasterror_str());
+ return;
+ }
+
+ while (1) {
+ struct sockaddr_storage client_sa;
+ int sa_len = sizeof(client_sa);
+
+ printf("Waiting for client to connect...\n");
+
+ SRTSOCKET client_sock = srt_accept(bind_sock, (struct sockaddr*)&client_sa, &sa_len);
+ if (client_sock == SRT_INVALID_SOCK) {
+ fprintf(stderr, "Accept error: %s\n", srt_getlasterror_str());
+ continue;
+ }
+
+ printf("Client connected! Starting to receive data.\n");
+ srtcb.on_start(ctx);
+
+ char buffer[BUFFER_SIZE];
+
+ while (1) {
+ int n = srt_recvmsg(client_sock, buffer, sizeof(buffer));
+
+ if (n == SRT_ERROR) {
+ fprintf(stderr, "Connection lost or error: %s\n", srt_getlasterror_str());
+ srtcb.on_stop(ctx);
+ break;
+ }
+
+ if (n == 0) {
+ printf("Client closed connection.\n");
+ srtcb.on_stop(ctx);
+ break;
+ }
+ srtcb.on_data(ctx, buffer, n);
+ }
+ printf("Closing client socket.\n");
+ srt_close(client_sock);
+ }
+
+ srt_close(bind_sock);
+ srt_cleanup();
+ return;
+} \ No newline at end of file
diff --git a/rtmpserver.h b/srtserver.h
index c9383b4..b228884 100644
--- a/rtmpserver.h
+++ b/srtserver.h
@@ -15,7 +15,14 @@ typedef struct {
void (*on_audio)(void* ctx, int64_t timestamp, char *buf, size_t size);
} RtmpCallbacks;
+typedef struct {
+ void (*on_start)(void *ctx);
+ void (*on_stop)(void *ctx);
+ void (*on_data)(void* ctx, char *buf, size_t size);
+} SrtCallbacks;
+
void start_rtmpserver(RtmpCallbacks cbs, void *ctx);
+void start_srt_server(SrtCallbacks cbs, void *ctx);
#ifdef __cplusplus
}
diff --git a/transcode_talker.c b/transcode_talker.c
index 3e2f05c..11fa53a 100644
--- a/transcode_talker.c
+++ b/transcode_talker.c
@@ -177,8 +177,8 @@ void* TranscodeTalker_main (void *vself) {
while (wait_for_new_stream(self)) {
AVFormatContext *in_fmt_ctx = avformat_alloc_context();
in_fmt_ctx->pb = create_avio_from_ringbuffer(self->stream, 4096);
- const AVInputFormat *flv_fmt = av_find_input_format("flv");
- if (avformat_open_input(&in_fmt_ctx, NULL, flv_fmt, NULL) < 0) {
+ const AVInputFormat *input_fmt = av_find_input_format("mpegts");
+ if (avformat_open_input(&in_fmt_ctx, NULL, input_fmt, NULL) < 0) {
fprintf(stderr, "Could not open input file\n");
continue;
}