diff options
| -rw-r--r-- | Makefile | 4 | ||||
| -rw-r--r-- | config.c | 88 | ||||
| -rw-r--r-- | config.h | 6 | ||||
| -rw-r--r-- | main.c | 1 | ||||
| -rw-r--r-- | s3_client.cpp | 83 | ||||
| -rw-r--r-- | s3_client.h | 18 | ||||
| -rw-r--r-- | s3_worker.c | 10 | ||||
| -rw-r--r-- | s3_worker.h | 16 | ||||
| -rw-r--r-- | s3_worker_impl.cpp | 19 |
9 files changed, 204 insertions, 41 deletions
@@ -2,7 +2,9 @@ CC := gcc CXX := g++ CFLAGS := -g -Wall CXXFLAGS := -g -Wall -std=c++14 -LDFLAGS := -g -lavformat -lavutil -lavcodec +LDFLAGS := -g \ + -lavformat -lavutil -lavcodec \ + -laws-cpp-sdk-core -laws-cpp-sdk-s3 C_SOURCES := $(shell find . -maxdepth 1 -name '*.c') CPP_SOURCES := $(shell find . -maxdepth 1 -name '*.cpp') @@ -1,12 +1,100 @@ #include "config.h" +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <ctype.h> + EZLiveConfig *ezlive_config; +static char *trim(char *s) { + char *end; + while (isspace((unsigned char)*s)) s++; + if (*s == 0) return s; + end = s + strlen(s) - 1; + while (end > s && isspace((unsigned char)*end)) end--; + end[1] = '\0'; + return s; +} void EZLiveConfig_init(EZLiveConfig *self) { + if (!self) return; + self->listening_addr = strdup("127.0.0.1"); + self->listening_port = 1935; + self->bucket = NULL; + self->endpoint = NULL; + self->s3_path = NULL; + self->access_key = NULL; + self->secret_key = NULL; + self->web_endpoint = NULL; + self->region = strdup("auto"); +} +static void set_field(const char **field, const char *value) { + if (*field) { + free((void *)*field); + } + *field = strdup(value); } void EZLiveConfig_load(EZLiveConfig *self, const char *filename) { + if (!self || !filename) return; + + FILE *fp = fopen(filename, "r"); + if (!fp) { + perror("fopen"); + return; + } + + char line[1024]; + while (fgets(line, sizeof(line), fp)) { + line[strcspn(line, "\n")] = 0; + char *hash = strchr(line, '#'); + if (hash) *hash = 0; + char *trimmed = trim(line); + if (*trimmed == 0) continue; + + char *eq = strchr(trimmed, '='); + if (!eq) continue; + + *eq = 0; + char *key = trim(trimmed); + char *val = trim(eq + 1); + + if (strcmp(key, "listening_addr") == 0) { + set_field(&self->listening_addr, val); + } else if (strcmp(key, "listening_port") == 0) { + self->listening_port = atoi(val); + } else if (strcmp(key, "bucket") == 0) { + set_field(&self->bucket, val); + } else if (strcmp(key, "endpoint") == 0) { + set_field(&self->endpoint, val); + } else if (strcmp(key, "s3_path") == 0) { + set_field(&self->s3_path, val); + } else if (strcmp(key, "access_key") == 0) { + set_field(&self->access_key, val); + } else if (strcmp(key, "secret_key") == 0) { + set_field(&self->secret_key, val); + } else if (strcmp(key, "web_endpoint") == 0) { + set_field(&self->web_endpoint, val); + } else if (strcmp(key, "region") == 0) { + set_field(&self->region, val); + } + } + + fclose(fp); } + +int EZLiveConfig_validate(EZLiveConfig *self) { + if (!self) return -1; + if (!self->listening_addr || strlen(self->listening_addr) == 0) return -2; + if (!self->bucket || strlen(self->bucket) == 0) return -3; + if (!self->endpoint || strlen(self->endpoint) == 0) return -4; + if (!self->s3_path || strlen(self->s3_path) == 0) return -5; + if (!self->access_key || strlen(self->access_key) == 0) return -6; + if (!self->secret_key || strlen(self->secret_key) == 0) return -7; + if (self->listening_port <= 0 || self->listening_port > 65535) return -8; + if (!self->region || strlen(self->region) == 0) return -9; + return 0; +}
\ No newline at end of file @@ -3,18 +3,20 @@ typedef struct { const char *listening_addr; - int port; + int listening_port; const char *bucket; const char *endpoint; - const char *dir; + const char *s3_path; const char *access_key; const char *secret_key; const char *web_endpoint; + const char *region; } EZLiveConfig; extern EZLiveConfig *ezlive_config; void EZLiveConfig_init(EZLiveConfig *self); void EZLiveConfig_load(EZLiveConfig *self, const char *filename); +int EZLiveConfig_validate(EZLiveConfig *self); #endif
\ No newline at end of file @@ -69,6 +69,7 @@ int main() { TranscodeTalker_init(&main_ctx.transcode_talker); s3_worker_init(); + s3_worker_push(s3_clear_task()); pthread_t transmux_thread; pthread_create(&transmux_thread, NULL, &TranscodeTalker_main, &main_ctx.transcode_talker); diff --git a/s3_client.cpp b/s3_client.cpp new file mode 100644 index 0000000..aeb3510 --- /dev/null +++ b/s3_client.cpp @@ -0,0 +1,83 @@ +#include "s3_client.h" + +#include <fstream> +#include <iostream> + +#include <aws/core/Aws.h> +#include <aws/s3/S3Client.h> +#include <aws/s3/model/PutObjectRequest.h> +#include <aws/s3/model/DeleteObjectRequest.h> +#include <aws/core/auth/AWSCredentials.h> + +#include "config.h" + +#include <unistd.h> + +namespace { + +Aws::S3::S3Client *s3client; +Aws::SDKOptions aws_options; +Aws::S3::S3ClientConfiguration config; +Aws::Auth::AWSCredentials credentials; + +} + +void S3Client_init() { + Aws::InitAPI(aws_options); + config.endpointOverride = ezlive_config->endpoint; + config.region = ezlive_config->region; + credentials = Aws::Auth::AWSCredentials(ezlive_config->access_key, ezlive_config->secret_key); + s3client = new Aws::S3::S3Client(credentials, nullptr, config); +} + +void S3Client_put(const char *filename, const char *object_name) { + while (1) { + Aws::S3::Model::PutObjectRequest request; + request.SetBucket(ezlive_config->bucket); + //We are using the name of the file as the key for the object in the bucket. + //However, this is just a string and can be set according to your retrieval needs. + request.SetKey(object_name); + + std::shared_ptr<Aws::IOStream> inputData = + std::make_shared<Aws::FStream>(filename, std::ios_base::in | std::ios_base::binary); + + if (!*inputData) { + std::cerr << "Error unable to read file " << filename << std::endl; + return; + } + + request.SetBody(inputData); + + Aws::S3::Model::PutObjectOutcome outcome = + s3client->PutObject(request); + + if (!outcome.IsSuccess()) { + std::cerr << "Error: putObject: " << + outcome.GetError().GetMessage() << std::endl; + sleep(3); + continue; + } else { + std::cout << "Added object '" << object_name << "' to bucket '" + << ezlive_config->bucket << "'."; + break; + } + } +} + +void S3Client_delete(const char *object_name) { + Aws::S3::Model::DeleteObjectRequest request; + + request.WithKey(object_name) + .WithBucket(ezlive_config->bucket); + + Aws::S3::Model::DeleteObjectOutcome outcome = + s3client->DeleteObject(request); + + if (!outcome.IsSuccess()) { + auto err = outcome.GetError(); + std::cerr << "Error: deleteObject: " << + err.GetExceptionName() << ": " << err.GetMessage() << std::endl; + } else { + std::cout << "Successfully deleted the object: " << object_name << std::endl; + } +}
\ No newline at end of file diff --git a/s3_client.h b/s3_client.h new file mode 100644 index 0000000..43d4208 --- /dev/null +++ b/s3_client.h @@ -0,0 +1,18 @@ +#ifndef S3_CLIENT_H_ +#define S3_CLIENT_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +void S3Client_init(); + +void S3Client_put(const char *filename, const char *object_name); + +void S3Client_delete(const char *object_name); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/s3_worker.c b/s3_worker.c index 5f21155..15fdb09 100644 --- a/s3_worker.c +++ b/s3_worker.c @@ -4,7 +4,9 @@ #include <stdlib.h> #include <string.h> +#include "config.h" #include "task_queue.h" +#include "s3_client.h" TaskQueue task_queue; @@ -12,9 +14,11 @@ void exec_s3_task(void *vtask) { char obj_name_buf[256]; S3Task *task = vtask; if (task->task_type == kUploadTask) { - // TODO + snprintf(obj_name_buf, 255, "%s%s", ezlive_config->s3_path, task->remote_name); + S3Client_put(task->local_file, obj_name_buf); } else if (task->task_type == kDeleteTask) { - // TODO + snprintf(obj_name_buf, 255, "%s%s", ezlive_config->s3_path, task->remote_name); + S3Client_delete(obj_name_buf); } else if (task->task_type == kClearTask) { // TODO } else { @@ -26,7 +30,7 @@ void exec_s3_task(void *vtask) { } void s3_worker_init() { - s3client_init(); + S3Client_init(); TaskQueue_init(&task_queue, 128); } diff --git a/s3_worker.h b/s3_worker.h index 7d3d209..5e88ef5 100644 --- a/s3_worker.h +++ b/s3_worker.h @@ -1,12 +1,6 @@ #ifndef S3_WORKER_H_ #define S3_WORKER_H_ -#ifdef __cplusplus -extern "C" { -#endif - -extern void *s3client; - typedef enum { kUploadTask, kDeleteTask, @@ -33,14 +27,4 @@ void* s3_worker_main(void *); void s3_worker_push(S3Task task); -void s3client_init(); - -void s3client_put(const char *filename, const char *object_name); - -void s3client_delete(const char *object_name); - -#ifdef __cplusplus -} -#endif - #endif diff --git a/s3_worker_impl.cpp b/s3_worker_impl.cpp deleted file mode 100644 index ccfe7e0..0000000 --- a/s3_worker_impl.cpp +++ /dev/null @@ -1,19 +0,0 @@ -#include "s3_worker.h" - -#include <aws/core/Aws.h> -#include <aws/s3/S3Client.h> -#include <aws/s3/model/PutObjectRequest.h> - -void *s3client; - -void s3client_init() { - -} - -void s3client_put(const char *filename, const char *object_name) { - -} - -void s3client_delete(const char *object_name) { - -}
\ No newline at end of file |
