summaryrefslogtreecommitdiffstats
path: root/fluent-bit/include/fluent-bit/flb_task.h
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/include/fluent-bit/flb_task.h')
-rw-r--r--fluent-bit/include/fluent-bit/flb_task.h279
1 files changed, 279 insertions, 0 deletions
diff --git a/fluent-bit/include/fluent-bit/flb_task.h b/fluent-bit/include/fluent-bit/flb_task.h
new file mode 100644
index 00000000..1e8de515
--- /dev/null
+++ b/fluent-bit/include/fluent-bit/flb_task.h
@@ -0,0 +1,279 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_TASK_H
+#define FLB_TASK_H
+
+#include <monkey/mk_core.h>
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input.h>
+
+/* Task status */
+#define FLB_TASK_NEW 0
+#define FLB_TASK_RUNNING 1
+
+/*
+ * Macro helpers to determinate return value, task_id and coro_id. When an
+ * output plugin returns, it must call FLB_OUTPUT_RETURN(val) where val is
+ * the return value, as of now defined as FLB_OK, FLB_ERROR or FLB_RETRY.
+ *
+ * The FLB_OUTPUT_RETURN macro lookup the current active 'engine coroutine' and
+ * it 'engine task' associated, so it emits an event to the main event loop
+ * indicating an output coroutine has finished. In order to specify return
+ * values and the proper IDs an unsigned 32 bits number is used:
+ *
+ * AAAA BBBBBBBBBBBBBB CCCCCCCCCCCCCC > 32 bit number
+ * ^ ^ ^
+ * 4 bits 14 bits 14 bits
+ * return val task_id output_id
+ */
+
+#define FLB_TASK_RET(val) (val >> 28)
+#define FLB_TASK_ID(val) (uint32_t) (val & 0xfffc000) >> 14
+#define FLB_TASK_OUT(val) (val & 0x3fff)
+#define FLB_TASK_SET(ret, task_id, out_id) \
+ (uint32_t) ((ret << 28) | (task_id << 14) | out_id)
+
+/* Route status */
+#define FLB_TASK_ROUTE_INACTIVE 0
+#define FLB_TASK_ROUTE_ACTIVE 1
+#define FLB_TASK_ROUTE_DROPPED 2
+
+struct flb_task_route {
+ int status;
+ struct flb_output_instance *out;
+ struct mk_list _head;
+};
+
+/*
+ * When a Task failed in an output instance plugin and this last one
+ * requested a FLB_RETRY, a flb_engine_task_retry entry is created and
+ * linked into the parent flb_engine_task->retries lists.
+ *
+ * This reference is used later by the scheduler to re-dispatch the
+ * task data to the desired output path.
+ */
+struct flb_task_retry {
+ int attempts; /* number of attempts, default 1 */
+ struct flb_output_instance *o_ins; /* route that we are retrying */
+ struct flb_task *parent; /* parent task reference */
+ struct mk_list _head; /* link to parent task list */
+};
+
+/* A task takes a buffer and sync input and output instances to handle it */
+struct flb_task {
+ int id; /* task id */
+ uint64_t ref_id; /* external reference id */
+ uint8_t status; /* new task or running ? */
+ int users; /* number of users (threads) */
+ struct flb_event_chunk *event_chunk; /* event chunk context */
+ void *ic; /* input chunk context */
+#ifdef FLB_HAVE_METRICS
+ int records; /* numbers of records in 'buf' */
+#endif
+ struct mk_list routes; /* routes to dispatch data */
+ struct mk_list retries; /* queued in-memory retries */
+ struct mk_list _head; /* link to input_instance */
+ struct flb_input_instance *i_ins; /* input instance */
+ struct flb_config *config; /* parent flb config */
+ pthread_mutex_t lock;
+};
+
+/*
+ * A queue of flb_task_enqueued tasks
+ *
+ * This structure is currently used to track pending flushes when FLB_OUTPUT_SYNCHRONOUS
+ * is used.
+ */
+struct flb_task_queue {
+ struct mk_list pending;
+ struct mk_list in_progress;
+};
+
+/*
+ * An enqueued task is a task that is not yet dispatched to a thread
+ * or started on the engine.
+ *
+ * There may be multiple enqueued instances of the same task on different out instances.
+ *
+ * This structure is currently used to track pending flushes when FLB_OUTPUT_SYNCHRONOUS
+ * is used.
+ */
+struct flb_task_enqueued {
+ struct flb_task *task;
+ struct flb_task_retry *retry;
+ struct flb_output_instance *out_instance;
+ struct flb_config *config;
+ struct mk_list _head;
+};
+
+int flb_task_running_count(struct flb_config *config);
+int flb_task_running_print(struct flb_config *config);
+
+struct flb_task *flb_task_create(uint64_t ref_id,
+ const char *buf,
+ size_t size,
+ struct flb_input_instance *i_ins,
+ struct flb_input_chunk *ic,
+ const char *tag_buf, int tag_len,
+ struct flb_config *config,
+ int *err);
+
+void flb_task_add_coro(struct flb_task *task, struct flb_coro *coro);
+
+void flb_task_destroy(struct flb_task *task, int del);
+
+struct flb_task_queue* flb_task_queue_create();
+void flb_task_queue_destroy(struct flb_task_queue *queue);
+struct flb_task_retry *flb_task_retry_create(struct flb_task *task,
+ struct flb_output_instance *ins);
+
+void flb_task_retry_destroy(struct flb_task_retry *retry);
+int flb_task_retry_reschedule(struct flb_task_retry *retry, struct flb_config *config);
+int flb_task_from_fs_storage(struct flb_task *task);
+int flb_task_retry_count(struct flb_task *task, void *data);
+int flb_task_retry_clean(struct flb_task *task, struct flb_output_instance *ins);
+
+
+struct flb_task *flb_task_chunk_create(uint64_t ref_id,
+ const char *buf,
+ size_t size,
+ struct flb_input_instance *i_ins,
+ void *ic,
+ const char *tag_buf, int tag_len,
+ struct flb_config *config);
+
+static inline void flb_task_users_release(struct flb_task *task)
+{
+ if (task->users == 0 && mk_list_size(&task->retries) == 0) {
+ flb_task_destroy(task, FLB_TRUE);
+ }
+}
+
+/* Increase the counter for users */
+static inline void flb_task_users_inc(struct flb_task *task)
+{
+ task->users++;
+}
+
+/*
+ * Decrement the users counter from the task, and if release_check is enabled,
+ * it will check if the task can be destroyed.
+ */
+static inline void flb_task_users_dec(struct flb_task *task, int release_check)
+{
+ task->users--;
+ if (release_check == FLB_TRUE) {
+ flb_task_users_release(task);
+ }
+}
+
+static inline void flb_task_acquire_lock(struct flb_task *task)
+{
+ pthread_mutex_lock(&task->lock);
+}
+
+static inline void flb_task_release_lock(struct flb_task *task)
+{
+ pthread_mutex_unlock(&task->lock);
+}
+
+static FLB_INLINE int flb_task_get_active_route_count(
+ struct flb_task *task)
+{
+ struct mk_list *iterator;
+ int result;
+ struct flb_task_route *route;
+
+ result = 0;
+
+ mk_list_foreach(iterator, &task->routes) {
+ route = mk_list_entry(iterator, struct flb_task_route, _head);
+
+ if (route->status == FLB_TASK_ROUTE_ACTIVE) {
+ result++;
+ }
+ }
+
+ return result;
+}
+
+static FLB_INLINE size_t flb_task_get_route_status(
+ struct flb_task *task,
+ struct flb_output_instance *o_ins)
+{
+ struct mk_list *iterator;
+ size_t result;
+ struct flb_task_route *route;
+
+ result = FLB_TASK_ROUTE_INACTIVE;
+
+ mk_list_foreach(iterator, &task->routes) {
+ route = mk_list_entry(iterator, struct flb_task_route, _head);
+
+ if (route->out == o_ins) {
+ result = route->status;
+ break;
+ }
+ }
+
+ return result;
+}
+
+static FLB_INLINE void flb_task_set_route_status(
+ struct flb_task *task,
+ struct flb_output_instance *o_ins,
+ int new_status)
+{
+ struct mk_list *iterator;
+ struct flb_task_route *route;
+
+ mk_list_foreach(iterator, &task->routes) {
+ route = mk_list_entry(iterator, struct flb_task_route, _head);
+
+ if (route->out == o_ins) {
+ route->status = new_status;
+ break;
+ }
+ }
+}
+
+
+static FLB_INLINE void flb_task_activate_route(
+ struct flb_task *task,
+ struct flb_output_instance *o_ins)
+{
+ flb_task_set_route_status(task, o_ins, FLB_TASK_ROUTE_ACTIVE);
+}
+
+static FLB_INLINE void flb_task_deactivate_route(
+ struct flb_task *task,
+ struct flb_output_instance *o_ins)
+{
+ flb_task_set_route_status(task, o_ins, FLB_TASK_ROUTE_INACTIVE);
+}
+
+static FLB_INLINE void flb_task_drop_route(
+ struct flb_task *task,
+ struct flb_output_instance *o_ins)
+{
+ flb_task_set_route_status(task, o_ins, FLB_TASK_ROUTE_DROPPED);
+}
+
+#endif