diff options
| -rw-r--r-- | .gitignore | 2 | ||||
| -rw-r--r-- | Makefile | 2 | ||||
| -rw-r--r-- | README.md | 8 | ||||
| -rw-r--r-- | amf.cpp | 429 | ||||
| -rw-r--r-- | amf.h | 167 | ||||
| -rw-r--r-- | main.c | 31 | ||||
| -rw-r--r-- | rtmp.h | 74 | ||||
| -rw-r--r-- | rtmpserver.cpp | 862 | ||||
| -rw-r--r-- | rtmputils.cpp | 67 | ||||
| -rw-r--r-- | rtmputils.h | 49 | ||||
| -rw-r--r-- | srtserver.c | 127 | ||||
| -rw-r--r-- | srtserver.h (renamed from rtmpserver.h) | 7 | ||||
| -rw-r--r-- | transcode_talker.c | 4 |
13 files changed, 167 insertions, 1662 deletions
@@ -8,3 +8,5 @@ config rootfs conf *.tar.gz +podman-start.sh +config.back @@ -2,7 +2,7 @@ CC := gcc CXX := g++ CFLAGS := -g -Wall CXXFLAGS := -g -Wall -std=c++14 -LDFLAGS := -g \ +LDFLAGS := -g -lsrt \ -lavformat -lavutil -lavcodec \ -laws-cpp-sdk-core -laws-cpp-sdk-s3 @@ -2,7 +2,7 @@ EZLive is a minimal self-hosted livestream solution built on top of S3-compatible object storage. -It runs a local RTMP server, receive live video, turns it into HLS segments (.m3u8 + .ts) and serves them as static files through any S3-compatible object storage. No dedicated streaming server is required — everything runs serverlessly. Then you can easily setup a HTML5 HLS player to watch the stream. +It runs a local SRT server, receive live video, turns it into HLS segments (.m3u8 + .ts) and serves them as static files through any S3-compatible object storage. No dedicated streaming server is required — everything runs serverlessly. Then you can easily setup a HTML5 HLS player to watch the stream. # Build @@ -24,7 +24,7 @@ Then create a config file `config`: ``` listening_addr=127.0.0.1 -listening_port=1935 +listening_port=61935 bucket=YOUR_BUCKET_NAME endpoint=https://your-s3.com s3_path=ezlive/ @@ -74,7 +74,7 @@ Start EZLive: ./ezlive ``` -Open OBS, streaming to `rtmp://127.0.0.1/live`, no streaming key needed. The streaming format must be H.264 + AAC. +Open OBS, streaming to `srt://127.0.0.1:61935`, with streaming key. The streaming format must be H.264 + AAC. Then use a HLS player to load `https://YOUR_BUCKET_NAME.your-s3.com/ezlive/stream.m3u8` to watch the stream. @@ -123,6 +123,4 @@ To start using, unzip the windows tarball, create a `config` file in the same di # Credits -The built-in RTMP server is modified from [pine](https://github.com/deboot/pine). - Thank [@uonr](https://github.com/uonr) for making nix flake. diff --git a/amf.cpp b/amf.cpp deleted file mode 100644 index 2794419..0000000 --- a/amf.cpp +++ /dev/null @@ -1,429 +0,0 @@ -#include "amf.h" -#include "rtmputils.h" - -#include <stdexcept> -#include <string.h> -#include <arpa/inet.h> - -namespace { - -uint8_t peek(const Decoder *dec) -{ - if (dec->pos >= dec->buf.size()) { - throw std::runtime_error("Not enough data"); - } - return uint8_t(dec->buf[dec->pos]); -} - -uint8_t get_byte(Decoder *dec) -{ - if (dec->version == 0 && peek(dec) == AMF0_SWITCH_AMF3) { - debug("entering AMF3 mode\n"); - dec->pos++; - dec->version = 3; - } - - if (dec->pos >= dec->buf.size()) { - throw std::runtime_error("Not enough data"); - } - return uint8_t(dec->buf[dec->pos++]); -} - -} - -AMFValue::AMFValue(AMFType type) : - m_type(type) -{ - switch (m_type) { - case AMF_OBJECT: - case AMF_ECMA_ARRAY: - m_value.object = new amf_object_t; - break; - case AMF_NUMBER: - case AMF_INTEGER: - case AMF_NULL: - case AMF_UNDEFINED: - break; - default: - assert(0); - } -} - -AMFValue::AMFValue(const std::string &s) : - m_type(AMF_STRING) -{ - m_value.string = new std::string(s); -} - -AMFValue::AMFValue(double n) : - m_type(AMF_NUMBER) -{ - m_value.number = n; -} - -AMFValue::AMFValue(int i) : - m_type(AMF_INTEGER) -{ - m_value.integer = i; -} - -AMFValue::AMFValue(bool b) : - m_type(AMF_BOOLEAN) -{ - m_value.boolean = b; -} - -AMFValue::AMFValue(const amf_object_t &object) : - m_type(AMF_OBJECT) -{ - m_value.object = new amf_object_t(object); -} - -AMFValue::AMFValue(const AMFValue &from) : - m_type(AMF_NULL) -{ - *this = from; -} - -AMFValue::~AMFValue() -{ - destroy(); -} - -void AMFValue::destroy() -{ - switch (m_type) { - case AMF_STRING: - delete m_value.string; - break; - case AMF_OBJECT: - case AMF_ECMA_ARRAY: - delete m_value.object; - break; - case AMF_NULL: - case AMF_NUMBER: - case AMF_INTEGER: - case AMF_BOOLEAN: - case AMF_UNDEFINED: - break; - } -} - -void AMFValue::operator = (const AMFValue &from) -{ - destroy(); - m_type = from.m_type; - switch (m_type) { - case AMF_STRING: - m_value.string = new std::string(*from.m_value.string); - break; - case AMF_OBJECT: - case AMF_ECMA_ARRAY: - m_value.object = new amf_object_t(*from.m_value.object); - break; - case AMF_NUMBER: - m_value.number = from.m_value.number; - break; - case AMF_INTEGER: - m_value.integer = from.m_value.integer; - break; - case AMF_BOOLEAN: - m_value.boolean = from.m_value.boolean; - break; - default: - break; - } -} - -void amf_write(Encoder *enc, const std::string &s) -{ - enc->buf += char(AMF0_STRING); - uint16_t str_len = htons(s.size()); - enc->buf.append((char *) &str_len, 2); - enc->buf += s; -} - -void amf_write(Encoder *enc, int i) -{ - throw std::runtime_error("AMF0 does not have integers"); -} - -void amf_write(Encoder *enc, double n) -{ - enc->buf += char(AMF0_NUMBER); - uint64_t encoded = 0; -//#if defined(__i386__) || defined(__x86_64__) - /* Flash uses same floating point format as x86 */ - memcpy(&encoded, &n, 8); -//#endif - uint32_t val = htonl(encoded >> 32); - enc->buf.append((char *) &val, 4); - val = htonl(encoded); - enc->buf.append((char *) &val, 4); -} - -void amf_write(Encoder *enc, bool b) -{ - enc->buf += char(AMF0_BOOLEAN); - enc->buf += char(b); -} - -void amf_write_key(Encoder *enc, const std::string &s) -{ - uint16_t str_len = htons(s.size()); - enc->buf.append((char *) &str_len, 2); - enc->buf += s; -} - -void amf_write(Encoder *enc, const amf_object_t &object) -{ - enc->buf += char(AMF0_OBJECT); - for (amf_object_t::const_iterator i = object.begin(); - i != object.end(); ++i) { - amf_write_key(enc, i->first); - amf_write(enc, i->second); - } - amf_write_key(enc, ""); - enc->buf += char(AMF0_OBJECT_END); -} - -void amf_write_ecma(Encoder *enc, const amf_object_t &object) -{ - enc->buf += char(AMF0_ECMA_ARRAY); - uint32_t zero = 0; - enc->buf.append((char *) &zero, 4); - for (amf_object_t::const_iterator i = object.begin(); - i != object.end(); ++i) { - amf_write_key(enc, i->first); - amf_write(enc, i->second); - } - amf_write_key(enc, ""); - enc->buf += char(AMF0_OBJECT_END); -} - -void amf_write_null(Encoder *enc) -{ - enc->buf += char(AMF0_NULL); -} - -void amf_write(Encoder *enc, const AMFValue &value) -{ - switch (value.type()) { - case AMF_STRING: - amf_write(enc, value.as_string()); - break; - case AMF_NUMBER: - amf_write(enc, value.as_number()); - break; - case AMF_INTEGER: - amf_write(enc, value.as_integer()); - break; - case AMF_BOOLEAN: - amf_write(enc, value.as_boolean()); - break; - case AMF_OBJECT: - amf_write(enc, value.as_object()); - break; - case AMF_ECMA_ARRAY: - amf_write_ecma(enc, value.as_object()); - break; - case AMF_NULL: - amf_write_null(enc); - break; - default: - break; - } -} - -unsigned int load_amf3_integer(Decoder *dec) -{ - unsigned int value = 0; - for (int i = 0; i < 4; ++i) { - uint8_t b = get_byte(dec); - if (i == 3) { - /* use all bits from 4th byte */ - value = (value << 8) | b; - break; - } - value = (value << 7) | (b & 0x7f); - if ((b & 0x80) == 0) - break; - } - return value; -} - -std::string amf_load_string(Decoder *dec) -{ - size_t str_len = 0; - uint8_t type = get_byte(dec); - if (dec->version == 3) { - if (type != AMF3_STRING) { - throw std::runtime_error("Expected a string"); - } - str_len = load_amf3_integer(dec) / 2; - - } else { - if (type != AMF0_STRING) { - throw std::runtime_error("Expected a string"); - } - if (dec->pos + 2 > dec->buf.size()) { - throw std::runtime_error("Not enough data"); - } - str_len = load_be16(&dec->buf[dec->pos]); - dec->pos += 2; - } - if (dec->pos + str_len > dec->buf.size()) { - throw std::runtime_error("Not enough data"); - } - std::string s(dec->buf, dec->pos, str_len); - dec->pos += str_len; - return s; -} - -double amf_load_number(Decoder *dec) -{ - if (get_byte(dec) != AMF0_NUMBER) { - throw std::runtime_error("Expected a string"); - } - if (dec->pos + 8 > dec->buf.size()) { - throw std::runtime_error("Not enough data"); - } - uint64_t val = ((uint64_t) load_be32(&dec->buf[dec->pos]) << 32) | - load_be32(&dec->buf[dec->pos + 4]); - double n = 0; -//#if defined(__i386__) || defined(__x86_64__) - /* Flash uses same floating point format as x86 */ - memcpy(&n, &val, 8); -//#endif - dec->pos += 8; - return n; -} - -int amf_load_integer(Decoder *dec) -{ - if (dec->version == 3) { - return load_amf3_integer(dec); - } else { - return amf_load_number(dec); - } -} - -bool amf_load_boolean(Decoder *dec) -{ - if (get_byte(dec) != AMF0_BOOLEAN) { - throw std::runtime_error("Expected a boolean"); - } - return get_byte(dec) != 0; -} - -std::string amf_load_key(Decoder *dec) -{ - if (dec->pos + 2 > dec->buf.size()) { - throw std::runtime_error("Not enough data"); - } - size_t str_len = load_be16(&dec->buf[dec->pos]); - dec->pos += 2; - if (dec->pos + str_len > dec->buf.size()) { - throw std::runtime_error("Not enough data"); - } - std::string s(dec->buf, dec->pos, str_len); - dec->pos += str_len; - return s; -} - -amf_object_t amf_load_object(Decoder *dec) -{ - amf_object_t object; - if (get_byte(dec) != AMF0_OBJECT) { - throw std::runtime_error("Expected an object"); - } - while (1) { - std::string key = amf_load_key(dec); - if (key.empty()) - break; - AMFValue value = amf_load(dec); - object.insert(std::make_pair(key, value)); - } - if (get_byte(dec) != AMF0_OBJECT_END) { - throw std::runtime_error("expected object end"); - } - return object; -} - -amf_object_t amf_load_ecma(Decoder *dec) -{ - /* ECMA array is the same as object, with 4 extra zero bytes */ - amf_object_t object; - if (get_byte(dec) != AMF0_ECMA_ARRAY) { - throw std::runtime_error("Expected an ECMA array"); - } - if (dec->pos + 4 > dec->buf.size()) { - throw std::runtime_error("Not enough data"); - } - dec->pos += 4; - while (1) { - std::string key = amf_load_key(dec); - if (key.empty()) - break; - AMFValue value = amf_load(dec); - object.insert(std::make_pair(key, value)); - } - if (get_byte(dec) != AMF0_OBJECT_END) { - throw std::runtime_error("expected object end"); - } - return object; -} - -AMFValue amf_load(Decoder *dec) -{ - uint8_t type = peek(dec); - if (dec->version == 3) { - switch (type) { - case AMF3_STRING: - return AMFValue(amf_load_string(dec)); - case AMF3_NUMBER: - return AMFValue(amf_load_number(dec)); - case AMF3_INTEGER: - return AMFValue(amf_load_integer(dec)); - case AMF3_FALSE: - dec->pos++; - return AMFValue(false); - case AMF3_TRUE: - dec->pos++; - return AMFValue(true); - case AMF3_OBJECT: - return AMFValue(amf_load_object(dec)); - case AMF3_ARRAY: - return AMFValue(amf_load_ecma(dec)); - case AMF3_NULL: - dec->pos++; - return AMFValue(AMF_NULL); - case AMF3_UNDEFINED: - dec->pos++; - return AMFValue(AMF_UNDEFINED); - default: - throw std::runtime_error(strf("Unsupported AMF3 type: %02x", type)); - } - } else { - switch (type) { - case AMF0_STRING: - return AMFValue(amf_load_string(dec)); - case AMF0_NUMBER: - return AMFValue(amf_load_number(dec)); - case AMF0_BOOLEAN: - return AMFValue(amf_load_boolean(dec)); - case AMF0_OBJECT: - return AMFValue(amf_load_object(dec)); - case AMF0_ECMA_ARRAY: - return AMFValue(amf_load_ecma(dec)); - case AMF0_NULL: - dec->pos++; - return AMFValue(AMF_NULL); - case AMF0_UNDEFINED: - dec->pos++; - return AMFValue(AMF_UNDEFINED); - default: - throw std::runtime_error(strf("Unsupported AMF0 type: %02x", type)); - } - } -} @@ -1,167 +0,0 @@ -/** - * @file amf.h - * @author Dean Zou (zoudingyuan@junjietech.com) - * @brief - * @version 1.0 - * @date 2019-06-27 - * - * @copyright Copyright (c) - 2019 JunJie Intelligence(Shenzhen) Co.,LTD - * - */ - -#ifndef AMF_H_ -#define AMF_H_ - -#include <string> -#include <map> -#include <assert.h> - -enum AMFType { - AMF_NUMBER, - AMF_INTEGER, - AMF_BOOLEAN, - AMF_STRING, - AMF_OBJECT, - AMF_NULL, - AMF_UNDEFINED, - AMF_ECMA_ARRAY, -}; - -enum { - AMF0_NUMBER, - AMF0_BOOLEAN, - AMF0_STRING, - AMF0_OBJECT, - AMF0_MOVIECLIP, - AMF0_NULL, - AMF0_UNDEFINED, - AMF0_REFERENCE, - AMF0_ECMA_ARRAY, - AMF0_OBJECT_END, - AMF0_STRICT_ARRAY, - AMF0_DATE, - AMF0_LONG_STRING, - AMF0_UNSUPPORTED, - AMF0_RECORD_SET, - AMF0_XML_OBJECT, - AMF0_TYPED_OBJECT, - AMF0_SWITCH_AMF3, -}; - -enum { - AMF3_UNDEFINED, - AMF3_NULL, - AMF3_FALSE, - AMF3_TRUE, - AMF3_INTEGER, - AMF3_NUMBER, - AMF3_STRING, - AMF3_LEGACY_XML, - AMF3_DATE, - AMF3_ARRAY, - AMF3_OBJECT, - AMF3_XML, - AMF3_BYTE_ARRAY, -}; - -struct Decoder { - std::string buf; - size_t pos; - int version; -}; - -struct Encoder { - std::string buf; -}; - -class AMFValue; - -typedef std::map<std::string, AMFValue> amf_object_t; - -class AMFValue { -public: - AMFValue(AMFType type = AMF_NULL); - AMFValue(const std::string &s); - AMFValue(double n); - AMFValue(int i); - AMFValue(bool b); - AMFValue(const amf_object_t &object); - AMFValue(const AMFValue &from); - ~AMFValue(); - - AMFType type() const { return m_type; } - - std::string as_string() const - { - assert(m_type == AMF_STRING); - return *m_value.string; - } - double as_number() const - { - assert(m_type == AMF_NUMBER); - return m_value.number; - } - double as_integer() const - { - assert(m_type == AMF_INTEGER); - return m_value.integer; - } - bool as_boolean() const - { - assert(m_type == AMF_BOOLEAN); - return m_value.boolean; - } - amf_object_t as_object() const - { - assert(m_type == AMF_OBJECT); - return *m_value.object; - } - - AMFValue get(const std::string &s) const - { - assert(m_type == AMF_OBJECT); - amf_object_t::const_iterator i = m_value.object->find(s); - if (i == m_value.object->end()) - return AMFValue(); - return i->second; - } - - void set(const std::string &s, const AMFValue &val) - { - assert(m_type == AMF_OBJECT); - m_value.object->insert(std::make_pair(s, val)); - } - - void operator = (const AMFValue &from); - -private: - AMFType m_type; - union { - std::string *string; - double number; - int integer; - bool boolean; - amf_object_t *object; - } m_value; - - void destroy(); -}; - -void amf_write(Encoder *enc, const std::string &s); -void amf_write(Encoder *enc, double n); -void amf_write(Encoder *enc, bool b); -void amf_write_key(Encoder *enc, const std::string &s); -void amf_write(Encoder *enc, const amf_object_t &object); -void amf_write_ecma(Encoder *enc, const amf_object_t &object); -void amf_write_null(Encoder *enc); -void amf_write(Encoder *enc, const AMFValue &value); - -std::string amf_load_string(Decoder *dec); -double amf_load_number(Decoder *dec); -bool amf_load_boolean(Decoder *dec); -std::string amf_load_key(Decoder *dec); -amf_object_t amf_load_object(Decoder *dec); -amf_object_t amf_load_ecma(Decoder *dec); -AMFValue amf_load(Decoder *dec); - -#endif @@ -58,6 +58,25 @@ void on_rtmp_audio(void *ctx, int64_t timestamp, char *buf, size_t size) { RingBuffer_write_word32be(rb, size + 11); } +void on_srt_start(void *ctx) { + MainCtx *main_ctx = ctx; + main_ctx->ringbuf = malloc(sizeof(RingBuffer)); + RingBuffer_init(main_ctx->ringbuf, 4096); + RingBuffer *rb = main_ctx->ringbuf; + TranscodeTalker_new_stream(&main_ctx->transcode_talker, rb); +} + +void on_srt_stop(void *ctx) { + MainCtx *main_ctx = ctx; + RingBuffer_end(main_ctx->ringbuf); +} + +void on_srt_data(void *ctx, char *buf, size_t size) { + MainCtx *main_ctx = ctx; + RingBuffer *rb = main_ctx->ringbuf; + RingBuffer_write(rb, (const uint8_t*)buf, size); +} + int main(int argc, char **argv) { ezlive_config = malloc(sizeof(EZLiveConfig)); EZLiveConfig_init(ezlive_config); @@ -76,11 +95,10 @@ int main(int argc, char **argv) { } srand((unsigned) time(NULL)); MainCtx main_ctx; - RtmpCallbacks rtmp_cbs = { - .on_audio = &on_rtmp_audio, - .on_video = &on_rtmp_video, - .on_start = &on_rtmp_start, - .on_stop = &on_rtmp_stop, + SrtCallbacks srt_cbs = { + .on_data = &on_srt_data, + .on_stop = &on_srt_stop, + .on_start = &on_srt_start, }; TranscodeTalker_init(&main_ctx.transcode_talker); @@ -92,6 +110,7 @@ int main(int argc, char **argv) { pthread_t s3worker_thread; pthread_create(&s3worker_thread, NULL, &s3_worker_main, NULL); - start_rtmpserver(rtmp_cbs, &main_ctx); + // start_rtmpserver(rtmp_cbs, &main_ctx); + start_srt_server(srt_cbs, &main_ctx); return 0; }
\ No newline at end of file @@ -1,74 +0,0 @@ -/** - * @file rtmp.h - * @author Dean Zou (zoudingyuan@junjietech.com) - * @brief - * @version 1.0 - * @date 2019-06-27 - * - * @copyright Copyright (c) - 2019 JunJie Intelligence(Shenzhen) Co.,LTD - * - */ - -#ifndef RTMP_H_ -#define RTMP_H_ - -#include <stdint.h> - -#define PORT 1935 - -#define DEFAULT_CHUNK_LEN 128 - -#define PACKED __attribute__((packed)) - -#define HANDSHAKE_PLAINTEXT 0x03 - -#define RANDOM_LEN (1536 - 8) - -#define MSG_SET_CHUNK 0x01 -#define MSG_BYTES_READ 0x03 -#define MSG_USER_CONTROL 0x04 -#define MSG_RESPONSE 0x05 -#define MSG_REQUEST 0x06 -#define MSG_AUDIO 0x08 -#define MSG_VIDEO 0x09 -#define MSG_INVOKE3 0x11 /* AMF3 */ -#define MSG_NOTIFY 0x12 -#define MSG_OBJECT 0x13 -#define MSG_INVOKE 0x14 /* AMF0 */ -#define MSG_FLASH_VIDEO 0x16 - -#define CONTROL_CLEAR_STREAM 0x00 -#define CONTROL_CLEAR_BUFFER 0x01 -#define CONTROL_STREAM_DRY 0x02 -#define CONTROL_BUFFER_TIME 0x03 -#define CONTROL_RESET_STREAM 0x04 -#define CONTROL_PING 0x06 -#define CONTROL_REQUEST_VERIFY 0x1a -#define CONTROL_RESPOND_VERIFY 0x1b -#define CONTROL_BUFFER_EMPTY 0x1f -#define CONTROL_BUFFER_READY 0x20 - -#define CONTROL_ID 0 -#define STREAM_ID 1337 - -#define CHAN_CONTROL 2 -#define CHAN_RESULT 3 -#define CHAN_STREAM 4 - -#define FLV_KEY_FRAME 0x01 -#define FLV_INTER_FRAME 0x02 - -struct Handshake { - uint8_t flags[8]; - uint8_t random[RANDOM_LEN]; -} PACKED; - -struct RTMP_Header { - uint8_t flags; - char timestamp[3]; - char msg_len[3]; - uint8_t msg_type; - char endpoint[4]; /* Note, this is little-endian while others are BE */ -} PACKED; - -#endif diff --git a/rtmpserver.cpp b/rtmpserver.cpp deleted file mode 100644 index cb0766e..0000000 --- a/rtmpserver.cpp +++ /dev/null @@ -1,862 +0,0 @@ -#include "amf.h" -#include "rtmputils.h" -#include "rtmp.h" -#include <vector> -#include <stdexcept> -#include <stdio.h> -#include <string.h> -#include <stdlib.h> -#include <errno.h> -#include <assert.h> -#include <stdarg.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <sys/poll.h> -#include <sys/time.h> -#include <unistd.h> -#include <fcntl.h> -#include <arpa/inet.h> -#include <time.h> - -#include "rtmpserver.h" - -extern "C" { -#include "ezlive_config.h" -} - -#define APP_NAME "live" - -RtmpCallbacks g_rtmp_server_cbs; -void *g_rtmp_server_ctx; - -struct RTMP_Message { - uint8_t type; - size_t len; - unsigned long timestamp; - uint32_t endpoint; - std::string buf; -}; - -struct Client { - int fd; - bool publisher; - bool playing; /* Wants to receive the stream? */ - bool ready; /* Wants to receive and seen a keyframe */ - RTMP_Message messages[64]; - std::string buf; - std::string send_queue; - std::string path; - size_t chunk_len; - uint32_t written_seq; - uint32_t read_seq; - int64_t livets; -}; - -namespace { - -amf_object_t metadata; -int listen_fd; -std::vector<pollfd> poll_table; -std::vector<Client *> clients; - -int set_nonblock(int fd, bool enabled) -{ - int flags = fcntl(fd, F_GETFL) & ~O_NONBLOCK; - if (enabled) { - flags |= O_NONBLOCK; - } - return fcntl(fd, F_SETFL, flags); -} - -size_t recv_all(int fd, void *buf, size_t len) -{ - size_t pos = 0; - while (pos < len) { - ssize_t bytes = recv(fd, (char *) buf + pos, len - pos, 0); - if (bytes < 0) { - if (errno == EAGAIN || errno == EINTR) - continue; - throw std::runtime_error(strf("unable to recv: %s", strerror(errno))); - } - if (bytes == 0) - break; - pos += bytes; - } - return pos; -} - -size_t send_all(int fd, const void *buf, size_t len) -{ - size_t pos = 0; - while (pos < len) { - ssize_t written = send(fd, (const char *) buf + pos, len - pos, 0); - if (written < 0) { - if (errno == EAGAIN || errno == EINTR) - continue; - throw std::runtime_error(strf("unable to send: %s", strerror(errno))); - } - if (written == 0) - break; - pos += written; - } - return pos; -} - -bool is_safe(uint8_t b) -{ - return b >= ' ' && b < 128; -} - -void hexdump(const void *buf, size_t len) -{ - const uint8_t *data = (const uint8_t *) buf; - for (size_t i = 0; i < len; i += 16) { - for (int j = 0; j < 16; ++j) { - if (i + j < len) - debug("%.2x ", data[i + j]); - else - debug(" "); - } - for (int j = 0; j < 16; ++j) { - if (i + j < len) { - putc(is_safe(data[i + j]) ? data[i + j] : '.', - stdout); - } else { - putc(' ', stdout); - } - } - putc('\n', stdout); - } -} - -void try_to_send(void *data) -{ - Client *client = (Client *)data; - size_t len = client->send_queue.size(); - if (len > 4096) - len = 4096; - - ssize_t written = send(client->fd, client->send_queue.data(), len, 0); - if (written < 0) { - if (errno == EAGAIN || errno == EINTR) - return; - throw std::runtime_error(strf("unable to write to a client: %s", - strerror(errno))); - } - - client->send_queue.erase(0, written); -} - -void rtmp_send(Client *client, uint8_t type, uint32_t endpoint, - const std::string &buf, unsigned long timestamp = 0, - int channel_num = CHAN_CONTROL) -{ - if (endpoint == STREAM_ID) { - /* - * For some unknown reason, stream-related msgs must be sent - * on a specific channel. - */ - channel_num = CHAN_STREAM; - } - - RTMP_Header header; - header.flags = (channel_num & 0x3f) | (0 << 6); - header.msg_type = type; - set_be24(header.timestamp, timestamp); - set_be24(header.msg_len, buf.size()); - set_le32(header.endpoint, endpoint); - - client->send_queue.append((char *) &header, sizeof header); - client->written_seq += sizeof header; - - size_t pos = 0; - while (pos < buf.size()) { - if (pos) { - uint8_t flags = (channel_num & 0x3f) | (3 << 6); - client->send_queue += char(flags); - - client->written_seq += 1; - } - - size_t chunk = buf.size() - pos; - if (chunk > client->chunk_len) - chunk = client->chunk_len; - client->send_queue.append(buf, pos, chunk); - - client->written_seq += chunk; - pos += chunk; - } - - try_to_send(client); -} - -void send_reply(Client *client, double txid, const AMFValue &reply = AMFValue(), - const AMFValue &status = AMFValue()) -{ - if (txid <= 0.0) - return; - Encoder invoke; - amf_write(&invoke, std::string("_result")); - amf_write(&invoke, txid); - amf_write(&invoke, reply); - amf_write(&invoke, status); - rtmp_send(client, MSG_INVOKE, CONTROL_ID, invoke.buf, 0, CHAN_RESULT); -} - -void handle_connect(Client *client, double txid, Decoder *dec) -{ - amf_object_t params = amf_load_object(dec); - std::string app = get(params, std::string("app")).as_string(); - std::string ver = "(unknown)"; - - AMFValue flashver = get(params, std::string("flashVer")); - if (flashver.type() == AMF_STRING) { - ver = flashver.as_string(); - } - - /* - if (app != APP_NAME) { - throw std::runtime_error("Unsupported application: " + app); - } - */ - - printf("connect: %s (version %s)\n", app.c_str(), ver.c_str()); - - amf_object_t version; - version.insert(std::make_pair("fmsVer", std::string("FMS/4,5,1,484"))); - version.insert(std::make_pair("capabilities", 255.0)); - version.insert(std::make_pair("mode", 1.0)); - - amf_object_t status; - status.insert(std::make_pair("code", std::string("NetConnection.Connect.Success"))); - /* report support for AMF3 */ - // status.insert(std::make_pair("objectEncoding", 3.0)); - - send_reply(client, txid, version, status); - - uint32_t chunk_len = htonl(1024); - std::string set_chunk((char *) &chunk_len, 4); - rtmp_send(client, MSG_SET_CHUNK, CONTROL_ID, set_chunk); -/* - client->chunk_len = 1024; -*/ -} - -int publishernum = 0; - -void handle_fcpublish(Client *client, double txid, Decoder *dec) -{ - if (publishernum > 0) { - throw std::runtime_error{"only one publisher."}; - } - publishernum = 1; - g_rtmp_server_cbs.on_start(g_rtmp_server_ctx); - client->publisher = true; - printf( "publisher connected."); - - amf_load(dec); /* NULL */ - - std::string path = amf_load_string(dec); - debug("fcpublish %s\n", path.c_str()); - printf("fcpublish %s\n", path.c_str()); - if (strlen(ezlive_config->key) > 0) { - if (strcmp(ezlive_config->key, path.c_str()) != 0) { - publishernum = 0; - printf( "wrong live key.\n"); - throw std::runtime_error{"wrong live key."}; - } - } - - client->path = path; - - amf_object_t status; - status.insert(std::make_pair("code", std::string("NetStream.Publish.Start"))); - status.insert(std::make_pair("description", path)); - - Encoder invoke; - amf_write(&invoke, std::string("onFCPublish")); - amf_write(&invoke, 0.0); - amf_write_null(&invoke); - amf_write(&invoke, status); - rtmp_send(client, MSG_INVOKE, CONTROL_ID, invoke.buf); - - send_reply(client, txid); -} - -void handle_createstream(Client *client, double txid, Decoder *dec) -{ - send_reply(client, txid, AMFValue(), double(STREAM_ID)); -} - -void handle_publish(Client *client, double txid, Decoder *dec) -{ - amf_load(dec); /* NULL */ - - std::string path = amf_load_string(dec); - debug("publish %s\n", path.c_str()); - - client->path = path; - - amf_object_t status; - status.insert(std::make_pair("level", std::string("status"))); - status.insert(std::make_pair("code", std::string("NetStream.Publish.Start"))); - status.insert(std::make_pair("description", std::string("Stream is now published."))); - status.insert(std::make_pair("details", path)); - - Encoder invoke; - amf_write(&invoke, std::string("onStatus")); - amf_write(&invoke, 0.0); - amf_write_null(&invoke); - amf_write(&invoke, status); - rtmp_send(client, MSG_INVOKE, STREAM_ID, invoke.buf); - - send_reply(client, txid); -} - -void start_playback(Client *client) -{ - throw std::runtime_error{"playback not supported."}; -} - -void handle_play(Client *client, double txid, Decoder *dec) -{ - amf_load(dec); /* NULL */ - - std::string path = amf_load_string(dec); - - debug("play %s\n", path.c_str()); - client->path = path; - - start_playback(client); - - send_reply(client, txid); -} - -void handle_play2(Client *client, double txid, Decoder *dec) -{ - amf_load(dec); /* NULL */ - - amf_object_t params = amf_load_object(dec); - std::string path = get(params, std::string("streamName")).as_string(); - - debug("play %s\n", path.c_str()); - client->path = path; - start_playback(client); - - send_reply(client, txid); -} - -void handle_pause(Client *client, double txid, Decoder *dec) -{ - amf_load(dec); /* NULL */ - - bool paused = amf_load_boolean(dec); - - if (paused) { - debug("pausing\n"); - - amf_object_t status; - status.insert(std::make_pair("level", std::string("status"))); - status.insert(std::make_pair("code", std::string("NetStream.Pause.Notify"))); - status.insert(std::make_pair("description", std::string("Pausing."))); - - Encoder invoke; - amf_write(&invoke, std::string("onStatus")); - amf_write(&invoke, 0.0); - amf_write_null(&invoke); - amf_write(&invoke, status); - rtmp_send(client, MSG_INVOKE, STREAM_ID, invoke.buf); - client->playing = false; - } else { - start_playback(client); - } - - send_reply(client, txid); -} - -void handle_setdataframe(Client *client, Decoder *dec) -{ - if(client->publisher == false) { - printf("not a publisher"); - throw std::runtime_error("not a publisher"); - } - - std::string type = amf_load_string(dec); - if (type != "onMetaData") { - throw std::runtime_error("can only set metadata"); - } - - metadata = amf_load_ecma(dec); - - Encoder notify; - amf_write(¬ify, std::string("onMetaData")); - amf_write_ecma(¬ify, metadata); - - FOR_EACH(std::vector<Client *>, i, clients) { - Client *receiver = *i; - if (receiver != NULL && receiver->playing && receiver->path == client->path) { - rtmp_send(client, MSG_NOTIFY, STREAM_ID, notify.buf); - } - } -} - -void handle_invoke(Client *client, const RTMP_Message *msg, Decoder *dec) -{ - std::string method = amf_load_string(dec); - double txid = amf_load_number(dec); - - debug( "invoked %s txid %f\n", method.c_str(), txid); - - if (msg->endpoint == CONTROL_ID) { - if (method == "connect") { - handle_connect(client, txid, dec); - } else if (method == "FCPublish") { - handle_fcpublish(client, txid, dec); - } else if (method == "createStream") { - handle_createstream(client, txid, dec); - } - - } else if (msg->endpoint == STREAM_ID) { - if (method == "publish") { - handle_publish(client, txid, dec); - } else if (method == "play") { - handle_play(client, txid, dec); - } else if (method == "play2") { - handle_play2(client, txid, dec); - } else if (method == "pause") { - handle_pause(client, txid, dec); - } - } -} - -void handle_message(Client *client, RTMP_Message *msg) -{ - - debug("RTMP message %02x, len %zu, timestamp %ld\n", msg->type, msg->len, - msg->timestamp); - - size_t pos = 0; - - switch (msg->type) { - case MSG_BYTES_READ: - if (pos + 4 > msg->buf.size()) { - throw std::runtime_error("Not enough data"); - } - client->read_seq = load_be32(&msg->buf[pos]); - debug("%d in queue\n", - int(client->written_seq - client->read_seq)); - break; - - case MSG_SET_CHUNK: - if (pos + 4 > msg->buf.size()) { - throw std::runtime_error("Not enough data"); - } - client->chunk_len = load_be32(&msg->buf[pos]); - debug("chunk size set to %zu\n", client->chunk_len); - break; - - case MSG_INVOKE: { - debug("handling message invoke 0 \n"); - Decoder dec; - dec.version = 0; - dec.buf = msg->buf; - dec.pos = 0; - handle_invoke(client, msg, &dec); - } - break; - - case MSG_INVOKE3: { - debug("handling message invoke 3 \n"); - Decoder dec; - dec.version = 0; - dec.buf = msg->buf; - dec.pos = 1; - handle_invoke(client, msg, &dec); - } - break; - - case MSG_NOTIFY: { - Decoder dec; - dec.version = 0; - dec.buf = msg->buf; - dec.pos = 0; - std::string type = amf_load_string(&dec); - debug("notify %s\n", type.c_str()); - if (msg->endpoint == STREAM_ID) { - if (type == "@setDataFrame") { - handle_setdataframe(client, &dec); - } - } - } - break; - - case MSG_AUDIO: - if(client->publisher == false) { - throw std::runtime_error("not a publisher"); - } - g_rtmp_server_cbs.on_audio(g_rtmp_server_ctx, msg->timestamp, (char*)msg->buf.c_str(), msg->buf.size()); - // FOR_EACH(std::vector<Client *>, i, clients) { - // Client *receiver = *i; - // if (receiver != NULL && receiver->ready) { - // rtmp_send(receiver, MSG_AUDIO, STREAM_ID, - // msg->buf, msg->timestamp); - // } - // } - break; - case MSG_VIDEO: { - if(client->publisher == false){ - throw std::runtime_error("not a publisher"); - } - // uint8_t flags = msg->buf[0]; - g_rtmp_server_cbs.on_video(g_rtmp_server_ctx, msg->timestamp, (char*)msg->buf.c_str(), msg->buf.size()); - // FOR_EACH(std::vector<Client *>, i, clients) { - // Client *receiver = *i; - // if (receiver != NULL && receiver->playing && - // client->path == receiver->path) { - // if (flags >> 4 == FLV_KEY_FRAME && - // !receiver->ready) { - // std::string control; - // uint16_t type = htons(CONTROL_CLEAR_STREAM); - // control.append((char *) &type, 2); - // uint32_t stream = htonl(STREAM_ID); - // control.append((char *) &stream, 4); - // rtmp_send(receiver, MSG_USER_CONTROL, CONTROL_ID, control); - // receiver->ready = true; - // } - // if (receiver->ready) { - // static int flag = 0; - // if(flag < 10) { - // flag ++; - // } - // rtmp_send(receiver, MSG_VIDEO, - // STREAM_ID, msg->buf, - // msg->timestamp); - // } - // } - // } - } - break; - - case MSG_FLASH_VIDEO: - //printf( "streaming FLV not supported"); - throw std::runtime_error("streaming FLV not supported"); - break; - - default: - debug("unhandled message: %02x\n", msg->type); - hexdump(msg->buf.data(), msg->buf.size()); - break; - } -} - -/* TODO: Make this asynchronous */ -void do_handshake(Client *client) -{ - Handshake serversig; - Handshake clientsig; - - uint8_t c; - if (recv_all(client->fd, &c, 1) < 1) - return; - if (c != HANDSHAKE_PLAINTEXT) { - //printf( "only plaintext handshake supported"); - throw std::runtime_error("only plaintext handshake supported"); - } - - if (send_all(client->fd, &c, 1) < 1) - return; - - memset(&serversig, 0, sizeof serversig); - serversig.flags[0] = 0x03; - for (int i = 0; i < RANDOM_LEN; ++i) { - serversig.random[i] = rand(); - } - - if (send_all(client->fd, &serversig, sizeof serversig) < sizeof serversig) - return; - - /* Echo client's signature back */ - if (recv_all(client->fd, &clientsig, sizeof serversig) < sizeof serversig) - return; - if (send_all(client->fd, &clientsig, sizeof serversig) < sizeof serversig) - return; - - if (recv_all(client->fd, &clientsig, sizeof serversig) < sizeof serversig) - return; - if (memcmp(serversig.random, clientsig.random, RANDOM_LEN) != 0) { - //printf( "invalid handshake"); - throw std::runtime_error("invalid handshake"); - } - - client->read_seq = 1 + sizeof serversig * 2; - client->written_seq = 1 + sizeof serversig * 2; -} - -void recv_from_client(void *data) -{ - Client *client = (Client *)data; - - std::string chunk(4096, 0); - ssize_t got = recv(client->fd, &chunk[0], chunk.size(), 0); - if (got == 0) { - //printf( "end of life from a client"); - throw std::runtime_error("EOF from a client"); - } else if (got < 0) { - if (errno == EAGAIN || errno == EINTR) - return; - //printf( "unable to read from a client: %s",strerror(errno)); - throw std::runtime_error(strf("unable to read from a client: %s",strerror(errno))); - } - client->buf.append(chunk, 0, got); - - //printf( "got data and length is %d",client->buf.size()); - - while (!client->buf.empty()) { - uint8_t flags = client->buf[0]; - - static const size_t HEADER_LENGTH[] = {12, 8, 4, 1}; - size_t header_len = HEADER_LENGTH[flags >> 6]; - - if (client->buf.size() < header_len) { - /* need more data */ - break; - } - - RTMP_Header header; - memcpy(&header, client->buf.data(), header_len); - - RTMP_Message *msg = &client->messages[flags & 0x3f]; - - if (header_len >= 8) { - msg->len = load_be24(header.msg_len); - if (msg->len < msg->buf.size()) { - throw std::runtime_error("invalid msg length"); - } - msg->type = header.msg_type; - } - if (header_len >= 12) { - msg->endpoint = load_le32(header.endpoint); - } - - if (msg->len == 0) { - throw std::runtime_error("message without a header"); - } - size_t chunk = msg->len - msg->buf.size(); - if (chunk > client->chunk_len) - chunk = client->chunk_len; - - if (client->buf.size() < header_len + chunk) { - /* need more data */ - break; - } - - if (header_len >= 4) { - unsigned long ts = load_be24(header.timestamp); - if (ts == 0xffffff) { - throw std::runtime_error("ext timestamp not supported"); - } - if (header_len < 12) { - ts += msg->timestamp; - } - msg->timestamp = ts; - } - - msg->buf.append(client->buf, header_len, chunk); - client->buf.erase(0, header_len + chunk); - - if (msg->buf.size() == msg->len) { - handle_message(client, msg); - msg->buf.clear(); - } - } -} - -Client *new_client() -{ - sockaddr_in sin; - socklen_t addrlen = sizeof sin; - int fd = accept(listen_fd, (sockaddr *) &sin, &addrlen); - if (fd < 0) { - printf("Unable to accept a client: %s\n", strerror(errno)); - return NULL; - } - - Client *client = new Client; - client->publisher = false; - client->playing = false; - client->ready = false; - client->fd = fd; - client->written_seq = 0; - client->read_seq = 0; - client->chunk_len = DEFAULT_CHUNK_LEN; - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - client->livets = ts.tv_sec; - for (int i = 0; i < 64; ++i) { - client->messages[i].timestamp = 0; - client->messages[i].len = 0; - } - - try { - do_handshake(client); - } catch (const std::runtime_error &e) { - printf("handshake failed: %s\n", e.what()); - close(fd); - delete client; - return NULL; - } - - set_nonblock(fd, true); - - pollfd entry; - entry.events = POLLIN; - entry.revents = 0; - entry.fd = fd; - poll_table.push_back(entry); - clients.push_back(client); - - return client; -} - -void close_client(Client *client, size_t i) -{ - clients.erase(clients.begin() + i); - poll_table.erase(poll_table.begin() + i); - close(client->fd); - - if (client->publisher == true) { - printf("publisher disconnected.\n"); - client->publisher = false; - publishernum = 0; - g_rtmp_server_cbs.on_stop(g_rtmp_server_ctx); - FOR_EACH(std::vector<Client *>, i, clients) { - Client *client = *i; - if (client != NULL) { - client->ready = false; - } - } - } - delete client; -} - -void do_poll(void) -{ - for (size_t i = 0; i < poll_table.size(); ++i) { - Client *client = clients[i]; - if (client != NULL) { - if (!client->send_queue.empty()) { - //debug("waiting for pollout\n"); - poll_table[i].events = POLLIN | POLLOUT; - } else { - poll_table[i].events = POLLIN; - } - } - } - - if (poll(&poll_table[0], poll_table.size(), -1) < 0) { - if (errno == EAGAIN || errno == EINTR) - return; - throw std::runtime_error(strf("poll() failed: %s", - strerror(errno))); - } - - for (size_t i = 0; i < poll_table.size(); ++i) { - Client *client = clients[i]; - if (poll_table[i].revents & POLLOUT) { - try { - try_to_send(client); - } catch (const std::runtime_error &e) { - printf("client error: %s\n", e.what()); - close_client(client, i); - --i; - continue; - } - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - client->livets = ts.tv_sec; - } - if (poll_table[i].revents & POLLIN) { - if (client == NULL) { - new_client(); - } else { - try { - recv_from_client(client); - } catch (const std::runtime_error &e) { - printf("client error: %s\n", e.what()); - close_client(client, i); - --i; - continue; - } - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - client->livets = ts.tv_sec; - } - } - if ((poll_table[i].revents & POLLHUP) - || (poll_table[i].revents & POLLERR)) { - fprintf(stderr, "client error.\n"); - close_client(client, i); - --i; - continue; - } - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - if (client != NULL && ts.tv_sec - client->livets > 60) { - fprintf(stderr, "client timeout.\n"); - close_client(client, i); - --i; - } - } -} - -} - -int g_rtmp_server_quit = 0; - -void start_rtmpserver(RtmpCallbacks cbs, void *ctx) { - g_rtmp_server_cbs = cbs; - g_rtmp_server_ctx = ctx; - try { - listen_fd = socket(AF_INET, SOCK_STREAM, 0); - if (listen_fd < 0) - return; - int opt = 1; - if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { - perror("setsockopt SO_REUSEADDR"); - exit(EXIT_FAILURE); - } - sockaddr_in sin; - sin.sin_family = AF_INET; - sin.sin_port = htons(ezlive_config->listening_port); - struct in_addr addr; - if (inet_pton(AF_INET, ezlive_config->listening_addr, &addr) <= 0) { - fprintf(stderr, "Invalid IP address\n"); - exit(-1); - } - sin.sin_addr.s_addr = addr.s_addr; - if (bind(listen_fd, (sockaddr *) &sin, sizeof sin) < 0) { - throw std::runtime_error(strf("Unable to listen: %s",strerror(errno))); - return; - } - - listen(listen_fd, 10); - - pollfd entry; - entry.events = POLLIN; - entry.revents = 0; - entry.fd = listen_fd; - poll_table.push_back(entry); - clients.push_back(NULL); - - for (;;) { - if (g_rtmp_server_quit) { - return; - } - do_poll(); - } - return; - } catch (const std::runtime_error &e) { - fprintf(stderr, "ERROR: %s\n", e.what()); - return; - } -} diff --git a/rtmputils.cpp b/rtmputils.cpp deleted file mode 100644 index 59088f1..0000000 --- a/rtmputils.cpp +++ /dev/null @@ -1,67 +0,0 @@ -#include "rtmputils.h" -#include <string> -#include <string.h> -#include <stdio.h> -#include <stdlib.h> -#include <stdarg.h> -#include <arpa/inet.h> - -/* - * Used to do unaligned loads on archs that don't support them. GCC can mostly - * optimize these away. - */ -uint32_t load_be32(const void *p) -{ - uint32_t val; - memcpy(&val, p, sizeof val); - return ntohl(val); -} - -uint16_t load_be16(const void *p) -{ - uint16_t val; - memcpy(&val, p, sizeof val); - return ntohs(val); -} - -uint32_t load_le32(const void *p) -{ - const uint8_t *data = (const uint8_t *) p; - return data[0] | ((uint32_t) data[1] << 8) | - ((uint32_t) data[2] << 16) | ((uint32_t) data[3] << 24); -} - -uint32_t load_be24(const void *p) -{ - const uint8_t *data = (const uint8_t *) p; - return data[2] | ((uint32_t) data[1] << 8) | ((uint32_t) data[0] << 16); -} - -void set_be24(void *p, uint32_t val) -{ - uint8_t *data = (uint8_t *) p; - data[0] = val >> 16; - data[1] = val >> 8; - data[2] = val; -} - -void set_le32(void *p, uint32_t val) -{ - uint8_t *data = (uint8_t *) p; - data[0] = val; - data[1] = val >> 8; - data[2] = val >> 16; - data[3] = val >> 24; -} - -const std::string strf(const char *fmt, ...) -{ - va_list vl; - va_start(vl, fmt); - char *buf = NULL; - vasprintf(&buf, fmt, vl); - va_end(vl); - std::string s(buf); - free(buf); - return s; -} diff --git a/rtmputils.h b/rtmputils.h deleted file mode 100644 index f873232..0000000 --- a/rtmputils.h +++ /dev/null @@ -1,49 +0,0 @@ -/** - * @file utils.h - * @author Dean Zou (zoudingyuan@junjietech.com) - * @brief - * @version 1.0 - * @date 2019-06-27 - * - * @copyright Copyright (c) - 2019 JunJie Intelligence(Shenzhen) Co.,LTD - * - */ -#ifndef RTMPUTILS_H_ -#define RTMPUTILS_H_ - -#include <map> -#include <string> -#include <stdio.h> -#include <stdint.h> - -#define FOR_EACH(type, i, where) \ - for (typename type::iterator i = (where).begin(); i != (where).end(); ++i) - -#define FOR_EACH_CONST(type, i, where) \ - for (typename type::const_iterator i = (where).begin(); \ - i != (where).end(); ++i) - -// #define debug(fmt...) fprintf(stderr, fmt) - -#define debug(fmt...) - -template<class Key, class Value> -Value get(const std::map<Key, Value> &map, const Key &k, - const Value &def = Value()) -{ - typename std::map<Key, Value>::const_iterator i = map.find(k); - if (i == map.end()) - return def; - return i->second; -} - -uint32_t load_be32(const void *p); -uint16_t load_be16(const void *p); -uint32_t load_be24(const void *p); -uint32_t load_le32(const void *p); -void set_be24(void *p, uint32_t val); -void set_le32(void *p, uint32_t val); - -const std::string strf(const char *fmt, ...); - -#endif diff --git a/srtserver.c b/srtserver.c new file mode 100644 index 0000000..97ce272 --- /dev/null +++ b/srtserver.c @@ -0,0 +1,127 @@ +#include "srtserver.h" + +#include <stdio.h> +#include <string.h> +#include <stdlib.h> +#include <assert.h> +#include <stdarg.h> + +#include <sys/socket.h> +#include <netinet/in.h> +#include <sys/poll.h> +#include <sys/time.h> +#include <unistd.h> +#include <fcntl.h> +#include <arpa/inet.h> +#include <time.h> + +#include <srt/srt.h> + +#include "ezlive_config.h" + +#define BUFFER_SIZE 1500 + +int handshake_callback(void* opaq, SRTSOCKET ns, int hs_version, const struct sockaddr* peeraddr, const char* streamid) { + char addr_str[INET_ADDRSTRLEN]; + struct sockaddr_in* sin = (struct sockaddr_in*)peeraddr; + inet_ntop(AF_INET, &(sin->sin_addr), addr_str, INET_ADDRSTRLEN); + + printf("[Callback] Incoming connection from: %s:%d\n", addr_str, ntohs(sin->sin_port)); + printf("[Callback] Client Stream ID: %s\n", streamid); + + if (streamid == NULL || strlen(streamid) == 0) { + printf("[Callback] Rejected: No Stream ID provided.\n"); + return -1; + } + + if (strlen(ezlive_config->key) == 0) { + printf("[Callback] No key. Skip auth.\n"); + return 0; + } + if (strcmp(streamid, ezlive_config->key) != 0) { + printf("[Callback] Rejected: Invalid Key. Expected '%s', got '%s'\n", ezlive_config->key, streamid); + return -1; + } + + printf("[Callback] Key validated. Connection Accepted.\n"); + return 0; +} + +void start_srt_server(SrtCallbacks srtcb, void *ctx) { + if (srt_startup() != 0) { + fprintf(stderr, "SRT startup failed.\n"); + return; + } + + SRTSOCKET bind_sock = srt_create_socket(); + if (bind_sock == SRT_INVALID_SOCK) { + fprintf(stderr, "Error creating socket: %s\n", srt_getlasterror_str()); + return; + } + + int yes = 1; + srt_setsockopt(bind_sock, 0, SRTO_RCVSYN, &yes, sizeof(yes)); + + struct sockaddr_in sa; + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_port = htons(ezlive_config->listening_port); + if (inet_pton(AF_INET, ezlive_config->listening_addr, &sa.sin_addr.s_addr) <= 0) { + fprintf(stderr, "Invalid IP address\n"); + exit(-1); + } + + if (srt_bind(bind_sock, (struct sockaddr*)&sa, sizeof(sa)) == SRT_ERROR) { + fprintf(stderr, "Bind error: %s\n", srt_getlasterror_str()); + srt_close(bind_sock); + srt_cleanup(); + return; + } + srt_listen_callback(bind_sock, handshake_callback, NULL); + printf("SRT Server listening on port %d...\n", ezlive_config->listening_port); + if (srt_listen(bind_sock, 5) == SRT_ERROR) { + fprintf(stderr, "Listen error: %s\n", srt_getlasterror_str()); + return; + } + + while (1) { + struct sockaddr_storage client_sa; + int sa_len = sizeof(client_sa); + + printf("Waiting for client to connect...\n"); + + SRTSOCKET client_sock = srt_accept(bind_sock, (struct sockaddr*)&client_sa, &sa_len); + if (client_sock == SRT_INVALID_SOCK) { + fprintf(stderr, "Accept error: %s\n", srt_getlasterror_str()); + continue; + } + + printf("Client connected! Starting to receive data.\n"); + srtcb.on_start(ctx); + + char buffer[BUFFER_SIZE]; + + while (1) { + int n = srt_recvmsg(client_sock, buffer, sizeof(buffer)); + + if (n == SRT_ERROR) { + fprintf(stderr, "Connection lost or error: %s\n", srt_getlasterror_str()); + srtcb.on_stop(ctx); + break; + } + + if (n == 0) { + printf("Client closed connection.\n"); + srtcb.on_stop(ctx); + break; + } + srtcb.on_data(ctx, buffer, n); + } + printf("Closing client socket.\n"); + srt_close(client_sock); + } + + srt_close(bind_sock); + srt_cleanup(); + return; +}
\ No newline at end of file diff --git a/rtmpserver.h b/srtserver.h index c9383b4..b228884 100644 --- a/rtmpserver.h +++ b/srtserver.h @@ -15,7 +15,14 @@ typedef struct { void (*on_audio)(void* ctx, int64_t timestamp, char *buf, size_t size); } RtmpCallbacks; +typedef struct { + void (*on_start)(void *ctx); + void (*on_stop)(void *ctx); + void (*on_data)(void* ctx, char *buf, size_t size); +} SrtCallbacks; + void start_rtmpserver(RtmpCallbacks cbs, void *ctx); +void start_srt_server(SrtCallbacks cbs, void *ctx); #ifdef __cplusplus } diff --git a/transcode_talker.c b/transcode_talker.c index 3e2f05c..11fa53a 100644 --- a/transcode_talker.c +++ b/transcode_talker.c @@ -177,8 +177,8 @@ void* TranscodeTalker_main (void *vself) { while (wait_for_new_stream(self)) { AVFormatContext *in_fmt_ctx = avformat_alloc_context(); in_fmt_ctx->pb = create_avio_from_ringbuffer(self->stream, 4096); - const AVInputFormat *flv_fmt = av_find_input_format("flv"); - if (avformat_open_input(&in_fmt_ctx, NULL, flv_fmt, NULL) < 0) { + const AVInputFormat *input_fmt = av_find_input_format("mpegts"); + if (avformat_open_input(&in_fmt_ctx, NULL, input_fmt, NULL) < 0) { fprintf(stderr, "Could not open input file\n"); continue; } |
