summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/chunkio/src
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:19:48 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:20:02 +0000
commit58daab21cd043e1dc37024a7f99b396788372918 (patch)
tree96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/lib/chunkio/src
parentReleasing debian version 1.43.2-1. (diff)
downloadnetdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz
netdata-58daab21cd043e1dc37024a7f99b396788372918.zip
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/chunkio/src')
-rw-r--r--fluent-bit/lib/chunkio/src/CMakeLists.txt53
-rw-r--r--fluent-bit/lib/chunkio/src/chunkio.c369
-rw-r--r--fluent-bit/lib/chunkio/src/cio_chunk.c642
-rw-r--r--fluent-bit/lib/chunkio/src/cio_error.c59
-rw-r--r--fluent-bit/lib/chunkio/src/cio_file.c1344
-rw-r--r--fluent-bit/lib/chunkio/src/cio_file_unix.c570
-rw-r--r--fluent-bit/lib/chunkio/src/cio_file_win32.c549
-rw-r--r--fluent-bit/lib/chunkio/src/cio_log.c87
-rw-r--r--fluent-bit/lib/chunkio/src/cio_memfs.c156
-rw-r--r--fluent-bit/lib/chunkio/src/cio_meta.c180
-rw-r--r--fluent-bit/lib/chunkio/src/cio_os.c134
-rw-r--r--fluent-bit/lib/chunkio/src/cio_scan.c190
-rw-r--r--fluent-bit/lib/chunkio/src/cio_sha1.c68
-rw-r--r--fluent-bit/lib/chunkio/src/cio_stats.c79
-rw-r--r--fluent-bit/lib/chunkio/src/cio_stream.c276
-rw-r--r--fluent-bit/lib/chunkio/src/cio_utils.c258
-rw-r--r--fluent-bit/lib/chunkio/src/win32/dirent.c135
-rw-r--r--fluent-bit/lib/chunkio/src/win32/dirent.h59
18 files changed, 5208 insertions, 0 deletions
diff --git a/fluent-bit/lib/chunkio/src/CMakeLists.txt b/fluent-bit/lib/chunkio/src/CMakeLists.txt
new file mode 100644
index 00000000..9e666381
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/CMakeLists.txt
@@ -0,0 +1,53 @@
+set(src
+ cio_os.c
+ cio_log.c
+ cio_file.c
+ cio_memfs.c
+ cio_chunk.c
+ cio_meta.c
+ cio_scan.c
+ cio_utils.c
+ cio_stream.c
+ cio_stats.c
+ cio_error.c
+ chunkio.c
+ )
+
+set(libs cio-crc32)
+
+if(${CMAKE_SYSTEM_NAME} MATCHES "Windows")
+ set(src
+ ${src}
+ cio_file_win32.c
+ win32/dirent.c
+ )
+ set(libs
+ ${libs}
+ Shell32.lib
+ Shlwapi.lib)
+else()
+ set(src
+ ${src}
+ cio_file_unix.c
+ )
+endif()
+
+if(CIO_LIB_STATIC)
+ add_library(chunkio-static STATIC ${src})
+ target_link_libraries(chunkio-static ${libs})
+ if(CIO_SANITIZE_ADDRESS)
+ add_sanitizers(chunkio-static)
+ endif()
+endif()
+
+if (CIO_LIB_SHARED)
+ add_library(chunkio-shared SHARED ${src})
+ target_link_libraries(chunkio-static ${libs})
+ if(CIO_SANITIZE_ADDRESS)
+ add_sanitizers(chunkio-shared)
+ endif()
+endif()
+
+if (NOT CIO_LIB_STATIC AND NOT CIO_LIB_SHARED)
+ message(FATAL_ERROR "What are you doing?, you should build something")
+endif()
diff --git a/fluent-bit/lib/chunkio/src/chunkio.c b/fluent-bit/lib/chunkio/src/chunkio.c
new file mode 100644
index 00000000..a69325cf
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/chunkio.c
@@ -0,0 +1,369 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Chunk I/O
+ * =========
+ * Copyright 2018-2019 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <chunkio/chunkio.h>
+#include <chunkio/chunkio_compat.h>
+#include <chunkio/cio_os.h>
+#include <chunkio/cio_log.h>
+#include <chunkio/cio_file.h>
+#include <chunkio/cio_stream.h>
+#include <chunkio/cio_scan.h>
+#include <chunkio/cio_utils.h>
+
+#include <monkey/mk_core/mk_list.h>
+
+/*
+ * Validate if root_path exists, if don't, create it, otherwise
+ * check if we have write access to it.
+ */
+static int check_root_path(struct cio_ctx *ctx, const char *root_path)
+{
+ int ret;
+ int len;
+
+ if (!root_path) {
+ return -1;
+ }
+
+ len = strlen(root_path);
+ if (len <= 0) {
+ return -1;
+ }
+
+ ret = cio_os_isdir(root_path);
+ if (ret == -1) {
+ /* Try to create the path */
+ ret = cio_os_mkpath(root_path, 0755);
+ if (ret == -1) {
+ return -1;
+ }
+ cio_log_info(ctx, "created root path %s", root_path);
+ return 0;
+ }
+
+ /* Directory already exists, check write access */
+ return access(root_path, W_OK);
+}
+
+void cio_options_init(struct cio_options *options)
+{
+ memset(options, 0, sizeof(struct cio_options));
+
+ options->initialized = CIO_INITIALIZED;
+
+ options->root_path = NULL;
+ options->user = NULL;
+ options->group = NULL;
+ options->chmod = NULL;
+ options->log_cb = NULL;
+ options->log_level = CIO_LOG_INFO;
+ options->flags = CIO_OPEN_RW;
+ options->realloc_size_hint = CIO_DISABLE_REALLOC_HINT;
+}
+
+struct cio_ctx *cio_create(struct cio_options *options)
+{
+ int ret;
+ struct cio_ctx *ctx;
+ struct cio_options default_options;
+
+ if (options == NULL) {
+ cio_options_init(&default_options);
+ options = &default_options;
+ }
+ else {
+ if (options->initialized != CIO_INITIALIZED) {
+ /* the caller 'must' call cio_options_init() or pass NULL before creating a context */
+ fprintf(stderr, "[cio] 'options' has not been initialized properly\n");
+ return NULL;
+ }
+ }
+
+ /* sanitize chunk open flags */
+ if (!(options->flags & CIO_OPEN_RW) && !(options->flags & CIO_OPEN_RD)) {
+ options->flags |= CIO_OPEN_RW;
+ }
+
+ if (options->log_level < CIO_LOG_ERROR ||
+ options->log_level > CIO_LOG_TRACE) {
+ fprintf(stderr, "[cio] invalid log level, aborting\n");
+ return NULL;
+ }
+#ifndef CIO_HAVE_BACKEND_FILESYSTEM
+ if (root_path) {
+ fprintf(stderr, "[cio] file system backend not supported\n");
+ return NULL;
+ }
+#endif
+
+ /* Create context */
+ ctx = calloc(1, sizeof(struct cio_ctx));
+ if (!ctx) {
+ perror("calloc");
+ return NULL;
+ }
+ mk_list_init(&ctx->streams);
+ ctx->page_size = cio_getpagesize();
+ ctx->max_chunks_up = CIO_MAX_CHUNKS_UP;
+ ctx->options.flags = options->flags;
+ ctx->realloc_size_hint = CIO_DISABLE_REALLOC_HINT;
+
+ if (options->user != NULL) {
+ ctx->options.user = strdup(options->user);
+ }
+
+ if (options->group != NULL) {
+ ctx->options.group = strdup(options->group);
+ }
+
+ if (options->chmod != NULL) {
+ ctx->options.chmod = strdup(options->chmod);
+ }
+
+ /* Counters */
+ ctx->total_chunks = 0;
+ ctx->total_chunks_up = 0;
+
+ /* Logging */
+ cio_set_log_callback(ctx, options->log_cb);
+ cio_set_log_level(ctx, options->log_level);
+
+ /* Check or initialize file system root path */
+ if (options->root_path) {
+ ret = check_root_path(ctx, options->root_path);
+ if (ret == -1) {
+ cio_log_error(ctx,
+ "[chunkio] cannot initialize root path %s\n",
+ options->root_path);
+ free(ctx);
+ return NULL;
+ }
+
+ ctx->options.root_path = strdup(options->root_path);
+ }
+ else {
+ ctx->options.root_path = NULL;
+ }
+
+ if (ctx->options.user != NULL) {
+ ret = cio_file_lookup_user(ctx->options.user, &ctx->processed_user);
+
+ if (ret != CIO_OK) {
+ cio_destroy(ctx);
+
+ return NULL;
+ }
+ }
+ else {
+ ctx->processed_user = NULL;
+ }
+
+ if (ctx->options.group != NULL) {
+ ret = cio_file_lookup_group(ctx->options.group, &ctx->processed_group);
+
+ if (ret != CIO_OK) {
+ cio_destroy(ctx);
+
+ return NULL;
+ }
+ }
+ else {
+ ctx->processed_group = NULL;
+ }
+
+ if (options->realloc_size_hint > 0) {
+ ret = cio_set_realloc_size_hint(ctx, options->realloc_size_hint);
+ if (ret == -1) {
+ cio_log_error(ctx,
+ "[chunkio] cannot initialize with realloc size hint %d\n",
+ options->realloc_size_hint);
+ cio_destroy(ctx);
+
+ return NULL;
+ }
+ }
+
+ return ctx;
+}
+
+int cio_load(struct cio_ctx *ctx, char *chunk_extension)
+{
+ int ret;
+
+ if (ctx->options.root_path) {
+ ret = cio_scan_streams(ctx, chunk_extension);
+ return ret;
+ }
+
+ return 0;
+}
+
+static int qsort_stream(struct cio_stream *stream,
+ int (*compar)(const void *, const void *))
+{
+ int i = 0;
+ int items;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct cio_chunk **arr;
+ struct cio_chunk *chunk;
+
+ items = mk_list_size(&stream->chunks);
+ if (items == 0) {
+ return 0;
+ }
+
+ arr = malloc(sizeof(struct cio_chunk *) * items);
+ if (!arr) {
+ perror("malloc");
+ return -1;
+ }
+
+ /* map chunks to the array and and unlink them */
+ mk_list_foreach_safe(head, tmp, &stream->chunks) {
+ chunk = mk_list_entry(head, struct cio_chunk, _head);
+ arr[i++] = chunk;
+ mk_list_del(&chunk->_head);
+ }
+
+ /* sort the chunks, just trust in 'compar' external function */
+ qsort(arr, items, sizeof(struct cio_chunk *), compar);
+
+ /* link the chunks in the proper order back to the list head */
+ for (i = 0; i < items; i++) {
+ chunk = arr[i];
+ mk_list_add(&chunk->_head, &stream->chunks);
+ }
+
+ free(arr);
+ return 0;
+}
+
+/*
+ * Sort chunks using the 'compar' callback function. This is pretty much a
+ * wrapper over qsort(3). The sort is done inside every stream content.
+ *
+ * Use this function after cio_load() only.
+ */
+int cio_qsort(struct cio_ctx *ctx, int (*compar)(const void *, const void *))
+{
+ struct mk_list *head;
+ struct cio_stream *stream;
+
+ mk_list_foreach(head, &ctx->streams) {
+ stream = mk_list_entry(head, struct cio_stream, _head);
+ qsort_stream(stream, compar);
+ }
+
+ return 0;
+}
+
+void cio_destroy(struct cio_ctx *ctx)
+{
+ if (!ctx) {
+ return;
+ }
+
+ cio_stream_destroy_all(ctx);
+
+ if (ctx->options.user != NULL) {
+ free(ctx->options.user);
+ }
+
+ if (ctx->options.group != NULL) {
+ free(ctx->options.group);
+ }
+
+ if (ctx->options.chmod != NULL) {
+ free(ctx->options.chmod);
+ }
+
+ if (ctx->processed_user != NULL) {
+ free(ctx->processed_user);
+ }
+
+ if (ctx->processed_group != NULL) {
+ free(ctx->processed_group);
+ }
+
+ if (ctx->options.root_path != NULL) {
+ free(ctx->options.root_path);
+ }
+
+ free(ctx);
+}
+
+void cio_set_log_callback(struct cio_ctx *ctx, void (*log_cb))
+{
+ ctx->options.log_cb = log_cb;
+}
+
+int cio_set_log_level(struct cio_ctx *ctx, int level)
+{
+ if (level < CIO_LOG_ERROR || level > CIO_LOG_TRACE) {
+ return -1;
+ }
+
+ ctx->options.log_level = level;
+ return 0;
+}
+
+int cio_set_max_chunks_up(struct cio_ctx *ctx, int n)
+{
+ if (n < 1) {
+ return -1;
+ }
+
+ ctx->max_chunks_up = n;
+ return 0;
+}
+
+int cio_set_realloc_size_hint(struct cio_ctx *ctx, size_t realloc_size_hint)
+{
+ if (realloc_size_hint < CIO_REALLOC_HINT_MIN) {
+ cio_log_error(ctx,
+ "[chunkio] cannot specify less than %zu bytes\n",
+ CIO_REALLOC_HINT_MIN);
+ return -1;
+ }
+ else if (realloc_size_hint > CIO_REALLOC_HINT_MAX) {
+ cio_log_error(ctx,
+ "[chunkio] cannot specify more than %zu bytes\n",
+ CIO_REALLOC_HINT_MAX);
+ return -1;
+ }
+
+ ctx->realloc_size_hint = realloc_size_hint;
+
+ return 0;
+}
+
+void cio_enable_file_trimming(struct cio_ctx *ctx)
+{
+ ctx->options.flags |= CIO_TRIM_FILES;
+}
+
+void cio_disable_file_trimming(struct cio_ctx *ctx)
+{
+ ctx->options.flags &= ~CIO_TRIM_FILES;
+} \ No newline at end of file
diff --git a/fluent-bit/lib/chunkio/src/cio_chunk.c b/fluent-bit/lib/chunkio/src/cio_chunk.c
new file mode 100644
index 00000000..7917cd2a
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/cio_chunk.c
@@ -0,0 +1,642 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Chunk I/O
+ * =========
+ * Copyright 2018 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chunkio/chunkio_compat.h>
+#include <chunkio/chunkio.h>
+#include <chunkio/cio_version.h>
+#include <chunkio/cio_file.h>
+#include <chunkio/cio_memfs.h>
+#include <chunkio/cio_log.h>
+#include <chunkio/cio_error.h>
+
+#include <string.h>
+
+struct cio_chunk *cio_chunk_open(struct cio_ctx *ctx, struct cio_stream *st,
+ const char *name, int flags, size_t size,
+ int *err)
+{
+ int len;
+ void *backend = NULL;
+ struct cio_chunk *ch;
+
+ if (!st) {
+ cio_log_error(ctx, "[cio chunk] invalid stream");
+ return NULL;
+ }
+
+ if (!name) {
+ cio_log_error(ctx, "[cio chunk] invalid file name");
+ return NULL;
+ }
+
+ len = strlen(name);
+ if (len == 0) {
+ cio_log_error(ctx, "[cio chunk] invalid file name");
+ return NULL;
+ }
+#ifndef CIO_HAVE_BACKEND_FILESYSTEM
+ if (st->type == CIO_STORE_FS) {
+ cio_log_error(ctx, "[cio chunk] file system backend not supported");
+ return NULL;
+ }
+#endif
+
+ /* allocate chunk context */
+ ch = malloc(sizeof(struct cio_chunk));
+ if (!ch) {
+ cio_errno();
+ return NULL;
+ }
+ ch->name = strdup(name);
+ ch->ctx = ctx;
+ ch->st = st;
+ ch->lock = CIO_FALSE;
+ ch->tx_active = CIO_FALSE;
+ ch->tx_crc = 0;
+ ch->tx_content_length = 0;
+ ch->backend = NULL;
+
+ mk_list_add(&ch->_head, &st->chunks);
+
+ cio_error_reset(ch);
+
+ /* create backend context */
+ if (st->type == CIO_STORE_FS) {
+ backend = cio_file_open(ctx, st, ch, flags, size, err);
+ }
+ else if (st->type == CIO_STORE_MEM) {
+ *err = CIO_OK;
+ backend = cio_memfs_open(ctx, st, ch, flags, size);
+ }
+
+ if (!backend) {
+ mk_list_del(&ch->_head);
+ free(ch->name);
+ free(ch);
+ return NULL;
+ }
+
+ ch->backend = backend;
+
+ /* Adjust counter */
+ cio_chunk_counter_total_add(ctx);
+
+ /* Link the chunk state to the proper stream list */
+ if (cio_chunk_is_up(ch) == CIO_TRUE) {
+ mk_list_add(&ch->_state_head, &st->chunks_up);
+ }
+ else {
+ mk_list_add(&ch->_state_head, &st->chunks_down);
+ }
+
+ return ch;
+}
+
+void cio_chunk_close(struct cio_chunk *ch, int delete)
+{
+ int type;
+ struct cio_ctx *ctx;
+
+ if (!ch) {
+ return;
+ }
+
+ cio_error_reset(ch);
+
+ ctx = ch->ctx;
+ type = ch->st->type;
+ if (type == CIO_STORE_MEM) {
+ cio_memfs_close(ch);
+ }
+ else if (type == CIO_STORE_FS) {
+ cio_file_close(ch, delete);
+ }
+
+ mk_list_del(&ch->_head);
+ mk_list_del(&ch->_state_head);
+ free(ch->name);
+ free(ch);
+
+ /* Adjust counter */
+ cio_chunk_counter_total_sub(ctx);
+}
+
+int cio_chunk_delete(struct cio_ctx *ctx, struct cio_stream *st, const char *name)
+{
+ int result;
+
+ if (st == NULL) {
+ cio_log_error(ctx, "[cio chunk] invalid stream");
+
+ return CIO_ERROR;
+ }
+
+ if (name == NULL) {
+ cio_log_error(ctx, "[cio chunk] invalid file name");
+
+ return CIO_ERROR;
+ }
+
+ if (strlen(name) == 0) {
+ cio_log_error(ctx, "[cio chunk] invalid file name");
+
+ return CIO_ERROR;
+ }
+
+#ifndef CIO_HAVE_BACKEND_FILESYSTEM
+ if (st->type == CIO_STORE_FS) {
+ cio_log_error(ctx, "[cio chunk] file system backend not supported");
+
+ return CIO_ERROR;
+ }
+#endif
+
+ if (st->type == CIO_STORE_FS) {
+ result = cio_file_delete(ctx, st, name);
+ }
+ else {
+ result = CIO_ERROR;
+ }
+
+ return result;
+}
+
+/*
+ * Write at a specific offset of the content area. Offset must be >= 0 and
+ * less than current data length.
+ */
+int cio_chunk_write_at(struct cio_chunk *ch, off_t offset,
+ const void *buf, size_t count)
+{
+ int type;
+ struct cio_memfs *mf;
+ struct cio_file *cf;
+
+ cio_error_reset(ch);
+
+ type = ch->st->type;
+ if (type == CIO_STORE_MEM) {
+ mf = ch->backend;
+ mf->buf_len = offset;
+ }
+ else if (type == CIO_STORE_FS) {
+ cf = ch->backend;
+ cf->data_size = offset;
+ cf->crc_reset = CIO_TRUE;
+ }
+
+ /*
+ * By default backends (fs, mem) appends data after the it last position,
+ * so we just adjust the content size to the given offset.
+ */
+ return cio_chunk_write(ch, buf, count);
+}
+
+int cio_chunk_write(struct cio_chunk *ch, const void *buf, size_t count)
+{
+ int ret = 0;
+ int type;
+
+ cio_error_reset(ch);
+
+ type = ch->st->type;
+ if (type == CIO_STORE_MEM) {
+ ret = cio_memfs_write(ch, buf, count);
+ }
+ else if (type == CIO_STORE_FS) {
+ ret = cio_file_write(ch, buf, count);
+ }
+
+ return ret;
+}
+
+int cio_chunk_sync(struct cio_chunk *ch)
+{
+ int ret = 0;
+ int type;
+
+ cio_error_reset(ch);
+
+ type = ch->st->type;
+ if (type == CIO_STORE_FS) {
+ ret = cio_file_sync(ch);
+ }
+
+ return ret;
+}
+
+int cio_chunk_get_content(struct cio_chunk *ch, char **buf, size_t *size)
+{
+ int ret = 0;
+ int type;
+ struct cio_memfs *mf;
+ struct cio_file *cf;
+
+ cio_error_reset(ch);
+
+ type = ch->st->type;
+ if (type == CIO_STORE_MEM) {
+ mf = ch->backend;
+ *size = mf->buf_len;
+ *buf = mf->buf_data;
+ return ret;
+ }
+ else if (type == CIO_STORE_FS) {
+ cf = ch->backend;
+ ret = cio_file_read_prepare(ch->ctx, ch);
+ if (ret != CIO_OK) {
+ return ret;
+ }
+ *size = cf->data_size;
+ *buf = cio_file_st_get_content(cf->map);
+ return ret;
+ }
+
+ return CIO_ERROR;
+}
+
+/* Using the content of the chunk, generate a copy using the heap */
+int cio_chunk_get_content_copy(struct cio_chunk *ch,
+ void **out_buf, size_t *out_size)
+{
+ int type;
+
+ cio_error_reset(ch);
+
+ type = ch->st->type;
+ if (type == CIO_STORE_MEM) {
+ return cio_memfs_content_copy(ch, out_buf, out_size);
+ }
+ else if (type == CIO_STORE_FS) {
+ return cio_file_content_copy(ch, out_buf, out_size);
+ }
+
+ return CIO_ERROR;
+}
+
+size_t cio_chunk_get_content_end_pos(struct cio_chunk *ch)
+{
+ int type;
+ off_t pos = 0;
+ struct cio_memfs *mf;
+ struct cio_file *cf;
+
+ cio_error_reset(ch);
+
+ type = ch->st->type;
+ if (type == CIO_STORE_MEM) {
+ mf = ch->backend;
+ pos = (off_t) (mf->buf_data + mf->buf_len);
+ }
+ else if (type == CIO_STORE_FS) {
+ cf = ch->backend;
+ pos = (off_t) (cio_file_st_get_content(cf->map) + cf->data_size);
+ }
+
+ return pos;
+}
+
+ssize_t cio_chunk_get_content_size(struct cio_chunk *ch)
+{
+ int type;
+ struct cio_memfs *mf;
+ struct cio_file *cf;
+
+ cio_error_reset(ch);
+
+ type = ch->st->type;
+ if (type == CIO_STORE_MEM) {
+ mf = ch->backend;
+ return mf->buf_len;
+ }
+ else if (type == CIO_STORE_FS) {
+ cf = ch->backend;
+ return cf->data_size;
+ }
+
+ return -1;
+}
+
+ssize_t cio_chunk_get_real_size(struct cio_chunk *ch)
+{
+ int type;
+ struct cio_memfs *mf;
+ struct cio_file *cf;
+
+ cio_error_reset(ch);
+
+ type = ch->st->type;
+ if (type == CIO_STORE_MEM) {
+ mf = ch->backend;
+ return mf->buf_len;
+ }
+ else if (type == CIO_STORE_FS) {
+ cf = ch->backend;
+
+ /* If the file is not open we need to explicitly get its size */
+ if (cf->fs_size == 0) {
+ return cio_file_real_size(cf);
+ }
+
+ return cf->fs_size;
+ }
+
+ return -1;
+}
+
+void cio_chunk_close_stream(struct cio_stream *st)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct cio_chunk *ch;
+
+ mk_list_foreach_safe(head, tmp, &st->chunks) {
+ ch = mk_list_entry(head, struct cio_chunk, _head);
+ cio_chunk_close(ch, CIO_FALSE);
+ }
+}
+
+char *cio_chunk_hash(struct cio_chunk *ch)
+{
+ if (ch->st->type == CIO_STORE_FS) {
+ return cio_file_hash(ch->backend);
+ }
+
+ return NULL;
+}
+
+int cio_chunk_lock(struct cio_chunk *ch)
+{
+ cio_error_reset(ch);
+
+ if (ch->lock == CIO_TRUE) {
+ return CIO_ERROR;
+ }
+
+ ch->lock = CIO_TRUE;
+
+ if (cio_chunk_is_up(ch) == CIO_TRUE) {
+ return cio_chunk_sync(ch);
+ }
+
+ return CIO_OK;
+}
+
+int cio_chunk_unlock(struct cio_chunk *ch)
+{
+ cio_error_reset(ch);
+
+ if (ch->lock == CIO_FALSE) {
+ return CIO_ERROR;
+ }
+
+ ch->lock = CIO_FALSE;
+ return CIO_OK;
+}
+
+int cio_chunk_is_locked(struct cio_chunk *ch)
+{
+ return ch->lock;
+}
+
+/*
+ * Start a transaction context: it keep a state of the current calculated
+ * CRC32 (if enabled) and the current number of bytes in the content
+ * area.
+ */
+int cio_chunk_tx_begin(struct cio_chunk *ch)
+{
+ int type;
+ struct cio_memfs *mf;
+ struct cio_file *cf;
+
+ cio_error_reset(ch);
+
+ if (cio_chunk_is_locked(ch)) {
+ return CIO_RETRY;
+ }
+
+ if (ch->tx_active == CIO_TRUE) {
+ return CIO_OK;
+ }
+
+ ch->tx_active = CIO_TRUE;
+ type = ch->st->type;
+ if (type == CIO_STORE_MEM) {
+ mf = ch->backend;
+ ch->tx_crc = mf->crc_cur;
+ ch->tx_content_length = mf->buf_len;
+ }
+ else if (type == CIO_STORE_FS) {
+ cf = ch->backend;
+ ch->tx_crc = cf->crc_cur;
+ ch->tx_content_length = cf->data_size;
+ }
+
+ return CIO_OK;
+}
+
+/*
+ * Commit transaction changes, reset transaction context and leave new
+ * changes in place.
+ */
+int cio_chunk_tx_commit(struct cio_chunk *ch)
+{
+ int ret;
+
+ cio_error_reset(ch);
+
+ ret = cio_chunk_sync(ch);
+ if (ret == -1) {
+ return CIO_ERROR;
+ }
+
+ ch->tx_active = CIO_FALSE;
+ return CIO_OK;
+}
+
+/*
+ * Drop changes done since a transaction was initiated */
+int cio_chunk_tx_rollback(struct cio_chunk *ch)
+{
+ int type;
+ struct cio_memfs *mf;
+ struct cio_file *cf;
+
+ cio_error_reset(ch);
+
+ if (ch->tx_active == CIO_FALSE) {
+ return -1;
+ }
+
+ type = ch->st->type;
+ if (type == CIO_STORE_MEM) {
+ mf = ch->backend;
+ mf->crc_cur = ch->tx_crc;
+ mf->buf_len = ch->tx_content_length;
+ }
+ else if (type == CIO_STORE_FS) {
+ cf = ch->backend;
+ cf->crc_cur = ch->tx_crc;
+ cf->data_size = ch->tx_content_length;
+ }
+
+ ch->tx_active = CIO_FALSE;
+ return CIO_OK;
+}
+
+/*
+ * Determinate if a Chunk content is available in memory for I/O operations. For
+ * Memory backend this is always true, for Filesystem backend it checks if the
+ * memory map exists and file descriptor is open.
+ */
+int cio_chunk_is_up(struct cio_chunk *ch)
+{
+ int type;
+ struct cio_file *cf;
+
+ type = ch->st->type;
+ if (type == CIO_STORE_MEM) {
+ return CIO_TRUE;
+ }
+ else if (type == CIO_STORE_FS) {
+ cf = ch->backend;
+ return cio_file_is_up(ch, cf);
+ }
+
+ return CIO_FALSE;
+}
+
+int cio_chunk_is_file(struct cio_chunk *ch)
+{
+ int type;
+
+ type = ch->st->type;
+ if (type == CIO_STORE_FS) {
+ return CIO_TRUE;
+ }
+
+ return CIO_FALSE;
+}
+
+static inline void chunk_state_sync(struct cio_chunk *ch)
+{
+ struct cio_stream *st;
+
+ if (!ch) {
+ return;
+ }
+
+ mk_list_del(&ch->_state_head);
+ st = ch->st;
+ if (cio_chunk_is_up(ch) == CIO_TRUE) {
+ mk_list_add(&ch->_state_head, &st->chunks_up);
+ }
+ else {
+ mk_list_add(&ch->_state_head, &st->chunks_down);
+ }
+}
+
+int cio_chunk_down(struct cio_chunk *ch)
+{
+ int ret;
+ int type;
+
+ cio_error_reset(ch);
+
+ type = ch->st->type;
+ if (type == CIO_STORE_FS) {
+ ret = cio_file_down(ch);
+ chunk_state_sync(ch);
+ return ret;
+ }
+
+ return CIO_OK;
+}
+
+int cio_chunk_up(struct cio_chunk *ch)
+{
+ int ret;
+ int type;
+
+ cio_error_reset(ch);
+
+ type = ch->st->type;
+ if (type == CIO_STORE_FS) {
+ ret = cio_file_up(ch);
+ chunk_state_sync(ch);
+ return ret;
+ }
+
+ return CIO_OK;
+}
+
+int cio_chunk_up_force(struct cio_chunk *ch)
+{
+ int ret;
+ int type;
+
+ cio_error_reset(ch);
+
+ type = ch->st->type;
+ if (type == CIO_STORE_FS) {
+ ret = cio_file_up_force(ch);
+ chunk_state_sync(ch);
+ return ret;
+ }
+
+ return CIO_OK;
+}
+
+char *cio_version()
+{
+ return CIO_VERSION_STR;
+}
+
+/*
+ * Counters API
+ */
+
+/* Increase the number of total chunks registered (+1) */
+size_t cio_chunk_counter_total_add(struct cio_ctx *ctx)
+{
+ ctx->total_chunks++;
+ return ctx->total_chunks;
+}
+
+/* Decrease the total number of chunks (-1) */
+size_t cio_chunk_counter_total_sub(struct cio_ctx *ctx)
+{
+ ctx->total_chunks--;
+ return ctx->total_chunks;
+}
+
+/* Increase the number of total chunks up in memory (+1) */
+size_t cio_chunk_counter_total_up_add(struct cio_ctx *ctx)
+{
+ ctx->total_chunks_up++;
+ return ctx->total_chunks_up;
+}
+
+/* Decrease the total number of chunks up in memory (-1) */
+size_t cio_chunk_counter_total_up_sub(struct cio_ctx *ctx)
+{
+ ctx->total_chunks_up--;
+ return ctx->total_chunks_up;
+}
diff --git a/fluent-bit/lib/chunkio/src/cio_error.c b/fluent-bit/lib/chunkio/src/cio_error.c
new file mode 100644
index 00000000..9049b0a3
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/cio_error.c
@@ -0,0 +1,59 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Chunk I/O
+ * =========
+ * Copyright 2018-2021 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chunkio/chunkio.h>
+#include <chunkio/cio_error.h>
+
+char *cio_error_get_str(struct cio_chunk *ch)
+{
+ int err = cio_error_get(ch);
+
+ switch (err) {
+ case CIO_ERR_BAD_CHECKSUM:
+ return "bad checksum";
+ case CIO_ERR_BAD_LAYOUT:
+ return "bad layout or invalid header";
+ case CIO_ERR_PERMISSION:
+ return "permission error";
+ default:
+ return "no error has been specified";
+ }
+}
+
+/* Return the current error number from a chunk */
+int cio_error_get(struct cio_chunk *ch)
+{
+ return ch->error_n;
+}
+
+/* Set an error number in the chunk */
+void cio_error_set(struct cio_chunk *ch, int status)
+{
+ ch->error_n = status;
+
+ if (ch->ctx != NULL) {
+ ch->ctx->last_chunk_error = status;
+ }
+}
+
+/* Reset the error number in a chunk */
+void cio_error_reset(struct cio_chunk *ch)
+{
+ ch->error_n = 0;
+}
diff --git a/fluent-bit/lib/chunkio/src/cio_file.c b/fluent-bit/lib/chunkio/src/cio_file.c
new file mode 100644
index 00000000..019baa89
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/cio_file.c
@@ -0,0 +1,1344 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Chunk I/O
+ * =========
+ * Copyright 2018-2019 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <limits.h>
+
+#include <chunkio/chunkio.h>
+#include <chunkio/chunkio_compat.h>
+#include <chunkio/cio_crc32.h>
+#include <chunkio/cio_chunk.h>
+#include <chunkio/cio_file.h>
+#include <chunkio/cio_file_native.h>
+#include <chunkio/cio_file_st.h>
+#include <chunkio/cio_log.h>
+#include <chunkio/cio_stream.h>
+#include <chunkio/cio_error.h>
+#include <chunkio/cio_utils.h>
+
+size_t scio_file_page_size = 0;
+
+char cio_file_init_bytes[] = {
+ /* file type (2 bytes) */
+ CIO_FILE_ID_00, CIO_FILE_ID_01,
+
+ /* crc32 (4 bytes) in network byte order */
+ 0xff, 0x12, 0xd9, 0x41,
+
+ /* padding bytes (we have 16 extra bytes) */
+ 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x00,
+
+ /* metadata length (2 bytes) */
+ 0x00, 0x00
+};
+
+#define ROUND_UP(N, S) ((((N) + (S) - 1) / (S)) * (S))
+
+
+/* Calculate content checksum in a variable */
+void cio_file_calculate_checksum(struct cio_file *cf, crc_t *out)
+{
+ crc_t val;
+ size_t len;
+ ssize_t content_length;
+ unsigned char *in_data;
+
+ if (cf->fs_size == 0) {
+ cio_file_update_size(cf);
+ }
+
+ /* Metadata length header + metadata length + content length */
+ len = 2;
+ len += cio_file_st_get_meta_len(cf->map);
+
+ content_length = cio_file_st_get_content_len(cf->map,
+ cf->fs_size,
+ cf->page_size);
+
+ if (content_length > 0) {
+ len += content_length;
+ }
+
+ in_data = (unsigned char *) cf->map + CIO_FILE_CONTENT_OFFSET;
+ val = cio_crc32_update(cf->crc_cur, in_data, len);
+ *out = val;
+}
+
+/* Update crc32 checksum into the memory map */
+static void update_checksum(struct cio_file *cf,
+ unsigned char *data, size_t len)
+{
+ crc_t crc;
+ crc_t tmp;
+
+ if (cf->crc_reset) {
+ cf->crc_cur = cio_crc32_init();
+ cio_file_calculate_checksum(cf, &tmp);
+ cf->crc_cur = tmp;
+ cf->crc_reset = CIO_FALSE;
+ }
+
+ crc = cio_crc32_update(cf->crc_cur, data, len);
+ memcpy(cf->map + 2, &crc, sizeof(crc));
+ cf->crc_cur = crc;
+}
+
+/* Finalize CRC32 context and update the memory map */
+static void finalize_checksum(struct cio_file *cf)
+{
+ crc_t crc;
+
+ crc = cio_crc32_finalize(cf->crc_cur);
+ crc = htonl(crc);
+
+ memcpy(cf->map + 2, &crc, sizeof(crc));
+}
+
+/*
+ * adjust_layout: if metadata has changed, we need to adjust the content
+ * data and reference pointers.
+ */
+static int adjust_layout(struct cio_chunk *ch,
+ struct cio_file *cf, size_t meta_size)
+{
+ cio_file_st_set_meta_len(cf->map, (uint16_t) meta_size);
+
+ /* Update checksum */
+ if (ch->ctx->options.flags & CIO_CHECKSUM) {
+ /* reset current crc since we are calculating from zero */
+ cf->crc_cur = cio_crc32_init();
+ cio_file_calculate_checksum(cf, &cf->crc_cur);
+ }
+
+ /* Sync changes to disk */
+ cf->synced = CIO_FALSE;
+
+ return 0;
+}
+
+/* Initialize Chunk header & structure */
+static void write_init_header(struct cio_chunk *ch, struct cio_file *cf)
+{
+ memcpy(cf->map, cio_file_init_bytes, sizeof(cio_file_init_bytes));
+
+ /* If no checksum is enabled, reset the initial crc32 bytes */
+ if (!(ch->ctx->options.flags & CIO_CHECKSUM)) {
+ cf->map[2] = 0;
+ cf->map[3] = 0;
+ cf->map[4] = 0;
+ cf->map[5] = 0;
+ }
+
+ cio_file_st_set_content_len(cf->map, 0);
+}
+
+/* Return the available size in the file map to write data */
+static size_t get_available_size(struct cio_file *cf, int *meta_len)
+{
+ size_t av;
+ int metadata_len;
+
+ /* Get metadata length */
+ metadata_len = cio_file_st_get_meta_len(cf->map);
+
+ av = cf->alloc_size;
+ av -= CIO_FILE_HEADER_MIN;
+ av -= metadata_len;
+ av -= cf->data_size;
+
+ *meta_len = metadata_len;
+
+ return av;
+}
+
+/*
+ * For the recently opened or created file, check the structure format
+ * and validate relevant fields.
+ */
+static int cio_file_format_check(struct cio_chunk *ch,
+ struct cio_file *cf, int flags)
+{
+ size_t metadata_length;
+ ssize_t content_length;
+ ssize_t logical_length;
+ unsigned char *p;
+ crc_t crc_check;
+ crc_t crc;
+
+ (void) flags;
+
+ p = (unsigned char *) cf->map;
+
+ /* If the file is empty, put the structure on it */
+ if (cf->fs_size == 0) {
+ /* check we have write permissions */
+ if ((cf->flags & CIO_OPEN) == 0) {
+ cio_log_warn(ch->ctx,
+ "[cio file] cannot initialize chunk (read-only)");
+ cio_error_set(ch, CIO_ERR_PERMISSION);
+
+ return -1;
+ }
+
+ /* at least we need 24 bytes as allocated space */
+ if (cf->alloc_size < CIO_FILE_HEADER_MIN) {
+ cio_log_warn(ch->ctx, "[cio file] cannot initialize chunk");
+ cio_error_set(ch, CIO_ERR_BAD_LAYOUT);
+
+ return -1;
+ }
+
+ /* Initialize init bytes */
+ write_init_header(ch, cf);
+
+ /* Write checksum in context (note: crc32 not finalized) */
+ if (ch->ctx->options.flags & CIO_CHECKSUM) {
+ cio_file_calculate_checksum(cf, &cf->crc_cur);
+ }
+ }
+ else {
+ /* Check first two bytes */
+ if (p[0] != CIO_FILE_ID_00 || p[1] != CIO_FILE_ID_01) {
+ cio_log_debug(ch->ctx, "[cio file] invalid header at %s",
+ ch->name);
+ cio_error_set(ch, CIO_ERR_BAD_LAYOUT);
+
+ return -1;
+ }
+
+ /* Expected / logical file size verification */
+ content_length = cio_file_st_get_content_len(cf->map,
+ cf->fs_size,
+ cf->page_size);
+
+ if (content_length == -1) {
+ cio_log_debug(ch->ctx, "[cio file] truncated header (%zu / %zu) %s",
+ cf->fs_size, CIO_FILE_HEADER_MIN, ch->name);
+ cio_error_set(ch, CIO_ERR_BAD_FILE_SIZE);
+
+ return -1;
+ }
+
+ metadata_length = cio_file_st_get_meta_len(cf->map);
+
+ logical_length = CIO_FILE_HEADER_MIN +
+ metadata_length +
+ content_length;
+
+ if (logical_length > cf->fs_size) {
+ cio_log_debug(ch->ctx, "[cio file] truncated file (%zd / %zd) %s",
+ cf->fs_size, logical_length, ch->name);
+ cio_error_set(ch, CIO_ERR_BAD_FILE_SIZE);
+
+ return -1;
+ }
+
+ /* Checksum */
+ if (ch->ctx->options.flags & CIO_CHECKSUM) {
+ /* Initialize CRC variable */
+ cf->crc_cur = cio_crc32_init();
+
+ /* Get checksum stored in the mmap */
+ p = (unsigned char *) cio_file_st_get_hash(cf->map);
+
+ /* Calculate content checksum */
+ cio_file_calculate_checksum(cf, &crc);
+
+ /* Compare */
+ crc_check = cio_crc32_finalize(crc);
+ crc_check = htonl(crc_check);
+
+ if (memcmp(p, &crc_check, sizeof(crc_check)) != 0) {
+ cio_log_info(ch->ctx, "[cio file] invalid crc32 at %s/%s",
+ ch->name, cf->path);
+ cio_error_set(ch, CIO_ERR_BAD_CHECKSUM);
+
+ return -1;
+ }
+
+ cf->crc_cur = crc;
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * Unmap the memory for the opened file in question. It make sure
+ * to sync changes to disk first.
+ */
+static int munmap_file(struct cio_ctx *ctx, struct cio_chunk *ch)
+{
+ int ret;
+ struct cio_file *cf;
+
+ cf = (struct cio_file *) ch->backend;
+
+ if (!cf) {
+ return -1;
+ }
+
+ /* File not mapped */
+ if (cf->map == NULL) {
+ return -1;
+ }
+
+ /* Sync pending changes to disk */
+ if (cf->synced == CIO_FALSE) {
+ ret = cio_file_sync(ch);
+ if (ret == -1) {
+ cio_log_error(ch->ctx,
+ "[cio file] error syncing file at "
+ "%s:%s", ch->st->name, ch->name);
+ }
+ }
+
+ /* Unmap file */
+ cio_file_native_unmap(cf);
+
+ cf->data_size = 0;
+ cf->alloc_size = 0;
+
+ /* Adjust counters */
+ cio_chunk_counter_total_up_sub(ctx);
+
+ return 0;
+}
+
+/*
+ * This function creates the memory map for the open file descriptor plus
+ * setup the chunk structure reference.
+ */
+static int mmap_file(struct cio_ctx *ctx, struct cio_chunk *ch, size_t size)
+{
+ ssize_t content_size;
+ size_t fs_size;
+ int ret;
+ struct cio_file *cf;
+
+ cf = (struct cio_file *) ch->backend;
+
+ if (cf->map != NULL) {
+ return CIO_OK;
+ }
+
+ /*
+ * 'size' value represents the value of a previous fstat(2) set by a previous
+ * caller. If the value is greater than zero, just use it, otherwise do a new
+ * fstat(2) of the file descriptor.
+ */
+
+ fs_size = 0;
+
+ if (size > 0) {
+ fs_size = size;
+ }
+ else {
+ /* Get file size from the file system */
+ ret = cio_file_native_get_size(cf, &fs_size);
+
+ if (ret != CIO_OK) {
+ cio_file_report_os_error();
+
+ return CIO_ERROR;
+ }
+ }
+
+ /* If the file is not empty, use file size for the memory map */
+ if (fs_size > 0) {
+ size = fs_size;
+ cf->synced = CIO_TRUE;
+ }
+ else if (fs_size == 0) {
+ /* We can only prepare a file if it has been opened in RW mode */
+ if ((cf->flags & CIO_OPEN_RW) == 0) {
+ cio_error_set(ch, CIO_ERR_PERMISSION);
+
+ return CIO_CORRUPTED;
+ }
+
+ cf->synced = CIO_FALSE;
+
+ /* Adjust size to make room for headers */
+ if (size < CIO_FILE_HEADER_MIN) {
+ size += CIO_FILE_HEADER_MIN;
+ }
+
+ /* For empty files, make room in the file system */
+ size = ROUND_UP(size, ctx->page_size);
+ ret = cio_file_resize(cf, size);
+
+ if (ret != CIO_OK) {
+ cio_log_error(ctx, "cannot adjust chunk size '%s' to %lu bytes",
+ cf->path, size);
+
+ return CIO_ERROR;
+ }
+
+ cio_log_debug(ctx, "%s:%s adjusting size OK", ch->st->name, ch->name);
+ }
+
+ cf->alloc_size = size;
+
+ /* Map the file */
+ ret = cio_file_native_map(cf, cf->alloc_size);
+
+ if (ret != CIO_OK) {
+ cio_log_error(ctx, "cannot mmap/read chunk '%s'", cf->path);
+
+ return CIO_ERROR;
+ }
+
+ /* check content data size */
+ if (fs_size > 0) {
+ content_size = cio_file_st_get_content_len(cf->map,
+ fs_size,
+ cf->page_size);
+
+ if (content_size == -1) {
+ cio_error_set(ch, CIO_ERR_BAD_FILE_SIZE);
+
+ cio_log_error(ctx, "invalid content size %s", cf->path);
+
+ cio_file_native_unmap(cf);
+
+ cf->data_size = 0;
+ cf->alloc_size = 0;
+
+ return CIO_CORRUPTED;
+ }
+
+
+ cf->data_size = content_size;
+ cf->fs_size = fs_size;
+ }
+ else {
+ cf->data_size = 0;
+ cf->fs_size = 0;
+ }
+
+ ret = cio_file_format_check(ch, cf, cf->flags);
+
+ if (ret != 0) {
+ cio_log_error(ctx, "format check failed: %s/%s",
+ ch->st->name, ch->name);
+
+ cio_file_native_unmap(cf);
+
+ cf->data_size = 0;
+
+ return CIO_CORRUPTED;
+ }
+
+ cf->st_content = cio_file_st_get_content(cf->map);
+ cio_log_debug(ctx, "%s:%s mapped OK", ch->st->name, ch->name);
+
+ /* The mmap succeeded, adjust the counters */
+ cio_chunk_counter_total_up_add(ctx);
+
+ return CIO_OK;
+}
+
+int cio_file_lookup_user(char *user, void **result)
+{
+ return cio_file_native_lookup_user(user, result);
+}
+
+int cio_file_lookup_group(char *group, void **result)
+{
+ return cio_file_native_lookup_group(group, result);
+}
+
+int cio_file_read_prepare(struct cio_ctx *ctx, struct cio_chunk *ch)
+{
+ return mmap_file(ctx, ch, 0);
+}
+
+int cio_file_content_copy(struct cio_chunk *ch,
+ void **out_buf, size_t *out_size)
+{
+ int ret;
+ int set_down = CIO_FALSE;
+ char *buf;
+ char *data = NULL;
+ size_t size;
+ struct cio_file *cf = ch->backend;
+
+ /* If the file content is already up, just do a copy of the memory map */
+ if (cio_chunk_is_up(ch) == CIO_FALSE) {
+ ret = cio_chunk_up_force(ch);
+ if (ret != CIO_OK ){
+ return CIO_ERROR;
+ }
+ set_down = CIO_TRUE;
+ }
+
+ size = cf->data_size;
+ data = cio_file_st_get_content(cf->map);
+
+ if (!data) {
+ if (set_down == CIO_TRUE) {
+ cio_chunk_down(ch);
+ }
+ return CIO_ERROR;
+ }
+
+ buf = malloc(size + 1);
+ if (!buf) {
+ cio_errno();
+ if (set_down == CIO_TRUE) {
+ cio_chunk_down(ch);
+ }
+ return CIO_ERROR;
+ }
+ memcpy(buf, data, size);
+ buf[size] = '\0';
+
+ *out_buf = buf;
+ *out_size = size;
+
+ if (set_down == CIO_TRUE) {
+ cio_chunk_down(ch);
+ }
+
+ return CIO_OK;
+}
+
+/*
+ * If the maximum number of 'up' chunks is reached, put this chunk
+ * down (only at open time).
+ */
+static inline int open_and_up(struct cio_ctx *ctx)
+{
+ if (ctx->total_chunks_up >= ctx->max_chunks_up) {
+ return CIO_FALSE;
+ }
+
+ return CIO_TRUE;
+}
+
+/*
+ * Fetch the file size regardless of if we opened this file or not.
+ */
+size_t cio_file_real_size(struct cio_file *cf)
+{
+ size_t file_size;
+ int ret;
+
+ ret = cio_file_native_get_size(cf, &file_size);
+
+ if (ret != CIO_OK) {
+ return 0;
+ }
+
+ return file_size;
+}
+
+static int format_acl_error_message(struct cio_ctx *ctx,
+ struct cio_file *cf,
+ char *output_buffer,
+ size_t output_buffer_size)
+{
+ char *connector;
+ int result;
+ char *group;
+ char *user;
+
+ user = ctx->options.user;
+ group = ctx->options.group;
+ connector = "with group";
+
+ if (user == NULL) {
+ user = "";
+ connector = "";
+ }
+
+ if (group == NULL) {
+ group = "";
+ connector = "";
+ }
+
+ result = snprintf(output_buffer, output_buffer_size - 1,
+ "cannot change ownership of %s to %s %s %s",
+ cf->path, user, connector, group);
+
+ if (result < 0) {
+ return CIO_ERROR;
+ }
+
+ return CIO_OK;
+}
+
+/*
+ * Open or create a data file: the following behavior is expected depending
+ * of the passed flags:
+ *
+ * CIO_OPEN | CIO_OPEN_RW:
+ * - Open for read/write, if the file don't exist, it's created and the
+ * memory map size is assigned to the given value on 'size'.
+ *
+ * CIO_OPEN_RD:
+ * - If file exists, open it in read-only mode.
+ */
+struct cio_file *cio_file_open(struct cio_ctx *ctx,
+ struct cio_stream *st,
+ struct cio_chunk *ch,
+ int flags,
+ size_t size,
+ int *err)
+{
+ char error_message[256];
+ char *path;
+ int ret;
+ struct cio_file *cf;
+
+ (void) size;
+
+ ret = cio_file_native_filename_check(ch->name);
+ if (ret != CIO_OK) {
+ cio_log_error(ctx, "[cio file] invalid file name");
+
+ return NULL;
+ }
+
+ path = cio_file_native_compose_path(ctx->options.root_path, st->name, ch->name);
+ if (path == NULL) {
+ return NULL;
+ }
+
+ /* Create file context */
+ cf = calloc(1, sizeof(struct cio_file));
+ if (!cf) {
+ cio_errno();
+ free(path);
+
+ return NULL;
+ }
+
+ cf->fd = -1;
+ cf->flags = flags;
+ cf->page_size = cio_getpagesize();
+
+ if (ctx->realloc_size_hint > 0) {
+ cf->realloc_size = ctx->realloc_size_hint;
+ }
+ else {
+ cf->realloc_size = CIO_REALLOC_HINT_MIN;
+ }
+
+ cf->st_content = NULL;
+ cf->crc_cur = cio_crc32_init();
+ cf->path = path;
+ cf->map = NULL;
+ ch->backend = cf;
+
+#ifdef _WIN32
+ cf->backing_file = INVALID_HANDLE_VALUE;
+ cf->backing_mapping = INVALID_HANDLE_VALUE;
+#endif
+
+#if defined (CIO_HAVE_FALLOCATE)
+ cf->allocate_strategy = CIO_FILE_LINUX_FALLOCATE;
+#endif
+
+ /* Should we open and put this file up ? */
+ ret = open_and_up(ctx);
+
+ if (ret == CIO_FALSE) {
+ /* we reached our limit, leave the file 'down' */
+ cio_file_update_size(cf);
+
+ /*
+ * Due to he current resource limiting logic we could
+ * get to this point without a file existing so we just
+ * ignore the error.
+ */
+
+ return cf;
+ }
+
+ /* Open the file */
+ ret = cio_file_native_open(cf);
+
+ if (ret != CIO_OK) {
+ free(path);
+ free(cf);
+
+ *err = ret;
+
+ return NULL;
+ }
+
+ /* Update the file size field */
+ ret = cio_file_update_size(cf);
+
+ if (ret != CIO_OK) {
+ cio_file_native_close(cf);
+
+ free(path);
+ free(cf);
+
+ *err = ret;
+
+ return NULL;
+ }
+
+ /* Set the file ownership and permissions */
+ ret = cio_file_native_apply_acl_and_settings(ctx, cf);
+
+ if (ret != CIO_OK) {
+ *err = ret;
+
+ ret = format_acl_error_message(ctx, cf, error_message, sizeof(error_message));
+
+ if (ret != CIO_OK) {
+ cio_log_error(ctx, "error generating error message for acl failure");
+ }
+ else {
+ cio_log_error(ctx, error_message);
+ }
+
+ cio_file_native_close(cf);
+
+ free(path);
+ free(cf);
+
+ return NULL;
+ }
+
+ /* Map the file */
+ ret = mmap_file(ctx, ch, cf->fs_size);
+ if (ret == CIO_ERROR || ret == CIO_CORRUPTED || ret == CIO_RETRY) {
+ cio_file_native_close(cf);
+
+ free(path);
+ free(cf);
+
+ *err = ret;
+
+ return NULL;
+ }
+
+ *err = CIO_OK;
+
+ return cf;
+}
+
+/* This function is used to delete a chunk by name, its only purpose is to delete
+ * chunks that cannnot be loaded (otherwise we would set them down with the delete
+ * flag set to TRUE).
+ */
+int cio_file_delete(struct cio_ctx *ctx, struct cio_stream *st, const char *name)
+{
+ char *path;
+ int ret;
+
+ ret = cio_file_native_filename_check((char *) name);
+ if (ret != CIO_OK) {
+ cio_log_error(ctx, "[cio file] invalid file name");
+
+ return CIO_ERROR;
+ }
+
+ path = cio_file_native_compose_path(ctx->options.root_path, st->name, (char *) name);
+ if (path == NULL) {
+ return CIO_ERROR;
+ }
+
+ ret = cio_file_native_delete_by_path(path);
+
+ free(path);
+
+ return ret;
+}
+
+/*
+ * Put a file content back into memory, only IF it has been set 'down'
+ * before.
+ */
+static int _cio_file_up(struct cio_chunk *ch, int enforced)
+{
+ int ret;
+ struct cio_file *cf = (struct cio_file *) ch->backend;
+
+ if (cf->map) {
+ cio_log_error(ch->ctx, "[cio file] file is already mapped: %s/%s",
+ ch->st->name, ch->name);
+ return CIO_ERROR;
+ }
+
+ if (cf->fd > 0) {
+ cio_log_error(ch->ctx, "[cio file] file descriptor already exists: "
+ "[fd=%i] %s:%s", cf->fd, ch->st->name, ch->name);
+ return CIO_ERROR;
+ }
+
+ /*
+ * Enforced mechanism provides safety based on Chunk I/O storage
+ * pre-set limits.
+ */
+ if (enforced == CIO_TRUE) {
+ ret = open_and_up(ch->ctx);
+ if (ret == CIO_FALSE) {
+ return CIO_ERROR;
+ }
+ }
+
+ /* Open file */
+ ret = cio_file_native_open(cf);
+
+ if (ret != CIO_OK) {
+ cio_log_error(ch->ctx, "[cio file] cannot open chunk: %s/%s",
+ ch->st->name, ch->name);
+ return CIO_ERROR;
+ }
+
+ ret = cio_file_update_size(cf);
+ if (ret != CIO_OK) {
+ return CIO_ERROR;
+ }
+
+ /*
+ * Map content:
+ *
+ * return values = CIO_OK, CIO_ERROR, CIO_CORRUPTED or CIO_RETRY
+ */
+ ret = mmap_file(ch->ctx, ch, cf->fs_size);
+ if (ret == CIO_ERROR) {
+ cio_log_error(ch->ctx, "[cio file] cannot map chunk: %s/%s",
+ ch->st->name, ch->name);
+ }
+
+ /*
+ * 'ret' can still be CIO_CORRUPTED or CIO_RETRY on those cases we
+ * close the file descriptor
+ */
+ if (ret == CIO_CORRUPTED || ret == CIO_RETRY) {
+ /*
+ * we just remove resources: close the recently opened file
+ * descriptor, we never delete the Chunk at this stage since
+ * the caller must take that action.
+ */
+ cio_file_native_close(cf);
+ }
+
+ return ret;
+}
+
+/*
+ * Load a file using 'enforced' mode: do not load the file in memory
+ * if we already passed memory or max_chunks_up restrictions.
+ */
+int cio_file_up(struct cio_chunk *ch)
+{
+ return _cio_file_up(ch, CIO_TRUE);
+}
+
+/* Load a file in non-enforced mode. This means it will load the file
+ * in memory skipping restrictions set by configuration.
+ *
+ * The use case of this call is when the caller needs to write data
+ * to a file which is down due to restrictions. But then the caller
+ * must put the chunk 'down' again if that was it original status.
+ */
+int cio_file_up_force(struct cio_chunk *ch)
+{
+ return _cio_file_up(ch, CIO_FALSE);
+}
+
+int cio_file_update_size(struct cio_file *cf)
+{
+ int result;
+
+ result = cio_file_native_get_size(cf, &cf->fs_size);
+
+ if (result != CIO_OK) {
+ cf->fs_size = 0;
+ }
+
+ return result;
+}
+
+/* Release memory and file descriptor resources but keep context */
+int cio_file_down(struct cio_chunk *ch)
+{
+ int ret;
+ struct cio_file *cf;
+
+ cf = (struct cio_file *) ch->backend;
+
+ if (cf->map == NULL) {
+ cio_log_error(ch->ctx, "[cio file] file is not mapped: %s/%s",
+ ch->st->name, ch->name);
+ return -1;
+ }
+
+ /* unmap memory */
+ munmap_file(ch->ctx, ch);
+
+ /* Allocated map size is zero */
+ cf->alloc_size = 0;
+
+ /* Update the file size */
+ ret = cio_file_update_size(cf);
+
+ if (ret != CIO_OK) {
+ cio_errno();
+ }
+
+ /* Close file descriptor */
+ cio_file_native_close(cf);
+
+ return 0;
+}
+
+void cio_file_close(struct cio_chunk *ch, int delete)
+{
+ int ret;
+ struct cio_file *cf;
+
+ cf = (struct cio_file *) ch->backend;
+
+ if (cf == NULL) {
+ return;
+ }
+
+ /* Safe unmap of the file content */
+ munmap_file(ch->ctx, ch);
+
+ /* Close file descriptor */
+ cio_file_native_close(cf);
+
+ /* Should we delete the content from the file system ? */
+ if (delete == CIO_TRUE) {
+ ret = cio_file_native_delete(cf);
+
+ if (ret != CIO_OK) {
+ cio_log_error(ch->ctx,
+ "[cio file] error deleting file at close %s:%s",
+ ch->st->name, ch->name);
+ }
+ }
+
+ free(cf->path);
+ free(cf);
+}
+
+
+int cio_file_write(struct cio_chunk *ch, const void *buf, size_t count)
+{
+ int ret;
+ int meta_len;
+ int pre_content;
+ size_t av_size;
+ size_t old_size;
+ size_t new_size;
+ struct cio_file *cf;
+
+ if (count == 0) {
+ /* do nothing */
+ return 0;
+ }
+
+ if (!ch) {
+ return -1;
+ }
+
+ cf = (struct cio_file *) ch->backend;
+
+ if (cio_chunk_is_up(ch) == CIO_FALSE) {
+ cio_log_error(ch->ctx, "[cio file] file is not mmap()ed: %s:%s",
+ ch->st->name, ch->name);
+ return -1;
+ }
+
+ /* get available size */
+ av_size = get_available_size(cf, &meta_len);
+
+ /* validate there is enough space, otherwise resize */
+ if (av_size < count) {
+ /* Set the pre-content size (chunk header + metadata) */
+ pre_content = (CIO_FILE_HEADER_MIN + meta_len);
+
+ new_size = cf->alloc_size + cf->realloc_size;
+ while (new_size < (pre_content + cf->data_size + count)) {
+ new_size += cf->realloc_size;
+ }
+
+ old_size = cf->alloc_size;
+ new_size = ROUND_UP(new_size, ch->ctx->page_size);
+
+ ret = cio_file_resize(cf, new_size);
+
+ if (ret != CIO_OK) {
+ cio_log_error(ch->ctx,
+ "[cio_file] error setting new file size on write");
+ return -1;
+ }
+
+ cio_log_debug(ch->ctx,
+ "[cio file] alloc_size from %lu to %lu",
+ old_size, new_size);
+ }
+
+ /* If crc_reset was toggled we know that data_size was
+ * modified by cio_chunk_write_at which means we need
+ * to update the header before we recalculate the checksum
+ */
+ if (cf->crc_reset) {
+ cio_file_st_set_content_len(cf->map, cf->data_size);
+ }
+
+ if (ch->ctx->options.flags & CIO_CHECKSUM) {
+ update_checksum(cf, (unsigned char *) buf, count);
+ }
+
+ cf->st_content = cio_file_st_get_content(cf->map);
+ memcpy(cf->st_content + cf->data_size, buf, count);
+
+ cf->data_size += count;
+ cf->synced = CIO_FALSE;
+
+ cio_file_st_set_content_len(cf->map, cf->data_size);
+
+ return 0;
+}
+
+int cio_file_write_metadata(struct cio_chunk *ch, char *buf, size_t size)
+{
+ int ret;
+ char *meta;
+ char *cur_content_data;
+ char *new_content_data;
+ size_t new_size;
+ size_t content_av;
+ size_t meta_av;
+ struct cio_file *cf;
+
+ cf = ch->backend;
+
+ if (cio_file_is_up(ch, cf) == CIO_FALSE) {
+ return -1;
+ }
+
+ /* Get metadata pointer */
+ meta = cio_file_st_get_meta(cf->map);
+
+ /* Check if meta already have some space available to overwrite */
+ meta_av = cio_file_st_get_meta_len(cf->map);
+
+ /* If there is some space available, just overwrite */
+ if (meta_av >= size) {
+ /* copy new metadata */
+ memcpy(meta, buf, size);
+
+ /* there are some remaining bytes, adjust.. */
+ cur_content_data = cio_file_st_get_content(cf->map);
+ new_content_data = meta + size;
+ memmove(new_content_data, cur_content_data, cf->data_size);
+ adjust_layout(ch, cf, size);
+
+ return 0;
+ }
+
+ /*
+ * The optimal case is if there is no content data, the non-optimal case
+ * where we need to increase the memory map size, move the content area
+ * bytes to a different position and write the metadata.
+ *
+ * Calculate the available space in the content area.
+ */
+ content_av = cf->alloc_size - cf->data_size;
+
+ /* If there is no enough space, increase the file size and it memory map */
+ if (content_av < size) {
+ new_size = (size - meta_av) + cf->data_size + CIO_FILE_HEADER_MIN;
+
+ ret = cio_file_resize(cf, new_size);
+
+ if (ret != CIO_OK) {
+ cio_log_error(ch->ctx,
+ "[cio meta] error resizing mapped file");
+
+ return -1;
+ }
+ }
+
+ /* get meta reference again in case the map address has changed */
+ meta = cio_file_st_get_meta(cf->map);
+
+ /* set new position for the content data */
+ cur_content_data = cio_file_st_get_content(cf->map);
+ new_content_data = meta + size;
+ memmove(new_content_data, cur_content_data, size);
+
+ /* copy new metadata */
+ memcpy(meta, buf, size);
+ adjust_layout(ch, cf, size);
+
+ return 0;
+}
+
+int cio_file_sync(struct cio_chunk *ch)
+{
+ int ret;
+ int meta_len;
+ size_t desired_size;
+ size_t file_size;
+ size_t av_size;
+ struct cio_file *cf;
+
+ if (ch == NULL) {
+ return -1;
+ }
+
+ cf = (struct cio_file *) ch->backend;
+
+ if (cf == NULL) {
+ return -1;
+ }
+
+ if (cf->flags & CIO_OPEN_RD) {
+ return 0;
+ }
+
+ if (cf->synced == CIO_TRUE) {
+ return 0;
+ }
+
+ ret = cio_file_native_get_size(cf, &file_size);
+
+ if (ret != CIO_OK) {
+ cio_file_report_os_error();
+
+ return -1;
+ }
+
+ /* File trimming has been made opt-in because it causes
+ * performance degradation and excessive fragmentation
+ * in XFS.
+ */
+ if ((ch->ctx->options.flags & CIO_TRIM_FILES) != 0) {
+ /* If there are extra space, truncate the file size */
+ av_size = get_available_size(cf, &meta_len);
+
+ if (av_size > 0) {
+ desired_size = cf->alloc_size - av_size;
+ }
+ else if (cf->alloc_size > file_size) {
+ desired_size = cf->alloc_size;
+ }
+ else {
+ desired_size = file_size;
+ }
+
+ if (desired_size != file_size) {
+ /* When file trimming is enabled we still round the file size up
+ * to the memory page size because even though not explicitly
+ * stated there seems to be a performance degradation issue that
+ * correlates with sub-page mapping.
+ */
+ desired_size = ROUND_UP(desired_size, ch->ctx->page_size);
+
+ ret = cio_file_resize(cf, desired_size);
+
+ if (ret != CIO_OK) {
+ cio_log_error(ch->ctx,
+ "[cio file sync] error adjusting size at: "
+ " %s/%s", ch->st->name, ch->name);
+
+ return ret;
+ }
+ }
+ }
+
+ /* Finalize CRC32 checksum */
+ if (ch->ctx->options.flags & CIO_CHECKSUM) {
+ finalize_checksum(cf);
+ }
+
+ /* Commit changes to disk */
+ ret = cio_file_native_sync(cf, ch->ctx->options.flags);
+
+ if (ret != CIO_OK) {
+ return -1;
+ }
+
+ cf->synced = CIO_TRUE;
+
+ ret = cio_file_update_size(cf);
+
+ if (ret != CIO_OK) {
+ return -1;
+ }
+
+ cio_log_debug(ch->ctx, "[cio file] synced at: %s/%s",
+ ch->st->name, ch->name);
+
+ return 0;
+}
+
+int cio_file_resize(struct cio_file *cf, size_t new_size)
+{
+ int inner_result;
+ size_t mapped_size;
+ int mapped_flag;
+ int result;
+
+ mapped_flag = cio_file_native_is_mapped(cf);
+ mapped_size = cf->alloc_size;
+
+#ifdef _WIN32
+ if (mapped_flag) {
+ result = cio_file_native_unmap(cf);
+
+ if (result != CIO_OK) {
+ return result;
+ }
+ }
+#endif
+
+ result = cio_file_native_resize(cf, new_size);
+
+ if (result != CIO_OK) {
+ cio_file_native_report_os_error();
+
+#ifdef _WIN32
+ if (mapped_flag) {
+ inner_result = cio_file_native_map(cf, mapped_size);
+ }
+#endif
+
+ return result;
+ }
+
+ if (mapped_flag) {
+#ifdef _WIN32
+ result = cio_file_native_map(cf, new_size);
+#else
+ result = cio_file_native_remap(cf, new_size);
+#endif
+
+ if (result != CIO_OK) {
+ return result;
+ }
+ }
+
+ (void) mapped_size;
+ (void) inner_result;
+
+ return CIO_OK;
+}
+
+char *cio_file_hash(struct cio_file *cf)
+{
+ return (cf->map + 2);
+}
+
+void cio_file_hash_print(struct cio_file *cf)
+{
+ printf("crc cur=%lu\n", (long unsigned int)cf->crc_cur);
+ printf("%08lx\n", (long unsigned int ) cf->crc_cur);
+}
+
+/* Dump files from given stream */
+void cio_file_scan_dump(struct cio_ctx *ctx, struct cio_stream *st)
+{
+ int ret;
+ int meta_len;
+ int set_down = CIO_FALSE;
+ char *p;
+ crc_t crc;
+ crc_t crc_fs;
+ char tmp[PATH_MAX];
+ struct mk_list *head;
+ struct cio_chunk *ch;
+ struct cio_file *cf;
+
+ mk_list_foreach(head, &st->chunks) {
+ ch = mk_list_entry(head, struct cio_chunk, _head);
+ cf = ch->backend;
+
+ if (cio_file_is_up(ch, cf) == CIO_FALSE) {
+ ret = cio_file_up(ch);
+ if (ret == -1) {
+ continue;
+ }
+ set_down = CIO_TRUE;
+ }
+
+ snprintf(tmp, sizeof(tmp) -1, "%s/%s", st->name, ch->name);
+ meta_len = cio_file_st_get_meta_len(cf->map);
+
+ p = cio_file_st_get_hash(cf->map);
+
+ memcpy(&crc_fs, p, sizeof(crc_fs));
+ crc_fs = ntohl(crc_fs);
+
+ printf(" %-60s", tmp);
+
+ /*
+ * the crc32 specified in the file is stored in 'val' now, if
+ * checksum mode is enabled we have to verify it.
+ */
+ if (ctx->options.flags & CIO_CHECKSUM) {
+ cio_file_calculate_checksum(cf, &crc);
+
+ /*
+ * finalize the checksum and compare it value using the
+ * host byte order.
+ */
+ crc = cio_crc32_finalize(crc);
+ if (crc != crc_fs) {
+ printf("checksum error=%08x expected=%08x, ",
+ (uint32_t) crc_fs, (uint32_t) crc);
+ }
+ }
+ printf("meta_len=%d, data_size=%zu, crc=%08x\n",
+ meta_len, cf->data_size, (uint32_t) crc_fs);
+
+ if (set_down == CIO_TRUE) {
+ cio_file_down(ch);
+ }
+ }
+}
+
+/* Check if a file content is up in memory and a file descriptor is set */
+int cio_file_is_up(struct cio_chunk *ch, struct cio_file *cf)
+{
+ (void) ch;
+
+ if (cio_file_native_is_open(cf) &&
+ cio_file_native_is_mapped(cf)) {
+ return CIO_TRUE;
+ }
+
+ return CIO_FALSE;
+}
diff --git a/fluent-bit/lib/chunkio/src/cio_file_unix.c b/fluent-bit/lib/chunkio/src/cio_file_unix.c
new file mode 100644
index 00000000..72d49312
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/cio_file_unix.c
@@ -0,0 +1,570 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Chunk I/O
+ * =========
+ * Copyright 2018-2019 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/mman.h>
+#include <limits.h>
+#include <pwd.h>
+#include <grp.h>
+
+#include <chunkio/chunkio.h>
+#include <chunkio/chunkio_compat.h>
+#include <chunkio/cio_crc32.h>
+#include <chunkio/cio_chunk.h>
+#include <chunkio/cio_file.h>
+#include <chunkio/cio_file_native.h>
+#include <chunkio/cio_file_st.h>
+#include <chunkio/cio_log.h>
+#include <chunkio/cio_stream.h>
+#include <chunkio/cio_error.h>
+#include <chunkio/cio_utils.h>
+
+
+int cio_file_native_unmap(struct cio_file *cf)
+{
+ int ret;
+
+ if (cf == NULL) {
+ return CIO_ERROR;
+ }
+
+ if (!cio_file_native_is_mapped(cf)) {
+ return CIO_OK;
+ }
+
+ ret = munmap(cf->map, cf->alloc_size);
+
+ if (ret != 0) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+
+ cf->alloc_size = 0;
+ cf->map = NULL;
+
+ return CIO_OK;
+}
+
+int cio_file_native_map(struct cio_file *cf, size_t map_size)
+{
+ int flags;
+
+ if (cf == NULL) {
+ return CIO_ERROR;
+ }
+
+ if (!cio_file_native_is_open(cf)) {
+ return CIO_ERROR;
+ }
+
+ if (cio_file_native_is_mapped(cf)) {
+ return CIO_OK;
+ }
+
+ if (cf->flags & CIO_OPEN_RW) {
+ flags = PROT_READ | PROT_WRITE;
+ }
+ else if (cf->flags & CIO_OPEN_RD) {
+ flags = PROT_READ;
+ }
+ else {
+ return CIO_ERROR;
+ }
+
+ cf->map = mmap(0, map_size, flags, MAP_SHARED, cf->fd, 0);
+
+ if (cf->map == MAP_FAILED) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+
+ cf->alloc_size = map_size;
+
+ return CIO_OK;
+}
+
+int cio_file_native_remap(struct cio_file *cf, size_t new_size)
+{
+ int result;
+ void *tmp;
+
+ result = 0;
+
+/* OSX mman does not implement mremap or MREMAP_MAYMOVE. */
+#ifndef MREMAP_MAYMOVE
+ result = cio_file_native_unmap(cf);
+
+ if (result == -1) {
+ return result;
+ }
+
+ tmp = mmap(0, new_size, PROT_READ | PROT_WRITE, MAP_SHARED, cf->fd, 0);
+#else
+ (void) result;
+
+ tmp = mremap(cf->map, cf->alloc_size, new_size, MREMAP_MAYMOVE);
+#endif
+
+ if (tmp == MAP_FAILED) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+
+ cf->map = tmp;
+ cf->alloc_size = new_size;
+
+ return CIO_OK;
+}
+
+int cio_file_native_lookup_user(char *user, void **result)
+{
+ long query_buffer_size;
+ struct passwd *query_result;
+ char *query_buffer;
+ struct passwd passwd_entry;
+ int api_result;
+
+ if (user == NULL) {
+ *result = calloc(1, sizeof(uid_t));
+
+ if (*result == NULL) {
+ cio_file_native_report_runtime_error();
+
+ return CIO_ERROR;
+ }
+
+ **(uid_t **) result = (uid_t) -1;
+ }
+
+ query_buffer_size = sysconf(_SC_GETPW_R_SIZE_MAX);
+
+ if (query_buffer_size == -1) {
+ query_buffer_size = 4096 * 10;
+ }
+
+ query_buffer = calloc(1, query_buffer_size);
+
+ if (query_buffer == NULL) {
+ return CIO_ERROR;
+ }
+
+ query_result = NULL;
+
+ api_result = getpwnam_r(user, &passwd_entry, query_buffer,
+ query_buffer_size, &query_result);
+
+ if (api_result != 0 || query_result == NULL) {
+ cio_file_native_report_os_error();
+
+ free(query_buffer);
+
+ return CIO_ERROR;
+ }
+
+ *result = calloc(1, sizeof(uid_t));
+
+ if (*result == NULL) {
+ cio_file_native_report_runtime_error();
+
+ free(query_buffer);
+
+ return CIO_ERROR;
+ }
+
+ **(uid_t **) result = query_result->pw_uid;
+
+ free(query_buffer);
+
+ return CIO_OK;
+}
+
+int cio_file_native_lookup_group(char *group, void **result)
+{
+ long query_buffer_size;
+ struct group *query_result;
+ char *query_buffer;
+ struct group group_entry;
+ int api_result;
+
+ if (group == NULL) {
+ *result = calloc(1, sizeof(gid_t));
+
+ if (*result == NULL) {
+ cio_file_native_report_runtime_error();
+
+ return CIO_ERROR;
+ }
+
+ **(gid_t **) result = (gid_t) -1;
+ }
+
+ query_buffer_size = sysconf(_SC_GETGR_R_SIZE_MAX);
+
+ if (query_buffer_size == -1) {
+ query_buffer_size = 4096 * 10;
+ }
+
+ query_buffer = calloc(1, query_buffer_size);
+
+ if (query_buffer == NULL) {
+ return CIO_ERROR;
+ }
+
+ query_result = NULL;
+
+ api_result = getgrnam_r(group, &group_entry, query_buffer,
+ query_buffer_size, &query_result);
+
+ if (api_result != 0 || query_result == NULL) {
+ cio_file_native_report_os_error();
+
+ free(query_buffer);
+
+ return CIO_ERROR;
+ }
+
+ *result = calloc(1, sizeof(gid_t));
+
+ if (*result == NULL) {
+ cio_file_native_report_runtime_error();
+
+ free(query_buffer);
+
+ return CIO_ERROR;
+ }
+
+ **(gid_t **) result = query_result->gr_gid;
+
+ free(query_buffer);
+
+ return CIO_OK;
+}
+
+int cio_file_native_apply_acl_and_settings(struct cio_ctx *ctx, struct cio_file *cf)
+{
+ mode_t filesystem_acl;
+ gid_t numeric_group;
+ uid_t numeric_user;
+ int result;
+
+ numeric_group = -1;
+ numeric_user = -1;
+
+ if (ctx->processed_user != NULL) {
+ numeric_user = *(uid_t *) ctx->processed_user;
+ }
+
+ if (ctx->processed_group != NULL) {
+ numeric_group = *(gid_t *) ctx->processed_group;
+ }
+
+ if (numeric_user != -1 || numeric_group != -1) {
+ result = chown(cf->path, numeric_user, numeric_group);
+
+ if (result == -1) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+ }
+
+ if (ctx->options.chmod != NULL) {
+ filesystem_acl = strtoul(ctx->options.chmod, NULL, 8);
+
+ result = chmod(cf->path, filesystem_acl);
+
+ if (result == -1) {
+ cio_file_native_report_os_error();
+
+ cio_log_error(ctx, "cannot change acl of %s to %s",
+ cf->path, ctx->options.user);
+
+ return CIO_ERROR;
+ }
+ }
+
+ return CIO_OK;
+}
+
+int cio_file_native_get_size(struct cio_file *cf, size_t *file_size)
+{
+ int ret;
+ struct stat st;
+
+ ret = -1;
+
+ if (cio_file_native_is_open(cf)) {
+ ret = fstat(cf->fd, &st);
+ }
+
+ if (ret == -1) {
+ ret = stat(cf->path, &st);
+ }
+
+ if (ret == -1) {
+ return CIO_ERROR;
+ }
+
+ if (file_size != NULL) {
+ *file_size = st.st_size;
+ }
+
+ return CIO_OK;
+}
+
+char *cio_file_native_compose_path(char *root_path, char *stream_name,
+ char *chunk_name)
+{
+ size_t psize;
+ char *path;
+ int ret;
+
+ /* Compose path for the file */
+ psize = strlen(root_path) +
+ strlen(stream_name) +
+ strlen(chunk_name) +
+ 8;
+
+ path = malloc(psize);
+
+ if (path == NULL) {
+ cio_file_native_report_runtime_error();
+
+ return NULL;
+ }
+
+ ret = snprintf(path, psize, "%s/%s/%s",
+ root_path, stream_name, chunk_name);
+
+ if (ret == -1) {
+ cio_file_native_report_runtime_error();
+
+ free(path);
+
+ return NULL;
+ }
+
+ return path;
+}
+
+int cio_file_native_filename_check(char *name)
+{
+ size_t len;
+
+ len = strlen(name);
+
+ if (len == 0) {
+ return CIO_ERROR;
+ }
+ if (len == 1) {
+ if ((name[0] == '.' || name[0] == '/')) {
+ return CIO_ERROR;
+ }
+ }
+
+ return CIO_OK;
+}
+
+int cio_file_native_open(struct cio_file *cf)
+{
+ if (cio_file_native_is_open(cf)) {
+ return CIO_OK;
+ }
+
+ /* Open file descriptor */
+ if (cf->flags & CIO_OPEN_RW) {
+ cf->fd = open(cf->path, O_RDWR | O_CREAT, (mode_t) 0600);
+ }
+ else if (cf->flags & CIO_OPEN_RD) {
+ cf->fd = open(cf->path, O_RDONLY);
+ }
+
+ if (cf->fd == -1) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+
+ return CIO_OK;
+}
+
+int cio_file_native_close(struct cio_file *cf)
+{
+ int result;
+
+ if (cf == NULL) {
+ return CIO_ERROR;
+ }
+
+ if (cio_file_native_is_open(cf)) {
+ result = close(cf->fd);
+
+ if (result == -1) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+
+ cf->fd = -1;
+ }
+
+ return CIO_OK;
+}
+
+int cio_file_native_delete_by_path(const char *path)
+{
+ int result;
+
+ result = unlink(path);
+
+ if (result == -1) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+
+ return CIO_OK;
+}
+
+int cio_file_native_delete(struct cio_file *cf)
+{
+ int result;
+
+ if (cio_file_native_is_open(cf) ||
+ cio_file_native_is_mapped(cf)) {
+ return CIO_ERROR;
+ }
+
+ result = unlink(cf->path);
+
+ if (result == -1) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+
+ return CIO_OK;
+}
+
+int cio_file_native_sync(struct cio_file *cf, int sync_mode)
+{
+ int result;
+
+ if (sync_mode & CIO_FULL_SYNC) {
+ sync_mode = MS_SYNC;
+ }
+ else {
+ sync_mode = MS_ASYNC;
+ }
+
+ result = msync(cf->map, cf->alloc_size, sync_mode);
+
+ if (result == -1) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+
+ return CIO_OK;
+}
+
+int cio_file_native_resize(struct cio_file *cf, size_t new_size)
+{
+ int fallocate_available;
+ int result;
+
+ result = -1;
+
+#if defined(CIO_HAVE_FALLOCATE) || defined(CIO_HAVE_POSIX_FALLOCATE)
+ fallocate_available = CIO_TRUE;
+#else
+ fallocate_available = CIO_FALSE;
+#endif
+
+ /*
+ * fallocate() is not portable an Linux only. Since macOS does not have
+ * fallocate() we use ftruncate().
+ */
+ if (fallocate_available && new_size > cf->fs_size) {
+ retry:
+
+ if (cf->allocate_strategy == CIO_FILE_LINUX_FALLOCATE) {
+ /*
+ * To increase the file size we use fallocate() since this option
+ * will send a proper ENOSPC error if the file system ran out of
+ * space. ftruncate() will not fail and upon memcpy() over the
+ * mmap area it will trigger a 'Bus Error' crashing the program.
+ *
+ * fallocate() is not portable, Linux only.
+ */
+#if defined(CIO_HAVE_FALLOCATE)
+ result = fallocate(cf->fd, 0, 0, new_size);
+
+#elif defined(CIO_HAVE_POSIX_FALLOCATE)
+ result = -1;
+ errno = EOPNOTSUPP;
+#endif
+
+ if (result == -1 && errno == EOPNOTSUPP) {
+ /*
+ * If fallocate fails with an EOPNOTSUPP try operation using
+ * posix_fallocate. Required since some filesystems do not support
+ * the fallocate operation e.g. ext3 and reiserfs.
+ */
+ cf->allocate_strategy = CIO_FILE_LINUX_POSIX_FALLOCATE;
+ goto retry;
+ }
+ }
+ else if (cf->allocate_strategy == CIO_FILE_LINUX_POSIX_FALLOCATE) {
+#if defined(CIO_HAVE_POSIX_FALLOCATE)
+ result = posix_fallocate(cf->fd, 0, new_size);
+#else
+ goto fallback;
+#endif
+ }
+ }
+ else
+ {
+#if !defined(CIO_HAVE_POSIX_FALLOCATE)
+ fallback:
+#endif
+
+ result = ftruncate(cf->fd, new_size);
+ }
+
+ if (result) {
+ cio_file_native_report_os_error();
+ }
+ else {
+ cf->fs_size = new_size;
+ }
+
+ return result;
+}
diff --git a/fluent-bit/lib/chunkio/src/cio_file_win32.c b/fluent-bit/lib/chunkio/src/cio_file_win32.c
new file mode 100644
index 00000000..18044c40
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/cio_file_win32.c
@@ -0,0 +1,549 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Chunk I/O
+ * =========
+ * Copyright 2018 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <inttypes.h>
+#include <stdio.h>
+
+#include <chunkio/chunkio_compat.h>
+#include <chunkio/chunkio.h>
+#include <chunkio/cio_crc32.h>
+#include <chunkio/cio_chunk.h>
+#include <chunkio/cio_file.h>
+#include <chunkio/cio_file_native.h>
+#include <chunkio/cio_file_st.h>
+#include <chunkio/cio_log.h>
+#include <chunkio/cio_stream.h>
+
+int cio_file_native_unmap(struct cio_file *cf)
+{
+ int result;
+
+ if (cf == NULL) {
+ return CIO_ERROR;
+ }
+
+ if (!cio_file_native_is_open(cf)) {
+ return CIO_OK;
+ }
+
+ if (!cio_file_native_is_mapped(cf)) {
+ return CIO_OK;
+ }
+
+ result = UnmapViewOfFile(cf->map);
+
+ if (result == 0) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+
+ CloseHandle(cf->backing_mapping);
+
+ cf->backing_mapping = INVALID_HANDLE_VALUE;
+ cf->alloc_size = 0;
+ cf->map = NULL;
+
+ return CIO_OK;
+}
+
+int cio_file_native_map(struct cio_file *cf, size_t map_size)
+{
+ DWORD desired_protection;
+ DWORD desired_access;
+
+ if (cf == NULL) {
+ return CIO_ERROR;
+ }
+
+ if (!cio_file_native_is_open(cf)) {
+ return CIO_ERROR;
+ }
+
+ if (cio_file_native_is_mapped(cf)) {
+ return CIO_OK;
+ }
+
+ if (cf->flags & CIO_OPEN_RW) {
+ desired_protection = PAGE_READWRITE;
+ desired_access = FILE_MAP_ALL_ACCESS;
+ }
+ else if (cf->flags & CIO_OPEN_RD) {
+ desired_protection = PAGE_READONLY;
+ desired_access = FILE_MAP_READ;
+ }
+ else {
+ return CIO_ERROR;
+ }
+
+ cf->backing_mapping = CreateFileMappingA(cf->backing_file, NULL,
+ desired_protection,
+ 0, 0, NULL);
+
+ if (cf->backing_mapping == NULL) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+
+ cf->map = MapViewOfFile(cf->backing_mapping, desired_access, 0, 0, map_size);
+
+ if (cf->map == NULL) {
+ cio_file_native_report_os_error();
+
+ CloseHandle(cf->backing_mapping);
+
+ cf->backing_mapping = INVALID_HANDLE_VALUE;
+
+ return CIO_ERROR;
+ }
+
+ cf->alloc_size = map_size;
+
+ return CIO_OK;
+}
+
+int cio_file_native_remap(struct cio_file *cf, size_t new_size)
+{
+ /*
+ * There's no reason for this function to exist because in windows
+ * we need to unmap, resize and then map again so there's no benefit
+ * from remapping and I'm not implementing a dummy version because I
+ * don't want anyone to read it and think there are any reasonable use
+ * cases for it.
+ */
+
+ (void) cf;
+ (void) new_size;
+
+ return CIO_ERROR;
+}
+
+static SID *perform_sid_lookup(char *account_name, SID_NAME_USE *result_sid_type)
+{
+ DWORD referenced_domain_name_length;
+ char referenced_domain_name[256];
+ SID *reallocated_sid_buffer;
+ DWORD sid_buffer_size;
+ size_t retry_index;
+ SID *sid_buffer;
+ SID_NAME_USE sid_type;
+ int result;
+
+ referenced_domain_name_length = 256;
+ sid_buffer_size = 256;
+
+ sid_buffer = calloc(1, sid_buffer_size);
+
+ if (sid_buffer == NULL) {
+ cio_file_native_report_runtime_error();
+
+ return NULL;
+ }
+
+ result = 0;
+ sid_type = SidTypeUnknown;
+
+ for (retry_index = 0 ; retry_index < 5 && !result ; retry_index++) {
+ result = LookupAccountNameA(NULL,
+ account_name,
+ sid_buffer,
+ &sid_buffer_size,
+ referenced_domain_name,
+ &referenced_domain_name_length,
+ &sid_type);
+
+ if (!result) {
+ if (GetLastError() == ERROR_INSUFFICIENT_BUFFER) {
+ sid_buffer_size *= 2;
+
+ reallocated_sid_buffer = realloc(sid_buffer, sid_buffer_size);
+
+ if (reallocated_sid_buffer == NULL) {
+ cio_file_native_report_runtime_error();
+
+ free(sid_buffer);
+
+ return NULL;
+ }
+ }
+ else {
+ cio_file_native_report_os_error();
+
+ free(sid_buffer);
+
+ return NULL;
+ }
+ }
+ }
+
+ if (result_sid_type != NULL) {
+ *result_sid_type = sid_type;
+ }
+
+ return sid_buffer;
+}
+
+static int perform_entity_lookup(char *name,
+ void **result,
+ SID_NAME_USE desired_sid_type)
+{
+ SID_NAME_USE result_sid_type;
+
+ *result = (void **) perform_sid_lookup(name, &result_sid_type);
+
+ if (*result == NULL) {
+ return CIO_ERROR;
+ }
+
+ if (desired_sid_type != result_sid_type) {
+ free(*result);
+
+ *result = NULL;
+
+ return CIO_ERROR;
+ }
+
+ return CIO_OK;
+}
+
+int cio_file_native_lookup_user(char *user, void **result)
+{
+ return perform_entity_lookup(user, result, SidTypeUser);
+}
+
+int cio_file_native_lookup_group(char *group, void **result)
+{
+ return perform_entity_lookup(group, result, SidTypeGroup);
+}
+
+static DWORD cio_file_win_chown(char *path, SID *user, SID *group)
+{
+ int result;
+
+ /* Ownership here does not work in the same way it works in unixes
+ * so specifying both a user and group will end up with the group
+ * overriding the user if possible which can cause some misunderstandings.
+ */
+
+ result = ERROR_SUCCESS;
+
+ if (user != NULL) {
+ result = SetNamedSecurityInfoA(path, SE_FILE_OBJECT,
+ OWNER_SECURITY_INFORMATION,
+ user, NULL, NULL, NULL);
+ }
+
+ if (group != NULL && result == ERROR_SUCCESS) {
+ result = SetNamedSecurityInfoA(path, SE_FILE_OBJECT,
+ GROUP_SECURITY_INFORMATION,
+ group, NULL, NULL, NULL);
+ }
+
+ return result;
+}
+
+int cio_file_native_apply_acl_and_settings(struct cio_ctx *ctx, struct cio_file *cf)
+{
+ int result;
+
+ if (ctx->processed_user != NULL) {
+ result = cio_file_win_chown(cf->path, ctx->processed_user, ctx->processed_group);
+
+ if (result != ERROR_SUCCESS) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+ }
+
+ return CIO_OK;
+}
+
+static int get_file_size_by_handle(struct cio_file *cf, size_t *file_size)
+{
+ LARGE_INTEGER native_file_size;
+ int ret;
+
+ memset(&native_file_size, 0, sizeof(native_file_size));
+
+ ret = GetFileSizeEx(cf->backing_file, &native_file_size);
+
+ if (ret == 0) {
+ return CIO_ERROR;
+ }
+
+ if (file_size != NULL) {
+ *file_size = (size_t) native_file_size.QuadPart;
+ }
+
+ return CIO_OK;
+}
+
+static int get_file_size_by_path(struct cio_file *cf, size_t *file_size)
+{
+ int ret;
+#ifdef _WIN64
+ struct _stat64 st;
+#else
+ struct _stat32 st;
+#endif
+
+#ifdef _WIN64
+ ret = _stat64(cf->path, &st);
+#else
+ ret = _stat32(cf->path, &st);
+#endif
+
+ if (ret == -1) {
+ return CIO_ERROR;
+ }
+
+ if (file_size != NULL) {
+ *file_size = st.st_size;
+ }
+
+ return CIO_OK;
+}
+
+int cio_file_native_get_size(struct cio_file *cf, size_t *file_size)
+{
+ int ret;
+
+ ret = CIO_ERROR;
+
+ if (cf->backing_file != INVALID_HANDLE_VALUE) {
+ ret = get_file_size_by_handle(cf, file_size);
+ }
+
+ if (ret != CIO_OK) {
+ ret = get_file_size_by_path(cf, file_size);
+ }
+
+ return ret;
+}
+
+char *cio_file_native_compose_path(char *root_path, char *stream_name,
+ char *chunk_name)
+{
+ size_t psize;
+ char *path;
+ int ret;
+
+ /* Compose path for the file */
+ psize = strlen(root_path) +
+ strlen(stream_name) +
+ strlen(chunk_name) +
+ 3;
+
+ path = malloc(psize);
+
+ if (path == NULL) {
+ cio_file_native_report_runtime_error();
+
+ return NULL;
+ }
+
+ ret = snprintf(path, psize, "%s\\%s\\%s",
+ root_path, stream_name, chunk_name);
+
+ if (ret == -1) {
+ cio_file_native_report_runtime_error();
+
+ free(path);
+
+ return NULL;
+ }
+
+ return path;
+}
+
+int cio_file_native_filename_check(char *name)
+{
+ size_t len;
+
+ len = strlen(name);
+
+ if (len == 0) {
+ return CIO_ERROR;
+ }
+ else if (len == 1) {
+ if (name[0] == '\\' || name[0] == '.' || name[0] == '/') {
+ return CIO_ERROR;
+ }
+ }
+
+ return CIO_OK;
+}
+
+int cio_file_native_open(struct cio_file *cf)
+{
+ DWORD creation_disposition;
+ DWORD desired_access;
+
+ if (cio_file_native_is_open(cf)) {
+ return CIO_OK;
+ }
+
+ if (cf->flags & CIO_OPEN) {
+ desired_access = GENERIC_READ | GENERIC_WRITE;
+ creation_disposition = OPEN_ALWAYS;
+ }
+ else if (cf->flags & CIO_OPEN_RD) {
+ desired_access = GENERIC_READ;
+ creation_disposition = OPEN_EXISTING;
+ }
+ else {
+ return CIO_ERROR;
+ }
+
+ cf->backing_file = CreateFileA(cf->path,
+ desired_access,
+ FILE_SHARE_DELETE |
+ FILE_SHARE_READ |
+ FILE_SHARE_WRITE,
+ NULL,
+ creation_disposition,
+ FILE_ATTRIBUTE_NORMAL,
+ NULL);
+
+ if (cf->backing_file == INVALID_HANDLE_VALUE) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+
+ return CIO_OK;
+}
+
+int cio_file_native_close(struct cio_file *cf)
+{
+ int result;
+
+ if (cf == NULL) {
+ return CIO_ERROR;
+ }
+
+ if (cio_file_native_is_open(cf)) {
+ result = CloseHandle(cf->backing_file);
+
+ if (result == 0) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+
+ cf->backing_file = INVALID_HANDLE_VALUE;
+ }
+
+ return CIO_OK;
+}
+
+int cio_file_native_delete_by_path(const char *path)
+{
+ int result;
+
+ result = DeleteFileA(path);
+
+ if (result == 0) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+
+ return CIO_OK;
+}
+
+int cio_file_native_delete(struct cio_file *cf)
+{
+ int result;
+
+ result = DeleteFileA(cf->path);
+
+ if (result == 0) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+
+ return CIO_OK;
+}
+
+int cio_file_native_sync(struct cio_file *cf, int sync_mode)
+{
+ int result;
+
+ result = FlushViewOfFile(cf->map, cf->alloc_size);
+
+ if (result == 0) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+
+ if (sync_mode & CIO_FULL_SYNC) {
+ result = FlushFileBuffers(cf->backing_file);
+
+ if (result == 0) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+ }
+
+ return CIO_OK;
+}
+
+int cio_file_native_resize(struct cio_file *cf, size_t new_size)
+{
+ LARGE_INTEGER movement_distance;
+ int result;
+
+ if (!cio_file_native_is_open(cf)) {
+ return CIO_ERROR;
+ }
+
+ if (cio_file_native_is_mapped(cf)) {
+ return CIO_ERROR;
+ }
+
+ movement_distance.QuadPart = new_size;
+
+ result = SetFilePointerEx(cf->backing_file,
+ movement_distance,
+ NULL, FILE_BEGIN);
+
+ if (result == 0) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+
+ result = SetEndOfFile(cf->backing_file);
+
+ if (result == 0) {
+ cio_file_native_report_os_error();
+
+ return CIO_ERROR;
+ }
+
+ cf->fs_size = new_size;
+
+ return CIO_OK;
+}
diff --git a/fluent-bit/lib/chunkio/src/cio_log.c b/fluent-bit/lib/chunkio/src/cio_log.c
new file mode 100644
index 00000000..8c6e8123
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/cio_log.c
@@ -0,0 +1,87 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Chunk I/O
+ * =========
+ * Copyright 2018 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdarg.h>
+#include <string.h>
+
+#include <chunkio/chunkio_compat.h>
+#include <chunkio/chunkio.h>
+#include <chunkio/cio_log.h>
+
+void cio_log_print(void *ctx, int level, const char *file, int line,
+ const char *fmt, ...)
+{
+ int ret;
+ char buf[CIO_LOG_BUF_SIZE];
+ va_list args;
+ struct cio_ctx *cio = ctx;
+
+ if (!cio->options.log_cb) {
+ return;
+ }
+
+ if (level > cio->options.log_level) {
+ return;
+ }
+
+ va_start(args, fmt);
+ ret = vsnprintf(buf, CIO_LOG_BUF_SIZE - 1, fmt, args);
+
+ if (ret >= 0) {
+ buf[ret] = '\0';
+ }
+ va_end(args);
+
+ cio->options.log_cb(ctx, level, file, line, buf);
+}
+
+int cio_errno_print(int errnum, const char *file, int line)
+{
+ char buf[256];
+
+ strerror_r(errnum, buf, sizeof(buf) - 1);
+ fprintf(stderr, "[%s:%i errno=%i] %s\n",
+ file, line, errnum, buf);
+ return 0;
+}
+
+#ifdef _WIN32
+void cio_winapi_error_print(const char *func, int line)
+{
+ int error = GetLastError();
+ char buf[256];
+ int success;
+
+ success = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM |
+ FORMAT_MESSAGE_IGNORE_INSERTS,
+ NULL,
+ error,
+ LANG_SYSTEM_DEFAULT,
+ buf,
+ sizeof(buf),
+ NULL);
+ if (success) {
+ fprintf(stderr, "[%s() line=%i error=%i] %s\n", func, line, error, buf);
+ }
+ else {
+ fprintf(stderr, "[%s() line=%i error=%i] Win32 API failed\n", func, line, error);
+ }
+}
+#endif
diff --git a/fluent-bit/lib/chunkio/src/cio_memfs.c b/fluent-bit/lib/chunkio/src/cio_memfs.c
new file mode 100644
index 00000000..8cd0f6c1
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/cio_memfs.c
@@ -0,0 +1,156 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Chunk I/O
+ * =========
+ * Copyright 2018 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chunkio/chunkio.h>
+#include <chunkio/cio_utils.h>
+#include <chunkio/chunkio_compat.h>
+#include <chunkio/cio_memfs.h>
+#include <chunkio/cio_log.h>
+
+#include <stdio.h>
+#include <string.h>
+#include <limits.h>
+
+struct cio_memfs *cio_memfs_open(struct cio_ctx *ctx, struct cio_stream *st,
+ struct cio_chunk *ch, int flags,
+ size_t size)
+{
+ struct cio_memfs *mf;
+
+ (void) flags;
+ (void) ctx;
+ (void) ch;
+ (void) st;
+
+ mf = calloc(1, sizeof(struct cio_memfs));
+ if (!mf) {
+ cio_errno();
+ return NULL;
+ }
+ mf->crc_cur = cio_crc32_init();
+
+ mf->buf_data = malloc(size);
+ if (!mf->buf_data) {
+ cio_errno();
+ free(mf->name);
+ free(mf);
+ return NULL;
+ }
+ mf->buf_size = size;
+ mf->buf_len = 0;
+ if (ctx->realloc_size_hint > 0) {
+ mf->realloc_size = ctx->realloc_size_hint;
+ } else {
+ mf->realloc_size = cio_getpagesize() * 8;
+ }
+
+ return mf;
+}
+
+void cio_memfs_close(struct cio_chunk *ch)
+{
+ struct cio_memfs *mf = ch->backend;
+
+ if (!mf) {
+ return;
+ }
+
+ free(mf->name);
+ free(mf->buf_data);
+ free(mf->meta_data);
+ free(mf);
+}
+
+int cio_memfs_write(struct cio_chunk *ch, const void *buf, size_t count)
+{
+ size_t av_size;
+ size_t new_size;
+ char *tmp;
+ struct cio_memfs *mf = ch->backend;
+
+ if (count == 0) {
+ return 0;
+ }
+
+ /* Calculate available size */
+ av_size = (mf->buf_size - mf->buf_len);
+ if (av_size < count) {
+
+ /* Suggest initial new size */
+ new_size = mf->buf_size + mf->realloc_size;
+ while (new_size < (mf->buf_len + count)) {
+ new_size += mf->realloc_size;
+ }
+
+ tmp = realloc(mf->buf_data, new_size);
+ if (!tmp) {
+ cio_errno();
+ return -1;
+ }
+
+ mf->buf_data = tmp;
+ mf->buf_size = new_size;
+ }
+
+ memcpy(mf->buf_data + mf->buf_len, buf, count);
+ mf->buf_len += count;
+
+ return 0;
+}
+
+int cio_memfs_content_copy(struct cio_chunk *ch,
+ void **out_buf, size_t *out_size)
+{
+ char *buf;
+ struct cio_memfs *mf = ch->backend;
+
+ buf = malloc(mf->buf_len + 1);
+ if (!buf) {
+ cio_errno();
+ return -1;
+ }
+
+ /* Copy the data and append an extra NULL byte */
+ memcpy(buf, mf->buf_data, mf->buf_len);
+ buf[mf->buf_len] = '\0';
+
+ *out_buf = buf;
+ *out_size = mf->buf_len;
+
+ return 0;
+}
+
+void cio_memfs_scan_dump(struct cio_ctx *ctx, struct cio_stream *st)
+{
+ char tmp[PATH_MAX];
+ struct mk_list *head;
+ struct cio_memfs *mf;
+ struct cio_chunk *ch;
+
+ (void) ctx;
+
+ mk_list_foreach(head, &st->chunks) {
+ ch = mk_list_entry(head, struct cio_chunk, _head);
+ mf = ch->backend;
+
+ snprintf(tmp, sizeof(tmp) -1, "%s/%s", ch->st->name, ch->name);
+ printf(" %-60s", tmp);
+ printf("meta_len=%i, data_size=%zu\n", mf->meta_len, mf->buf_len);
+ }
+}
diff --git a/fluent-bit/lib/chunkio/src/cio_meta.c b/fluent-bit/lib/chunkio/src/cio_meta.c
new file mode 100644
index 00000000..03e852e0
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/cio_meta.c
@@ -0,0 +1,180 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Chunk I/O
+ * =========
+ * Copyright 2018 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+#include <string.h>
+
+#include <chunkio/chunkio_compat.h>
+#include <chunkio/chunkio.h>
+#include <chunkio/cio_file.h>
+#include <chunkio/cio_file_st.h>
+#include <chunkio/cio_memfs.h>
+#include <chunkio/cio_stream.h>
+#include <chunkio/cio_log.h>
+
+/*
+ * Metadata is an optional information stored before the content of each file
+ * and can be used for different purposes. Manipulating metadata can have
+ * some performance impacts depending on 'when' it's added and how often
+ * is modified.
+ *
+ * For performance reasons, we suggest the metadata be stored before to write
+ * any data to the content area, otherwise if metadata grows in terms of bytes
+ * we need to move all the content data to a different position which is not
+ * ideal.
+ *
+ * The caller might want to fix the performance penalties setting up some
+ * empty metadata with specific sizes.
+ */
+
+int cio_meta_write(struct cio_chunk *ch, char *buf, size_t size)
+{
+ struct cio_memfs *mf;
+
+ if (size > 65535) {
+ return -1;
+ }
+
+ if (ch->st->type == CIO_STORE_MEM) {
+ mf = (struct cio_memfs *) ch->backend;
+ if (mf->meta_data) {
+ free(mf->meta_data);
+ }
+
+ mf->meta_data = malloc(size);
+ if (!mf->meta_data) {
+ cio_errno();
+ return -1;
+ }
+ memcpy(mf->meta_data, buf, size);
+ mf->meta_len = size;
+ return 0;
+ }
+ else if (ch->st->type == CIO_STORE_FS) {
+ return cio_file_write_metadata(ch, buf, size);
+ }
+ return -1;
+}
+
+int cio_meta_size(struct cio_chunk *ch) {
+ if (ch->st->type == CIO_STORE_MEM) {
+ struct cio_memfs *mf = (struct cio_memfs *) ch->backend;
+ return mf->meta_len;
+ }
+ else if (ch->st->type == CIO_STORE_FS) {
+ if (cio_file_read_prepare(ch->ctx, ch)) {
+ return -1;
+ }
+ struct cio_file *cf = ch->backend;
+ return cio_file_st_get_meta_len(cf->map);
+ }
+
+ return -1;
+}
+
+int cio_meta_read(struct cio_chunk *ch, char **meta_buf, int *meta_len)
+{
+ int len;
+ char *meta;
+ struct cio_file *cf;
+ struct cio_memfs *mf;
+
+ /* In-memory type */
+ if (ch->st->type == CIO_STORE_MEM) {
+ mf = (struct cio_memfs *) ch->backend;
+
+ /* no metadata */
+ if (!mf->meta_data) {
+ return -1;
+ }
+
+ *meta_buf = mf->meta_data;
+ *meta_len = mf->meta_len;
+
+ return 0;
+ }
+ else if (ch->st->type == CIO_STORE_FS) {
+ if (cio_file_read_prepare(ch->ctx, ch)) {
+ return -1;
+ }
+
+ cf = ch->backend;
+ len = cio_file_st_get_meta_len(cf->map);
+ if (len <= 0) {
+ return -1;
+ }
+
+ meta = cio_file_st_get_meta(cf->map);
+ *meta_buf = meta;
+ *meta_len = len;
+
+ return 0;
+ }
+
+ return -1;
+
+}
+
+int cio_meta_cmp(struct cio_chunk *ch, char *meta_buf, int meta_len)
+{
+ int len;
+ char *meta;
+ struct cio_file *cf = ch->backend;
+ struct cio_memfs *mf;
+
+ /* In-memory type */
+ if (ch->st->type == CIO_STORE_MEM) {
+ mf = (struct cio_memfs *) ch->backend;
+
+ /* no metadata */
+ if (!mf->meta_data) {
+ return -1;
+ }
+
+ /* different lengths */
+ if (mf->meta_len != meta_len) {
+ return -1;
+ }
+
+ /* perfect match */
+ if (memcmp(mf->meta_data, meta_buf, meta_len) == 0) {
+ return 0;
+ }
+
+ return -1;
+ }
+
+ if (cio_file_read_prepare(ch->ctx, ch)) {
+ return -1;
+ }
+
+ /* File system type */
+ len = cio_file_st_get_meta_len(cf->map);
+ if (len != meta_len) {
+ return -1;
+ }
+
+ /* compare metadata */
+ meta = cio_file_st_get_meta(cf->map);
+ if (memcmp(meta, meta_buf, meta_len) == 0) {
+ return 0;
+ }
+
+ return -1;
+}
diff --git a/fluent-bit/lib/chunkio/src/cio_os.c b/fluent-bit/lib/chunkio/src/cio_os.c
new file mode 100644
index 00000000..0adbc617
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/cio_os.c
@@ -0,0 +1,134 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Chunk I/O
+ * =========
+ * Copyright 2018 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+
+#include <chunkio/chunkio_compat.h>
+
+#ifdef _WIN32
+#include <Shlobj.h>
+#endif
+
+/* Check if a path is a directory */
+int cio_os_isdir(const char *dir)
+{
+ int ret;
+ struct stat st;
+
+ ret = stat(dir, &st);
+ if (ret == -1) {
+ return -1;
+ }
+
+ if (st.st_mode & S_IFDIR) {
+ return 0;
+ }
+
+ return -1;
+}
+
+/* Create directory */
+int cio_os_mkpath(const char *dir, mode_t mode)
+{
+ struct stat st;
+
+#ifdef _WIN32
+ char path[MAX_PATH];
+#else
+# ifdef __APPLE__
+ char *parent_dir = NULL;
+ char *path = NULL;
+# endif
+ char *dup_dir;
+#endif
+
+ if (!dir) {
+ errno = EINVAL;
+ return 1;
+ }
+
+ if (strlen(dir) == 0) {
+ errno = EINVAL;
+ return 1;
+ }
+
+ if (!stat(dir, &st)) {
+ return 0;
+ }
+
+#ifdef _WIN32
+ (void) mode;
+
+ if (_fullpath(path, dir, MAX_PATH) == NULL) {
+ return 1;
+ }
+
+ if (SHCreateDirectoryExA(NULL, path, NULL) != ERROR_SUCCESS) {
+ return 1;
+ }
+ return 0;
+#elif __APPLE__
+ dup_dir = strdup(dir);
+ if (!dup_dir) {
+ return -1;
+ }
+
+ /* macOS's dirname(3) should return current directory when slash
+ * charachter is not included in passed string.
+ * And note that macOS's dirname(3) does not modify passed string.
+ */
+ parent_dir = dirname(dup_dir);
+ if (stat(parent_dir, &st) == 0 && strncmp(parent_dir, ".", 1)) {
+ if (S_ISDIR (st.st_mode)) {
+ mkdir(dup_dir, mode);
+ free(dup_dir);
+ return 0;
+ }
+ }
+
+ /* Create directories straightforward except for the last one hierarchy. */
+ for (path = strchr(dup_dir + 1, '/'); path; path = strchr(path + 1, '/')) {
+ *path = '\0';
+ if (mkdir(dup_dir, mode) == -1) {
+ if (errno != EEXIST) {
+ *path = '/';
+ return -1;
+ }
+ }
+ *path = '/';
+ }
+
+ free(dup_dir);
+ return mkdir(dir, mode);
+#else
+ dup_dir = strdup(dir);
+ if (!dup_dir) {
+ return 1;
+ }
+ cio_os_mkpath(dirname(dup_dir), mode);
+ free(dup_dir);
+ return mkdir(dir, mode);
+#endif
+}
diff --git a/fluent-bit/lib/chunkio/src/cio_scan.c b/fluent-bit/lib/chunkio/src/cio_scan.c
new file mode 100644
index 00000000..5c90641d
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/cio_scan.c
@@ -0,0 +1,190 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Chunk I/O
+ * =========
+ * Copyright 2018 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+
+#include <chunkio/chunkio_compat.h>
+#include <chunkio/chunkio.h>
+#include <chunkio/cio_stream.h>
+#include <chunkio/cio_file.h>
+#include <chunkio/cio_memfs.h>
+#include <chunkio/cio_chunk.h>
+#include <chunkio/cio_error.h>
+#include <chunkio/cio_log.h>
+
+#ifdef _WIN32
+#include "win32/dirent.h"
+#endif
+
+#ifdef CIO_HAVE_BACKEND_FILESYSTEM
+static int cio_scan_stream_files(struct cio_ctx *ctx, struct cio_stream *st,
+ char *chunk_extension)
+{
+ int len;
+ int ret;
+ int err;
+ int ext_off;
+ int ext_len = 0;
+ char *path;
+ DIR *dir;
+ struct dirent *ent;
+
+ len = strlen(ctx->options.root_path) + strlen(st->name) + 2;
+ path = malloc(len);
+ if (!path) {
+ cio_errno();
+ return -1;
+ }
+
+ ret = snprintf(path, len, "%s/%s", ctx->options.root_path, st->name);
+ if (ret == -1) {
+ cio_errno();
+ free(path);
+ return -1;
+ }
+
+ dir = opendir(path);
+ if (!dir) {
+ cio_errno();
+ free(path);
+ return -1;
+ }
+
+ if (chunk_extension) {
+ ext_len = strlen(chunk_extension);
+ }
+
+ cio_log_debug(ctx, "[cio scan] opening stream %s", st->name);
+
+ /* Iterate the root_path */
+ while ((ent = readdir(dir)) != NULL) {
+ if ((ent->d_name[0] == '.') || (strcmp(ent->d_name, "..") == 0)) {
+ continue;
+ }
+
+ /* Look just for directories */
+ if (ent->d_type != DT_REG) {
+ continue;
+ }
+
+ /* Check the file matches the desired extension (if set) */
+ if (chunk_extension) {
+ len = strlen(ent->d_name);
+ if (len <= ext_len) {
+ continue;
+ }
+
+ ext_off = len - ext_len;
+ if (strncmp(ent->d_name + ext_off, chunk_extension, ext_len) != 0) {
+ continue;
+ }
+ }
+
+ ctx->last_chunk_error = 0;
+
+ /* register every directory as a stream */
+ cio_chunk_open(ctx, st, ent->d_name, ctx->options.flags, 0, &err);
+
+ if (ctx->options.flags & CIO_DELETE_IRRECOVERABLE) {
+ if (err == CIO_CORRUPTED) {
+ if (ctx->last_chunk_error == CIO_ERR_BAD_FILE_SIZE ||
+ ctx->last_chunk_error == CIO_ERR_BAD_LAYOUT)
+ {
+ cio_log_error(ctx, "[cio scan] discarding irrecoverable chunk");
+
+ cio_chunk_delete(ctx, st, ent->d_name);
+ }
+ }
+ }
+ }
+
+ closedir(dir);
+ free(path);
+
+ return 0;
+}
+
+/* Given a cio context, scan it root_path and populate stream/files */
+int cio_scan_streams(struct cio_ctx *ctx, char *chunk_extension)
+{
+ DIR *dir;
+ struct dirent *ent;
+ struct cio_stream *st;
+
+ dir = opendir(ctx->options.root_path);
+ if (!dir) {
+ cio_errno();
+ return -1;
+ }
+
+ cio_log_debug(ctx, "[cio scan] opening path %s", ctx->options.root_path);
+
+ /* Iterate the root_path */
+ while ((ent = readdir(dir)) != NULL) {
+ if ((ent->d_name[0] == '.') || (strcmp(ent->d_name, "..") == 0)) {
+ continue;
+ }
+
+ /* Look just for directories */
+ if (ent->d_type != DT_DIR) {
+ continue;
+ }
+
+ /* register every directory as a stream */
+ st = cio_stream_create(ctx, ent->d_name, CIO_STORE_FS);
+ if (st) {
+ cio_scan_stream_files(ctx, st, chunk_extension);
+ }
+ }
+
+ closedir(dir);
+ return 0;
+}
+#else
+int cio_scan_streams(struct cio_ctx *ctx)
+{
+ cio_log_error(ctx, "[cio scan] file system backend not supported");
+ return -1;
+}
+#endif
+
+void cio_scan_dump(struct cio_ctx *ctx)
+{
+ struct mk_list *head;
+ struct cio_stream *st;
+
+ cio_log_info(ctx, "scan dump of %s", ctx->options.root_path);
+
+ /* Iterate streams */
+ mk_list_foreach(head, &ctx->streams) {
+ st = mk_list_entry(head, struct cio_stream, _head);
+ printf(" stream:%-60s%i chunks\n",
+ st->name, mk_list_size(&st->chunks));
+
+ if (st->type == CIO_STORE_MEM) {
+ cio_memfs_scan_dump(ctx, st);
+ }
+ else if (st->type == CIO_STORE_FS) {
+ cio_file_scan_dump(ctx, st);
+ }
+ }
+}
diff --git a/fluent-bit/lib/chunkio/src/cio_sha1.c b/fluent-bit/lib/chunkio/src/cio_sha1.c
new file mode 100644
index 00000000..1748b683
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/cio_sha1.c
@@ -0,0 +1,68 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Chunk I/O
+ * =========
+ * Copyright 2018 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* Just a simple wrapper over sha1 routines */
+
+#include <stdio.h>
+#include <string.h>
+#include <chunkio/cio_sha1.h>
+
+void cio_sha1_init(struct cio_sha1 *ctx)
+{
+ SHA1_Init(&ctx->sha);
+}
+
+void cio_sha1_update(struct cio_sha1 *ctx, const void *data, unsigned long len)
+{
+ SHA1_Update(&ctx->sha, data, len);
+}
+
+void cio_sha1_final(unsigned char hash[20], struct cio_sha1 *ctx)
+{
+ SHA1_Final(hash, &ctx->sha);
+}
+
+void cio_sha1_hash(const void *data_in, unsigned long length,
+ unsigned char *data_out, void *state)
+{
+ SHA_CTX sha;
+ SHA1_Init(&sha);
+ SHA1_Update(&sha, data_in, length);
+
+ /*
+ * If state is not NULL, make a copy of the SHA context for future
+ * iterations and updates.
+ */
+ if (state != NULL) {
+ memcpy(state, &sha, sizeof(SHA_CTX));
+ }
+
+ SHA1_Final(data_out, &sha);
+}
+
+void cio_sha1_to_hex(unsigned char *in, char *out)
+{
+ int i;
+
+ for (i = 0; i < 20; ++i) {
+ sprintf(&out[i*2], "%02x", in[i]);
+ }
+
+ out[40] = '\0';
+}
diff --git a/fluent-bit/lib/chunkio/src/cio_stats.c b/fluent-bit/lib/chunkio/src/cio_stats.c
new file mode 100644
index 00000000..2135f66f
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/cio_stats.c
@@ -0,0 +1,79 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Chunk I/O
+ * =========
+ * Copyright 2019 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <string.h>
+
+#include <chunkio/chunkio_compat.h>
+#include <chunkio/chunkio.h>
+#include <chunkio/cio_chunk.h>
+#include <chunkio/cio_stats.h>
+
+void cio_stats_get(struct cio_ctx *ctx, struct cio_stats *stats)
+{
+ struct mk_list *head;
+ struct mk_list *f_head;
+ struct cio_chunk *ch;
+ struct cio_stream *stream;
+
+ memset(stats, 0, sizeof(struct cio_stats));
+
+ /* Iterate each stream */
+ mk_list_foreach(head, &ctx->streams) {
+ stream = mk_list_entry(head, struct cio_stream, _head);
+ stats->streams_total++;
+
+ /* Iterate chunks */
+ mk_list_foreach(f_head, &stream->chunks) {
+ stats->chunks_total++;
+
+ if (stream->type == CIO_STORE_MEM) {
+ stats->chunks_mem++;
+ continue;
+ }
+
+ /* Only applicable for 'file' type chunks */
+ ch = mk_list_entry(f_head, struct cio_chunk, _head);
+ stats->chunks_fs++;
+
+ if (cio_chunk_is_up(ch) == CIO_TRUE) {
+ stats->chunks_fs_up++;
+ }
+ else {
+ stats->chunks_fs_down++;
+ }
+ }
+ }
+}
+
+void cio_stats_print_summary(struct cio_ctx *ctx)
+{
+ struct cio_stats st;
+
+ /* retrieve stats */
+ cio_stats_get(ctx, &st);
+
+ printf("======== Chunk I/O Stats ========\n");
+ printf("- streams total : %i\n", st.streams_total);
+ printf("- chunks total : %i\n", st.chunks_total);
+ printf("- chunks memfs total: %i\n", st.chunks_mem);
+ printf("- chunks file total : %i\n", st.chunks_fs);
+ printf(" - files up : %i\n", st.chunks_fs_up);
+ printf(" - files down : %i\n", st.chunks_fs_down);
+}
diff --git a/fluent-bit/lib/chunkio/src/cio_stream.c b/fluent-bit/lib/chunkio/src/cio_stream.c
new file mode 100644
index 00000000..547d7eeb
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/cio_stream.c
@@ -0,0 +1,276 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Chunk I/O
+ * =========
+ * Copyright 2018 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <chunkio/chunkio_compat.h>
+
+#include <chunkio/chunkio.h>
+#include <chunkio/cio_os.h>
+#include <chunkio/cio_log.h>
+#include <chunkio/cio_chunk.h>
+#include <chunkio/cio_stream.h>
+#include <chunkio/cio_utils.h>
+
+#include <monkey/mk_core/mk_list.h>
+
+static char *get_stream_path(struct cio_ctx *ctx, struct cio_stream *st)
+{
+ int ret;
+ int len;
+ char *p;
+
+ /* Compose final path */
+ len = strlen(ctx->options.root_path) + strlen(st->name) + 2;
+ p = malloc(len + 1);
+ if (!p) {
+ cio_errno();
+ return NULL;
+ }
+
+ ret = snprintf(p, len, "%s/%s", ctx->options.root_path, st->name);
+ if (ret == -1) {
+ cio_errno();
+ free(p);
+ return NULL;
+ }
+
+ return p;
+}
+
+static int check_stream_path(struct cio_ctx *ctx, const char *path)
+{
+ int ret;
+ int len;
+ char *p;
+
+ /* Compose final path */
+ len = strlen(ctx->options.root_path) + strlen(path) + 2;
+ p = malloc(len + 1);
+ if (!p) {
+ cio_errno();
+ return -1;
+ }
+ ret = snprintf(p, len, "%s/%s", ctx->options.root_path, path);
+ if (ret == -1) {
+ cio_errno();
+ free(p);
+ return -1;
+ }
+
+ ret = cio_os_isdir(p);
+ if (ret == -1) {
+ /* Try to create the path */
+ ret = cio_os_mkpath(p, 0755);
+ if (ret == -1) {
+ cio_log_error(ctx, "cannot create stream path %s", p);
+ free(p);
+ return -1;
+ }
+ cio_log_debug(ctx, "created stream path %s", p);
+ free(p);
+ return 0;
+ }
+
+ /* Check write access and release*/
+ ret = access(p, W_OK);
+ free(p);
+ return ret;
+}
+
+struct cio_stream *cio_stream_get(struct cio_ctx *ctx, const char *name)
+{
+ struct mk_list *head;
+ struct cio_stream *st;
+
+ mk_list_foreach(head, &ctx->streams) {
+ st = mk_list_entry(head, struct cio_stream, _head);
+ if (strcmp(st->name, name) == 0) {
+ return st;
+ }
+ }
+
+ return NULL;
+}
+
+struct cio_stream *cio_stream_create(struct cio_ctx *ctx, const char *name,
+ int type)
+{
+ int ret;
+ int len;
+ struct cio_stream *st;
+
+ if (!name) {
+ cio_log_error(ctx, "[stream create] stream name not set");
+ return NULL;
+ }
+
+ len = strlen(name);
+ if (len == 0) {
+ cio_log_error(ctx, "[stream create] invalid stream name");
+ return NULL;
+ }
+
+ if (len == 1 && (name[0] == '.' || name[0] == '/')) {
+ cio_log_error(ctx, "[stream create] invalid stream name");
+ return NULL;
+ }
+#ifndef CIO_HAVE_BACKEND_FILESYSTEM
+ if (type == CIO_STORE_FS) {
+ cio_log_error(ctx, "[stream create] file system backend not supported");
+ return NULL;
+ }
+#endif
+
+ /* Find duplicated */
+ st = cio_stream_get(ctx, name);
+ if (st) {
+ cio_log_error(ctx, "[cio stream] stream already registered: %s", name);
+ return NULL;
+ }
+
+ /* If backend is the file system, validate the stream path */
+ if (type == CIO_STORE_FS) {
+ ret = check_stream_path(ctx, name);
+ if (ret == -1) {
+ return NULL;
+ }
+ }
+
+ st = malloc(sizeof(struct cio_stream));
+ if (!st) {
+ cio_errno();
+ return NULL;
+ }
+ st->type = type;
+ st->name = strdup(name);
+ if (!st->name) {
+ cio_errno();
+ free(st);
+ return NULL;
+ }
+
+ st->parent = ctx;
+ mk_list_init(&st->chunks);
+ mk_list_init(&st->chunks_up);
+ mk_list_init(&st->chunks_down);
+ mk_list_add(&st->_head, &ctx->streams);
+
+ cio_log_debug(ctx, "[cio stream] new stream registered: %s", name);
+ return st;
+}
+
+void cio_stream_destroy(struct cio_stream *st)
+{
+ if (!st) {
+ return;
+ }
+ /* close all files */
+ cio_chunk_close_stream(st);
+
+ /* destroy stream */
+ mk_list_del(&st->_head);
+ free(st->name);
+ free(st);
+}
+
+/* Deletes a complete stream, this include all chunks available */
+int cio_stream_delete(struct cio_stream *st)
+{
+ int ret;
+ char *path;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct cio_chunk *ch;
+ struct cio_ctx *ctx;
+
+ ctx = st->parent;
+
+ /* delete all chunks */
+ mk_list_foreach_safe(head, tmp, &st->chunks) {
+ ch = mk_list_entry(head, struct cio_chunk, _head);
+ cio_chunk_close(ch, CIO_TRUE);
+ }
+
+#ifdef CIO_HAVE_BACKEND_FILESYSTEM
+ /* If the stream is filesystem based, destroy the real directory */
+ if (st->type == CIO_STORE_FS) {
+ path = get_stream_path(ctx, st);
+ if (!path) {
+ cio_log_error(ctx,
+ "content from stream '%s' has been deleted, but the "
+ "directory might still exists.", path);
+ return -1;
+ }
+
+ cio_log_debug(ctx, "[cio stream] delete stream path: %s", path);
+
+ /* Recursive deletion */
+ ret = cio_utils_recursive_delete(path);
+ if (ret == -1) {
+ cio_log_error(ctx, "error in recursive deletion of path %s", path);
+ free(path);
+ return -1;
+ }
+ free(path);
+
+ return ret;
+ }
+#endif
+
+ return 0;
+}
+
+void cio_stream_destroy_all(struct cio_ctx *ctx)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct cio_stream *st;
+
+ if (!ctx) {
+ return;
+ }
+
+ mk_list_foreach_safe(head, tmp, &ctx->streams) {
+ st = mk_list_entry(head, struct cio_stream, _head);
+ cio_stream_destroy(st);
+ }
+}
+
+/* Return the total number of bytes being used by Chunks up in memory */
+size_t cio_stream_size_chunks_up(struct cio_stream *st)
+{
+ ssize_t bytes;
+ size_t total = 0;
+ struct cio_chunk *ch;
+ struct mk_list *head;
+
+ mk_list_foreach(head, &st->chunks_up) {
+ ch = mk_list_entry(head, struct cio_chunk, _state_head);
+
+ bytes = cio_chunk_get_content_size(ch);
+ if (bytes <= 0) {
+ continue;
+ }
+ total += bytes;
+ }
+
+ return total;
+}
diff --git a/fluent-bit/lib/chunkio/src/cio_utils.c b/fluent-bit/lib/chunkio/src/cio_utils.c
new file mode 100644
index 00000000..45cb0ae8
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/cio_utils.c
@@ -0,0 +1,258 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Chunk I/O
+ * =========
+ * Copyright 2018 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <errno.h>
+#ifndef _MSC_VER
+#include <fts.h>
+#endif
+
+#include <chunkio/cio_info.h>
+#include <chunkio/chunkio_compat.h>
+#include <chunkio/chunkio.h>
+#include <chunkio/cio_log.h>
+
+#ifndef _MSC_VER
+/*
+ * Taken from StackOverflow:
+ *
+ * https://stackoverflow.com/questions/2256945/removing-a-non-empty-directory-programmatically-in-c-or-c
+ */
+int cio_utils_recursive_delete(const char *dir)
+{
+ int ret = 0;
+ FTS *ftsp = NULL;
+ FTSENT *curr;
+ char *files[] = { (char *) dir, NULL };
+ struct stat st;
+
+ ret = stat(dir, &st);
+ if (ret == -1) {
+ return -1;
+ }
+
+ ftsp = fts_open(files, FTS_NOCHDIR | FTS_PHYSICAL | FTS_XDEV, NULL);
+ if (!ftsp) {
+ fprintf(stderr, "%s: fts_open failed: %s\n", dir, strerror(errno));
+ ret = -1;
+ goto finish;
+ }
+
+ while ((curr = fts_read(ftsp))) {
+ switch (curr->fts_info) {
+ case FTS_NS:
+ case FTS_DNR:
+ case FTS_ERR:
+ fprintf(stderr, "%s: fts_read error: %s\n",
+ curr->fts_accpath, strerror(curr->fts_errno));
+ break;
+ case FTS_DC:
+ case FTS_DOT:
+ case FTS_NSOK:
+ break;
+ case FTS_D:
+ break;
+ case FTS_DP:
+ case FTS_F:
+ case FTS_SL:
+ case FTS_SLNONE:
+ case FTS_DEFAULT:
+ if (remove(curr->fts_accpath) < 0) {
+ fprintf(stderr, "%s: Failed to remove: %s\n",
+ curr->fts_path, strerror(errno));
+ ret = -1;
+ }
+ break;
+ }
+ }
+
+ finish:
+ if (ftsp) {
+ fts_close(ftsp);
+ }
+
+ return ret;
+}
+#else
+static int cio_utils_recursive_delete_handler(const char *path,
+ size_t current_depth,
+ size_t depth_limit)
+{
+ char search_path[MAX_PATH];
+ char entry_path[MAX_PATH];
+ DWORD target_file_flags;
+ HANDLE find_file_handle;
+ WIN32_FIND_DATAA find_file_data;
+ int error_detected;
+ DWORD result;
+
+ result = snprintf(search_path, sizeof(search_path) - 1, "%s\\*", path);
+
+ if (result <= 0) {
+ return CIO_ERROR;
+ }
+
+ find_file_handle = FindFirstFileA(search_path, &find_file_data);
+
+ if (find_file_handle == INVALID_HANDLE_VALUE) {
+ return CIO_ERROR;
+ }
+
+ target_file_flags = FILE_ATTRIBUTE_NORMAL | FILE_ATTRIBUTE_ARCHIVE;
+ error_detected = CIO_FALSE;
+ result = 0;
+
+ do {
+ if (strcmp(find_file_data.cFileName, ".") != 0 &&
+ strcmp(find_file_data.cFileName, "..") != 0) {
+
+ result = snprintf(entry_path, sizeof(entry_path) - 1, "%s\\%s", path,
+ find_file_data.cFileName);
+
+ if (result > 0) {
+ if (find_file_data.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) {
+ if (current_depth < depth_limit) {
+ result = (DWORD) cio_utils_recursive_delete_handler(entry_path,
+ current_depth + 1,
+ depth_limit);
+
+ if (result != CIO_OK) {
+ error_detected = CIO_TRUE;
+ }
+ }
+ else {
+ error_detected = CIO_TRUE;
+ }
+ }
+ else if (find_file_data.dwFileAttributes & target_file_flags) {
+ result = DeleteFileA(entry_path);
+
+ if (result == 0) {
+ error_detected = CIO_TRUE;
+ }
+ }
+
+ }
+ else {
+ error_detected = CIO_TRUE;
+ }
+ }
+
+ if (error_detected == CIO_FALSE) {
+ result = FindNextFile(find_file_handle, &find_file_data);
+
+ if (result == 0) {
+ result = GetLastError();
+
+ if (result != ERROR_NO_MORE_FILES) {
+ error_detected = CIO_TRUE;
+ }
+
+ break;
+ }
+ }
+ }
+ while (error_detected == CIO_FALSE);
+
+ FindClose(find_file_handle);
+
+ if (error_detected) {
+ return CIO_ERROR;
+ }
+
+ result = RemoveDirectoryA(path);
+
+ if (result == 0) {
+ return CIO_ERROR;
+ }
+
+ return CIO_OK;
+}
+
+int cio_utils_recursive_delete(const char *dir)
+{
+ DWORD result;
+
+ result = cio_utils_recursive_delete_handler(dir, 0, 100);
+
+ if (result != CIO_OK) {
+ return -1;
+ }
+
+ return 0;
+}
+#endif
+
+int cio_utils_read_file(const char *path, char **buf, size_t *size)
+{
+ int ret;
+ char *data;
+ FILE *fp;
+ struct stat st;
+
+ fp = fopen(path, "rb");
+ if (fp == NULL) {
+ perror("fopen");
+ return -1;
+ }
+
+ ret = fstat(fileno(fp), &st);
+ if (ret == -1) {
+ fclose(fp);
+ perror("fstat");
+ return -1;
+ }
+ if (!S_ISREG(st.st_mode)) {
+ fclose(fp);
+ return -1;
+ }
+
+ data = calloc(st.st_size, 1);
+ if (!data) {
+ perror("calloc");
+ fclose(fp);
+ return -1;
+ }
+
+ ret = fread(data, st.st_size, 1, fp);
+ if (ret != 1) {
+ free(data);
+ fclose(fp);
+ return -1;
+ }
+ fclose(fp);
+
+ *buf = data;
+ *size = st.st_size;
+
+ return 0;
+}
+
+#ifdef CIO_HAVE_GETPAGESIZE
+int cio_getpagesize()
+{
+ return getpagesize();
+}
+#endif
diff --git a/fluent-bit/lib/chunkio/src/win32/dirent.c b/fluent-bit/lib/chunkio/src/win32/dirent.c
new file mode 100644
index 00000000..6ea57f96
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/win32/dirent.c
@@ -0,0 +1,135 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Chunk I/O
+ * =========
+ * Copyright 2018-2019 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This module implements <dirent.h> emulation layer based on
+ * Win32's FIndFirstFile/FindNextFile API.
+ */
+
+#include <Windows.h>
+#include <shlwapi.h>
+
+#include "dirent.h"
+
+struct CIO_WIN32_DIR {
+ HANDLE h;
+ char *pattern;
+ int count;
+ WIN32_FIND_DATA find_data;
+ struct cio_win32_dirent dir;
+};
+
+/*
+ * Guess POSIX flle type from Win32 file attributes.
+ */
+static unsigned char get_filetype(int dwFileAttributes)
+{
+ if (dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) {
+ return DT_DIR;
+ }
+ else if (dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) {
+ return DT_LNK;
+ }
+
+ return DT_REG;
+}
+
+/*
+ * Construct a match pattern (e.g. 'c:\var\data\*')
+ */
+static char *create_pattern(const char *path)
+{
+ char *buf;
+ size_t len = strlen(path);
+ size_t buflen = len + 3;
+
+ buf = malloc(buflen);
+ if (buf == NULL) {
+ return NULL;
+ }
+
+ strcpy_s(buf, buflen, path);
+
+ if (path[len - 1] == '\\') {
+ strcat_s(buf, buflen, "*");
+ }
+ else {
+ strcat_s(buf, buflen, "\\*");
+ }
+ return buf;
+}
+
+struct CIO_WIN32_DIR *cio_win32_opendir(const char *path)
+{
+ struct CIO_WIN32_DIR *d;
+
+ if (!PathIsDirectoryA(path)) {
+ return NULL;
+ }
+
+ d = calloc(1, sizeof(struct CIO_WIN32_DIR));
+ if (d == NULL) {
+ return NULL;
+ }
+
+ d->pattern = create_pattern(path);
+ if (d->pattern == NULL) {
+ free(d);
+ return NULL;
+ }
+
+ d->h = FindFirstFileA(d->pattern, &d->find_data);
+ if (d->h == INVALID_HANDLE_VALUE) {
+ return d;
+ }
+ return d;
+}
+
+struct cio_win32_dirent *cio_win32_readdir(struct CIO_WIN32_DIR *d)
+{
+ if (d->h == INVALID_HANDLE_VALUE) {
+ return NULL;
+ }
+
+ /*
+ * The initial entry should be retrieved by FindFirstFile(),
+ * so we can skip FindNextFile() on the first call.
+ */
+ if (d->count > 0) {
+ if (FindNextFile(d->h, &d->find_data) == 0) {
+ return NULL;
+ }
+ }
+
+ d->count++;
+ d->dir.d_name = d->find_data.cFileName;
+ d->dir.d_type = get_filetype(d->find_data.dwFileAttributes);
+
+ return &d->dir;
+}
+
+int cio_win32_closedir(struct CIO_WIN32_DIR *d)
+{
+ if (d->h != INVALID_HANDLE_VALUE) {
+ FindClose(d->h);
+ }
+ free(d->pattern);
+ free(d);
+ return 0;
+}
diff --git a/fluent-bit/lib/chunkio/src/win32/dirent.h b/fluent-bit/lib/chunkio/src/win32/dirent.h
new file mode 100644
index 00000000..b21148a6
--- /dev/null
+++ b/fluent-bit/lib/chunkio/src/win32/dirent.h
@@ -0,0 +1,59 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Chunk I/O
+ * =========
+ * Copyright 2018-2019 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * POSIX <dirent.h> emulation for Windows.
+ *
+ * This header file provies a drop-in replacement of opendir(),
+ * readdir() and closedir() for Windows platform.
+ */
+
+#ifndef CIO_WIN32_DIRENT
+#define CIO_WIN32_DIRENT
+
+struct CIO_WIN32_DIR;
+
+struct cio_win32_dirent {
+ int d_ino;
+ int d_off;
+ unsigned short d_reclen;
+ unsigned char d_type;
+ char *d_name;
+};
+
+struct CIO_WIN32_DIR *cio_win32_opendir(const char *path);
+struct cio_win32_dirent *cio_win32_readdir(struct CIO_WIN32_DIR *d);
+int cio_win32_closedir(struct CIO_WIN32_DIR *d);
+
+#define DIR struct CIO_WIN32_DIR
+#define dirent cio_win32_dirent
+#define closedir cio_win32_closedir
+#define opendir cio_win32_opendir
+#define readdir cio_win32_readdir
+
+#define DT_UNKNOWN -1
+#define DT_BLK 1
+#define DT_CHR 2
+#define DT_DIR 3
+#define DT_FIFO 4
+#define DT_LNK 5
+#define DT_REG 6
+#define DT_SOCK 7
+
+#endif