From a4daf467f871b0e77f07f1071b47b960da7bfba9 Mon Sep 17 00:00:00 2001 From: Mistivia Date: Sat, 13 Sep 2025 20:53:56 +0800 Subject: add s3 client --- config.c | 12 +++++++++ config.h | 20 +++++++++++++++ fsutils.c | 23 ++++++++--------- fsutils.h | 2 -- main.c | 7 ++++++ ringbuf.c | 2 +- rtmpserver.h | 6 ++--- s3_worker.c | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ s3_worker.h | 46 ++++++++++++++++++++++++++++++++++ s3_worker_impl.cpp | 19 ++++++++++++++ task_queue.c | 49 +++++++++++++++++++++++++++++++++++++ task_queue.h | 28 +++++++++++++++++++++ transcode_talker.c | 2 +- 13 files changed, 268 insertions(+), 20 deletions(-) create mode 100644 config.c create mode 100644 config.h create mode 100644 s3_worker.c create mode 100644 s3_worker.h create mode 100644 s3_worker_impl.cpp create mode 100644 task_queue.c create mode 100644 task_queue.h diff --git a/config.c b/config.c new file mode 100644 index 0000000..11d82d8 --- /dev/null +++ b/config.c @@ -0,0 +1,12 @@ +#include "config.h" + +EZLiveConfig *ezlive_config; + + +void EZLiveConfig_init(EZLiveConfig *self) { + +} + +void EZLiveConfig_load(EZLiveConfig *self, const char *filename) { + +} diff --git a/config.h b/config.h new file mode 100644 index 0000000..531a31e --- /dev/null +++ b/config.h @@ -0,0 +1,20 @@ +#ifndef EZLIVE_CONFIG_H_ +#define EZLIVE_CONFIG_H_ + +typedef struct { + const char *listening_addr; + int port; + const char *bucket; + const char *endpoint; + const char *dir; + const char *access_key; + const char *secret_key; + const char *web_endpoint; +} EZLiveConfig; + +extern EZLiveConfig *ezlive_config; + +void EZLiveConfig_init(EZLiveConfig *self); +void EZLiveConfig_load(EZLiveConfig *self, const char *filename); + +#endif \ No newline at end of file diff --git a/fsutils.c b/fsutils.c index 0c38eaf..f26f2b9 100644 --- a/fsutils.c +++ b/fsutils.c @@ -4,27 +4,29 @@ #include #include +#include "s3_worker.h" + +const char hextable[] = "0123456789abcdef"; + void tmp_local_filename(const char *prefix, char *buf) { - static const char hex[] = "0123456789abcdef"; int prefix_len = strlen(prefix); memcpy(buf, prefix, prefix_len); buf = buf + prefix_len; size_t i; for (i = 0; i < 4; i++) { unsigned char r = rand() & 0xFF; - buf[i * 2] = hex[r >> 4]; - buf[i * 2 + 1] = hex[r & 0xF]; + buf[i * 2] = hextable[r >> 4]; + buf[i * 2 + 1] = hextable[r & 0xF]; } buf[i*2] = '\0'; } void tmp_ts_prefix(char *buf) { - static const char hex[] = "0123456789abcdef"; size_t i; for (i = 0; i < 4; i++) { unsigned char r = rand() & 0xFF; - buf[i * 2] = hex[r >> 4]; - buf[i * 2 + 1] = hex[r & 0xF]; + buf[i * 2] = hextable[r >> 4]; + buf[i * 2 + 1] = hextable[r & 0xF]; } buf[i*2] = '\0'; } @@ -34,14 +36,9 @@ void ts_filename(const char *prefix, int num, char *buf) { } void upload_file(const char *local, const char *remote) { - // TODO + s3_worker_push(s3_upload_task(local, remote)); } void remove_remote(const char *remote) { - // TODO + s3_worker_push(s3_delete_task(remote)); } - -char ** list_file() { - // TODO - return NULL; -} \ No newline at end of file diff --git a/fsutils.h b/fsutils.h index 24cd5ab..968bade 100644 --- a/fsutils.h +++ b/fsutils.h @@ -11,6 +11,4 @@ void upload_file(const char *local, const char *remote); void remove_remote(const char *remote); -char ** list_file(); - #endif \ No newline at end of file diff --git a/main.c b/main.c index f7ab273..90281c4 100644 --- a/main.c +++ b/main.c @@ -1,8 +1,11 @@ +#include #include +#include #include "rtmpserver.h" #include "ringbuf.h" #include "transcode_talker.h" +#include "s3_worker.h" typedef struct { RingBuffer *ringbuf; @@ -65,8 +68,12 @@ int main() { }; TranscodeTalker_init(&main_ctx.transcode_talker); + s3_worker_init(); + pthread_t transmux_thread; pthread_create(&transmux_thread, NULL, &TranscodeTalker_main, &main_ctx.transcode_talker); + pthread_t s3worker_thread; + pthread_create(&s3worker_thread, NULL, &s3_worker_main, NULL); start_rtmpserver(rtmp_cbs, &main_ctx); return 0; diff --git a/ringbuf.c b/ringbuf.c index 2c58752..9f5fb86 100644 --- a/ringbuf.c +++ b/ringbuf.c @@ -46,8 +46,8 @@ size_t RingBuffer_space(RingBuffer *self) { void RingBuffer_end(RingBuffer *self) { pthread_mutex_lock(&self->mutex); self->finished_flag = true; - pthread_mutex_unlock(&self->mutex); pthread_cond_signal(&self->not_empty_cond); + pthread_mutex_unlock(&self->mutex); } size_t RingBuffer_write(RingBuffer *self, const uint8_t *data, size_t len) { diff --git a/rtmpserver.h b/rtmpserver.h index cae56a5..c9383b4 100644 --- a/rtmpserver.h +++ b/rtmpserver.h @@ -1,13 +1,13 @@ #ifndef RTMPSERVER_H_ #define RTMPSERVER_H_ -#include -#include - #ifdef __cplusplus extern "C" { #endif +#include +#include + typedef struct { void (*on_start)(void *ctx); void (*on_stop)(void *ctx); diff --git a/s3_worker.c b/s3_worker.c new file mode 100644 index 0000000..5f21155 --- /dev/null +++ b/s3_worker.c @@ -0,0 +1,72 @@ +#include "s3_worker.h" + +#include +#include +#include + +#include "task_queue.h" + +TaskQueue task_queue; + +void exec_s3_task(void *vtask) { + char obj_name_buf[256]; + S3Task *task = vtask; + if (task->task_type == kUploadTask) { + // TODO + } else if (task->task_type == kDeleteTask) { + // TODO + } else if (task->task_type == kClearTask) { + // TODO + } else { + fprintf(stderr, "unknown task type.\n"); + } + free(task->local_file); + free(task->remote_name); + free(task); +} + +void s3_worker_init() { + s3client_init(); + TaskQueue_init(&task_queue, 128); +} + +void s3_worker_push(S3Task task) { + S3Task *ptask = malloc(sizeof(S3Task)); + *ptask = task; + ptask->local_file = ptask->local_file; + ptask->remote_name = ptask->remote_name; + TaskQueue_push(&task_queue, exec_s3_task, ptask); +} + +void* s3_worker_main(void *ctx) { + while (1) { + TaskFn task_fn; + void *arg; + TaskQueue_pop(&task_queue, &task_fn, &arg); + (*task_fn)(arg); + } +} + +S3Task s3_upload_task(const char *local, const char *remote) { + return (S3Task) { + .task_type = kUploadTask, + .local_file = strdup(local), + .remote_name = strdup(remote), + }; +} + +S3Task s3_delete_task(const char *name) { + return (S3Task) { + .task_type = kDeleteTask, + .local_file = NULL, + .remote_name = strdup(name), + }; +} + +S3Task s3_clear_task() { + return (S3Task) { + .task_type = kClearTask, + .local_file = NULL, + .remote_name = NULL, + }; +} \ No newline at end of file diff --git a/s3_worker.h b/s3_worker.h new file mode 100644 index 0000000..7d3d209 --- /dev/null +++ b/s3_worker.h @@ -0,0 +1,46 @@ +#ifndef S3_WORKER_H_ +#define S3_WORKER_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +extern void *s3client; + +typedef enum { + kUploadTask, + kDeleteTask, + kClearTask, +} S3TaskType; + +typedef struct { + S3TaskType task_type; + char *local_file; + char *remote_name; +} S3Task; + +S3Task s3_upload_task(const char *local, const char *remote); + +S3Task s3_delete_task(const char *name); + +S3Task s3_clear_task(); + +void exec_s3_task(void *); + +void s3_worker_init(); + +void* s3_worker_main(void *); + +void s3_worker_push(S3Task task); + +void s3client_init(); + +void s3client_put(const char *filename, const char *object_name); + +void s3client_delete(const char *object_name); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/s3_worker_impl.cpp b/s3_worker_impl.cpp new file mode 100644 index 0000000..ccfe7e0 --- /dev/null +++ b/s3_worker_impl.cpp @@ -0,0 +1,19 @@ +#include "s3_worker.h" + +#include +#include +#include + +void *s3client; + +void s3client_init() { + +} + +void s3client_put(const char *filename, const char *object_name) { + +} + +void s3client_delete(const char *object_name) { + +} \ No newline at end of file diff --git a/task_queue.c b/task_queue.c new file mode 100644 index 0000000..5708fdc --- /dev/null +++ b/task_queue.c @@ -0,0 +1,49 @@ +#include "task_queue.h" + +#include + +void TaskQueue_init(TaskQueue *self, int capacity) { + self->tasks = malloc(sizeof(TaskFn) * capacity); + self->args = malloc(sizeof(void*) * capacity); + self->capacity = capacity; + self->front = 0; + self->end = 0; + self->size = 0; + pthread_mutex_init(&self->lock, NULL); + pthread_cond_init(&self->not_full, NULL); + pthread_cond_init(&self->not_empty, NULL); +} + +void TaskQueue_destroy(TaskQueue *self) { + free(self->tasks); + free(self->args); + pthread_mutex_destroy(&self->lock); + pthread_cond_destroy(&self->not_full); + pthread_cond_destroy(&self->not_empty); +} + +void TaskQueue_push(TaskQueue *self, TaskFn task, void *arg) { + pthread_mutex_lock(&self->lock); + while (self->size == self->capacity) { + pthread_cond_wait(&self->not_full, &self->lock); + } + self->tasks[self->end] = task; + self->args[self->end] = arg; + self->end = (self->end + 1) % self->capacity; + self->size++; + pthread_cond_signal(&self->not_empty); + pthread_mutex_unlock(&self->lock); +} + +void TaskQueue_pop(TaskQueue *self, TaskFn *task, void **arg) { + pthread_mutex_lock(&self->lock); + while (self->size == 0) { + pthread_cond_wait(&self->not_empty, &self->lock); + } + *task = self->tasks[self->front]; + *arg = self->args[self->front]; + self->front = (self->front + 1) % self->capacity; + self->size--; + pthread_cond_signal(&self->not_full); + pthread_mutex_unlock(&self->lock); +} diff --git a/task_queue.h b/task_queue.h new file mode 100644 index 0000000..2843fb9 --- /dev/null +++ b/task_queue.h @@ -0,0 +1,28 @@ +#ifndef TASK_QUEUE_H_ +#define TASK_QUEUE_H_ + +#include + +typedef void (*TaskFn)(void *arg); + +typedef struct { + TaskFn *tasks; + void **args; + int capacity; + int front; + int end; + int size; + pthread_mutex_t lock; + pthread_cond_t not_full; + pthread_cond_t not_empty; +} TaskQueue; + +void TaskQueue_init(TaskQueue *self, int capacity); + +void TaskQueue_destroy(TaskQueue *self); + +void TaskQueue_push(TaskQueue *self, TaskFn task, void *arg); + +void TaskQueue_pop(TaskQueue *self, TaskFn *task, void **arg); + +#endif diff --git a/transcode_talker.c b/transcode_talker.c index ac325ad..3d22de5 100644 --- a/transcode_talker.c +++ b/transcode_talker.c @@ -292,6 +292,6 @@ void TranscodeTalker_init(TranscodeTalker *self) { void TranscodeTalker_new_stream(TranscodeTalker *self, RingBuffer *ringbuf) { pthread_mutex_lock(&self->lock); self->stream = ringbuf; - pthread_mutex_unlock(&self->lock); pthread_cond_signal(&self->streaming_cond); + pthread_mutex_unlock(&self->lock); } -- cgit v1.0