diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:48 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:20:02 +0000 |
commit | 58daab21cd043e1dc37024a7f99b396788372918 (patch) | |
tree | 96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/lib/chunkio/src | |
parent | Releasing debian version 1.43.2-1. (diff) | |
download | netdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz netdata-58daab21cd043e1dc37024a7f99b396788372918.zip |
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/chunkio/src')
-rw-r--r-- | fluent-bit/lib/chunkio/src/CMakeLists.txt | 53 | ||||
-rw-r--r-- | fluent-bit/lib/chunkio/src/chunkio.c | 369 | ||||
-rw-r--r-- | fluent-bit/lib/chunkio/src/cio_chunk.c | 642 | ||||
-rw-r--r-- | fluent-bit/lib/chunkio/src/cio_error.c | 59 | ||||
-rw-r--r-- | fluent-bit/lib/chunkio/src/cio_file.c | 1344 | ||||
-rw-r--r-- | fluent-bit/lib/chunkio/src/cio_file_unix.c | 570 | ||||
-rw-r--r-- | fluent-bit/lib/chunkio/src/cio_file_win32.c | 549 | ||||
-rw-r--r-- | fluent-bit/lib/chunkio/src/cio_log.c | 87 | ||||
-rw-r--r-- | fluent-bit/lib/chunkio/src/cio_memfs.c | 156 | ||||
-rw-r--r-- | fluent-bit/lib/chunkio/src/cio_meta.c | 180 | ||||
-rw-r--r-- | fluent-bit/lib/chunkio/src/cio_os.c | 134 | ||||
-rw-r--r-- | fluent-bit/lib/chunkio/src/cio_scan.c | 190 | ||||
-rw-r--r-- | fluent-bit/lib/chunkio/src/cio_sha1.c | 68 | ||||
-rw-r--r-- | fluent-bit/lib/chunkio/src/cio_stats.c | 79 | ||||
-rw-r--r-- | fluent-bit/lib/chunkio/src/cio_stream.c | 276 | ||||
-rw-r--r-- | fluent-bit/lib/chunkio/src/cio_utils.c | 258 | ||||
-rw-r--r-- | fluent-bit/lib/chunkio/src/win32/dirent.c | 135 | ||||
-rw-r--r-- | fluent-bit/lib/chunkio/src/win32/dirent.h | 59 |
18 files changed, 5208 insertions, 0 deletions
diff --git a/fluent-bit/lib/chunkio/src/CMakeLists.txt b/fluent-bit/lib/chunkio/src/CMakeLists.txt
new file mode 100644
index 00000000..9e666381
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/CMakeLists.txt
@@ -0,0 +1,53 @@
+# Sources shared by every platform
+set(src
+  cio_os.c
+  cio_log.c
+  cio_file.c
+  cio_memfs.c
+  cio_chunk.c
+  cio_meta.c
+  cio_scan.c
+  cio_utils.c
+  cio_stream.c
+  cio_stats.c
+  cio_error.c
+  chunkio.c
+  )
+
+set(libs cio-crc32)
+
+# Platform specific file backend (and dirent shim on Windows)
+if(${CMAKE_SYSTEM_NAME} MATCHES "Windows")
+  set(src
+    ${src}
+    cio_file_win32.c
+    win32/dirent.c
+    )
+  set(libs
+    ${libs}
+    Shell32.lib
+    Shlwapi.lib)
+else()
+  set(src
+    ${src}
+    cio_file_unix.c
+    )
+endif()
+
+if(CIO_LIB_STATIC)
+  add_library(chunkio-static STATIC ${src})
+  target_link_libraries(chunkio-static ${libs})
+  if(CIO_SANITIZE_ADDRESS)
+    add_sanitizers(chunkio-static)
+  endif()
+endif()
+
+if (CIO_LIB_SHARED)
+  add_library(chunkio-shared SHARED ${src})
+  # Link the dependencies to the shared target itself: chunkio-static only
+  # exists when CIO_LIB_STATIC is also enabled.
+  target_link_libraries(chunkio-shared ${libs})
+  if(CIO_SANITIZE_ADDRESS)
+    add_sanitizers(chunkio-shared)
+  endif()
+endif()
+
+if (NOT CIO_LIB_STATIC AND NOT CIO_LIB_SHARED)
+  message(FATAL_ERROR "What are you doing?, you should build something")
+endif()
diff --git a/fluent-bit/lib/chunkio/src/chunkio.c b/fluent-bit/lib/chunkio/src/chunkio.c
new file mode 100644
index 00000000..a69325cf
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/chunkio.c
@@ -0,0 +1,369 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Chunk I/O
+ * =========
+ * Copyright 2018-2019 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <chunkio/chunkio.h> +#include <chunkio/chunkio_compat.h> +#include <chunkio/cio_os.h> +#include <chunkio/cio_log.h> +#include <chunkio/cio_file.h> +#include <chunkio/cio_stream.h> +#include <chunkio/cio_scan.h> +#include <chunkio/cio_utils.h> + +#include <monkey/mk_core/mk_list.h> + +/* + * Validate if root_path exists, if don't, create it, otherwise + * check if we have write access to it. + */ +static int check_root_path(struct cio_ctx *ctx, const char *root_path) +{ + int ret; + int len; + + if (!root_path) { + return -1; + } + + len = strlen(root_path); + if (len <= 0) { + return -1; + } + + ret = cio_os_isdir(root_path); + if (ret == -1) { + /* Try to create the path */ + ret = cio_os_mkpath(root_path, 0755); + if (ret == -1) { + return -1; + } + cio_log_info(ctx, "created root path %s", root_path); + return 0; + } + + /* Directory already exists, check write access */ + return access(root_path, W_OK); +} + +void cio_options_init(struct cio_options *options) +{ + memset(options, 0, sizeof(struct cio_options)); + + options->initialized = CIO_INITIALIZED; + + options->root_path = NULL; + options->user = NULL; + options->group = NULL; + options->chmod = NULL; + options->log_cb = NULL; + options->log_level = CIO_LOG_INFO; + options->flags = CIO_OPEN_RW; + options->realloc_size_hint = CIO_DISABLE_REALLOC_HINT; +} + +struct cio_ctx *cio_create(struct cio_options *options) +{ + int ret; + struct cio_ctx *ctx; + struct cio_options default_options; + + if (options == NULL) { + cio_options_init(&default_options); + options = &default_options; + } + else { + if (options->initialized != CIO_INITIALIZED) { + /* the caller 'must' call cio_options_init() or pass NULL before creating a context */ + fprintf(stderr, "[cio] 'options' has not been initialized properly\n"); + return NULL; + } + } 
+ + /* sanitize chunk open flags */ + if (!(options->flags & CIO_OPEN_RW) && !(options->flags & CIO_OPEN_RD)) { + options->flags |= CIO_OPEN_RW; + } + + if (options->log_level < CIO_LOG_ERROR || + options->log_level > CIO_LOG_TRACE) { + fprintf(stderr, "[cio] invalid log level, aborting\n"); + return NULL; + } +#ifndef CIO_HAVE_BACKEND_FILESYSTEM + if (root_path) { + fprintf(stderr, "[cio] file system backend not supported\n"); + return NULL; + } +#endif + + /* Create context */ + ctx = calloc(1, sizeof(struct cio_ctx)); + if (!ctx) { + perror("calloc"); + return NULL; + } + mk_list_init(&ctx->streams); + ctx->page_size = cio_getpagesize(); + ctx->max_chunks_up = CIO_MAX_CHUNKS_UP; + ctx->options.flags = options->flags; + ctx->realloc_size_hint = CIO_DISABLE_REALLOC_HINT; + + if (options->user != NULL) { + ctx->options.user = strdup(options->user); + } + + if (options->group != NULL) { + ctx->options.group = strdup(options->group); + } + + if (options->chmod != NULL) { + ctx->options.chmod = strdup(options->chmod); + } + + /* Counters */ + ctx->total_chunks = 0; + ctx->total_chunks_up = 0; + + /* Logging */ + cio_set_log_callback(ctx, options->log_cb); + cio_set_log_level(ctx, options->log_level); + + /* Check or initialize file system root path */ + if (options->root_path) { + ret = check_root_path(ctx, options->root_path); + if (ret == -1) { + cio_log_error(ctx, + "[chunkio] cannot initialize root path %s\n", + options->root_path); + free(ctx); + return NULL; + } + + ctx->options.root_path = strdup(options->root_path); + } + else { + ctx->options.root_path = NULL; + } + + if (ctx->options.user != NULL) { + ret = cio_file_lookup_user(ctx->options.user, &ctx->processed_user); + + if (ret != CIO_OK) { + cio_destroy(ctx); + + return NULL; + } + } + else { + ctx->processed_user = NULL; + } + + if (ctx->options.group != NULL) { + ret = cio_file_lookup_group(ctx->options.group, &ctx->processed_group); + + if (ret != CIO_OK) { + cio_destroy(ctx); + + return NULL; + } + } 
+ else { + ctx->processed_group = NULL; + } + + if (options->realloc_size_hint > 0) { + ret = cio_set_realloc_size_hint(ctx, options->realloc_size_hint); + if (ret == -1) { + cio_log_error(ctx, + "[chunkio] cannot initialize with realloc size hint %d\n", + options->realloc_size_hint); + cio_destroy(ctx); + + return NULL; + } + } + + return ctx; +} + +int cio_load(struct cio_ctx *ctx, char *chunk_extension) +{ + int ret; + + if (ctx->options.root_path) { + ret = cio_scan_streams(ctx, chunk_extension); + return ret; + } + + return 0; +} + +static int qsort_stream(struct cio_stream *stream, + int (*compar)(const void *, const void *)) +{ + int i = 0; + int items; + struct mk_list *tmp; + struct mk_list *head; + struct cio_chunk **arr; + struct cio_chunk *chunk; + + items = mk_list_size(&stream->chunks); + if (items == 0) { + return 0; + } + + arr = malloc(sizeof(struct cio_chunk *) * items); + if (!arr) { + perror("malloc"); + return -1; + } + + /* map chunks to the array and and unlink them */ + mk_list_foreach_safe(head, tmp, &stream->chunks) { + chunk = mk_list_entry(head, struct cio_chunk, _head); + arr[i++] = chunk; + mk_list_del(&chunk->_head); + } + + /* sort the chunks, just trust in 'compar' external function */ + qsort(arr, items, sizeof(struct cio_chunk *), compar); + + /* link the chunks in the proper order back to the list head */ + for (i = 0; i < items; i++) { + chunk = arr[i]; + mk_list_add(&chunk->_head, &stream->chunks); + } + + free(arr); + return 0; +} + +/* + * Sort chunks using the 'compar' callback function. This is pretty much a + * wrapper over qsort(3). The sort is done inside every stream content. + * + * Use this function after cio_load() only. 
+ */
+int cio_qsort(struct cio_ctx *ctx, int (*compar)(const void *, const void *))
+{
+    int ret;
+    struct mk_list *head;
+    struct cio_stream *stream;
+
+    mk_list_foreach(head, &ctx->streams) {
+        stream = mk_list_entry(head, struct cio_stream, _head);
+
+        /* report a failed sort instead of silently claiming success */
+        ret = qsort_stream(stream, compar);
+        if (ret != 0) {
+            return -1;
+        }
+    }
+
+    return 0;
+}
+
+/*
+ * Destroy a context: all streams are destroyed first, then every string
+ * owned by the context and finally the context itself. NULL is accepted.
+ */
+void cio_destroy(struct cio_ctx *ctx)
+{
+    if (!ctx) {
+        return;
+    }
+
+    cio_stream_destroy_all(ctx);
+
+    /* free(NULL) is a no-op, no need to guard each pointer */
+    free(ctx->options.user);
+    free(ctx->options.group);
+    free(ctx->options.chmod);
+    free(ctx->processed_user);
+    free(ctx->processed_group);
+    free(ctx->options.root_path);
+
+    free(ctx);
+}
+
+/* Store the logging callback in the context options */
+void cio_set_log_callback(struct cio_ctx *ctx, void (*log_cb))
+{
+    ctx->options.log_cb = log_cb;
+}
+
+/*
+ * Set the log level. Returns -1 (leaving the level untouched) if 'level' is
+ * outside the CIO_LOG_ERROR..CIO_LOG_TRACE range, otherwise 0.
+ */
+int cio_set_log_level(struct cio_ctx *ctx, int level)
+{
+    if (level < CIO_LOG_ERROR || level > CIO_LOG_TRACE) {
+        return -1;
+    }
+
+    ctx->options.log_level = level;
+    return 0;
+}
+
+/* Set the 'max chunks up' limit; 'n' must be at least 1 (-1 otherwise) */
+int cio_set_max_chunks_up(struct cio_ctx *ctx, int n)
+{
+    if (n < 1) {
+        return -1;
+    }
+
+    ctx->max_chunks_up = n;
+    return 0;
+}
+
+/*
+ * Set the realloc size hint. The value must be inside the
+ * [CIO_REALLOC_HINT_MIN, CIO_REALLOC_HINT_MAX] range, otherwise an error is
+ * logged and -1 is returned.
+ */
+int cio_set_realloc_size_hint(struct cio_ctx *ctx, size_t realloc_size_hint)
+{
+    if (realloc_size_hint < CIO_REALLOC_HINT_MIN) {
+        cio_log_error(ctx,
+                      "[chunkio] cannot specify less than %zu bytes\n",
+                      CIO_REALLOC_HINT_MIN);
+        return -1;
+    }
+    else if (realloc_size_hint > CIO_REALLOC_HINT_MAX) {
+        cio_log_error(ctx,
+                      "[chunkio] cannot specify more than %zu bytes\n",
+                      CIO_REALLOC_HINT_MAX);
+        return -1;
+    }
+
+    ctx->realloc_size_hint = realloc_size_hint;
+
+    return 0;
+}
+
+/* Turn on the CIO_TRIM_FILES flag in the context options */
+void cio_enable_file_trimming(struct cio_ctx *ctx)
+{
+    ctx->options.flags |= CIO_TRIM_FILES;
+}
+
+/* Turn off the CIO_TRIM_FILES flag in the context options */
+void cio_disable_file_trimming(struct cio_ctx *ctx)
+{
+    ctx->options.flags &= ~CIO_TRIM_FILES;
+}
\ No newline at end of file diff --git a/fluent-bit/lib/chunkio/src/cio_chunk.c b/fluent-bit/lib/chunkio/src/cio_chunk.c new file mode 100644 index 00000000..7917cd2a --- /dev/null +++ b/fluent-bit/lib/chunkio/src/cio_chunk.c @@ -0,0 +1,642 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Chunk I/O + * ========= + * Copyright 2018 Eduardo Silva <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <chunkio/chunkio_compat.h> +#include <chunkio/chunkio.h> +#include <chunkio/cio_version.h> +#include <chunkio/cio_file.h> +#include <chunkio/cio_memfs.h> +#include <chunkio/cio_log.h> +#include <chunkio/cio_error.h> + +#include <string.h> + +struct cio_chunk *cio_chunk_open(struct cio_ctx *ctx, struct cio_stream *st, + const char *name, int flags, size_t size, + int *err) +{ + int len; + void *backend = NULL; + struct cio_chunk *ch; + + if (!st) { + cio_log_error(ctx, "[cio chunk] invalid stream"); + return NULL; + } + + if (!name) { + cio_log_error(ctx, "[cio chunk] invalid file name"); + return NULL; + } + + len = strlen(name); + if (len == 0) { + cio_log_error(ctx, "[cio chunk] invalid file name"); + return NULL; + } +#ifndef CIO_HAVE_BACKEND_FILESYSTEM + if (st->type == CIO_STORE_FS) { + cio_log_error(ctx, "[cio chunk] file system backend not supported"); + return NULL; + } +#endif + + /* allocate chunk context */ + ch = malloc(sizeof(struct cio_chunk)); + if (!ch) { + cio_errno(); + return 
NULL; + } + ch->name = strdup(name); + ch->ctx = ctx; + ch->st = st; + ch->lock = CIO_FALSE; + ch->tx_active = CIO_FALSE; + ch->tx_crc = 0; + ch->tx_content_length = 0; + ch->backend = NULL; + + mk_list_add(&ch->_head, &st->chunks); + + cio_error_reset(ch); + + /* create backend context */ + if (st->type == CIO_STORE_FS) { + backend = cio_file_open(ctx, st, ch, flags, size, err); + } + else if (st->type == CIO_STORE_MEM) { + *err = CIO_OK; + backend = cio_memfs_open(ctx, st, ch, flags, size); + } + + if (!backend) { + mk_list_del(&ch->_head); + free(ch->name); + free(ch); + return NULL; + } + + ch->backend = backend; + + /* Adjust counter */ + cio_chunk_counter_total_add(ctx); + + /* Link the chunk state to the proper stream list */ + if (cio_chunk_is_up(ch) == CIO_TRUE) { + mk_list_add(&ch->_state_head, &st->chunks_up); + } + else { + mk_list_add(&ch->_state_head, &st->chunks_down); + } + + return ch; +} + +void cio_chunk_close(struct cio_chunk *ch, int delete) +{ + int type; + struct cio_ctx *ctx; + + if (!ch) { + return; + } + + cio_error_reset(ch); + + ctx = ch->ctx; + type = ch->st->type; + if (type == CIO_STORE_MEM) { + cio_memfs_close(ch); + } + else if (type == CIO_STORE_FS) { + cio_file_close(ch, delete); + } + + mk_list_del(&ch->_head); + mk_list_del(&ch->_state_head); + free(ch->name); + free(ch); + + /* Adjust counter */ + cio_chunk_counter_total_sub(ctx); +} + +int cio_chunk_delete(struct cio_ctx *ctx, struct cio_stream *st, const char *name) +{ + int result; + + if (st == NULL) { + cio_log_error(ctx, "[cio chunk] invalid stream"); + + return CIO_ERROR; + } + + if (name == NULL) { + cio_log_error(ctx, "[cio chunk] invalid file name"); + + return CIO_ERROR; + } + + if (strlen(name) == 0) { + cio_log_error(ctx, "[cio chunk] invalid file name"); + + return CIO_ERROR; + } + +#ifndef CIO_HAVE_BACKEND_FILESYSTEM + if (st->type == CIO_STORE_FS) { + cio_log_error(ctx, "[cio chunk] file system backend not supported"); + + return CIO_ERROR; + } +#endif + + if 
(st->type == CIO_STORE_FS) { + result = cio_file_delete(ctx, st, name); + } + else { + result = CIO_ERROR; + } + + return result; +} + +/* + * Write at a specific offset of the content area. Offset must be >= 0 and + * less than current data length. + */ +int cio_chunk_write_at(struct cio_chunk *ch, off_t offset, + const void *buf, size_t count) +{ + int type; + struct cio_memfs *mf; + struct cio_file *cf; + + cio_error_reset(ch); + + type = ch->st->type; + if (type == CIO_STORE_MEM) { + mf = ch->backend; + mf->buf_len = offset; + } + else if (type == CIO_STORE_FS) { + cf = ch->backend; + cf->data_size = offset; + cf->crc_reset = CIO_TRUE; + } + + /* + * By default backends (fs, mem) appends data after the it last position, + * so we just adjust the content size to the given offset. + */ + return cio_chunk_write(ch, buf, count); +} + +int cio_chunk_write(struct cio_chunk *ch, const void *buf, size_t count) +{ + int ret = 0; + int type; + + cio_error_reset(ch); + + type = ch->st->type; + if (type == CIO_STORE_MEM) { + ret = cio_memfs_write(ch, buf, count); + } + else if (type == CIO_STORE_FS) { + ret = cio_file_write(ch, buf, count); + } + + return ret; +} + +int cio_chunk_sync(struct cio_chunk *ch) +{ + int ret = 0; + int type; + + cio_error_reset(ch); + + type = ch->st->type; + if (type == CIO_STORE_FS) { + ret = cio_file_sync(ch); + } + + return ret; +} + +int cio_chunk_get_content(struct cio_chunk *ch, char **buf, size_t *size) +{ + int ret = 0; + int type; + struct cio_memfs *mf; + struct cio_file *cf; + + cio_error_reset(ch); + + type = ch->st->type; + if (type == CIO_STORE_MEM) { + mf = ch->backend; + *size = mf->buf_len; + *buf = mf->buf_data; + return ret; + } + else if (type == CIO_STORE_FS) { + cf = ch->backend; + ret = cio_file_read_prepare(ch->ctx, ch); + if (ret != CIO_OK) { + return ret; + } + *size = cf->data_size; + *buf = cio_file_st_get_content(cf->map); + return ret; + } + + return CIO_ERROR; +} + +/* Using the content of the chunk, generate a 
copy using the heap */ +int cio_chunk_get_content_copy(struct cio_chunk *ch, + void **out_buf, size_t *out_size) +{ + int type; + + cio_error_reset(ch); + + type = ch->st->type; + if (type == CIO_STORE_MEM) { + return cio_memfs_content_copy(ch, out_buf, out_size); + } + else if (type == CIO_STORE_FS) { + return cio_file_content_copy(ch, out_buf, out_size); + } + + return CIO_ERROR; +} + +size_t cio_chunk_get_content_end_pos(struct cio_chunk *ch) +{ + int type; + off_t pos = 0; + struct cio_memfs *mf; + struct cio_file *cf; + + cio_error_reset(ch); + + type = ch->st->type; + if (type == CIO_STORE_MEM) { + mf = ch->backend; + pos = (off_t) (mf->buf_data + mf->buf_len); + } + else if (type == CIO_STORE_FS) { + cf = ch->backend; + pos = (off_t) (cio_file_st_get_content(cf->map) + cf->data_size); + } + + return pos; +} + +ssize_t cio_chunk_get_content_size(struct cio_chunk *ch) +{ + int type; + struct cio_memfs *mf; + struct cio_file *cf; + + cio_error_reset(ch); + + type = ch->st->type; + if (type == CIO_STORE_MEM) { + mf = ch->backend; + return mf->buf_len; + } + else if (type == CIO_STORE_FS) { + cf = ch->backend; + return cf->data_size; + } + + return -1; +} + +ssize_t cio_chunk_get_real_size(struct cio_chunk *ch) +{ + int type; + struct cio_memfs *mf; + struct cio_file *cf; + + cio_error_reset(ch); + + type = ch->st->type; + if (type == CIO_STORE_MEM) { + mf = ch->backend; + return mf->buf_len; + } + else if (type == CIO_STORE_FS) { + cf = ch->backend; + + /* If the file is not open we need to explicitly get its size */ + if (cf->fs_size == 0) { + return cio_file_real_size(cf); + } + + return cf->fs_size; + } + + return -1; +} + +void cio_chunk_close_stream(struct cio_stream *st) +{ + struct mk_list *tmp; + struct mk_list *head; + struct cio_chunk *ch; + + mk_list_foreach_safe(head, tmp, &st->chunks) { + ch = mk_list_entry(head, struct cio_chunk, _head); + cio_chunk_close(ch, CIO_FALSE); + } +} + +char *cio_chunk_hash(struct cio_chunk *ch) +{ + if (ch->st->type == 
CIO_STORE_FS) { + return cio_file_hash(ch->backend); + } + + return NULL; +} + +int cio_chunk_lock(struct cio_chunk *ch) +{ + cio_error_reset(ch); + + if (ch->lock == CIO_TRUE) { + return CIO_ERROR; + } + + ch->lock = CIO_TRUE; + + if (cio_chunk_is_up(ch) == CIO_TRUE) { + return cio_chunk_sync(ch); + } + + return CIO_OK; +} + +int cio_chunk_unlock(struct cio_chunk *ch) +{ + cio_error_reset(ch); + + if (ch->lock == CIO_FALSE) { + return CIO_ERROR; + } + + ch->lock = CIO_FALSE; + return CIO_OK; +} + +int cio_chunk_is_locked(struct cio_chunk *ch) +{ + return ch->lock; +} + +/* + * Start a transaction context: it keep a state of the current calculated + * CRC32 (if enabled) and the current number of bytes in the content + * area. + */ +int cio_chunk_tx_begin(struct cio_chunk *ch) +{ + int type; + struct cio_memfs *mf; + struct cio_file *cf; + + cio_error_reset(ch); + + if (cio_chunk_is_locked(ch)) { + return CIO_RETRY; + } + + if (ch->tx_active == CIO_TRUE) { + return CIO_OK; + } + + ch->tx_active = CIO_TRUE; + type = ch->st->type; + if (type == CIO_STORE_MEM) { + mf = ch->backend; + ch->tx_crc = mf->crc_cur; + ch->tx_content_length = mf->buf_len; + } + else if (type == CIO_STORE_FS) { + cf = ch->backend; + ch->tx_crc = cf->crc_cur; + ch->tx_content_length = cf->data_size; + } + + return CIO_OK; +} + +/* + * Commit transaction changes, reset transaction context and leave new + * changes in place. 
+ */
+int cio_chunk_tx_commit(struct cio_chunk *ch)
+{
+    cio_error_reset(ch);
+
+    /* a failed sync keeps the transaction open */
+    if (cio_chunk_sync(ch) == -1) {
+        return CIO_ERROR;
+    }
+
+    ch->tx_active = CIO_FALSE;
+    return CIO_OK;
+}
+
+/*
+ * Roll back: restore the CRC32 state and the content length that were saved
+ * when the transaction started. Returns -1 if no transaction is active.
+ */
+int cio_chunk_tx_rollback(struct cio_chunk *ch)
+{
+    struct cio_memfs *mem_backend;
+    struct cio_file *file_backend;
+
+    cio_error_reset(ch);
+
+    if (ch->tx_active == CIO_FALSE) {
+        return -1;
+    }
+
+    if (ch->st->type == CIO_STORE_MEM) {
+        mem_backend = ch->backend;
+        mem_backend->crc_cur = ch->tx_crc;
+        mem_backend->buf_len = ch->tx_content_length;
+    }
+    else if (ch->st->type == CIO_STORE_FS) {
+        file_backend = ch->backend;
+        file_backend->crc_cur = ch->tx_crc;
+        file_backend->data_size = ch->tx_content_length;
+    }
+
+    ch->tx_active = CIO_FALSE;
+    return CIO_OK;
+}
+
+/*
+ * Determine if a Chunk content is available in memory for I/O operations. For
+ * Memory backend this is always true, for Filesystem backend it checks if the
+ * memory map exists and file descriptor is open.
+ */ +int cio_chunk_is_up(struct cio_chunk *ch) +{ + int type; + struct cio_file *cf; + + type = ch->st->type; + if (type == CIO_STORE_MEM) { + return CIO_TRUE; + } + else if (type == CIO_STORE_FS) { + cf = ch->backend; + return cio_file_is_up(ch, cf); + } + + return CIO_FALSE; +} + +int cio_chunk_is_file(struct cio_chunk *ch) +{ + int type; + + type = ch->st->type; + if (type == CIO_STORE_FS) { + return CIO_TRUE; + } + + return CIO_FALSE; +} + +static inline void chunk_state_sync(struct cio_chunk *ch) +{ + struct cio_stream *st; + + if (!ch) { + return; + } + + mk_list_del(&ch->_state_head); + st = ch->st; + if (cio_chunk_is_up(ch) == CIO_TRUE) { + mk_list_add(&ch->_state_head, &st->chunks_up); + } + else { + mk_list_add(&ch->_state_head, &st->chunks_down); + } +} + +int cio_chunk_down(struct cio_chunk *ch) +{ + int ret; + int type; + + cio_error_reset(ch); + + type = ch->st->type; + if (type == CIO_STORE_FS) { + ret = cio_file_down(ch); + chunk_state_sync(ch); + return ret; + } + + return CIO_OK; +} + +int cio_chunk_up(struct cio_chunk *ch) +{ + int ret; + int type; + + cio_error_reset(ch); + + type = ch->st->type; + if (type == CIO_STORE_FS) { + ret = cio_file_up(ch); + chunk_state_sync(ch); + return ret; + } + + return CIO_OK; +} + +int cio_chunk_up_force(struct cio_chunk *ch) +{ + int ret; + int type; + + cio_error_reset(ch); + + type = ch->st->type; + if (type == CIO_STORE_FS) { + ret = cio_file_up_force(ch); + chunk_state_sync(ch); + return ret; + } + + return CIO_OK; +} + +char *cio_version() +{ + return CIO_VERSION_STR; +} + +/* + * Counters API + */ + +/* Increase the number of total chunks registered (+1) */ +size_t cio_chunk_counter_total_add(struct cio_ctx *ctx) +{ + ctx->total_chunks++; + return ctx->total_chunks; +} + +/* Decrease the total number of chunks (-1) */ +size_t cio_chunk_counter_total_sub(struct cio_ctx *ctx) +{ + ctx->total_chunks--; + return ctx->total_chunks; +} + +/* Increase the number of total chunks up in memory (+1) */ +size_t 
cio_chunk_counter_total_up_add(struct cio_ctx *ctx) +{ + ctx->total_chunks_up++; + return ctx->total_chunks_up; +} + +/* Decrease the total number of chunks up in memory (-1) */ +size_t cio_chunk_counter_total_up_sub(struct cio_ctx *ctx) +{ + ctx->total_chunks_up--; + return ctx->total_chunks_up; +} diff --git a/fluent-bit/lib/chunkio/src/cio_error.c b/fluent-bit/lib/chunkio/src/cio_error.c new file mode 100644 index 00000000..9049b0a3 --- /dev/null +++ b/fluent-bit/lib/chunkio/src/cio_error.c @@ -0,0 +1,59 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Chunk I/O + * ========= + * Copyright 2018-2021 Eduardo Silva <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <chunkio/chunkio.h> +#include <chunkio/cio_error.h> + +char *cio_error_get_str(struct cio_chunk *ch) +{ + int err = cio_error_get(ch); + + switch (err) { + case CIO_ERR_BAD_CHECKSUM: + return "bad checksum"; + case CIO_ERR_BAD_LAYOUT: + return "bad layout or invalid header"; + case CIO_ERR_PERMISSION: + return "permission error"; + default: + return "no error has been specified"; + } +} + +/* Return the current error number from a chunk */ +int cio_error_get(struct cio_chunk *ch) +{ + return ch->error_n; +} + +/* Set an error number in the chunk */ +void cio_error_set(struct cio_chunk *ch, int status) +{ + ch->error_n = status; + + if (ch->ctx != NULL) { + ch->ctx->last_chunk_error = status; + } +} + +/* Reset the error number in a chunk */ +void cio_error_reset(struct cio_chunk *ch) +{ + ch->error_n = 0; +} diff --git a/fluent-bit/lib/chunkio/src/cio_file.c b/fluent-bit/lib/chunkio/src/cio_file.c new file mode 100644 index 00000000..019baa89 --- /dev/null +++ b/fluent-bit/lib/chunkio/src/cio_file.c @@ -0,0 +1,1344 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Chunk I/O + * ========= + * Copyright 2018-2019 Eduardo Silva <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#define _GNU_SOURCE
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <limits.h>
+
+#include <chunkio/chunkio.h>
+#include <chunkio/chunkio_compat.h>
+#include <chunkio/cio_crc32.h>
+#include <chunkio/cio_chunk.h>
+#include <chunkio/cio_file.h>
+#include <chunkio/cio_file_native.h>
+#include <chunkio/cio_file_st.h>
+#include <chunkio/cio_log.h>
+#include <chunkio/cio_stream.h>
+#include <chunkio/cio_error.h>
+#include <chunkio/cio_utils.h>
+
+size_t scio_file_page_size = 0;
+
+/* Template copied at the beginning of every new chunk file */
+char cio_file_init_bytes[] = {
+    /* file type (2 bytes) */
+    CIO_FILE_ID_00, CIO_FILE_ID_01,
+
+    /* crc32 (4 bytes) in network byte order */
+    0xff, 0x12, 0xd9, 0x41,
+
+    /* padding bytes (we have 16 extra bytes) */
+    0x00, 0x00, 0x00, 0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00,
+    0x00,
+
+    /* metadata length (2 bytes) */
+    0x00, 0x00
+};
+
+/* Round N up to the next multiple of S */
+#define ROUND_UP(N, S) ((((N) + (S) - 1) / (S)) * (S))
+
+
+/*
+ * Calculate content checksum in a variable
+ *
+ * Starting from cf->crc_cur, the CRC32 is updated with the bytes found at
+ * CIO_FILE_CONTENT_OFFSET of the map: the 2 bytes metadata length header,
+ * the metadata and the content. The result is stored in 'out' without being
+ * finalized; cf->crc_cur itself is not modified here.
+ */
+void cio_file_calculate_checksum(struct cio_file *cf, crc_t *out)
+{
+    crc_t val;
+    size_t len;
+    ssize_t content_length;
+    unsigned char *in_data;
+
+    /* file size not known yet, ask for it */
+    if (cf->fs_size == 0) {
+        cio_file_update_size(cf);
+    }
+
+    /* Metadata length header + metadata length + content length */
+    len = 2;
+    len += cio_file_st_get_meta_len(cf->map);
+
+    content_length = cio_file_st_get_content_len(cf->map,
+                                                 cf->fs_size,
+                                                 cf->page_size);
+
+    /* a non-positive content length adds nothing to the checksummed area */
+    if (content_length > 0) {
+        len += content_length;
+    }
+
+    in_data = (unsigned char *) cf->map + CIO_FILE_CONTENT_OFFSET;
+    val = cio_crc32_update(cf->crc_cur, in_data, len);
+    *out = val;
+}
+
+/*
+ * Update crc32 checksum into the memory map
+ *
+ * The running CRC in cf->crc_cur is updated with 'data' and the new value is
+ * copied (not finalized) at offset 2 of the map. If cf->crc_reset is set,
+ * the running CRC is first recomputed from the current map content.
+ */
+static void update_checksum(struct cio_file *cf,
+                            unsigned char *data, size_t len)
+{
+    crc_t crc;
+    crc_t tmp;
+
+    if (cf->crc_reset) {
+        cf->crc_cur = cio_crc32_init();
+        cio_file_calculate_checksum(cf, &tmp);
+        cf->crc_cur = tmp;
+        cf->crc_reset = CIO_FALSE;
+    }
+
+    crc = cio_crc32_update(cf->crc_cur, data, len);
+    memcpy(cf->map + 2, &crc, sizeof(crc));
+    cf->crc_cur = crc;
+}
+
+/*
+ * Finalize CRC32 context and update the memory map
+ *
+ * The finalized value is written at offset 2 of the map after htonl();
+ * cf->crc_cur keeps the non-finalized running value.
+ */
+static void finalize_checksum(struct cio_file *cf)
+{
+    crc_t crc;
+
+    crc = cio_crc32_finalize(cf->crc_cur);
+    crc = htonl(crc);
+
+    memcpy(cf->map + 2, &crc, sizeof(crc));
+}
+
+/*
+ * adjust_layout: if metadata has changed, we need to adjust the content
+ * data and reference pointers.
+ *
+ * Stores 'meta_size' (cast to 16 bits) in the header, recomputes the running
+ * checksum when CIO_CHECKSUM is enabled and marks the file as not synced.
+ * Always returns 0.
+ */
+static int adjust_layout(struct cio_chunk *ch,
+                         struct cio_file *cf, size_t meta_size)
+{
+    cio_file_st_set_meta_len(cf->map, (uint16_t) meta_size);
+
+    /* Update checksum */
+    if (ch->ctx->options.flags & CIO_CHECKSUM) {
+        /* reset current crc since we are calculating from zero */
+        cf->crc_cur = cio_crc32_init();
+        cio_file_calculate_checksum(cf, &cf->crc_cur);
+    }
+
+    /* Sync changes to disk */
+    cf->synced = CIO_FALSE;
+
+    return 0;
+}
+
+/* Initialize Chunk header & structure */
+static void write_init_header(struct cio_chunk *ch, struct cio_file *cf)
+{
+    memcpy(cf->map, cio_file_init_bytes, sizeof(cio_file_init_bytes));
+
+    /* If no checksum is enabled, reset the initial crc32 bytes */
+    if (!(ch->ctx->options.flags & CIO_CHECKSUM)) {
+        cf->map[2] = 0;
+        cf->map[3] = 0;
+        cf->map[4] = 0;
+        cf->map[5] = 0;
+    }
+
+    cio_file_st_set_content_len(cf->map, 0);
+}
+
+/*
+ * Return the available size in the file map to write data
+ *
+ * That is the allocated size minus CIO_FILE_HEADER_MIN, the metadata length
+ * and the current data size. The metadata length is also returned through
+ * 'meta_len'.
+ *
+ * NOTE(review): the subtraction is unsigned, so it wraps if the allocated
+ * size is ever smaller than the sum of the other terms - confirm callers
+ * guarantee this cannot happen.
+ */
+static size_t get_available_size(struct cio_file *cf, int *meta_len)
+{
+    size_t av;
+    int metadata_len;
+
+    /* Get metadata length */
+    metadata_len = cio_file_st_get_meta_len(cf->map);
+
+    av = cf->alloc_size;
+    av -= CIO_FILE_HEADER_MIN;
+    av -= metadata_len;
+    av -= cf->data_size;
+
+    *meta_len = metadata_len;
+
+    return av;
+}
+
+/*
+ * For the recently opened or created file, check the structure format
+ * and validate relevant fields.
+ */
+static int cio_file_format_check(struct cio_chunk *ch,
+                                 struct cio_file *cf, int flags)
+{
+    size_t metadata_length;
+    ssize_t content_length;
+    ssize_t logical_length;
+    unsigned char *p;
+    crc_t crc_check;
+    crc_t crc;
+
+    /* 'flags' is currently unused, cf->flags is checked instead */
+    (void) flags;
+
+    p = (unsigned char *) cf->map;
+
+    /* If the file is empty, put the structure on it */
+    if (cf->fs_size == 0) {
+        /* check we have write permissions */
+        if ((cf->flags & CIO_OPEN) == 0) {
+            cio_log_warn(ch->ctx,
+                         "[cio file] cannot initialize chunk (read-only)");
+            cio_error_set(ch, CIO_ERR_PERMISSION);
+
+            return -1;
+        }
+
+        /* at least we need 24 bytes as allocated space */
+        if (cf->alloc_size < CIO_FILE_HEADER_MIN) {
+            cio_log_warn(ch->ctx, "[cio file] cannot initialize chunk");
+            cio_error_set(ch, CIO_ERR_BAD_LAYOUT);
+
+            return -1;
+        }
+
+        /* Initialize init bytes */
+        write_init_header(ch, cf);
+
+        /* Write checksum in context (note: crc32 not finalized) */
+        if (ch->ctx->options.flags & CIO_CHECKSUM) {
+            cio_file_calculate_checksum(cf, &cf->crc_cur);
+        }
+    }
+    else {
+        /* Check first two bytes */
+        if (p[0] != CIO_FILE_ID_00 || p[1] != CIO_FILE_ID_01) {
+            cio_log_debug(ch->ctx, "[cio file] invalid header at %s",
+                          ch->name);
+            cio_error_set(ch, CIO_ERR_BAD_LAYOUT);
+
+            return -1;
+        }
+
+        /* Expected / logical file size verification */
+        content_length = cio_file_st_get_content_len(cf->map,
+                                                     cf->fs_size,
+                                                     cf->page_size);
+
+        /* -1 is treated as 'the header is truncated' */
+        if (content_length == -1) {
+            cio_log_debug(ch->ctx, "[cio file] truncated header (%zu / %zu) %s",
+                          cf->fs_size, CIO_FILE_HEADER_MIN, ch->name);
+            cio_error_set(ch, CIO_ERR_BAD_FILE_SIZE);
+
+            return -1;
+        }
+
+        metadata_length = cio_file_st_get_meta_len(cf->map);
+
+        /* size the file must have according to its own header */
+        logical_length = CIO_FILE_HEADER_MIN +
+                         metadata_length +
+                         content_length;
+
+        if (logical_length > cf->fs_size) {
+            cio_log_debug(ch->ctx, "[cio file] truncated file (%zd / %zd) %s",
+                          cf->fs_size, logical_length, ch->name);
+            cio_error_set(ch, CIO_ERR_BAD_FILE_SIZE);
+
+            return -1;
+        }
+
+        /* Checksum */
+        if (ch->ctx->options.flags & CIO_CHECKSUM) {
+            /* Initialize CRC variable */
+            cf->crc_cur = cio_crc32_init();
+
+            /* Get checksum stored in the mmap */
+            p = (unsigned char *) cio_file_st_get_hash(cf->map);
+
+            /* Calculate content checksum */
+            cio_file_calculate_checksum(cf, &crc);
+
+            /* Compare */
+            crc_check = cio_crc32_finalize(crc);
+            crc_check = htonl(crc_check);
+
+            if (memcmp(p, &crc_check, sizeof(crc_check)) != 0) {
+                cio_log_info(ch->ctx, "[cio file] invalid crc32 at %s/%s",
+                             ch->name, cf->path);
+                cio_error_set(ch, CIO_ERR_BAD_CHECKSUM);
+
+                return -1;
+            }
+
+            /* keep the non-finalized value as the running checksum */
+            cf->crc_cur = crc;
+        }
+    }
+
+    return 0;
+}
+
+/*
+ * Unmap the memory for the opened file in question. It makes sure
+ * to sync changes to disk first.
+ *
+ * Returns -1 if the chunk has no backend or the file is not mapped,
+ * otherwise 0. A failed sync is only logged, the unmap still takes place.
+ */
+static int munmap_file(struct cio_ctx *ctx, struct cio_chunk *ch)
+{
+    int ret;
+    struct cio_file *cf;
+
+    cf = (struct cio_file *) ch->backend;
+
+    if (!cf) {
+        return -1;
+    }
+
+    /* File not mapped */
+    if (cf->map == NULL) {
+        return -1;
+    }
+
+    /* Sync pending changes to disk */
+    if (cf->synced == CIO_FALSE) {
+        ret = cio_file_sync(ch);
+        if (ret == -1) {
+            cio_log_error(ch->ctx,
+                          "[cio file] error syncing file at "
+                          "%s:%s", ch->st->name, ch->name);
+        }
+    }
+
+    /* Unmap file */
+    cio_file_native_unmap(cf);
+
+    cf->data_size = 0;
+    cf->alloc_size = 0;
+
+    /* Adjust counters */
+    cio_chunk_counter_total_up_sub(ctx);
+
+    return 0;
+}
+
+/*
+ * This function creates the memory map for the open file descriptor plus
+ * setup the chunk structure reference.
+ */
+static int mmap_file(struct cio_ctx *ctx, struct cio_chunk *ch, size_t size)
+{
+    ssize_t content_size;
+    size_t fs_size;
+    int ret;
+    struct cio_file *cf;
+
+    cf = (struct cio_file *) ch->backend;
+
+    if (cf->map != NULL) {
+        return CIO_OK;
+    }
+
+    /*
+     * 'size' value represents the value of a previous fstat(2) set by a previous
+     * caller. If the value is greater than zero, just use it, otherwise do a new
+     * fstat(2) of the file descriptor.
+ */ + + fs_size = 0; + + if (size > 0) { + fs_size = size; + } + else { + /* Get file size from the file system */ + ret = cio_file_native_get_size(cf, &fs_size); + + if (ret != CIO_OK) { + cio_file_report_os_error(); + + return CIO_ERROR; + } + } + + /* If the file is not empty, use file size for the memory map */ + if (fs_size > 0) { + size = fs_size; + cf->synced = CIO_TRUE; + } + else if (fs_size == 0) { + /* We can only prepare a file if it has been opened in RW mode */ + if ((cf->flags & CIO_OPEN_RW) == 0) { + cio_error_set(ch, CIO_ERR_PERMISSION); + + return CIO_CORRUPTED; + } + + cf->synced = CIO_FALSE; + + /* Adjust size to make room for headers */ + if (size < CIO_FILE_HEADER_MIN) { + size += CIO_FILE_HEADER_MIN; + } + + /* For empty files, make room in the file system */ + size = ROUND_UP(size, ctx->page_size); + ret = cio_file_resize(cf, size); + + if (ret != CIO_OK) { + cio_log_error(ctx, "cannot adjust chunk size '%s' to %lu bytes", + cf->path, size); + + return CIO_ERROR; + } + + cio_log_debug(ctx, "%s:%s adjusting size OK", ch->st->name, ch->name); + } + + cf->alloc_size = size; + + /* Map the file */ + ret = cio_file_native_map(cf, cf->alloc_size); + + if (ret != CIO_OK) { + cio_log_error(ctx, "cannot mmap/read chunk '%s'", cf->path); + + return CIO_ERROR; + } + + /* check content data size */ + if (fs_size > 0) { + content_size = cio_file_st_get_content_len(cf->map, + fs_size, + cf->page_size); + + if (content_size == -1) { + cio_error_set(ch, CIO_ERR_BAD_FILE_SIZE); + + cio_log_error(ctx, "invalid content size %s", cf->path); + + cio_file_native_unmap(cf); + + cf->data_size = 0; + cf->alloc_size = 0; + + return CIO_CORRUPTED; + } + + + cf->data_size = content_size; + cf->fs_size = fs_size; + } + else { + cf->data_size = 0; + cf->fs_size = 0; + } + + ret = cio_file_format_check(ch, cf, cf->flags); + + if (ret != 0) { + cio_log_error(ctx, "format check failed: %s/%s", + ch->st->name, ch->name); + + cio_file_native_unmap(cf); + + cf->data_size = 0; 
+ + return CIO_CORRUPTED; + } + + cf->st_content = cio_file_st_get_content(cf->map); + cio_log_debug(ctx, "%s:%s mapped OK", ch->st->name, ch->name); + + /* The mmap succeeded, adjust the counters */ + cio_chunk_counter_total_up_add(ctx); + + return CIO_OK; +} + +int cio_file_lookup_user(char *user, void **result) +{ + return cio_file_native_lookup_user(user, result); +} + +int cio_file_lookup_group(char *group, void **result) +{ + return cio_file_native_lookup_group(group, result); +} + +int cio_file_read_prepare(struct cio_ctx *ctx, struct cio_chunk *ch) +{ + return mmap_file(ctx, ch, 0); +} + +int cio_file_content_copy(struct cio_chunk *ch, + void **out_buf, size_t *out_size) +{ + int ret; + int set_down = CIO_FALSE; + char *buf; + char *data = NULL; + size_t size; + struct cio_file *cf = ch->backend; + + /* If the file content is already up, just do a copy of the memory map */ + if (cio_chunk_is_up(ch) == CIO_FALSE) { + ret = cio_chunk_up_force(ch); + if (ret != CIO_OK ){ + return CIO_ERROR; + } + set_down = CIO_TRUE; + } + + size = cf->data_size; + data = cio_file_st_get_content(cf->map); + + if (!data) { + if (set_down == CIO_TRUE) { + cio_chunk_down(ch); + } + return CIO_ERROR; + } + + buf = malloc(size + 1); + if (!buf) { + cio_errno(); + if (set_down == CIO_TRUE) { + cio_chunk_down(ch); + } + return CIO_ERROR; + } + memcpy(buf, data, size); + buf[size] = '\0'; + + *out_buf = buf; + *out_size = size; + + if (set_down == CIO_TRUE) { + cio_chunk_down(ch); + } + + return CIO_OK; +} + +/* + * If the maximum number of 'up' chunks is reached, put this chunk + * down (only at open time). + */ +static inline int open_and_up(struct cio_ctx *ctx) +{ + if (ctx->total_chunks_up >= ctx->max_chunks_up) { + return CIO_FALSE; + } + + return CIO_TRUE; +} + +/* + * Fetch the file size regardless of if we opened this file or not. 
+ */ +size_t cio_file_real_size(struct cio_file *cf) +{ + size_t file_size; + int ret; + + ret = cio_file_native_get_size(cf, &file_size); + + if (ret != CIO_OK) { + return 0; + } + + return file_size; +} + +static int format_acl_error_message(struct cio_ctx *ctx, + struct cio_file *cf, + char *output_buffer, + size_t output_buffer_size) +{ + char *connector; + int result; + char *group; + char *user; + + user = ctx->options.user; + group = ctx->options.group; + connector = "with group"; + + if (user == NULL) { + user = ""; + connector = ""; + } + + if (group == NULL) { + group = ""; + connector = ""; + } + + result = snprintf(output_buffer, output_buffer_size - 1, + "cannot change ownership of %s to %s %s %s", + cf->path, user, connector, group); + + if (result < 0) { + return CIO_ERROR; + } + + return CIO_OK; +} + +/* + * Open or create a data file: the following behavior is expected depending + * of the passed flags: + * + * CIO_OPEN | CIO_OPEN_RW: + * - Open for read/write, if the file don't exist, it's created and the + * memory map size is assigned to the given value on 'size'. + * + * CIO_OPEN_RD: + * - If file exists, open it in read-only mode. 
+ */ +struct cio_file *cio_file_open(struct cio_ctx *ctx, + struct cio_stream *st, + struct cio_chunk *ch, + int flags, + size_t size, + int *err) +{ + char error_message[256]; + char *path; + int ret; + struct cio_file *cf; + + (void) size; + + ret = cio_file_native_filename_check(ch->name); + if (ret != CIO_OK) { + cio_log_error(ctx, "[cio file] invalid file name"); + + return NULL; + } + + path = cio_file_native_compose_path(ctx->options.root_path, st->name, ch->name); + if (path == NULL) { + return NULL; + } + + /* Create file context */ + cf = calloc(1, sizeof(struct cio_file)); + if (!cf) { + cio_errno(); + free(path); + + return NULL; + } + + cf->fd = -1; + cf->flags = flags; + cf->page_size = cio_getpagesize(); + + if (ctx->realloc_size_hint > 0) { + cf->realloc_size = ctx->realloc_size_hint; + } + else { + cf->realloc_size = CIO_REALLOC_HINT_MIN; + } + + cf->st_content = NULL; + cf->crc_cur = cio_crc32_init(); + cf->path = path; + cf->map = NULL; + ch->backend = cf; + +#ifdef _WIN32 + cf->backing_file = INVALID_HANDLE_VALUE; + cf->backing_mapping = INVALID_HANDLE_VALUE; +#endif + +#if defined (CIO_HAVE_FALLOCATE) + cf->allocate_strategy = CIO_FILE_LINUX_FALLOCATE; +#endif + + /* Should we open and put this file up ? */ + ret = open_and_up(ctx); + + if (ret == CIO_FALSE) { + /* we reached our limit, leave the file 'down' */ + cio_file_update_size(cf); + + /* + * Due to he current resource limiting logic we could + * get to this point without a file existing so we just + * ignore the error. 
+ */ + + return cf; + } + + /* Open the file */ + ret = cio_file_native_open(cf); + + if (ret != CIO_OK) { + free(path); + free(cf); + + *err = ret; + + return NULL; + } + + /* Update the file size field */ + ret = cio_file_update_size(cf); + + if (ret != CIO_OK) { + cio_file_native_close(cf); + + free(path); + free(cf); + + *err = ret; + + return NULL; + } + + /* Set the file ownership and permissions */ + ret = cio_file_native_apply_acl_and_settings(ctx, cf); + + if (ret != CIO_OK) { + *err = ret; + + ret = format_acl_error_message(ctx, cf, error_message, sizeof(error_message)); + + if (ret != CIO_OK) { + cio_log_error(ctx, "error generating error message for acl failure"); + } + else { + cio_log_error(ctx, error_message); + } + + cio_file_native_close(cf); + + free(path); + free(cf); + + return NULL; + } + + /* Map the file */ + ret = mmap_file(ctx, ch, cf->fs_size); + if (ret == CIO_ERROR || ret == CIO_CORRUPTED || ret == CIO_RETRY) { + cio_file_native_close(cf); + + free(path); + free(cf); + + *err = ret; + + return NULL; + } + + *err = CIO_OK; + + return cf; +} + +/* This function is used to delete a chunk by name, its only purpose is to delete + * chunks that cannnot be loaded (otherwise we would set them down with the delete + * flag set to TRUE). + */ +int cio_file_delete(struct cio_ctx *ctx, struct cio_stream *st, const char *name) +{ + char *path; + int ret; + + ret = cio_file_native_filename_check((char *) name); + if (ret != CIO_OK) { + cio_log_error(ctx, "[cio file] invalid file name"); + + return CIO_ERROR; + } + + path = cio_file_native_compose_path(ctx->options.root_path, st->name, (char *) name); + if (path == NULL) { + return CIO_ERROR; + } + + ret = cio_file_native_delete_by_path(path); + + free(path); + + return ret; +} + +/* + * Put a file content back into memory, only IF it has been set 'down' + * before. 
+ */ +static int _cio_file_up(struct cio_chunk *ch, int enforced) +{ + int ret; + struct cio_file *cf = (struct cio_file *) ch->backend; + + if (cf->map) { + cio_log_error(ch->ctx, "[cio file] file is already mapped: %s/%s", + ch->st->name, ch->name); + return CIO_ERROR; + } + + if (cf->fd > 0) { + cio_log_error(ch->ctx, "[cio file] file descriptor already exists: " + "[fd=%i] %s:%s", cf->fd, ch->st->name, ch->name); + return CIO_ERROR; + } + + /* + * Enforced mechanism provides safety based on Chunk I/O storage + * pre-set limits. + */ + if (enforced == CIO_TRUE) { + ret = open_and_up(ch->ctx); + if (ret == CIO_FALSE) { + return CIO_ERROR; + } + } + + /* Open file */ + ret = cio_file_native_open(cf); + + if (ret != CIO_OK) { + cio_log_error(ch->ctx, "[cio file] cannot open chunk: %s/%s", + ch->st->name, ch->name); + return CIO_ERROR; + } + + ret = cio_file_update_size(cf); + if (ret != CIO_OK) { + return CIO_ERROR; + } + + /* + * Map content: + * + * return values = CIO_OK, CIO_ERROR, CIO_CORRUPTED or CIO_RETRY + */ + ret = mmap_file(ch->ctx, ch, cf->fs_size); + if (ret == CIO_ERROR) { + cio_log_error(ch->ctx, "[cio file] cannot map chunk: %s/%s", + ch->st->name, ch->name); + } + + /* + * 'ret' can still be CIO_CORRUPTED or CIO_RETRY on those cases we + * close the file descriptor + */ + if (ret == CIO_CORRUPTED || ret == CIO_RETRY) { + /* + * we just remove resources: close the recently opened file + * descriptor, we never delete the Chunk at this stage since + * the caller must take that action. + */ + cio_file_native_close(cf); + } + + return ret; +} + +/* + * Load a file using 'enforced' mode: do not load the file in memory + * if we already passed memory or max_chunks_up restrictions. + */ +int cio_file_up(struct cio_chunk *ch) +{ + return _cio_file_up(ch, CIO_TRUE); +} + +/* Load a file in non-enforced mode. This means it will load the file + * in memory skipping restrictions set by configuration. 
+ * + * The use case of this call is when the caller needs to write data + * to a file which is down due to restrictions. But then the caller + * must put the chunk 'down' again if that was it original status. + */ +int cio_file_up_force(struct cio_chunk *ch) +{ + return _cio_file_up(ch, CIO_FALSE); +} + +int cio_file_update_size(struct cio_file *cf) +{ + int result; + + result = cio_file_native_get_size(cf, &cf->fs_size); + + if (result != CIO_OK) { + cf->fs_size = 0; + } + + return result; +} + +/* Release memory and file descriptor resources but keep context */ +int cio_file_down(struct cio_chunk *ch) +{ + int ret; + struct cio_file *cf; + + cf = (struct cio_file *) ch->backend; + + if (cf->map == NULL) { + cio_log_error(ch->ctx, "[cio file] file is not mapped: %s/%s", + ch->st->name, ch->name); + return -1; + } + + /* unmap memory */ + munmap_file(ch->ctx, ch); + + /* Allocated map size is zero */ + cf->alloc_size = 0; + + /* Update the file size */ + ret = cio_file_update_size(cf); + + if (ret != CIO_OK) { + cio_errno(); + } + + /* Close file descriptor */ + cio_file_native_close(cf); + + return 0; +} + +void cio_file_close(struct cio_chunk *ch, int delete) +{ + int ret; + struct cio_file *cf; + + cf = (struct cio_file *) ch->backend; + + if (cf == NULL) { + return; + } + + /* Safe unmap of the file content */ + munmap_file(ch->ctx, ch); + + /* Close file descriptor */ + cio_file_native_close(cf); + + /* Should we delete the content from the file system ? 
*/ + if (delete == CIO_TRUE) { + ret = cio_file_native_delete(cf); + + if (ret != CIO_OK) { + cio_log_error(ch->ctx, + "[cio file] error deleting file at close %s:%s", + ch->st->name, ch->name); + } + } + + free(cf->path); + free(cf); +} + + +int cio_file_write(struct cio_chunk *ch, const void *buf, size_t count) +{ + int ret; + int meta_len; + int pre_content; + size_t av_size; + size_t old_size; + size_t new_size; + struct cio_file *cf; + + if (count == 0) { + /* do nothing */ + return 0; + } + + if (!ch) { + return -1; + } + + cf = (struct cio_file *) ch->backend; + + if (cio_chunk_is_up(ch) == CIO_FALSE) { + cio_log_error(ch->ctx, "[cio file] file is not mmap()ed: %s:%s", + ch->st->name, ch->name); + return -1; + } + + /* get available size */ + av_size = get_available_size(cf, &meta_len); + + /* validate there is enough space, otherwise resize */ + if (av_size < count) { + /* Set the pre-content size (chunk header + metadata) */ + pre_content = (CIO_FILE_HEADER_MIN + meta_len); + + new_size = cf->alloc_size + cf->realloc_size; + while (new_size < (pre_content + cf->data_size + count)) { + new_size += cf->realloc_size; + } + + old_size = cf->alloc_size; + new_size = ROUND_UP(new_size, ch->ctx->page_size); + + ret = cio_file_resize(cf, new_size); + + if (ret != CIO_OK) { + cio_log_error(ch->ctx, + "[cio_file] error setting new file size on write"); + return -1; + } + + cio_log_debug(ch->ctx, + "[cio file] alloc_size from %lu to %lu", + old_size, new_size); + } + + /* If crc_reset was toggled we know that data_size was + * modified by cio_chunk_write_at which means we need + * to update the header before we recalculate the checksum + */ + if (cf->crc_reset) { + cio_file_st_set_content_len(cf->map, cf->data_size); + } + + if (ch->ctx->options.flags & CIO_CHECKSUM) { + update_checksum(cf, (unsigned char *) buf, count); + } + + cf->st_content = cio_file_st_get_content(cf->map); + memcpy(cf->st_content + cf->data_size, buf, count); + + cf->data_size += count; + 
cf->synced = CIO_FALSE; + + cio_file_st_set_content_len(cf->map, cf->data_size); + + return 0; +} + +int cio_file_write_metadata(struct cio_chunk *ch, char *buf, size_t size) +{ + int ret; + char *meta; + char *cur_content_data; + char *new_content_data; + size_t new_size; + size_t content_av; + size_t meta_av; + struct cio_file *cf; + + cf = ch->backend; + + if (cio_file_is_up(ch, cf) == CIO_FALSE) { + return -1; + } + + /* Get metadata pointer */ + meta = cio_file_st_get_meta(cf->map); + + /* Check if meta already have some space available to overwrite */ + meta_av = cio_file_st_get_meta_len(cf->map); + + /* If there is some space available, just overwrite */ + if (meta_av >= size) { + /* copy new metadata */ + memcpy(meta, buf, size); + + /* there are some remaining bytes, adjust.. */ + cur_content_data = cio_file_st_get_content(cf->map); + new_content_data = meta + size; + memmove(new_content_data, cur_content_data, cf->data_size); + adjust_layout(ch, cf, size); + + return 0; + } + + /* + * The optimal case is if there is no content data, the non-optimal case + * where we need to increase the memory map size, move the content area + * bytes to a different position and write the metadata. + * + * Calculate the available space in the content area. 
+ */ + content_av = cf->alloc_size - cf->data_size; + + /* If there is no enough space, increase the file size and it memory map */ + if (content_av < size) { + new_size = (size - meta_av) + cf->data_size + CIO_FILE_HEADER_MIN; + + ret = cio_file_resize(cf, new_size); + + if (ret != CIO_OK) { + cio_log_error(ch->ctx, + "[cio meta] error resizing mapped file"); + + return -1; + } + } + + /* get meta reference again in case the map address has changed */ + meta = cio_file_st_get_meta(cf->map); + + /* set new position for the content data */ + cur_content_data = cio_file_st_get_content(cf->map); + new_content_data = meta + size; + memmove(new_content_data, cur_content_data, size); + + /* copy new metadata */ + memcpy(meta, buf, size); + adjust_layout(ch, cf, size); + + return 0; +} + +int cio_file_sync(struct cio_chunk *ch) +{ + int ret; + int meta_len; + size_t desired_size; + size_t file_size; + size_t av_size; + struct cio_file *cf; + + if (ch == NULL) { + return -1; + } + + cf = (struct cio_file *) ch->backend; + + if (cf == NULL) { + return -1; + } + + if (cf->flags & CIO_OPEN_RD) { + return 0; + } + + if (cf->synced == CIO_TRUE) { + return 0; + } + + ret = cio_file_native_get_size(cf, &file_size); + + if (ret != CIO_OK) { + cio_file_report_os_error(); + + return -1; + } + + /* File trimming has been made opt-in because it causes + * performance degradation and excessive fragmentation + * in XFS. 
+ */ + if ((ch->ctx->options.flags & CIO_TRIM_FILES) != 0) { + /* If there are extra space, truncate the file size */ + av_size = get_available_size(cf, &meta_len); + + if (av_size > 0) { + desired_size = cf->alloc_size - av_size; + } + else if (cf->alloc_size > file_size) { + desired_size = cf->alloc_size; + } + else { + desired_size = file_size; + } + + if (desired_size != file_size) { + /* When file trimming is enabled we still round the file size up + * to the memory page size because even though not explicitly + * stated there seems to be a performance degradation issue that + * correlates with sub-page mapping. + */ + desired_size = ROUND_UP(desired_size, ch->ctx->page_size); + + ret = cio_file_resize(cf, desired_size); + + if (ret != CIO_OK) { + cio_log_error(ch->ctx, + "[cio file sync] error adjusting size at: " + " %s/%s", ch->st->name, ch->name); + + return ret; + } + } + } + + /* Finalize CRC32 checksum */ + if (ch->ctx->options.flags & CIO_CHECKSUM) { + finalize_checksum(cf); + } + + /* Commit changes to disk */ + ret = cio_file_native_sync(cf, ch->ctx->options.flags); + + if (ret != CIO_OK) { + return -1; + } + + cf->synced = CIO_TRUE; + + ret = cio_file_update_size(cf); + + if (ret != CIO_OK) { + return -1; + } + + cio_log_debug(ch->ctx, "[cio file] synced at: %s/%s", + ch->st->name, ch->name); + + return 0; +} + +int cio_file_resize(struct cio_file *cf, size_t new_size) +{ + int inner_result; + size_t mapped_size; + int mapped_flag; + int result; + + mapped_flag = cio_file_native_is_mapped(cf); + mapped_size = cf->alloc_size; + +#ifdef _WIN32 + if (mapped_flag) { + result = cio_file_native_unmap(cf); + + if (result != CIO_OK) { + return result; + } + } +#endif + + result = cio_file_native_resize(cf, new_size); + + if (result != CIO_OK) { + cio_file_native_report_os_error(); + +#ifdef _WIN32 + if (mapped_flag) { + inner_result = cio_file_native_map(cf, mapped_size); + } +#endif + + return result; + } + + if (mapped_flag) { +#ifdef _WIN32 + result = 
cio_file_native_map(cf, new_size); +#else + result = cio_file_native_remap(cf, new_size); +#endif + + if (result != CIO_OK) { + return result; + } + } + + (void) mapped_size; + (void) inner_result; + + return CIO_OK; +} + +char *cio_file_hash(struct cio_file *cf) +{ + return (cf->map + 2); +} + +void cio_file_hash_print(struct cio_file *cf) +{ + printf("crc cur=%lu\n", (long unsigned int)cf->crc_cur); + printf("%08lx\n", (long unsigned int ) cf->crc_cur); +} + +/* Dump files from given stream */ +void cio_file_scan_dump(struct cio_ctx *ctx, struct cio_stream *st) +{ + int ret; + int meta_len; + int set_down = CIO_FALSE; + char *p; + crc_t crc; + crc_t crc_fs; + char tmp[PATH_MAX]; + struct mk_list *head; + struct cio_chunk *ch; + struct cio_file *cf; + + mk_list_foreach(head, &st->chunks) { + ch = mk_list_entry(head, struct cio_chunk, _head); + cf = ch->backend; + + if (cio_file_is_up(ch, cf) == CIO_FALSE) { + ret = cio_file_up(ch); + if (ret == -1) { + continue; + } + set_down = CIO_TRUE; + } + + snprintf(tmp, sizeof(tmp) -1, "%s/%s", st->name, ch->name); + meta_len = cio_file_st_get_meta_len(cf->map); + + p = cio_file_st_get_hash(cf->map); + + memcpy(&crc_fs, p, sizeof(crc_fs)); + crc_fs = ntohl(crc_fs); + + printf(" %-60s", tmp); + + /* + * the crc32 specified in the file is stored in 'val' now, if + * checksum mode is enabled we have to verify it. + */ + if (ctx->options.flags & CIO_CHECKSUM) { + cio_file_calculate_checksum(cf, &crc); + + /* + * finalize the checksum and compare it value using the + * host byte order. 
+ */ + crc = cio_crc32_finalize(crc); + if (crc != crc_fs) { + printf("checksum error=%08x expected=%08x, ", + (uint32_t) crc_fs, (uint32_t) crc); + } + } + printf("meta_len=%d, data_size=%zu, crc=%08x\n", + meta_len, cf->data_size, (uint32_t) crc_fs); + + if (set_down == CIO_TRUE) { + cio_file_down(ch); + } + } +} + +/* Check if a file content is up in memory and a file descriptor is set */ +int cio_file_is_up(struct cio_chunk *ch, struct cio_file *cf) +{ + (void) ch; + + if (cio_file_native_is_open(cf) && + cio_file_native_is_mapped(cf)) { + return CIO_TRUE; + } + + return CIO_FALSE; +} diff --git a/fluent-bit/lib/chunkio/src/cio_file_unix.c b/fluent-bit/lib/chunkio/src/cio_file_unix.c new file mode 100644 index 00000000..72d49312 --- /dev/null +++ b/fluent-bit/lib/chunkio/src/cio_file_unix.c @@ -0,0 +1,570 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Chunk I/O + * ========= + * Copyright 2018-2019 Eduardo Silva <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#define _GNU_SOURCE + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <errno.h> +#include <fcntl.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/mman.h> +#include <limits.h> +#include <pwd.h> +#include <grp.h> + +#include <chunkio/chunkio.h> +#include <chunkio/chunkio_compat.h> +#include <chunkio/cio_crc32.h> +#include <chunkio/cio_chunk.h> +#include <chunkio/cio_file.h> +#include <chunkio/cio_file_native.h> +#include <chunkio/cio_file_st.h> +#include <chunkio/cio_log.h> +#include <chunkio/cio_stream.h> +#include <chunkio/cio_error.h> +#include <chunkio/cio_utils.h> + + +int cio_file_native_unmap(struct cio_file *cf) +{ + int ret; + + if (cf == NULL) { + return CIO_ERROR; + } + + if (!cio_file_native_is_mapped(cf)) { + return CIO_OK; + } + + ret = munmap(cf->map, cf->alloc_size); + + if (ret != 0) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + + cf->alloc_size = 0; + cf->map = NULL; + + return CIO_OK; +} + +int cio_file_native_map(struct cio_file *cf, size_t map_size) +{ + int flags; + + if (cf == NULL) { + return CIO_ERROR; + } + + if (!cio_file_native_is_open(cf)) { + return CIO_ERROR; + } + + if (cio_file_native_is_mapped(cf)) { + return CIO_OK; + } + + if (cf->flags & CIO_OPEN_RW) { + flags = PROT_READ | PROT_WRITE; + } + else if (cf->flags & CIO_OPEN_RD) { + flags = PROT_READ; + } + else { + return CIO_ERROR; + } + + cf->map = mmap(0, map_size, flags, MAP_SHARED, cf->fd, 0); + + if (cf->map == MAP_FAILED) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + + cf->alloc_size = map_size; + + return CIO_OK; +} + +int cio_file_native_remap(struct cio_file *cf, size_t new_size) +{ + int result; + void *tmp; + + result = 0; + +/* OSX mman does not implement mremap or MREMAP_MAYMOVE. 
*/ +#ifndef MREMAP_MAYMOVE + result = cio_file_native_unmap(cf); + + if (result == -1) { + return result; + } + + tmp = mmap(0, new_size, PROT_READ | PROT_WRITE, MAP_SHARED, cf->fd, 0); +#else + (void) result; + + tmp = mremap(cf->map, cf->alloc_size, new_size, MREMAP_MAYMOVE); +#endif + + if (tmp == MAP_FAILED) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + + cf->map = tmp; + cf->alloc_size = new_size; + + return CIO_OK; +} + +int cio_file_native_lookup_user(char *user, void **result) +{ + long query_buffer_size; + struct passwd *query_result; + char *query_buffer; + struct passwd passwd_entry; + int api_result; + + if (user == NULL) { + *result = calloc(1, sizeof(uid_t)); + + if (*result == NULL) { + cio_file_native_report_runtime_error(); + + return CIO_ERROR; + } + + **(uid_t **) result = (uid_t) -1; + } + + query_buffer_size = sysconf(_SC_GETPW_R_SIZE_MAX); + + if (query_buffer_size == -1) { + query_buffer_size = 4096 * 10; + } + + query_buffer = calloc(1, query_buffer_size); + + if (query_buffer == NULL) { + return CIO_ERROR; + } + + query_result = NULL; + + api_result = getpwnam_r(user, &passwd_entry, query_buffer, + query_buffer_size, &query_result); + + if (api_result != 0 || query_result == NULL) { + cio_file_native_report_os_error(); + + free(query_buffer); + + return CIO_ERROR; + } + + *result = calloc(1, sizeof(uid_t)); + + if (*result == NULL) { + cio_file_native_report_runtime_error(); + + free(query_buffer); + + return CIO_ERROR; + } + + **(uid_t **) result = query_result->pw_uid; + + free(query_buffer); + + return CIO_OK; +} + +int cio_file_native_lookup_group(char *group, void **result) +{ + long query_buffer_size; + struct group *query_result; + char *query_buffer; + struct group group_entry; + int api_result; + + if (group == NULL) { + *result = calloc(1, sizeof(gid_t)); + + if (*result == NULL) { + cio_file_native_report_runtime_error(); + + return CIO_ERROR; + } + + **(gid_t **) result = (gid_t) -1; + } + + 
query_buffer_size = sysconf(_SC_GETGR_R_SIZE_MAX); + + if (query_buffer_size == -1) { + query_buffer_size = 4096 * 10; + } + + query_buffer = calloc(1, query_buffer_size); + + if (query_buffer == NULL) { + return CIO_ERROR; + } + + query_result = NULL; + + api_result = getgrnam_r(group, &group_entry, query_buffer, + query_buffer_size, &query_result); + + if (api_result != 0 || query_result == NULL) { + cio_file_native_report_os_error(); + + free(query_buffer); + + return CIO_ERROR; + } + + *result = calloc(1, sizeof(gid_t)); + + if (*result == NULL) { + cio_file_native_report_runtime_error(); + + free(query_buffer); + + return CIO_ERROR; + } + + **(gid_t **) result = query_result->gr_gid; + + free(query_buffer); + + return CIO_OK; +} + +int cio_file_native_apply_acl_and_settings(struct cio_ctx *ctx, struct cio_file *cf) +{ + mode_t filesystem_acl; + gid_t numeric_group; + uid_t numeric_user; + int result; + + numeric_group = -1; + numeric_user = -1; + + if (ctx->processed_user != NULL) { + numeric_user = *(uid_t *) ctx->processed_user; + } + + if (ctx->processed_group != NULL) { + numeric_group = *(gid_t *) ctx->processed_group; + } + + if (numeric_user != -1 || numeric_group != -1) { + result = chown(cf->path, numeric_user, numeric_group); + + if (result == -1) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + } + + if (ctx->options.chmod != NULL) { + filesystem_acl = strtoul(ctx->options.chmod, NULL, 8); + + result = chmod(cf->path, filesystem_acl); + + if (result == -1) { + cio_file_native_report_os_error(); + + cio_log_error(ctx, "cannot change acl of %s to %s", + cf->path, ctx->options.user); + + return CIO_ERROR; + } + } + + return CIO_OK; +} + +int cio_file_native_get_size(struct cio_file *cf, size_t *file_size) +{ + int ret; + struct stat st; + + ret = -1; + + if (cio_file_native_is_open(cf)) { + ret = fstat(cf->fd, &st); + } + + if (ret == -1) { + ret = stat(cf->path, &st); + } + + if (ret == -1) { + return CIO_ERROR; + } + + if (file_size 
!= NULL) { + *file_size = st.st_size; + } + + return CIO_OK; +} + +char *cio_file_native_compose_path(char *root_path, char *stream_name, + char *chunk_name) +{ + size_t psize; + char *path; + int ret; + + /* Compose path for the file */ + psize = strlen(root_path) + + strlen(stream_name) + + strlen(chunk_name) + + 8; + + path = malloc(psize); + + if (path == NULL) { + cio_file_native_report_runtime_error(); + + return NULL; + } + + ret = snprintf(path, psize, "%s/%s/%s", + root_path, stream_name, chunk_name); + + if (ret == -1) { + cio_file_native_report_runtime_error(); + + free(path); + + return NULL; + } + + return path; +} + +int cio_file_native_filename_check(char *name) +{ + size_t len; + + len = strlen(name); + + if (len == 0) { + return CIO_ERROR; + } + if (len == 1) { + if ((name[0] == '.' || name[0] == '/')) { + return CIO_ERROR; + } + } + + return CIO_OK; +} + +int cio_file_native_open(struct cio_file *cf) +{ + if (cio_file_native_is_open(cf)) { + return CIO_OK; + } + + /* Open file descriptor */ + if (cf->flags & CIO_OPEN_RW) { + cf->fd = open(cf->path, O_RDWR | O_CREAT, (mode_t) 0600); + } + else if (cf->flags & CIO_OPEN_RD) { + cf->fd = open(cf->path, O_RDONLY); + } + + if (cf->fd == -1) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + + return CIO_OK; +} + +int cio_file_native_close(struct cio_file *cf) +{ + int result; + + if (cf == NULL) { + return CIO_ERROR; + } + + if (cio_file_native_is_open(cf)) { + result = close(cf->fd); + + if (result == -1) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + + cf->fd = -1; + } + + return CIO_OK; +} + +int cio_file_native_delete_by_path(const char *path) +{ + int result; + + result = unlink(path); + + if (result == -1) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + + return CIO_OK; +} + +int cio_file_native_delete(struct cio_file *cf) +{ + int result; + + if (cio_file_native_is_open(cf) || + cio_file_native_is_mapped(cf)) { + return CIO_ERROR; + } + + 
result = unlink(cf->path); + + if (result == -1) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + + return CIO_OK; +} + +int cio_file_native_sync(struct cio_file *cf, int sync_mode) +{ + int result; + + if (sync_mode & CIO_FULL_SYNC) { + sync_mode = MS_SYNC; + } + else { + sync_mode = MS_ASYNC; + } + + result = msync(cf->map, cf->alloc_size, sync_mode); + + if (result == -1) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + + return CIO_OK; +} + +int cio_file_native_resize(struct cio_file *cf, size_t new_size) +{ + int fallocate_available; + int result; + + result = -1; + +#if defined(CIO_HAVE_FALLOCATE) || defined(CIO_HAVE_POSIX_FALLOCATE) + fallocate_available = CIO_TRUE; +#else + fallocate_available = CIO_FALSE; +#endif + + /* + * fallocate() is not portable an Linux only. Since macOS does not have + * fallocate() we use ftruncate(). + */ + if (fallocate_available && new_size > cf->fs_size) { + retry: + + if (cf->allocate_strategy == CIO_FILE_LINUX_FALLOCATE) { + /* + * To increase the file size we use fallocate() since this option + * will send a proper ENOSPC error if the file system ran out of + * space. ftruncate() will not fail and upon memcpy() over the + * mmap area it will trigger a 'Bus Error' crashing the program. + * + * fallocate() is not portable, Linux only. + */ +#if defined(CIO_HAVE_FALLOCATE) + result = fallocate(cf->fd, 0, 0, new_size); + +#elif defined(CIO_HAVE_POSIX_FALLOCATE) + result = -1; + errno = EOPNOTSUPP; +#endif + + if (result == -1 && errno == EOPNOTSUPP) { + /* + * If fallocate fails with an EOPNOTSUPP try operation using + * posix_fallocate. Required since some filesystems do not support + * the fallocate operation e.g. ext3 and reiserfs. 
+ */ + cf->allocate_strategy = CIO_FILE_LINUX_POSIX_FALLOCATE; + goto retry; + } + } + else if (cf->allocate_strategy == CIO_FILE_LINUX_POSIX_FALLOCATE) { +#if defined(CIO_HAVE_POSIX_FALLOCATE) + result = posix_fallocate(cf->fd, 0, new_size); +#else + goto fallback; +#endif + } + } + else + { +#if !defined(CIO_HAVE_POSIX_FALLOCATE) + fallback: +#endif + + result = ftruncate(cf->fd, new_size); + } + + if (result) { + cio_file_native_report_os_error(); + } + else { + cf->fs_size = new_size; + } + + return result; +} diff --git a/fluent-bit/lib/chunkio/src/cio_file_win32.c b/fluent-bit/lib/chunkio/src/cio_file_win32.c new file mode 100644 index 00000000..18044c40 --- /dev/null +++ b/fluent-bit/lib/chunkio/src/cio_file_win32.c @@ -0,0 +1,549 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Chunk I/O + * ========= + * Copyright 2018 Eduardo Silva <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <inttypes.h> +#include <stdio.h> + +#include <chunkio/chunkio_compat.h> +#include <chunkio/chunkio.h> +#include <chunkio/cio_crc32.h> +#include <chunkio/cio_chunk.h> +#include <chunkio/cio_file.h> +#include <chunkio/cio_file_native.h> +#include <chunkio/cio_file_st.h> +#include <chunkio/cio_log.h> +#include <chunkio/cio_stream.h> + +int cio_file_native_unmap(struct cio_file *cf) +{ + int result; + + if (cf == NULL) { + return CIO_ERROR; + } + + if (!cio_file_native_is_open(cf)) { + return CIO_OK; + } + + if (!cio_file_native_is_mapped(cf)) { + return CIO_OK; + } + + result = UnmapViewOfFile(cf->map); + + if (result == 0) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + + CloseHandle(cf->backing_mapping); + + cf->backing_mapping = INVALID_HANDLE_VALUE; + cf->alloc_size = 0; + cf->map = NULL; + + return CIO_OK; +} + +int cio_file_native_map(struct cio_file *cf, size_t map_size) +{ + DWORD desired_protection; + DWORD desired_access; + + if (cf == NULL) { + return CIO_ERROR; + } + + if (!cio_file_native_is_open(cf)) { + return CIO_ERROR; + } + + if (cio_file_native_is_mapped(cf)) { + return CIO_OK; + } + + if (cf->flags & CIO_OPEN_RW) { + desired_protection = PAGE_READWRITE; + desired_access = FILE_MAP_ALL_ACCESS; + } + else if (cf->flags & CIO_OPEN_RD) { + desired_protection = PAGE_READONLY; + desired_access = FILE_MAP_READ; + } + else { + return CIO_ERROR; + } + + cf->backing_mapping = CreateFileMappingA(cf->backing_file, NULL, + desired_protection, + 0, 0, NULL); + + if (cf->backing_mapping == NULL) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + + cf->map = MapViewOfFile(cf->backing_mapping, desired_access, 0, 0, map_size); + + if (cf->map == NULL) { + cio_file_native_report_os_error(); + + CloseHandle(cf->backing_mapping); + + cf->backing_mapping = INVALID_HANDLE_VALUE; + + return CIO_ERROR; + } + + cf->alloc_size = map_size; + + return CIO_OK; +} + +int cio_file_native_remap(struct cio_file *cf, size_t 
new_size) +{ + /* + * There's no reason for this function to exist because in windows + * we need to unmap, resize and then map again so there's no benefit + * from remapping and I'm not implementing a dummy version because I + * don't want anyone to read it and think there are any reasonable use + * cases for it. + */ + + (void) cf; + (void) new_size; + + return CIO_ERROR; +} + +static SID *perform_sid_lookup(char *account_name, SID_NAME_USE *result_sid_type) +{ + DWORD referenced_domain_name_length; + char referenced_domain_name[256]; + SID *reallocated_sid_buffer; + DWORD sid_buffer_size; + size_t retry_index; + SID *sid_buffer; + SID_NAME_USE sid_type; + int result; + + referenced_domain_name_length = 256; + sid_buffer_size = 256; + + sid_buffer = calloc(1, sid_buffer_size); + + if (sid_buffer == NULL) { + cio_file_native_report_runtime_error(); + + return NULL; + } + + result = 0; + sid_type = SidTypeUnknown; + + for (retry_index = 0 ; retry_index < 5 && !result ; retry_index++) { + result = LookupAccountNameA(NULL, + account_name, + sid_buffer, + &sid_buffer_size, + referenced_domain_name, + &referenced_domain_name_length, + &sid_type); + + if (!result) { + if (GetLastError() == ERROR_INSUFFICIENT_BUFFER) { + sid_buffer_size *= 2; + + reallocated_sid_buffer = realloc(sid_buffer, sid_buffer_size); + + if (reallocated_sid_buffer == NULL) { + cio_file_native_report_runtime_error(); + + free(sid_buffer); + + return NULL; + } + } + else { + cio_file_native_report_os_error(); + + free(sid_buffer); + + return NULL; + } + } + } + + if (result_sid_type != NULL) { + *result_sid_type = sid_type; + } + + return sid_buffer; +} + +static int perform_entity_lookup(char *name, + void **result, + SID_NAME_USE desired_sid_type) +{ + SID_NAME_USE result_sid_type; + + *result = (void **) perform_sid_lookup(name, &result_sid_type); + + if (*result == NULL) { + return CIO_ERROR; + } + + if (desired_sid_type != result_sid_type) { + free(*result); + + *result = NULL; + + return 
CIO_ERROR; + } + + return CIO_OK; +} + +int cio_file_native_lookup_user(char *user, void **result) +{ + return perform_entity_lookup(user, result, SidTypeUser); +} + +int cio_file_native_lookup_group(char *group, void **result) +{ + return perform_entity_lookup(group, result, SidTypeGroup); +} + +static DWORD cio_file_win_chown(char *path, SID *user, SID *group) +{ + int result; + + /* Ownership here does not work in the same way it works in unixes + * so specifying both a user and group will end up with the group + * overriding the user if possible which can cause some misunderstandings. + */ + + result = ERROR_SUCCESS; + + if (user != NULL) { + result = SetNamedSecurityInfoA(path, SE_FILE_OBJECT, + OWNER_SECURITY_INFORMATION, + user, NULL, NULL, NULL); + } + + if (group != NULL && result == ERROR_SUCCESS) { + result = SetNamedSecurityInfoA(path, SE_FILE_OBJECT, + GROUP_SECURITY_INFORMATION, + group, NULL, NULL, NULL); + } + + return result; +} + +int cio_file_native_apply_acl_and_settings(struct cio_ctx *ctx, struct cio_file *cf) +{ + int result; + + if (ctx->processed_user != NULL) { + result = cio_file_win_chown(cf->path, ctx->processed_user, ctx->processed_group); + + if (result != ERROR_SUCCESS) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + } + + return CIO_OK; +} + +static int get_file_size_by_handle(struct cio_file *cf, size_t *file_size) +{ + LARGE_INTEGER native_file_size; + int ret; + + memset(&native_file_size, 0, sizeof(native_file_size)); + + ret = GetFileSizeEx(cf->backing_file, &native_file_size); + + if (ret == 0) { + return CIO_ERROR; + } + + if (file_size != NULL) { + *file_size = (size_t) native_file_size.QuadPart; + } + + return CIO_OK; +} + +static int get_file_size_by_path(struct cio_file *cf, size_t *file_size) +{ + int ret; +#ifdef _WIN64 + struct _stat64 st; +#else + struct _stat32 st; +#endif + +#ifdef _WIN64 + ret = _stat64(cf->path, &st); +#else + ret = _stat32(cf->path, &st); +#endif + + if (ret == -1) { + return 
CIO_ERROR; + } + + if (file_size != NULL) { + *file_size = st.st_size; + } + + return CIO_OK; +} + +int cio_file_native_get_size(struct cio_file *cf, size_t *file_size) +{ + int ret; + + ret = CIO_ERROR; + + if (cf->backing_file != INVALID_HANDLE_VALUE) { + ret = get_file_size_by_handle(cf, file_size); + } + + if (ret != CIO_OK) { + ret = get_file_size_by_path(cf, file_size); + } + + return ret; +} + +char *cio_file_native_compose_path(char *root_path, char *stream_name, + char *chunk_name) +{ + size_t psize; + char *path; + int ret; + + /* Compose path for the file */ + psize = strlen(root_path) + + strlen(stream_name) + + strlen(chunk_name) + + 3; + + path = malloc(psize); + + if (path == NULL) { + cio_file_native_report_runtime_error(); + + return NULL; + } + + ret = snprintf(path, psize, "%s\\%s\\%s", + root_path, stream_name, chunk_name); + + if (ret == -1) { + cio_file_native_report_runtime_error(); + + free(path); + + return NULL; + } + + return path; +} + +int cio_file_native_filename_check(char *name) +{ + size_t len; + + len = strlen(name); + + if (len == 0) { + return CIO_ERROR; + } + else if (len == 1) { + if (name[0] == '\\' || name[0] == '.' 
|| name[0] == '/') { + return CIO_ERROR; + } + } + + return CIO_OK; +} + +int cio_file_native_open(struct cio_file *cf) +{ + DWORD creation_disposition; + DWORD desired_access; + + if (cio_file_native_is_open(cf)) { + return CIO_OK; + } + + if (cf->flags & CIO_OPEN) { + desired_access = GENERIC_READ | GENERIC_WRITE; + creation_disposition = OPEN_ALWAYS; + } + else if (cf->flags & CIO_OPEN_RD) { + desired_access = GENERIC_READ; + creation_disposition = OPEN_EXISTING; + } + else { + return CIO_ERROR; + } + + cf->backing_file = CreateFileA(cf->path, + desired_access, + FILE_SHARE_DELETE | + FILE_SHARE_READ | + FILE_SHARE_WRITE, + NULL, + creation_disposition, + FILE_ATTRIBUTE_NORMAL, + NULL); + + if (cf->backing_file == INVALID_HANDLE_VALUE) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + + return CIO_OK; +} + +int cio_file_native_close(struct cio_file *cf) +{ + int result; + + if (cf == NULL) { + return CIO_ERROR; + } + + if (cio_file_native_is_open(cf)) { + result = CloseHandle(cf->backing_file); + + if (result == 0) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + + cf->backing_file = INVALID_HANDLE_VALUE; + } + + return CIO_OK; +} + +int cio_file_native_delete_by_path(const char *path) +{ + int result; + + result = DeleteFileA(path); + + if (result == 0) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + + return CIO_OK; +} + +int cio_file_native_delete(struct cio_file *cf) +{ + int result; + + result = DeleteFileA(cf->path); + + if (result == 0) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + + return CIO_OK; +} + +int cio_file_native_sync(struct cio_file *cf, int sync_mode) +{ + int result; + + result = FlushViewOfFile(cf->map, cf->alloc_size); + + if (result == 0) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + + if (sync_mode & CIO_FULL_SYNC) { + result = FlushFileBuffers(cf->backing_file); + + if (result == 0) { + cio_file_native_report_os_error(); + + return CIO_ERROR; 
+ } + } + + return CIO_OK; +} + +int cio_file_native_resize(struct cio_file *cf, size_t new_size) +{ + LARGE_INTEGER movement_distance; + int result; + + if (!cio_file_native_is_open(cf)) { + return CIO_ERROR; + } + + if (cio_file_native_is_mapped(cf)) { + return CIO_ERROR; + } + + movement_distance.QuadPart = new_size; + + result = SetFilePointerEx(cf->backing_file, + movement_distance, + NULL, FILE_BEGIN); + + if (result == 0) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + + result = SetEndOfFile(cf->backing_file); + + if (result == 0) { + cio_file_native_report_os_error(); + + return CIO_ERROR; + } + + cf->fs_size = new_size; + + return CIO_OK; +} diff --git a/fluent-bit/lib/chunkio/src/cio_log.c b/fluent-bit/lib/chunkio/src/cio_log.c new file mode 100644 index 00000000..8c6e8123 --- /dev/null +++ b/fluent-bit/lib/chunkio/src/cio_log.c @@ -0,0 +1,87 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Chunk I/O + * ========= + * Copyright 2018 Eduardo Silva <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stdio.h> +#include <stdarg.h> +#include <string.h> + +#include <chunkio/chunkio_compat.h> +#include <chunkio/chunkio.h> +#include <chunkio/cio_log.h> + +void cio_log_print(void *ctx, int level, const char *file, int line, + const char *fmt, ...) 
+{ + int ret; + char buf[CIO_LOG_BUF_SIZE]; + va_list args; + struct cio_ctx *cio = ctx; + + if (!cio->options.log_cb) { + return; + } + + if (level > cio->options.log_level) { + return; + } + + va_start(args, fmt); + ret = vsnprintf(buf, CIO_LOG_BUF_SIZE - 1, fmt, args); + + if (ret >= 0) { + buf[ret] = '\0'; + } + va_end(args); + + cio->options.log_cb(ctx, level, file, line, buf); +} + +int cio_errno_print(int errnum, const char *file, int line) +{ + char buf[256]; + + strerror_r(errnum, buf, sizeof(buf) - 1); + fprintf(stderr, "[%s:%i errno=%i] %s\n", + file, line, errnum, buf); + return 0; +} + +#ifdef _WIN32 +void cio_winapi_error_print(const char *func, int line) +{ + int error = GetLastError(); + char buf[256]; + int success; + + success = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, + error, + LANG_SYSTEM_DEFAULT, + buf, + sizeof(buf), + NULL); + if (success) { + fprintf(stderr, "[%s() line=%i error=%i] %s\n", func, line, error, buf); + } + else { + fprintf(stderr, "[%s() line=%i error=%i] Win32 API failed\n", func, line, error); + } +} +#endif diff --git a/fluent-bit/lib/chunkio/src/cio_memfs.c b/fluent-bit/lib/chunkio/src/cio_memfs.c new file mode 100644 index 00000000..8cd0f6c1 --- /dev/null +++ b/fluent-bit/lib/chunkio/src/cio_memfs.c @@ -0,0 +1,156 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Chunk I/O + * ========= + * Copyright 2018 Eduardo Silva <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <chunkio/chunkio.h> +#include <chunkio/cio_utils.h> +#include <chunkio/chunkio_compat.h> +#include <chunkio/cio_memfs.h> +#include <chunkio/cio_log.h> + +#include <stdio.h> +#include <string.h> +#include <limits.h> + +struct cio_memfs *cio_memfs_open(struct cio_ctx *ctx, struct cio_stream *st, + struct cio_chunk *ch, int flags, + size_t size) +{ + struct cio_memfs *mf; + + (void) flags; + (void) ctx; + (void) ch; + (void) st; + + mf = calloc(1, sizeof(struct cio_memfs)); + if (!mf) { + cio_errno(); + return NULL; + } + mf->crc_cur = cio_crc32_init(); + + mf->buf_data = malloc(size); + if (!mf->buf_data) { + cio_errno(); + free(mf->name); + free(mf); + return NULL; + } + mf->buf_size = size; + mf->buf_len = 0; + if (ctx->realloc_size_hint > 0) { + mf->realloc_size = ctx->realloc_size_hint; + } else { + mf->realloc_size = cio_getpagesize() * 8; + } + + return mf; +} + +void cio_memfs_close(struct cio_chunk *ch) +{ + struct cio_memfs *mf = ch->backend; + + if (!mf) { + return; + } + + free(mf->name); + free(mf->buf_data); + free(mf->meta_data); + free(mf); +} + +int cio_memfs_write(struct cio_chunk *ch, const void *buf, size_t count) +{ + size_t av_size; + size_t new_size; + char *tmp; + struct cio_memfs *mf = ch->backend; + + if (count == 0) { + return 0; + } + + /* Calculate available size */ + av_size = (mf->buf_size - mf->buf_len); + if (av_size < count) { + + /* Suggest initial new size */ + new_size = mf->buf_size + mf->realloc_size; + while (new_size < (mf->buf_len + count)) { + new_size += mf->realloc_size; + } + + tmp = realloc(mf->buf_data, new_size); + if (!tmp) { + cio_errno(); + return -1; + } + + mf->buf_data = tmp; + mf->buf_size = new_size; + } + + memcpy(mf->buf_data + mf->buf_len, buf, count); + mf->buf_len += count; + + return 0; +} + +int cio_memfs_content_copy(struct cio_chunk *ch, + void **out_buf, size_t *out_size) +{ + 
char *buf; + struct cio_memfs *mf = ch->backend; + + buf = malloc(mf->buf_len + 1); + if (!buf) { + cio_errno(); + return -1; + } + + /* Copy the data and append an extra NULL byte */ + memcpy(buf, mf->buf_data, mf->buf_len); + buf[mf->buf_len] = '\0'; + + *out_buf = buf; + *out_size = mf->buf_len; + + return 0; +} + +void cio_memfs_scan_dump(struct cio_ctx *ctx, struct cio_stream *st) +{ + char tmp[PATH_MAX]; + struct mk_list *head; + struct cio_memfs *mf; + struct cio_chunk *ch; + + (void) ctx; + + mk_list_foreach(head, &st->chunks) { + ch = mk_list_entry(head, struct cio_chunk, _head); + mf = ch->backend; + + snprintf(tmp, sizeof(tmp) -1, "%s/%s", ch->st->name, ch->name); + printf(" %-60s", tmp); + printf("meta_len=%i, data_size=%zu\n", mf->meta_len, mf->buf_len); + } +} diff --git a/fluent-bit/lib/chunkio/src/cio_meta.c b/fluent-bit/lib/chunkio/src/cio_meta.c new file mode 100644 index 00000000..03e852e0 --- /dev/null +++ b/fluent-bit/lib/chunkio/src/cio_meta.c @@ -0,0 +1,180 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Chunk I/O + * ========= + * Copyright 2018 Eduardo Silva <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#define _GNU_SOURCE +#include <string.h> + +#include <chunkio/chunkio_compat.h> +#include <chunkio/chunkio.h> +#include <chunkio/cio_file.h> +#include <chunkio/cio_file_st.h> +#include <chunkio/cio_memfs.h> +#include <chunkio/cio_stream.h> +#include <chunkio/cio_log.h> + +/* + * Metadata is an optional information stored before the content of each file + * and can be used for different purposes. Manipulating metadata can have + * some performance impacts depending on 'when' it's added and how often + * is modified. + * + * For performance reasons, we suggest the metadata be stored before to write + * any data to the content area, otherwise if metadata grows in terms of bytes + * we need to move all the content data to a different position which is not + * ideal. + * + * The caller might want to fix the performance penalties setting up some + * empty metadata with specific sizes. + */ + +int cio_meta_write(struct cio_chunk *ch, char *buf, size_t size) +{ + struct cio_memfs *mf; + + if (size > 65535) { + return -1; + } + + if (ch->st->type == CIO_STORE_MEM) { + mf = (struct cio_memfs *) ch->backend; + if (mf->meta_data) { + free(mf->meta_data); + } + + mf->meta_data = malloc(size); + if (!mf->meta_data) { + cio_errno(); + return -1; + } + memcpy(mf->meta_data, buf, size); + mf->meta_len = size; + return 0; + } + else if (ch->st->type == CIO_STORE_FS) { + return cio_file_write_metadata(ch, buf, size); + } + return -1; +} + +int cio_meta_size(struct cio_chunk *ch) { + if (ch->st->type == CIO_STORE_MEM) { + struct cio_memfs *mf = (struct cio_memfs *) ch->backend; + return mf->meta_len; + } + else if (ch->st->type == CIO_STORE_FS) { + if (cio_file_read_prepare(ch->ctx, ch)) { + return -1; + } + struct cio_file *cf = ch->backend; + return cio_file_st_get_meta_len(cf->map); + } + + return -1; +} + +int cio_meta_read(struct cio_chunk *ch, char **meta_buf, int *meta_len) +{ + int len; + char *meta; + struct cio_file *cf; + struct cio_memfs *mf; + + /* In-memory type 
*/ + if (ch->st->type == CIO_STORE_MEM) { + mf = (struct cio_memfs *) ch->backend; + + /* no metadata */ + if (!mf->meta_data) { + return -1; + } + + *meta_buf = mf->meta_data; + *meta_len = mf->meta_len; + + return 0; + } + else if (ch->st->type == CIO_STORE_FS) { + if (cio_file_read_prepare(ch->ctx, ch)) { + return -1; + } + + cf = ch->backend; + len = cio_file_st_get_meta_len(cf->map); + if (len <= 0) { + return -1; + } + + meta = cio_file_st_get_meta(cf->map); + *meta_buf = meta; + *meta_len = len; + + return 0; + } + + return -1; + +} + +int cio_meta_cmp(struct cio_chunk *ch, char *meta_buf, int meta_len) +{ + int len; + char *meta; + struct cio_file *cf = ch->backend; + struct cio_memfs *mf; + + /* In-memory type */ + if (ch->st->type == CIO_STORE_MEM) { + mf = (struct cio_memfs *) ch->backend; + + /* no metadata */ + if (!mf->meta_data) { + return -1; + } + + /* different lengths */ + if (mf->meta_len != meta_len) { + return -1; + } + + /* perfect match */ + if (memcmp(mf->meta_data, meta_buf, meta_len) == 0) { + return 0; + } + + return -1; + } + + if (cio_file_read_prepare(ch->ctx, ch)) { + return -1; + } + + /* File system type */ + len = cio_file_st_get_meta_len(cf->map); + if (len != meta_len) { + return -1; + } + + /* compare metadata */ + meta = cio_file_st_get_meta(cf->map); + if (memcmp(meta, meta_buf, meta_len) == 0) { + return 0; + } + + return -1; +} diff --git a/fluent-bit/lib/chunkio/src/cio_os.c b/fluent-bit/lib/chunkio/src/cio_os.c new file mode 100644 index 00000000..0adbc617 --- /dev/null +++ b/fluent-bit/lib/chunkio/src/cio_os.c @@ -0,0 +1,134 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Chunk I/O + * ========= + * Copyright 2018 Eduardo Silva <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define _GNU_SOURCE + +#include <sys/types.h> +#include <sys/stat.h> +#include <stdlib.h> +#include <string.h> +#include <errno.h> + +#include <chunkio/chunkio_compat.h> + +#ifdef _WIN32 +#include <Shlobj.h> +#endif + +/* Check if a path is a directory */ +int cio_os_isdir(const char *dir) +{ + int ret; + struct stat st; + + ret = stat(dir, &st); + if (ret == -1) { + return -1; + } + + if (st.st_mode & S_IFDIR) { + return 0; + } + + return -1; +} + +/* Create directory */ +int cio_os_mkpath(const char *dir, mode_t mode) +{ + struct stat st; + +#ifdef _WIN32 + char path[MAX_PATH]; +#else +# ifdef __APPLE__ + char *parent_dir = NULL; + char *path = NULL; +# endif + char *dup_dir; +#endif + + if (!dir) { + errno = EINVAL; + return 1; + } + + if (strlen(dir) == 0) { + errno = EINVAL; + return 1; + } + + if (!stat(dir, &st)) { + return 0; + } + +#ifdef _WIN32 + (void) mode; + + if (_fullpath(path, dir, MAX_PATH) == NULL) { + return 1; + } + + if (SHCreateDirectoryExA(NULL, path, NULL) != ERROR_SUCCESS) { + return 1; + } + return 0; +#elif __APPLE__ + dup_dir = strdup(dir); + if (!dup_dir) { + return -1; + } + + /* macOS's dirname(3) should return current directory when slash + * charachter is not included in passed string. + * And note that macOS's dirname(3) does not modify passed string. 
+ */ + parent_dir = dirname(dup_dir); + if (stat(parent_dir, &st) == 0 && strncmp(parent_dir, ".", 1)) { + if (S_ISDIR (st.st_mode)) { + mkdir(dup_dir, mode); + free(dup_dir); + return 0; + } + } + + /* Create directories straightforward except for the last one hierarchy. */ + for (path = strchr(dup_dir + 1, '/'); path; path = strchr(path + 1, '/')) { + *path = '\0'; + if (mkdir(dup_dir, mode) == -1) { + if (errno != EEXIST) { + *path = '/'; + return -1; + } + } + *path = '/'; + } + + free(dup_dir); + return mkdir(dir, mode); +#else + dup_dir = strdup(dir); + if (!dup_dir) { + return 1; + } + cio_os_mkpath(dirname(dup_dir), mode); + free(dup_dir); + return mkdir(dir, mode); +#endif +} diff --git a/fluent-bit/lib/chunkio/src/cio_scan.c b/fluent-bit/lib/chunkio/src/cio_scan.c new file mode 100644 index 00000000..5c90641d --- /dev/null +++ b/fluent-bit/lib/chunkio/src/cio_scan.c @@ -0,0 +1,190 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Chunk I/O + * ========= + * Copyright 2018 Eduardo Silva <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> + +#include <chunkio/chunkio_compat.h> +#include <chunkio/chunkio.h> +#include <chunkio/cio_stream.h> +#include <chunkio/cio_file.h> +#include <chunkio/cio_memfs.h> +#include <chunkio/cio_chunk.h> +#include <chunkio/cio_error.h> +#include <chunkio/cio_log.h> + +#ifdef _WIN32 +#include "win32/dirent.h" +#endif + +#ifdef CIO_HAVE_BACKEND_FILESYSTEM +static int cio_scan_stream_files(struct cio_ctx *ctx, struct cio_stream *st, + char *chunk_extension) +{ + int len; + int ret; + int err; + int ext_off; + int ext_len = 0; + char *path; + DIR *dir; + struct dirent *ent; + + len = strlen(ctx->options.root_path) + strlen(st->name) + 2; + path = malloc(len); + if (!path) { + cio_errno(); + return -1; + } + + ret = snprintf(path, len, "%s/%s", ctx->options.root_path, st->name); + if (ret == -1) { + cio_errno(); + free(path); + return -1; + } + + dir = opendir(path); + if (!dir) { + cio_errno(); + free(path); + return -1; + } + + if (chunk_extension) { + ext_len = strlen(chunk_extension); + } + + cio_log_debug(ctx, "[cio scan] opening stream %s", st->name); + + /* Iterate the root_path */ + while ((ent = readdir(dir)) != NULL) { + if ((ent->d_name[0] == '.') || (strcmp(ent->d_name, "..") == 0)) { + continue; + } + + /* Look just for directories */ + if (ent->d_type != DT_REG) { + continue; + } + + /* Check the file matches the desired extension (if set) */ + if (chunk_extension) { + len = strlen(ent->d_name); + if (len <= ext_len) { + continue; + } + + ext_off = len - ext_len; + if (strncmp(ent->d_name + ext_off, chunk_extension, ext_len) != 0) { + continue; + } + } + + ctx->last_chunk_error = 0; + + /* register every directory as a stream */ + cio_chunk_open(ctx, st, ent->d_name, ctx->options.flags, 0, &err); + + if (ctx->options.flags & CIO_DELETE_IRRECOVERABLE) { + if (err == CIO_CORRUPTED) { + if (ctx->last_chunk_error == CIO_ERR_BAD_FILE_SIZE || + ctx->last_chunk_error == 
CIO_ERR_BAD_LAYOUT) + { + cio_log_error(ctx, "[cio scan] discarding irrecoverable chunk"); + + cio_chunk_delete(ctx, st, ent->d_name); + } + } + } + } + + closedir(dir); + free(path); + + return 0; +} + +/* Given a cio context, scan it root_path and populate stream/files */ +int cio_scan_streams(struct cio_ctx *ctx, char *chunk_extension) +{ + DIR *dir; + struct dirent *ent; + struct cio_stream *st; + + dir = opendir(ctx->options.root_path); + if (!dir) { + cio_errno(); + return -1; + } + + cio_log_debug(ctx, "[cio scan] opening path %s", ctx->options.root_path); + + /* Iterate the root_path */ + while ((ent = readdir(dir)) != NULL) { + if ((ent->d_name[0] == '.') || (strcmp(ent->d_name, "..") == 0)) { + continue; + } + + /* Look just for directories */ + if (ent->d_type != DT_DIR) { + continue; + } + + /* register every directory as a stream */ + st = cio_stream_create(ctx, ent->d_name, CIO_STORE_FS); + if (st) { + cio_scan_stream_files(ctx, st, chunk_extension); + } + } + + closedir(dir); + return 0; +} +#else +int cio_scan_streams(struct cio_ctx *ctx) +{ + cio_log_error(ctx, "[cio scan] file system backend not supported"); + return -1; +} +#endif + +void cio_scan_dump(struct cio_ctx *ctx) +{ + struct mk_list *head; + struct cio_stream *st; + + cio_log_info(ctx, "scan dump of %s", ctx->options.root_path); + + /* Iterate streams */ + mk_list_foreach(head, &ctx->streams) { + st = mk_list_entry(head, struct cio_stream, _head); + printf(" stream:%-60s%i chunks\n", + st->name, mk_list_size(&st->chunks)); + + if (st->type == CIO_STORE_MEM) { + cio_memfs_scan_dump(ctx, st); + } + else if (st->type == CIO_STORE_FS) { + cio_file_scan_dump(ctx, st); + } + } +} diff --git a/fluent-bit/lib/chunkio/src/cio_sha1.c b/fluent-bit/lib/chunkio/src/cio_sha1.c new file mode 100644 index 00000000..1748b683 --- /dev/null +++ b/fluent-bit/lib/chunkio/src/cio_sha1.c @@ -0,0 +1,68 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Chunk I/O + * 
========= + * Copyright 2018 Eduardo Silva <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* Just a simple wrapper over sha1 routines */ + +#include <stdio.h> +#include <string.h> +#include <chunkio/cio_sha1.h> + +void cio_sha1_init(struct cio_sha1 *ctx) +{ + SHA1_Init(&ctx->sha); +} + +void cio_sha1_update(struct cio_sha1 *ctx, const void *data, unsigned long len) +{ + SHA1_Update(&ctx->sha, data, len); +} + +void cio_sha1_final(unsigned char hash[20], struct cio_sha1 *ctx) +{ + SHA1_Final(hash, &ctx->sha); +} + +void cio_sha1_hash(const void *data_in, unsigned long length, + unsigned char *data_out, void *state) +{ + SHA_CTX sha; + SHA1_Init(&sha); + SHA1_Update(&sha, data_in, length); + + /* + * If state is not NULL, make a copy of the SHA context for future + * iterations and updates. 
+ */ + if (state != NULL) { + memcpy(state, &sha, sizeof(SHA_CTX)); + } + + SHA1_Final(data_out, &sha); +} + +void cio_sha1_to_hex(unsigned char *in, char *out) +{ + int i; + + for (i = 0; i < 20; ++i) { + sprintf(&out[i*2], "%02x", in[i]); + } + + out[40] = '\0'; +} diff --git a/fluent-bit/lib/chunkio/src/cio_stats.c b/fluent-bit/lib/chunkio/src/cio_stats.c new file mode 100644 index 00000000..2135f66f --- /dev/null +++ b/fluent-bit/lib/chunkio/src/cio_stats.c @@ -0,0 +1,79 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Chunk I/O + * ========= + * Copyright 2019 Eduardo Silva <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <stdio.h> +#include <string.h> + +#include <chunkio/chunkio_compat.h> +#include <chunkio/chunkio.h> +#include <chunkio/cio_chunk.h> +#include <chunkio/cio_stats.h> + +void cio_stats_get(struct cio_ctx *ctx, struct cio_stats *stats) +{ + struct mk_list *head; + struct mk_list *f_head; + struct cio_chunk *ch; + struct cio_stream *stream; + + memset(stats, 0, sizeof(struct cio_stats)); + + /* Iterate each stream */ + mk_list_foreach(head, &ctx->streams) { + stream = mk_list_entry(head, struct cio_stream, _head); + stats->streams_total++; + + /* Iterate chunks */ + mk_list_foreach(f_head, &stream->chunks) { + stats->chunks_total++; + + if (stream->type == CIO_STORE_MEM) { + stats->chunks_mem++; + continue; + } + + /* Only applicable for 'file' type chunks */ + ch = mk_list_entry(f_head, struct cio_chunk, _head); + stats->chunks_fs++; + + if (cio_chunk_is_up(ch) == CIO_TRUE) { + stats->chunks_fs_up++; + } + else { + stats->chunks_fs_down++; + } + } + } +} + +void cio_stats_print_summary(struct cio_ctx *ctx) +{ + struct cio_stats st; + + /* retrieve stats */ + cio_stats_get(ctx, &st); + + printf("======== Chunk I/O Stats ========\n"); + printf("- streams total : %i\n", st.streams_total); + printf("- chunks total : %i\n", st.chunks_total); + printf("- chunks memfs total: %i\n", st.chunks_mem); + printf("- chunks file total : %i\n", st.chunks_fs); + printf(" - files up : %i\n", st.chunks_fs_up); + printf(" - files down : %i\n", st.chunks_fs_down); +} diff --git a/fluent-bit/lib/chunkio/src/cio_stream.c b/fluent-bit/lib/chunkio/src/cio_stream.c new file mode 100644 index 00000000..547d7eeb --- /dev/null +++ b/fluent-bit/lib/chunkio/src/cio_stream.c @@ -0,0 +1,276 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Chunk I/O + * ========= + * Copyright 2018 Eduardo Silva <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the 
License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <chunkio/chunkio_compat.h> + +#include <chunkio/chunkio.h> +#include <chunkio/cio_os.h> +#include <chunkio/cio_log.h> +#include <chunkio/cio_chunk.h> +#include <chunkio/cio_stream.h> +#include <chunkio/cio_utils.h> + +#include <monkey/mk_core/mk_list.h> + +static char *get_stream_path(struct cio_ctx *ctx, struct cio_stream *st) +{ + int ret; + int len; + char *p; + + /* Compose final path */ + len = strlen(ctx->options.root_path) + strlen(st->name) + 2; + p = malloc(len + 1); + if (!p) { + cio_errno(); + return NULL; + } + + ret = snprintf(p, len, "%s/%s", ctx->options.root_path, st->name); + if (ret == -1) { + cio_errno(); + free(p); + return NULL; + } + + return p; +} + +static int check_stream_path(struct cio_ctx *ctx, const char *path) +{ + int ret; + int len; + char *p; + + /* Compose final path */ + len = strlen(ctx->options.root_path) + strlen(path) + 2; + p = malloc(len + 1); + if (!p) { + cio_errno(); + return -1; + } + ret = snprintf(p, len, "%s/%s", ctx->options.root_path, path); + if (ret == -1) { + cio_errno(); + free(p); + return -1; + } + + ret = cio_os_isdir(p); + if (ret == -1) { + /* Try to create the path */ + ret = cio_os_mkpath(p, 0755); + if (ret == -1) { + cio_log_error(ctx, "cannot create stream path %s", p); + free(p); + return -1; + } + cio_log_debug(ctx, "created stream path %s", p); + free(p); + return 0; + } + + /* Check write access and release*/ + ret = access(p, W_OK); + free(p); + return ret; +} + +struct cio_stream 
*cio_stream_get(struct cio_ctx *ctx, const char *name) +{ + struct mk_list *head; + struct cio_stream *st; + + mk_list_foreach(head, &ctx->streams) { + st = mk_list_entry(head, struct cio_stream, _head); + if (strcmp(st->name, name) == 0) { + return st; + } + } + + return NULL; +} + +struct cio_stream *cio_stream_create(struct cio_ctx *ctx, const char *name, + int type) +{ + int ret; + int len; + struct cio_stream *st; + + if (!name) { + cio_log_error(ctx, "[stream create] stream name not set"); + return NULL; + } + + len = strlen(name); + if (len == 0) { + cio_log_error(ctx, "[stream create] invalid stream name"); + return NULL; + } + + if (len == 1 && (name[0] == '.' || name[0] == '/')) { + cio_log_error(ctx, "[stream create] invalid stream name"); + return NULL; + } +#ifndef CIO_HAVE_BACKEND_FILESYSTEM + if (type == CIO_STORE_FS) { + cio_log_error(ctx, "[stream create] file system backend not supported"); + return NULL; + } +#endif + + /* Find duplicated */ + st = cio_stream_get(ctx, name); + if (st) { + cio_log_error(ctx, "[cio stream] stream already registered: %s", name); + return NULL; + } + + /* If backend is the file system, validate the stream path */ + if (type == CIO_STORE_FS) { + ret = check_stream_path(ctx, name); + if (ret == -1) { + return NULL; + } + } + + st = malloc(sizeof(struct cio_stream)); + if (!st) { + cio_errno(); + return NULL; + } + st->type = type; + st->name = strdup(name); + if (!st->name) { + cio_errno(); + free(st); + return NULL; + } + + st->parent = ctx; + mk_list_init(&st->chunks); + mk_list_init(&st->chunks_up); + mk_list_init(&st->chunks_down); + mk_list_add(&st->_head, &ctx->streams); + + cio_log_debug(ctx, "[cio stream] new stream registered: %s", name); + return st; +} + +void cio_stream_destroy(struct cio_stream *st) +{ + if (!st) { + return; + } + /* close all files */ + cio_chunk_close_stream(st); + + /* destroy stream */ + mk_list_del(&st->_head); + free(st->name); + free(st); +} + +/* Deletes a complete stream, this 
include all chunks available */ +int cio_stream_delete(struct cio_stream *st) +{ + int ret; + char *path; + struct mk_list *tmp; + struct mk_list *head; + struct cio_chunk *ch; + struct cio_ctx *ctx; + + ctx = st->parent; + + /* delete all chunks */ + mk_list_foreach_safe(head, tmp, &st->chunks) { + ch = mk_list_entry(head, struct cio_chunk, _head); + cio_chunk_close(ch, CIO_TRUE); + } + +#ifdef CIO_HAVE_BACKEND_FILESYSTEM + /* If the stream is filesystem based, destroy the real directory */ + if (st->type == CIO_STORE_FS) { + path = get_stream_path(ctx, st); + if (!path) { + cio_log_error(ctx, + "content from stream '%s' has been deleted, but the " + "directory might still exists.", path); + return -1; + } + + cio_log_debug(ctx, "[cio stream] delete stream path: %s", path); + + /* Recursive deletion */ + ret = cio_utils_recursive_delete(path); + if (ret == -1) { + cio_log_error(ctx, "error in recursive deletion of path %s", path); + free(path); + return -1; + } + free(path); + + return ret; + } +#endif + + return 0; +} + +void cio_stream_destroy_all(struct cio_ctx *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct cio_stream *st; + + if (!ctx) { + return; + } + + mk_list_foreach_safe(head, tmp, &ctx->streams) { + st = mk_list_entry(head, struct cio_stream, _head); + cio_stream_destroy(st); + } +} + +/* Return the total number of bytes being used by Chunks up in memory */ +size_t cio_stream_size_chunks_up(struct cio_stream *st) +{ + ssize_t bytes; + size_t total = 0; + struct cio_chunk *ch; + struct mk_list *head; + + mk_list_foreach(head, &st->chunks_up) { + ch = mk_list_entry(head, struct cio_chunk, _state_head); + + bytes = cio_chunk_get_content_size(ch); + if (bytes <= 0) { + continue; + } + total += bytes; + } + + return total; +} diff --git a/fluent-bit/lib/chunkio/src/cio_utils.c b/fluent-bit/lib/chunkio/src/cio_utils.c new file mode 100644 index 00000000..45cb0ae8 --- /dev/null +++ b/fluent-bit/lib/chunkio/src/cio_utils.c @@ -0,0 +1,258 @@ +/* 
-*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Chunk I/O + * ========= + * Copyright 2018 Eduardo Silva <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <errno.h> +#ifndef _MSC_VER +#include <fts.h> +#endif + +#include <chunkio/cio_info.h> +#include <chunkio/chunkio_compat.h> +#include <chunkio/chunkio.h> +#include <chunkio/cio_log.h> + +#ifndef _MSC_VER +/* + * Taken from StackOverflow: + * + * https://stackoverflow.com/questions/2256945/removing-a-non-empty-directory-programmatically-in-c-or-c + */ +int cio_utils_recursive_delete(const char *dir) +{ + int ret = 0; + FTS *ftsp = NULL; + FTSENT *curr; + char *files[] = { (char *) dir, NULL }; + struct stat st; + + ret = stat(dir, &st); + if (ret == -1) { + return -1; + } + + ftsp = fts_open(files, FTS_NOCHDIR | FTS_PHYSICAL | FTS_XDEV, NULL); + if (!ftsp) { + fprintf(stderr, "%s: fts_open failed: %s\n", dir, strerror(errno)); + ret = -1; + goto finish; + } + + while ((curr = fts_read(ftsp))) { + switch (curr->fts_info) { + case FTS_NS: + case FTS_DNR: + case FTS_ERR: + fprintf(stderr, "%s: fts_read error: %s\n", + curr->fts_accpath, strerror(curr->fts_errno)); + break; + case FTS_DC: + case FTS_DOT: + case FTS_NSOK: + break; + case FTS_D: + break; + case FTS_DP: + case FTS_F: + case FTS_SL: + case FTS_SLNONE: 
+ case FTS_DEFAULT: + if (remove(curr->fts_accpath) < 0) { + fprintf(stderr, "%s: Failed to remove: %s\n", + curr->fts_path, strerror(errno)); + ret = -1; + } + break; + } + } + + finish: + if (ftsp) { + fts_close(ftsp); + } + + return ret; +} +#else +static int cio_utils_recursive_delete_handler(const char *path, + size_t current_depth, + size_t depth_limit) +{ + char search_path[MAX_PATH]; + char entry_path[MAX_PATH]; + DWORD target_file_flags; + HANDLE find_file_handle; + WIN32_FIND_DATAA find_file_data; + int error_detected; + DWORD result; + + result = snprintf(search_path, sizeof(search_path) - 1, "%s\\*", path); + + if (result <= 0) { + return CIO_ERROR; + } + + find_file_handle = FindFirstFileA(search_path, &find_file_data); + + if (find_file_handle == INVALID_HANDLE_VALUE) { + return CIO_ERROR; + } + + target_file_flags = FILE_ATTRIBUTE_NORMAL | FILE_ATTRIBUTE_ARCHIVE; + error_detected = CIO_FALSE; + result = 0; + + do { + if (strcmp(find_file_data.cFileName, ".") != 0 && + strcmp(find_file_data.cFileName, "..") != 0) { + + result = snprintf(entry_path, sizeof(entry_path) - 1, "%s\\%s", path, + find_file_data.cFileName); + + if (result > 0) { + if (find_file_data.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { + if (current_depth < depth_limit) { + result = (DWORD) cio_utils_recursive_delete_handler(entry_path, + current_depth + 1, + depth_limit); + + if (result != CIO_OK) { + error_detected = CIO_TRUE; + } + } + else { + error_detected = CIO_TRUE; + } + } + else if (find_file_data.dwFileAttributes & target_file_flags) { + result = DeleteFileA(entry_path); + + if (result == 0) { + error_detected = CIO_TRUE; + } + } + + } + else { + error_detected = CIO_TRUE; + } + } + + if (error_detected == CIO_FALSE) { + result = FindNextFile(find_file_handle, &find_file_data); + + if (result == 0) { + result = GetLastError(); + + if (result != ERROR_NO_MORE_FILES) { + error_detected = CIO_TRUE; + } + + break; + } + } + } + while (error_detected == CIO_FALSE); + + 
FindClose(find_file_handle); + + if (error_detected) { + return CIO_ERROR; + } + + result = RemoveDirectoryA(path); + + if (result == 0) { + return CIO_ERROR; + } + + return CIO_OK; +} + +int cio_utils_recursive_delete(const char *dir) +{ + DWORD result; + + result = cio_utils_recursive_delete_handler(dir, 0, 100); + + if (result != CIO_OK) { + return -1; + } + + return 0; +} +#endif + +int cio_utils_read_file(const char *path, char **buf, size_t *size) +{ + int ret; + char *data; + FILE *fp; + struct stat st; + + fp = fopen(path, "rb"); + if (fp == NULL) { + perror("fopen"); + return -1; + } + + ret = fstat(fileno(fp), &st); + if (ret == -1) { + fclose(fp); + perror("fstat"); + return -1; + } + if (!S_ISREG(st.st_mode)) { + fclose(fp); + return -1; + } + + data = calloc(st.st_size, 1); + if (!data) { + perror("calloc"); + fclose(fp); + return -1; + } + + ret = fread(data, st.st_size, 1, fp); + if (ret != 1) { + free(data); + fclose(fp); + return -1; + } + fclose(fp); + + *buf = data; + *size = st.st_size; + + return 0; +} + +#ifdef CIO_HAVE_GETPAGESIZE +int cio_getpagesize() +{ + return getpagesize(); +} +#endif diff --git a/fluent-bit/lib/chunkio/src/win32/dirent.c b/fluent-bit/lib/chunkio/src/win32/dirent.c new file mode 100644 index 00000000..6ea57f96 --- /dev/null +++ b/fluent-bit/lib/chunkio/src/win32/dirent.c @@ -0,0 +1,135 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Chunk I/O + * ========= + * Copyright 2018-2019 Eduardo Silva <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This module implements <dirent.h> emulation layer based on + * Win32's FIndFirstFile/FindNextFile API. + */ + +#include <Windows.h> +#include <shlwapi.h> + +#include "dirent.h" + +struct CIO_WIN32_DIR { + HANDLE h; + char *pattern; + int count; + WIN32_FIND_DATA find_data; + struct cio_win32_dirent dir; +}; + +/* + * Guess POSIX flle type from Win32 file attributes. + */ +static unsigned char get_filetype(int dwFileAttributes) +{ + if (dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { + return DT_DIR; + } + else if (dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) { + return DT_LNK; + } + + return DT_REG; +} + +/* + * Construct a match pattern (e.g. 'c:\var\data\*') + */ +static char *create_pattern(const char *path) +{ + char *buf; + size_t len = strlen(path); + size_t buflen = len + 3; + + buf = malloc(buflen); + if (buf == NULL) { + return NULL; + } + + strcpy_s(buf, buflen, path); + + if (path[len - 1] == '\\') { + strcat_s(buf, buflen, "*"); + } + else { + strcat_s(buf, buflen, "\\*"); + } + return buf; +} + +struct CIO_WIN32_DIR *cio_win32_opendir(const char *path) +{ + struct CIO_WIN32_DIR *d; + + if (!PathIsDirectoryA(path)) { + return NULL; + } + + d = calloc(1, sizeof(struct CIO_WIN32_DIR)); + if (d == NULL) { + return NULL; + } + + d->pattern = create_pattern(path); + if (d->pattern == NULL) { + free(d); + return NULL; + } + + d->h = FindFirstFileA(d->pattern, &d->find_data); + if (d->h == INVALID_HANDLE_VALUE) { + return d; + } + return d; +} + +struct cio_win32_dirent *cio_win32_readdir(struct CIO_WIN32_DIR *d) +{ + if (d->h == INVALID_HANDLE_VALUE) { + return NULL; + } + + /* + * The initial entry should be retrieved by FindFirstFile(), + * so we can skip FindNextFile() on the first call. 
+ */ + if (d->count > 0) { + if (FindNextFile(d->h, &d->find_data) == 0) { + return NULL; + } + } + + d->count++; + d->dir.d_name = d->find_data.cFileName; + d->dir.d_type = get_filetype(d->find_data.dwFileAttributes); + + return &d->dir; +} + +int cio_win32_closedir(struct CIO_WIN32_DIR *d) +{ + if (d->h != INVALID_HANDLE_VALUE) { + FindClose(d->h); + } + free(d->pattern); + free(d); + return 0; +} diff --git a/fluent-bit/lib/chunkio/src/win32/dirent.h b/fluent-bit/lib/chunkio/src/win32/dirent.h new file mode 100644 index 00000000..b21148a6 --- /dev/null +++ b/fluent-bit/lib/chunkio/src/win32/dirent.h @@ -0,0 +1,59 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Chunk I/O + * ========= + * Copyright 2018-2019 Eduardo Silva <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * POSIX <dirent.h> emulation for Windows. + * + * This header file provies a drop-in replacement of opendir(), + * readdir() and closedir() for Windows platform. 
/*
 * POSIX <dirent.h> emulation for Windows.
 *
 * Declares replacements for opendir(), readdir() and closedir() and maps
 * the POSIX names onto them with macros.
 */

#ifndef CIO_WIN32_DIRENT
#define CIO_WIN32_DIRENT

/* Opaque directory handle */
struct CIO_WIN32_DIR;

/* Directory entry handed out by cio_win32_readdir() */
struct cio_win32_dirent {
    int d_ino;
    int d_off;
    unsigned short d_reclen;
    unsigned char d_type;       /* one of the DT_* values below */
    char *d_name;               /* entry name */
};

struct CIO_WIN32_DIR *cio_win32_opendir(const char *path);
struct cio_win32_dirent *cio_win32_readdir(struct CIO_WIN32_DIR *d);
int cio_win32_closedir(struct CIO_WIN32_DIR *d);

/* Map the POSIX names onto the emulation layer */
#define DIR struct CIO_WIN32_DIR
#define dirent cio_win32_dirent
#define closedir cio_win32_closedir
#define opendir cio_win32_opendir
#define readdir cio_win32_readdir

/* Parenthesized so the negative value expands safely in any expression */
#define DT_UNKNOWN (-1)
#define DT_BLK 1
#define DT_CHR 2
#define DT_DIR 3
#define DT_FIFO 4
#define DT_LNK 5
#define DT_REG 6
#define DT_SOCK 7

#endif