aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMistivia <i@mistivia.com>2025-09-10 17:41:52 +0800
committerMistivia <i@mistivia.com>2025-09-10 17:41:52 +0800
commit8d089010b37ee193b2b8c809a4f317fa2b11a758 (patch)
treef2a671f9bcc8760983ab016d8d181f2630fe16a7
rtmp server
-rw-r--r--.gitignore3
-rw-r--r--LICENSE0
-rw-r--r--Makefile28
-rw-r--r--amf.cpp429
-rw-r--r--amf.h167
-rw-r--r--fileutils.c72
-rw-r--r--fileutils.h18
-rw-r--r--main.c56
-rw-r--r--rtmp.h74
-rw-r--r--rtmpserver.cpp820
-rw-r--r--rtmpserver.h25
-rw-r--r--rtmputils.cpp67
-rw-r--r--rtmputils.h49
13 files changed, 1808 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..0de0710
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+*.o
+ezlive
+*.flv \ No newline at end of file
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/LICENSE
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..6f6d633
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,28 @@
+CC := gcc
+CXX := g++
+CFLAGS := -g -Wall
+CXXFLAGS := -g -Wall -std=c++14
+
+C_SOURCES := $(shell find . -maxdepth 1 -name '*.c')
+CPP_SOURCES := $(shell find . -maxdepth 1 -name '*.cpp')
+
+C_OBJS := $(C_SOURCES:.c=.o)
+CPP_OBJS := $(CPP_SOURCES:.cpp=.o)
+
+TARGET := ezlive
+
+all: $(TARGET)
+
+$(TARGET): $(C_OBJS) $(CPP_OBJS)
+ $(CXX) $(C_OBJS) $(CPP_OBJS) -o $@
+
+%.o: %.c
+ $(CC) $(CFLAGS) -c $< -o $@
+
+%.o: %.cpp
+ $(CXX) $(CXXFLAGS) -c $< -o $@
+
+clean:
+ rm -f $(C_OBJS) $(CPP_OBJS) $(TARGET)
+
+.PHONY: all clean \ No newline at end of file
diff --git a/amf.cpp b/amf.cpp
new file mode 100644
index 0000000..2794419
--- /dev/null
+++ b/amf.cpp
@@ -0,0 +1,429 @@
+#include "amf.h"
+#include "rtmputils.h"
+
+#include <stdexcept>
+#include <string.h>
+#include <arpa/inet.h>
+
+namespace {
+
+uint8_t peek(const Decoder *dec)
+{
+ if (dec->pos >= dec->buf.size()) {
+ throw std::runtime_error("Not enough data");
+ }
+ return uint8_t(dec->buf[dec->pos]);
+}
+
+uint8_t get_byte(Decoder *dec)
+{
+ if (dec->version == 0 && peek(dec) == AMF0_SWITCH_AMF3) {
+ debug("entering AMF3 mode\n");
+ dec->pos++;
+ dec->version = 3;
+ }
+
+ if (dec->pos >= dec->buf.size()) {
+ throw std::runtime_error("Not enough data");
+ }
+ return uint8_t(dec->buf[dec->pos++]);
+}
+
+}
+
+AMFValue::AMFValue(AMFType type) :
+ m_type(type)
+{
+ switch (m_type) {
+ case AMF_OBJECT:
+ case AMF_ECMA_ARRAY:
+ m_value.object = new amf_object_t;
+ break;
+ case AMF_NUMBER:
+ case AMF_INTEGER:
+ case AMF_NULL:
+ case AMF_UNDEFINED:
+ break;
+ default:
+ assert(0);
+ }
+}
+
+AMFValue::AMFValue(const std::string &s) :
+ m_type(AMF_STRING)
+{
+ m_value.string = new std::string(s);
+}
+
+AMFValue::AMFValue(double n) :
+ m_type(AMF_NUMBER)
+{
+ m_value.number = n;
+}
+
+AMFValue::AMFValue(int i) :
+ m_type(AMF_INTEGER)
+{
+ m_value.integer = i;
+}
+
+AMFValue::AMFValue(bool b) :
+ m_type(AMF_BOOLEAN)
+{
+ m_value.boolean = b;
+}
+
+AMFValue::AMFValue(const amf_object_t &object) :
+ m_type(AMF_OBJECT)
+{
+ m_value.object = new amf_object_t(object);
+}
+
+AMFValue::AMFValue(const AMFValue &from) :
+ m_type(AMF_NULL)
+{
+ *this = from;
+}
+
+AMFValue::~AMFValue()
+{
+ destroy();
+}
+
+void AMFValue::destroy()
+{
+ switch (m_type) {
+ case AMF_STRING:
+ delete m_value.string;
+ break;
+ case AMF_OBJECT:
+ case AMF_ECMA_ARRAY:
+ delete m_value.object;
+ break;
+ case AMF_NULL:
+ case AMF_NUMBER:
+ case AMF_INTEGER:
+ case AMF_BOOLEAN:
+ case AMF_UNDEFINED:
+ break;
+ }
+}
+
+void AMFValue::operator = (const AMFValue &from)
+{
+ destroy();
+ m_type = from.m_type;
+ switch (m_type) {
+ case AMF_STRING:
+ m_value.string = new std::string(*from.m_value.string);
+ break;
+ case AMF_OBJECT:
+ case AMF_ECMA_ARRAY:
+ m_value.object = new amf_object_t(*from.m_value.object);
+ break;
+ case AMF_NUMBER:
+ m_value.number = from.m_value.number;
+ break;
+ case AMF_INTEGER:
+ m_value.integer = from.m_value.integer;
+ break;
+ case AMF_BOOLEAN:
+ m_value.boolean = from.m_value.boolean;
+ break;
+ default:
+ break;
+ }
+}
+
+void amf_write(Encoder *enc, const std::string &s)
+{
+ enc->buf += char(AMF0_STRING);
+ uint16_t str_len = htons(s.size());
+ enc->buf.append((char *) &str_len, 2);
+ enc->buf += s;
+}
+
+void amf_write(Encoder *enc, int i)
+{
+ throw std::runtime_error("AMF0 does not have integers");
+}
+
+void amf_write(Encoder *enc, double n)
+{
+ enc->buf += char(AMF0_NUMBER);
+ uint64_t encoded = 0;
+//#if defined(__i386__) || defined(__x86_64__)
+ /* Flash uses same floating point format as x86 */
+ memcpy(&encoded, &n, 8);
+//#endif
+ uint32_t val = htonl(encoded >> 32);
+ enc->buf.append((char *) &val, 4);
+ val = htonl(encoded);
+ enc->buf.append((char *) &val, 4);
+}
+
+void amf_write(Encoder *enc, bool b)
+{
+ enc->buf += char(AMF0_BOOLEAN);
+ enc->buf += char(b);
+}
+
+void amf_write_key(Encoder *enc, const std::string &s)
+{
+ uint16_t str_len = htons(s.size());
+ enc->buf.append((char *) &str_len, 2);
+ enc->buf += s;
+}
+
+void amf_write(Encoder *enc, const amf_object_t &object)
+{
+ enc->buf += char(AMF0_OBJECT);
+ for (amf_object_t::const_iterator i = object.begin();
+ i != object.end(); ++i) {
+ amf_write_key(enc, i->first);
+ amf_write(enc, i->second);
+ }
+ amf_write_key(enc, "");
+ enc->buf += char(AMF0_OBJECT_END);
+}
+
+void amf_write_ecma(Encoder *enc, const amf_object_t &object)
+{
+ enc->buf += char(AMF0_ECMA_ARRAY);
+ uint32_t zero = 0;
+ enc->buf.append((char *) &zero, 4);
+ for (amf_object_t::const_iterator i = object.begin();
+ i != object.end(); ++i) {
+ amf_write_key(enc, i->first);
+ amf_write(enc, i->second);
+ }
+ amf_write_key(enc, "");
+ enc->buf += char(AMF0_OBJECT_END);
+}
+
+void amf_write_null(Encoder *enc)
+{
+ enc->buf += char(AMF0_NULL);
+}
+
+void amf_write(Encoder *enc, const AMFValue &value)
+{
+ switch (value.type()) {
+ case AMF_STRING:
+ amf_write(enc, value.as_string());
+ break;
+ case AMF_NUMBER:
+ amf_write(enc, value.as_number());
+ break;
+ case AMF_INTEGER:
+ amf_write(enc, value.as_integer());
+ break;
+ case AMF_BOOLEAN:
+ amf_write(enc, value.as_boolean());
+ break;
+ case AMF_OBJECT:
+ amf_write(enc, value.as_object());
+ break;
+ case AMF_ECMA_ARRAY:
+ amf_write_ecma(enc, value.as_object());
+ break;
+ case AMF_NULL:
+ amf_write_null(enc);
+ break;
+ default:
+ break;
+ }
+}
+
+unsigned int load_amf3_integer(Decoder *dec)
+{
+ unsigned int value = 0;
+ for (int i = 0; i < 4; ++i) {
+ uint8_t b = get_byte(dec);
+ if (i == 3) {
+ /* use all bits from 4th byte */
+ value = (value << 8) | b;
+ break;
+ }
+ value = (value << 7) | (b & 0x7f);
+ if ((b & 0x80) == 0)
+ break;
+ }
+ return value;
+}
+
+std::string amf_load_string(Decoder *dec)
+{
+ size_t str_len = 0;
+ uint8_t type = get_byte(dec);
+ if (dec->version == 3) {
+ if (type != AMF3_STRING) {
+ throw std::runtime_error("Expected a string");
+ }
+ str_len = load_amf3_integer(dec) / 2;
+
+ } else {
+ if (type != AMF0_STRING) {
+ throw std::runtime_error("Expected a string");
+ }
+ if (dec->pos + 2 > dec->buf.size()) {
+ throw std::runtime_error("Not enough data");
+ }
+ str_len = load_be16(&dec->buf[dec->pos]);
+ dec->pos += 2;
+ }
+ if (dec->pos + str_len > dec->buf.size()) {
+ throw std::runtime_error("Not enough data");
+ }
+ std::string s(dec->buf, dec->pos, str_len);
+ dec->pos += str_len;
+ return s;
+}
+
+double amf_load_number(Decoder *dec)
+{
+ if (get_byte(dec) != AMF0_NUMBER) {
+ throw std::runtime_error("Expected a string");
+ }
+ if (dec->pos + 8 > dec->buf.size()) {
+ throw std::runtime_error("Not enough data");
+ }
+ uint64_t val = ((uint64_t) load_be32(&dec->buf[dec->pos]) << 32) |
+ load_be32(&dec->buf[dec->pos + 4]);
+ double n = 0;
+//#if defined(__i386__) || defined(__x86_64__)
+ /* Flash uses same floating point format as x86 */
+ memcpy(&n, &val, 8);
+//#endif
+ dec->pos += 8;
+ return n;
+}
+
+int amf_load_integer(Decoder *dec)
+{
+ if (dec->version == 3) {
+ return load_amf3_integer(dec);
+ } else {
+ return amf_load_number(dec);
+ }
+}
+
+bool amf_load_boolean(Decoder *dec)
+{
+ if (get_byte(dec) != AMF0_BOOLEAN) {
+ throw std::runtime_error("Expected a boolean");
+ }
+ return get_byte(dec) != 0;
+}
+
+std::string amf_load_key(Decoder *dec)
+{
+ if (dec->pos + 2 > dec->buf.size()) {
+ throw std::runtime_error("Not enough data");
+ }
+ size_t str_len = load_be16(&dec->buf[dec->pos]);
+ dec->pos += 2;
+ if (dec->pos + str_len > dec->buf.size()) {
+ throw std::runtime_error("Not enough data");
+ }
+ std::string s(dec->buf, dec->pos, str_len);
+ dec->pos += str_len;
+ return s;
+}
+
+amf_object_t amf_load_object(Decoder *dec)
+{
+ amf_object_t object;
+ if (get_byte(dec) != AMF0_OBJECT) {
+ throw std::runtime_error("Expected an object");
+ }
+ while (1) {
+ std::string key = amf_load_key(dec);
+ if (key.empty())
+ break;
+ AMFValue value = amf_load(dec);
+ object.insert(std::make_pair(key, value));
+ }
+ if (get_byte(dec) != AMF0_OBJECT_END) {
+ throw std::runtime_error("expected object end");
+ }
+ return object;
+}
+
+amf_object_t amf_load_ecma(Decoder *dec)
+{
+ /* ECMA array is the same as object, with 4 extra zero bytes */
+ amf_object_t object;
+ if (get_byte(dec) != AMF0_ECMA_ARRAY) {
+ throw std::runtime_error("Expected an ECMA array");
+ }
+ if (dec->pos + 4 > dec->buf.size()) {
+ throw std::runtime_error("Not enough data");
+ }
+ dec->pos += 4;
+ while (1) {
+ std::string key = amf_load_key(dec);
+ if (key.empty())
+ break;
+ AMFValue value = amf_load(dec);
+ object.insert(std::make_pair(key, value));
+ }
+ if (get_byte(dec) != AMF0_OBJECT_END) {
+ throw std::runtime_error("expected object end");
+ }
+ return object;
+}
+
+AMFValue amf_load(Decoder *dec)
+{
+ uint8_t type = peek(dec);
+ if (dec->version == 3) {
+ switch (type) {
+ case AMF3_STRING:
+ return AMFValue(amf_load_string(dec));
+ case AMF3_NUMBER:
+ return AMFValue(amf_load_number(dec));
+ case AMF3_INTEGER:
+ return AMFValue(amf_load_integer(dec));
+ case AMF3_FALSE:
+ dec->pos++;
+ return AMFValue(false);
+ case AMF3_TRUE:
+ dec->pos++;
+ return AMFValue(true);
+ case AMF3_OBJECT:
+ return AMFValue(amf_load_object(dec));
+ case AMF3_ARRAY:
+ return AMFValue(amf_load_ecma(dec));
+ case AMF3_NULL:
+ dec->pos++;
+ return AMFValue(AMF_NULL);
+ case AMF3_UNDEFINED:
+ dec->pos++;
+ return AMFValue(AMF_UNDEFINED);
+ default:
+ throw std::runtime_error(strf("Unsupported AMF3 type: %02x", type));
+ }
+ } else {
+ switch (type) {
+ case AMF0_STRING:
+ return AMFValue(amf_load_string(dec));
+ case AMF0_NUMBER:
+ return AMFValue(amf_load_number(dec));
+ case AMF0_BOOLEAN:
+ return AMFValue(amf_load_boolean(dec));
+ case AMF0_OBJECT:
+ return AMFValue(amf_load_object(dec));
+ case AMF0_ECMA_ARRAY:
+ return AMFValue(amf_load_ecma(dec));
+ case AMF0_NULL:
+ dec->pos++;
+ return AMFValue(AMF_NULL);
+ case AMF0_UNDEFINED:
+ dec->pos++;
+ return AMFValue(AMF_UNDEFINED);
+ default:
+ throw std::runtime_error(strf("Unsupported AMF0 type: %02x", type));
+ }
+ }
+}
diff --git a/amf.h b/amf.h
new file mode 100644
index 0000000..6d5738e
--- /dev/null
+++ b/amf.h
@@ -0,0 +1,167 @@
+/**
+ * @file amf.h
+ * @author Dean Zou (zoudingyuan@junjietech.com)
+ * @brief
+ * @version 1.0
+ * @date 2019-06-27
+ *
+ * @copyright Copyright (c) - 2019 JunJie Intelligence(Shenzhen) Co.,LTD
+ *
+ */
+
+#ifndef AMF_H_
+#define AMF_H_
+
+#include <string>
+#include <map>
+#include <assert.h>
+
+enum AMFType {
+ AMF_NUMBER,
+ AMF_INTEGER,
+ AMF_BOOLEAN,
+ AMF_STRING,
+ AMF_OBJECT,
+ AMF_NULL,
+ AMF_UNDEFINED,
+ AMF_ECMA_ARRAY,
+};
+
+enum {
+ AMF0_NUMBER,
+ AMF0_BOOLEAN,
+ AMF0_STRING,
+ AMF0_OBJECT,
+ AMF0_MOVIECLIP,
+ AMF0_NULL,
+ AMF0_UNDEFINED,
+ AMF0_REFERENCE,
+ AMF0_ECMA_ARRAY,
+ AMF0_OBJECT_END,
+ AMF0_STRICT_ARRAY,
+ AMF0_DATE,
+ AMF0_LONG_STRING,
+ AMF0_UNSUPPORTED,
+ AMF0_RECORD_SET,
+ AMF0_XML_OBJECT,
+ AMF0_TYPED_OBJECT,
+ AMF0_SWITCH_AMF3,
+};
+
+enum {
+ AMF3_UNDEFINED,
+ AMF3_NULL,
+ AMF3_FALSE,
+ AMF3_TRUE,
+ AMF3_INTEGER,
+ AMF3_NUMBER,
+ AMF3_STRING,
+ AMF3_LEGACY_XML,
+ AMF3_DATE,
+ AMF3_ARRAY,
+ AMF3_OBJECT,
+ AMF3_XML,
+ AMF3_BYTE_ARRAY,
+};
+
+struct Decoder {
+ std::string buf;
+ size_t pos;
+ int version;
+};
+
+struct Encoder {
+ std::string buf;
+};
+
+class AMFValue;
+
+typedef std::map<std::string, AMFValue> amf_object_t;
+
+class AMFValue {
+public:
+ AMFValue(AMFType type = AMF_NULL);
+ AMFValue(const std::string &s);
+ AMFValue(double n);
+ AMFValue(int i);
+ AMFValue(bool b);
+ AMFValue(const amf_object_t &object);
+ AMFValue(const AMFValue &from);
+ ~AMFValue();
+
+ AMFType type() const { return m_type; }
+
+ std::string as_string() const
+ {
+ assert(m_type == AMF_STRING);
+ return *m_value.string;
+ }
+ double as_number() const
+ {
+ assert(m_type == AMF_NUMBER);
+ return m_value.number;
+ }
+ double as_integer() const
+ {
+ assert(m_type == AMF_INTEGER);
+ return m_value.integer;
+ }
+ bool as_boolean() const
+ {
+ assert(m_type == AMF_BOOLEAN);
+ return m_value.boolean;
+ }
+ amf_object_t as_object() const
+ {
+ assert(m_type == AMF_OBJECT);
+ return *m_value.object;
+ }
+
+ AMFValue get(const std::string &s) const
+ {
+ assert(m_type == AMF_OBJECT);
+ amf_object_t::const_iterator i = m_value.object->find(s);
+ if (i == m_value.object->end())
+ return AMFValue();
+ return i->second;
+ }
+
+ void set(const std::string &s, const AMFValue &val)
+ {
+ assert(m_type == AMF_OBJECT);
+ m_value.object->insert(std::make_pair(s, val));
+ }
+
+ void operator = (const AMFValue &from);
+
+private:
+ AMFType m_type;
+ union {
+ std::string *string;
+ double number;
+ int integer;
+ bool boolean;
+ amf_object_t *object;
+ } m_value;
+
+ void destroy();
+};
+
+void amf_write(Encoder *enc, const std::string &s);
+void amf_write(Encoder *enc, double n);
+void amf_write(Encoder *enc, bool b);
+void amf_write_key(Encoder *enc, const std::string &s);
+void amf_write(Encoder *enc, const amf_object_t &object);
+void amf_write_ecma(Encoder *enc, const amf_object_t &object);
+void amf_write_null(Encoder *enc);
+void amf_write(Encoder *enc, const AMFValue &value);
+
+std::string amf_load_string(Decoder *dec);
+double amf_load_number(Decoder *dec);
+bool amf_load_boolean(Decoder *dec);
+std::string amf_load_key(Decoder *dec);
+amf_object_t amf_load_object(Decoder *dec);
+amf_object_t amf_load_ecma(Decoder *dec);
+AMFValue amf_load(Decoder *dec);
+
+#endif
diff --git a/fileutils.c b/fileutils.c
new file mode 100644
index 0000000..a9d9efc
--- /dev/null
+++ b/fileutils.c
@@ -0,0 +1,72 @@
+#include "fileutils.h"
+
+bool fwrite_word16le(FILE* fp, uint16_t x) {
+ uint8_t buf[2];
+ buf[0] = x & 0xff;
+ buf[1] = (x >> 8) & 0xff;
+ int r = fwrite(buf, 1, 2, fp);
+ if (r != 2) return false;
+ return true;
+}
+
+bool fwrite_word32le(FILE* fp, uint32_t x) {
+ bool ret = false;
+ uint16_t buf[2];
+
+ buf[0] = x & 0xffff;
+ buf[1] = (x >> 16) & 0xffff;
+ ret = fwrite_word16le(fp, buf[0]);
+ if (!ret) return ret;
+ ret = fwrite_word16le(fp, buf[1]);
+ if (!ret) return ret;
+ return true;
+}
+
+bool fwrite_word16be(FILE* fp, uint16_t x) {
+ uint8_t buf[2];
+ buf[1] = x & 0xff;
+ buf[0] = (x >> 8) & 0xff;
+ int r = fwrite(buf, 1, 2, fp);
+ if (r != 2) return false;
+ return true;
+}
+
+bool fwrite_word32be(FILE* fp, uint32_t x) {
+ bool ret = false;
+ uint16_t buf[2];
+
+ buf[1] = x & 0xffff;
+ buf[0] = (x >> 16) & 0xffff;
+ ret = fwrite_word16be(fp, buf[0]);
+ if (!ret) return ret;
+ ret = fwrite_word16be(fp, buf[1]);
+ if (!ret) return ret;
+ return true;
+}
+
+bool fwrite_word24le(FILE* fp, uint32_t x) {
+ uint8_t buf[3];
+ buf[0] = x & 0xff;
+ buf[1] = (x >> 8) & 0xff;
+ buf[2] = (x >> 16) & 0xff;
+ int r = fwrite(buf, 1, 3, fp);
+ if (r != 3) return false;
+ return true;
+}
+
+bool fwrite_word24be(FILE* fp, uint32_t x) {
+ uint8_t buf[3];
+ buf[2] = x & 0xff;
+ buf[1] = (x >> 8) & 0xff;
+ buf[0] = (x >> 16) & 0xff;
+ int r = fwrite(buf, 1, 3, fp);
+ if (r != 3) return false;
+ return true;
+}
+
+bool fwrite_char(FILE* fp, uint8_t x) {
+ uint8_t buf[1];
+ buf[0] = x;
+ int ret = fwrite(buf, 1, 1, fp);
+ return ret == 1;
+} \ No newline at end of file
diff --git a/fileutils.h b/fileutils.h
new file mode 100644
index 0000000..b9888a3
--- /dev/null
+++ b/fileutils.h
@@ -0,0 +1,18 @@
+#ifndef FILEUTILS_H
+#define FILEUTILS_H
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+
+bool fwrite_word16le(FILE* fp, uint16_t x);
+bool fwrite_word24le(FILE* fp, uint32_t x);
+bool fwrite_word32le(FILE* fp, uint32_t x);
+
+bool fwrite_word16be(FILE* fp, uint16_t x);
+bool fwrite_word24be(FILE* fp, uint32_t x);
+bool fwrite_word32be(FILE* fp, uint32_t x);
+
+bool fwrite_char(FILE* fp, uint8_t x);
+
+#endif \ No newline at end of file
diff --git a/main.c b/main.c
new file mode 100644
index 0000000..a1052fd
--- /dev/null
+++ b/main.c
@@ -0,0 +1,56 @@
+#include <stdio.h>
+
+#include "rtmpserver.h"
+#include "fileutils.h"
+
+void on_rtmp_start(void *ctx) {
+ *(FILE**)ctx = fopen("test.flv", "wb");
+ FILE *fp = *(FILE**)ctx;
+ fwrite_char(fp, 'F');
+ fwrite_char(fp, 'L');
+ fwrite_char(fp, 'V');
+ fwrite_char(fp, 1);
+ fwrite_char(fp, 5);
+ fwrite_word32be(fp, 9);
+ fwrite_word32be(fp, 0);
+}
+
+void on_rtmp_stop(void *ctx) {
+ FILE *fp = *(FILE**)ctx;
+ fclose(fp);
+ exit(0);
+}
+
+void on_rtmp_video(void *ctx, int64_t timestamp, char *buf, size_t size) {
+ FILE *fp = *(FILE**)ctx;
+ fwrite_char(fp, 9);
+ fwrite_word24be(fp, size);
+ fwrite_word24be(fp, timestamp);
+ fwrite_char(fp, timestamp >> 24);
+ fwrite_word24be(fp, 0);
+ fwrite(buf, 1, size, fp);
+ fwrite_word32be(fp, size + 11);
+}
+
+void on_rtmp_audio(void *ctx, int64_t timestamp, char *buf, size_t size) {
+ FILE *fp = *(FILE**)ctx;
+ fwrite_char(fp, 8);
+ fwrite_word24be(fp, size);
+ fwrite_word24be(fp, timestamp);
+ fwrite_char(fp, timestamp >> 24);
+ fwrite_word24be(fp, 0);
+ fwrite(buf, 1, size, fp);
+ fwrite_word32be(fp, size + 11);
+}
+
+int main() {
+ RtmpCallbacks rtmp_cbs = {
+ .on_audio = &on_rtmp_audio,
+ .on_video = &on_rtmp_video,
+ .on_start = &on_rtmp_start,
+ .on_stop = &on_rtmp_stop,
+ };
+ FILE* fp = NULL;
+ start_rtmpserver(rtmp_cbs, &fp);
+ return 0;
+} \ No newline at end of file
diff --git a/rtmp.h b/rtmp.h
new file mode 100644
index 0000000..bdc97cc
--- /dev/null
+++ b/rtmp.h
@@ -0,0 +1,74 @@
+/**
+ * @file rtmp.h
+ * @author Dean Zou (zoudingyuan@junjietech.com)
+ * @brief
+ * @version 1.0
+ * @date 2019-06-27
+ *
+ * @copyright Copyright (c) - 2019 JunJie Intelligence(Shenzhen) Co.,LTD
+ *
+ */
+
+#ifndef RTMP_H_
+#define RTMP_H_
+
+#include <stdint.h>
+
+#define PORT 1935
+
+#define DEFAULT_CHUNK_LEN 128
+
+#define PACKED __attribute__((packed))
+
+#define HANDSHAKE_PLAINTEXT 0x03
+
+#define RANDOM_LEN (1536 - 8)
+
+#define MSG_SET_CHUNK 0x01
+#define MSG_BYTES_READ 0x03
+#define MSG_USER_CONTROL 0x04
+#define MSG_RESPONSE 0x05
+#define MSG_REQUEST 0x06
+#define MSG_AUDIO 0x08
+#define MSG_VIDEO 0x09
+#define MSG_INVOKE3 0x11 /* AMF3 */
+#define MSG_NOTIFY 0x12
+#define MSG_OBJECT 0x13
+#define MSG_INVOKE 0x14 /* AMF0 */
+#define MSG_FLASH_VIDEO 0x16
+
+#define CONTROL_CLEAR_STREAM 0x00
+#define CONTROL_CLEAR_BUFFER 0x01
+#define CONTROL_STREAM_DRY 0x02
+#define CONTROL_BUFFER_TIME 0x03
+#define CONTROL_RESET_STREAM 0x04
+#define CONTROL_PING 0x06
+#define CONTROL_REQUEST_VERIFY 0x1a
+#define CONTROL_RESPOND_VERIFY 0x1b
+#define CONTROL_BUFFER_EMPTY 0x1f
+#define CONTROL_BUFFER_READY 0x20
+
+#define CONTROL_ID 0
+#define STREAM_ID 1337
+
+#define CHAN_CONTROL 2
+#define CHAN_RESULT 3
+#define CHAN_STREAM 4
+
+#define FLV_KEY_FRAME 0x01
+#define FLV_INTER_FRAME 0x02
+
+struct Handshake {
+ uint8_t flags[8];
+ uint8_t random[RANDOM_LEN];
+} PACKED;
+
+struct RTMP_Header {
+ uint8_t flags;
+ char timestamp[3];
+ char msg_len[3];
+ uint8_t msg_type;
+ char endpoint[4]; /* Note, this is little-endian while others are BE */
+} PACKED;
+
+#endif
diff --git a/rtmpserver.cpp b/rtmpserver.cpp
new file mode 100644
index 0000000..3d0d4f0
--- /dev/null
+++ b/rtmpserver.cpp
@@ -0,0 +1,820 @@
+#include "amf.h"
+#include "rtmputils.h"
+#include "rtmp.h"
+#include <vector>
+#include <stdexcept>
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <assert.h>
+#include <stdarg.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <sys/poll.h>
+#include <sys/time.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+#include "rtmpserver.h"
+
+#define APP_NAME "live"
+
+RtmpCallbacks g_rtmp_server_cbs;
+void *g_rtmp_server_ctx;
+
+struct RTMP_Message {
+ uint8_t type;
+ size_t len;
+ unsigned long timestamp;
+ uint32_t endpoint;
+ std::string buf;
+};
+
+struct Client {
+ int fd;
+ bool publisher;
+ bool playing; /* Wants to receive the stream? */
+ bool ready; /* Wants to receive and seen a keyframe */
+ RTMP_Message messages[64];
+ std::string buf;
+ std::string send_queue;
+ std::string path;
+ size_t chunk_len;
+ uint32_t written_seq;
+ uint32_t read_seq;
+};
+
+namespace {
+
+amf_object_t metadata;
+int listen_fd;
+std::vector<pollfd> poll_table;
+std::vector<Client *> clients;
+
+int set_nonblock(int fd, bool enabled)
+{
+ int flags = fcntl(fd, F_GETFL) & ~O_NONBLOCK;
+ if (enabled) {
+ flags |= O_NONBLOCK;
+ }
+ return fcntl(fd, F_SETFL, flags);
+}
+
+size_t recv_all(int fd, void *buf, size_t len)
+{
+ size_t pos = 0;
+ while (pos < len) {
+ ssize_t bytes = recv(fd, (char *) buf + pos, len - pos, 0);
+ if (bytes < 0) {
+ if (errno == EAGAIN || errno == EINTR)
+ continue;
+ throw std::runtime_error(strf("unable to recv: %s", strerror(errno)));
+ }
+ if (bytes == 0)
+ break;
+ pos += bytes;
+ }
+ return pos;
+}
+
+size_t send_all(int fd, const void *buf, size_t len)
+{
+ size_t pos = 0;
+ while (pos < len) {
+ ssize_t written = send(fd, (const char *) buf + pos, len - pos, 0);
+ if (written < 0) {
+ if (errno == EAGAIN || errno == EINTR)
+ continue;
+ throw std::runtime_error(strf("unable to send: %s", strerror(errno)));
+ }
+ if (written == 0)
+ break;
+ pos += written;
+ }
+ return pos;
+}
+
+bool is_safe(uint8_t b)
+{
+ return b >= ' ' && b < 128;
+}
+
+void hexdump(const void *buf, size_t len)
+{
+ const uint8_t *data = (const uint8_t *) buf;
+ for (size_t i = 0; i < len; i += 16) {
+ for (int j = 0; j < 16; ++j) {
+ if (i + j < len)
+ debug("%.2x ", data[i + j]);
+ else
+ debug(" ");
+ }
+ for (int j = 0; j < 16; ++j) {
+ if (i + j < len) {
+ putc(is_safe(data[i + j]) ? data[i + j] : '.',
+ stdout);
+ } else {
+ putc(' ', stdout);
+ }
+ }
+ putc('\n', stdout);
+ }
+}
+
+void try_to_send(void *data)
+{
+ Client *client = (Client *)data;
+ size_t len = client->send_queue.size();
+ if (len > 4096)
+ len = 4096;
+
+ ssize_t written = send(client->fd, client->send_queue.data(), len, 0);
+ if (written < 0) {
+ if (errno == EAGAIN || errno == EINTR)
+ return;
+ throw std::runtime_error(strf("unable to write to a client: %s",
+ strerror(errno)));
+ }
+
+ client->send_queue.erase(0, written);
+}
+
+void rtmp_send(Client *client, uint8_t type, uint32_t endpoint,
+ const std::string &buf, unsigned long timestamp = 0,
+ int channel_num = CHAN_CONTROL)
+{
+ if (endpoint == STREAM_ID) {
+ /*
+ * For some unknown reason, stream-related msgs must be sent
+ * on a specific channel.
+ */
+ channel_num = CHAN_STREAM;
+ }
+
+ RTMP_Header header;
+ header.flags = (channel_num & 0x3f) | (0 << 6);
+ header.msg_type = type;
+ set_be24(header.timestamp, timestamp);
+ set_be24(header.msg_len, buf.size());
+ set_le32(header.endpoint, endpoint);
+
+ client->send_queue.append((char *) &header, sizeof header);
+ client->written_seq += sizeof header;
+
+ size_t pos = 0;
+ while (pos < buf.size()) {
+ if (pos) {
+ uint8_t flags = (channel_num & 0x3f) | (3 << 6);
+ client->send_queue += char(flags);
+
+ client->written_seq += 1;
+ }
+
+ size_t chunk = buf.size() - pos;
+ if (chunk > client->chunk_len)
+ chunk = client->chunk_len;
+ client->send_queue.append(buf, pos, chunk);
+
+ client->written_seq += chunk;
+ pos += chunk;
+ }
+
+ try_to_send(client);
+}
+
+void send_reply(Client *client, double txid, const AMFValue &reply = AMFValue(),
+ const AMFValue &status = AMFValue())
+{
+ if (txid <= 0.0)
+ return;
+ Encoder invoke;
+ amf_write(&invoke, std::string("_result"));
+ amf_write(&invoke, txid);
+ amf_write(&invoke, reply);
+ amf_write(&invoke, status);
+ rtmp_send(client, MSG_INVOKE, CONTROL_ID, invoke.buf, 0, CHAN_RESULT);
+}
+
+void handle_connect(Client *client, double txid, Decoder *dec)
+{
+ amf_object_t params = amf_load_object(dec);
+ std::string app = get(params, std::string("app")).as_string();
+ std::string ver = "(unknown)";
+
+ AMFValue flashver = get(params, std::string("flashVer"));
+ if (flashver.type() == AMF_STRING) {
+ ver = flashver.as_string();
+ }
+
+ /*
+ if (app != APP_NAME) {
+ throw std::runtime_error("Unsupported application: " + app);
+ }
+ */
+
+ printf("connect: %s (version %s)\n", app.c_str(), ver.c_str());
+
+ amf_object_t version;
+ version.insert(std::make_pair("fmsVer", std::string("FMS/4,5,1,484")));
+ version.insert(std::make_pair("capabilities", 255.0));
+ version.insert(std::make_pair("mode", 1.0));
+
+ amf_object_t status;
+ status.insert(std::make_pair("code", std::string("NetConnection.Connect.Success")));
+ /* report support for AMF3 */
+ // status.insert(std::make_pair("objectEncoding", 3.0));
+
+ send_reply(client, txid, version, status);
+
+/*
+ uint32_t chunk_len = htonl(1024);
+ std::string set_chunk((char *) &chunk_len, 4);
+ rtmp_send(client, MSG_SET_CHUNK, CONTROL_ID, set_chunk, 0,
+ MEDIA_CHANNEL);
+
+ client->chunk_len = 1024;
+*/
+}
+
+int publishernum = 0;
+
+void handle_fcpublish(Client *client, double txid, Decoder *dec)
+{
+ if (publishernum > 0) {
+ throw std::runtime_error{"only one publisher."};
+ }
+ publishernum = 1;
+ g_rtmp_server_cbs.on_start(g_rtmp_server_ctx);
+ client->publisher = true;
+ printf( "publisher connected.");
+
+ amf_load(dec); /* NULL */
+
+ std::string path = amf_load_string(dec);
+ debug("fcpublish %s\n", path.c_str());
+
+ client->path = path;
+
+ amf_object_t status;
+ status.insert(std::make_pair("code", std::string("NetStream.Publish.Start")));
+ status.insert(std::make_pair("description", path));
+
+ Encoder invoke;
+ amf_write(&invoke, std::string("onFCPublish"));
+ amf_write(&invoke, 0.0);
+ amf_write_null(&invoke);
+ amf_write(&invoke, status);
+ rtmp_send(client, MSG_INVOKE, CONTROL_ID, invoke.buf);
+
+ send_reply(client, txid);
+}
+
+void handle_createstream(Client *client, double txid, Decoder *dec)
+{
+ send_reply(client, txid, AMFValue(), double(STREAM_ID));
+}
+
+void handle_publish(Client *client, double txid, Decoder *dec)
+{
+ amf_load(dec); /* NULL */
+
+ std::string path = amf_load_string(dec);
+ debug("publish %s\n", path.c_str());
+
+ client->path = path;
+
+ amf_object_t status;
+ status.insert(std::make_pair("level", std::string("status")));
+ status.insert(std::make_pair("code", std::string("NetStream.Publish.Start")));
+ status.insert(std::make_pair("description", std::string("Stream is now published.")));
+ status.insert(std::make_pair("details", path));
+
+ Encoder invoke;
+ amf_write(&invoke, std::string("onStatus"));
+ amf_write(&invoke, 0.0);
+ amf_write_null(&invoke);
+ amf_write(&invoke, status);
+ rtmp_send(client, MSG_INVOKE, STREAM_ID, invoke.buf);
+
+ send_reply(client, txid);
+}
+
+void start_playback(Client *client)
+{
+ throw std::runtime_error{"playback not supported."};
+}
+
+void handle_play(Client *client, double txid, Decoder *dec)
+{
+ amf_load(dec); /* NULL */
+
+ std::string path = amf_load_string(dec);
+
+ debug("play %s\n", path.c_str());
+ client->path = path;
+
+ start_playback(client);
+
+ send_reply(client, txid);
+}
+
+void handle_play2(Client *client, double txid, Decoder *dec)
+{
+ amf_load(dec); /* NULL */
+
+ amf_object_t params = amf_load_object(dec);
+ std::string path = get(params, std::string("streamName")).as_string();
+
+ debug("play %s\n", path.c_str());
+ client->path = path;
+ start_playback(client);
+
+ send_reply(client, txid);
+}
+
+void handle_pause(Client *client, double txid, Decoder *dec)
+{
+ amf_load(dec); /* NULL */
+
+ bool paused = amf_load_boolean(dec);
+
+ if (paused) {
+ debug("pausing\n");
+
+ amf_object_t status;
+ status.insert(std::make_pair("level", std::string("status")));
+ status.insert(std::make_pair("code", std::string("NetStream.Pause.Notify")));
+ status.insert(std::make_pair("description", std::string("Pausing.")));
+
+ Encoder invoke;
+ amf_write(&invoke, std::string("onStatus"));
+ amf_write(&invoke, 0.0);
+ amf_write_null(&invoke);
+ amf_write(&invoke, status);
+ rtmp_send(client, MSG_INVOKE, STREAM_ID, invoke.buf);
+ client->playing = false;
+ } else {
+ start_playback(client);
+ }
+
+ send_reply(client, txid);
+}
+
+void handle_setdataframe(Client *client, Decoder *dec)
+{
+ if(client->publisher == false) {
+ printf("not a publisher");
+ throw std::runtime_error("not a publisher");
+ }
+
+ printf( "client paht is %s",client->path.c_str());
+
+ std::string type = amf_load_string(dec);
+ if (type != "onMetaData") {
+ throw std::runtime_error("can only set metadata");
+ }
+
+ metadata = amf_load_ecma(dec);
+
+ Encoder notify;
+ amf_write(&notify, std::string("onMetaData"));
+ amf_write_ecma(&notify, metadata);
+
+ FOR_EACH(std::vector<Client *>, i, clients) {
+ Client *receiver = *i;
+ if (receiver != NULL && receiver->playing && receiver->path == client->path) {
+ rtmp_send(client, MSG_NOTIFY, STREAM_ID, notify.buf);
+ }
+ }
+}
+
+void handle_invoke(Client *client, const RTMP_Message *msg, Decoder *dec)
+{
+ std::string method = amf_load_string(dec);
+ double txid = amf_load_number(dec);
+
+ debug( "invoked %s txid %f\n", method.c_str(), txid);
+
+ if (msg->endpoint == CONTROL_ID) {
+ if (method == "connect") {
+ handle_connect(client, txid, dec);
+ } else if (method == "FCPublish") {
+ handle_fcpublish(client, txid, dec);
+ } else if (method == "createStream") {
+ handle_createstream(client, txid, dec);
+ }
+
+ } else if (msg->endpoint == STREAM_ID) {
+ if (method == "publish") {
+ handle_publish(client, txid, dec);
+ } else if (method == "play") {
+ handle_play(client, txid, dec);
+ } else if (method == "play2") {
+ handle_play2(client, txid, dec);
+ } else if (method == "pause") {
+ handle_pause(client, txid, dec);
+ }
+ }
+}
+
+void handle_message(Client *client, RTMP_Message *msg)
+{
+
+ debug("RTMP message %02x, len %zu, timestamp %ld\n", msg->type, msg->len,
+ msg->timestamp);
+
+ size_t pos = 0;
+
+ switch (msg->type) {
+ case MSG_BYTES_READ:
+ if (pos + 4 > msg->buf.size()) {
+ throw std::runtime_error("Not enough data");
+ }
+ client->read_seq = load_be32(&msg->buf[pos]);
+ debug("%d in queue\n",
+ int(client->written_seq - client->read_seq));
+ break;
+
+ case MSG_SET_CHUNK:
+ if (pos + 4 > msg->buf.size()) {
+ throw std::runtime_error("Not enough data");
+ }
+ client->chunk_len = load_be32(&msg->buf[pos]);
+ debug("chunk size set to %zu\n", client->chunk_len);
+ break;
+
+ case MSG_INVOKE: {
+ debug("handling message invoke 0 \n");
+ Decoder dec;
+ dec.version = 0;
+ dec.buf = msg->buf;
+ dec.pos = 0;
+ handle_invoke(client, msg, &dec);
+ }
+ break;
+
+ case MSG_INVOKE3: {
+ debug("handling message invoke 3 \n");
+ Decoder dec;
+ dec.version = 0;
+ dec.buf = msg->buf;
+ dec.pos = 1;
+ handle_invoke(client, msg, &dec);
+ }
+ break;
+
+ case MSG_NOTIFY: {
+ Decoder dec;
+ dec.version = 0;
+ dec.buf = msg->buf;
+ dec.pos = 0;
+ std::string type = amf_load_string(&dec);
+ debug("notify %s\n", type.c_str());
+ if (msg->endpoint == STREAM_ID) {
+ if (type == "@setDataFrame") {
+ handle_setdataframe(client, &dec);
+ }
+ }
+ }
+ break;
+
+ case MSG_AUDIO:
+ if(client->publisher == false) {
+ throw std::runtime_error("not a publisher");
+ }
+ g_rtmp_server_cbs.on_audio(g_rtmp_server_ctx, msg->timestamp, (char*)msg->buf.c_str(), msg->buf.size());
+ // FOR_EACH(std::vector<Client *>, i, clients) {
+ // Client *receiver = *i;
+ // if (receiver != NULL && receiver->ready) {
+ // rtmp_send(receiver, MSG_AUDIO, STREAM_ID,
+ // msg->buf, msg->timestamp);
+ // }
+ // }
+ break;
+ case MSG_VIDEO: {
+ if(client->publisher == false){
+ throw std::runtime_error("not a publisher");
+ }
+ // uint8_t flags = msg->buf[0];
+ g_rtmp_server_cbs.on_video(g_rtmp_server_ctx, msg->timestamp, (char*)msg->buf.c_str(), msg->buf.size());
+ // FOR_EACH(std::vector<Client *>, i, clients) {
+ // Client *receiver = *i;
+ // if (receiver != NULL && receiver->playing &&
+ // client->path == receiver->path) {
+ // if (flags >> 4 == FLV_KEY_FRAME &&
+ // !receiver->ready) {
+ // std::string control;
+ // uint16_t type = htons(CONTROL_CLEAR_STREAM);
+ // control.append((char *) &type, 2);
+ // uint32_t stream = htonl(STREAM_ID);
+ // control.append((char *) &stream, 4);
+ // rtmp_send(receiver, MSG_USER_CONTROL, CONTROL_ID, control);
+ // receiver->ready = true;
+ // }
+ // if (receiver->ready) {
+ // static int flag = 0;
+ // if(flag < 10) {
+ // flag ++;
+ // }
+ // rtmp_send(receiver, MSG_VIDEO,
+ // STREAM_ID, msg->buf,
+ // msg->timestamp);
+ // }
+ // }
+ // }
+ }
+ break;
+
+ case MSG_FLASH_VIDEO:
+ //printf( "streaming FLV not supported");
+ throw std::runtime_error("streaming FLV not supported");
+ break;
+
+ default:
+ debug("unhandled message: %02x\n", msg->type);
+ hexdump(msg->buf.data(), msg->buf.size());
+ break;
+ }
+}
+
+/* TODO: Make this asynchronous */
+void do_handshake(Client *client)
+{
+ Handshake serversig;
+ Handshake clientsig;
+
+ uint8_t c;
+ if (recv_all(client->fd, &c, 1) < 1)
+ return;
+ if (c != HANDSHAKE_PLAINTEXT) {
+ //printf( "only plaintext handshake supported");
+ throw std::runtime_error("only plaintext handshake supported");
+ }
+
+ if (send_all(client->fd, &c, 1) < 1)
+ return;
+
+ memset(&serversig, 0, sizeof serversig);
+ serversig.flags[0] = 0x03;
+ for (int i = 0; i < RANDOM_LEN; ++i) {
+ serversig.random[i] = rand();
+ }
+
+ if (send_all(client->fd, &serversig, sizeof serversig) < sizeof serversig)
+ return;
+
+ /* Echo client's signature back */
+ if (recv_all(client->fd, &clientsig, sizeof serversig) < sizeof serversig)
+ return;
+ if (send_all(client->fd, &clientsig, sizeof serversig) < sizeof serversig)
+ return;
+
+ if (recv_all(client->fd, &clientsig, sizeof serversig) < sizeof serversig)
+ return;
+ if (memcmp(serversig.random, clientsig.random, RANDOM_LEN) != 0) {
+ //printf( "invalid handshake");
+ throw std::runtime_error("invalid handshake");
+ }
+
+ client->read_seq = 1 + sizeof serversig * 2;
+ client->written_seq = 1 + sizeof serversig * 2;
+}
+
+void recv_from_client(void *data)
+{
+ Client *client = (Client *)data;
+
+ std::string chunk(4096, 0);
+ ssize_t got = recv(client->fd, &chunk[0], chunk.size(), 0);
+ if (got == 0) {
+ //printf( "end of life from a client");
+ throw std::runtime_error("EOF from a client");
+ } else if (got < 0) {
+ if (errno == EAGAIN || errno == EINTR)
+ return;
+ //printf( "unable to read from a client: %s",strerror(errno));
+ throw std::runtime_error(strf("unable to read from a client: %s",strerror(errno)));
+ }
+ client->buf.append(chunk, 0, got);
+
+ //printf( "got data and length is %d",client->buf.size());
+
+ while (!client->buf.empty()) {
+ uint8_t flags = client->buf[0];
+
+ static const size_t HEADER_LENGTH[] = {12, 8, 4, 1};
+ size_t header_len = HEADER_LENGTH[flags >> 6];
+
+ if (client->buf.size() < header_len) {
+ /* need more data */
+ break;
+ }
+
+ RTMP_Header header;
+ memcpy(&header, client->buf.data(), header_len);
+
+ RTMP_Message *msg = &client->messages[flags & 0x3f];
+
+ if (header_len >= 8) {
+ msg->len = load_be24(header.msg_len);
+ if (msg->len < msg->buf.size()) {
+ throw std::runtime_error("invalid msg length");
+ }
+ msg->type = header.msg_type;
+ }
+ if (header_len >= 12) {
+ msg->endpoint = load_le32(header.endpoint);
+ }
+
+ if (msg->len == 0) {
+ throw std::runtime_error("message without a header");
+ }
+ size_t chunk = msg->len - msg->buf.size();
+ if (chunk > client->chunk_len)
+ chunk = client->chunk_len;
+
+ if (client->buf.size() < header_len + chunk) {
+ /* need more data */
+ break;
+ }
+
+ if (header_len >= 4) {
+ unsigned long ts = load_be24(header.timestamp);
+ if (ts == 0xffffff) {
+ throw std::runtime_error("ext timestamp not supported");
+ }
+ if (header_len < 12) {
+ ts += msg->timestamp;
+ }
+ msg->timestamp = ts;
+ }
+
+ msg->buf.append(client->buf, header_len, chunk);
+ client->buf.erase(0, header_len + chunk);
+
+ if (msg->buf.size() == msg->len) {
+ handle_message(client, msg);
+ msg->buf.clear();
+ }
+ }
+}
+
+Client *new_client()
+{
+ sockaddr_in sin;
+ socklen_t addrlen = sizeof sin;
+ int fd = accept(listen_fd, (sockaddr *) &sin, &addrlen);
+ if (fd < 0) {
+ printf("Unable to accept a client: %s\n", strerror(errno));
+ return NULL;
+ }
+
+ Client *client = new Client;
+ client->publisher = false;
+ client->playing = false;
+ client->ready = false;
+ client->fd = fd;
+ client->written_seq = 0;
+ client->read_seq = 0;
+ client->chunk_len = DEFAULT_CHUNK_LEN;
+ for (int i = 0; i < 64; ++i) {
+ client->messages[i].timestamp = 0;
+ client->messages[i].len = 0;
+ }
+
+ try {
+ do_handshake(client);
+ } catch (const std::runtime_error &e) {
+ printf("handshake failed: %s\n", e.what());
+ close(fd);
+ delete client;
+ return NULL;
+ }
+
+ set_nonblock(fd, true);
+
+ pollfd entry;
+ entry.events = POLLIN;
+ entry.revents = 0;
+ entry.fd = fd;
+ poll_table.push_back(entry);
+ clients.push_back(client);
+
+ return client;
+}
+
+void close_client(Client *client, size_t i)
+{
+ clients.erase(clients.begin() + i);
+ poll_table.erase(poll_table.begin() + i);
+ close(client->fd);
+
+ if (client->publisher == true) {
+ printf("publisher disconnected.\n");
+ client->publisher = false;
+ publishernum = 0;
+ g_rtmp_server_cbs.on_stop(g_rtmp_server_ctx);
+ FOR_EACH(std::vector<Client *>, i, clients) {
+ Client *client = *i;
+ if (client != NULL) {
+ client->ready = false;
+ }
+ }
+ }
+ delete client;
+}
+
+void do_poll(void)
+{
+ for (size_t i = 0; i < poll_table.size(); ++i) {
+ Client *client = clients[i];
+ if (client != NULL) {
+ if (!client->send_queue.empty()) {
+ //debug("waiting for pollout\n");
+ poll_table[i].events = POLLIN | POLLOUT;
+ } else {
+ poll_table[i].events = POLLIN;
+ }
+ }
+ }
+
+ if (poll(&poll_table[0], poll_table.size(), -1) < 0) {
+ if (errno == EAGAIN || errno == EINTR)
+ return;
+ throw std::runtime_error(strf("poll() failed: %s",
+ strerror(errno)));
+ }
+
+ for (size_t i = 0; i < poll_table.size(); ++i) {
+ Client *client = clients[i];
+ if (poll_table[i].revents & POLLOUT) {
+ try {
+ try_to_send(client);
+ } catch (const std::runtime_error &e) {
+ printf("client error: %s\n", e.what());
+ close_client(client, i);
+ --i;
+ continue;
+ }
+ }
+ if (poll_table[i].revents & POLLIN) {
+ if (client == NULL) {
+ new_client();
+ } else try {
+ recv_from_client(client);
+ } catch (const std::runtime_error &e) {
+ printf("client error: %s\n", e.what());
+ close_client(client, i);
+ --i;
+ }
+ }
+ }
+}
+
+}
+
+int g_rtmp_server_quit = 0;
+
+void start_rtmpserver(RtmpCallbacks cbs, void *ctx) {
+ g_rtmp_server_cbs = cbs;
+ g_rtmp_server_ctx = ctx;
+ try {
+ listen_fd = socket(AF_INET, SOCK_STREAM, 0);
+ if (listen_fd < 0)
+ return;
+ int opt = 1;
+ if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
+ perror("setsockopt SO_REUSEADDR");
+ exit(EXIT_FAILURE);
+ }
+ sockaddr_in sin;
+ sin.sin_family = AF_INET;
+ sin.sin_port = htons(PORT);
+ sin.sin_addr.s_addr = INADDR_ANY;
+ if (bind(listen_fd, (sockaddr *) &sin, sizeof sin) < 0) {
+ throw std::runtime_error(strf("Unable to listen: %s",strerror(errno)));
+ return;
+ }
+
+ listen(listen_fd, 10);
+
+ pollfd entry;
+ entry.events = POLLIN;
+ entry.revents = 0;
+ entry.fd = listen_fd;
+ poll_table.push_back(entry);
+ clients.push_back(NULL);
+
+ for (;;) {
+ if (g_rtmp_server_quit) {
+ return;
+ }
+ do_poll();
+ }
+ return;
+ } catch (const std::runtime_error &e) {
+ fprintf(stderr, "ERROR: %s\n", e.what());
+ return;
+ }
+}
diff --git a/rtmpserver.h b/rtmpserver.h
new file mode 100644
index 0000000..cae56a5
--- /dev/null
+++ b/rtmpserver.h
@@ -0,0 +1,25 @@
+#ifndef RTMPSERVER_H_
+#define RTMPSERVER_H_
+
+#include <stdint.h>
+#include <stdlib.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct {
+ void (*on_start)(void *ctx);
+ void (*on_stop)(void *ctx);
+ void (*on_video)(void* ctx, int64_t timestamp, char *buf, size_t size);
+ void (*on_audio)(void* ctx, int64_t timestamp, char *buf, size_t size);
+} RtmpCallbacks;
+
+void start_rtmpserver(RtmpCallbacks cbs, void *ctx);
+
+#ifdef __cplusplus
+}
+#endif
+
+
+#endif \ No newline at end of file
diff --git a/rtmputils.cpp b/rtmputils.cpp
new file mode 100644
index 0000000..59088f1
--- /dev/null
+++ b/rtmputils.cpp
@@ -0,0 +1,67 @@
+#include "rtmputils.h"
+#include <string>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdarg.h>
+#include <arpa/inet.h>
+
+/*
+ * Used to do unaligned loads on archs that don't support them. GCC can mostly
+ * optimize these away.
+ */
+uint32_t load_be32(const void *p)
+{
+ uint32_t val;
+ memcpy(&val, p, sizeof val);
+ return ntohl(val);
+}
+
+uint16_t load_be16(const void *p)
+{
+ uint16_t val;
+ memcpy(&val, p, sizeof val);
+ return ntohs(val);
+}
+
+uint32_t load_le32(const void *p)
+{
+ const uint8_t *data = (const uint8_t *) p;
+ return data[0] | ((uint32_t) data[1] << 8) |
+ ((uint32_t) data[2] << 16) | ((uint32_t) data[3] << 24);
+}
+
+uint32_t load_be24(const void *p)
+{
+ const uint8_t *data = (const uint8_t *) p;
+ return data[2] | ((uint32_t) data[1] << 8) | ((uint32_t) data[0] << 16);
+}
+
+void set_be24(void *p, uint32_t val)
+{
+ uint8_t *data = (uint8_t *) p;
+ data[0] = val >> 16;
+ data[1] = val >> 8;
+ data[2] = val;
+}
+
+void set_le32(void *p, uint32_t val)
+{
+ uint8_t *data = (uint8_t *) p;
+ data[0] = val;
+ data[1] = val >> 8;
+ data[2] = val >> 16;
+ data[3] = val >> 24;
+}
+
+const std::string strf(const char *fmt, ...)
+{
+ va_list vl;
+ va_start(vl, fmt);
+ char *buf = NULL;
+ vasprintf(&buf, fmt, vl);
+ va_end(vl);
+ std::string s(buf);
+ free(buf);
+ return s;
+}
diff --git a/rtmputils.h b/rtmputils.h
new file mode 100644
index 0000000..f873232
--- /dev/null
+++ b/rtmputils.h
@@ -0,0 +1,49 @@
+/**
+ * @file utils.h
+ * @author Dean Zou (zoudingyuan@junjietech.com)
+ * @brief
+ * @version 1.0
+ * @date 2019-06-27
+ *
+ * @copyright Copyright (c) - 2019 JunJie Intelligence(Shenzhen) Co.,LTD
+ *
+ */
+#ifndef RTMPUTILS_H_
+#define RTMPUTILS_H_
+
+#include <map>
+#include <string>
+#include <stdio.h>
+#include <stdint.h>
+
+#define FOR_EACH(type, i, where) \
+ for (typename type::iterator i = (where).begin(); i != (where).end(); ++i)
+
+#define FOR_EACH_CONST(type, i, where) \
+ for (typename type::const_iterator i = (where).begin(); \
+ i != (where).end(); ++i)
+
+// #define debug(fmt...) fprintf(stderr, fmt)
+
+#define debug(fmt...)
+
+template<class Key, class Value>
+Value get(const std::map<Key, Value> &map, const Key &k,
+ const Value &def = Value())
+{
+ typename std::map<Key, Value>::const_iterator i = map.find(k);
+ if (i == map.end())
+ return def;
+ return i->second;
+}
+
+uint32_t load_be32(const void *p);
+uint16_t load_be16(const void *p);
+uint32_t load_be24(const void *p);
+uint32_t load_le32(const void *p);
+void set_be24(void *p, uint32_t val);
+void set_le32(void *p, uint32_t val);
+
+const std::string strf(const char *fmt, ...);
+
+#endif