From 038c58b0ac053dbfe8aea3faab73863000d5fdc9 Mon Sep 17 00:00:00 2001 From: Mistivia Date: Sat, 20 Dec 2025 05:09:26 +0800 Subject: use srt instead of rtmp --- .gitignore | 2 + Makefile | 2 +- README.md | 8 +- amf.cpp | 429 -------------------------- amf.h | 167 ----------- main.c | 31 +- rtmp.h | 74 ----- rtmpserver.cpp | 862 ----------------------------------------------------- rtmpserver.h | 25 -- rtmputils.cpp | 67 ----- rtmputils.h | 49 --- srtserver.c | 127 ++++++++ srtserver.h | 32 ++ transcode_talker.c | 4 +- 14 files changed, 192 insertions(+), 1687 deletions(-) delete mode 100644 amf.cpp delete mode 100644 amf.h delete mode 100644 rtmp.h delete mode 100644 rtmpserver.cpp delete mode 100644 rtmpserver.h delete mode 100644 rtmputils.cpp delete mode 100644 rtmputils.h create mode 100644 srtserver.c create mode 100644 srtserver.h diff --git a/.gitignore b/.gitignore index 8dd1137..13d6daf 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,5 @@ config rootfs conf *.tar.gz +podman-start.sh +config.back diff --git a/Makefile b/Makefile index 95e41c2..c47dc4c 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ CC := gcc CXX := g++ CFLAGS := -g -Wall CXXFLAGS := -g -Wall -std=c++14 -LDFLAGS := -g \ +LDFLAGS := -g -lsrt \ -lavformat -lavutil -lavcodec \ -laws-cpp-sdk-core -laws-cpp-sdk-s3 diff --git a/README.md b/README.md index 5a0ac7a..36016aa 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ EZLive is a minimal self-hosted livestream solution built on top of S3-compatible object storage. -It runs a local RTMP server, receive live video, turns it into HLS segments (.m3u8 + .ts) and serves them as static files through any S3-compatible object storage. No dedicated streaming server is required — everything runs serverlessly. Then you can easily setup a HTML5 HLS player to watch the stream. +It runs a local SRT server, receive live video, turns it into HLS segments (.m3u8 + .ts) and serves them as static files through any S3-compatible object storage. No dedicated streaming server is required — everything runs serverlessly. Then you can easily setup a HTML5 HLS player to watch the stream. # Build @@ -24,7 +24,7 @@ Then create a config file `config`: ``` listening_addr=127.0.0.1 -listening_port=1935 +listening_port=61935 bucket=YOUR_BUCKET_NAME endpoint=https://your-s3.com s3_path=ezlive/ @@ -74,7 +74,7 @@ Start EZLive: ./ezlive ``` -Open OBS, streaming to `rtmp://127.0.0.1/live`, no streaming key needed. The streaming format must be H.264 + AAC. +Open OBS, streaming to `srt://127.0.0.1:61935`, with streaming key. The streaming format must be H.264 + AAC. Then use a HLS player to load `https://YOUR_BUCKET_NAME.your-s3.com/ezlive/stream.m3u8` to watch the stream. @@ -123,6 +123,4 @@ To start using, unzip the windows tarball, create a `config` file in the same di # Credits -The built-in RTMP server is modified from [pine](https://github.com/deboot/pine). - Thank [@uonr](https://github.com/uonr) for making nix flake. diff --git a/amf.cpp b/amf.cpp deleted file mode 100644 index 2794419..0000000 --- a/amf.cpp +++ /dev/null @@ -1,429 +0,0 @@ -#include "amf.h" -#include "rtmputils.h" - -#include -#include -#include - -namespace { - -uint8_t peek(const Decoder *dec) -{ - if (dec->pos >= dec->buf.size()) { - throw std::runtime_error("Not enough data"); - } - return uint8_t(dec->buf[dec->pos]); -} - -uint8_t get_byte(Decoder *dec) -{ - if (dec->version == 0 && peek(dec) == AMF0_SWITCH_AMF3) { - debug("entering AMF3 mode\n"); - dec->pos++; - dec->version = 3; - } - - if (dec->pos >= dec->buf.size()) { - throw std::runtime_error("Not enough data"); - } - return uint8_t(dec->buf[dec->pos++]); -} - -} - -AMFValue::AMFValue(AMFType type) : - m_type(type) -{ - switch (m_type) { - case AMF_OBJECT: - case AMF_ECMA_ARRAY: - m_value.object = new amf_object_t; - break; - case AMF_NUMBER: - case AMF_INTEGER: - case AMF_NULL: - case AMF_UNDEFINED: - break; - default: - assert(0); - } -} - -AMFValue::AMFValue(const std::string &s) : - m_type(AMF_STRING) -{ - m_value.string = new std::string(s); -} - -AMFValue::AMFValue(double n) : - m_type(AMF_NUMBER) -{ - m_value.number = n; -} - -AMFValue::AMFValue(int i) : - m_type(AMF_INTEGER) -{ - m_value.integer = i; -} - -AMFValue::AMFValue(bool b) : - m_type(AMF_BOOLEAN) -{ - m_value.boolean = b; -} - -AMFValue::AMFValue(const amf_object_t &object) : - m_type(AMF_OBJECT) -{ - m_value.object = new amf_object_t(object); -} - -AMFValue::AMFValue(const AMFValue &from) : - m_type(AMF_NULL) -{ - *this = from; -} - -AMFValue::~AMFValue() -{ - destroy(); -} - -void AMFValue::destroy() -{ - switch (m_type) { - case AMF_STRING: - delete m_value.string; - break; - case AMF_OBJECT: - case AMF_ECMA_ARRAY: - delete m_value.object; - break; - case AMF_NULL: - case AMF_NUMBER: - case AMF_INTEGER: - case AMF_BOOLEAN: - case AMF_UNDEFINED: - break; - } -} - -void AMFValue::operator = (const AMFValue &from) -{ - destroy(); - m_type = from.m_type; - switch (m_type) { - case AMF_STRING: - m_value.string = new std::string(*from.m_value.string); - break; - case AMF_OBJECT: - case AMF_ECMA_ARRAY: - m_value.object = new amf_object_t(*from.m_value.object); - break; - case AMF_NUMBER: - m_value.number = from.m_value.number; - break; - case AMF_INTEGER: - m_value.integer = from.m_value.integer; - break; - case AMF_BOOLEAN: - m_value.boolean = from.m_value.boolean; - break; - default: - break; - } -} - -void amf_write(Encoder *enc, const std::string &s) -{ - enc->buf += char(AMF0_STRING); - uint16_t str_len = htons(s.size()); - enc->buf.append((char *) &str_len, 2); - enc->buf += s; -} - -void amf_write(Encoder *enc, int i) -{ - throw std::runtime_error("AMF0 does not have integers"); -} - -void amf_write(Encoder *enc, double n) -{ - enc->buf += char(AMF0_NUMBER); - uint64_t encoded = 0; -//#if defined(__i386__) || defined(__x86_64__) - /* Flash uses same floating point format as x86 */ - memcpy(&encoded, &n, 8); -//#endif - uint32_t val = htonl(encoded >> 32); - enc->buf.append((char *) &val, 4); - val = htonl(encoded); - enc->buf.append((char *) &val, 4); -} - -void amf_write(Encoder *enc, bool b) -{ - enc->buf += char(AMF0_BOOLEAN); - enc->buf += char(b); -} - -void amf_write_key(Encoder *enc, const std::string &s) -{ - uint16_t str_len = htons(s.size()); - enc->buf.append((char *) &str_len, 2); - enc->buf += s; -} - -void amf_write(Encoder *enc, const amf_object_t &object) -{ - enc->buf += char(AMF0_OBJECT); - for (amf_object_t::const_iterator i = object.begin(); - i != object.end(); ++i) { - amf_write_key(enc, i->first); - amf_write(enc, i->second); - } - amf_write_key(enc, ""); - enc->buf += char(AMF0_OBJECT_END); -} - -void amf_write_ecma(Encoder *enc, const amf_object_t &object) -{ - enc->buf += char(AMF0_ECMA_ARRAY); - uint32_t zero = 0; - enc->buf.append((char *) &zero, 4); - for (amf_object_t::const_iterator i = object.begin(); - i != object.end(); ++i) { - amf_write_key(enc, i->first); - amf_write(enc, i->second); - } - amf_write_key(enc, ""); - enc->buf += char(AMF0_OBJECT_END); -} - -void amf_write_null(Encoder *enc) -{ - enc->buf += char(AMF0_NULL); -} - -void amf_write(Encoder *enc, const AMFValue &value) -{ - switch (value.type()) { - case AMF_STRING: - amf_write(enc, value.as_string()); - break; - case AMF_NUMBER: - amf_write(enc, value.as_number()); - break; - case AMF_INTEGER: - amf_write(enc, value.as_integer()); - break; - case AMF_BOOLEAN: - amf_write(enc, value.as_boolean()); - break; - case AMF_OBJECT: - amf_write(enc, value.as_object()); - break; - case AMF_ECMA_ARRAY: - amf_write_ecma(enc, value.as_object()); - break; - case AMF_NULL: - amf_write_null(enc); - break; - default: - break; - } -} - -unsigned int load_amf3_integer(Decoder *dec) -{ - unsigned int value = 0; - for (int i = 0; i < 4; ++i) { - uint8_t b = get_byte(dec); - if (i == 3) { - /* use all bits from 4th byte */ - value = (value << 8) | b; - break; - } - value = (value << 7) | (b & 0x7f); - if ((b & 0x80) == 0) - break; - } - return value; -} - -std::string amf_load_string(Decoder *dec) -{ - size_t str_len = 0; - uint8_t type = get_byte(dec); - if (dec->version == 3) { - if (type != AMF3_STRING) { - throw std::runtime_error("Expected a string"); - } - str_len = load_amf3_integer(dec) / 2; - - } else { - if (type != AMF0_STRING) { - throw std::runtime_error("Expected a string"); - } - if (dec->pos + 2 > dec->buf.size()) { - throw std::runtime_error("Not enough data"); - } - str_len = load_be16(&dec->buf[dec->pos]); - dec->pos += 2; - } - if (dec->pos + str_len > dec->buf.size()) { - throw std::runtime_error("Not enough data"); - } - std::string s(dec->buf, dec->pos, str_len); - dec->pos += str_len; - return s; -} - -double amf_load_number(Decoder *dec) -{ - if (get_byte(dec) != AMF0_NUMBER) { - throw std::runtime_error("Expected a string"); - } - if (dec->pos + 8 > dec->buf.size()) { - throw std::runtime_error("Not enough data"); - } - uint64_t val = ((uint64_t) load_be32(&dec->buf[dec->pos]) << 32) | - load_be32(&dec->buf[dec->pos + 4]); - double n = 0; -//#if defined(__i386__) || defined(__x86_64__) - /* Flash uses same floating point format as x86 */ - memcpy(&n, &val, 8); -//#endif - dec->pos += 8; - return n; -} - -int amf_load_integer(Decoder *dec) -{ - if (dec->version == 3) { - return load_amf3_integer(dec); - } else { - return amf_load_number(dec); - } -} - -bool amf_load_boolean(Decoder *dec) -{ - if (get_byte(dec) != AMF0_BOOLEAN) { - throw std::runtime_error("Expected a boolean"); - } - return get_byte(dec) != 0; -} - -std::string amf_load_key(Decoder *dec) -{ - if (dec->pos + 2 > dec->buf.size()) { - throw std::runtime_error("Not enough data"); - } - size_t str_len = load_be16(&dec->buf[dec->pos]); - dec->pos += 2; - if (dec->pos + str_len > dec->buf.size()) { - throw std::runtime_error("Not enough data"); - } - std::string s(dec->buf, dec->pos, str_len); - dec->pos += str_len; - return s; -} - -amf_object_t amf_load_object(Decoder *dec) -{ - amf_object_t object; - if (get_byte(dec) != AMF0_OBJECT) { - throw std::runtime_error("Expected an object"); - } - while (1) { - std::string key = amf_load_key(dec); - if (key.empty()) - break; - AMFValue value = amf_load(dec); - object.insert(std::make_pair(key, value)); - } - if (get_byte(dec) != AMF0_OBJECT_END) { - throw std::runtime_error("expected object end"); - } - return object; -} - -amf_object_t amf_load_ecma(Decoder *dec) -{ - /* ECMA array is the same as object, with 4 extra zero bytes */ - amf_object_t object; - if (get_byte(dec) != AMF0_ECMA_ARRAY) { - throw std::runtime_error("Expected an ECMA array"); - } - if (dec->pos + 4 > dec->buf.size()) { - throw std::runtime_error("Not enough data"); - } - dec->pos += 4; - while (1) { - std::string key = amf_load_key(dec); - if (key.empty()) - break; - AMFValue value = amf_load(dec); - object.insert(std::make_pair(key, value)); - } - if (get_byte(dec) != AMF0_OBJECT_END) { - throw std::runtime_error("expected object end"); - } - return object; -} - -AMFValue amf_load(Decoder *dec) -{ - uint8_t type = peek(dec); - if (dec->version == 3) { - switch (type) { - case AMF3_STRING: - return AMFValue(amf_load_string(dec)); - case AMF3_NUMBER: - return AMFValue(amf_load_number(dec)); - case AMF3_INTEGER: - return AMFValue(amf_load_integer(dec)); - case AMF3_FALSE: - dec->pos++; - return AMFValue(false); - case AMF3_TRUE: - dec->pos++; - return AMFValue(true); - case AMF3_OBJECT: - return AMFValue(amf_load_object(dec)); - case AMF3_ARRAY: - return AMFValue(amf_load_ecma(dec)); - case AMF3_NULL: - dec->pos++; - return AMFValue(AMF_NULL); - case AMF3_UNDEFINED: - dec->pos++; - return AMFValue(AMF_UNDEFINED); - default: - throw std::runtime_error(strf("Unsupported AMF3 type: %02x", type)); - } - } else { - switch (type) { - case AMF0_STRING: - return AMFValue(amf_load_string(dec)); - case AMF0_NUMBER: - return AMFValue(amf_load_number(dec)); - case AMF0_BOOLEAN: - return AMFValue(amf_load_boolean(dec)); - case AMF0_OBJECT: - return AMFValue(amf_load_object(dec)); - case AMF0_ECMA_ARRAY: - return AMFValue(amf_load_ecma(dec)); - case AMF0_NULL: - dec->pos++; - return AMFValue(AMF_NULL); - case AMF0_UNDEFINED: - dec->pos++; - return AMFValue(AMF_UNDEFINED); - default: - throw std::runtime_error(strf("Unsupported AMF0 type: %02x", type)); - } - } -} diff --git a/amf.h b/amf.h deleted file mode 100644 index 6d5738e..0000000 --- a/amf.h +++ /dev/null @@ -1,167 +0,0 @@ -/** - * @file amf.h - * @author Dean Zou (zoudingyuan@junjietech.com) - * @brief - * @version 1.0 - * @date 2019-06-27 - * - * @copyright Copyright (c) - 2019 JunJie Intelligence(Shenzhen) Co.,LTD - * - */ - -#ifndef AMF_H_ -#define AMF_H_ - -#include -#include -#include - -enum AMFType { - AMF_NUMBER, - AMF_INTEGER, - AMF_BOOLEAN, - AMF_STRING, - AMF_OBJECT, - AMF_NULL, - AMF_UNDEFINED, - AMF_ECMA_ARRAY, -}; - -enum { - AMF0_NUMBER, - AMF0_BOOLEAN, - AMF0_STRING, - AMF0_OBJECT, - AMF0_MOVIECLIP, - AMF0_NULL, - AMF0_UNDEFINED, - AMF0_REFERENCE, - AMF0_ECMA_ARRAY, - AMF0_OBJECT_END, - AMF0_STRICT_ARRAY, - AMF0_DATE, - AMF0_LONG_STRING, - AMF0_UNSUPPORTED, - AMF0_RECORD_SET, - AMF0_XML_OBJECT, - AMF0_TYPED_OBJECT, - AMF0_SWITCH_AMF3, -}; - -enum { - AMF3_UNDEFINED, - AMF3_NULL, - AMF3_FALSE, - AMF3_TRUE, - AMF3_INTEGER, - AMF3_NUMBER, - AMF3_STRING, - AMF3_LEGACY_XML, - AMF3_DATE, - AMF3_ARRAY, - AMF3_OBJECT, - AMF3_XML, - AMF3_BYTE_ARRAY, -}; - -struct Decoder { - std::string buf; - size_t pos; - int version; -}; - -struct Encoder { - std::string buf; -}; - -class AMFValue; - -typedef std::map amf_object_t; - -class AMFValue { -public: - AMFValue(AMFType type = AMF_NULL); - AMFValue(const std::string &s); - AMFValue(double n); - AMFValue(int i); - AMFValue(bool b); - AMFValue(const amf_object_t &object); - AMFValue(const AMFValue &from); - ~AMFValue(); - - AMFType type() const { return m_type; } - - std::string as_string() const - { - assert(m_type == AMF_STRING); - return *m_value.string; - } - double as_number() const - { - assert(m_type == AMF_NUMBER); - return m_value.number; - } - double as_integer() const - { - assert(m_type == AMF_INTEGER); - return m_value.integer; - } - bool as_boolean() const - { - assert(m_type == AMF_BOOLEAN); - return m_value.boolean; - } - amf_object_t as_object() const - { - assert(m_type == AMF_OBJECT); - return *m_value.object; - } - - AMFValue get(const std::string &s) const - { - assert(m_type == AMF_OBJECT); - amf_object_t::const_iterator i = m_value.object->find(s); - if (i == m_value.object->end()) - return AMFValue(); - return i->second; - } - - void set(const std::string &s, const AMFValue &val) - { - assert(m_type == AMF_OBJECT); - m_value.object->insert(std::make_pair(s, val)); - } - - void operator = (const AMFValue &from); - -private: - AMFType m_type; - union { - std::string *string; - double number; - int integer; - bool boolean; - amf_object_t *object; - } m_value; - - void destroy(); -}; - -void amf_write(Encoder *enc, const std::string &s); -void amf_write(Encoder *enc, double n); -void amf_write(Encoder *enc, bool b); -void amf_write_key(Encoder *enc, const std::string &s); -void amf_write(Encoder *enc, const amf_object_t &object); -void amf_write_ecma(Encoder *enc, const amf_object_t &object); -void amf_write_null(Encoder *enc); -void amf_write(Encoder *enc, const AMFValue &value); - -std::string amf_load_string(Decoder *dec); -double amf_load_number(Decoder *dec); -bool amf_load_boolean(Decoder *dec); -std::string amf_load_key(Decoder *dec); -amf_object_t amf_load_object(Decoder *dec); -amf_object_t amf_load_ecma(Decoder *dec); -AMFValue amf_load(Decoder *dec); - -#endif diff --git a/main.c b/main.c index 2abe3d0..47c33d9 100644 --- a/main.c +++ b/main.c @@ -58,6 +58,25 @@ void on_rtmp_audio(void *ctx, int64_t timestamp, char *buf, size_t size) { RingBuffer_write_word32be(rb, size + 11); } +void on_srt_start(void *ctx) { + MainCtx *main_ctx = ctx; + main_ctx->ringbuf = malloc(sizeof(RingBuffer)); + RingBuffer_init(main_ctx->ringbuf, 4096); + RingBuffer *rb = main_ctx->ringbuf; + TranscodeTalker_new_stream(&main_ctx->transcode_talker, rb); +} + +void on_srt_stop(void *ctx) { + MainCtx *main_ctx = ctx; + RingBuffer_end(main_ctx->ringbuf); +} + +void on_srt_data(void *ctx, char *buf, size_t size) { + MainCtx *main_ctx = ctx; + RingBuffer *rb = main_ctx->ringbuf; + RingBuffer_write(rb, (const uint8_t*)buf, size); +} + int main(int argc, char **argv) { ezlive_config = malloc(sizeof(EZLiveConfig)); EZLiveConfig_init(ezlive_config); @@ -76,11 +95,10 @@ int main(int argc, char **argv) { } srand((unsigned) time(NULL)); MainCtx main_ctx; - RtmpCallbacks rtmp_cbs = { - .on_audio = &on_rtmp_audio, - .on_video = &on_rtmp_video, - .on_start = &on_rtmp_start, - .on_stop = &on_rtmp_stop, + SrtCallbacks srt_cbs = { + .on_data = &on_srt_data, + .on_stop = &on_srt_stop, + .on_start = &on_srt_start, }; TranscodeTalker_init(&main_ctx.transcode_talker); @@ -92,6 +110,7 @@ int main(int argc, char **argv) { pthread_t s3worker_thread; pthread_create(&s3worker_thread, NULL, &s3_worker_main, NULL); - start_rtmpserver(rtmp_cbs, &main_ctx); + // start_rtmpserver(rtmp_cbs, &main_ctx); + start_srt_server(srt_cbs, &main_ctx); return 0; } \ No newline at end of file diff --git a/rtmp.h b/rtmp.h deleted file mode 100644 index bdc97cc..0000000 --- a/rtmp.h +++ /dev/null @@ -1,74 +0,0 @@ -/** - * @file rtmp.h - * @author Dean Zou (zoudingyuan@junjietech.com) - * @brief - * @version 1.0 - * @date 2019-06-27 - * - * @copyright Copyright (c) - 2019 JunJie Intelligence(Shenzhen) Co.,LTD - * - */ - -#ifndef RTMP_H_ -#define RTMP_H_ - -#include - -#define PORT 1935 - -#define DEFAULT_CHUNK_LEN 128 - -#define PACKED __attribute__((packed)) - -#define HANDSHAKE_PLAINTEXT 0x03 - -#define RANDOM_LEN (1536 - 8) - -#define MSG_SET_CHUNK 0x01 -#define MSG_BYTES_READ 0x03 -#define MSG_USER_CONTROL 0x04 -#define MSG_RESPONSE 0x05 -#define MSG_REQUEST 0x06 -#define MSG_AUDIO 0x08 -#define MSG_VIDEO 0x09 -#define MSG_INVOKE3 0x11 /* AMF3 */ -#define MSG_NOTIFY 0x12 -#define MSG_OBJECT 0x13 -#define MSG_INVOKE 0x14 /* AMF0 */ -#define MSG_FLASH_VIDEO 0x16 - -#define CONTROL_CLEAR_STREAM 0x00 -#define CONTROL_CLEAR_BUFFER 0x01 -#define CONTROL_STREAM_DRY 0x02 -#define CONTROL_BUFFER_TIME 0x03 -#define CONTROL_RESET_STREAM 0x04 -#define CONTROL_PING 0x06 -#define CONTROL_REQUEST_VERIFY 0x1a -#define CONTROL_RESPOND_VERIFY 0x1b -#define CONTROL_BUFFER_EMPTY 0x1f -#define CONTROL_BUFFER_READY 0x20 - -#define CONTROL_ID 0 -#define STREAM_ID 1337 - -#define CHAN_CONTROL 2 -#define CHAN_RESULT 3 -#define CHAN_STREAM 4 - -#define FLV_KEY_FRAME 0x01 -#define FLV_INTER_FRAME 0x02 - -struct Handshake { - uint8_t flags[8]; - uint8_t random[RANDOM_LEN]; -} PACKED; - -struct RTMP_Header { - uint8_t flags; - char timestamp[3]; - char msg_len[3]; - uint8_t msg_type; - char endpoint[4]; /* Note, this is little-endian while others are BE */ -} PACKED; - -#endif diff --git a/rtmpserver.cpp b/rtmpserver.cpp deleted file mode 100644 index cb0766e..0000000 --- a/rtmpserver.cpp +++ /dev/null @@ -1,862 +0,0 @@ -#include "amf.h" -#include "rtmputils.h" -#include "rtmp.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "rtmpserver.h" - -extern "C" { -#include "ezlive_config.h" -} - -#define APP_NAME "live" - -RtmpCallbacks g_rtmp_server_cbs; -void *g_rtmp_server_ctx; - -struct RTMP_Message { - uint8_t type; - size_t len; - unsigned long timestamp; - uint32_t endpoint; - std::string buf; -}; - -struct Client { - int fd; - bool publisher; - bool playing; /* Wants to receive the stream? */ - bool ready; /* Wants to receive and seen a keyframe */ - RTMP_Message messages[64]; - std::string buf; - std::string send_queue; - std::string path; - size_t chunk_len; - uint32_t written_seq; - uint32_t read_seq; - int64_t livets; -}; - -namespace { - -amf_object_t metadata; -int listen_fd; -std::vector poll_table; -std::vector clients; - -int set_nonblock(int fd, bool enabled) -{ - int flags = fcntl(fd, F_GETFL) & ~O_NONBLOCK; - if (enabled) { - flags |= O_NONBLOCK; - } - return fcntl(fd, F_SETFL, flags); -} - -size_t recv_all(int fd, void *buf, size_t len) -{ - size_t pos = 0; - while (pos < len) { - ssize_t bytes = recv(fd, (char *) buf + pos, len - pos, 0); - if (bytes < 0) { - if (errno == EAGAIN || errno == EINTR) - continue; - throw std::runtime_error(strf("unable to recv: %s", strerror(errno))); - } - if (bytes == 0) - break; - pos += bytes; - } - return pos; -} - -size_t send_all(int fd, const void *buf, size_t len) -{ - size_t pos = 0; - while (pos < len) { - ssize_t written = send(fd, (const char *) buf + pos, len - pos, 0); - if (written < 0) { - if (errno == EAGAIN || errno == EINTR) - continue; - throw std::runtime_error(strf("unable to send: %s", strerror(errno))); - } - if (written == 0) - break; - pos += written; - } - return pos; -} - -bool is_safe(uint8_t b) -{ - return b >= ' ' && b < 128; -} - -void hexdump(const void *buf, size_t len) -{ - const uint8_t *data = (const uint8_t *) buf; - for (size_t i = 0; i < len; i += 16) { - for (int j = 0; j < 16; ++j) { - if (i + j < len) - debug("%.2x ", data[i + j]); - else - debug(" "); - } - for (int j = 0; j < 16; ++j) { - if (i + j < len) { - putc(is_safe(data[i + j]) ? data[i + j] : '.', - stdout); - } else { - putc(' ', stdout); - } - } - putc('\n', stdout); - } -} - -void try_to_send(void *data) -{ - Client *client = (Client *)data; - size_t len = client->send_queue.size(); - if (len > 4096) - len = 4096; - - ssize_t written = send(client->fd, client->send_queue.data(), len, 0); - if (written < 0) { - if (errno == EAGAIN || errno == EINTR) - return; - throw std::runtime_error(strf("unable to write to a client: %s", - strerror(errno))); - } - - client->send_queue.erase(0, written); -} - -void rtmp_send(Client *client, uint8_t type, uint32_t endpoint, - const std::string &buf, unsigned long timestamp = 0, - int channel_num = CHAN_CONTROL) -{ - if (endpoint == STREAM_ID) { - /* - * For some unknown reason, stream-related msgs must be sent - * on a specific channel. - */ - channel_num = CHAN_STREAM; - } - - RTMP_Header header; - header.flags = (channel_num & 0x3f) | (0 << 6); - header.msg_type = type; - set_be24(header.timestamp, timestamp); - set_be24(header.msg_len, buf.size()); - set_le32(header.endpoint, endpoint); - - client->send_queue.append((char *) &header, sizeof header); - client->written_seq += sizeof header; - - size_t pos = 0; - while (pos < buf.size()) { - if (pos) { - uint8_t flags = (channel_num & 0x3f) | (3 << 6); - client->send_queue += char(flags); - - client->written_seq += 1; - } - - size_t chunk = buf.size() - pos; - if (chunk > client->chunk_len) - chunk = client->chunk_len; - client->send_queue.append(buf, pos, chunk); - - client->written_seq += chunk; - pos += chunk; - } - - try_to_send(client); -} - -void send_reply(Client *client, double txid, const AMFValue &reply = AMFValue(), - const AMFValue &status = AMFValue()) -{ - if (txid <= 0.0) - return; - Encoder invoke; - amf_write(&invoke, std::string("_result")); - amf_write(&invoke, txid); - amf_write(&invoke, reply); - amf_write(&invoke, status); - rtmp_send(client, MSG_INVOKE, CONTROL_ID, invoke.buf, 0, CHAN_RESULT); -} - -void handle_connect(Client *client, double txid, Decoder *dec) -{ - amf_object_t params = amf_load_object(dec); - std::string app = get(params, std::string("app")).as_string(); - std::string ver = "(unknown)"; - - AMFValue flashver = get(params, std::string("flashVer")); - if (flashver.type() == AMF_STRING) { - ver = flashver.as_string(); - } - - /* - if (app != APP_NAME) { - throw std::runtime_error("Unsupported application: " + app); - } - */ - - printf("connect: %s (version %s)\n", app.c_str(), ver.c_str()); - - amf_object_t version; - version.insert(std::make_pair("fmsVer", std::string("FMS/4,5,1,484"))); - version.insert(std::make_pair("capabilities", 255.0)); - version.insert(std::make_pair("mode", 1.0)); - - amf_object_t status; - status.insert(std::make_pair("code", std::string("NetConnection.Connect.Success"))); - /* report support for AMF3 */ - // status.insert(std::make_pair("objectEncoding", 3.0)); - - send_reply(client, txid, version, status); - - uint32_t chunk_len = htonl(1024); - std::string set_chunk((char *) &chunk_len, 4); - rtmp_send(client, MSG_SET_CHUNK, CONTROL_ID, set_chunk); -/* - client->chunk_len = 1024; -*/ -} - -int publishernum = 0; - -void handle_fcpublish(Client *client, double txid, Decoder *dec) -{ - if (publishernum > 0) { - throw std::runtime_error{"only one publisher."}; - } - publishernum = 1; - g_rtmp_server_cbs.on_start(g_rtmp_server_ctx); - client->publisher = true; - printf( "publisher connected."); - - amf_load(dec); /* NULL */ - - std::string path = amf_load_string(dec); - debug("fcpublish %s\n", path.c_str()); - printf("fcpublish %s\n", path.c_str()); - if (strlen(ezlive_config->key) > 0) { - if (strcmp(ezlive_config->key, path.c_str()) != 0) { - publishernum = 0; - printf( "wrong live key.\n"); - throw std::runtime_error{"wrong live key."}; - } - } - - client->path = path; - - amf_object_t status; - status.insert(std::make_pair("code", std::string("NetStream.Publish.Start"))); - status.insert(std::make_pair("description", path)); - - Encoder invoke; - amf_write(&invoke, std::string("onFCPublish")); - amf_write(&invoke, 0.0); - amf_write_null(&invoke); - amf_write(&invoke, status); - rtmp_send(client, MSG_INVOKE, CONTROL_ID, invoke.buf); - - send_reply(client, txid); -} - -void handle_createstream(Client *client, double txid, Decoder *dec) -{ - send_reply(client, txid, AMFValue(), double(STREAM_ID)); -} - -void handle_publish(Client *client, double txid, Decoder *dec) -{ - amf_load(dec); /* NULL */ - - std::string path = amf_load_string(dec); - debug("publish %s\n", path.c_str()); - - client->path = path; - - amf_object_t status; - status.insert(std::make_pair("level", std::string("status"))); - status.insert(std::make_pair("code", std::string("NetStream.Publish.Start"))); - status.insert(std::make_pair("description", std::string("Stream is now published."))); - status.insert(std::make_pair("details", path)); - - Encoder invoke; - amf_write(&invoke, std::string("onStatus")); - amf_write(&invoke, 0.0); - amf_write_null(&invoke); - amf_write(&invoke, status); - rtmp_send(client, MSG_INVOKE, STREAM_ID, invoke.buf); - - send_reply(client, txid); -} - -void start_playback(Client *client) -{ - throw std::runtime_error{"playback not supported."}; -} - -void handle_play(Client *client, double txid, Decoder *dec) -{ - amf_load(dec); /* NULL */ - - std::string path = amf_load_string(dec); - - debug("play %s\n", path.c_str()); - client->path = path; - - start_playback(client); - - send_reply(client, txid); -} - -void handle_play2(Client *client, double txid, Decoder *dec) -{ - amf_load(dec); /* NULL */ - - amf_object_t params = amf_load_object(dec); - std::string path = get(params, std::string("streamName")).as_string(); - - debug("play %s\n", path.c_str()); - client->path = path; - start_playback(client); - - send_reply(client, txid); -} - -void handle_pause(Client *client, double txid, Decoder *dec) -{ - amf_load(dec); /* NULL */ - - bool paused = amf_load_boolean(dec); - - if (paused) { - debug("pausing\n"); - - amf_object_t status; - status.insert(std::make_pair("level", std::string("status"))); - status.insert(std::make_pair("code", std::string("NetStream.Pause.Notify"))); - status.insert(std::make_pair("description", std::string("Pausing."))); - - Encoder invoke; - amf_write(&invoke, std::string("onStatus")); - amf_write(&invoke, 0.0); - amf_write_null(&invoke); - amf_write(&invoke, status); - rtmp_send(client, MSG_INVOKE, STREAM_ID, invoke.buf); - client->playing = false; - } else { - start_playback(client); - } - - send_reply(client, txid); -} - -void handle_setdataframe(Client *client, Decoder *dec) -{ - if(client->publisher == false) { - printf("not a publisher"); - throw std::runtime_error("not a publisher"); - } - - std::string type = amf_load_string(dec); - if (type != "onMetaData") { - throw std::runtime_error("can only set metadata"); - } - - metadata = amf_load_ecma(dec); - - Encoder notify; - amf_write(¬ify, std::string("onMetaData")); - amf_write_ecma(¬ify, metadata); - - FOR_EACH(std::vector, i, clients) { - Client *receiver = *i; - if (receiver != NULL && receiver->playing && receiver->path == client->path) { - rtmp_send(client, MSG_NOTIFY, STREAM_ID, notify.buf); - } - } -} - -void handle_invoke(Client *client, const RTMP_Message *msg, Decoder *dec) -{ - std::string method = amf_load_string(dec); - double txid = amf_load_number(dec); - - debug( "invoked %s txid %f\n", method.c_str(), txid); - - if (msg->endpoint == CONTROL_ID) { - if (method == "connect") { - handle_connect(client, txid, dec); - } else if (method == "FCPublish") { - handle_fcpublish(client, txid, dec); - } else if (method == "createStream") { - handle_createstream(client, txid, dec); - } - - } else if (msg->endpoint == STREAM_ID) { - if (method == "publish") { - handle_publish(client, txid, dec); - } else if (method == "play") { - handle_play(client, txid, dec); - } else if (method == "play2") { - handle_play2(client, txid, dec); - } else if (method == "pause") { - handle_pause(client, txid, dec); - } - } -} - -void handle_message(Client *client, RTMP_Message *msg) -{ - - debug("RTMP message %02x, len %zu, timestamp %ld\n", msg->type, msg->len, - msg->timestamp); - - size_t pos = 0; - - switch (msg->type) { - case MSG_BYTES_READ: - if (pos + 4 > msg->buf.size()) { - throw std::runtime_error("Not enough data"); - } - client->read_seq = load_be32(&msg->buf[pos]); - debug("%d in queue\n", - int(client->written_seq - client->read_seq)); - break; - - case MSG_SET_CHUNK: - if (pos + 4 > msg->buf.size()) { - throw std::runtime_error("Not enough data"); - } - client->chunk_len = load_be32(&msg->buf[pos]); - debug("chunk size set to %zu\n", client->chunk_len); - break; - - case MSG_INVOKE: { - debug("handling message invoke 0 \n"); - Decoder dec; - dec.version = 0; - dec.buf = msg->buf; - dec.pos = 0; - handle_invoke(client, msg, &dec); - } - break; - - case MSG_INVOKE3: { - debug("handling message invoke 3 \n"); - Decoder dec; - dec.version = 0; - dec.buf = msg->buf; - dec.pos = 1; - handle_invoke(client, msg, &dec); - } - break; - - case MSG_NOTIFY: { - Decoder dec; - dec.version = 0; - dec.buf = msg->buf; - dec.pos = 0; - std::string type = amf_load_string(&dec); - debug("notify %s\n", type.c_str()); - if (msg->endpoint == STREAM_ID) { - if (type == "@setDataFrame") { - handle_setdataframe(client, &dec); - } - } - } - break; - - case MSG_AUDIO: - if(client->publisher == false) { - throw std::runtime_error("not a publisher"); - } - g_rtmp_server_cbs.on_audio(g_rtmp_server_ctx, msg->timestamp, (char*)msg->buf.c_str(), msg->buf.size()); - // FOR_EACH(std::vector, i, clients) { - // Client *receiver = *i; - // if (receiver != NULL && receiver->ready) { - // rtmp_send(receiver, MSG_AUDIO, STREAM_ID, - // msg->buf, msg->timestamp); - // } - // } - break; - case MSG_VIDEO: { - if(client->publisher == false){ - throw std::runtime_error("not a publisher"); - } - // uint8_t flags = msg->buf[0]; - g_rtmp_server_cbs.on_video(g_rtmp_server_ctx, msg->timestamp, (char*)msg->buf.c_str(), msg->buf.size()); - // FOR_EACH(std::vector, i, clients) { - // Client *receiver = *i; - // if (receiver != NULL && receiver->playing && - // client->path == receiver->path) { - // if (flags >> 4 == FLV_KEY_FRAME && - // !receiver->ready) { - // std::string control; - // uint16_t type = htons(CONTROL_CLEAR_STREAM); - // control.append((char *) &type, 2); - // uint32_t stream = htonl(STREAM_ID); - // control.append((char *) &stream, 4); - // rtmp_send(receiver, MSG_USER_CONTROL, CONTROL_ID, control); - // receiver->ready = true; - // } - // if (receiver->ready) { - // static int flag = 0; - // if(flag < 10) { - // flag ++; - // } - // rtmp_send(receiver, MSG_VIDEO, - // STREAM_ID, msg->buf, - // msg->timestamp); - // } - // } - // } - } - break; - - case MSG_FLASH_VIDEO: - //printf( "streaming FLV not supported"); - throw std::runtime_error("streaming FLV not supported"); - break; - - default: - debug("unhandled message: %02x\n", msg->type); - hexdump(msg->buf.data(), msg->buf.size()); - break; - } -} - -/* TODO: Make this asynchronous */ -void do_handshake(Client *client) -{ - Handshake serversig; - Handshake clientsig; - - uint8_t c; - if (recv_all(client->fd, &c, 1) < 1) - return; - if (c != HANDSHAKE_PLAINTEXT) { - //printf( "only plaintext handshake supported"); - throw std::runtime_error("only plaintext handshake supported"); - } - - if (send_all(client->fd, &c, 1) < 1) - return; - - memset(&serversig, 0, sizeof serversig); - serversig.flags[0] = 0x03; - for (int i = 0; i < RANDOM_LEN; ++i) { - serversig.random[i] = rand(); - } - - if (send_all(client->fd, &serversig, sizeof serversig) < sizeof serversig) - return; - - /* Echo client's signature back */ - if (recv_all(client->fd, &clientsig, sizeof serversig) < sizeof serversig) - return; - if (send_all(client->fd, &clientsig, sizeof serversig) < sizeof serversig) - return; - - if (recv_all(client->fd, &clientsig, sizeof serversig) < sizeof serversig) - return; - if (memcmp(serversig.random, clientsig.random, RANDOM_LEN) != 0) { - //printf( "invalid handshake"); - throw std::runtime_error("invalid handshake"); - } - - client->read_seq = 1 + sizeof serversig * 2; - client->written_seq = 1 + sizeof serversig * 2; -} - -void recv_from_client(void *data) -{ - Client *client = (Client *)data; - - std::string chunk(4096, 0); - ssize_t got = recv(client->fd, &chunk[0], chunk.size(), 0); - if (got == 0) { - //printf( "end of life from a client"); - throw std::runtime_error("EOF from a client"); - } else if (got < 0) { - if (errno == EAGAIN || errno == EINTR) - return; - //printf( "unable to read from a client: %s",strerror(errno)); - throw std::runtime_error(strf("unable to read from a client: %s",strerror(errno))); - } - client->buf.append(chunk, 0, got); - - //printf( "got data and length is %d",client->buf.size()); - - while (!client->buf.empty()) { - uint8_t flags = client->buf[0]; - - static const size_t HEADER_LENGTH[] = {12, 8, 4, 1}; - size_t header_len = HEADER_LENGTH[flags >> 6]; - - if (client->buf.size() < header_len) { - /* need more data */ - break; - } - - RTMP_Header header; - memcpy(&header, client->buf.data(), header_len); - - RTMP_Message *msg = &client->messages[flags & 0x3f]; - - if (header_len >= 8) { - msg->len = load_be24(header.msg_len); - if (msg->len < msg->buf.size()) { - throw std::runtime_error("invalid msg length"); - } - msg->type = header.msg_type; - } - if (header_len >= 12) { - msg->endpoint = load_le32(header.endpoint); - } - - if (msg->len == 0) { - throw std::runtime_error("message without a header"); - } - size_t chunk = msg->len - msg->buf.size(); - if (chunk > client->chunk_len) - chunk = client->chunk_len; - - if (client->buf.size() < header_len + chunk) { - /* need more data */ - break; - } - - if (header_len >= 4) { - unsigned long ts = load_be24(header.timestamp); - if (ts == 0xffffff) { - throw std::runtime_error("ext timestamp not supported"); - } - if (header_len < 12) { - ts += msg->timestamp; - } - msg->timestamp = ts; - } - - msg->buf.append(client->buf, header_len, chunk); - client->buf.erase(0, header_len + chunk); - - if (msg->buf.size() == msg->len) { - handle_message(client, msg); - msg->buf.clear(); - } - } -} - -Client *new_client() -{ - sockaddr_in sin; - socklen_t addrlen = sizeof sin; - int fd = accept(listen_fd, (sockaddr *) &sin, &addrlen); - if (fd < 0) { - printf("Unable to accept a client: %s\n", strerror(errno)); - return NULL; - } - - Client *client = new Client; - client->publisher = false; - client->playing = false; - client->ready = false; - client->fd = fd; - client->written_seq = 0; - client->read_seq = 0; - client->chunk_len = DEFAULT_CHUNK_LEN; - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - client->livets = ts.tv_sec; - for (int i = 0; i < 64; ++i) { - client->messages[i].timestamp = 0; - client->messages[i].len = 0; - } - - try { - do_handshake(client); - } catch (const std::runtime_error &e) { - printf("handshake failed: %s\n", e.what()); - close(fd); - delete client; - return NULL; - } - - set_nonblock(fd, true); - - pollfd entry; - entry.events = POLLIN; - entry.revents = 0; - entry.fd = fd; - poll_table.push_back(entry); - clients.push_back(client); - - return client; -} - -void close_client(Client *client, size_t i) -{ - clients.erase(clients.begin() + i); - poll_table.erase(poll_table.begin() + i); - close(client->fd); - - if (client->publisher == true) { - printf("publisher disconnected.\n"); - client->publisher = false; - publishernum = 0; - g_rtmp_server_cbs.on_stop(g_rtmp_server_ctx); - FOR_EACH(std::vector, i, clients) { - Client *client = *i; - if (client != NULL) { - client->ready = false; - } - } - } - delete client; -} - -void do_poll(void) -{ - for (size_t i = 0; i < poll_table.size(); ++i) { - Client *client = clients[i]; - if (client != NULL) { - if (!client->send_queue.empty()) { - //debug("waiting for pollout\n"); - poll_table[i].events = POLLIN | POLLOUT; - } else { - poll_table[i].events = POLLIN; - } - } - } - - if (poll(&poll_table[0], poll_table.size(), -1) < 0) { - if (errno == EAGAIN || errno == EINTR) - return; - throw std::runtime_error(strf("poll() failed: %s", - strerror(errno))); - } - - for (size_t i = 0; i < poll_table.size(); ++i) { - Client *client = clients[i]; - if (poll_table[i].revents & POLLOUT) { - try { - try_to_send(client); - } catch (const std::runtime_error &e) { - printf("client error: %s\n", e.what()); - close_client(client, i); - --i; - continue; - } - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - client->livets = ts.tv_sec; - } - if (poll_table[i].revents & POLLIN) { - if (client == NULL) { - new_client(); - } else { - try { - recv_from_client(client); - } catch (const std::runtime_error &e) { - printf("client error: %s\n", e.what()); - close_client(client, i); - --i; - continue; - } - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - client->livets = ts.tv_sec; - } - } - if ((poll_table[i].revents & POLLHUP) - || (poll_table[i].revents & POLLERR)) { - fprintf(stderr, "client error.\n"); - close_client(client, i); - --i; - continue; - } - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - if (client != NULL && ts.tv_sec - client->livets > 60) { - fprintf(stderr, "client timeout.\n"); - close_client(client, i); - --i; - } - } -} - -} - -int g_rtmp_server_quit = 0; - -void start_rtmpserver(RtmpCallbacks cbs, void *ctx) { - g_rtmp_server_cbs = cbs; - g_rtmp_server_ctx = ctx; - try { - listen_fd = socket(AF_INET, SOCK_STREAM, 0); - if (listen_fd < 0) - return; - int opt = 1; - if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { - perror("setsockopt SO_REUSEADDR"); - exit(EXIT_FAILURE); - } - sockaddr_in sin; - sin.sin_family = AF_INET; - sin.sin_port = htons(ezlive_config->listening_port); - struct in_addr addr; - if (inet_pton(AF_INET, ezlive_config->listening_addr, &addr) <= 0) { - fprintf(stderr, "Invalid IP address\n"); - exit(-1); - } - sin.sin_addr.s_addr = addr.s_addr; - if (bind(listen_fd, (sockaddr *) &sin, sizeof sin) < 0) { - throw std::runtime_error(strf("Unable to listen: %s",strerror(errno))); - return; - } - - listen(listen_fd, 10); - - pollfd entry; - entry.events = POLLIN; - entry.revents = 0; - entry.fd = listen_fd; - poll_table.push_back(entry); - clients.push_back(NULL); - - for (;;) { - if (g_rtmp_server_quit) { - return; - } - do_poll(); - } - return; - } catch (const std::runtime_error &e) { - fprintf(stderr, "ERROR: %s\n", e.what()); - return; - } -} diff --git a/rtmpserver.h b/rtmpserver.h deleted file mode 100644 index c9383b4..0000000 --- a/rtmpserver.h +++ /dev/null @@ -1,25 +0,0 @@ -#ifndef RTMPSERVER_H_ -#define RTMPSERVER_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include - -typedef struct { - void (*on_start)(void *ctx); - void (*on_stop)(void *ctx); - void (*on_video)(void* ctx, int64_t timestamp, char *buf, size_t size); - void (*on_audio)(void* ctx, int64_t timestamp, char *buf, size_t size); -} RtmpCallbacks; - -void start_rtmpserver(RtmpCallbacks cbs, void *ctx); - -#ifdef __cplusplus -} -#endif - - -#endif \ No newline at end of file diff --git a/rtmputils.cpp b/rtmputils.cpp deleted file mode 100644 index 59088f1..0000000 --- a/rtmputils.cpp +++ /dev/null @@ -1,67 +0,0 @@ -#include "rtmputils.h" -#include -#include -#include -#include -#include -#include - -/* - * Used to do unaligned loads on archs that don't support them. GCC can mostly - * optimize these away. - */ -uint32_t load_be32(const void *p) -{ - uint32_t val; - memcpy(&val, p, sizeof val); - return ntohl(val); -} - -uint16_t load_be16(const void *p) -{ - uint16_t val; - memcpy(&val, p, sizeof val); - return ntohs(val); -} - -uint32_t load_le32(const void *p) -{ - const uint8_t *data = (const uint8_t *) p; - return data[0] | ((uint32_t) data[1] << 8) | - ((uint32_t) data[2] << 16) | ((uint32_t) data[3] << 24); -} - -uint32_t load_be24(const void *p) -{ - const uint8_t *data = (const uint8_t *) p; - return data[2] | ((uint32_t) data[1] << 8) | ((uint32_t) data[0] << 16); -} - -void set_be24(void *p, uint32_t val) -{ - uint8_t *data = (uint8_t *) p; - data[0] = val >> 16; - data[1] = val >> 8; - data[2] = val; -} - -void set_le32(void *p, uint32_t val) -{ - uint8_t *data = (uint8_t *) p; - data[0] = val; - data[1] = val >> 8; - data[2] = val >> 16; - data[3] = val >> 24; -} - -const std::string strf(const char *fmt, ...) -{ - va_list vl; - va_start(vl, fmt); - char *buf = NULL; - vasprintf(&buf, fmt, vl); - va_end(vl); - std::string s(buf); - free(buf); - return s; -} diff --git a/rtmputils.h b/rtmputils.h deleted file mode 100644 index f873232..0000000 --- a/rtmputils.h +++ /dev/null @@ -1,49 +0,0 @@ -/** - * @file utils.h - * @author Dean Zou (zoudingyuan@junjietech.com) - * @brief - * @version 1.0 - * @date 2019-06-27 - * - * @copyright Copyright (c) - 2019 JunJie Intelligence(Shenzhen) Co.,LTD - * - */ -#ifndef RTMPUTILS_H_ -#define RTMPUTILS_H_ - -#include -#include -#include -#include - -#define FOR_EACH(type, i, where) \ - for (typename type::iterator i = (where).begin(); i != (where).end(); ++i) - -#define FOR_EACH_CONST(type, i, where) \ - for (typename type::const_iterator i = (where).begin(); \ - i != (where).end(); ++i) - -// #define debug(fmt...) fprintf(stderr, fmt) - -#define debug(fmt...) - -template -Value get(const std::map &map, const Key &k, - const Value &def = Value()) -{ - typename std::map::const_iterator i = map.find(k); - if (i == map.end()) - return def; - return i->second; -} - -uint32_t load_be32(const void *p); -uint16_t load_be16(const void *p); -uint32_t load_be24(const void *p); -uint32_t load_le32(const void *p); -void set_be24(void *p, uint32_t val); -void set_le32(void *p, uint32_t val); - -const std::string strf(const char *fmt, ...); - -#endif diff --git a/srtserver.c b/srtserver.c new file mode 100644 index 0000000..97ce272 --- /dev/null +++ b/srtserver.c @@ -0,0 +1,127 @@ +#include "srtserver.h" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "ezlive_config.h" + +#define BUFFER_SIZE 1500 + +int handshake_callback(void* opaq, SRTSOCKET ns, int hs_version, const struct sockaddr* peeraddr, const char* streamid) { + char addr_str[INET_ADDRSTRLEN]; + struct sockaddr_in* sin = (struct sockaddr_in*)peeraddr; + inet_ntop(AF_INET, &(sin->sin_addr), addr_str, INET_ADDRSTRLEN); + + printf("[Callback] Incoming connection from: %s:%d\n", addr_str, ntohs(sin->sin_port)); + printf("[Callback] Client Stream ID: %s\n", streamid); + + if (streamid == NULL || strlen(streamid) == 0) { + printf("[Callback] Rejected: No Stream ID provided.\n"); + return -1; + } + + if (strlen(ezlive_config->key) == 0) { + printf("[Callback] No key. Skip auth.\n"); + return 0; + } + if (strcmp(streamid, ezlive_config->key) != 0) { + printf("[Callback] Rejected: Invalid Key. Expected '%s', got '%s'\n", ezlive_config->key, streamid); + return -1; + } + + printf("[Callback] Key validated. Connection Accepted.\n"); + return 0; +} + +void start_srt_server(SrtCallbacks srtcb, void *ctx) { + if (srt_startup() != 0) { + fprintf(stderr, "SRT startup failed.\n"); + return; + } + + SRTSOCKET bind_sock = srt_create_socket(); + if (bind_sock == SRT_INVALID_SOCK) { + fprintf(stderr, "Error creating socket: %s\n", srt_getlasterror_str()); + return; + } + + int yes = 1; + srt_setsockopt(bind_sock, 0, SRTO_RCVSYN, &yes, sizeof(yes)); + + struct sockaddr_in sa; + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_port = htons(ezlive_config->listening_port); + if (inet_pton(AF_INET, ezlive_config->listening_addr, &sa.sin_addr.s_addr) <= 0) { + fprintf(stderr, "Invalid IP address\n"); + exit(-1); + } + + if (srt_bind(bind_sock, (struct sockaddr*)&sa, sizeof(sa)) == SRT_ERROR) { + fprintf(stderr, "Bind error: %s\n", srt_getlasterror_str()); + srt_close(bind_sock); + srt_cleanup(); + return; + } + srt_listen_callback(bind_sock, handshake_callback, NULL); + printf("SRT Server listening on port %d...\n", ezlive_config->listening_port); + if (srt_listen(bind_sock, 5) == SRT_ERROR) { + fprintf(stderr, "Listen error: %s\n", srt_getlasterror_str()); + return; + } + + while (1) { + struct sockaddr_storage client_sa; + int sa_len = sizeof(client_sa); + + printf("Waiting for client to connect...\n"); + + SRTSOCKET client_sock = srt_accept(bind_sock, (struct sockaddr*)&client_sa, &sa_len); + if (client_sock == SRT_INVALID_SOCK) { + fprintf(stderr, "Accept error: %s\n", srt_getlasterror_str()); + continue; + } + + printf("Client connected! Starting to receive data.\n"); + srtcb.on_start(ctx); + + char buffer[BUFFER_SIZE]; + + while (1) { + int n = srt_recvmsg(client_sock, buffer, sizeof(buffer)); + + if (n == SRT_ERROR) { + fprintf(stderr, "Connection lost or error: %s\n", srt_getlasterror_str()); + srtcb.on_stop(ctx); + break; + } + + if (n == 0) { + printf("Client closed connection.\n"); + srtcb.on_stop(ctx); + break; + } + srtcb.on_data(ctx, buffer, n); + } + printf("Closing client socket.\n"); + srt_close(client_sock); + } + + srt_close(bind_sock); + srt_cleanup(); + return; +} \ No newline at end of file diff --git a/srtserver.h b/srtserver.h new file mode 100644 index 0000000..b228884 --- /dev/null +++ b/srtserver.h @@ -0,0 +1,32 @@ +#ifndef RTMPSERVER_H_ +#define RTMPSERVER_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include + +typedef struct { + void (*on_start)(void *ctx); + void (*on_stop)(void *ctx); + void (*on_video)(void* ctx, int64_t timestamp, char *buf, size_t size); + void (*on_audio)(void* ctx, int64_t timestamp, char *buf, size_t size); +} RtmpCallbacks; + +typedef struct { + void (*on_start)(void *ctx); + void (*on_stop)(void *ctx); + void (*on_data)(void* ctx, char *buf, size_t size); +} SrtCallbacks; + +void start_rtmpserver(RtmpCallbacks cbs, void *ctx); +void start_srt_server(SrtCallbacks cbs, void *ctx); + +#ifdef __cplusplus +} +#endif + + +#endif \ No newline at end of file diff --git a/transcode_talker.c b/transcode_talker.c index 3e2f05c..11fa53a 100644 --- a/transcode_talker.c +++ b/transcode_talker.c @@ -177,8 +177,8 @@ void* TranscodeTalker_main (void *vself) { while (wait_for_new_stream(self)) { AVFormatContext *in_fmt_ctx = avformat_alloc_context(); in_fmt_ctx->pb = create_avio_from_ringbuffer(self->stream, 4096); - const AVInputFormat *flv_fmt = av_find_input_format("flv"); - if (avformat_open_input(&in_fmt_ctx, NULL, flv_fmt, NULL) < 0) { + const AVInputFormat *input_fmt = av_find_input_format("mpegts"); + if (avformat_open_input(&in_fmt_ctx, NULL, input_fmt, NULL) < 0) { fprintf(stderr, "Could not open input file\n"); continue; } -- cgit v1.0