diff options
| -rw-r--r-- | .gitignore | 3 | ||||
| -rw-r--r-- | LICENSE | 0 | ||||
| -rw-r--r-- | Makefile | 28 | ||||
| -rw-r--r-- | amf.cpp | 429 | ||||
| -rw-r--r-- | amf.h | 167 | ||||
| -rw-r--r-- | fileutils.c | 72 | ||||
| -rw-r--r-- | fileutils.h | 18 | ||||
| -rw-r--r-- | main.c | 56 | ||||
| -rw-r--r-- | rtmp.h | 74 | ||||
| -rw-r--r-- | rtmpserver.cpp | 820 | ||||
| -rw-r--r-- | rtmpserver.h | 25 | ||||
| -rw-r--r-- | rtmputils.cpp | 67 | ||||
| -rw-r--r-- | rtmputils.h | 49 |
13 files changed, 1808 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0de0710 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*.o +ezlive +*.flv
\ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..6f6d633 --- /dev/null +++ b/Makefile @@ -0,0 +1,28 @@ +CC := gcc +CXX := g++ +CFLAGS := -g -Wall +CXXFLAGS := -g -Wall -std=c++14 + +C_SOURCES := $(shell find . -maxdepth 1 -name '*.c') +CPP_SOURCES := $(shell find . -maxdepth 1 -name '*.cpp') + +C_OBJS := $(C_SOURCES:.c=.o) +CPP_OBJS := $(CPP_SOURCES:.cpp=.o) + +TARGET := ezlive + +all: $(TARGET) + +$(TARGET): $(C_OBJS) $(CPP_OBJS) + $(CXX) $(C_OBJS) $(CPP_OBJS) -o $@ + +%.o: %.c + $(CC) $(CFLAGS) -c $< -o $@ + +%.o: %.cpp + $(CXX) $(CXXFLAGS) -c $< -o $@ + +clean: + rm -f $(C_OBJS) $(CPP_OBJS) $(TARGET) + +.PHONY: all clean
\ No newline at end of file @@ -0,0 +1,429 @@ +#include "amf.h" +#include "rtmputils.h" + +#include <stdexcept> +#include <string.h> +#include <arpa/inet.h> + +namespace { + +uint8_t peek(const Decoder *dec) +{ + if (dec->pos >= dec->buf.size()) { + throw std::runtime_error("Not enough data"); + } + return uint8_t(dec->buf[dec->pos]); +} + +uint8_t get_byte(Decoder *dec) +{ + if (dec->version == 0 && peek(dec) == AMF0_SWITCH_AMF3) { + debug("entering AMF3 mode\n"); + dec->pos++; + dec->version = 3; + } + + if (dec->pos >= dec->buf.size()) { + throw std::runtime_error("Not enough data"); + } + return uint8_t(dec->buf[dec->pos++]); +} + +} + +AMFValue::AMFValue(AMFType type) : + m_type(type) +{ + switch (m_type) { + case AMF_OBJECT: + case AMF_ECMA_ARRAY: + m_value.object = new amf_object_t; + break; + case AMF_NUMBER: + case AMF_INTEGER: + case AMF_NULL: + case AMF_UNDEFINED: + break; + default: + assert(0); + } +} + +AMFValue::AMFValue(const std::string &s) : + m_type(AMF_STRING) +{ + m_value.string = new std::string(s); +} + +AMFValue::AMFValue(double n) : + m_type(AMF_NUMBER) +{ + m_value.number = n; +} + +AMFValue::AMFValue(int i) : + m_type(AMF_INTEGER) +{ + m_value.integer = i; +} + +AMFValue::AMFValue(bool b) : + m_type(AMF_BOOLEAN) +{ + m_value.boolean = b; +} + +AMFValue::AMFValue(const amf_object_t &object) : + m_type(AMF_OBJECT) +{ + m_value.object = new amf_object_t(object); +} + +AMFValue::AMFValue(const AMFValue &from) : + m_type(AMF_NULL) +{ + *this = from; +} + +AMFValue::~AMFValue() +{ + destroy(); +} + +void AMFValue::destroy() +{ + switch (m_type) { + case AMF_STRING: + delete m_value.string; + break; + case AMF_OBJECT: + case AMF_ECMA_ARRAY: + delete m_value.object; + break; + case AMF_NULL: + case AMF_NUMBER: + case AMF_INTEGER: + case AMF_BOOLEAN: + case AMF_UNDEFINED: + break; + } +} + +void AMFValue::operator = (const AMFValue &from) +{ + destroy(); + m_type = from.m_type; + switch (m_type) { + case AMF_STRING: + m_value.string = new std::string(*from.m_value.string); + break; + case AMF_OBJECT: + case AMF_ECMA_ARRAY: + m_value.object = new amf_object_t(*from.m_value.object); + break; + case AMF_NUMBER: + m_value.number = from.m_value.number; + break; + case AMF_INTEGER: + m_value.integer = from.m_value.integer; + break; + case AMF_BOOLEAN: + m_value.boolean = from.m_value.boolean; + break; + default: + break; + } +} + +void amf_write(Encoder *enc, const std::string &s) +{ + enc->buf += char(AMF0_STRING); + uint16_t str_len = htons(s.size()); + enc->buf.append((char *) &str_len, 2); + enc->buf += s; +} + +void amf_write(Encoder *enc, int i) +{ + throw std::runtime_error("AMF0 does not have integers"); +} + +void amf_write(Encoder *enc, double n) +{ + enc->buf += char(AMF0_NUMBER); + uint64_t encoded = 0; +//#if defined(__i386__) || defined(__x86_64__) + /* Flash uses same floating point format as x86 */ + memcpy(&encoded, &n, 8); +//#endif + uint32_t val = htonl(encoded >> 32); + enc->buf.append((char *) &val, 4); + val = htonl(encoded); + enc->buf.append((char *) &val, 4); +} + +void amf_write(Encoder *enc, bool b) +{ + enc->buf += char(AMF0_BOOLEAN); + enc->buf += char(b); +} + +void amf_write_key(Encoder *enc, const std::string &s) +{ + uint16_t str_len = htons(s.size()); + enc->buf.append((char *) &str_len, 2); + enc->buf += s; +} + +void amf_write(Encoder *enc, const amf_object_t &object) +{ + enc->buf += char(AMF0_OBJECT); + for (amf_object_t::const_iterator i = object.begin(); + i != object.end(); ++i) { + amf_write_key(enc, i->first); + amf_write(enc, i->second); + } + amf_write_key(enc, ""); + enc->buf += char(AMF0_OBJECT_END); +} + +void amf_write_ecma(Encoder *enc, const amf_object_t &object) +{ + enc->buf += char(AMF0_ECMA_ARRAY); + uint32_t zero = 0; + enc->buf.append((char *) &zero, 4); + for (amf_object_t::const_iterator i = object.begin(); + i != object.end(); ++i) { + amf_write_key(enc, i->first); + amf_write(enc, i->second); + } + amf_write_key(enc, ""); + enc->buf += char(AMF0_OBJECT_END); +} + +void amf_write_null(Encoder *enc) +{ + enc->buf += char(AMF0_NULL); +} + +void amf_write(Encoder *enc, const AMFValue &value) +{ + switch (value.type()) { + case AMF_STRING: + amf_write(enc, value.as_string()); + break; + case AMF_NUMBER: + amf_write(enc, value.as_number()); + break; + case AMF_INTEGER: + amf_write(enc, value.as_integer()); + break; + case AMF_BOOLEAN: + amf_write(enc, value.as_boolean()); + break; + case AMF_OBJECT: + amf_write(enc, value.as_object()); + break; + case AMF_ECMA_ARRAY: + amf_write_ecma(enc, value.as_object()); + break; + case AMF_NULL: + amf_write_null(enc); + break; + default: + break; + } +} + +unsigned int load_amf3_integer(Decoder *dec) +{ + unsigned int value = 0; + for (int i = 0; i < 4; ++i) { + uint8_t b = get_byte(dec); + if (i == 3) { + /* use all bits from 4th byte */ + value = (value << 8) | b; + break; + } + value = (value << 7) | (b & 0x7f); + if ((b & 0x80) == 0) + break; + } + return value; +} + +std::string amf_load_string(Decoder *dec) +{ + size_t str_len = 0; + uint8_t type = get_byte(dec); + if (dec->version == 3) { + if (type != AMF3_STRING) { + throw std::runtime_error("Expected a string"); + } + str_len = load_amf3_integer(dec) / 2; + + } else { + if (type != AMF0_STRING) { + throw std::runtime_error("Expected a string"); + } + if (dec->pos + 2 > dec->buf.size()) { + throw std::runtime_error("Not enough data"); + } + str_len = load_be16(&dec->buf[dec->pos]); + dec->pos += 2; + } + if (dec->pos + str_len > dec->buf.size()) { + throw std::runtime_error("Not enough data"); + } + std::string s(dec->buf, dec->pos, str_len); + dec->pos += str_len; + return s; +} + +double amf_load_number(Decoder *dec) +{ + if (get_byte(dec) != AMF0_NUMBER) { + throw std::runtime_error("Expected a string"); + } + if (dec->pos + 8 > dec->buf.size()) { + throw std::runtime_error("Not enough data"); + } + uint64_t val = ((uint64_t) load_be32(&dec->buf[dec->pos]) << 32) | + load_be32(&dec->buf[dec->pos + 4]); + double n = 0; +//#if defined(__i386__) || defined(__x86_64__) + /* Flash uses same floating point format as x86 */ + memcpy(&n, &val, 8); +//#endif + dec->pos += 8; + return n; +} + +int amf_load_integer(Decoder *dec) +{ + if (dec->version == 3) { + return load_amf3_integer(dec); + } else { + return amf_load_number(dec); + } +} + +bool amf_load_boolean(Decoder *dec) +{ + if (get_byte(dec) != AMF0_BOOLEAN) { + throw std::runtime_error("Expected a boolean"); + } + return get_byte(dec) != 0; +} + +std::string amf_load_key(Decoder *dec) +{ + if (dec->pos + 2 > dec->buf.size()) { + throw std::runtime_error("Not enough data"); + } + size_t str_len = load_be16(&dec->buf[dec->pos]); + dec->pos += 2; + if (dec->pos + str_len > dec->buf.size()) { + throw std::runtime_error("Not enough data"); + } + std::string s(dec->buf, dec->pos, str_len); + dec->pos += str_len; + return s; +} + +amf_object_t amf_load_object(Decoder *dec) +{ + amf_object_t object; + if (get_byte(dec) != AMF0_OBJECT) { + throw std::runtime_error("Expected an object"); + } + while (1) { + std::string key = amf_load_key(dec); + if (key.empty()) + break; + AMFValue value = amf_load(dec); + object.insert(std::make_pair(key, value)); + } + if (get_byte(dec) != AMF0_OBJECT_END) { + throw std::runtime_error("expected object end"); + } + return object; +} + +amf_object_t amf_load_ecma(Decoder *dec) +{ + /* ECMA array is the same as object, with 4 extra zero bytes */ + amf_object_t object; + if (get_byte(dec) != AMF0_ECMA_ARRAY) { + throw std::runtime_error("Expected an ECMA array"); + } + if (dec->pos + 4 > dec->buf.size()) { + throw std::runtime_error("Not enough data"); + } + dec->pos += 4; + while (1) { + std::string key = amf_load_key(dec); + if (key.empty()) + break; + AMFValue value = amf_load(dec); + object.insert(std::make_pair(key, value)); + } + if (get_byte(dec) != AMF0_OBJECT_END) { + throw std::runtime_error("expected object end"); + } + return object; +} + +AMFValue amf_load(Decoder *dec) +{ + uint8_t type = peek(dec); + if (dec->version == 3) { + switch (type) { + case AMF3_STRING: + return AMFValue(amf_load_string(dec)); + case AMF3_NUMBER: + return AMFValue(amf_load_number(dec)); + case AMF3_INTEGER: + return AMFValue(amf_load_integer(dec)); + case AMF3_FALSE: + dec->pos++; + return AMFValue(false); + case AMF3_TRUE: + dec->pos++; + return AMFValue(true); + case AMF3_OBJECT: + return AMFValue(amf_load_object(dec)); + case AMF3_ARRAY: + return AMFValue(amf_load_ecma(dec)); + case AMF3_NULL: + dec->pos++; + return AMFValue(AMF_NULL); + case AMF3_UNDEFINED: + dec->pos++; + return AMFValue(AMF_UNDEFINED); + default: + throw std::runtime_error(strf("Unsupported AMF3 type: %02x", type)); + } + } else { + switch (type) { + case AMF0_STRING: + return AMFValue(amf_load_string(dec)); + case AMF0_NUMBER: + return AMFValue(amf_load_number(dec)); + case AMF0_BOOLEAN: + return AMFValue(amf_load_boolean(dec)); + case AMF0_OBJECT: + return AMFValue(amf_load_object(dec)); + case AMF0_ECMA_ARRAY: + return AMFValue(amf_load_ecma(dec)); + case AMF0_NULL: + dec->pos++; + return AMFValue(AMF_NULL); + case AMF0_UNDEFINED: + dec->pos++; + return AMFValue(AMF_UNDEFINED); + default: + throw std::runtime_error(strf("Unsupported AMF0 type: %02x", type)); + } + } +} @@ -0,0 +1,167 @@ +/** + * @file amf.h + * @author Dean Zou (zoudingyuan@junjietech.com) + * @brief + * @version 1.0 + * @date 2019-06-27 + * + * @copyright Copyright (c) - 2019 JunJie Intelligence(Shenzhen) Co.,LTD + * + */ + +#ifndef AMF_H_ +#define AMF_H_ + +#include <string> +#include <map> +#include <assert.h> + +enum AMFType { + AMF_NUMBER, + AMF_INTEGER, + AMF_BOOLEAN, + AMF_STRING, + AMF_OBJECT, + AMF_NULL, + AMF_UNDEFINED, + AMF_ECMA_ARRAY, +}; + +enum { + AMF0_NUMBER, + AMF0_BOOLEAN, + AMF0_STRING, + AMF0_OBJECT, + AMF0_MOVIECLIP, + AMF0_NULL, + AMF0_UNDEFINED, + AMF0_REFERENCE, + AMF0_ECMA_ARRAY, + AMF0_OBJECT_END, + AMF0_STRICT_ARRAY, + AMF0_DATE, + AMF0_LONG_STRING, + AMF0_UNSUPPORTED, + AMF0_RECORD_SET, + AMF0_XML_OBJECT, + AMF0_TYPED_OBJECT, + AMF0_SWITCH_AMF3, +}; + +enum { + AMF3_UNDEFINED, + AMF3_NULL, + AMF3_FALSE, + AMF3_TRUE, + AMF3_INTEGER, + AMF3_NUMBER, + AMF3_STRING, + AMF3_LEGACY_XML, + AMF3_DATE, + AMF3_ARRAY, + AMF3_OBJECT, + AMF3_XML, + AMF3_BYTE_ARRAY, +}; + +struct Decoder { + std::string buf; + size_t pos; + int version; +}; + +struct Encoder { + std::string buf; +}; + +class AMFValue; + +typedef std::map<std::string, AMFValue> amf_object_t; + +class AMFValue { +public: + AMFValue(AMFType type = AMF_NULL); + AMFValue(const std::string &s); + AMFValue(double n); + AMFValue(int i); + AMFValue(bool b); + AMFValue(const amf_object_t &object); + AMFValue(const AMFValue &from); + ~AMFValue(); + + AMFType type() const { return m_type; } + + std::string as_string() const + { + assert(m_type == AMF_STRING); + return *m_value.string; + } + double as_number() const + { + assert(m_type == AMF_NUMBER); + return m_value.number; + } + double as_integer() const + { + assert(m_type == AMF_INTEGER); + return m_value.integer; + } + bool as_boolean() const + { + assert(m_type == AMF_BOOLEAN); + return m_value.boolean; + } + amf_object_t as_object() const + { + assert(m_type == AMF_OBJECT); + return *m_value.object; + } + + AMFValue get(const std::string &s) const + { + assert(m_type == AMF_OBJECT); + amf_object_t::const_iterator i = m_value.object->find(s); + if (i == m_value.object->end()) + return AMFValue(); + return i->second; + } + + void set(const std::string &s, const AMFValue &val) + { + assert(m_type == AMF_OBJECT); + m_value.object->insert(std::make_pair(s, val)); + } + + void operator = (const AMFValue &from); + +private: + AMFType m_type; + union { + std::string *string; + double number; + int integer; + bool boolean; + amf_object_t *object; + } m_value; + + void destroy(); +}; + +void amf_write(Encoder *enc, const std::string &s); +void amf_write(Encoder *enc, double n); +void amf_write(Encoder *enc, bool b); +void amf_write_key(Encoder *enc, const std::string &s); +void amf_write(Encoder *enc, const amf_object_t &object); +void amf_write_ecma(Encoder *enc, const amf_object_t &object); +void amf_write_null(Encoder *enc); +void amf_write(Encoder *enc, const AMFValue &value); + +std::string amf_load_string(Decoder *dec); +double amf_load_number(Decoder *dec); +bool amf_load_boolean(Decoder *dec); +std::string amf_load_key(Decoder *dec); +amf_object_t amf_load_object(Decoder *dec); +amf_object_t amf_load_ecma(Decoder *dec); +AMFValue amf_load(Decoder *dec); + +#endif diff --git a/fileutils.c b/fileutils.c new file mode 100644 index 0000000..a9d9efc --- /dev/null +++ b/fileutils.c @@ -0,0 +1,72 @@ +#include "fileutils.h" + +bool fwrite_word16le(FILE* fp, uint16_t x) { + uint8_t buf[2]; + buf[0] = x & 0xff; + buf[1] = (x >> 8) & 0xff; + int r = fwrite(buf, 1, 2, fp); + if (r != 2) return false; + return true; +} + +bool fwrite_word32le(FILE* fp, uint32_t x) { + bool ret = false; + uint16_t buf[2]; + + buf[0] = x & 0xffff; + buf[1] = (x >> 16) & 0xffff; + ret = fwrite_word16le(fp, buf[0]); + if (!ret) return ret; + ret = fwrite_word16le(fp, buf[1]); + if (!ret) return ret; + return true; +} + +bool fwrite_word16be(FILE* fp, uint16_t x) { + uint8_t buf[2]; + buf[1] = x & 0xff; + buf[0] = (x >> 8) & 0xff; + int r = fwrite(buf, 1, 2, fp); + if (r != 2) return false; + return true; +} + +bool fwrite_word32be(FILE* fp, uint32_t x) { + bool ret = false; + uint16_t buf[2]; + + buf[1] = x & 0xffff; + buf[0] = (x >> 16) & 0xffff; + ret = fwrite_word16be(fp, buf[0]); + if (!ret) return ret; + ret = fwrite_word16be(fp, buf[1]); + if (!ret) return ret; + return true; +} + +bool fwrite_word24le(FILE* fp, uint32_t x) { + uint8_t buf[3]; + buf[0] = x & 0xff; + buf[1] = (x >> 8) & 0xff; + buf[2] = (x >> 16) & 0xff; + int r = fwrite(buf, 1, 3, fp); + if (r != 3) return false; + return true; +} + +bool fwrite_word24be(FILE* fp, uint32_t x) { + uint8_t buf[3]; + buf[2] = x & 0xff; + buf[1] = (x >> 8) & 0xff; + buf[0] = (x >> 16) & 0xff; + int r = fwrite(buf, 1, 3, fp); + if (r != 3) return false; + return true; +} + +bool fwrite_char(FILE* fp, uint8_t x) { + uint8_t buf[1]; + buf[0] = x; + int ret = fwrite(buf, 1, 1, fp); + return ret == 1; +}
\ No newline at end of file diff --git a/fileutils.h b/fileutils.h new file mode 100644 index 0000000..b9888a3 --- /dev/null +++ b/fileutils.h @@ -0,0 +1,18 @@ +#ifndef FILEUTILS_H +#define FILEUTILS_H + +#include <stdbool.h> +#include <stdint.h> +#include <stdio.h> + +bool fwrite_word16le(FILE* fp, uint16_t x); +bool fwrite_word24le(FILE* fp, uint32_t x); +bool fwrite_word32le(FILE* fp, uint32_t x); + +bool fwrite_word16be(FILE* fp, uint16_t x); +bool fwrite_word24be(FILE* fp, uint32_t x); +bool fwrite_word32be(FILE* fp, uint32_t x); + +bool fwrite_char(FILE* fp, uint8_t x); + +#endif
\ No newline at end of file @@ -0,0 +1,56 @@ +#include <stdio.h> + +#include "rtmpserver.h" +#include "fileutils.h" + +void on_rtmp_start(void *ctx) { + *(FILE**)ctx = fopen("test.flv", "wb"); + FILE *fp = *(FILE**)ctx; + fwrite_char(fp, 'F'); + fwrite_char(fp, 'L'); + fwrite_char(fp, 'V'); + fwrite_char(fp, 1); + fwrite_char(fp, 5); + fwrite_word32be(fp, 9); + fwrite_word32be(fp, 0); +} + +void on_rtmp_stop(void *ctx) { + FILE *fp = *(FILE**)ctx; + fclose(fp); + exit(0); +} + +void on_rtmp_video(void *ctx, int64_t timestamp, char *buf, size_t size) { + FILE *fp = *(FILE**)ctx; + fwrite_char(fp, 9); + fwrite_word24be(fp, size); + fwrite_word24be(fp, timestamp); + fwrite_char(fp, timestamp >> 24); + fwrite_word24be(fp, 0); + fwrite(buf, 1, size, fp); + fwrite_word32be(fp, size + 11); +} + +void on_rtmp_audio(void *ctx, int64_t timestamp, char *buf, size_t size) { + FILE *fp = *(FILE**)ctx; + fwrite_char(fp, 8); + fwrite_word24be(fp, size); + fwrite_word24be(fp, timestamp); + fwrite_char(fp, timestamp >> 24); + fwrite_word24be(fp, 0); + fwrite(buf, 1, size, fp); + fwrite_word32be(fp, size + 11); +} + +int main() { + RtmpCallbacks rtmp_cbs = { + .on_audio = &on_rtmp_audio, + .on_video = &on_rtmp_video, + .on_start = &on_rtmp_start, + .on_stop = &on_rtmp_stop, + }; + FILE* fp = NULL; + start_rtmpserver(rtmp_cbs, &fp); + return 0; +}
\ No newline at end of file @@ -0,0 +1,74 @@ +/** + * @file rtmp.h + * @author Dean Zou (zoudingyuan@junjietech.com) + * @brief + * @version 1.0 + * @date 2019-06-27 + * + * @copyright Copyright (c) - 2019 JunJie Intelligence(Shenzhen) Co.,LTD + * + */ + +#ifndef RTMP_H_ +#define RTMP_H_ + +#include <stdint.h> + +#define PORT 1935 + +#define DEFAULT_CHUNK_LEN 128 + +#define PACKED __attribute__((packed)) + +#define HANDSHAKE_PLAINTEXT 0x03 + +#define RANDOM_LEN (1536 - 8) + +#define MSG_SET_CHUNK 0x01 +#define MSG_BYTES_READ 0x03 +#define MSG_USER_CONTROL 0x04 +#define MSG_RESPONSE 0x05 +#define MSG_REQUEST 0x06 +#define MSG_AUDIO 0x08 +#define MSG_VIDEO 0x09 +#define MSG_INVOKE3 0x11 /* AMF3 */ +#define MSG_NOTIFY 0x12 +#define MSG_OBJECT 0x13 +#define MSG_INVOKE 0x14 /* AMF0 */ +#define MSG_FLASH_VIDEO 0x16 + +#define CONTROL_CLEAR_STREAM 0x00 +#define CONTROL_CLEAR_BUFFER 0x01 +#define CONTROL_STREAM_DRY 0x02 +#define CONTROL_BUFFER_TIME 0x03 +#define CONTROL_RESET_STREAM 0x04 +#define CONTROL_PING 0x06 +#define CONTROL_REQUEST_VERIFY 0x1a +#define CONTROL_RESPOND_VERIFY 0x1b +#define CONTROL_BUFFER_EMPTY 0x1f +#define CONTROL_BUFFER_READY 0x20 + +#define CONTROL_ID 0 +#define STREAM_ID 1337 + +#define CHAN_CONTROL 2 +#define CHAN_RESULT 3 +#define CHAN_STREAM 4 + +#define FLV_KEY_FRAME 0x01 +#define FLV_INTER_FRAME 0x02 + +struct Handshake { + uint8_t flags[8]; + uint8_t random[RANDOM_LEN]; +} PACKED; + +struct RTMP_Header { + uint8_t flags; + char timestamp[3]; + char msg_len[3]; + uint8_t msg_type; + char endpoint[4]; /* Note, this is little-endian while others are BE */ +} PACKED; + +#endif diff --git a/rtmpserver.cpp b/rtmpserver.cpp new file mode 100644 index 0000000..3d0d4f0 --- /dev/null +++ b/rtmpserver.cpp @@ -0,0 +1,820 @@ +#include "amf.h" +#include "rtmputils.h" +#include "rtmp.h" +#include <vector> +#include <stdexcept> +#include <stdio.h> +#include <string.h> +#include <stdlib.h> +#include <errno.h> +#include <assert.h> +#include <stdarg.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <sys/poll.h> +#include <sys/time.h> +#include <unistd.h> +#include <fcntl.h> + +#include "rtmpserver.h" + +#define APP_NAME "live" + +RtmpCallbacks g_rtmp_server_cbs; +void *g_rtmp_server_ctx; + +struct RTMP_Message { + uint8_t type; + size_t len; + unsigned long timestamp; + uint32_t endpoint; + std::string buf; +}; + +struct Client { + int fd; + bool publisher; + bool playing; /* Wants to receive the stream? */ + bool ready; /* Wants to receive and seen a keyframe */ + RTMP_Message messages[64]; + std::string buf; + std::string send_queue; + std::string path; + size_t chunk_len; + uint32_t written_seq; + uint32_t read_seq; +}; + +namespace { + +amf_object_t metadata; +int listen_fd; +std::vector<pollfd> poll_table; +std::vector<Client *> clients; + +int set_nonblock(int fd, bool enabled) +{ + int flags = fcntl(fd, F_GETFL) & ~O_NONBLOCK; + if (enabled) { + flags |= O_NONBLOCK; + } + return fcntl(fd, F_SETFL, flags); +} + +size_t recv_all(int fd, void *buf, size_t len) +{ + size_t pos = 0; + while (pos < len) { + ssize_t bytes = recv(fd, (char *) buf + pos, len - pos, 0); + if (bytes < 0) { + if (errno == EAGAIN || errno == EINTR) + continue; + throw std::runtime_error(strf("unable to recv: %s", strerror(errno))); + } + if (bytes == 0) + break; + pos += bytes; + } + return pos; +} + +size_t send_all(int fd, const void *buf, size_t len) +{ + size_t pos = 0; + while (pos < len) { + ssize_t written = send(fd, (const char *) buf + pos, len - pos, 0); + if (written < 0) { + if (errno == EAGAIN || errno == EINTR) + continue; + throw std::runtime_error(strf("unable to send: %s", strerror(errno))); + } + if (written == 0) + break; + pos += written; + } + return pos; +} + +bool is_safe(uint8_t b) +{ + return b >= ' ' && b < 128; +} + +void hexdump(const void *buf, size_t len) +{ + const uint8_t *data = (const uint8_t *) buf; + for (size_t i = 0; i < len; i += 16) { + for (int j = 0; j < 16; ++j) { + if (i + j < len) + debug("%.2x ", data[i + j]); + else + debug(" "); + } + for (int j = 0; j < 16; ++j) { + if (i + j < len) { + putc(is_safe(data[i + j]) ? data[i + j] : '.', + stdout); + } else { + putc(' ', stdout); + } + } + putc('\n', stdout); + } +} + +void try_to_send(void *data) +{ + Client *client = (Client *)data; + size_t len = client->send_queue.size(); + if (len > 4096) + len = 4096; + + ssize_t written = send(client->fd, client->send_queue.data(), len, 0); + if (written < 0) { + if (errno == EAGAIN || errno == EINTR) + return; + throw std::runtime_error(strf("unable to write to a client: %s", + strerror(errno))); + } + + client->send_queue.erase(0, written); +} + +void rtmp_send(Client *client, uint8_t type, uint32_t endpoint, + const std::string &buf, unsigned long timestamp = 0, + int channel_num = CHAN_CONTROL) +{ + if (endpoint == STREAM_ID) { + /* + * For some unknown reason, stream-related msgs must be sent + * on a specific channel. + */ + channel_num = CHAN_STREAM; + } + + RTMP_Header header; + header.flags = (channel_num & 0x3f) | (0 << 6); + header.msg_type = type; + set_be24(header.timestamp, timestamp); + set_be24(header.msg_len, buf.size()); + set_le32(header.endpoint, endpoint); + + client->send_queue.append((char *) &header, sizeof header); + client->written_seq += sizeof header; + + size_t pos = 0; + while (pos < buf.size()) { + if (pos) { + uint8_t flags = (channel_num & 0x3f) | (3 << 6); + client->send_queue += char(flags); + + client->written_seq += 1; + } + + size_t chunk = buf.size() - pos; + if (chunk > client->chunk_len) + chunk = client->chunk_len; + client->send_queue.append(buf, pos, chunk); + + client->written_seq += chunk; + pos += chunk; + } + + try_to_send(client); +} + +void send_reply(Client *client, double txid, const AMFValue &reply = AMFValue(), + const AMFValue &status = AMFValue()) +{ + if (txid <= 0.0) + return; + Encoder invoke; + amf_write(&invoke, std::string("_result")); + amf_write(&invoke, txid); + amf_write(&invoke, reply); + amf_write(&invoke, status); + rtmp_send(client, MSG_INVOKE, CONTROL_ID, invoke.buf, 0, CHAN_RESULT); +} + +void handle_connect(Client *client, double txid, Decoder *dec) +{ + amf_object_t params = amf_load_object(dec); + std::string app = get(params, std::string("app")).as_string(); + std::string ver = "(unknown)"; + + AMFValue flashver = get(params, std::string("flashVer")); + if (flashver.type() == AMF_STRING) { + ver = flashver.as_string(); + } + + /* + if (app != APP_NAME) { + throw std::runtime_error("Unsupported application: " + app); + } + */ + + printf("connect: %s (version %s)\n", app.c_str(), ver.c_str()); + + amf_object_t version; + version.insert(std::make_pair("fmsVer", std::string("FMS/4,5,1,484"))); + version.insert(std::make_pair("capabilities", 255.0)); + version.insert(std::make_pair("mode", 1.0)); + + amf_object_t status; + status.insert(std::make_pair("code", std::string("NetConnection.Connect.Success"))); + /* report support for AMF3 */ + // status.insert(std::make_pair("objectEncoding", 3.0)); + + send_reply(client, txid, version, status); + +/* + uint32_t chunk_len = htonl(1024); + std::string set_chunk((char *) &chunk_len, 4); + rtmp_send(client, MSG_SET_CHUNK, CONTROL_ID, set_chunk, 0, + MEDIA_CHANNEL); + + client->chunk_len = 1024; +*/ +} + +int publishernum = 0; + +void handle_fcpublish(Client *client, double txid, Decoder *dec) +{ + if (publishernum > 0) { + throw std::runtime_error{"only one publisher."}; + } + publishernum = 1; + g_rtmp_server_cbs.on_start(g_rtmp_server_ctx); + client->publisher = true; + printf( "publisher connected."); + + amf_load(dec); /* NULL */ + + std::string path = amf_load_string(dec); + debug("fcpublish %s\n", path.c_str()); + + client->path = path; + + amf_object_t status; + status.insert(std::make_pair("code", std::string("NetStream.Publish.Start"))); + status.insert(std::make_pair("description", path)); + + Encoder invoke; + amf_write(&invoke, std::string("onFCPublish")); + amf_write(&invoke, 0.0); + amf_write_null(&invoke); + amf_write(&invoke, status); + rtmp_send(client, MSG_INVOKE, CONTROL_ID, invoke.buf); + + send_reply(client, txid); +} + +void handle_createstream(Client *client, double txid, Decoder *dec) +{ + send_reply(client, txid, AMFValue(), double(STREAM_ID)); +} + +void handle_publish(Client *client, double txid, Decoder *dec) +{ + amf_load(dec); /* NULL */ + + std::string path = amf_load_string(dec); + debug("publish %s\n", path.c_str()); + + client->path = path; + + amf_object_t status; + status.insert(std::make_pair("level", std::string("status"))); + status.insert(std::make_pair("code", std::string("NetStream.Publish.Start"))); + status.insert(std::make_pair("description", std::string("Stream is now published."))); + status.insert(std::make_pair("details", path)); + + Encoder invoke; + amf_write(&invoke, std::string("onStatus")); + amf_write(&invoke, 0.0); + amf_write_null(&invoke); + amf_write(&invoke, status); + rtmp_send(client, MSG_INVOKE, STREAM_ID, invoke.buf); + + send_reply(client, txid); +} + +void start_playback(Client *client) +{ + throw std::runtime_error{"playback not supported."}; +} + +void handle_play(Client *client, double txid, Decoder *dec) +{ + amf_load(dec); /* NULL */ + + std::string path = amf_load_string(dec); + + debug("play %s\n", path.c_str()); + client->path = path; + + start_playback(client); + + send_reply(client, txid); +} + +void handle_play2(Client *client, double txid, Decoder *dec) +{ + amf_load(dec); /* NULL */ + + amf_object_t params = amf_load_object(dec); + std::string path = get(params, std::string("streamName")).as_string(); + + debug("play %s\n", path.c_str()); + client->path = path; + start_playback(client); + + send_reply(client, txid); +} + +void handle_pause(Client *client, double txid, Decoder *dec) +{ + amf_load(dec); /* NULL */ + + bool paused = amf_load_boolean(dec); + + if (paused) { + debug("pausing\n"); + + amf_object_t status; + status.insert(std::make_pair("level", std::string("status"))); + status.insert(std::make_pair("code", std::string("NetStream.Pause.Notify"))); + status.insert(std::make_pair("description", std::string("Pausing."))); + + Encoder invoke; + amf_write(&invoke, std::string("onStatus")); + amf_write(&invoke, 0.0); + amf_write_null(&invoke); + amf_write(&invoke, status); + rtmp_send(client, MSG_INVOKE, STREAM_ID, invoke.buf); + client->playing = false; + } else { + start_playback(client); + } + + send_reply(client, txid); +} + +void handle_setdataframe(Client *client, Decoder *dec) +{ + if(client->publisher == false) { + printf("not a publisher"); + throw std::runtime_error("not a publisher"); + } + + printf( "client paht is %s",client->path.c_str()); + + std::string type = amf_load_string(dec); + if (type != "onMetaData") { + throw std::runtime_error("can only set metadata"); + } + + metadata = amf_load_ecma(dec); + + Encoder notify; + amf_write(¬ify, std::string("onMetaData")); + amf_write_ecma(¬ify, metadata); + + FOR_EACH(std::vector<Client *>, i, clients) { + Client *receiver = *i; + if (receiver != NULL && receiver->playing && receiver->path == client->path) { + rtmp_send(client, MSG_NOTIFY, STREAM_ID, notify.buf); + } + } +} + +void handle_invoke(Client *client, const RTMP_Message *msg, Decoder *dec) +{ + std::string method = amf_load_string(dec); + double txid = amf_load_number(dec); + + debug( "invoked %s txid %f\n", method.c_str(), txid); + + if (msg->endpoint == CONTROL_ID) { + if (method == "connect") { + handle_connect(client, txid, dec); + } else if (method == "FCPublish") { + handle_fcpublish(client, txid, dec); + } else if (method == "createStream") { + handle_createstream(client, txid, dec); + } + + } else if (msg->endpoint == STREAM_ID) { + if (method == "publish") { + handle_publish(client, txid, dec); + } else if (method == "play") { + handle_play(client, txid, dec); + } else if (method == "play2") { + handle_play2(client, txid, dec); + } else if (method == "pause") { + handle_pause(client, txid, dec); + } + } +} + +void handle_message(Client *client, RTMP_Message *msg) +{ + + debug("RTMP message %02x, len %zu, timestamp %ld\n", msg->type, msg->len, + msg->timestamp); + + size_t pos = 0; + + switch (msg->type) { + case MSG_BYTES_READ: + if (pos + 4 > msg->buf.size()) { + throw std::runtime_error("Not enough data"); + } + client->read_seq = load_be32(&msg->buf[pos]); + debug("%d in queue\n", + int(client->written_seq - client->read_seq)); + break; + + case MSG_SET_CHUNK: + if (pos + 4 > msg->buf.size()) { + throw std::runtime_error("Not enough data"); + } + client->chunk_len = load_be32(&msg->buf[pos]); + debug("chunk size set to %zu\n", client->chunk_len); + break; + + case MSG_INVOKE: { + debug("handling message invoke 0 \n"); + Decoder dec; + dec.version = 0; + dec.buf = msg->buf; + dec.pos = 0; + handle_invoke(client, msg, &dec); + } + break; + + case MSG_INVOKE3: { + debug("handling message invoke 3 \n"); + Decoder dec; + dec.version = 0; + dec.buf = msg->buf; + dec.pos = 1; + handle_invoke(client, msg, &dec); + } + break; + + case MSG_NOTIFY: { + Decoder dec; + dec.version = 0; + dec.buf = msg->buf; + dec.pos = 0; + std::string type = amf_load_string(&dec); + debug("notify %s\n", type.c_str()); + if (msg->endpoint == STREAM_ID) { + if (type == "@setDataFrame") { + handle_setdataframe(client, &dec); + } + } + } + break; + + case MSG_AUDIO: + if(client->publisher == false) { + throw std::runtime_error("not a publisher"); + } + g_rtmp_server_cbs.on_audio(g_rtmp_server_ctx, msg->timestamp, (char*)msg->buf.c_str(), msg->buf.size()); + // FOR_EACH(std::vector<Client *>, i, clients) { + // Client *receiver = *i; + // if (receiver != NULL && receiver->ready) { + // rtmp_send(receiver, MSG_AUDIO, STREAM_ID, + // msg->buf, msg->timestamp); + // } + // } + break; + case MSG_VIDEO: { + if(client->publisher == false){ + throw std::runtime_error("not a publisher"); + } + // uint8_t flags = msg->buf[0]; + g_rtmp_server_cbs.on_video(g_rtmp_server_ctx, msg->timestamp, (char*)msg->buf.c_str(), msg->buf.size()); + // FOR_EACH(std::vector<Client *>, i, clients) { + // Client *receiver = *i; + // if (receiver != NULL && receiver->playing && + // client->path == receiver->path) { + // if (flags >> 4 == FLV_KEY_FRAME && + // !receiver->ready) { + // std::string control; + // uint16_t type = htons(CONTROL_CLEAR_STREAM); + // control.append((char *) &type, 2); + // uint32_t stream = htonl(STREAM_ID); + // control.append((char *) &stream, 4); + // rtmp_send(receiver, MSG_USER_CONTROL, CONTROL_ID, control); + // receiver->ready = true; + // } + // if (receiver->ready) { + // static int flag = 0; + // if(flag < 10) { + // flag ++; + // } + // rtmp_send(receiver, MSG_VIDEO, + // STREAM_ID, msg->buf, + // msg->timestamp); + // } + // } + // } + } + break; + + case MSG_FLASH_VIDEO: + //printf( "streaming FLV not supported"); + throw std::runtime_error("streaming FLV not supported"); + break; + + default: + debug("unhandled message: %02x\n", msg->type); + hexdump(msg->buf.data(), msg->buf.size()); + break; + } +} + +/* TODO: Make this asynchronous */ +void do_handshake(Client *client) +{ + Handshake serversig; + Handshake clientsig; + + uint8_t c; + if (recv_all(client->fd, &c, 1) < 1) + return; + if (c != HANDSHAKE_PLAINTEXT) { + //printf( "only plaintext handshake supported"); + throw std::runtime_error("only plaintext handshake supported"); + } + + if (send_all(client->fd, &c, 1) < 1) + return; + + memset(&serversig, 0, sizeof serversig); + serversig.flags[0] = 0x03; + for (int i = 0; i < RANDOM_LEN; ++i) { + serversig.random[i] = rand(); + } + + if (send_all(client->fd, &serversig, sizeof serversig) < sizeof serversig) + return; + + /* Echo client's signature back */ + if (recv_all(client->fd, &clientsig, sizeof serversig) < sizeof serversig) + return; + if (send_all(client->fd, &clientsig, sizeof serversig) < sizeof serversig) + return; + + if (recv_all(client->fd, &clientsig, sizeof serversig) < sizeof serversig) + return; + if (memcmp(serversig.random, clientsig.random, RANDOM_LEN) != 0) { + //printf( "invalid handshake"); + throw std::runtime_error("invalid handshake"); + } + + client->read_seq = 1 + sizeof serversig * 2; + client->written_seq = 1 + sizeof serversig * 2; +} + +void recv_from_client(void *data) +{ + Client *client = (Client *)data; + + std::string chunk(4096, 0); + ssize_t got = recv(client->fd, &chunk[0], chunk.size(), 0); + if (got == 0) { + //printf( "end of life from a client"); + throw std::runtime_error("EOF from a client"); + } else if (got < 0) { + if (errno == EAGAIN || errno == EINTR) + return; + //printf( "unable to read from a client: %s",strerror(errno)); + throw std::runtime_error(strf("unable to read from a client: %s",strerror(errno))); + } + client->buf.append(chunk, 0, got); + + //printf( "got data and length is %d",client->buf.size()); + + while (!client->buf.empty()) { + uint8_t flags = client->buf[0]; + + static const size_t HEADER_LENGTH[] = {12, 8, 4, 1}; + size_t header_len = HEADER_LENGTH[flags >> 6]; + + if (client->buf.size() < header_len) { + /* need more data */ + break; + } + + RTMP_Header header; + memcpy(&header, client->buf.data(), header_len); + + RTMP_Message *msg = &client->messages[flags & 0x3f]; + + if (header_len >= 8) { + msg->len = load_be24(header.msg_len); + if (msg->len < msg->buf.size()) { + throw std::runtime_error("invalid msg length"); + } + msg->type = header.msg_type; + } + if (header_len >= 12) { + msg->endpoint = load_le32(header.endpoint); + } + + if (msg->len == 0) { + throw std::runtime_error("message without a header"); + } + size_t chunk = msg->len - msg->buf.size(); + if (chunk > client->chunk_len) + chunk = client->chunk_len; + + if (client->buf.size() < header_len + chunk) { + /* need more data */ + break; + } + + if (header_len >= 4) { + unsigned long ts = load_be24(header.timestamp); + if (ts == 0xffffff) { + throw std::runtime_error("ext timestamp not supported"); + } + if (header_len < 12) { + ts += msg->timestamp; + } + msg->timestamp = ts; + } + + msg->buf.append(client->buf, header_len, chunk); + client->buf.erase(0, header_len + chunk); + + if (msg->buf.size() == msg->len) { + handle_message(client, msg); + msg->buf.clear(); + } + } +} + +Client *new_client() +{ + sockaddr_in sin; + socklen_t addrlen = sizeof sin; + int fd = accept(listen_fd, (sockaddr *) &sin, &addrlen); + if (fd < 0) { + printf("Unable to accept a client: %s\n", strerror(errno)); + return NULL; + } + + Client *client = new Client; + client->publisher = false; + client->playing = false; + client->ready = false; + client->fd = fd; + client->written_seq = 0; + client->read_seq = 0; + client->chunk_len = DEFAULT_CHUNK_LEN; + for (int i = 0; i < 64; ++i) { + client->messages[i].timestamp = 0; + client->messages[i].len = 0; + } + + try { + do_handshake(client); + } catch (const std::runtime_error &e) { + printf("handshake failed: %s\n", e.what()); + close(fd); + delete client; + return NULL; + } + + set_nonblock(fd, true); + + pollfd entry; + entry.events = POLLIN; + entry.revents = 0; + entry.fd = fd; + poll_table.push_back(entry); + clients.push_back(client); + + return client; +} + +void close_client(Client *client, size_t i) +{ + clients.erase(clients.begin() + i); + poll_table.erase(poll_table.begin() + i); + close(client->fd); + + if (client->publisher == true) { + printf("publisher disconnected.\n"); + client->publisher = false; + publishernum = 0; + g_rtmp_server_cbs.on_stop(g_rtmp_server_ctx); + FOR_EACH(std::vector<Client *>, i, clients) { + Client *client = *i; + if (client != NULL) { + client->ready = false; + } + } + } + delete client; +} + +void do_poll(void) +{ + for (size_t i = 0; i < poll_table.size(); ++i) { + Client *client = clients[i]; + if (client != NULL) { + if (!client->send_queue.empty()) { + //debug("waiting for pollout\n"); + poll_table[i].events = POLLIN | POLLOUT; + } else { + poll_table[i].events = POLLIN; + } + } + } + + if (poll(&poll_table[0], poll_table.size(), -1) < 0) { + if (errno == EAGAIN || errno == EINTR) + return; + throw std::runtime_error(strf("poll() failed: %s", + strerror(errno))); + } + + for (size_t i = 0; i < poll_table.size(); ++i) { + Client *client = clients[i]; + if (poll_table[i].revents & POLLOUT) { + try { + try_to_send(client); + } catch (const std::runtime_error &e) { + printf("client error: %s\n", e.what()); + close_client(client, i); + --i; + continue; + } + } + if (poll_table[i].revents & POLLIN) { + if (client == NULL) { + new_client(); + } else try { + recv_from_client(client); + } catch (const std::runtime_error &e) { + printf("client error: %s\n", e.what()); + close_client(client, i); + --i; + } + } + } +} + +} + +int g_rtmp_server_quit = 0; + +void start_rtmpserver(RtmpCallbacks cbs, void *ctx) { + g_rtmp_server_cbs = cbs; + g_rtmp_server_ctx = ctx; + try { + listen_fd = socket(AF_INET, SOCK_STREAM, 0); + if (listen_fd < 0) + return; + int opt = 1; + if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { + perror("setsockopt SO_REUSEADDR"); + exit(EXIT_FAILURE); + } + sockaddr_in sin; + sin.sin_family = AF_INET; + sin.sin_port = htons(PORT); + sin.sin_addr.s_addr = INADDR_ANY; + if (bind(listen_fd, (sockaddr *) &sin, sizeof sin) < 0) { + throw std::runtime_error(strf("Unable to listen: %s",strerror(errno))); + return; + } + + listen(listen_fd, 10); + + pollfd entry; + entry.events = POLLIN; + entry.revents = 0; + entry.fd = listen_fd; + poll_table.push_back(entry); + clients.push_back(NULL); + + for (;;) { + if (g_rtmp_server_quit) { + return; + } + do_poll(); + } + return; + } catch (const std::runtime_error &e) { + fprintf(stderr, "ERROR: %s\n", e.what()); + return; + } +} diff --git a/rtmpserver.h b/rtmpserver.h new file mode 100644 index 0000000..cae56a5 --- /dev/null +++ b/rtmpserver.h @@ -0,0 +1,25 @@ +#ifndef RTMPSERVER_H_ +#define RTMPSERVER_H_ + +#include <stdint.h> +#include <stdlib.h> + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + void (*on_start)(void *ctx); + void (*on_stop)(void *ctx); + void (*on_video)(void* ctx, int64_t timestamp, char *buf, size_t size); + void (*on_audio)(void* ctx, int64_t timestamp, char *buf, size_t size); +} RtmpCallbacks; + +void start_rtmpserver(RtmpCallbacks cbs, void *ctx); + +#ifdef __cplusplus +} +#endif + + +#endif
\ No newline at end of file diff --git a/rtmputils.cpp b/rtmputils.cpp new file mode 100644 index 0000000..59088f1 --- /dev/null +++ b/rtmputils.cpp @@ -0,0 +1,67 @@ +#include "rtmputils.h" +#include <string> +#include <string.h> +#include <stdio.h> +#include <stdlib.h> +#include <stdarg.h> +#include <arpa/inet.h> + +/* + * Used to do unaligned loads on archs that don't support them. GCC can mostly + * optimize these away. + */ +uint32_t load_be32(const void *p) +{ + uint32_t val; + memcpy(&val, p, sizeof val); + return ntohl(val); +} + +uint16_t load_be16(const void *p) +{ + uint16_t val; + memcpy(&val, p, sizeof val); + return ntohs(val); +} + +uint32_t load_le32(const void *p) +{ + const uint8_t *data = (const uint8_t *) p; + return data[0] | ((uint32_t) data[1] << 8) | + ((uint32_t) data[2] << 16) | ((uint32_t) data[3] << 24); +} + +uint32_t load_be24(const void *p) +{ + const uint8_t *data = (const uint8_t *) p; + return data[2] | ((uint32_t) data[1] << 8) | ((uint32_t) data[0] << 16); +} + +void set_be24(void *p, uint32_t val) +{ + uint8_t *data = (uint8_t *) p; + data[0] = val >> 16; + data[1] = val >> 8; + data[2] = val; +} + +void set_le32(void *p, uint32_t val) +{ + uint8_t *data = (uint8_t *) p; + data[0] = val; + data[1] = val >> 8; + data[2] = val >> 16; + data[3] = val >> 24; +} + +const std::string strf(const char *fmt, ...) +{ + va_list vl; + va_start(vl, fmt); + char *buf = NULL; + vasprintf(&buf, fmt, vl); + va_end(vl); + std::string s(buf); + free(buf); + return s; +} diff --git a/rtmputils.h b/rtmputils.h new file mode 100644 index 0000000..f873232 --- /dev/null +++ b/rtmputils.h @@ -0,0 +1,49 @@ +/** + * @file utils.h + * @author Dean Zou (zoudingyuan@junjietech.com) + * @brief + * @version 1.0 + * @date 2019-06-27 + * + * @copyright Copyright (c) - 2019 JunJie Intelligence(Shenzhen) Co.,LTD + * + */ +#ifndef RTMPUTILS_H_ +#define RTMPUTILS_H_ + +#include <map> +#include <string> +#include <stdio.h> +#include <stdint.h> + +#define FOR_EACH(type, i, where) \ + for (typename type::iterator i = (where).begin(); i != (where).end(); ++i) + +#define FOR_EACH_CONST(type, i, where) \ + for (typename type::const_iterator i = (where).begin(); \ + i != (where).end(); ++i) + +// #define debug(fmt...) fprintf(stderr, fmt) + +#define debug(fmt...) + +template<class Key, class Value> +Value get(const std::map<Key, Value> &map, const Key &k, + const Value &def = Value()) +{ + typename std::map<Key, Value>::const_iterator i = map.find(k); + if (i == map.end()) + return def; + return i->second; +} + +uint32_t load_be32(const void *p); +uint16_t load_be16(const void *p); +uint32_t load_be24(const void *p); +uint32_t load_le32(const void *p); +void set_be24(void *p, uint32_t val); +void set_le32(void *p, uint32_t val); + +const std::string strf(const char *fmt, ...); + +#endif |
