aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMistivia <i@mistivia.com>2025-09-13 20:53:56 +0800
committerMistivia <i@mistivia.com>2025-09-13 20:53:56 +0800
commita4daf467f871b0e77f07f1071b47b960da7bfba9 (patch)
tree43b8847b395a90f5aadb57593c0d84e2c13cca7c
parentf3eeea1d7092f3ca98836035bf75b941d14c2067 (diff)
add s3 client
-rw-r--r--config.c12
-rw-r--r--config.h20
-rw-r--r--fsutils.c23
-rw-r--r--fsutils.h2
-rw-r--r--main.c7
-rw-r--r--ringbuf.c2
-rw-r--r--rtmpserver.h6
-rw-r--r--s3_worker.c72
-rw-r--r--s3_worker.h46
-rw-r--r--s3_worker_impl.cpp19
-rw-r--r--task_queue.c49
-rw-r--r--task_queue.h28
-rw-r--r--transcode_talker.c2
13 files changed, 268 insertions, 20 deletions
diff --git a/config.c b/config.c
new file mode 100644
index 0000000..11d82d8
--- /dev/null
+++ b/config.c
@@ -0,0 +1,12 @@
+#include "config.h"
+
+EZLiveConfig *ezlive_config;
+
+
+void EZLiveConfig_init(EZLiveConfig *self) {
+
+}
+
+void EZLiveConfig_load(EZLiveConfig *self, const char *filename) {
+
+}
diff --git a/config.h b/config.h
new file mode 100644
index 0000000..531a31e
--- /dev/null
+++ b/config.h
@@ -0,0 +1,20 @@
+#ifndef EZLIVE_CONFIG_H_
+#define EZLIVE_CONFIG_H_
+
+typedef struct {
+ const char *listening_addr;
+ int port;
+ const char *bucket;
+ const char *endpoint;
+ const char *dir;
+ const char *access_key;
+ const char *secret_key;
+ const char *web_endpoint;
+} EZLiveConfig;
+
+extern EZLiveConfig *ezlive_config;
+
+void EZLiveConfig_init(EZLiveConfig *self);
+void EZLiveConfig_load(EZLiveConfig *self, const char *filename);
+
+#endif \ No newline at end of file
diff --git a/fsutils.c b/fsutils.c
index 0c38eaf..f26f2b9 100644
--- a/fsutils.c
+++ b/fsutils.c
@@ -4,27 +4,29 @@
#include <stdlib.h>
#include <string.h>
+#include "s3_worker.h"
+
+const char hextable[] = "0123456789abcdef";
+
void tmp_local_filename(const char *prefix, char *buf) {
- static const char hex[] = "0123456789abcdef";
int prefix_len = strlen(prefix);
memcpy(buf, prefix, prefix_len);
buf = buf + prefix_len;
size_t i;
for (i = 0; i < 4; i++) {
unsigned char r = rand() & 0xFF;
- buf[i * 2] = hex[r >> 4];
- buf[i * 2 + 1] = hex[r & 0xF];
+ buf[i * 2] = hextable[r >> 4];
+ buf[i * 2 + 1] = hextable[r & 0xF];
}
buf[i*2] = '\0';
}
void tmp_ts_prefix(char *buf) {
- static const char hex[] = "0123456789abcdef";
size_t i;
for (i = 0; i < 4; i++) {
unsigned char r = rand() & 0xFF;
- buf[i * 2] = hex[r >> 4];
- buf[i * 2 + 1] = hex[r & 0xF];
+ buf[i * 2] = hextable[r >> 4];
+ buf[i * 2 + 1] = hextable[r & 0xF];
}
buf[i*2] = '\0';
}
@@ -34,14 +36,9 @@ void ts_filename(const char *prefix, int num, char *buf) {
}
void upload_file(const char *local, const char *remote) {
- // TODO
+ s3_worker_push(s3_upload_task(local, remote));
}
void remove_remote(const char *remote) {
- // TODO
+ s3_worker_push(s3_delete_task(remote));
}
-
-char ** list_file() {
- // TODO
- return NULL;
-} \ No newline at end of file
diff --git a/fsutils.h b/fsutils.h
index 24cd5ab..968bade 100644
--- a/fsutils.h
+++ b/fsutils.h
@@ -11,6 +11,4 @@ void upload_file(const char *local, const char *remote);
void remove_remote(const char *remote);
-char ** list_file();
-
#endif \ No newline at end of file
diff --git a/main.c b/main.c
index f7ab273..90281c4 100644
--- a/main.c
+++ b/main.c
@@ -1,8 +1,11 @@
+#include <pthread.h>
#include <stdio.h>
+#include <stdlib.h>
#include "rtmpserver.h"
#include "ringbuf.h"
#include "transcode_talker.h"
+#include "s3_worker.h"
typedef struct {
RingBuffer *ringbuf;
@@ -65,8 +68,12 @@ int main() {
};
TranscodeTalker_init(&main_ctx.transcode_talker);
+ s3_worker_init();
+
pthread_t transmux_thread;
pthread_create(&transmux_thread, NULL, &TranscodeTalker_main, &main_ctx.transcode_talker);
+ pthread_t s3worker_thread;
+ pthread_create(&s3worker_thread, NULL, &s3_worker_main, NULL);
start_rtmpserver(rtmp_cbs, &main_ctx);
return 0;
diff --git a/ringbuf.c b/ringbuf.c
index 2c58752..9f5fb86 100644
--- a/ringbuf.c
+++ b/ringbuf.c
@@ -46,8 +46,8 @@ size_t RingBuffer_space(RingBuffer *self) {
void RingBuffer_end(RingBuffer *self) {
pthread_mutex_lock(&self->mutex);
self->finished_flag = true;
- pthread_mutex_unlock(&self->mutex);
pthread_cond_signal(&self->not_empty_cond);
+ pthread_mutex_unlock(&self->mutex);
}
size_t RingBuffer_write(RingBuffer *self, const uint8_t *data, size_t len) {
diff --git a/rtmpserver.h b/rtmpserver.h
index cae56a5..c9383b4 100644
--- a/rtmpserver.h
+++ b/rtmpserver.h
@@ -1,13 +1,13 @@
#ifndef RTMPSERVER_H_
#define RTMPSERVER_H_
-#include <stdint.h>
-#include <stdlib.h>
-
#ifdef __cplusplus
extern "C" {
#endif
+#include <stdint.h>
+#include <stdlib.h>
+
typedef struct {
void (*on_start)(void *ctx);
void (*on_stop)(void *ctx);
diff --git a/s3_worker.c b/s3_worker.c
new file mode 100644
index 0000000..5f21155
--- /dev/null
+++ b/s3_worker.c
@@ -0,0 +1,72 @@
+#include "s3_worker.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "task_queue.h"
+
+TaskQueue task_queue;
+
+void exec_s3_task(void *vtask) {
+ char obj_name_buf[256];
+ S3Task *task = vtask;
+ if (task->task_type == kUploadTask) {
+ // TODO
+ } else if (task->task_type == kDeleteTask) {
+ // TODO
+ } else if (task->task_type == kClearTask) {
+ // TODO
+ } else {
+ fprintf(stderr, "unknown task type.\n");
+ }
+ free(task->local_file);
+ free(task->remote_name);
+ free(task);
+}
+
+void s3_worker_init() {
+ s3client_init();
+ TaskQueue_init(&task_queue, 128);
+}
+
+void s3_worker_push(S3Task task) {
+ S3Task *ptask = malloc(sizeof(S3Task));
+ *ptask = task;
+ ptask->local_file = ptask->local_file;
+ ptask->remote_name = ptask->remote_name;
+ TaskQueue_push(&task_queue, exec_s3_task, ptask);
+}
+
+void* s3_worker_main(void *ctx) {
+ while (1) {
+ TaskFn task_fn;
+ void *arg;
+ TaskQueue_pop(&task_queue, &task_fn, &arg);
+ (*task_fn)(arg);
+ }
+}
+
+S3Task s3_upload_task(const char *local, const char *remote) {
+ return (S3Task) {
+ .task_type = kUploadTask,
+ .local_file = strdup(local),
+ .remote_name = strdup(remote),
+ };
+}
+
+S3Task s3_delete_task(const char *name) {
+ return (S3Task) {
+ .task_type = kDeleteTask,
+ .local_file = NULL,
+ .remote_name = strdup(name),
+ };
+}
+
+S3Task s3_clear_task() {
+ return (S3Task) {
+ .task_type = kClearTask,
+ .local_file = NULL,
+ .remote_name = NULL,
+ };
+} \ No newline at end of file
diff --git a/s3_worker.h b/s3_worker.h
new file mode 100644
index 0000000..7d3d209
--- /dev/null
+++ b/s3_worker.h
@@ -0,0 +1,46 @@
+#ifndef S3_WORKER_H_
+#define S3_WORKER_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+extern void *s3client;
+
+typedef enum {
+ kUploadTask,
+ kDeleteTask,
+ kClearTask,
+} S3TaskType;
+
+typedef struct {
+ S3TaskType task_type;
+ char *local_file;
+ char *remote_name;
+} S3Task;
+
+S3Task s3_upload_task(const char *local, const char *remote);
+
+S3Task s3_delete_task(const char *name);
+
+S3Task s3_clear_task();
+
+void exec_s3_task(void *);
+
+void s3_worker_init();
+
+void* s3_worker_main(void *);
+
+void s3_worker_push(S3Task task);
+
+void s3client_init();
+
+void s3client_put(const char *filename, const char *object_name);
+
+void s3client_delete(const char *object_name);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/s3_worker_impl.cpp b/s3_worker_impl.cpp
new file mode 100644
index 0000000..ccfe7e0
--- /dev/null
+++ b/s3_worker_impl.cpp
@@ -0,0 +1,19 @@
+#include "s3_worker.h"
+
+#include <aws/core/Aws.h>
+#include <aws/s3/S3Client.h>
+#include <aws/s3/model/PutObjectRequest.h>
+
+void *s3client;
+
+void s3client_init() {
+
+}
+
+void s3client_put(const char *filename, const char *object_name) {
+
+}
+
+void s3client_delete(const char *object_name) {
+
+} \ No newline at end of file
diff --git a/task_queue.c b/task_queue.c
new file mode 100644
index 0000000..5708fdc
--- /dev/null
+++ b/task_queue.c
@@ -0,0 +1,49 @@
+#include "task_queue.h"
+
+#include <stdlib.h>
+
+void TaskQueue_init(TaskQueue *self, int capacity) {
+ self->tasks = malloc(sizeof(TaskFn) * capacity);
+ self->args = malloc(sizeof(void*) * capacity);
+ self->capacity = capacity;
+ self->front = 0;
+ self->end = 0;
+ self->size = 0;
+ pthread_mutex_init(&self->lock, NULL);
+ pthread_cond_init(&self->not_full, NULL);
+ pthread_cond_init(&self->not_empty, NULL);
+}
+
+void TaskQueue_destroy(TaskQueue *self) {
+ free(self->tasks);
+ free(self->args);
+ pthread_mutex_destroy(&self->lock);
+ pthread_cond_destroy(&self->not_full);
+ pthread_cond_destroy(&self->not_empty);
+}
+
+void TaskQueue_push(TaskQueue *self, TaskFn task, void *arg) {
+ pthread_mutex_lock(&self->lock);
+ while (self->size == self->capacity) {
+ pthread_cond_wait(&self->not_full, &self->lock);
+ }
+ self->tasks[self->end] = task;
+ self->args[self->end] = arg;
+ self->end = (self->end + 1) % self->capacity;
+ self->size++;
+ pthread_cond_signal(&self->not_empty);
+ pthread_mutex_unlock(&self->lock);
+}
+
+void TaskQueue_pop(TaskQueue *self, TaskFn *task, void **arg) {
+ pthread_mutex_lock(&self->lock);
+ while (self->size == 0) {
+ pthread_cond_wait(&self->not_empty, &self->lock);
+ }
+ *task = self->tasks[self->front];
+ *arg = self->args[self->front];
+ self->front = (self->front + 1) % self->capacity;
+ self->size--;
+ pthread_cond_signal(&self->not_full);
+ pthread_mutex_unlock(&self->lock);
+}
diff --git a/task_queue.h b/task_queue.h
new file mode 100644
index 0000000..2843fb9
--- /dev/null
+++ b/task_queue.h
@@ -0,0 +1,28 @@
+#ifndef TASK_QUEUE_H_
+#define TASK_QUEUE_H_
+
+#include <pthread.h>
+
+typedef void (*TaskFn)(void *arg);
+
+typedef struct {
+ TaskFn *tasks;
+ void **args;
+ int capacity;
+ int front;
+ int end;
+ int size;
+ pthread_mutex_t lock;
+ pthread_cond_t not_full;
+ pthread_cond_t not_empty;
+} TaskQueue;
+
+void TaskQueue_init(TaskQueue *self, int capacity);
+
+void TaskQueue_destroy(TaskQueue *self);
+
+void TaskQueue_push(TaskQueue *self, TaskFn task, void *arg);
+
+void TaskQueue_pop(TaskQueue *self, TaskFn *task, void **arg);
+
+#endif
diff --git a/transcode_talker.c b/transcode_talker.c
index ac325ad..3d22de5 100644
--- a/transcode_talker.c
+++ b/transcode_talker.c
@@ -292,6 +292,6 @@ void TranscodeTalker_init(TranscodeTalker *self) {
void TranscodeTalker_new_stream(TranscodeTalker *self, RingBuffer *ringbuf) {
pthread_mutex_lock(&self->lock);
self->stream = ringbuf;
- pthread_mutex_unlock(&self->lock);
pthread_cond_signal(&self->streaming_cond);
+ pthread_mutex_unlock(&self->lock);
}