From 7540ded75ee07b026950486172aca08cd0686a4e Mon Sep 17 00:00:00 2001 From: Mistivia Date: Sat, 13 Sep 2025 14:17:36 +0800 Subject: transcode talker --- main.c | 10 +-- ringbuf.c | 2 + transcode_talker.c | 202 +++++++++++++++++++++++++++++++++++++++++++++++++++++ transcode_talker.h | 20 ++++++ transmuxer.c | 202 ----------------------------------------------------- transmuxer.h | 20 ------ 6 files changed, 229 insertions(+), 227 deletions(-) create mode 100644 transcode_talker.c create mode 100644 transcode_talker.h delete mode 100644 transmuxer.c delete mode 100644 transmuxer.h diff --git a/main.c b/main.c index a659fc9..9839ad5 100644 --- a/main.c +++ b/main.c @@ -2,11 +2,11 @@ #include "rtmpserver.h" #include "ringbuf.h" -#include "transmuxer.h" +#include "transcode_talker.h" typedef struct { RingBuffer *ringbuf; - Transmuxer transmuxer; + TranscodeTalker transcode_talker; } MainCtx; void on_rtmp_start(void *ctx) { @@ -14,7 +14,7 @@ void on_rtmp_start(void *ctx) { main_ctx->ringbuf = malloc(sizeof(RingBuffer)); RingBuffer_init(main_ctx->ringbuf, 4096); RingBuffer *rb = main_ctx->ringbuf; - Transmuxer_new_stream(&main_ctx->transmuxer, rb); + TranscodeTalker_new_stream(&main_ctx->transcode_talker, rb); RingBuffer_write_char(rb, 'F'); RingBuffer_write_char(rb, 'L'); @@ -63,9 +63,9 @@ int main() { .on_stop = &on_rtmp_stop, }; - Transmuxer_init(&main_ctx.transmuxer); + TranscodeTalker_init(&main_ctx.transcode_talker); pthread_t transmux_thread; - pthread_create(&transmux_thread, NULL, &Transmuxer_main, &main_ctx.transmuxer); + pthread_create(&transmux_thread, NULL, &TranscodeTalker_main, &main_ctx.transcode_talker); start_rtmpserver(rtmp_cbs, &main_ctx); return 0; diff --git a/ringbuf.c b/ringbuf.c index 29301e5..2c58752 100644 --- a/ringbuf.c +++ b/ringbuf.c @@ -1,5 +1,6 @@ #include "ringbuf.h" +#include #include void RingBuffer_init(RingBuffer *self, size_t size) { @@ -46,6 +47,7 @@ void RingBuffer_end(RingBuffer *self) { pthread_mutex_lock(&self->mutex); self->finished_flag = true; pthread_mutex_unlock(&self->mutex); + pthread_cond_signal(&self->not_empty_cond); } size_t RingBuffer_write(RingBuffer *self, const uint8_t *data, size_t len) { diff --git a/transcode_talker.c b/transcode_talker.c new file mode 100644 index 0000000..9dd9ae1 --- /dev/null +++ b/transcode_talker.c @@ -0,0 +1,202 @@ +#include "transcode_talker.h" +#include "ringbuf.h" + +#include +#include +#include +#include +#include +#include +#include + + +static int wait_for_new_stream(TranscodeTalker *self) { + while (pthread_cond_wait(&self->streaming_cond, &self->lock)) { + if (self->quit) { + return 0; + } + if (self->stream != NULL) break; + } + return 1; +} + +typedef struct { + AVStream *audio_stream; + AVStream *video_stream; +} StreamPair; + +#define OUT_VIDEO_STREAM_INDEX 0 +#define OUT_AUDIO_STREAM_INDEX 1 + +static StreamPair start_new_output_file( + AVFormatContext *in_fmt_ctx, + AVFormatContext **out_fmt_ctx, + const char *out_filename, + int aidx, int vidx) { + avformat_alloc_output_context2(out_fmt_ctx, NULL, "mpegts", out_filename); + + AVStream *out_video_stream = avformat_new_stream(*out_fmt_ctx, NULL); + AVStream *out_audio_stream = avformat_new_stream(*out_fmt_ctx, NULL); + + avcodec_parameters_copy(out_video_stream->codecpar, in_fmt_ctx->streams[vidx]->codecpar); + avcodec_parameters_copy(out_audio_stream->codecpar, in_fmt_ctx->streams[aidx]->codecpar); + + if (!((*out_fmt_ctx)->oformat->flags & AVFMT_NOFILE)) { + if (avio_open(&(*out_fmt_ctx)->pb, out_filename, AVIO_FLAG_WRITE) < 0) { + fprintf(stderr, "Could not open output file '%s'\n", out_filename); + exit(-1); + } + } + + if (avformat_write_header(*out_fmt_ctx, NULL) < 0) { + fprintf(stderr, "avformat_write_header failed.\n"); + abort(); + } + return (StreamPair) { + .audio_stream = out_audio_stream, + .video_stream = out_video_stream, + }; +} + +static int RingBuffer_avio_read(void *ctx, uint8_t *buf, int buf_size) { + RingBuffer *rb = ctx; + size_t n = RingBuffer_read(rb, buf, buf_size); + if (n == 0 && buf_size > 0) { + return AVERROR_EOF; + } + return (int)n; +} + +AVIOContext* create_avio_from_ringbuffer(RingBuffer *rb, int buffer_size) { + uint8_t *avio_buf = av_malloc(buffer_size); + AVIOContext *avio = avio_alloc_context( + avio_buf, buffer_size, + 0, + rb, + RingBuffer_avio_read, + NULL, + NULL); + return avio; +} + +static void finalize_output_file(AVFormatContext *out_fmt_ctx) { + av_write_trailer(out_fmt_ctx); + if (!(out_fmt_ctx->oformat->flags & AVFMT_NOFILE)) { + avio_closep(&out_fmt_ctx->pb); + } + avformat_free_context(out_fmt_ctx); + // TODO: update m3u8 +} + +#define SEGMENT_DURATION 5 + +void* TranscodeTalker_main (void *vself) { + TranscodeTalker *self = vself; + pthread_mutex_lock(&self->lock); + while (wait_for_new_stream(self)) { + AVFormatContext *in_fmt_ctx = avformat_alloc_context(); + in_fmt_ctx->pb = create_avio_from_ringbuffer(self->stream, 4096); + const AVInputFormat *flv_fmt = av_find_input_format("flv"); + if (avformat_open_input(&in_fmt_ctx, NULL, flv_fmt, NULL) < 0) { + fprintf(stderr, "Could not open input file\n"); + abort(); + } + + if (avformat_find_stream_info(in_fmt_ctx, NULL) < 0) { + fprintf(stderr, "Could not find stream info\n"); + abort(); + } + + int video_stream_index = -1, audio_stream_index = -1; + for (int i = 0; i < in_fmt_ctx->nb_streams; i++) { + if (in_fmt_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) + video_stream_index = i; + else if (in_fmt_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) + audio_stream_index = i; + } + + if (video_stream_index < 0) { + fprintf(stderr, "No video stream found\n"); + abort(); + } + if (audio_stream_index < 0) { + fprintf(stderr, "No audio stream found\n"); + abort(); + } + + AVFormatContext *out_fmt_ctx = NULL; + int segment_index = 0; + int64_t segment_start_pts = 0; + + char out_filename[256]; + snprintf(out_filename, sizeof(out_filename), "segment_%03d.ts", segment_index); + + + int64_t pts_time; + AVPacket pkt; + StreamPair output_stream = start_new_output_file( + in_fmt_ctx, &out_fmt_ctx, out_filename, audio_stream_index, video_stream_index); + while (av_read_frame(in_fmt_ctx, &pkt) >= 0) { + AVStream *in_stream = in_fmt_ctx->streams[pkt.stream_index]; + AVStream *out_stream = NULL; + if (pkt.stream_index == video_stream_index) + out_stream = output_stream.video_stream; + else if (pkt.stream_index == audio_stream_index) + out_stream = output_stream.audio_stream; + else { + av_packet_unref(&pkt); + continue; + } + + pts_time = av_rescale_q(pkt.pts, in_stream->time_base, AV_TIME_BASE_Q); + + // if need split + if (pts_time - segment_start_pts >= SEGMENT_DURATION * AV_TIME_BASE) { + if (pkt.stream_index == video_stream_index && (pkt.flags & AV_PKT_FLAG_KEY)) { + // close current ts + printf("new ts: %ld\n", pts_time - segment_start_pts); + finalize_output_file(out_fmt_ctx); + + // open new ts + segment_index++; + segment_start_pts = pts_time; + snprintf(out_filename, sizeof(out_filename), "segment_%03d.ts", segment_index); + output_stream = start_new_output_file(in_fmt_ctx, &out_fmt_ctx, out_filename, audio_stream_index, video_stream_index); + } + } + pkt.pts = av_rescale_q_rnd(pkt.pts, in_stream->time_base, out_stream->time_base, AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX); + pkt.dts = av_rescale_q_rnd(pkt.dts, in_stream->time_base, out_stream->time_base, AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX); + pkt.duration = av_rescale_q(pkt.duration, in_stream->time_base, out_stream->time_base); + pkt.pos = -1; + pkt.stream_index = (pkt.stream_index == audio_stream_index) ? OUT_AUDIO_STREAM_INDEX : OUT_VIDEO_STREAM_INDEX; + + av_interleaved_write_frame(out_fmt_ctx, &pkt); + av_packet_unref(&pkt); + } + printf("new ts: %ld\n", pts_time - segment_start_pts); + finalize_output_file(out_fmt_ctx); + + av_free(in_fmt_ctx->pb->buffer); + avio_context_free(&in_fmt_ctx->pb); + avformat_close_input(&in_fmt_ctx); + RingBuffer_destroy(self->stream); + self->stream = NULL; + free(self->stream); + } + pthread_mutex_unlock(&self->lock); + return NULL; +} + +void TranscodeTalker_init(TranscodeTalker *self) { + pthread_mutex_init(&self->lock, NULL); + pthread_cond_init(&self->streaming_cond, NULL); + self->stream = NULL; + self->quit = false; +} + +void TranscodeTalker_new_stream(TranscodeTalker *self, RingBuffer *ringbuf) { + pthread_mutex_lock(&self->lock); + self->stream = ringbuf; + pthread_mutex_unlock(&self->lock); + pthread_cond_signal(&self->streaming_cond); +} diff --git a/transcode_talker.h b/transcode_talker.h new file mode 100644 index 0000000..312ae8c --- /dev/null +++ b/transcode_talker.h @@ -0,0 +1,20 @@ +#ifndef TRANSCODE_TALKER_H_ +#define TRANSCODE_TALKER_H_ + +#include "ringbuf.h" +#include "pthread.h" +#include + +typedef struct { + pthread_mutex_t lock; + pthread_cond_t streaming_cond; + RingBuffer *stream; + bool quit; +} TranscodeTalker; + +void TranscodeTalker_init(TranscodeTalker *self); + +void* TranscodeTalker_main(void *vself); +void TranscodeTalker_new_stream(TranscodeTalker *self, RingBuffer *ringbuf); + +#endif diff --git a/transmuxer.c b/transmuxer.c deleted file mode 100644 index e2f60fc..0000000 --- a/transmuxer.c +++ /dev/null @@ -1,202 +0,0 @@ -#include "transmuxer.h" -#include "ringbuf.h" - -#include -#include -#include -#include -#include -#include -#include - - -static int wait_for_new_stream(Transmuxer *self) { - while (pthread_cond_wait(&self->streaming_cond, &self->lock)) { - if (self->quit) { - return 0; - } - if (self->stream != NULL) break; - } - return 1; -} - -typedef struct { - AVStream *audio_stream; - AVStream *video_stream; -} StreamPair; - -#define OUT_VIDEO_STREAM_INDEX 0 -#define OUT_AUDIO_STREAM_INDEX 1 - -static StreamPair start_new_output_file( - AVFormatContext *in_fmt_ctx, - AVFormatContext **out_fmt_ctx, - const char *out_filename, - int aidx, int vidx) { - avformat_alloc_output_context2(out_fmt_ctx, NULL, "mpegts", out_filename); - - AVStream *out_video_stream = avformat_new_stream(*out_fmt_ctx, NULL); - AVStream *out_audio_stream = avformat_new_stream(*out_fmt_ctx, NULL); - - avcodec_parameters_copy(out_video_stream->codecpar, in_fmt_ctx->streams[vidx]->codecpar); - avcodec_parameters_copy(out_audio_stream->codecpar, in_fmt_ctx->streams[aidx]->codecpar); - - if (!((*out_fmt_ctx)->oformat->flags & AVFMT_NOFILE)) { - if (avio_open(&(*out_fmt_ctx)->pb, out_filename, AVIO_FLAG_WRITE) < 0) { - fprintf(stderr, "Could not open output file '%s'\n", out_filename); - exit(-1); - } - } - - if (avformat_write_header(*out_fmt_ctx, NULL) < 0) { - fprintf(stderr, "avformat_write_header failed.\n"); - abort(); - } - return (StreamPair) { - .audio_stream = out_audio_stream, - .video_stream = out_video_stream, - }; -} - -static int RingBuffer_avio_read(void *ctx, uint8_t *buf, int buf_size) { - RingBuffer *rb = ctx; - size_t n = RingBuffer_read(rb, buf, buf_size); - if (n == 0 && buf_size > 0) { - return AVERROR_EOF; - } - return (int)n; -} - -AVIOContext* create_avio_from_ringbuffer(RingBuffer *rb, int buffer_size) { - uint8_t *avio_buf = av_malloc(buffer_size); - AVIOContext *avio = avio_alloc_context( - avio_buf, buffer_size, - 0, - rb, - RingBuffer_avio_read, - NULL, - NULL); - return avio; -} - -static void finalize_output_file(AVFormatContext *out_fmt_ctx) { - av_write_trailer(out_fmt_ctx); - if (!(out_fmt_ctx->oformat->flags & AVFMT_NOFILE)) { - avio_closep(&out_fmt_ctx->pb); - } - avformat_free_context(out_fmt_ctx); - // TODO: update m3u8 -} - -#define SEGMENT_DURATION 5 - -void* Transmuxer_main (void *vself) { - Transmuxer *self = vself; - pthread_mutex_lock(&self->lock); - while (wait_for_new_stream(self)) { - AVFormatContext *in_fmt_ctx = avformat_alloc_context(); - in_fmt_ctx->pb = create_avio_from_ringbuffer(self->stream, 4096); - const AVInputFormat *flv_fmt = av_find_input_format("flv"); - if (avformat_open_input(&in_fmt_ctx, NULL, flv_fmt, NULL) < 0) { - fprintf(stderr, "Could not open input file\n"); - abort(); - } - - if (avformat_find_stream_info(in_fmt_ctx, NULL) < 0) { - fprintf(stderr, "Could not find stream info\n"); - abort(); - } - - int video_stream_index = -1, audio_stream_index = -1; - for (int i = 0; i < in_fmt_ctx->nb_streams; i++) { - if (in_fmt_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) - video_stream_index = i; - else if (in_fmt_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) - audio_stream_index = i; - } - - if (video_stream_index < 0) { - fprintf(stderr, "No video stream found\n"); - abort(); - } - if (audio_stream_index < 0) { - fprintf(stderr, "No audio stream found\n"); - abort(); - } - - AVFormatContext *out_fmt_ctx = NULL; - int segment_index = 0; - int64_t segment_start_pts = 0; - - char out_filename[256]; - snprintf(out_filename, sizeof(out_filename), "segment_%03d.ts", segment_index); - - - int64_t pts_time; - AVPacket pkt; - StreamPair output_stream = start_new_output_file( - in_fmt_ctx, &out_fmt_ctx, out_filename, audio_stream_index, video_stream_index); - while (av_read_frame(in_fmt_ctx, &pkt) >= 0) { - AVStream *in_stream = in_fmt_ctx->streams[pkt.stream_index]; - AVStream *out_stream = NULL; - if (pkt.stream_index == video_stream_index) - out_stream = output_stream.video_stream; - else if (pkt.stream_index == audio_stream_index) - out_stream = output_stream.audio_stream; - else { - av_packet_unref(&pkt); - continue; - } - - pts_time = av_rescale_q(pkt.pts, in_stream->time_base, AV_TIME_BASE_Q); - - // if need split - if (pts_time - segment_start_pts >= SEGMENT_DURATION * AV_TIME_BASE) { - if (pkt.stream_index == video_stream_index && (pkt.flags & AV_PKT_FLAG_KEY)) { - // close current ts - printf("new ts: %ld\n", pts_time - segment_start_pts); - finalize_output_file(out_fmt_ctx); - - // open new ts - segment_index++; - segment_start_pts = pts_time; - snprintf(out_filename, sizeof(out_filename), "segment_%03d.ts", segment_index); - output_stream = start_new_output_file(in_fmt_ctx, &out_fmt_ctx, out_filename, audio_stream_index, video_stream_index); - } - } - pkt.pts = av_rescale_q_rnd(pkt.pts, in_stream->time_base, out_stream->time_base, AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX); - pkt.dts = av_rescale_q_rnd(pkt.dts, in_stream->time_base, out_stream->time_base, AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX); - pkt.duration = av_rescale_q(pkt.duration, in_stream->time_base, out_stream->time_base); - pkt.pos = -1; - pkt.stream_index = (pkt.stream_index == audio_stream_index) ? OUT_AUDIO_STREAM_INDEX : OUT_VIDEO_STREAM_INDEX; - - av_interleaved_write_frame(out_fmt_ctx, &pkt); - av_packet_unref(&pkt); - } - printf("new ts: %ld\n", pts_time - segment_start_pts); - finalize_output_file(out_fmt_ctx); - - av_free(in_fmt_ctx->pb->buffer); - avio_context_free(&in_fmt_ctx->pb); - avformat_close_input(&in_fmt_ctx); - RingBuffer_destroy(self->stream); - self->stream = NULL; - free(self->stream); - } - pthread_mutex_unlock(&self->lock); - return NULL; -} - -void Transmuxer_init(Transmuxer *self) { - pthread_mutex_init(&self->lock, NULL); - pthread_cond_init(&self->streaming_cond, NULL); - self->stream = NULL; - self->quit = false; -} - -void Transmuxer_new_stream(Transmuxer *self, RingBuffer *ringbuf) { - pthread_mutex_lock(&self->lock); - self->stream = ringbuf; - pthread_mutex_unlock(&self->lock); - pthread_cond_signal(&self->streaming_cond); -} diff --git a/transmuxer.h b/transmuxer.h deleted file mode 100644 index 9bcbf1b..0000000 --- a/transmuxer.h +++ /dev/null @@ -1,20 +0,0 @@ -#ifndef TRANSMUXER_H_ -#define TRANSMUXER_H_ - -#include "ringbuf.h" -#include "pthread.h" -#include - -typedef struct { - pthread_mutex_t lock; - pthread_cond_t streaming_cond; - RingBuffer *stream; - bool quit; -} Transmuxer; - -void Transmuxer_init(Transmuxer *self); - -void* Transmuxer_main(void *vself); -void Transmuxer_new_stream(Transmuxer *self, RingBuffer *ringbuf); - -#endif -- cgit v1.0