diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
commit | be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/src/flb_fstore.c | |
parent | Initial commit. (diff) | |
download | netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.tar.xz netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.zip |
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/src/flb_fstore.c')
-rw-r--r-- | fluent-bit/src/flb_fstore.c | 558 |
1 files changed, 558 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_fstore.c b/fluent-bit/src/flb_fstore.c new file mode 100644 index 00000000..03bcc99d --- /dev/null +++ b/fluent-bit/src/flb_fstore.c @@ -0,0 +1,558 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_fstore.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_sds.h> +#include <chunkio/chunkio.h> + +static int log_cb(struct cio_ctx *ctx, int level, const char *file, int line, + char *str) +{ + if (level == CIO_LOG_ERROR) { + flb_error("[fstore] %s", str); + } + else if (level == CIO_LOG_WARN) { + flb_warn("[fstore] %s", str); + } + else if (level == CIO_LOG_INFO) { + flb_info("[fstore] %s", str); + } + else if (level == CIO_LOG_DEBUG) { + flb_debug("[fstore] %s", str); + } + + return 0; +} + +/* + * this function sets metadata into a fstore_file structure, note that it makes + * it own copy of the data to set a NULL byte at the end. + */ +static int meta_set(struct flb_fstore_file *fsf, void *meta, size_t size) +{ + + char *p; + + p = flb_calloc(1, size + 1); + if (!p) { + flb_errno(); + flb_error("[fstore] could not cache metadata in file: %s:%s", + fsf->stream->name, fsf->chunk->name); + return -1; + } + + if (fsf->meta_buf) { + flb_free(fsf->meta_buf); + } + fsf->meta_buf = p; + memcpy(fsf->meta_buf, meta, size); + fsf->meta_size = size; + + return 0; +} + +/* Set a file metadata */ +int flb_fstore_file_meta_set(struct flb_fstore *fs, + struct flb_fstore_file *fsf, + void *meta, size_t size) +{ + int ret; + int set_down = FLB_FALSE; + + /* Check if the chunk is up */ + if (cio_chunk_is_up(fsf->chunk) == CIO_FALSE) { + ret = cio_chunk_up_force(fsf->chunk); + if (ret != CIO_OK) { + flb_error("[fstore] error loading up file chunk"); + return -1; + } + set_down = FLB_TRUE; + } + + ret = cio_meta_write(fsf->chunk, meta, size); + if (ret == -1) { + flb_error("[fstore] could not write metadata to file: %s:%s", + fsf->stream->name, fsf->chunk->name); + + if (set_down == FLB_TRUE) { + cio_chunk_down(fsf->chunk); + } + + return -1; + } + + if (set_down == FLB_TRUE) { + cio_chunk_down(fsf->chunk); + } + + return meta_set(fsf, meta, size); +} + +/* Re-read Chunk I/O metadata into fstore file */ +int flb_fstore_file_meta_get(struct flb_fstore *fs, + struct flb_fstore_file *fsf) +{ + int ret; + int set_down = FLB_FALSE; + char *meta_buf = NULL; + int meta_size = 0; + + /* Check if the chunk is up */ + if (cio_chunk_is_up(fsf->chunk) == CIO_FALSE) { + ret = cio_chunk_up_force(fsf->chunk); + if (ret != CIO_OK) { + flb_error("[fstore] error loading up file chunk"); + return -1; + } + set_down = FLB_TRUE; + } + + ret = cio_meta_read(fsf->chunk, &meta_buf, &meta_size); + if (ret == -1) { + flb_error("[fstore] error reading file chunk metadata"); + if (set_down == FLB_TRUE) { + cio_chunk_down(fsf->chunk); + } + } + + ret = meta_set(fsf, meta_buf, meta_size); + if (ret == -1) { + flb_free(meta_buf); + if (set_down == FLB_TRUE) { + cio_chunk_down(fsf->chunk); + } + return -1; + } + + if (set_down == FLB_TRUE) { + cio_chunk_down(fsf->chunk); + } + return 0; +} + +/* Create a new file */ +struct flb_fstore_file *flb_fstore_file_create(struct flb_fstore *fs, + struct flb_fstore_stream *fs_stream, + char *name, size_t size) +{ + int err; + struct cio_chunk *chunk; + struct flb_fstore_file *fsf; + + fsf = flb_calloc(1, sizeof(struct flb_fstore_file)); + if (!fsf) { + flb_errno(); + return NULL; + } + fsf->stream = fs_stream->stream; + + fsf->name = flb_sds_create(name); + if (!fsf->name) { + flb_error("[fstore] could not create file: %s:%s", + fsf->stream->name, name); + flb_free(fsf); + return NULL; + } + + chunk = cio_chunk_open(fs->cio, fs_stream->stream, name, + CIO_OPEN, size, &err); + if (!chunk) { + flb_error("[fstore] could not create file: %s:%s", + fsf->stream->name, name); + flb_sds_destroy(fsf->name); + flb_free(fsf); + return NULL; + } + + fsf->chunk = chunk; + mk_list_add(&fsf->_head, &fs_stream->files); + + return fsf; +} + +/* Lookup file on stream by using it name */ +struct flb_fstore_file *flb_fstore_file_get(struct flb_fstore *fs, + struct flb_fstore_stream *fs_stream, + char *name, size_t size) +{ + struct mk_list *head; + struct flb_fstore_file *fsf; + + mk_list_foreach(head, &fs_stream->files) { + fsf = mk_list_entry(head, struct flb_fstore_file, _head); + if (flb_sds_len(fsf->name) != size) { + continue; + } + + if (strncmp(fsf->name, name, size) == 0) { + return fsf; + } + } + + return NULL; +} + +/* + * Set a file to inactive mode. Inactive means just to remove the reference + * from the list. + */ +int flb_fstore_file_inactive(struct flb_fstore *fs, + struct flb_fstore_file *fsf) +{ + /* close the Chunk I/O reference, but don't delete the real file */ + if (fsf->chunk) { + cio_chunk_close(fsf->chunk, CIO_FALSE); + } + + /* release */ + mk_list_del(&fsf->_head); + flb_sds_destroy(fsf->name); + if (fsf->meta_buf) { + flb_free(fsf->meta_buf); + } + flb_free(fsf); + + return 0; +} + +/* Delete a file (permantent deletion) */ +int flb_fstore_file_delete(struct flb_fstore *fs, + struct flb_fstore_file *fsf) +{ + /* close the Chunk I/O reference, but don't delete it the real file */ + cio_chunk_close(fsf->chunk, CIO_TRUE); + + /* release */ + mk_list_del(&fsf->_head); + if (fsf->meta_buf) { + flb_free(fsf->meta_buf); + } + flb_sds_destroy(fsf->name); + flb_free(fsf); + + return 0; +} + +/* + * Set an output buffer that contains a copy of the file. Note that this buffer + * needs to be freed by the caller (heap memory). + */ +int flb_fstore_file_content_copy(struct flb_fstore *fs, + struct flb_fstore_file *fsf, + void **out_buf, size_t *out_size) +{ + int ret; + + ret = cio_chunk_get_content_copy(fsf->chunk, out_buf, out_size); + if (ret == CIO_OK) { + return 0; + } + + return -1; +} + +/* Append data to an existing file */ +int flb_fstore_file_append(struct flb_fstore_file *fsf, void *data, size_t size) +{ + int ret; + int set_down = FLB_FALSE; + + /* Check if the chunk is up */ + if (cio_chunk_is_up(fsf->chunk) == CIO_FALSE) { + ret = cio_chunk_up_force(fsf->chunk); + if (ret != CIO_OK) { + flb_error("[fstore] error loading up file chunk"); + return -1; + } + set_down = FLB_TRUE; + } + + ret = cio_chunk_write(fsf->chunk, data, size); + if (ret != CIO_OK) { + flb_error("[fstore] could not write data to file %s", fsf->name); + + if (set_down == FLB_TRUE) { + cio_chunk_down(fsf->chunk); + } + + return -1; + } + + if (set_down == FLB_TRUE) { + cio_chunk_down(fsf->chunk); + } + + return 0; +} + +/* + * Create a new stream, if it already exists, it returns the stream + * reference. + */ +struct flb_fstore_stream *flb_fstore_stream_create(struct flb_fstore *fs, + char *stream_name) +{ + flb_sds_t path = NULL; + struct mk_list *head; + struct cio_ctx *ctx = NULL; + struct cio_stream *stream = NULL; + struct flb_fstore_stream *fs_stream = NULL; + + ctx = fs->cio; + + /* Check if the stream already exists in Chunk I/O */ + mk_list_foreach(head, &ctx->streams) { + stream = mk_list_entry(head, struct cio_stream, _head); + if (strcmp(stream->name, stream_name) == 0) { + break; + } + stream = NULL; + } + + /* If the stream exists, check if we have a fstore_stream reference */ + if (stream) { + mk_list_foreach(head, &fs->streams) { + fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); + if (fs_stream->stream == stream) { + break; + } + fs_stream = NULL; + } + + /* The stream was found, just return the reference */ + if (fs_stream) { + return fs_stream; + } + } + + if (!stream) { + /* create file-system based stream */ + stream = cio_stream_create(fs->cio, stream_name, fs->store_type); + if (!stream) { + flb_error("[fstore] cannot create stream %s", stream_name); + return NULL; + } + } + + fs_stream = flb_calloc(1, sizeof(struct flb_fstore_stream)); + if (!fs_stream) { + flb_errno(); + cio_stream_destroy(stream); + return NULL; + } + fs_stream->stream = stream; + + path = flb_sds_create_size(256); + if (!path) { + cio_stream_destroy(stream); + flb_free(fs_stream); + return NULL; + } + path = flb_sds_printf(&path, "%s/%s", fs->root_path, stream->name); + fs_stream->path = path; + fs_stream->name = stream->name; + + mk_list_init(&fs_stream->files); + mk_list_add(&fs_stream->_head, &fs->streams); + + return fs_stream; +} + +void flb_fstore_stream_destroy(struct flb_fstore_stream *stream, int delete) +{ + if (delete == FLB_TRUE) { + cio_stream_delete(stream->stream); + } + + /* + * FYI: in this function we just release the fstore_stream context, the + * underlaying cio_stream is closed when the main Chunk I/O is destroyed. + */ + mk_list_del(&stream->_head); + flb_sds_destroy(stream->path); + flb_free(stream); +} + +static int map_chunks(struct flb_fstore *ctx, struct flb_fstore_stream *fs_stream, + struct cio_stream *stream) +{ + struct mk_list *head; + struct cio_chunk *chunk; + struct flb_fstore_file *fsf; + + mk_list_foreach(head, &stream->chunks) { + chunk = mk_list_entry(head, struct cio_chunk, _head); + + fsf = flb_calloc(1, sizeof(struct flb_fstore_file)); + if (!fsf) { + flb_errno(); + return -1; + } + fsf->name = flb_sds_create(chunk->name); + if (!fsf->name) { + flb_free(fsf); + flb_error("[fstore] could not create file: %s:%s", + stream->name, chunk->name); + return -1; + } + + fsf->chunk = chunk; + + /* load metadata */ + flb_fstore_file_meta_get(ctx, fsf); + mk_list_add(&fsf->_head, &fs_stream->files); + } + + return 0; +} + +static int load_references(struct flb_fstore *fs) +{ + int ret; + struct mk_list *head; + struct cio_stream *stream; + struct flb_fstore_stream *fs_stream; + + mk_list_foreach(head, &fs->cio->streams) { + stream = mk_list_entry(head, struct cio_stream, _head); + fs_stream = flb_fstore_stream_create(fs, stream->name); + if (!fs_stream) { + flb_error("[fstore] error loading stream reference: %s", + stream->name); + return -1; + } + + /* Map chunks */ + ret = map_chunks(fs, fs_stream, stream); + if (ret == -1) { + return -1; + } + } + + return 0; +} + +struct flb_fstore *flb_fstore_create(char *path, int store_type) +{ + int ret; + int flags; + struct cio_ctx *cio; + struct flb_fstore *fs; + struct cio_options opts = {0}; + flags = CIO_OPEN; + + /* Create Chunk I/O context */ + cio_options_init(&opts); + + opts.root_path = path; + opts.log_cb = log_cb; + opts.flags = flags; + opts.log_level = CIO_LOG_INFO; + + cio = cio_create(&opts); + if (!cio) { + flb_error("[fstore] error initializing on path '%s'", path); + return NULL; + } + + /* Load content from the file system if any */ + ret = cio_load(cio, NULL); + if (ret == -1) { + flb_error("[fstore] error scanning root path content: %s", path); + cio_destroy(cio); + return NULL; + } + + fs = flb_calloc(1, sizeof(struct flb_fstore)); + if (!fs) { + flb_errno(); + cio_destroy(cio); + return NULL; + } + fs->cio = cio; + fs->root_path = cio->options.root_path; + fs->store_type = store_type; + mk_list_init(&fs->streams); + + /* Map Chunk I/O streams and chunks into fstore context */ + load_references(fs); + + return fs; +} + +int flb_fstore_destroy(struct flb_fstore *fs) +{ + int files = 0; + int delete; + struct mk_list *head; + struct mk_list *f_head; + struct mk_list *tmp; + struct mk_list *f_tmp; + struct flb_fstore_stream *fs_stream; + struct flb_fstore_file *fsf; + + mk_list_foreach_safe(head, tmp, &fs->streams) { + fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); + + /* delete file references */ + files = 0; + mk_list_foreach_safe(f_head, f_tmp, &fs_stream->files) { + fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); + flb_fstore_file_inactive(fs, fsf); + files++; + } + + if (files == 0) { + delete = FLB_TRUE; + } + else { + delete = FLB_FALSE; + } + + flb_fstore_stream_destroy(fs_stream, delete); + } + + if (fs->cio) { + cio_destroy(fs->cio); + } + flb_free(fs); + return 0; +} + +void flb_fstore_dump(struct flb_fstore *fs) +{ + struct mk_list *head; + struct mk_list *f_head; + struct flb_fstore_stream *fs_stream; + struct flb_fstore_file *fsf; + + printf("===== FSTORE DUMP =====\n"); + mk_list_foreach(head, &fs->streams) { + fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); + printf("- stream: %s\n", fs_stream->name); + mk_list_foreach(f_head, &fs_stream->files) { + fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); + printf(" %s/%s\n", fsf->stream->name, fsf->name); + } + } + printf("\n"); +} |