summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_fstore.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/flb_fstore.c')
-rw-r--r--fluent-bit/src/flb_fstore.c558
1 files changed, 558 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_fstore.c b/fluent-bit/src/flb_fstore.c
new file mode 100644
index 00000000..03bcc99d
--- /dev/null
+++ b/fluent-bit/src/flb_fstore.c
@@ -0,0 +1,558 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_fstore.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_sds.h>
+#include <chunkio/chunkio.h>
+
+static int log_cb(struct cio_ctx *ctx, int level, const char *file, int line,
+ char *str)
+{
+ if (level == CIO_LOG_ERROR) {
+ flb_error("[fstore] %s", str);
+ }
+ else if (level == CIO_LOG_WARN) {
+ flb_warn("[fstore] %s", str);
+ }
+ else if (level == CIO_LOG_INFO) {
+ flb_info("[fstore] %s", str);
+ }
+ else if (level == CIO_LOG_DEBUG) {
+ flb_debug("[fstore] %s", str);
+ }
+
+ return 0;
+}
+
+/*
+ * this function sets metadata into a fstore_file structure, note that it makes
+ * it own copy of the data to set a NULL byte at the end.
+ */
+static int meta_set(struct flb_fstore_file *fsf, void *meta, size_t size)
+{
+
+ char *p;
+
+ p = flb_calloc(1, size + 1);
+ if (!p) {
+ flb_errno();
+ flb_error("[fstore] could not cache metadata in file: %s:%s",
+ fsf->stream->name, fsf->chunk->name);
+ return -1;
+ }
+
+ if (fsf->meta_buf) {
+ flb_free(fsf->meta_buf);
+ }
+ fsf->meta_buf = p;
+ memcpy(fsf->meta_buf, meta, size);
+ fsf->meta_size = size;
+
+ return 0;
+}
+
+/* Set a file metadata */
+int flb_fstore_file_meta_set(struct flb_fstore *fs,
+ struct flb_fstore_file *fsf,
+ void *meta, size_t size)
+{
+ int ret;
+ int set_down = FLB_FALSE;
+
+ /* Check if the chunk is up */
+ if (cio_chunk_is_up(fsf->chunk) == CIO_FALSE) {
+ ret = cio_chunk_up_force(fsf->chunk);
+ if (ret != CIO_OK) {
+ flb_error("[fstore] error loading up file chunk");
+ return -1;
+ }
+ set_down = FLB_TRUE;
+ }
+
+ ret = cio_meta_write(fsf->chunk, meta, size);
+ if (ret == -1) {
+ flb_error("[fstore] could not write metadata to file: %s:%s",
+ fsf->stream->name, fsf->chunk->name);
+
+ if (set_down == FLB_TRUE) {
+ cio_chunk_down(fsf->chunk);
+ }
+
+ return -1;
+ }
+
+ if (set_down == FLB_TRUE) {
+ cio_chunk_down(fsf->chunk);
+ }
+
+ return meta_set(fsf, meta, size);
+}
+
+/* Re-read Chunk I/O metadata into fstore file */
+int flb_fstore_file_meta_get(struct flb_fstore *fs,
+ struct flb_fstore_file *fsf)
+{
+ int ret;
+ int set_down = FLB_FALSE;
+ char *meta_buf = NULL;
+ int meta_size = 0;
+
+ /* Check if the chunk is up */
+ if (cio_chunk_is_up(fsf->chunk) == CIO_FALSE) {
+ ret = cio_chunk_up_force(fsf->chunk);
+ if (ret != CIO_OK) {
+ flb_error("[fstore] error loading up file chunk");
+ return -1;
+ }
+ set_down = FLB_TRUE;
+ }
+
+ ret = cio_meta_read(fsf->chunk, &meta_buf, &meta_size);
+ if (ret == -1) {
+ flb_error("[fstore] error reading file chunk metadata");
+ if (set_down == FLB_TRUE) {
+ cio_chunk_down(fsf->chunk);
+ }
+ }
+
+ ret = meta_set(fsf, meta_buf, meta_size);
+ if (ret == -1) {
+ flb_free(meta_buf);
+ if (set_down == FLB_TRUE) {
+ cio_chunk_down(fsf->chunk);
+ }
+ return -1;
+ }
+
+ if (set_down == FLB_TRUE) {
+ cio_chunk_down(fsf->chunk);
+ }
+ return 0;
+}
+
+/* Create a new file */
+struct flb_fstore_file *flb_fstore_file_create(struct flb_fstore *fs,
+ struct flb_fstore_stream *fs_stream,
+ char *name, size_t size)
+{
+ int err;
+ struct cio_chunk *chunk;
+ struct flb_fstore_file *fsf;
+
+ fsf = flb_calloc(1, sizeof(struct flb_fstore_file));
+ if (!fsf) {
+ flb_errno();
+ return NULL;
+ }
+ fsf->stream = fs_stream->stream;
+
+ fsf->name = flb_sds_create(name);
+ if (!fsf->name) {
+ flb_error("[fstore] could not create file: %s:%s",
+ fsf->stream->name, name);
+ flb_free(fsf);
+ return NULL;
+ }
+
+ chunk = cio_chunk_open(fs->cio, fs_stream->stream, name,
+ CIO_OPEN, size, &err);
+ if (!chunk) {
+ flb_error("[fstore] could not create file: %s:%s",
+ fsf->stream->name, name);
+ flb_sds_destroy(fsf->name);
+ flb_free(fsf);
+ return NULL;
+ }
+
+ fsf->chunk = chunk;
+ mk_list_add(&fsf->_head, &fs_stream->files);
+
+ return fsf;
+}
+
+/* Lookup file on stream by using it name */
+struct flb_fstore_file *flb_fstore_file_get(struct flb_fstore *fs,
+ struct flb_fstore_stream *fs_stream,
+ char *name, size_t size)
+{
+ struct mk_list *head;
+ struct flb_fstore_file *fsf;
+
+ mk_list_foreach(head, &fs_stream->files) {
+ fsf = mk_list_entry(head, struct flb_fstore_file, _head);
+ if (flb_sds_len(fsf->name) != size) {
+ continue;
+ }
+
+ if (strncmp(fsf->name, name, size) == 0) {
+ return fsf;
+ }
+ }
+
+ return NULL;
+}
+
+/*
+ * Set a file to inactive mode. Inactive means just to remove the reference
+ * from the list.
+ */
+int flb_fstore_file_inactive(struct flb_fstore *fs,
+ struct flb_fstore_file *fsf)
+{
+ /* close the Chunk I/O reference, but don't delete the real file */
+ if (fsf->chunk) {
+ cio_chunk_close(fsf->chunk, CIO_FALSE);
+ }
+
+ /* release */
+ mk_list_del(&fsf->_head);
+ flb_sds_destroy(fsf->name);
+ if (fsf->meta_buf) {
+ flb_free(fsf->meta_buf);
+ }
+ flb_free(fsf);
+
+ return 0;
+}
+
+/* Delete a file (permantent deletion) */
+int flb_fstore_file_delete(struct flb_fstore *fs,
+ struct flb_fstore_file *fsf)
+{
+ /* close the Chunk I/O reference, but don't delete it the real file */
+ cio_chunk_close(fsf->chunk, CIO_TRUE);
+
+ /* release */
+ mk_list_del(&fsf->_head);
+ if (fsf->meta_buf) {
+ flb_free(fsf->meta_buf);
+ }
+ flb_sds_destroy(fsf->name);
+ flb_free(fsf);
+
+ return 0;
+}
+
+/*
+ * Set an output buffer that contains a copy of the file. Note that this buffer
+ * needs to be freed by the caller (heap memory).
+ */
+int flb_fstore_file_content_copy(struct flb_fstore *fs,
+ struct flb_fstore_file *fsf,
+ void **out_buf, size_t *out_size)
+{
+ int ret;
+
+ ret = cio_chunk_get_content_copy(fsf->chunk, out_buf, out_size);
+ if (ret == CIO_OK) {
+ return 0;
+ }
+
+ return -1;
+}
+
+/* Append data to an existing file */
+int flb_fstore_file_append(struct flb_fstore_file *fsf, void *data, size_t size)
+{
+ int ret;
+ int set_down = FLB_FALSE;
+
+ /* Check if the chunk is up */
+ if (cio_chunk_is_up(fsf->chunk) == CIO_FALSE) {
+ ret = cio_chunk_up_force(fsf->chunk);
+ if (ret != CIO_OK) {
+ flb_error("[fstore] error loading up file chunk");
+ return -1;
+ }
+ set_down = FLB_TRUE;
+ }
+
+ ret = cio_chunk_write(fsf->chunk, data, size);
+ if (ret != CIO_OK) {
+ flb_error("[fstore] could not write data to file %s", fsf->name);
+
+ if (set_down == FLB_TRUE) {
+ cio_chunk_down(fsf->chunk);
+ }
+
+ return -1;
+ }
+
+ if (set_down == FLB_TRUE) {
+ cio_chunk_down(fsf->chunk);
+ }
+
+ return 0;
+}
+
+/*
+ * Create a new stream, if it already exists, it returns the stream
+ * reference.
+ */
+struct flb_fstore_stream *flb_fstore_stream_create(struct flb_fstore *fs,
+ char *stream_name)
+{
+ flb_sds_t path = NULL;
+ struct mk_list *head;
+ struct cio_ctx *ctx = NULL;
+ struct cio_stream *stream = NULL;
+ struct flb_fstore_stream *fs_stream = NULL;
+
+ ctx = fs->cio;
+
+ /* Check if the stream already exists in Chunk I/O */
+ mk_list_foreach(head, &ctx->streams) {
+ stream = mk_list_entry(head, struct cio_stream, _head);
+ if (strcmp(stream->name, stream_name) == 0) {
+ break;
+ }
+ stream = NULL;
+ }
+
+ /* If the stream exists, check if we have a fstore_stream reference */
+ if (stream) {
+ mk_list_foreach(head, &fs->streams) {
+ fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head);
+ if (fs_stream->stream == stream) {
+ break;
+ }
+ fs_stream = NULL;
+ }
+
+ /* The stream was found, just return the reference */
+ if (fs_stream) {
+ return fs_stream;
+ }
+ }
+
+ if (!stream) {
+ /* create file-system based stream */
+ stream = cio_stream_create(fs->cio, stream_name, fs->store_type);
+ if (!stream) {
+ flb_error("[fstore] cannot create stream %s", stream_name);
+ return NULL;
+ }
+ }
+
+ fs_stream = flb_calloc(1, sizeof(struct flb_fstore_stream));
+ if (!fs_stream) {
+ flb_errno();
+ cio_stream_destroy(stream);
+ return NULL;
+ }
+ fs_stream->stream = stream;
+
+ path = flb_sds_create_size(256);
+ if (!path) {
+ cio_stream_destroy(stream);
+ flb_free(fs_stream);
+ return NULL;
+ }
+ path = flb_sds_printf(&path, "%s/%s", fs->root_path, stream->name);
+ fs_stream->path = path;
+ fs_stream->name = stream->name;
+
+ mk_list_init(&fs_stream->files);
+ mk_list_add(&fs_stream->_head, &fs->streams);
+
+ return fs_stream;
+}
+
+void flb_fstore_stream_destroy(struct flb_fstore_stream *stream, int delete)
+{
+ if (delete == FLB_TRUE) {
+ cio_stream_delete(stream->stream);
+ }
+
+ /*
+ * FYI: in this function we just release the fstore_stream context, the
+ * underlaying cio_stream is closed when the main Chunk I/O is destroyed.
+ */
+ mk_list_del(&stream->_head);
+ flb_sds_destroy(stream->path);
+ flb_free(stream);
+}
+
+static int map_chunks(struct flb_fstore *ctx, struct flb_fstore_stream *fs_stream,
+ struct cio_stream *stream)
+{
+ struct mk_list *head;
+ struct cio_chunk *chunk;
+ struct flb_fstore_file *fsf;
+
+ mk_list_foreach(head, &stream->chunks) {
+ chunk = mk_list_entry(head, struct cio_chunk, _head);
+
+ fsf = flb_calloc(1, sizeof(struct flb_fstore_file));
+ if (!fsf) {
+ flb_errno();
+ return -1;
+ }
+ fsf->name = flb_sds_create(chunk->name);
+ if (!fsf->name) {
+ flb_free(fsf);
+ flb_error("[fstore] could not create file: %s:%s",
+ stream->name, chunk->name);
+ return -1;
+ }
+
+ fsf->chunk = chunk;
+
+ /* load metadata */
+ flb_fstore_file_meta_get(ctx, fsf);
+ mk_list_add(&fsf->_head, &fs_stream->files);
+ }
+
+ return 0;
+}
+
+static int load_references(struct flb_fstore *fs)
+{
+ int ret;
+ struct mk_list *head;
+ struct cio_stream *stream;
+ struct flb_fstore_stream *fs_stream;
+
+ mk_list_foreach(head, &fs->cio->streams) {
+ stream = mk_list_entry(head, struct cio_stream, _head);
+ fs_stream = flb_fstore_stream_create(fs, stream->name);
+ if (!fs_stream) {
+ flb_error("[fstore] error loading stream reference: %s",
+ stream->name);
+ return -1;
+ }
+
+ /* Map chunks */
+ ret = map_chunks(fs, fs_stream, stream);
+ if (ret == -1) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+struct flb_fstore *flb_fstore_create(char *path, int store_type)
+{
+ int ret;
+ int flags;
+ struct cio_ctx *cio;
+ struct flb_fstore *fs;
+ struct cio_options opts = {0};
+ flags = CIO_OPEN;
+
+ /* Create Chunk I/O context */
+ cio_options_init(&opts);
+
+ opts.root_path = path;
+ opts.log_cb = log_cb;
+ opts.flags = flags;
+ opts.log_level = CIO_LOG_INFO;
+
+ cio = cio_create(&opts);
+ if (!cio) {
+ flb_error("[fstore] error initializing on path '%s'", path);
+ return NULL;
+ }
+
+ /* Load content from the file system if any */
+ ret = cio_load(cio, NULL);
+ if (ret == -1) {
+ flb_error("[fstore] error scanning root path content: %s", path);
+ cio_destroy(cio);
+ return NULL;
+ }
+
+ fs = flb_calloc(1, sizeof(struct flb_fstore));
+ if (!fs) {
+ flb_errno();
+ cio_destroy(cio);
+ return NULL;
+ }
+ fs->cio = cio;
+ fs->root_path = cio->options.root_path;
+ fs->store_type = store_type;
+ mk_list_init(&fs->streams);
+
+ /* Map Chunk I/O streams and chunks into fstore context */
+ load_references(fs);
+
+ return fs;
+}
+
+int flb_fstore_destroy(struct flb_fstore *fs)
+{
+ int files = 0;
+ int delete;
+ struct mk_list *head;
+ struct mk_list *f_head;
+ struct mk_list *tmp;
+ struct mk_list *f_tmp;
+ struct flb_fstore_stream *fs_stream;
+ struct flb_fstore_file *fsf;
+
+ mk_list_foreach_safe(head, tmp, &fs->streams) {
+ fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head);
+
+ /* delete file references */
+ files = 0;
+ mk_list_foreach_safe(f_head, f_tmp, &fs_stream->files) {
+ fsf = mk_list_entry(f_head, struct flb_fstore_file, _head);
+ flb_fstore_file_inactive(fs, fsf);
+ files++;
+ }
+
+ if (files == 0) {
+ delete = FLB_TRUE;
+ }
+ else {
+ delete = FLB_FALSE;
+ }
+
+ flb_fstore_stream_destroy(fs_stream, delete);
+ }
+
+ if (fs->cio) {
+ cio_destroy(fs->cio);
+ }
+ flb_free(fs);
+ return 0;
+}
+
+void flb_fstore_dump(struct flb_fstore *fs)
+{
+ struct mk_list *head;
+ struct mk_list *f_head;
+ struct flb_fstore_stream *fs_stream;
+ struct flb_fstore_file *fsf;
+
+ printf("===== FSTORE DUMP =====\n");
+ mk_list_foreach(head, &fs->streams) {
+ fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head);
+ printf("- stream: %s\n", fs_stream->name);
+ mk_list_foreach(f_head, &fs_stream->files) {
+ fsf = mk_list_entry(f_head, struct flb_fstore_file, _head);
+ printf(" %s/%s\n", fsf->stream->name, fsf->name);
+ }
+ }
+ printf("\n");
+}