aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile4
-rw-r--r--config.c88
-rw-r--r--config.h6
-rw-r--r--main.c1
-rw-r--r--s3_client.cpp83
-rw-r--r--s3_client.h18
-rw-r--r--s3_worker.c10
-rw-r--r--s3_worker.h16
-rw-r--r--s3_worker_impl.cpp19
9 files changed, 204 insertions, 41 deletions
diff --git a/Makefile b/Makefile
index a454c34..e7c5cc3 100644
--- a/Makefile
+++ b/Makefile
@@ -2,7 +2,9 @@ CC := gcc
CXX := g++
CFLAGS := -g -Wall
CXXFLAGS := -g -Wall -std=c++14
-LDFLAGS := -g -lavformat -lavutil -lavcodec
+LDFLAGS := -g \
+ -lavformat -lavutil -lavcodec \
+ -laws-cpp-sdk-core -laws-cpp-sdk-s3
C_SOURCES := $(shell find . -maxdepth 1 -name '*.c')
CPP_SOURCES := $(shell find . -maxdepth 1 -name '*.cpp')
diff --git a/config.c b/config.c
index 11d82d8..458c79c 100644
--- a/config.c
+++ b/config.c
@@ -1,12 +1,100 @@
#include "config.h"
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <ctype.h>
+
EZLiveConfig *ezlive_config;
+static char *trim(char *s) {
+ char *end;
+ while (isspace((unsigned char)*s)) s++;
+ if (*s == 0) return s;
+ end = s + strlen(s) - 1;
+ while (end > s && isspace((unsigned char)*end)) end--;
+ end[1] = '\0';
+ return s;
+}
void EZLiveConfig_init(EZLiveConfig *self) {
+ if (!self) return;
+ self->listening_addr = strdup("127.0.0.1");
+ self->listening_port = 1935;
+ self->bucket = NULL;
+ self->endpoint = NULL;
+ self->s3_path = NULL;
+ self->access_key = NULL;
+ self->secret_key = NULL;
+ self->web_endpoint = NULL;
+ self->region = strdup("auto");
+}
+static void set_field(const char **field, const char *value) {
+ if (*field) {
+ free((void *)*field);
+ }
+ *field = strdup(value);
}
void EZLiveConfig_load(EZLiveConfig *self, const char *filename) {
+ if (!self || !filename) return;
+
+ FILE *fp = fopen(filename, "r");
+ if (!fp) {
+ perror("fopen");
+ return;
+ }
+
+ char line[1024];
+ while (fgets(line, sizeof(line), fp)) {
+ line[strcspn(line, "\n")] = 0;
+ char *hash = strchr(line, '#');
+ if (hash) *hash = 0;
+ char *trimmed = trim(line);
+ if (*trimmed == 0) continue;
+
+ char *eq = strchr(trimmed, '=');
+ if (!eq) continue;
+
+ *eq = 0;
+ char *key = trim(trimmed);
+ char *val = trim(eq + 1);
+
+ if (strcmp(key, "listening_addr") == 0) {
+ set_field(&self->listening_addr, val);
+ } else if (strcmp(key, "listening_port") == 0) {
+ self->listening_port = atoi(val);
+ } else if (strcmp(key, "bucket") == 0) {
+ set_field(&self->bucket, val);
+ } else if (strcmp(key, "endpoint") == 0) {
+ set_field(&self->endpoint, val);
+ } else if (strcmp(key, "s3_path") == 0) {
+ set_field(&self->s3_path, val);
+ } else if (strcmp(key, "access_key") == 0) {
+ set_field(&self->access_key, val);
+ } else if (strcmp(key, "secret_key") == 0) {
+ set_field(&self->secret_key, val);
+ } else if (strcmp(key, "web_endpoint") == 0) {
+ set_field(&self->web_endpoint, val);
+ } else if (strcmp(key, "region") == 0) {
+ set_field(&self->region, val);
+ }
+ }
+
+ fclose(fp);
}
+
+int EZLiveConfig_validate(EZLiveConfig *self) {
+ if (!self) return -1;
+ if (!self->listening_addr || strlen(self->listening_addr) == 0) return -2;
+ if (!self->bucket || strlen(self->bucket) == 0) return -3;
+ if (!self->endpoint || strlen(self->endpoint) == 0) return -4;
+ if (!self->s3_path || strlen(self->s3_path) == 0) return -5;
+ if (!self->access_key || strlen(self->access_key) == 0) return -6;
+ if (!self->secret_key || strlen(self->secret_key) == 0) return -7;
+ if (self->listening_port <= 0 || self->listening_port > 65535) return -8;
+ if (!self->region || strlen(self->region) == 0) return -9;
+ return 0;
+} \ No newline at end of file
diff --git a/config.h b/config.h
index 531a31e..58b3f7d 100644
--- a/config.h
+++ b/config.h
@@ -3,18 +3,20 @@
typedef struct {
const char *listening_addr;
- int port;
+ int listening_port;
const char *bucket;
const char *endpoint;
- const char *dir;
+ const char *s3_path;
const char *access_key;
const char *secret_key;
const char *web_endpoint;
+ const char *region;
} EZLiveConfig;
extern EZLiveConfig *ezlive_config;
void EZLiveConfig_init(EZLiveConfig *self);
void EZLiveConfig_load(EZLiveConfig *self, const char *filename);
+int EZLiveConfig_validate(EZLiveConfig *self);
#endif \ No newline at end of file
diff --git a/main.c b/main.c
index 90281c4..adc20c6 100644
--- a/main.c
+++ b/main.c
@@ -69,6 +69,7 @@ int main() {
TranscodeTalker_init(&main_ctx.transcode_talker);
s3_worker_init();
+ s3_worker_push(s3_clear_task());
pthread_t transmux_thread;
pthread_create(&transmux_thread, NULL, &TranscodeTalker_main, &main_ctx.transcode_talker);
diff --git a/s3_client.cpp b/s3_client.cpp
new file mode 100644
index 0000000..aeb3510
--- /dev/null
+++ b/s3_client.cpp
@@ -0,0 +1,83 @@
+#include "s3_client.h"
+
+#include <fstream>
+#include <iostream>
+
+#include <aws/core/Aws.h>
+#include <aws/s3/S3Client.h>
+#include <aws/s3/model/PutObjectRequest.h>
+#include <aws/s3/model/DeleteObjectRequest.h>
+#include <aws/core/auth/AWSCredentials.h>
+
+#include "config.h"
+
+#include <unistd.h>
+
+namespace {
+
+Aws::S3::S3Client *s3client;
+Aws::SDKOptions aws_options;
+Aws::S3::S3ClientConfiguration config;
+Aws::Auth::AWSCredentials credentials;
+
+}
+
+void S3Client_init() {
+ Aws::InitAPI(aws_options);
+ config.endpointOverride = ezlive_config->endpoint;
+ config.region = ezlive_config->region;
+ credentials = Aws::Auth::AWSCredentials(ezlive_config->access_key, ezlive_config->secret_key);
+ s3client = new Aws::S3::S3Client(credentials, nullptr, config);
+}
+
+void S3Client_put(const char *filename, const char *object_name) {
+ while (1) {
+ Aws::S3::Model::PutObjectRequest request;
+ request.SetBucket(ezlive_config->bucket);
+ //We are using the name of the file as the key for the object in the bucket.
+ //However, this is just a string and can be set according to your retrieval needs.
+ request.SetKey(object_name);
+
+ std::shared_ptr<Aws::IOStream> inputData =
+ std::make_shared<Aws::FStream>(filename, std::ios_base::in | std::ios_base::binary);
+
+ if (!*inputData) {
+ std::cerr << "Error unable to read file " << filename << std::endl;
+ return;
+ }
+
+ request.SetBody(inputData);
+
+ Aws::S3::Model::PutObjectOutcome outcome =
+ s3client->PutObject(request);
+
+ if (!outcome.IsSuccess()) {
+ std::cerr << "Error: putObject: " <<
+ outcome.GetError().GetMessage() << std::endl;
+ sleep(3);
+ continue;
+ } else {
+ std::cout << "Added object '" << object_name << "' to bucket '"
+ << ezlive_config->bucket << "'.";
+ break;
+ }
+ }
+}
+
+void S3Client_delete(const char *object_name) {
+ Aws::S3::Model::DeleteObjectRequest request;
+
+ request.WithKey(object_name)
+ .WithBucket(ezlive_config->bucket);
+
+ Aws::S3::Model::DeleteObjectOutcome outcome =
+ s3client->DeleteObject(request);
+
+ if (!outcome.IsSuccess()) {
+ auto err = outcome.GetError();
+ std::cerr << "Error: deleteObject: " <<
+ err.GetExceptionName() << ": " << err.GetMessage() << std::endl;
+ } else {
+ std::cout << "Successfully deleted the object: " << object_name << std::endl;
+ }
+} \ No newline at end of file
diff --git a/s3_client.h b/s3_client.h
new file mode 100644
index 0000000..43d4208
--- /dev/null
+++ b/s3_client.h
@@ -0,0 +1,18 @@
+#ifndef S3_CLIENT_H_
+#define S3_CLIENT_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void S3Client_init();
+
+void S3Client_put(const char *filename, const char *object_name);
+
+void S3Client_delete(const char *object_name);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/s3_worker.c b/s3_worker.c
index 5f21155..15fdb09 100644
--- a/s3_worker.c
+++ b/s3_worker.c
@@ -4,7 +4,9 @@
#include <stdlib.h>
#include <string.h>
+#include "config.h"
#include "task_queue.h"
+#include "s3_client.h"
TaskQueue task_queue;
@@ -12,9 +14,11 @@ void exec_s3_task(void *vtask) {
char obj_name_buf[256];
S3Task *task = vtask;
if (task->task_type == kUploadTask) {
- // TODO
+ snprintf(obj_name_buf, 255, "%s%s", ezlive_config->s3_path, task->remote_name);
+ S3Client_put(task->local_file, obj_name_buf);
} else if (task->task_type == kDeleteTask) {
- // TODO
+ snprintf(obj_name_buf, 255, "%s%s", ezlive_config->s3_path, task->remote_name);
+ S3Client_delete(obj_name_buf);
} else if (task->task_type == kClearTask) {
// TODO
} else {
@@ -26,7 +30,7 @@ void exec_s3_task(void *vtask) {
}
void s3_worker_init() {
- s3client_init();
+ S3Client_init();
TaskQueue_init(&task_queue, 128);
}
diff --git a/s3_worker.h b/s3_worker.h
index 7d3d209..5e88ef5 100644
--- a/s3_worker.h
+++ b/s3_worker.h
@@ -1,12 +1,6 @@
#ifndef S3_WORKER_H_
#define S3_WORKER_H_
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-extern void *s3client;
-
typedef enum {
kUploadTask,
kDeleteTask,
@@ -33,14 +27,4 @@ void* s3_worker_main(void *);
void s3_worker_push(S3Task task);
-void s3client_init();
-
-void s3client_put(const char *filename, const char *object_name);
-
-void s3client_delete(const char *object_name);
-
-#ifdef __cplusplus
-}
-#endif
-
#endif
diff --git a/s3_worker_impl.cpp b/s3_worker_impl.cpp
deleted file mode 100644
index ccfe7e0..0000000
--- a/s3_worker_impl.cpp
+++ /dev/null
@@ -1,19 +0,0 @@
-#include "s3_worker.h"
-
-#include <aws/core/Aws.h>
-#include <aws/s3/S3Client.h>
-#include <aws/s3/model/PutObjectRequest.h>
-
-void *s3client;
-
-void s3client_init() {
-
-}
-
-void s3client_put(const char *filename, const char *object_name) {
-
-}
-
-void s3client_delete(const char *object_name) {
-
-} \ No newline at end of file