diff options
| -rw-r--r-- | .gitignore | 5 | ||||
| -rw-r--r-- | Makefile | 3 | ||||
| -rw-r--r-- | fileutils.c | 72 | ||||
| -rw-r--r-- | fileutils.h | 18 | ||||
| -rw-r--r-- | main.c | 78 | ||||
| -rw-r--r-- | ringbuf.c | 177 | ||||
| -rw-r--r-- | ringbuf.h | 44 | ||||
| -rw-r--r-- | transmuxer.c | 168 | ||||
| -rw-r--r-- | transmuxer.h | 20 |
9 files changed, 462 insertions, 123 deletions
@@ -1,3 +1,6 @@ *.o ezlive -*.flv
\ No newline at end of file +*.flv +*.ts +*.m3u8 +a.out
\ No newline at end of file @@ -2,6 +2,7 @@ CC := gcc CXX := g++ CFLAGS := -g -Wall CXXFLAGS := -g -Wall -std=c++14 +LDFLAGS := -lavformat -lavutil -lavcodec C_SOURCES := $(shell find . -maxdepth 1 -name '*.c') CPP_SOURCES := $(shell find . -maxdepth 1 -name '*.cpp') @@ -14,7 +15,7 @@ TARGET := ezlive all: $(TARGET) $(TARGET): $(C_OBJS) $(CPP_OBJS) - $(CXX) $(C_OBJS) $(CPP_OBJS) -o $@ + $(CXX) $(C_OBJS) $(CPP_OBJS) -o $@ $(LDFLAGS) %.o: %.c $(CC) $(CFLAGS) -c $< -o $@ diff --git a/fileutils.c b/fileutils.c deleted file mode 100644 index a9d9efc..0000000 --- a/fileutils.c +++ /dev/null @@ -1,72 +0,0 @@ -#include "fileutils.h" - -bool fwrite_word16le(FILE* fp, uint16_t x) { - uint8_t buf[2]; - buf[0] = x & 0xff; - buf[1] = (x >> 8) & 0xff; - int r = fwrite(buf, 1, 2, fp); - if (r != 2) return false; - return true; -} - -bool fwrite_word32le(FILE* fp, uint32_t x) { - bool ret = false; - uint16_t buf[2]; - - buf[0] = x & 0xffff; - buf[1] = (x >> 16) & 0xffff; - ret = fwrite_word16le(fp, buf[0]); - if (!ret) return ret; - ret = fwrite_word16le(fp, buf[1]); - if (!ret) return ret; - return true; -} - -bool fwrite_word16be(FILE* fp, uint16_t x) { - uint8_t buf[2]; - buf[1] = x & 0xff; - buf[0] = (x >> 8) & 0xff; - int r = fwrite(buf, 1, 2, fp); - if (r != 2) return false; - return true; -} - -bool fwrite_word32be(FILE* fp, uint32_t x) { - bool ret = false; - uint16_t buf[2]; - - buf[1] = x & 0xffff; - buf[0] = (x >> 16) & 0xffff; - ret = fwrite_word16be(fp, buf[0]); - if (!ret) return ret; - ret = fwrite_word16be(fp, buf[1]); - if (!ret) return ret; - return true; -} - -bool fwrite_word24le(FILE* fp, uint32_t x) { - uint8_t buf[3]; - buf[0] = x & 0xff; - buf[1] = (x >> 8) & 0xff; - buf[2] = (x >> 16) & 0xff; - int r = fwrite(buf, 1, 3, fp); - if (r != 3) return false; - return true; -} - -bool fwrite_word24be(FILE* fp, uint32_t x) { - uint8_t buf[3]; - buf[2] = x & 0xff; - buf[1] = (x >> 8) & 0xff; - buf[0] = (x >> 16) & 0xff; - int r = fwrite(buf, 1, 3, fp); - if (r != 3) return false; - return true; -} - -bool fwrite_char(FILE* fp, uint8_t x) { - uint8_t buf[1]; - buf[0] = x; - int ret = fwrite(buf, 1, 1, fp); - return ret == 1; -}
\ No newline at end of file diff --git a/fileutils.h b/fileutils.h deleted file mode 100644 index b9888a3..0000000 --- a/fileutils.h +++ /dev/null @@ -1,18 +0,0 @@ -#ifndef FILEUTILS_H -#define FILEUTILS_H - -#include <stdbool.h> -#include <stdint.h> -#include <stdio.h> - -bool fwrite_word16le(FILE* fp, uint16_t x); -bool fwrite_word24le(FILE* fp, uint32_t x); -bool fwrite_word32le(FILE* fp, uint32_t x); - -bool fwrite_word16be(FILE* fp, uint16_t x); -bool fwrite_word24be(FILE* fp, uint32_t x); -bool fwrite_word32be(FILE* fp, uint32_t x); - -bool fwrite_char(FILE* fp, uint8_t x); - -#endif
\ No newline at end of file @@ -1,56 +1,72 @@ #include <stdio.h> #include "rtmpserver.h" -#include "fileutils.h" +#include "ringbuf.h" +#include "transmuxer.h" + +typedef struct { + RingBuffer *ringbuf; + Transmuxer transmuxer; +} MainCtx; void on_rtmp_start(void *ctx) { - *(FILE**)ctx = fopen("test.flv", "wb"); - FILE *fp = *(FILE**)ctx; - fwrite_char(fp, 'F'); - fwrite_char(fp, 'L'); - fwrite_char(fp, 'V'); - fwrite_char(fp, 1); - fwrite_char(fp, 5); - fwrite_word32be(fp, 9); - fwrite_word32be(fp, 0); + MainCtx *main_ctx = ctx; + main_ctx->ringbuf = malloc(sizeof(RingBuffer)); + RingBuffer_init(main_ctx->ringbuf, 4096); + RingBuffer *rb = main_ctx->ringbuf; + Transmuxer_new_stream(&main_ctx->transmuxer, rb); + + RingBuffer_write_char(rb, 'F'); + RingBuffer_write_char(rb, 'L'); + RingBuffer_write_char(rb, 'V'); + RingBuffer_write_char(rb, 1); + RingBuffer_write_char(rb, 5); + RingBuffer_write_word32be(rb, 9); + RingBuffer_write_word32be(rb, 0); } void on_rtmp_stop(void *ctx) { - FILE *fp = *(FILE**)ctx; - fclose(fp); - exit(0); + MainCtx *main_ctx = ctx; + RingBuffer_end(main_ctx->ringbuf); } void on_rtmp_video(void *ctx, int64_t timestamp, char *buf, size_t size) { - FILE *fp = *(FILE**)ctx; - fwrite_char(fp, 9); - fwrite_word24be(fp, size); - fwrite_word24be(fp, timestamp); - fwrite_char(fp, timestamp >> 24); - fwrite_word24be(fp, 0); - fwrite(buf, 1, size, fp); - fwrite_word32be(fp, size + 11); + MainCtx *main_ctx = ctx; + RingBuffer *rb = main_ctx->ringbuf; + RingBuffer_write_char(rb, 9); + RingBuffer_write_word24be(rb, size); + RingBuffer_write_word24be(rb, timestamp); + RingBuffer_write_char(rb, timestamp >> 24); + RingBuffer_write_word24be(rb, 0); + RingBuffer_write(rb, (const uint8_t *)buf, size); + RingBuffer_write_word32be(rb, size + 11); } void on_rtmp_audio(void *ctx, int64_t timestamp, char *buf, size_t size) { - FILE *fp = *(FILE**)ctx; - fwrite_char(fp, 8); - fwrite_word24be(fp, size); - fwrite_word24be(fp, timestamp); - fwrite_char(fp, timestamp >> 24); - fwrite_word24be(fp, 0); - fwrite(buf, 1, size, fp); - fwrite_word32be(fp, size + 11); + MainCtx *main_ctx = ctx; + RingBuffer *rb = main_ctx->ringbuf; + RingBuffer_write_char(rb, 8); + RingBuffer_write_word24be(rb, size); + RingBuffer_write_word24be(rb, timestamp); + RingBuffer_write_char(rb, timestamp >> 24); + RingBuffer_write_word24be(rb, 0); + RingBuffer_write(rb, (const uint8_t *)buf, size); + RingBuffer_write_word32be(rb, size + 11); } int main() { + MainCtx main_ctx; RtmpCallbacks rtmp_cbs = { .on_audio = &on_rtmp_audio, .on_video = &on_rtmp_video, .on_start = &on_rtmp_start, .on_stop = &on_rtmp_stop, }; - FILE* fp = NULL; - start_rtmpserver(rtmp_cbs, &fp); + + Transmuxer_init(&main_ctx.transmuxer); + pthread_t transmux_thread; + pthread_create(&transmux_thread, NULL, &Transmuxer_main, &main_ctx.transmuxer); + + start_rtmpserver(rtmp_cbs, &main_ctx); return 0; }
\ No newline at end of file diff --git a/ringbuf.c b/ringbuf.c new file mode 100644 index 0000000..876e590 --- /dev/null +++ b/ringbuf.c @@ -0,0 +1,177 @@ +#include "ringbuf.h" + +#include <string.h> + +void RingBuffer_init(RingBuffer *self, size_t size) { + self->buffer = (uint8_t *)malloc(size); + if (!self->buffer) { + return; + } + + self->max = size; + self->head = 0; + self->tail = 0; + self->full_flag = false; + self->finished_flag = false; + + pthread_mutex_init(&self->mutex, NULL); + pthread_cond_init(&self->not_empty_cond, NULL); + pthread_cond_init(&self->not_full_cond, NULL); +} + +void RingBuffer_destroy(RingBuffer *self) { + free(self->buffer); + pthread_mutex_destroy(&self->mutex); + pthread_cond_destroy(&self->not_empty_cond); + pthread_cond_destroy(&self->not_full_cond); +} + +size_t RingBuffer_size(RingBuffer *self) { + size_t size; + if (self->full_flag) { + size = self->max; + } else if (self->head >= self->tail) { + size = self->head - self->tail; + } else { + size = self->max + self->head - self->tail; + } + return size; +} + +size_t RingBuffer_space(RingBuffer *self) { + return self->max - RingBuffer_size(self); +} + +void RingBuffer_end(RingBuffer *self) { + pthread_mutex_lock(&self->mutex); + self->finished_flag = true; + pthread_mutex_unlock(&self->mutex); +} + +size_t RingBuffer_write(RingBuffer *self, const uint8_t *data, size_t len) { + size_t written = 0; + pthread_mutex_lock(&self->mutex); + while (written < len) { + while (self->full_flag) { + pthread_cond_wait(&self->not_full_cond, &self->mutex); + } + + size_t free_space = RingBuffer_space(self); + size_t to_write = (len - written > free_space) ? free_space : len - written; + + size_t first = (to_write > self->max - self->head) ? self->max - self->head : to_write; + memcpy(self->buffer + self->head, data + written, first); + + size_t second = to_write - first; + if (second > 0) memcpy(self->buffer, data + written + first, second); + + self->head = (self->head + to_write) % self->max; + if (to_write == free_space) self->full_flag = true; + + written += to_write; + + pthread_cond_signal(&self->not_empty_cond); + } + pthread_mutex_unlock(&self->mutex); + return written; +} + +size_t RingBuffer_read(RingBuffer *self, uint8_t *data, size_t len) { + size_t read = 0; + pthread_mutex_lock(&self->mutex); + + while (read < len) { + while (RingBuffer_space(self) == 0) { + if (self->finished_flag) { + goto end; + } + pthread_cond_wait(&self->not_empty_cond, &self->mutex); + } + + size_t available = RingBuffer_size(self); + size_t to_read = (len - read > available) ? available : len - read; + + size_t first = (to_read > self->max - self->tail) ? self->max - self->tail : to_read; + memcpy(data + read, self->buffer + self->tail, first); + + size_t second = to_read - first; + if (second > 0) memcpy(data + read + first, self->buffer, second); + + self->tail = (self->tail + to_read) % self->max; + self->full_flag = false; + + read += to_read; + + pthread_cond_signal(&self->not_full_cond); + } +end: + pthread_mutex_unlock(&self->mutex); + return read; +} + +bool RingBuffer_write_word16le(RingBuffer* self, uint16_t x) { + uint8_t buf[2]; + buf[0] = x & 0xff; + buf[1] = (x >> 8) & 0xff; + int r = RingBuffer_write(self, buf, 2); + if (r != 2) return false; + return true; +} + +bool RingBuffer_write_word32le(RingBuffer* self, uint32_t x) { + uint8_t buf[4]; + buf[0] = x & 0xff; + buf[1] = (x >> 8) & 0xff; + buf[2] = (x >> 16) & 0xff; + buf[3] = (x >> 24) & 0xff; + int r = RingBuffer_write(self, buf, 4); + if (r != 4) return false; + return true; +} + +bool RingBuffer_write_word16be(RingBuffer* self, uint16_t x) { + uint8_t buf[2]; + buf[1] = x & 0xff; + buf[0] = (x >> 8) & 0xff; + int r = RingBuffer_write(self, buf, 2); + if (r != 2) return false; + return true; +} + +bool RingBuffer_write_word32be(RingBuffer* self, uint32_t x) { + uint8_t buf[4]; + buf[3] = x & 0xff; + buf[2] = (x >> 8) & 0xff; + buf[1] = (x >> 16) & 0xff; + buf[0] = (x >> 24) & 0xff; + int r = RingBuffer_write(self, buf, 4); + if (r != 4) return false; + return true; +} + +bool RingBuffer_write_word24le(RingBuffer* self, uint32_t x) { + uint8_t buf[3]; + buf[0] = x & 0xff; + buf[1] = (x >> 8) & 0xff; + buf[2] = (x >> 16) & 0xff; + int r = RingBuffer_write(self, buf, 3); + if (r != 3) return false; + return true; +} + +bool RingBuffer_write_word24be(RingBuffer* self, uint32_t x) { + uint8_t buf[3]; + buf[2] = x & 0xff; + buf[1] = (x >> 8) & 0xff; + buf[0] = (x >> 16) & 0xff; + int r = RingBuffer_write(self, buf, 3); + if (r != 3) return false; + return true; +} + +bool RingBuffer_write_char(RingBuffer* self, uint8_t x) { + uint8_t buf[1]; + buf[0] = x; + int r = RingBuffer_write(self, buf, 1); + return r == 1; +} diff --git a/ringbuf.h b/ringbuf.h new file mode 100644 index 0000000..09e0306 --- /dev/null +++ b/ringbuf.h @@ -0,0 +1,44 @@ +#ifndef RINGBUF_H_ +#define RINGBUF_H_ + +#include <stdint.h> +#include <stdlib.h> +#include <stdbool.h> +#include <pthread.h> + +typedef struct { + uint8_t *buffer; + size_t head; + size_t tail; + size_t max; + bool full_flag; + bool finished_flag; + + pthread_mutex_t mutex; + pthread_cond_t not_empty_cond; + pthread_cond_t not_full_cond; +} RingBuffer; + +void RingBuffer_init(RingBuffer *self, size_t size); + +void RingBuffer_destroy(RingBuffer *self); + +size_t RingBuffer_size(RingBuffer *self); + +size_t RingBuffer_space(RingBuffer *self); + +size_t RingBuffer_write(RingBuffer *self, const uint8_t *data, size_t len); + +bool RingBuffer_write_word16le(RingBuffer *self, uint16_t x); +bool RingBuffer_write_word24le(RingBuffer *self, uint32_t x); +bool RingBuffer_write_word32le(RingBuffer *self, uint32_t x); +bool RingBuffer_write_word16be(RingBuffer *self, uint16_t x); +bool RingBuffer_write_word24be(RingBuffer *self, uint32_t x); +bool RingBuffer_write_word32be(RingBuffer *self, uint32_t x); +bool RingBuffer_write_char(RingBuffer *self, uint8_t x); + +size_t RingBuffer_read(RingBuffer *self, uint8_t *data, size_t len); + +void RingBuffer_end(RingBuffer *self); + +#endif
\ No newline at end of file diff --git a/transmuxer.c b/transmuxer.c new file mode 100644 index 0000000..05074d6 --- /dev/null +++ b/transmuxer.c @@ -0,0 +1,168 @@ + #include "transmuxer.h" +#include "ringbuf.h" + +#include <bits/pthreadtypes.h> +#include <libavformat/avformat.h> +#include <libavutil/timestamp.h> +#include <pthread.h> +#include <stdlib.h> + + +static int wait_for_new_stream(Transmuxer *self) { + // if no new stream, return 0 + return 0; +} + +typedef struct { + AVStream *audio_stream; + AVStream *video_stream; +} StreamPair; + +static StreamPair start_new_output_file( + AVFormatContext *in_fmt_ctx, + AVFormatContext *out_fmt_ctx, + const char *out_filename, + int aidx, int vidx) { + avformat_alloc_output_context2(&out_fmt_ctx, NULL, "mpegts", out_filename); + + AVStream *out_video_stream = avformat_new_stream(out_fmt_ctx, NULL); + AVStream *out_audio_stream = avformat_new_stream(out_fmt_ctx, NULL); + + avcodec_parameters_copy(out_video_stream->codecpar, in_fmt_ctx->streams[vidx]->codecpar); + avcodec_parameters_copy(out_audio_stream->codecpar, in_fmt_ctx->streams[aidx]->codecpar); + + if (!(out_fmt_ctx->oformat->flags & AVFMT_NOFILE)) { + if (avio_open(&out_fmt_ctx->pb, out_filename, AVIO_FLAG_WRITE) < 0) { + fprintf(stderr, "Could not open output file '%s'\n", out_filename); + exit(-1); + } + } + + if (avformat_write_header(out_fmt_ctx, NULL) < 0) { + fprintf(stderr, "avformat_write_header failed.\n"); + abort(); + } + return (StreamPair) { + .audio_stream = out_audio_stream, + .video_stream = out_video_stream, + }; +} + +static void finalize_output_file(AVFormatContext *out_fmt_ctx) { + av_write_trailer(out_fmt_ctx); + if (!(out_fmt_ctx->oformat->flags & AVFMT_NOFILE)) { + avio_closep(&out_fmt_ctx->pb); + } + avformat_free_context(out_fmt_ctx); + // TODO: update m3u8 +} + +#define SEGMENT_DURATION 5 + +void* Transmuxer_main (void *vself) { + Transmuxer *self = vself; + pthread_mutex_lock(&self->lock); + while (wait_for_new_stream(self)) { + AVFormatContext *in_fmt_ctx = NULL; + if (avformat_open_input(&in_fmt_ctx, NULL, NULL, NULL) < 0) { + fprintf(stderr, "Could not open input file\n"); + abort(); + } + + if (avformat_find_stream_info(in_fmt_ctx, NULL) < 0) { + fprintf(stderr, "Could not find stream info\n"); + abort(); + } + + int video_stream_index = -1, audio_stream_index = -1; + for (int i = 0; i < in_fmt_ctx->nb_streams; i++) { + if (in_fmt_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) + video_stream_index = i; + else if (in_fmt_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) + audio_stream_index = i; + } + + if (video_stream_index < 0) { + fprintf(stderr, "No video stream found\n"); + abort(); + } + if (audio_stream_index < 0) { + fprintf(stderr, "No audio stream found\n"); + abort(); + } + + AVFormatContext *out_fmt_ctx = NULL; + int segment_index = 0; + int64_t segment_start_pts = 0; + + char out_filename[256]; + snprintf(out_filename, sizeof(out_filename), "segment_%03d.ts", segment_index); + + + int64_t pts_time; + AVPacket pkt; + StreamPair output_stream; + + output_stream = start_new_output_file(in_fmt_ctx, out_fmt_ctx, out_filename, audio_stream_index, video_stream_index); + while (av_read_frame(in_fmt_ctx, &pkt) >= 0) { + AVStream *in_stream = in_fmt_ctx->streams[pkt.stream_index]; + AVStream *out_stream = NULL; + if (pkt.stream_index == video_stream_index) + out_stream = output_stream.video_stream; + else if (pkt.stream_index == audio_stream_index) + out_stream = output_stream.video_stream; + else { + av_packet_unref(&pkt); + continue; + } + + pts_time = av_rescale_q(pkt.pts, in_stream->time_base, AV_TIME_BASE_Q); + + // if need split + if (pts_time - segment_start_pts >= SEGMENT_DURATION * AV_TIME_BASE) { + if (pkt.stream_index == video_stream_index && (pkt.flags & AV_PKT_FLAG_KEY)) { + // close current ts + printf("new ts: %ld\n", pts_time - segment_start_pts); + finalize_output_file(out_fmt_ctx); + + // open new ts + segment_index++; + segment_start_pts = pts_time; + snprintf(out_filename, sizeof(out_filename), "segment_%03d.ts", segment_index); + start_new_output_file(in_fmt_ctx, out_fmt_ctx, out_filename, audio_stream_index, video_stream_index); + } + } + pkt.pts = av_rescale_q_rnd(pkt.pts, in_stream->time_base, out_stream->time_base, AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX); + pkt.dts = av_rescale_q_rnd(pkt.dts, in_stream->time_base, out_stream->time_base, AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX); + pkt.duration = av_rescale_q(pkt.duration, in_stream->time_base, out_stream->time_base); + pkt.pos = -1; + pkt.stream_index = (pkt.stream_index == video_stream_index) ? output_stream.video_stream->index : output_stream.audio_stream->index; + + av_interleaved_write_frame(out_fmt_ctx, &pkt); + av_packet_unref(&pkt); + } + printf("new ts: %ld\n", pts_time - segment_start_pts); + finalize_output_file(out_fmt_ctx); + + avformat_close_input(&in_fmt_ctx); + RingBuffer_destroy(self->stream); + self->stream = NULL; + free(self->stream); + } + pthread_mutex_unlock(&self->lock); + return NULL; +} + +void Transmuxer_init(Transmuxer *self) { + pthread_mutex_init(&self->lock, NULL); + pthread_cond_init(&self->streaming_cond, NULL); + self->stream = NULL; + self->quit = false; +} + +void Transmuxer_new_stream(Transmuxer *self, RingBuffer *ringbuf) { + pthread_mutex_lock(&self->lock); + self->stream = ringbuf; + pthread_mutex_unlock(&self->lock); + pthread_cond_signal(&self->streaming_cond); +} diff --git a/transmuxer.h b/transmuxer.h new file mode 100644 index 0000000..9bcbf1b --- /dev/null +++ b/transmuxer.h @@ -0,0 +1,20 @@ +#ifndef TRANSMUXER_H_ +#define TRANSMUXER_H_ + +#include "ringbuf.h" +#include "pthread.h" +#include <bits/pthreadtypes.h> + +typedef struct { + pthread_mutex_t lock; + pthread_cond_t streaming_cond; + RingBuffer *stream; + bool quit; +} Transmuxer; + +void Transmuxer_init(Transmuxer *self); + +void* Transmuxer_main(void *vself); +void Transmuxer_new_stream(Transmuxer *self, RingBuffer *ringbuf); + +#endif |
