diff options
| author | Mistivia <i@mistivia.com> | 2025-09-13 20:53:56 +0800 |
|---|---|---|
| committer | Mistivia <i@mistivia.com> | 2025-09-13 20:53:56 +0800 |
| commit | a4daf467f871b0e77f07f1071b47b960da7bfba9 (patch) | |
| tree | 43b8847b395a90f5aadb57593c0d84e2c13cca7c | |
| parent | f3eeea1d7092f3ca98836035bf75b941d14c2067 (diff) | |
add s3 client
| -rw-r--r-- | config.c | 12 | ||||
| -rw-r--r-- | config.h | 20 | ||||
| -rw-r--r-- | fsutils.c | 23 | ||||
| -rw-r--r-- | fsutils.h | 2 | ||||
| -rw-r--r-- | main.c | 7 | ||||
| -rw-r--r-- | ringbuf.c | 2 | ||||
| -rw-r--r-- | rtmpserver.h | 6 | ||||
| -rw-r--r-- | s3_worker.c | 72 | ||||
| -rw-r--r-- | s3_worker.h | 46 | ||||
| -rw-r--r-- | s3_worker_impl.cpp | 19 | ||||
| -rw-r--r-- | task_queue.c | 49 | ||||
| -rw-r--r-- | task_queue.h | 28 | ||||
| -rw-r--r-- | transcode_talker.c | 2 |
13 files changed, 268 insertions, 20 deletions
diff --git a/config.c b/config.c new file mode 100644 index 0000000..11d82d8 --- /dev/null +++ b/config.c @@ -0,0 +1,12 @@ +#include "config.h" + +EZLiveConfig *ezlive_config; + + +void EZLiveConfig_init(EZLiveConfig *self) { + +} + +void EZLiveConfig_load(EZLiveConfig *self, const char *filename) { + +} diff --git a/config.h b/config.h new file mode 100644 index 0000000..531a31e --- /dev/null +++ b/config.h @@ -0,0 +1,20 @@ +#ifndef EZLIVE_CONFIG_H_ +#define EZLIVE_CONFIG_H_ + +typedef struct { + const char *listening_addr; + int port; + const char *bucket; + const char *endpoint; + const char *dir; + const char *access_key; + const char *secret_key; + const char *web_endpoint; +} EZLiveConfig; + +extern EZLiveConfig *ezlive_config; + +void EZLiveConfig_init(EZLiveConfig *self); +void EZLiveConfig_load(EZLiveConfig *self, const char *filename); + +#endif
\ No newline at end of file @@ -4,27 +4,29 @@ #include <stdlib.h> #include <string.h> +#include "s3_worker.h" + +const char hextable[] = "0123456789abcdef"; + void tmp_local_filename(const char *prefix, char *buf) { - static const char hex[] = "0123456789abcdef"; int prefix_len = strlen(prefix); memcpy(buf, prefix, prefix_len); buf = buf + prefix_len; size_t i; for (i = 0; i < 4; i++) { unsigned char r = rand() & 0xFF; - buf[i * 2] = hex[r >> 4]; - buf[i * 2 + 1] = hex[r & 0xF]; + buf[i * 2] = hextable[r >> 4]; + buf[i * 2 + 1] = hextable[r & 0xF]; } buf[i*2] = '\0'; } void tmp_ts_prefix(char *buf) { - static const char hex[] = "0123456789abcdef"; size_t i; for (i = 0; i < 4; i++) { unsigned char r = rand() & 0xFF; - buf[i * 2] = hex[r >> 4]; - buf[i * 2 + 1] = hex[r & 0xF]; + buf[i * 2] = hextable[r >> 4]; + buf[i * 2 + 1] = hextable[r & 0xF]; } buf[i*2] = '\0'; } @@ -34,14 +36,9 @@ void ts_filename(const char *prefix, int num, char *buf) { } void upload_file(const char *local, const char *remote) { - // TODO + s3_worker_push(s3_upload_task(local, remote)); } void remove_remote(const char *remote) { - // TODO + s3_worker_push(s3_delete_task(remote)); } - -char ** list_file() { - // TODO - return NULL; -}
\ No newline at end of file @@ -11,6 +11,4 @@ void upload_file(const char *local, const char *remote); void remove_remote(const char *remote); -char ** list_file(); - #endif
\ No newline at end of file @@ -1,8 +1,11 @@ +#include <pthread.h> #include <stdio.h> +#include <stdlib.h> #include "rtmpserver.h" #include "ringbuf.h" #include "transcode_talker.h" +#include "s3_worker.h" typedef struct { RingBuffer *ringbuf; @@ -65,8 +68,12 @@ int main() { }; TranscodeTalker_init(&main_ctx.transcode_talker); + s3_worker_init(); + pthread_t transmux_thread; pthread_create(&transmux_thread, NULL, &TranscodeTalker_main, &main_ctx.transcode_talker); + pthread_t s3worker_thread; + pthread_create(&s3worker_thread, NULL, &s3_worker_main, NULL); start_rtmpserver(rtmp_cbs, &main_ctx); return 0; @@ -46,8 +46,8 @@ size_t RingBuffer_space(RingBuffer *self) { void RingBuffer_end(RingBuffer *self) { pthread_mutex_lock(&self->mutex); self->finished_flag = true; - pthread_mutex_unlock(&self->mutex); pthread_cond_signal(&self->not_empty_cond); + pthread_mutex_unlock(&self->mutex); } size_t RingBuffer_write(RingBuffer *self, const uint8_t *data, size_t len) { diff --git a/rtmpserver.h b/rtmpserver.h index cae56a5..c9383b4 100644 --- a/rtmpserver.h +++ b/rtmpserver.h @@ -1,13 +1,13 @@ #ifndef RTMPSERVER_H_ #define RTMPSERVER_H_ -#include <stdint.h> -#include <stdlib.h> - #ifdef __cplusplus extern "C" { #endif +#include <stdint.h> +#include <stdlib.h> + typedef struct { void (*on_start)(void *ctx); void (*on_stop)(void *ctx); diff --git a/s3_worker.c b/s3_worker.c new file mode 100644 index 0000000..5f21155 --- /dev/null +++ b/s3_worker.c @@ -0,0 +1,72 @@ +#include "s3_worker.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "task_queue.h" + +TaskQueue task_queue; + +void exec_s3_task(void *vtask) { + char obj_name_buf[256]; + S3Task *task = vtask; + if (task->task_type == kUploadTask) { + // TODO + } else if (task->task_type == kDeleteTask) { + // TODO + } else if (task->task_type == kClearTask) { + // TODO + } else { + fprintf(stderr, "unknown task type.\n"); + } + free(task->local_file); + free(task->remote_name); + free(task); +} + +void s3_worker_init() { + s3client_init(); + TaskQueue_init(&task_queue, 128); +} + +void s3_worker_push(S3Task task) { + S3Task *ptask = malloc(sizeof(S3Task)); + *ptask = task; + ptask->local_file = ptask->local_file; + ptask->remote_name = ptask->remote_name; + TaskQueue_push(&task_queue, exec_s3_task, ptask); +} + +void* s3_worker_main(void *ctx) { + while (1) { + TaskFn task_fn; + void *arg; + TaskQueue_pop(&task_queue, &task_fn, &arg); + (*task_fn)(arg); + } +} + +S3Task s3_upload_task(const char *local, const char *remote) { + return (S3Task) { + .task_type = kUploadTask, + .local_file = strdup(local), + .remote_name = strdup(remote), + }; +} + +S3Task s3_delete_task(const char *name) { + return (S3Task) { + .task_type = kDeleteTask, + .local_file = NULL, + .remote_name = strdup(name), + }; +} + +S3Task s3_clear_task() { + return (S3Task) { + .task_type = kClearTask, + .local_file = NULL, + .remote_name = NULL, + }; +}
\ No newline at end of file diff --git a/s3_worker.h b/s3_worker.h new file mode 100644 index 0000000..7d3d209 --- /dev/null +++ b/s3_worker.h @@ -0,0 +1,46 @@ +#ifndef S3_WORKER_H_ +#define S3_WORKER_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +extern void *s3client; + +typedef enum { + kUploadTask, + kDeleteTask, + kClearTask, +} S3TaskType; + +typedef struct { + S3TaskType task_type; + char *local_file; + char *remote_name; +} S3Task; + +S3Task s3_upload_task(const char *local, const char *remote); + +S3Task s3_delete_task(const char *name); + +S3Task s3_clear_task(); + +void exec_s3_task(void *); + +void s3_worker_init(); + +void* s3_worker_main(void *); + +void s3_worker_push(S3Task task); + +void s3client_init(); + +void s3client_put(const char *filename, const char *object_name); + +void s3client_delete(const char *object_name); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/s3_worker_impl.cpp b/s3_worker_impl.cpp new file mode 100644 index 0000000..ccfe7e0 --- /dev/null +++ b/s3_worker_impl.cpp @@ -0,0 +1,19 @@ +#include "s3_worker.h" + +#include <aws/core/Aws.h> +#include <aws/s3/S3Client.h> +#include <aws/s3/model/PutObjectRequest.h> + +void *s3client; + +void s3client_init() { + +} + +void s3client_put(const char *filename, const char *object_name) { + +} + +void s3client_delete(const char *object_name) { + +}
\ No newline at end of file diff --git a/task_queue.c b/task_queue.c new file mode 100644 index 0000000..5708fdc --- /dev/null +++ b/task_queue.c @@ -0,0 +1,49 @@ +#include "task_queue.h" + +#include <stdlib.h> + +void TaskQueue_init(TaskQueue *self, int capacity) { + self->tasks = malloc(sizeof(TaskFn) * capacity); + self->args = malloc(sizeof(void*) * capacity); + self->capacity = capacity; + self->front = 0; + self->end = 0; + self->size = 0; + pthread_mutex_init(&self->lock, NULL); + pthread_cond_init(&self->not_full, NULL); + pthread_cond_init(&self->not_empty, NULL); +} + +void TaskQueue_destroy(TaskQueue *self) { + free(self->tasks); + free(self->args); + pthread_mutex_destroy(&self->lock); + pthread_cond_destroy(&self->not_full); + pthread_cond_destroy(&self->not_empty); +} + +void TaskQueue_push(TaskQueue *self, TaskFn task, void *arg) { + pthread_mutex_lock(&self->lock); + while (self->size == self->capacity) { + pthread_cond_wait(&self->not_full, &self->lock); + } + self->tasks[self->end] = task; + self->args[self->end] = arg; + self->end = (self->end + 1) % self->capacity; + self->size++; + pthread_cond_signal(&self->not_empty); + pthread_mutex_unlock(&self->lock); +} + +void TaskQueue_pop(TaskQueue *self, TaskFn *task, void **arg) { + pthread_mutex_lock(&self->lock); + while (self->size == 0) { + pthread_cond_wait(&self->not_empty, &self->lock); + } + *task = self->tasks[self->front]; + *arg = self->args[self->front]; + self->front = (self->front + 1) % self->capacity; + self->size--; + pthread_cond_signal(&self->not_full); + pthread_mutex_unlock(&self->lock); +} diff --git a/task_queue.h b/task_queue.h new file mode 100644 index 0000000..2843fb9 --- /dev/null +++ b/task_queue.h @@ -0,0 +1,28 @@ +#ifndef TASK_QUEUE_H_ +#define TASK_QUEUE_H_ + +#include <pthread.h> + +typedef void (*TaskFn)(void *arg); + +typedef struct { + TaskFn *tasks; + void **args; + int capacity; + int front; + int end; + int size; + pthread_mutex_t lock; + pthread_cond_t not_full; + pthread_cond_t not_empty; +} TaskQueue; + +void TaskQueue_init(TaskQueue *self, int capacity); + +void TaskQueue_destroy(TaskQueue *self); + +void TaskQueue_push(TaskQueue *self, TaskFn task, void *arg); + +void TaskQueue_pop(TaskQueue *self, TaskFn *task, void **arg); + +#endif diff --git a/transcode_talker.c b/transcode_talker.c index ac325ad..3d22de5 100644 --- a/transcode_talker.c +++ b/transcode_talker.c @@ -292,6 +292,6 @@ void TranscodeTalker_init(TranscodeTalker *self) { void TranscodeTalker_new_stream(TranscodeTalker *self, RingBuffer *ringbuf) { pthread_mutex_lock(&self->lock); self->stream = ringbuf; - pthread_mutex_unlock(&self->lock); pthread_cond_signal(&self->streaming_cond); + pthread_mutex_unlock(&self->lock); } |
