From 8d089010b37ee193b2b8c809a4f317fa2b11a758 Mon Sep 17 00:00:00 2001 From: Mistivia Date: Wed, 10 Sep 2025 17:41:52 +0800 Subject: rtmp server --- .gitignore | 3 + LICENSE | 0 Makefile | 28 ++ amf.cpp | 429 ++++++++++++++++++++++++++++++ amf.h | 167 ++++++++++++ fileutils.c | 72 +++++ fileutils.h | 18 ++ main.c | 56 ++++ rtmp.h | 74 ++++++ rtmpserver.cpp | 820 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ rtmpserver.h | 25 ++ rtmputils.cpp | 67 +++++ rtmputils.h | 49 ++++ 13 files changed, 1808 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 Makefile create mode 100644 amf.cpp create mode 100644 amf.h create mode 100644 fileutils.c create mode 100644 fileutils.h create mode 100644 main.c create mode 100644 rtmp.h create mode 100644 rtmpserver.cpp create mode 100644 rtmpserver.h create mode 100644 rtmputils.cpp create mode 100644 rtmputils.h diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0de0710 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*.o +ezlive +*.flv \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..e69de29 diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..6f6d633 --- /dev/null +++ b/Makefile @@ -0,0 +1,28 @@ +CC := gcc +CXX := g++ +CFLAGS := -g -Wall +CXXFLAGS := -g -Wall -std=c++14 + +C_SOURCES := $(shell find . -maxdepth 1 -name '*.c') +CPP_SOURCES := $(shell find . -maxdepth 1 -name '*.cpp') + +C_OBJS := $(C_SOURCES:.c=.o) +CPP_OBJS := $(CPP_SOURCES:.cpp=.o) + +TARGET := ezlive + +all: $(TARGET) + +$(TARGET): $(C_OBJS) $(CPP_OBJS) + $(CXX) $(C_OBJS) $(CPP_OBJS) -o $@ + +%.o: %.c + $(CC) $(CFLAGS) -c $< -o $@ + +%.o: %.cpp + $(CXX) $(CXXFLAGS) -c $< -o $@ + +clean: + rm -f $(C_OBJS) $(CPP_OBJS) $(TARGET) + +.PHONY: all clean \ No newline at end of file diff --git a/amf.cpp b/amf.cpp new file mode 100644 index 0000000..2794419 --- /dev/null +++ b/amf.cpp @@ -0,0 +1,429 @@ +#include "amf.h" +#include "rtmputils.h" + +#include +#include +#include + +namespace { + +uint8_t peek(const Decoder *dec) +{ + if (dec->pos >= dec->buf.size()) { + throw std::runtime_error("Not enough data"); + } + return uint8_t(dec->buf[dec->pos]); +} + +uint8_t get_byte(Decoder *dec) +{ + if (dec->version == 0 && peek(dec) == AMF0_SWITCH_AMF3) { + debug("entering AMF3 mode\n"); + dec->pos++; + dec->version = 3; + } + + if (dec->pos >= dec->buf.size()) { + throw std::runtime_error("Not enough data"); + } + return uint8_t(dec->buf[dec->pos++]); +} + +} + +AMFValue::AMFValue(AMFType type) : + m_type(type) +{ + switch (m_type) { + case AMF_OBJECT: + case AMF_ECMA_ARRAY: + m_value.object = new amf_object_t; + break; + case AMF_NUMBER: + case AMF_INTEGER: + case AMF_NULL: + case AMF_UNDEFINED: + break; + default: + assert(0); + } +} + +AMFValue::AMFValue(const std::string &s) : + m_type(AMF_STRING) +{ + m_value.string = new std::string(s); +} + +AMFValue::AMFValue(double n) : + m_type(AMF_NUMBER) +{ + m_value.number = n; +} + +AMFValue::AMFValue(int i) : + m_type(AMF_INTEGER) +{ + m_value.integer = i; +} + +AMFValue::AMFValue(bool b) : + m_type(AMF_BOOLEAN) +{ + m_value.boolean = b; +} + +AMFValue::AMFValue(const amf_object_t &object) : + m_type(AMF_OBJECT) +{ + m_value.object = new amf_object_t(object); +} + +AMFValue::AMFValue(const AMFValue &from) : + m_type(AMF_NULL) +{ + *this = from; +} + +AMFValue::~AMFValue() +{ + destroy(); +} + +void AMFValue::destroy() +{ + switch (m_type) { + case AMF_STRING: + delete m_value.string; + break; + case AMF_OBJECT: + case AMF_ECMA_ARRAY: + delete m_value.object; + break; + case AMF_NULL: + case AMF_NUMBER: + case AMF_INTEGER: + case AMF_BOOLEAN: + case AMF_UNDEFINED: + break; + } +} + +void AMFValue::operator = (const AMFValue &from) +{ + destroy(); + m_type = from.m_type; + switch (m_type) { + case AMF_STRING: + m_value.string = new std::string(*from.m_value.string); + break; + case AMF_OBJECT: + case AMF_ECMA_ARRAY: + m_value.object = new amf_object_t(*from.m_value.object); + break; + case AMF_NUMBER: + m_value.number = from.m_value.number; + break; + case AMF_INTEGER: + m_value.integer = from.m_value.integer; + break; + case AMF_BOOLEAN: + m_value.boolean = from.m_value.boolean; + break; + default: + break; + } +} + +void amf_write(Encoder *enc, const std::string &s) +{ + enc->buf += char(AMF0_STRING); + uint16_t str_len = htons(s.size()); + enc->buf.append((char *) &str_len, 2); + enc->buf += s; +} + +void amf_write(Encoder *enc, int i) +{ + throw std::runtime_error("AMF0 does not have integers"); +} + +void amf_write(Encoder *enc, double n) +{ + enc->buf += char(AMF0_NUMBER); + uint64_t encoded = 0; +//#if defined(__i386__) || defined(__x86_64__) + /* Flash uses same floating point format as x86 */ + memcpy(&encoded, &n, 8); +//#endif + uint32_t val = htonl(encoded >> 32); + enc->buf.append((char *) &val, 4); + val = htonl(encoded); + enc->buf.append((char *) &val, 4); +} + +void amf_write(Encoder *enc, bool b) +{ + enc->buf += char(AMF0_BOOLEAN); + enc->buf += char(b); +} + +void amf_write_key(Encoder *enc, const std::string &s) +{ + uint16_t str_len = htons(s.size()); + enc->buf.append((char *) &str_len, 2); + enc->buf += s; +} + +void amf_write(Encoder *enc, const amf_object_t &object) +{ + enc->buf += char(AMF0_OBJECT); + for (amf_object_t::const_iterator i = object.begin(); + i != object.end(); ++i) { + amf_write_key(enc, i->first); + amf_write(enc, i->second); + } + amf_write_key(enc, ""); + enc->buf += char(AMF0_OBJECT_END); +} + +void amf_write_ecma(Encoder *enc, const amf_object_t &object) +{ + enc->buf += char(AMF0_ECMA_ARRAY); + uint32_t zero = 0; + enc->buf.append((char *) &zero, 4); + for (amf_object_t::const_iterator i = object.begin(); + i != object.end(); ++i) { + amf_write_key(enc, i->first); + amf_write(enc, i->second); + } + amf_write_key(enc, ""); + enc->buf += char(AMF0_OBJECT_END); +} + +void amf_write_null(Encoder *enc) +{ + enc->buf += char(AMF0_NULL); +} + +void amf_write(Encoder *enc, const AMFValue &value) +{ + switch (value.type()) { + case AMF_STRING: + amf_write(enc, value.as_string()); + break; + case AMF_NUMBER: + amf_write(enc, value.as_number()); + break; + case AMF_INTEGER: + amf_write(enc, value.as_integer()); + break; + case AMF_BOOLEAN: + amf_write(enc, value.as_boolean()); + break; + case AMF_OBJECT: + amf_write(enc, value.as_object()); + break; + case AMF_ECMA_ARRAY: + amf_write_ecma(enc, value.as_object()); + break; + case AMF_NULL: + amf_write_null(enc); + break; + default: + break; + } +} + +unsigned int load_amf3_integer(Decoder *dec) +{ + unsigned int value = 0; + for (int i = 0; i < 4; ++i) { + uint8_t b = get_byte(dec); + if (i == 3) { + /* use all bits from 4th byte */ + value = (value << 8) | b; + break; + } + value = (value << 7) | (b & 0x7f); + if ((b & 0x80) == 0) + break; + } + return value; +} + +std::string amf_load_string(Decoder *dec) +{ + size_t str_len = 0; + uint8_t type = get_byte(dec); + if (dec->version == 3) { + if (type != AMF3_STRING) { + throw std::runtime_error("Expected a string"); + } + str_len = load_amf3_integer(dec) / 2; + + } else { + if (type != AMF0_STRING) { + throw std::runtime_error("Expected a string"); + } + if (dec->pos + 2 > dec->buf.size()) { + throw std::runtime_error("Not enough data"); + } + str_len = load_be16(&dec->buf[dec->pos]); + dec->pos += 2; + } + if (dec->pos + str_len > dec->buf.size()) { + throw std::runtime_error("Not enough data"); + } + std::string s(dec->buf, dec->pos, str_len); + dec->pos += str_len; + return s; +} + +double amf_load_number(Decoder *dec) +{ + if (get_byte(dec) != AMF0_NUMBER) { + throw std::runtime_error("Expected a string"); + } + if (dec->pos + 8 > dec->buf.size()) { + throw std::runtime_error("Not enough data"); + } + uint64_t val = ((uint64_t) load_be32(&dec->buf[dec->pos]) << 32) | + load_be32(&dec->buf[dec->pos + 4]); + double n = 0; +//#if defined(__i386__) || defined(__x86_64__) + /* Flash uses same floating point format as x86 */ + memcpy(&n, &val, 8); +//#endif + dec->pos += 8; + return n; +} + +int amf_load_integer(Decoder *dec) +{ + if (dec->version == 3) { + return load_amf3_integer(dec); + } else { + return amf_load_number(dec); + } +} + +bool amf_load_boolean(Decoder *dec) +{ + if (get_byte(dec) != AMF0_BOOLEAN) { + throw std::runtime_error("Expected a boolean"); + } + return get_byte(dec) != 0; +} + +std::string amf_load_key(Decoder *dec) +{ + if (dec->pos + 2 > dec->buf.size()) { + throw std::runtime_error("Not enough data"); + } + size_t str_len = load_be16(&dec->buf[dec->pos]); + dec->pos += 2; + if (dec->pos + str_len > dec->buf.size()) { + throw std::runtime_error("Not enough data"); + } + std::string s(dec->buf, dec->pos, str_len); + dec->pos += str_len; + return s; +} + +amf_object_t amf_load_object(Decoder *dec) +{ + amf_object_t object; + if (get_byte(dec) != AMF0_OBJECT) { + throw std::runtime_error("Expected an object"); + } + while (1) { + std::string key = amf_load_key(dec); + if (key.empty()) + break; + AMFValue value = amf_load(dec); + object.insert(std::make_pair(key, value)); + } + if (get_byte(dec) != AMF0_OBJECT_END) { + throw std::runtime_error("expected object end"); + } + return object; +} + +amf_object_t amf_load_ecma(Decoder *dec) +{ + /* ECMA array is the same as object, with 4 extra zero bytes */ + amf_object_t object; + if (get_byte(dec) != AMF0_ECMA_ARRAY) { + throw std::runtime_error("Expected an ECMA array"); + } + if (dec->pos + 4 > dec->buf.size()) { + throw std::runtime_error("Not enough data"); + } + dec->pos += 4; + while (1) { + std::string key = amf_load_key(dec); + if (key.empty()) + break; + AMFValue value = amf_load(dec); + object.insert(std::make_pair(key, value)); + } + if (get_byte(dec) != AMF0_OBJECT_END) { + throw std::runtime_error("expected object end"); + } + return object; +} + +AMFValue amf_load(Decoder *dec) +{ + uint8_t type = peek(dec); + if (dec->version == 3) { + switch (type) { + case AMF3_STRING: + return AMFValue(amf_load_string(dec)); + case AMF3_NUMBER: + return AMFValue(amf_load_number(dec)); + case AMF3_INTEGER: + return AMFValue(amf_load_integer(dec)); + case AMF3_FALSE: + dec->pos++; + return AMFValue(false); + case AMF3_TRUE: + dec->pos++; + return AMFValue(true); + case AMF3_OBJECT: + return AMFValue(amf_load_object(dec)); + case AMF3_ARRAY: + return AMFValue(amf_load_ecma(dec)); + case AMF3_NULL: + dec->pos++; + return AMFValue(AMF_NULL); + case AMF3_UNDEFINED: + dec->pos++; + return AMFValue(AMF_UNDEFINED); + default: + throw std::runtime_error(strf("Unsupported AMF3 type: %02x", type)); + } + } else { + switch (type) { + case AMF0_STRING: + return AMFValue(amf_load_string(dec)); + case AMF0_NUMBER: + return AMFValue(amf_load_number(dec)); + case AMF0_BOOLEAN: + return AMFValue(amf_load_boolean(dec)); + case AMF0_OBJECT: + return AMFValue(amf_load_object(dec)); + case AMF0_ECMA_ARRAY: + return AMFValue(amf_load_ecma(dec)); + case AMF0_NULL: + dec->pos++; + return AMFValue(AMF_NULL); + case AMF0_UNDEFINED: + dec->pos++; + return AMFValue(AMF_UNDEFINED); + default: + throw std::runtime_error(strf("Unsupported AMF0 type: %02x", type)); + } + } +} diff --git a/amf.h b/amf.h new file mode 100644 index 0000000..6d5738e --- /dev/null +++ b/amf.h @@ -0,0 +1,167 @@ +/** + * @file amf.h + * @author Dean Zou (zoudingyuan@junjietech.com) + * @brief + * @version 1.0 + * @date 2019-06-27 + * + * @copyright Copyright (c) - 2019 JunJie Intelligence(Shenzhen) Co.,LTD + * + */ + +#ifndef AMF_H_ +#define AMF_H_ + +#include +#include +#include + +enum AMFType { + AMF_NUMBER, + AMF_INTEGER, + AMF_BOOLEAN, + AMF_STRING, + AMF_OBJECT, + AMF_NULL, + AMF_UNDEFINED, + AMF_ECMA_ARRAY, +}; + +enum { + AMF0_NUMBER, + AMF0_BOOLEAN, + AMF0_STRING, + AMF0_OBJECT, + AMF0_MOVIECLIP, + AMF0_NULL, + AMF0_UNDEFINED, + AMF0_REFERENCE, + AMF0_ECMA_ARRAY, + AMF0_OBJECT_END, + AMF0_STRICT_ARRAY, + AMF0_DATE, + AMF0_LONG_STRING, + AMF0_UNSUPPORTED, + AMF0_RECORD_SET, + AMF0_XML_OBJECT, + AMF0_TYPED_OBJECT, + AMF0_SWITCH_AMF3, +}; + +enum { + AMF3_UNDEFINED, + AMF3_NULL, + AMF3_FALSE, + AMF3_TRUE, + AMF3_INTEGER, + AMF3_NUMBER, + AMF3_STRING, + AMF3_LEGACY_XML, + AMF3_DATE, + AMF3_ARRAY, + AMF3_OBJECT, + AMF3_XML, + AMF3_BYTE_ARRAY, +}; + +struct Decoder { + std::string buf; + size_t pos; + int version; +}; + +struct Encoder { + std::string buf; +}; + +class AMFValue; + +typedef std::map amf_object_t; + +class AMFValue { +public: + AMFValue(AMFType type = AMF_NULL); + AMFValue(const std::string &s); + AMFValue(double n); + AMFValue(int i); + AMFValue(bool b); + AMFValue(const amf_object_t &object); + AMFValue(const AMFValue &from); + ~AMFValue(); + + AMFType type() const { return m_type; } + + std::string as_string() const + { + assert(m_type == AMF_STRING); + return *m_value.string; + } + double as_number() const + { + assert(m_type == AMF_NUMBER); + return m_value.number; + } + double as_integer() const + { + assert(m_type == AMF_INTEGER); + return m_value.integer; + } + bool as_boolean() const + { + assert(m_type == AMF_BOOLEAN); + return m_value.boolean; + } + amf_object_t as_object() const + { + assert(m_type == AMF_OBJECT); + return *m_value.object; + } + + AMFValue get(const std::string &s) const + { + assert(m_type == AMF_OBJECT); + amf_object_t::const_iterator i = m_value.object->find(s); + if (i == m_value.object->end()) + return AMFValue(); + return i->second; + } + + void set(const std::string &s, const AMFValue &val) + { + assert(m_type == AMF_OBJECT); + m_value.object->insert(std::make_pair(s, val)); + } + + void operator = (const AMFValue &from); + +private: + AMFType m_type; + union { + std::string *string; + double number; + int integer; + bool boolean; + amf_object_t *object; + } m_value; + + void destroy(); +}; + +void amf_write(Encoder *enc, const std::string &s); +void amf_write(Encoder *enc, double n); +void amf_write(Encoder *enc, bool b); +void amf_write_key(Encoder *enc, const std::string &s); +void amf_write(Encoder *enc, const amf_object_t &object); +void amf_write_ecma(Encoder *enc, const amf_object_t &object); +void amf_write_null(Encoder *enc); +void amf_write(Encoder *enc, const AMFValue &value); + +std::string amf_load_string(Decoder *dec); +double amf_load_number(Decoder *dec); +bool amf_load_boolean(Decoder *dec); +std::string amf_load_key(Decoder *dec); +amf_object_t amf_load_object(Decoder *dec); +amf_object_t amf_load_ecma(Decoder *dec); +AMFValue amf_load(Decoder *dec); + +#endif diff --git a/fileutils.c b/fileutils.c new file mode 100644 index 0000000..a9d9efc --- /dev/null +++ b/fileutils.c @@ -0,0 +1,72 @@ +#include "fileutils.h" + +bool fwrite_word16le(FILE* fp, uint16_t x) { + uint8_t buf[2]; + buf[0] = x & 0xff; + buf[1] = (x >> 8) & 0xff; + int r = fwrite(buf, 1, 2, fp); + if (r != 2) return false; + return true; +} + +bool fwrite_word32le(FILE* fp, uint32_t x) { + bool ret = false; + uint16_t buf[2]; + + buf[0] = x & 0xffff; + buf[1] = (x >> 16) & 0xffff; + ret = fwrite_word16le(fp, buf[0]); + if (!ret) return ret; + ret = fwrite_word16le(fp, buf[1]); + if (!ret) return ret; + return true; +} + +bool fwrite_word16be(FILE* fp, uint16_t x) { + uint8_t buf[2]; + buf[1] = x & 0xff; + buf[0] = (x >> 8) & 0xff; + int r = fwrite(buf, 1, 2, fp); + if (r != 2) return false; + return true; +} + +bool fwrite_word32be(FILE* fp, uint32_t x) { + bool ret = false; + uint16_t buf[2]; + + buf[1] = x & 0xffff; + buf[0] = (x >> 16) & 0xffff; + ret = fwrite_word16be(fp, buf[0]); + if (!ret) return ret; + ret = fwrite_word16be(fp, buf[1]); + if (!ret) return ret; + return true; +} + +bool fwrite_word24le(FILE* fp, uint32_t x) { + uint8_t buf[3]; + buf[0] = x & 0xff; + buf[1] = (x >> 8) & 0xff; + buf[2] = (x >> 16) & 0xff; + int r = fwrite(buf, 1, 3, fp); + if (r != 3) return false; + return true; +} + +bool fwrite_word24be(FILE* fp, uint32_t x) { + uint8_t buf[3]; + buf[2] = x & 0xff; + buf[1] = (x >> 8) & 0xff; + buf[0] = (x >> 16) & 0xff; + int r = fwrite(buf, 1, 3, fp); + if (r != 3) return false; + return true; +} + +bool fwrite_char(FILE* fp, uint8_t x) { + uint8_t buf[1]; + buf[0] = x; + int ret = fwrite(buf, 1, 1, fp); + return ret == 1; +} \ No newline at end of file diff --git a/fileutils.h b/fileutils.h new file mode 100644 index 0000000..b9888a3 --- /dev/null +++ b/fileutils.h @@ -0,0 +1,18 @@ +#ifndef FILEUTILS_H +#define FILEUTILS_H + +#include +#include +#include + +bool fwrite_word16le(FILE* fp, uint16_t x); +bool fwrite_word24le(FILE* fp, uint32_t x); +bool fwrite_word32le(FILE* fp, uint32_t x); + +bool fwrite_word16be(FILE* fp, uint16_t x); +bool fwrite_word24be(FILE* fp, uint32_t x); +bool fwrite_word32be(FILE* fp, uint32_t x); + +bool fwrite_char(FILE* fp, uint8_t x); + +#endif \ No newline at end of file diff --git a/main.c b/main.c new file mode 100644 index 0000000..a1052fd --- /dev/null +++ b/main.c @@ -0,0 +1,56 @@ +#include + +#include "rtmpserver.h" +#include "fileutils.h" + +void on_rtmp_start(void *ctx) { + *(FILE**)ctx = fopen("test.flv", "wb"); + FILE *fp = *(FILE**)ctx; + fwrite_char(fp, 'F'); + fwrite_char(fp, 'L'); + fwrite_char(fp, 'V'); + fwrite_char(fp, 1); + fwrite_char(fp, 5); + fwrite_word32be(fp, 9); + fwrite_word32be(fp, 0); +} + +void on_rtmp_stop(void *ctx) { + FILE *fp = *(FILE**)ctx; + fclose(fp); + exit(0); +} + +void on_rtmp_video(void *ctx, int64_t timestamp, char *buf, size_t size) { + FILE *fp = *(FILE**)ctx; + fwrite_char(fp, 9); + fwrite_word24be(fp, size); + fwrite_word24be(fp, timestamp); + fwrite_char(fp, timestamp >> 24); + fwrite_word24be(fp, 0); + fwrite(buf, 1, size, fp); + fwrite_word32be(fp, size + 11); +} + +void on_rtmp_audio(void *ctx, int64_t timestamp, char *buf, size_t size) { + FILE *fp = *(FILE**)ctx; + fwrite_char(fp, 8); + fwrite_word24be(fp, size); + fwrite_word24be(fp, timestamp); + fwrite_char(fp, timestamp >> 24); + fwrite_word24be(fp, 0); + fwrite(buf, 1, size, fp); + fwrite_word32be(fp, size + 11); +} + +int main() { + RtmpCallbacks rtmp_cbs = { + .on_audio = &on_rtmp_audio, + .on_video = &on_rtmp_video, + .on_start = &on_rtmp_start, + .on_stop = &on_rtmp_stop, + }; + FILE* fp = NULL; + start_rtmpserver(rtmp_cbs, &fp); + return 0; +} \ No newline at end of file diff --git a/rtmp.h b/rtmp.h new file mode 100644 index 0000000..bdc97cc --- /dev/null +++ b/rtmp.h @@ -0,0 +1,74 @@ +/** + * @file rtmp.h + * @author Dean Zou (zoudingyuan@junjietech.com) + * @brief + * @version 1.0 + * @date 2019-06-27 + * + * @copyright Copyright (c) - 2019 JunJie Intelligence(Shenzhen) Co.,LTD + * + */ + +#ifndef RTMP_H_ +#define RTMP_H_ + +#include + +#define PORT 1935 + +#define DEFAULT_CHUNK_LEN 128 + +#define PACKED __attribute__((packed)) + +#define HANDSHAKE_PLAINTEXT 0x03 + +#define RANDOM_LEN (1536 - 8) + +#define MSG_SET_CHUNK 0x01 +#define MSG_BYTES_READ 0x03 +#define MSG_USER_CONTROL 0x04 +#define MSG_RESPONSE 0x05 +#define MSG_REQUEST 0x06 +#define MSG_AUDIO 0x08 +#define MSG_VIDEO 0x09 +#define MSG_INVOKE3 0x11 /* AMF3 */ +#define MSG_NOTIFY 0x12 +#define MSG_OBJECT 0x13 +#define MSG_INVOKE 0x14 /* AMF0 */ +#define MSG_FLASH_VIDEO 0x16 + +#define CONTROL_CLEAR_STREAM 0x00 +#define CONTROL_CLEAR_BUFFER 0x01 +#define CONTROL_STREAM_DRY 0x02 +#define CONTROL_BUFFER_TIME 0x03 +#define CONTROL_RESET_STREAM 0x04 +#define CONTROL_PING 0x06 +#define CONTROL_REQUEST_VERIFY 0x1a +#define CONTROL_RESPOND_VERIFY 0x1b +#define CONTROL_BUFFER_EMPTY 0x1f +#define CONTROL_BUFFER_READY 0x20 + +#define CONTROL_ID 0 +#define STREAM_ID 1337 + +#define CHAN_CONTROL 2 +#define CHAN_RESULT 3 +#define CHAN_STREAM 4 + +#define FLV_KEY_FRAME 0x01 +#define FLV_INTER_FRAME 0x02 + +struct Handshake { + uint8_t flags[8]; + uint8_t random[RANDOM_LEN]; +} PACKED; + +struct RTMP_Header { + uint8_t flags; + char timestamp[3]; + char msg_len[3]; + uint8_t msg_type; + char endpoint[4]; /* Note, this is little-endian while others are BE */ +} PACKED; + +#endif diff --git a/rtmpserver.cpp b/rtmpserver.cpp new file mode 100644 index 0000000..3d0d4f0 --- /dev/null +++ b/rtmpserver.cpp @@ -0,0 +1,820 @@ +#include "amf.h" +#include "rtmputils.h" +#include "rtmp.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rtmpserver.h" + +#define APP_NAME "live" + +RtmpCallbacks g_rtmp_server_cbs; +void *g_rtmp_server_ctx; + +struct RTMP_Message { + uint8_t type; + size_t len; + unsigned long timestamp; + uint32_t endpoint; + std::string buf; +}; + +struct Client { + int fd; + bool publisher; + bool playing; /* Wants to receive the stream? */ + bool ready; /* Wants to receive and seen a keyframe */ + RTMP_Message messages[64]; + std::string buf; + std::string send_queue; + std::string path; + size_t chunk_len; + uint32_t written_seq; + uint32_t read_seq; +}; + +namespace { + +amf_object_t metadata; +int listen_fd; +std::vector poll_table; +std::vector clients; + +int set_nonblock(int fd, bool enabled) +{ + int flags = fcntl(fd, F_GETFL) & ~O_NONBLOCK; + if (enabled) { + flags |= O_NONBLOCK; + } + return fcntl(fd, F_SETFL, flags); +} + +size_t recv_all(int fd, void *buf, size_t len) +{ + size_t pos = 0; + while (pos < len) { + ssize_t bytes = recv(fd, (char *) buf + pos, len - pos, 0); + if (bytes < 0) { + if (errno == EAGAIN || errno == EINTR) + continue; + throw std::runtime_error(strf("unable to recv: %s", strerror(errno))); + } + if (bytes == 0) + break; + pos += bytes; + } + return pos; +} + +size_t send_all(int fd, const void *buf, size_t len) +{ + size_t pos = 0; + while (pos < len) { + ssize_t written = send(fd, (const char *) buf + pos, len - pos, 0); + if (written < 0) { + if (errno == EAGAIN || errno == EINTR) + continue; + throw std::runtime_error(strf("unable to send: %s", strerror(errno))); + } + if (written == 0) + break; + pos += written; + } + return pos; +} + +bool is_safe(uint8_t b) +{ + return b >= ' ' && b < 128; +} + +void hexdump(const void *buf, size_t len) +{ + const uint8_t *data = (const uint8_t *) buf; + for (size_t i = 0; i < len; i += 16) { + for (int j = 0; j < 16; ++j) { + if (i + j < len) + debug("%.2x ", data[i + j]); + else + debug(" "); + } + for (int j = 0; j < 16; ++j) { + if (i + j < len) { + putc(is_safe(data[i + j]) ? data[i + j] : '.', + stdout); + } else { + putc(' ', stdout); + } + } + putc('\n', stdout); + } +} + +void try_to_send(void *data) +{ + Client *client = (Client *)data; + size_t len = client->send_queue.size(); + if (len > 4096) + len = 4096; + + ssize_t written = send(client->fd, client->send_queue.data(), len, 0); + if (written < 0) { + if (errno == EAGAIN || errno == EINTR) + return; + throw std::runtime_error(strf("unable to write to a client: %s", + strerror(errno))); + } + + client->send_queue.erase(0, written); +} + +void rtmp_send(Client *client, uint8_t type, uint32_t endpoint, + const std::string &buf, unsigned long timestamp = 0, + int channel_num = CHAN_CONTROL) +{ + if (endpoint == STREAM_ID) { + /* + * For some unknown reason, stream-related msgs must be sent + * on a specific channel. + */ + channel_num = CHAN_STREAM; + } + + RTMP_Header header; + header.flags = (channel_num & 0x3f) | (0 << 6); + header.msg_type = type; + set_be24(header.timestamp, timestamp); + set_be24(header.msg_len, buf.size()); + set_le32(header.endpoint, endpoint); + + client->send_queue.append((char *) &header, sizeof header); + client->written_seq += sizeof header; + + size_t pos = 0; + while (pos < buf.size()) { + if (pos) { + uint8_t flags = (channel_num & 0x3f) | (3 << 6); + client->send_queue += char(flags); + + client->written_seq += 1; + } + + size_t chunk = buf.size() - pos; + if (chunk > client->chunk_len) + chunk = client->chunk_len; + client->send_queue.append(buf, pos, chunk); + + client->written_seq += chunk; + pos += chunk; + } + + try_to_send(client); +} + +void send_reply(Client *client, double txid, const AMFValue &reply = AMFValue(), + const AMFValue &status = AMFValue()) +{ + if (txid <= 0.0) + return; + Encoder invoke; + amf_write(&invoke, std::string("_result")); + amf_write(&invoke, txid); + amf_write(&invoke, reply); + amf_write(&invoke, status); + rtmp_send(client, MSG_INVOKE, CONTROL_ID, invoke.buf, 0, CHAN_RESULT); +} + +void handle_connect(Client *client, double txid, Decoder *dec) +{ + amf_object_t params = amf_load_object(dec); + std::string app = get(params, std::string("app")).as_string(); + std::string ver = "(unknown)"; + + AMFValue flashver = get(params, std::string("flashVer")); + if (flashver.type() == AMF_STRING) { + ver = flashver.as_string(); + } + + /* + if (app != APP_NAME) { + throw std::runtime_error("Unsupported application: " + app); + } + */ + + printf("connect: %s (version %s)\n", app.c_str(), ver.c_str()); + + amf_object_t version; + version.insert(std::make_pair("fmsVer", std::string("FMS/4,5,1,484"))); + version.insert(std::make_pair("capabilities", 255.0)); + version.insert(std::make_pair("mode", 1.0)); + + amf_object_t status; + status.insert(std::make_pair("code", std::string("NetConnection.Connect.Success"))); + /* report support for AMF3 */ + // status.insert(std::make_pair("objectEncoding", 3.0)); + + send_reply(client, txid, version, status); + +/* + uint32_t chunk_len = htonl(1024); + std::string set_chunk((char *) &chunk_len, 4); + rtmp_send(client, MSG_SET_CHUNK, CONTROL_ID, set_chunk, 0, + MEDIA_CHANNEL); + + client->chunk_len = 1024; +*/ +} + +int publishernum = 0; + +void handle_fcpublish(Client *client, double txid, Decoder *dec) +{ + if (publishernum > 0) { + throw std::runtime_error{"only one publisher."}; + } + publishernum = 1; + g_rtmp_server_cbs.on_start(g_rtmp_server_ctx); + client->publisher = true; + printf( "publisher connected."); + + amf_load(dec); /* NULL */ + + std::string path = amf_load_string(dec); + debug("fcpublish %s\n", path.c_str()); + + client->path = path; + + amf_object_t status; + status.insert(std::make_pair("code", std::string("NetStream.Publish.Start"))); + status.insert(std::make_pair("description", path)); + + Encoder invoke; + amf_write(&invoke, std::string("onFCPublish")); + amf_write(&invoke, 0.0); + amf_write_null(&invoke); + amf_write(&invoke, status); + rtmp_send(client, MSG_INVOKE, CONTROL_ID, invoke.buf); + + send_reply(client, txid); +} + +void handle_createstream(Client *client, double txid, Decoder *dec) +{ + send_reply(client, txid, AMFValue(), double(STREAM_ID)); +} + +void handle_publish(Client *client, double txid, Decoder *dec) +{ + amf_load(dec); /* NULL */ + + std::string path = amf_load_string(dec); + debug("publish %s\n", path.c_str()); + + client->path = path; + + amf_object_t status; + status.insert(std::make_pair("level", std::string("status"))); + status.insert(std::make_pair("code", std::string("NetStream.Publish.Start"))); + status.insert(std::make_pair("description", std::string("Stream is now published."))); + status.insert(std::make_pair("details", path)); + + Encoder invoke; + amf_write(&invoke, std::string("onStatus")); + amf_write(&invoke, 0.0); + amf_write_null(&invoke); + amf_write(&invoke, status); + rtmp_send(client, MSG_INVOKE, STREAM_ID, invoke.buf); + + send_reply(client, txid); +} + +void start_playback(Client *client) +{ + throw std::runtime_error{"playback not supported."}; +} + +void handle_play(Client *client, double txid, Decoder *dec) +{ + amf_load(dec); /* NULL */ + + std::string path = amf_load_string(dec); + + debug("play %s\n", path.c_str()); + client->path = path; + + start_playback(client); + + send_reply(client, txid); +} + +void handle_play2(Client *client, double txid, Decoder *dec) +{ + amf_load(dec); /* NULL */ + + amf_object_t params = amf_load_object(dec); + std::string path = get(params, std::string("streamName")).as_string(); + + debug("play %s\n", path.c_str()); + client->path = path; + start_playback(client); + + send_reply(client, txid); +} + +void handle_pause(Client *client, double txid, Decoder *dec) +{ + amf_load(dec); /* NULL */ + + bool paused = amf_load_boolean(dec); + + if (paused) { + debug("pausing\n"); + + amf_object_t status; + status.insert(std::make_pair("level", std::string("status"))); + status.insert(std::make_pair("code", std::string("NetStream.Pause.Notify"))); + status.insert(std::make_pair("description", std::string("Pausing."))); + + Encoder invoke; + amf_write(&invoke, std::string("onStatus")); + amf_write(&invoke, 0.0); + amf_write_null(&invoke); + amf_write(&invoke, status); + rtmp_send(client, MSG_INVOKE, STREAM_ID, invoke.buf); + client->playing = false; + } else { + start_playback(client); + } + + send_reply(client, txid); +} + +void handle_setdataframe(Client *client, Decoder *dec) +{ + if(client->publisher == false) { + printf("not a publisher"); + throw std::runtime_error("not a publisher"); + } + + printf( "client paht is %s",client->path.c_str()); + + std::string type = amf_load_string(dec); + if (type != "onMetaData") { + throw std::runtime_error("can only set metadata"); + } + + metadata = amf_load_ecma(dec); + + Encoder notify; + amf_write(¬ify, std::string("onMetaData")); + amf_write_ecma(¬ify, metadata); + + FOR_EACH(std::vector, i, clients) { + Client *receiver = *i; + if (receiver != NULL && receiver->playing && receiver->path == client->path) { + rtmp_send(client, MSG_NOTIFY, STREAM_ID, notify.buf); + } + } +} + +void handle_invoke(Client *client, const RTMP_Message *msg, Decoder *dec) +{ + std::string method = amf_load_string(dec); + double txid = amf_load_number(dec); + + debug( "invoked %s txid %f\n", method.c_str(), txid); + + if (msg->endpoint == CONTROL_ID) { + if (method == "connect") { + handle_connect(client, txid, dec); + } else if (method == "FCPublish") { + handle_fcpublish(client, txid, dec); + } else if (method == "createStream") { + handle_createstream(client, txid, dec); + } + + } else if (msg->endpoint == STREAM_ID) { + if (method == "publish") { + handle_publish(client, txid, dec); + } else if (method == "play") { + handle_play(client, txid, dec); + } else if (method == "play2") { + handle_play2(client, txid, dec); + } else if (method == "pause") { + handle_pause(client, txid, dec); + } + } +} + +void handle_message(Client *client, RTMP_Message *msg) +{ + + debug("RTMP message %02x, len %zu, timestamp %ld\n", msg->type, msg->len, + msg->timestamp); + + size_t pos = 0; + + switch (msg->type) { + case MSG_BYTES_READ: + if (pos + 4 > msg->buf.size()) { + throw std::runtime_error("Not enough data"); + } + client->read_seq = load_be32(&msg->buf[pos]); + debug("%d in queue\n", + int(client->written_seq - client->read_seq)); + break; + + case MSG_SET_CHUNK: + if (pos + 4 > msg->buf.size()) { + throw std::runtime_error("Not enough data"); + } + client->chunk_len = load_be32(&msg->buf[pos]); + debug("chunk size set to %zu\n", client->chunk_len); + break; + + case MSG_INVOKE: { + debug("handling message invoke 0 \n"); + Decoder dec; + dec.version = 0; + dec.buf = msg->buf; + dec.pos = 0; + handle_invoke(client, msg, &dec); + } + break; + + case MSG_INVOKE3: { + debug("handling message invoke 3 \n"); + Decoder dec; + dec.version = 0; + dec.buf = msg->buf; + dec.pos = 1; + handle_invoke(client, msg, &dec); + } + break; + + case MSG_NOTIFY: { + Decoder dec; + dec.version = 0; + dec.buf = msg->buf; + dec.pos = 0; + std::string type = amf_load_string(&dec); + debug("notify %s\n", type.c_str()); + if (msg->endpoint == STREAM_ID) { + if (type == "@setDataFrame") { + handle_setdataframe(client, &dec); + } + } + } + break; + + case MSG_AUDIO: + if(client->publisher == false) { + throw std::runtime_error("not a publisher"); + } + g_rtmp_server_cbs.on_audio(g_rtmp_server_ctx, msg->timestamp, (char*)msg->buf.c_str(), msg->buf.size()); + // FOR_EACH(std::vector, i, clients) { + // Client *receiver = *i; + // if (receiver != NULL && receiver->ready) { + // rtmp_send(receiver, MSG_AUDIO, STREAM_ID, + // msg->buf, msg->timestamp); + // } + // } + break; + case MSG_VIDEO: { + if(client->publisher == false){ + throw std::runtime_error("not a publisher"); + } + // uint8_t flags = msg->buf[0]; + g_rtmp_server_cbs.on_video(g_rtmp_server_ctx, msg->timestamp, (char*)msg->buf.c_str(), msg->buf.size()); + // FOR_EACH(std::vector, i, clients) { + // Client *receiver = *i; + // if (receiver != NULL && receiver->playing && + // client->path == receiver->path) { + // if (flags >> 4 == FLV_KEY_FRAME && + // !receiver->ready) { + // std::string control; + // uint16_t type = htons(CONTROL_CLEAR_STREAM); + // control.append((char *) &type, 2); + // uint32_t stream = htonl(STREAM_ID); + // control.append((char *) &stream, 4); + // rtmp_send(receiver, MSG_USER_CONTROL, CONTROL_ID, control); + // receiver->ready = true; + // } + // if (receiver->ready) { + // static int flag = 0; + // if(flag < 10) { + // flag ++; + // } + // rtmp_send(receiver, MSG_VIDEO, + // STREAM_ID, msg->buf, + // msg->timestamp); + // } + // } + // } + } + break; + + case MSG_FLASH_VIDEO: + //printf( "streaming FLV not supported"); + throw std::runtime_error("streaming FLV not supported"); + break; + + default: + debug("unhandled message: %02x\n", msg->type); + hexdump(msg->buf.data(), msg->buf.size()); + break; + } +} + +/* TODO: Make this asynchronous */ +void do_handshake(Client *client) +{ + Handshake serversig; + Handshake clientsig; + + uint8_t c; + if (recv_all(client->fd, &c, 1) < 1) + return; + if (c != HANDSHAKE_PLAINTEXT) { + //printf( "only plaintext handshake supported"); + throw std::runtime_error("only plaintext handshake supported"); + } + + if (send_all(client->fd, &c, 1) < 1) + return; + + memset(&serversig, 0, sizeof serversig); + serversig.flags[0] = 0x03; + for (int i = 0; i < RANDOM_LEN; ++i) { + serversig.random[i] = rand(); + } + + if (send_all(client->fd, &serversig, sizeof serversig) < sizeof serversig) + return; + + /* Echo client's signature back */ + if (recv_all(client->fd, &clientsig, sizeof serversig) < sizeof serversig) + return; + if (send_all(client->fd, &clientsig, sizeof serversig) < sizeof serversig) + return; + + if (recv_all(client->fd, &clientsig, sizeof serversig) < sizeof serversig) + return; + if (memcmp(serversig.random, clientsig.random, RANDOM_LEN) != 0) { + //printf( "invalid handshake"); + throw std::runtime_error("invalid handshake"); + } + + client->read_seq = 1 + sizeof serversig * 2; + client->written_seq = 1 + sizeof serversig * 2; +} + +void recv_from_client(void *data) +{ + Client *client = (Client *)data; + + std::string chunk(4096, 0); + ssize_t got = recv(client->fd, &chunk[0], chunk.size(), 0); + if (got == 0) { + //printf( "end of life from a client"); + throw std::runtime_error("EOF from a client"); + } else if (got < 0) { + if (errno == EAGAIN || errno == EINTR) + return; + //printf( "unable to read from a client: %s",strerror(errno)); + throw std::runtime_error(strf("unable to read from a client: %s",strerror(errno))); + } + client->buf.append(chunk, 0, got); + + //printf( "got data and length is %d",client->buf.size()); + + while (!client->buf.empty()) { + uint8_t flags = client->buf[0]; + + static const size_t HEADER_LENGTH[] = {12, 8, 4, 1}; + size_t header_len = HEADER_LENGTH[flags >> 6]; + + if (client->buf.size() < header_len) { + /* need more data */ + break; + } + + RTMP_Header header; + memcpy(&header, client->buf.data(), header_len); + + RTMP_Message *msg = &client->messages[flags & 0x3f]; + + if (header_len >= 8) { + msg->len = load_be24(header.msg_len); + if (msg->len < msg->buf.size()) { + throw std::runtime_error("invalid msg length"); + } + msg->type = header.msg_type; + } + if (header_len >= 12) { + msg->endpoint = load_le32(header.endpoint); + } + + if (msg->len == 0) { + throw std::runtime_error("message without a header"); + } + size_t chunk = msg->len - msg->buf.size(); + if (chunk > client->chunk_len) + chunk = client->chunk_len; + + if (client->buf.size() < header_len + chunk) { + /* need more data */ + break; + } + + if (header_len >= 4) { + unsigned long ts = load_be24(header.timestamp); + if (ts == 0xffffff) { + throw std::runtime_error("ext timestamp not supported"); + } + if (header_len < 12) { + ts += msg->timestamp; + } + msg->timestamp = ts; + } + + msg->buf.append(client->buf, header_len, chunk); + client->buf.erase(0, header_len + chunk); + + if (msg->buf.size() == msg->len) { + handle_message(client, msg); + msg->buf.clear(); + } + } +} + +Client *new_client() +{ + sockaddr_in sin; + socklen_t addrlen = sizeof sin; + int fd = accept(listen_fd, (sockaddr *) &sin, &addrlen); + if (fd < 0) { + printf("Unable to accept a client: %s\n", strerror(errno)); + return NULL; + } + + Client *client = new Client; + client->publisher = false; + client->playing = false; + client->ready = false; + client->fd = fd; + client->written_seq = 0; + client->read_seq = 0; + client->chunk_len = DEFAULT_CHUNK_LEN; + for (int i = 0; i < 64; ++i) { + client->messages[i].timestamp = 0; + client->messages[i].len = 0; + } + + try { + do_handshake(client); + } catch (const std::runtime_error &e) { + printf("handshake failed: %s\n", e.what()); + close(fd); + delete client; + return NULL; + } + + set_nonblock(fd, true); + + pollfd entry; + entry.events = POLLIN; + entry.revents = 0; + entry.fd = fd; + poll_table.push_back(entry); + clients.push_back(client); + + return client; +} + +void close_client(Client *client, size_t i) +{ + clients.erase(clients.begin() + i); + poll_table.erase(poll_table.begin() + i); + close(client->fd); + + if (client->publisher == true) { + printf("publisher disconnected.\n"); + client->publisher = false; + publishernum = 0; + g_rtmp_server_cbs.on_stop(g_rtmp_server_ctx); + FOR_EACH(std::vector, i, clients) { + Client *client = *i; + if (client != NULL) { + client->ready = false; + } + } + } + delete client; +} + +void do_poll(void) +{ + for (size_t i = 0; i < poll_table.size(); ++i) { + Client *client = clients[i]; + if (client != NULL) { + if (!client->send_queue.empty()) { + //debug("waiting for pollout\n"); + poll_table[i].events = POLLIN | POLLOUT; + } else { + poll_table[i].events = POLLIN; + } + } + } + + if (poll(&poll_table[0], poll_table.size(), -1) < 0) { + if (errno == EAGAIN || errno == EINTR) + return; + throw std::runtime_error(strf("poll() failed: %s", + strerror(errno))); + } + + for (size_t i = 0; i < poll_table.size(); ++i) { + Client *client = clients[i]; + if (poll_table[i].revents & POLLOUT) { + try { + try_to_send(client); + } catch (const std::runtime_error &e) { + printf("client error: %s\n", e.what()); + close_client(client, i); + --i; + continue; + } + } + if (poll_table[i].revents & POLLIN) { + if (client == NULL) { + new_client(); + } else try { + recv_from_client(client); + } catch (const std::runtime_error &e) { + printf("client error: %s\n", e.what()); + close_client(client, i); + --i; + } + } + } +} + +} + +int g_rtmp_server_quit = 0; + +void start_rtmpserver(RtmpCallbacks cbs, void *ctx) { + g_rtmp_server_cbs = cbs; + g_rtmp_server_ctx = ctx; + try { + listen_fd = socket(AF_INET, SOCK_STREAM, 0); + if (listen_fd < 0) + return; + int opt = 1; + if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { + perror("setsockopt SO_REUSEADDR"); + exit(EXIT_FAILURE); + } + sockaddr_in sin; + sin.sin_family = AF_INET; + sin.sin_port = htons(PORT); + sin.sin_addr.s_addr = INADDR_ANY; + if (bind(listen_fd, (sockaddr *) &sin, sizeof sin) < 0) { + throw std::runtime_error(strf("Unable to listen: %s",strerror(errno))); + return; + } + + listen(listen_fd, 10); + + pollfd entry; + entry.events = POLLIN; + entry.revents = 0; + entry.fd = listen_fd; + poll_table.push_back(entry); + clients.push_back(NULL); + + for (;;) { + if (g_rtmp_server_quit) { + return; + } + do_poll(); + } + return; + } catch (const std::runtime_error &e) { + fprintf(stderr, "ERROR: %s\n", e.what()); + return; + } +} diff --git a/rtmpserver.h b/rtmpserver.h new file mode 100644 index 0000000..cae56a5 --- /dev/null +++ b/rtmpserver.h @@ -0,0 +1,25 @@ +#ifndef RTMPSERVER_H_ +#define RTMPSERVER_H_ + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + void (*on_start)(void *ctx); + void (*on_stop)(void *ctx); + void (*on_video)(void* ctx, int64_t timestamp, char *buf, size_t size); + void (*on_audio)(void* ctx, int64_t timestamp, char *buf, size_t size); +} RtmpCallbacks; + +void start_rtmpserver(RtmpCallbacks cbs, void *ctx); + +#ifdef __cplusplus +} +#endif + + +#endif \ No newline at end of file diff --git a/rtmputils.cpp b/rtmputils.cpp new file mode 100644 index 0000000..59088f1 --- /dev/null +++ b/rtmputils.cpp @@ -0,0 +1,67 @@ +#include "rtmputils.h" +#include +#include +#include +#include +#include +#include + +/* + * Used to do unaligned loads on archs that don't support them. GCC can mostly + * optimize these away. + */ +uint32_t load_be32(const void *p) +{ + uint32_t val; + memcpy(&val, p, sizeof val); + return ntohl(val); +} + +uint16_t load_be16(const void *p) +{ + uint16_t val; + memcpy(&val, p, sizeof val); + return ntohs(val); +} + +uint32_t load_le32(const void *p) +{ + const uint8_t *data = (const uint8_t *) p; + return data[0] | ((uint32_t) data[1] << 8) | + ((uint32_t) data[2] << 16) | ((uint32_t) data[3] << 24); +} + +uint32_t load_be24(const void *p) +{ + const uint8_t *data = (const uint8_t *) p; + return data[2] | ((uint32_t) data[1] << 8) | ((uint32_t) data[0] << 16); +} + +void set_be24(void *p, uint32_t val) +{ + uint8_t *data = (uint8_t *) p; + data[0] = val >> 16; + data[1] = val >> 8; + data[2] = val; +} + +void set_le32(void *p, uint32_t val) +{ + uint8_t *data = (uint8_t *) p; + data[0] = val; + data[1] = val >> 8; + data[2] = val >> 16; + data[3] = val >> 24; +} + +const std::string strf(const char *fmt, ...) +{ + va_list vl; + va_start(vl, fmt); + char *buf = NULL; + vasprintf(&buf, fmt, vl); + va_end(vl); + std::string s(buf); + free(buf); + return s; +} diff --git a/rtmputils.h b/rtmputils.h new file mode 100644 index 0000000..f873232 --- /dev/null +++ b/rtmputils.h @@ -0,0 +1,49 @@ +/** + * @file utils.h + * @author Dean Zou (zoudingyuan@junjietech.com) + * @brief + * @version 1.0 + * @date 2019-06-27 + * + * @copyright Copyright (c) - 2019 JunJie Intelligence(Shenzhen) Co.,LTD + * + */ +#ifndef RTMPUTILS_H_ +#define RTMPUTILS_H_ + +#include +#include +#include +#include + +#define FOR_EACH(type, i, where) \ + for (typename type::iterator i = (where).begin(); i != (where).end(); ++i) + +#define FOR_EACH_CONST(type, i, where) \ + for (typename type::const_iterator i = (where).begin(); \ + i != (where).end(); ++i) + +// #define debug(fmt...) fprintf(stderr, fmt) + +#define debug(fmt...) + +template +Value get(const std::map &map, const Key &k, + const Value &def = Value()) +{ + typename std::map::const_iterator i = map.find(k); + if (i == map.end()) + return def; + return i->second; +} + +uint32_t load_be32(const void *p); +uint16_t load_be16(const void *p); +uint32_t load_be24(const void *p); +uint32_t load_le32(const void *p); +void set_be24(void *p, uint32_t val); +void set_le32(void *p, uint32_t val); + +const std::string strf(const char *fmt, ...); + +#endif -- cgit v1.0