summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_task.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/flb_task.c')
-rw-r--r--fluent-bit/src/flb_task.c520
1 files changed, 0 insertions, 520 deletions
diff --git a/fluent-bit/src/flb_task.c b/fluent-bit/src/flb_task.c
deleted file mode 100644
index bc9ddc634..000000000
--- a/fluent-bit/src/flb_task.c
+++ /dev/null
@@ -1,520 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <stdio.h>
-#include <stdlib.h>
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_config.h>
-#include <fluent-bit/flb_input.h>
-#include <fluent-bit/flb_input_chunk.h>
-#include <fluent-bit/flb_output.h>
-#include <fluent-bit/flb_router.h>
-#include <fluent-bit/flb_task.h>
-#include <fluent-bit/flb_mem.h>
-#include <fluent-bit/flb_str.h>
-#include <fluent-bit/flb_scheduler.h>
-
-/*
- * Every task created must have an unique ID, this function lookup the
- * lowest number available in the tasks_map.
- *
- * This 'id' is used by the task interface to communicate with the engine event
- * loop about some action.
- */
-
-static inline int map_get_task_id(struct flb_config *config)
-{
- int i;
- int map_size = (sizeof(config->tasks_map) / sizeof(struct flb_task_map));
-
- for (i = 0; i < map_size; i++) {
- if (config->tasks_map[i].task == NULL) {
- return i;
- }
- }
-
- return -1;
-}
-
-static inline void map_set_task_id(int id, struct flb_task *task,
- struct flb_config *config)
-{
- config->tasks_map[id].task = task;
-
-}
-
-static inline void map_free_task_id(int id, struct flb_config *config)
-{
- config->tasks_map[id].task = NULL;
-}
-
-void flb_task_retry_destroy(struct flb_task_retry *retry)
-{
- int ret;
-
- /* Make sure to invalidate any request from the scheduler */
- ret = flb_sched_request_invalidate(retry->parent->config, retry);
- if (ret == 0) {
- flb_debug("[retry] task retry=%p, invalidated from the scheduler",
- retry);
- }
-
- mk_list_del(&retry->_head);
- flb_free(retry);
-}
-
-/*
- * For an existing task 'retry', re-schedule it. One of the use case of this function
- * is when the engine dispatcher fails to bring the chunk up due to Chunk I/O
- * configuration restrictions, the task needs to be re-scheduled.
- */
-int flb_task_retry_reschedule(struct flb_task_retry *retry, struct flb_config *config)
-{
- int seconds;
- struct flb_task *task;
-
- task = retry->parent;
- seconds = flb_sched_request_create(config, retry, retry->attempts);
- if (seconds == -1) {
- /*
- * This is the worse case scenario: 'cannot re-schedule a retry'. If the Chunk
- * resides only in memory, it will be lost. */
- flb_warn("[task] retry for task %i could not be re-scheduled", task->id);
- flb_task_retry_destroy(retry);
- if (task->users == 0 && mk_list_size(&task->retries) == 0) {
- flb_task_destroy(task, FLB_TRUE);
- }
- return -1;
- }
- else {
- flb_info("[task] re-schedule retry=%p %i in the next %i seconds",
- retry, task->id, seconds);
- }
-
- return 0;
-}
-
-struct flb_task_retry *flb_task_retry_create(struct flb_task *task,
- struct flb_output_instance *ins)
-{
- struct mk_list *head;
- struct mk_list *tmp;
- struct flb_task_retry *retry = NULL;
-
- /* First discover if is there any previous retry context in the task */
- mk_list_foreach_safe(head, tmp, &task->retries) {
- retry = mk_list_entry(head, struct flb_task_retry, _head);
- if (retry->o_ins == ins) {
- if (retry->attempts >= ins->retry_limit && ins->retry_limit >= 0) {
- flb_debug("[task] task_id=%i reached retry-attempts limit %i/%i",
- task->id, retry->attempts, ins->retry_limit);
- flb_task_retry_destroy(retry);
- return NULL;
- }
- break;
- }
- retry = NULL;
- }
-
- if (!retry) {
- /* Create a new re-try instance */
- retry = flb_malloc(sizeof(struct flb_task_retry));
- if (!retry) {
- flb_errno();
- return NULL;
- }
-
- retry->attempts = 1;
- retry->o_ins = ins;
- retry->parent = task;
- mk_list_add(&retry->_head, &task->retries);
-
- flb_debug("[retry] new retry created for task_id=%i attempts=%i",
- task->id, retry->attempts);
- }
- else {
- retry->attempts++;
- flb_debug("[retry] re-using retry for task_id=%i attempts=%i",
- task->id, retry->attempts);
- }
-
- /*
- * This 'retry' was issued by an output plugin, from an Engine perspective
- * we need to determinate if the source input plugin have some memory
- * restrictions and if the Storage type is 'filesystem' we need to put
- * the file content down.
- */
- flb_input_chunk_set_up_down(task->ic);
-
- /*
- * Besides limits adjusted above, a retry that's going to only one place
- * must be down.
- */
- if (mk_list_size(&task->routes) == 1) {
- flb_input_chunk_down(task->ic);
- }
-
- return retry;
-}
-
-/*
- * Return FLB_TRUE or FLB_FALSE if the chunk pointed by the task was
- * created on this running instance or it comes from a chunk in the
- * filesystem from a previous run.
- */
-int flb_task_from_fs_storage(struct flb_task *task)
-{
- struct flb_input_chunk *ic;
-
- ic = (struct flb_input_chunk *) task->ic;
- return ic->fs_backlog;
-}
-
-int flb_task_retry_count(struct flb_task *task, void *data)
-{
- struct mk_list *head;
- struct flb_task_retry *retry;
- struct flb_output_instance *o_ins;
-
- o_ins = (struct flb_output_instance *) data;
-
- mk_list_foreach(head, &task->retries) {
- retry = mk_list_entry(head, struct flb_task_retry, _head);
-
- if (retry->o_ins == o_ins) {
- return retry->attempts;
- }
- }
-
- return -1;
-}
-
-/* Check if a 'retry' context exists for a specific task, if so, cleanup */
-int flb_task_retry_clean(struct flb_task *task, struct flb_output_instance *ins)
-{
- struct mk_list *tmp;
- struct mk_list *head;
- struct flb_task_retry *retry;
-
- /* Delete 'retries' only associated with the output instance */
- mk_list_foreach_safe(head, tmp, &task->retries) {
- retry = mk_list_entry(head, struct flb_task_retry, _head);
- if (retry->o_ins == ins) {
- flb_task_retry_destroy(retry);
- return 0;
- }
- }
-
- return -1;
-}
-
-/* Allocate an initialize a basic Task structure */
-static struct flb_task *task_alloc(struct flb_config *config)
-{
- int task_id;
- struct flb_task *task;
-
- /* Allocate the new task */
- task = (struct flb_task *) flb_calloc(1, sizeof(struct flb_task));
- if (!task) {
- flb_errno();
- return NULL;
- }
-
- /* Get ID and set back 'task' reference */
- task_id = map_get_task_id(config);
- if (task_id == -1) {
- flb_free(task);
- return NULL;
- }
- map_set_task_id(task_id, task, config);
-
- flb_trace("[task %p] created (id=%i)", task, task_id);
-
- /* Initialize minimum variables */
- task->id = task_id;
- task->config = config;
- task->status = FLB_TASK_NEW;
- task->users = 0;
- mk_list_init(&task->routes);
- mk_list_init(&task->retries);
-
- pthread_mutex_init(&task->lock, NULL);
-
- return task;
-}
-
-/* Return the number of tasks with 'running status' or tasks with retries */
-int flb_task_running_count(struct flb_config *config)
-{
- int count = 0;
- struct mk_list *head;
- struct mk_list *t_head;
- struct flb_task *task;
- struct flb_input_instance *ins;
-
- mk_list_foreach(head, &config->inputs) {
- ins = mk_list_entry(head, struct flb_input_instance, _head);
- mk_list_foreach(t_head, &ins->tasks) {
- task = mk_list_entry(t_head, struct flb_task, _head);
- if (task->users > 0 || mk_list_size(&task->retries) > 0) {
- count++;
- }
- }
- }
-
- return count;
-}
-
-int flb_task_running_print(struct flb_config *config)
-{
- int count = 0;
- flb_sds_t tmp;
- flb_sds_t routes;
- struct mk_list *head;
- struct mk_list *t_head;
- struct mk_list *r_head;
- struct flb_task *task;
- struct flb_task_route *route;
- struct flb_input_instance *ins;
-
- routes = flb_sds_create_size(256);
- if (!routes) {
- flb_error("[task] cannot allocate space to report pending tasks");
- return -1;
- }
-
- mk_list_foreach(head, &config->inputs) {
- ins = mk_list_entry(head, struct flb_input_instance, _head);
- count = mk_list_size(&ins->tasks);
- flb_info("[task] %s/%s has %i pending task(s):",
- ins->p->name, flb_input_name(ins), count);
- mk_list_foreach(t_head, &ins->tasks) {
- task = mk_list_entry(t_head, struct flb_task, _head);
-
- mk_list_foreach(r_head, &task->routes) {
- route = mk_list_entry(r_head, struct flb_task_route, _head);
- tmp = flb_sds_printf(&routes, "%s/%s ",
- route->out->p->name,
- flb_output_name(route->out));
- if (!tmp) {
- flb_sds_destroy(routes);
- flb_error("[task] cannot print report for pending tasks");
- return -1;
- }
- routes = tmp;
- }
-
- flb_info("[task] task_id=%i still running on route(s): %s",
- task->id, routes);
- flb_sds_len_set(routes, 0);
- }
- }
- flb_sds_destroy(routes);
- return 0;
-}
-
-/* Create an engine task to handle the output plugin flushing work */
-struct flb_task *flb_task_create(uint64_t ref_id,
- const char *buf,
- size_t size,
- struct flb_input_instance *i_ins,
- struct flb_input_chunk *ic,
- const char *tag_buf, int tag_len,
- struct flb_config *config,
- int *err)
-{
- int count = 0;
- int total_events = 0;
- struct flb_task *task;
- struct flb_event_chunk *evc;
- struct flb_task_route *route;
- struct flb_router_path *route_path;
- struct flb_output_instance *o_ins;
- struct flb_input_chunk *task_ic;
- struct mk_list *i_head;
- struct mk_list *o_head;
-
- /* No error status */
- *err = FLB_FALSE;
-
- /* allocate task */
- task = task_alloc(config);
- if (!task) {
- *err = FLB_TRUE;
- return NULL;
- }
-
-#ifdef FLB_HAVE_METRICS
- total_events = ((struct flb_input_chunk *) ic)->total_records;
-#endif
-
- /* event chunk */
- evc = flb_event_chunk_create(ic->event_type,
- total_events,
- (char *) tag_buf, tag_len,
- (char *) buf, size);
- if (!evc) {
- flb_free(task);
- *err = FLB_TRUE;
- return NULL;
- }
- task->event_chunk = evc;
- task_ic = (struct flb_input_chunk *) ic;
- task_ic->task = task;
-
- /* Keep track of origins */
- task->ref_id = ref_id;
- task->i_ins = i_ins;
- task->ic = ic;
- mk_list_add(&task->_head, &i_ins->tasks);
-
-#ifdef FLB_HAVE_METRICS
- task->records = ((struct flb_input_chunk *) ic)->total_records;
-#endif
-
- /* Direct connects betweek input <> outputs (API based) */
- if (mk_list_size(&i_ins->routes_direct) > 0) {
- mk_list_foreach(i_head, &i_ins->routes_direct) {
- route_path = mk_list_entry(i_head, struct flb_router_path, _head);
- o_ins = route_path->ins;
-
- route = flb_malloc(sizeof(struct flb_task_route));
- if (!route) {
- flb_errno();
- task->event_chunk->data = NULL;
- flb_task_destroy(task, FLB_TRUE);
- return NULL;
- }
-
- route->out = o_ins;
- mk_list_add(&route->_head, &task->routes);
- }
- flb_debug("[task] created direct task=%p id=%i OK", task, task->id);
- return task;
- }
-
- /* Find matching routes for the incoming task */
- mk_list_foreach(o_head, &config->outputs) {
- o_ins = mk_list_entry(o_head,
- struct flb_output_instance, _head);
-
- /* skip output plugins that don't handle proper event types */
- if (!flb_router_match_type(ic->event_type, o_ins)) {
- continue;
- }
-
- if (flb_routes_mask_get_bit(task_ic->routes_mask, o_ins->id) != 0) {
- route = flb_calloc(1, sizeof(struct flb_task_route));
- if (!route) {
- flb_errno();
- continue;
- }
-
- route->status = FLB_TASK_ROUTE_INACTIVE;
- route->out = o_ins;
- mk_list_add(&route->_head, &task->routes);
- count++;
- }
- }
-
- /* no destinations ?, useless task. */
- if (count == 0) {
- flb_debug("[task] created task=%p id=%i without routes, dropping.",
- task, task->id);
- task->event_chunk->data = NULL;
- flb_task_destroy(task, FLB_TRUE);
- return NULL;
- }
-
- flb_debug("[task] created task=%p id=%i OK", task, task->id);
- return task;
-}
-
-void flb_task_destroy(struct flb_task *task, int del)
-{
- struct mk_list *tmp;
- struct mk_list *head;
- struct flb_task_route *route;
- struct flb_task_retry *retry;
-
- flb_debug("[task] destroy task=%p (task_id=%i)", task, task->id);
-
- /* Release task_id */
- map_free_task_id(task->id, task->config);
-
- /* Remove routes */
- mk_list_foreach_safe(head, tmp, &task->routes) {
- route = mk_list_entry(head, struct flb_task_route, _head);
- mk_list_del(&route->_head);
- flb_free(route);
- }
-
- /* Unlink and release task */
- mk_list_del(&task->_head);
-
- /* destroy chunk */
- flb_input_chunk_destroy(task->ic, del);
-
- /* Remove 'retries' */
- mk_list_foreach_safe(head, tmp, &task->retries) {
- retry = mk_list_entry(head, struct flb_task_retry, _head);
- flb_task_retry_destroy(retry);
- }
-
- flb_input_chunk_set_limits(task->i_ins);
-
- if (task->event_chunk) {
- flb_event_chunk_destroy(task->event_chunk);
- }
- flb_free(task);
-}
-
-struct flb_task_queue* flb_task_queue_create() {
- struct flb_task_queue *tq;
- tq = flb_malloc(sizeof(struct flb_task_queue));
- if (!tq) {
- flb_errno();
- return NULL;
- }
- mk_list_init(&tq->pending);
- mk_list_init(&tq->in_progress);
- return tq;
-}
-
-void flb_task_queue_destroy(struct flb_task_queue *queue) {
- struct flb_task_enqueued *queued_task;
- struct mk_list *tmp;
- struct mk_list *head;
-
- mk_list_foreach_safe(head, tmp, &queue->pending) {
- queued_task = mk_list_entry(head, struct flb_task_enqueued, _head);
- mk_list_del(&queued_task->_head);
- flb_free(queued_task);
- }
-
- mk_list_foreach_safe(head, tmp, &queue->in_progress) {
- queued_task = mk_list_entry(head, struct flb_task_enqueued, _head);
- mk_list_del(&queued_task->_head);
- flb_free(queued_task);
- }
-
- flb_free(queue);
-}