Diffstat:
 -rw-r--r--  fluent-bit/src/flb_task.c  |  520
 1 file changed, 520 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_task.c b/fluent-bit/src/flb_task.c
new file mode 100644
index 00000000..bc9ddc63
--- /dev/null
+++ b/fluent-bit/src/flb_task.c
@@ -0,0 +1,520 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_input_chunk.h>
+#include <fluent-bit/flb_output.h>
+#include <fluent-bit/flb_router.h>
+#include <fluent-bit/flb_task.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_str.h>
+#include <fluent-bit/flb_scheduler.h>
+
+/*
+ * Every task created must have a unique ID; this function looks up the
+ * lowest number available in the tasks_map.
+ *
+ * This 'id' is used by the task interface to communicate with the engine event
+ * loop about some action.
+ */
+
+static inline int map_get_task_id(struct flb_config *config)
+{
+    int i;
+    int map_size = (sizeof(config->tasks_map) / sizeof(struct flb_task_map));
+
+    for (i = 0; i < map_size; i++) {
+        if (config->tasks_map[i].task == NULL) {
+            return i;
+        }
+    }
+
+    return -1;
+}
+
+static inline void map_set_task_id(int id, struct flb_task *task,
+                                   struct flb_config *config)
+{
+    config->tasks_map[id].task = task;
+
+}
+
+static inline void map_free_task_id(int id, struct flb_config *config)
+{
+    config->tasks_map[id].task = NULL;
+}
+
+void flb_task_retry_destroy(struct flb_task_retry *retry)
+{
+    int ret;
+
+    /* Make sure to invalidate any request from the scheduler */
+    ret = flb_sched_request_invalidate(retry->parent->config, retry);
+    if (ret == 0) {
+        flb_debug("[retry] task retry=%p, invalidated from the scheduler",
+                  retry);
+    }
+
+    mk_list_del(&retry->_head);
+    flb_free(retry);
+}
+
+/*
+ * For an existing task 'retry', re-schedule it. One of the use cases of this
+ * function is when the engine dispatcher fails to bring the chunk up due to
+ * Chunk I/O configuration restrictions and the task needs to be re-scheduled.
+ */
+int flb_task_retry_reschedule(struct flb_task_retry *retry, struct flb_config *config)
+{
+    int seconds;
+    struct flb_task *task;
+
+    task = retry->parent;
+    seconds = flb_sched_request_create(config, retry, retry->attempts);
+    if (seconds == -1) {
+        /*
+         * This is the worst-case scenario: 'cannot re-schedule a retry'. If the
+         * Chunk resides only in memory, it will be lost. */
+        flb_warn("[task] retry for task %i could not be re-scheduled", task->id);
+        flb_task_retry_destroy(retry);
+        if (task->users == 0 && mk_list_size(&task->retries) == 0) {
+            flb_task_destroy(task, FLB_TRUE);
+        }
+        return -1;
+    }
+    else {
+        flb_info("[task] re-schedule retry=%p %i in the next %i seconds",
+                 retry, task->id, seconds);
+    }
+
+    return 0;
+}
+
+struct flb_task_retry *flb_task_retry_create(struct flb_task *task,
+                                             struct flb_output_instance *ins)
+{
+    struct mk_list *head;
+    struct mk_list *tmp;
+    struct flb_task_retry *retry = NULL;
+
+    /* First discover if there is any previous retry context in the task */
+    mk_list_foreach_safe(head, tmp, &task->retries) {
+        retry = mk_list_entry(head, struct flb_task_retry, _head);
+        if (retry->o_ins == ins) {
+            if (retry->attempts >= ins->retry_limit && ins->retry_limit >= 0) {
+                flb_debug("[task] task_id=%i reached retry-attempts limit %i/%i",
+                          task->id, retry->attempts, ins->retry_limit);
+                flb_task_retry_destroy(retry);
+                return NULL;
+            }
+            break;
+        }
+        retry = NULL;
+    }
+
+    if (!retry) {
+        /* Create a new re-try instance */
+        retry = flb_malloc(sizeof(struct flb_task_retry));
+        if (!retry) {
+            flb_errno();
+            return NULL;
+        }
+
+        retry->attempts = 1;
+        retry->o_ins = ins;
+        retry->parent = task;
+        mk_list_add(&retry->_head, &task->retries);
+
+        flb_debug("[retry] new retry created for task_id=%i attempts=%i",
+                  task->id, retry->attempts);
+    }
+    else {
+        retry->attempts++;
+        flb_debug("[retry] re-using retry for task_id=%i attempts=%i",
+                  task->id, retry->attempts);
+    }
+
+    /*
+     * This 'retry' was issued by an output plugin; from an Engine perspective
+     * we need to determine if the source input plugin has some memory
+     * restrictions and, if the Storage type is 'filesystem', we need to put
+     * the file content down.
+     */
+    flb_input_chunk_set_up_down(task->ic);
+
+    /*
+     * Besides limits adjusted above, a retry that's going to only one place
+     * must be down.
+     */
+    if (mk_list_size(&task->routes) == 1) {
+        flb_input_chunk_down(task->ic);
+    }
+
+    return retry;
+}
+
+/*
+ * Return FLB_TRUE if the chunk pointed to by the task comes from a filesystem
+ * chunk left by a previous run (backlog), or FLB_FALSE if it was created on
+ * this running instance.
+ */
+int flb_task_from_fs_storage(struct flb_task *task)
+{
+    struct flb_input_chunk *ic;
+
+    ic = (struct flb_input_chunk *) task->ic;
+    return ic->fs_backlog;
+}
+
+int flb_task_retry_count(struct flb_task *task, void *data)
+{
+    struct mk_list *head;
+    struct flb_task_retry *retry;
+    struct flb_output_instance *o_ins;
+
+    o_ins = (struct flb_output_instance *) data;
+
+    mk_list_foreach(head, &task->retries) {
+        retry = mk_list_entry(head, struct flb_task_retry, _head);
+
+        if (retry->o_ins == o_ins) {
+            return retry->attempts;
+        }
+    }
+
+    return -1;
+}
+
+/* Check if a 'retry' context exists for a specific task; if so, clean it up */
+int flb_task_retry_clean(struct flb_task *task, struct flb_output_instance *ins)
+{
+    struct mk_list *tmp;
+    struct mk_list *head;
+    struct flb_task_retry *retry;
+
+    /* Delete 'retries' only associated with the output instance */
+    mk_list_foreach_safe(head, tmp, &task->retries) {
+        retry = mk_list_entry(head, struct flb_task_retry, _head);
+        if (retry->o_ins == ins) {
+            flb_task_retry_destroy(retry);
+            return 0;
+        }
+    }
+
+    return -1;
+}
+
+/* Allocate and initialize a basic Task structure */
+static struct flb_task *task_alloc(struct flb_config *config)
+{
+    int task_id;
+    struct flb_task *task;
+
+    /* Allocate the new task */
+    task = (struct flb_task *) flb_calloc(1, sizeof(struct flb_task));
+    if (!task) {
+        flb_errno();
+        return NULL;
+    }
+
+    /* Get ID and set back 'task' reference */
+    task_id = map_get_task_id(config);
+    if (task_id == -1) {
+        flb_free(task);
+        return NULL;
+    }
+    map_set_task_id(task_id, task, config);
+
+    flb_trace("[task %p] created (id=%i)", task, task_id);
+
+    /* Initialize minimum variables */
+    task->id = task_id;
+    task->config = config;
+    task->status = FLB_TASK_NEW;
+    task->users = 0;
+    mk_list_init(&task->routes);
+    mk_list_init(&task->retries);
+
+    pthread_mutex_init(&task->lock, NULL);
+
+    return task;
+}
+
+/* Return the number of tasks with 'running status' or tasks with retries */
+int flb_task_running_count(struct flb_config *config)
+{
+    int count = 0;
+    struct mk_list *head;
+    struct mk_list *t_head;
+    struct flb_task *task;
+    struct flb_input_instance *ins;
+
+    mk_list_foreach(head, &config->inputs) {
+        ins = mk_list_entry(head, struct flb_input_instance, _head);
+        mk_list_foreach(t_head, &ins->tasks) {
+            task = mk_list_entry(t_head, struct flb_task, _head);
+            if (task->users > 0 || mk_list_size(&task->retries) > 0) {
+                count++;
+            }
+        }
+    }
+
+    return count;
+}
+
+int flb_task_running_print(struct flb_config *config)
+{
+    int count = 0;
+    flb_sds_t tmp;
+    flb_sds_t routes;
+    struct mk_list *head;
+    struct mk_list *t_head;
+    struct mk_list *r_head;
+    struct flb_task *task;
+    struct flb_task_route *route;
+    struct flb_input_instance *ins;
+
+    routes = flb_sds_create_size(256);
+    if (!routes) {
+        flb_error("[task] cannot allocate space to report pending tasks");
+        return -1;
+    }
+
+    mk_list_foreach(head, &config->inputs) {
+        ins = mk_list_entry(head, struct flb_input_instance, _head);
+        count = mk_list_size(&ins->tasks);
+        flb_info("[task] %s/%s has %i pending task(s):",
+                 ins->p->name, flb_input_name(ins), count);
+        mk_list_foreach(t_head, &ins->tasks) {
+            task = mk_list_entry(t_head, struct flb_task, _head);
+
+            mk_list_foreach(r_head, &task->routes) {
+                route = mk_list_entry(r_head, struct flb_task_route, _head);
+                tmp = flb_sds_printf(&routes, "%s/%s ",
+                                     route->out->p->name,
+                                     flb_output_name(route->out));
+                if (!tmp) {
+                    flb_sds_destroy(routes);
+                    flb_error("[task] cannot print report for pending tasks");
+                    return -1;
+                }
+                routes = tmp;
+            }
+
+            flb_info("[task] task_id=%i still running on route(s): %s",
+                     task->id, routes);
+            flb_sds_len_set(routes, 0);
+        }
+    }
+    flb_sds_destroy(routes);
+    return 0;
+}
+
+/* Create an engine task to handle the output plugin flushing work */
+struct flb_task *flb_task_create(uint64_t ref_id,
+                                 const char *buf,
+                                 size_t size,
+                                 struct flb_input_instance *i_ins,
+                                 struct flb_input_chunk *ic,
+                                 const char *tag_buf, int tag_len,
+                                 struct flb_config *config,
+                                 int *err)
+{
+    int count = 0;
+    int total_events = 0;
+    struct flb_task *task;
+    struct flb_event_chunk *evc;
+    struct flb_task_route *route;
+    struct flb_router_path *route_path;
+    struct flb_output_instance *o_ins;
+    struct flb_input_chunk *task_ic;
+    struct mk_list *i_head;
+    struct mk_list *o_head;
+
+    /* No error status */
+    *err = FLB_FALSE;
+
+    /* allocate task */
+    task = task_alloc(config);
+    if (!task) {
+        *err = FLB_TRUE;
+        return NULL;
+    }
+
+#ifdef FLB_HAVE_METRICS
+    total_events = ((struct flb_input_chunk *) ic)->total_records;
+#endif
+
+    /* event chunk */
+    evc = flb_event_chunk_create(ic->event_type,
+                                 total_events,
+                                 (char *) tag_buf, tag_len,
+                                 (char *) buf, size);
+    if (!evc) {
+        flb_free(task);
+        *err = FLB_TRUE;
+        return NULL;
+    }
+    task->event_chunk = evc;
+    task_ic = (struct flb_input_chunk *) ic;
+    task_ic->task = task;
+
+    /* Keep track of origins */
+    task->ref_id = ref_id;
+    task->i_ins = i_ins;
+    task->ic = ic;
+    mk_list_add(&task->_head, &i_ins->tasks);
+
+#ifdef FLB_HAVE_METRICS
+    task->records = ((struct flb_input_chunk *) ic)->total_records;
+#endif
+
+    /* Direct connections between input <> outputs (API based) */
+    if (mk_list_size(&i_ins->routes_direct) > 0) {
+        mk_list_foreach(i_head, &i_ins->routes_direct) {
+            route_path = mk_list_entry(i_head, struct flb_router_path, _head);
+            o_ins = route_path->ins;
+
+            route = flb_malloc(sizeof(struct flb_task_route));
+            if (!route) {
+                flb_errno();
+                task->event_chunk->data = NULL;
+                flb_task_destroy(task, FLB_TRUE);
+                return NULL;
+            }
+
+            route->out = o_ins;
+            mk_list_add(&route->_head, &task->routes);
+        }
+        flb_debug("[task] created direct task=%p id=%i OK", task, task->id);
+        return task;
+    }
+
+    /* Find matching routes for the incoming task */
+    mk_list_foreach(o_head, &config->outputs) {
+        o_ins = mk_list_entry(o_head,
+                              struct flb_output_instance, _head);
+
+        /* skip output plugins that don't handle proper event types */
+        if (!flb_router_match_type(ic->event_type, o_ins)) {
+            continue;
+        }
+
+        if (flb_routes_mask_get_bit(task_ic->routes_mask, o_ins->id) != 0) {
+            route = flb_calloc(1, sizeof(struct flb_task_route));
+            if (!route) {
+                flb_errno();
+                continue;
+            }
+
+            route->status = FLB_TASK_ROUTE_INACTIVE;
+            route->out = o_ins;
+            mk_list_add(&route->_head, &task->routes);
+            count++;
+        }
+    }
+
+    /* no destinations? useless task, drop it.
+     */
+    if (count == 0) {
+        flb_debug("[task] created task=%p id=%i without routes, dropping.",
+                  task, task->id);
+        task->event_chunk->data = NULL;
+        flb_task_destroy(task, FLB_TRUE);
+        return NULL;
+    }
+
+    flb_debug("[task] created task=%p id=%i OK", task, task->id);
+    return task;
+}
+
+void flb_task_destroy(struct flb_task *task, int del)
+{
+    struct mk_list *tmp;
+    struct mk_list *head;
+    struct flb_task_route *route;
+    struct flb_task_retry *retry;
+
+    flb_debug("[task] destroy task=%p (task_id=%i)", task, task->id);
+
+    /* Release task_id */
+    map_free_task_id(task->id, task->config);
+
+    /* Remove routes */
+    mk_list_foreach_safe(head, tmp, &task->routes) {
+        route = mk_list_entry(head, struct flb_task_route, _head);
+        mk_list_del(&route->_head);
+        flb_free(route);
+    }
+
+    /* Unlink and release task */
+    mk_list_del(&task->_head);
+
+    /* destroy chunk */
+    flb_input_chunk_destroy(task->ic, del);
+
+    /* Remove 'retries' */
+    mk_list_foreach_safe(head, tmp, &task->retries) {
+        retry = mk_list_entry(head, struct flb_task_retry, _head);
+        flb_task_retry_destroy(retry);
+    }
+
+    flb_input_chunk_set_limits(task->i_ins);
+
+    if (task->event_chunk) {
+        flb_event_chunk_destroy(task->event_chunk);
+    }
+    flb_free(task);
+}
+
+struct flb_task_queue* flb_task_queue_create() {
+    struct flb_task_queue *tq;
+    tq = flb_malloc(sizeof(struct flb_task_queue));
+    if (!tq) {
+        flb_errno();
+        return NULL;
+    }
+    mk_list_init(&tq->pending);
+    mk_list_init(&tq->in_progress);
+    return tq;
+}
+
+void flb_task_queue_destroy(struct flb_task_queue *queue) {
+    struct flb_task_enqueued *queued_task;
+    struct mk_list *tmp;
+    struct mk_list *head;
+
+    mk_list_foreach_safe(head, tmp, &queue->pending) {
+        queued_task = mk_list_entry(head, struct flb_task_enqueued, _head);
+        mk_list_del(&queued_task->_head);
+        flb_free(queued_task);
+    }
+
+    mk_list_foreach_safe(head, tmp, &queue->in_progress) {
+        queued_task = mk_list_entry(head, struct flb_task_enqueued, _head);
+        mk_list_del(&queued_task->_head);
+        flb_free(queued_task);
+    }
+
+    flb_free(queue);
+}
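
A minimal usage sketch follows for context; it is not part of the patch. The
function example_on_flush_error() and its call site are hypothetical, and the
real engine code paths that drive these helpers may differ; the sketch only
shows how the retry helpers introduced by this file fit together.

/*
 * Illustrative sketch, not part of flb_task.c: one way a caller could react
 * to a failed output flush using the helpers defined above.
 */
#include <fluent-bit/flb_task.h>
#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_config.h>

static int example_on_flush_error(struct flb_task *task,
                                  struct flb_output_instance *o_ins,
                                  struct flb_config *config)
{
    struct flb_task_retry *retry;

    /* Create (or re-use) a retry context for this output instance;
     * NULL means the instance's retry limit was already reached. */
    retry = flb_task_retry_create(task, o_ins);
    if (!retry) {
        return -1;
    }

    /* Current attempt count for this output instance (for logging, etc.) */
    (void) flb_task_retry_count(task, o_ins);

    /* Hand the retry to the scheduler through the re-schedule helper; on
     * failure it destroys the retry and, when nothing else references the
     * task, the task itself. */
    return flb_task_retry_reschedule(retry, config);
}

Note that flb_task_retry_create() both enforces the per-output retry limit and
adjusts the chunk's up/down state, which is why the sketch only checks its
return value for NULL.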