From 35c8f8e94f0346856130b2a96a7c99790796e53f Mon Sep 17 00:00:00 2001 From: Mistivia Date: Sun, 14 Sep 2025 00:30:41 +0800 Subject: add aws s3 client --- Makefile | 4 ++- config.c | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ config.h | 6 ++-- main.c | 1 + s3_client.cpp | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++ s3_client.h | 18 +++++++++++ s3_worker.c | 10 +++++-- s3_worker.h | 16 ---------- s3_worker_impl.cpp | 19 ------------ 9 files changed, 204 insertions(+), 41 deletions(-) create mode 100644 s3_client.cpp create mode 100644 s3_client.h delete mode 100644 s3_worker_impl.cpp diff --git a/Makefile b/Makefile index a454c34..e7c5cc3 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,9 @@ CC := gcc CXX := g++ CFLAGS := -g -Wall CXXFLAGS := -g -Wall -std=c++14 -LDFLAGS := -g -lavformat -lavutil -lavcodec +LDFLAGS := -g \ + -lavformat -lavutil -lavcodec \ + -laws-cpp-sdk-core -laws-cpp-sdk-s3 C_SOURCES := $(shell find . -maxdepth 1 -name '*.c') CPP_SOURCES := $(shell find . -maxdepth 1 -name '*.cpp') diff --git a/config.c b/config.c index 11d82d8..458c79c 100644 --- a/config.c +++ b/config.c @@ -1,12 +1,100 @@ #include "config.h" +#include +#include +#include +#include + EZLiveConfig *ezlive_config; +static char *trim(char *s) { + char *end; + while (isspace((unsigned char)*s)) s++; + if (*s == 0) return s; + end = s + strlen(s) - 1; + while (end > s && isspace((unsigned char)*end)) end--; + end[1] = '\0'; + return s; +} void EZLiveConfig_init(EZLiveConfig *self) { + if (!self) return; + self->listening_addr = strdup("127.0.0.1"); + self->listening_port = 1935; + self->bucket = NULL; + self->endpoint = NULL; + self->s3_path = NULL; + self->access_key = NULL; + self->secret_key = NULL; + self->web_endpoint = NULL; + self->region = strdup("auto"); +} +static void set_field(const char **field, const char *value) { + if (*field) { + free((void *)*field); + } + *field = strdup(value); } void EZLiveConfig_load(EZLiveConfig *self, const char *filename) { + if (!self || !filename) return; + + FILE *fp = fopen(filename, "r"); + if (!fp) { + perror("fopen"); + return; + } + + char line[1024]; + while (fgets(line, sizeof(line), fp)) { + line[strcspn(line, "\n")] = 0; + char *hash = strchr(line, '#'); + if (hash) *hash = 0; + char *trimmed = trim(line); + if (*trimmed == 0) continue; + + char *eq = strchr(trimmed, '='); + if (!eq) continue; + + *eq = 0; + char *key = trim(trimmed); + char *val = trim(eq + 1); + + if (strcmp(key, "listening_addr") == 0) { + set_field(&self->listening_addr, val); + } else if (strcmp(key, "listening_port") == 0) { + self->listening_port = atoi(val); + } else if (strcmp(key, "bucket") == 0) { + set_field(&self->bucket, val); + } else if (strcmp(key, "endpoint") == 0) { + set_field(&self->endpoint, val); + } else if (strcmp(key, "s3_path") == 0) { + set_field(&self->s3_path, val); + } else if (strcmp(key, "access_key") == 0) { + set_field(&self->access_key, val); + } else if (strcmp(key, "secret_key") == 0) { + set_field(&self->secret_key, val); + } else if (strcmp(key, "web_endpoint") == 0) { + set_field(&self->web_endpoint, val); + } else if (strcmp(key, "region") == 0) { + set_field(&self->region, val); + } + } + + fclose(fp); } + +int EZLiveConfig_validate(EZLiveConfig *self) { + if (!self) return -1; + if (!self->listening_addr || strlen(self->listening_addr) == 0) return -2; + if (!self->bucket || strlen(self->bucket) == 0) return -3; + if (!self->endpoint || strlen(self->endpoint) == 0) return -4; + if (!self->s3_path || strlen(self->s3_path) == 0) return -5; + if (!self->access_key || strlen(self->access_key) == 0) return -6; + if (!self->secret_key || strlen(self->secret_key) == 0) return -7; + if (self->listening_port <= 0 || self->listening_port > 65535) return -8; + if (!self->region || strlen(self->region) == 0) return -9; + return 0; +} \ No newline at end of file diff --git a/config.h b/config.h index 531a31e..58b3f7d 100644 --- a/config.h +++ b/config.h @@ -3,18 +3,20 @@ typedef struct { const char *listening_addr; - int port; + int listening_port; const char *bucket; const char *endpoint; - const char *dir; + const char *s3_path; const char *access_key; const char *secret_key; const char *web_endpoint; + const char *region; } EZLiveConfig; extern EZLiveConfig *ezlive_config; void EZLiveConfig_init(EZLiveConfig *self); void EZLiveConfig_load(EZLiveConfig *self, const char *filename); +int EZLiveConfig_validate(EZLiveConfig *self); #endif \ No newline at end of file diff --git a/main.c b/main.c index 90281c4..adc20c6 100644 --- a/main.c +++ b/main.c @@ -69,6 +69,7 @@ int main() { TranscodeTalker_init(&main_ctx.transcode_talker); s3_worker_init(); + s3_worker_push(s3_clear_task()); pthread_t transmux_thread; pthread_create(&transmux_thread, NULL, &TranscodeTalker_main, &main_ctx.transcode_talker); diff --git a/s3_client.cpp b/s3_client.cpp new file mode 100644 index 0000000..aeb3510 --- /dev/null +++ b/s3_client.cpp @@ -0,0 +1,83 @@ +#include "s3_client.h" + +#include +#include + +#include +#include +#include +#include +#include + +#include "config.h" + +#include + +namespace { + +Aws::S3::S3Client *s3client; +Aws::SDKOptions aws_options; +Aws::S3::S3ClientConfiguration config; +Aws::Auth::AWSCredentials credentials; + +} + +void S3Client_init() { + Aws::InitAPI(aws_options); + config.endpointOverride = ezlive_config->endpoint; + config.region = ezlive_config->region; + credentials = Aws::Auth::AWSCredentials(ezlive_config->access_key, ezlive_config->secret_key); + s3client = new Aws::S3::S3Client(credentials, nullptr, config); +} + +void S3Client_put(const char *filename, const char *object_name) { + while (1) { + Aws::S3::Model::PutObjectRequest request; + request.SetBucket(ezlive_config->bucket); + //We are using the name of the file as the key for the object in the bucket. + //However, this is just a string and can be set according to your retrieval needs. + request.SetKey(object_name); + + std::shared_ptr inputData = + std::make_shared(filename, std::ios_base::in | std::ios_base::binary); + + if (!*inputData) { + std::cerr << "Error unable to read file " << filename << std::endl; + return; + } + + request.SetBody(inputData); + + Aws::S3::Model::PutObjectOutcome outcome = + s3client->PutObject(request); + + if (!outcome.IsSuccess()) { + std::cerr << "Error: putObject: " << + outcome.GetError().GetMessage() << std::endl; + sleep(3); + continue; + } else { + std::cout << "Added object '" << object_name << "' to bucket '" + << ezlive_config->bucket << "'."; + break; + } + } +} + +void S3Client_delete(const char *object_name) { + Aws::S3::Model::DeleteObjectRequest request; + + request.WithKey(object_name) + .WithBucket(ezlive_config->bucket); + + Aws::S3::Model::DeleteObjectOutcome outcome = + s3client->DeleteObject(request); + + if (!outcome.IsSuccess()) { + auto err = outcome.GetError(); + std::cerr << "Error: deleteObject: " << + err.GetExceptionName() << ": " << err.GetMessage() << std::endl; + } else { + std::cout << "Successfully deleted the object: " << object_name << std::endl; + } +} \ No newline at end of file diff --git a/s3_client.h b/s3_client.h new file mode 100644 index 0000000..43d4208 --- /dev/null +++ b/s3_client.h @@ -0,0 +1,18 @@ +#ifndef S3_CLIENT_H_ +#define S3_CLIENT_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +void S3Client_init(); + +void S3Client_put(const char *filename, const char *object_name); + +void S3Client_delete(const char *object_name); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/s3_worker.c b/s3_worker.c index 5f21155..15fdb09 100644 --- a/s3_worker.c +++ b/s3_worker.c @@ -4,7 +4,9 @@ #include #include +#include "config.h" #include "task_queue.h" +#include "s3_client.h" TaskQueue task_queue; @@ -12,9 +14,11 @@ void exec_s3_task(void *vtask) { char obj_name_buf[256]; S3Task *task = vtask; if (task->task_type == kUploadTask) { - // TODO + snprintf(obj_name_buf, 255, "%s%s", ezlive_config->s3_path, task->remote_name); + S3Client_put(task->local_file, obj_name_buf); } else if (task->task_type == kDeleteTask) { - // TODO + snprintf(obj_name_buf, 255, "%s%s", ezlive_config->s3_path, task->remote_name); + S3Client_delete(obj_name_buf); } else if (task->task_type == kClearTask) { // TODO } else { @@ -26,7 +30,7 @@ void exec_s3_task(void *vtask) { } void s3_worker_init() { - s3client_init(); + S3Client_init(); TaskQueue_init(&task_queue, 128); } diff --git a/s3_worker.h b/s3_worker.h index 7d3d209..5e88ef5 100644 --- a/s3_worker.h +++ b/s3_worker.h @@ -1,12 +1,6 @@ #ifndef S3_WORKER_H_ #define S3_WORKER_H_ -#ifdef __cplusplus -extern "C" { -#endif - -extern void *s3client; - typedef enum { kUploadTask, kDeleteTask, @@ -33,14 +27,4 @@ void* s3_worker_main(void *); void s3_worker_push(S3Task task); -void s3client_init(); - -void s3client_put(const char *filename, const char *object_name); - -void s3client_delete(const char *object_name); - -#ifdef __cplusplus -} -#endif - #endif diff --git a/s3_worker_impl.cpp b/s3_worker_impl.cpp deleted file mode 100644 index ccfe7e0..0000000 --- a/s3_worker_impl.cpp +++ /dev/null @@ -1,19 +0,0 @@ -#include "s3_worker.h" - -#include -#include -#include - -void *s3client; - -void s3client_init() { - -} - -void s3client_put(const char *filename, const char *object_name) { - -} - -void s3client_delete(const char *object_name) { - -} \ No newline at end of file -- cgit v1.0