diff options
Diffstat (limited to 'src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c')
-rw-r--r-- | src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c | 922 |
1 files changed, 0 insertions, 922 deletions
diff --git a/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c b/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c deleted file mode 100644 index c7acfd671..000000000 --- a/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c +++ /dev/null @@ -1,922 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2023 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_version.h> -#include <fluent-bit/flb_error.h> -#include <fluent-bit/flb_pack.h> -#include <fluent-bit/flb_gzip.h> - -#include <monkey/monkey.h> -#include <monkey/mk_core.h> - -#include "in_elasticsearch.h" -#include "in_elasticsearch_bulk_conn.h" -#include "in_elasticsearch_bulk_prot.h" - -#define HTTP_CONTENT_JSON 0 -#define HTTP_CONTENT_NDJSON 1 - -static int send_empty_response(struct in_elasticsearch_bulk_conn *conn, int http_status) -{ - size_t sent; - flb_sds_t out; - - out = flb_sds_create_size(256); - if (!out) { - return -1; - } - - if (http_status == 200) { - flb_sds_printf(&out, - "HTTP/1.1 200 OK\r\n" - "Content-Type: application/json\r\n\r\n"); - } - - /* We should check this operations result */ - flb_io_net_write(conn->connection, - (void *) out, - flb_sds_len(out), - &sent); - - flb_sds_destroy(out); - - return 0; -} - -static int send_json_message_response(struct in_elasticsearch_bulk_conn *conn, int http_status, char *message) -{ - size_t sent; - int len; - flb_sds_t out; - - out = flb_sds_create_size(256); - if (!out) { - return -1; - } - - if (message) { - len = strlen(message); - } - else { - len = 0; - } - - if (http_status == 200) { - flb_sds_printf(&out, - "HTTP/1.1 200 OK\r\n" - "Content-Type: application/json\r\n" - "Content-Length: %i\r\n\r\n%s", - len, message); - } - - /* We should check this operations result */ - flb_io_net_write(conn->connection, - (void *) out, - flb_sds_len(out), - &sent); - - flb_sds_destroy(out); - - return 0; -} - -static int send_version_message_response(struct flb_in_elasticsearch *ctx, - struct in_elasticsearch_bulk_conn *conn, int http_status) -{ - size_t sent; - int len; - flb_sds_t out; - flb_sds_t resp; - - out = flb_sds_create_size(256); - if (!out) { - return -1; - } - resp = flb_sds_create_size(384); - if (!resp) { - flb_sds_destroy(out); - return -1; - } - - flb_sds_printf(&resp, - ES_VERSION_RESPONSE_TEMPLATE, - ctx->es_version); - - len = flb_sds_len(resp); - - if (http_status == 200) { - flb_sds_printf(&out, - "HTTP/1.1 200 OK\r\n" - "Content-Type: application/json\r\n" - "Content-Length: %i\r\n\r\n%s", - len, resp); - } - - /* We should check this operations result */ - flb_io_net_write(conn->connection, - (void *) out, - flb_sds_len(out), - &sent); - - flb_sds_destroy(resp); - flb_sds_destroy(out); - - return 0; -} - -static int send_dummy_sniffer_response(struct in_elasticsearch_bulk_conn *conn, int http_status, - struct flb_in_elasticsearch *ctx) -{ - size_t sent; - int len; - flb_sds_t out; - flb_sds_t resp; - flb_sds_t hostname; - - if (ctx->hostname != NULL) { - hostname = ctx->hostname; - } - else { - hostname = "localhost"; - } - - out = flb_sds_create_size(384); - if (!out) { - return -1; - } - - resp = flb_sds_create_size(384); - if (!resp) { - flb_sds_destroy(out); - return -1; - } - - flb_sds_printf(&resp, - ES_NODES_TEMPLATE, - ctx->cluster_name, ctx->node_name, - hostname, ctx->tcp_port, ctx->buffer_max_size); - - len = flb_sds_len(resp) ; - - if (http_status == 200) { - flb_sds_printf(&out, - "HTTP/1.1 200 OK\r\n" - "Content-Type: application/json\r\n" - "Content-Length: %i\r\n\r\n%s", - len, resp); - } - - /* We should check this operations result */ - flb_io_net_write(conn->connection, - (void *) out, - flb_sds_len(out), - &sent); - - flb_sds_destroy(resp); - flb_sds_destroy(out); - - return 0; -} - -static int send_response(struct in_elasticsearch_bulk_conn *conn, int http_status, char *message) -{ - size_t sent; - int len; - flb_sds_t out; - - out = flb_sds_create_size(256); - if (!out) { - return -1; - } - - if (message) { - len = strlen(message); - } - else { - len = 0; - } - - if (http_status == 200) { - flb_sds_printf(&out, - "HTTP/1.1 200 OK\r\n" - "Server: Fluent Bit v%s\r\n" - "Content-Type: application/json\r\n" - "Content-Length: %i\r\n\r\n%s", - FLB_VERSION_STR, - len, message); - } - else if (http_status == 400) { - flb_sds_printf(&out, - "HTTP/1.1 400 Forbidden\r\n" - "Server: Fluent Bit v%s\r\n" - "Content-Length: %i\r\n\r\n%s", - FLB_VERSION_STR, - len, message); - } - - /* We should check this operations result */ - flb_io_net_write(conn->connection, - (void *) out, - flb_sds_len(out), - &sent); - - flb_sds_destroy(out); - - return 0; -} - -/* implements functionality to get tag from key in record */ -static flb_sds_t tag_key(struct flb_in_elasticsearch *ctx, msgpack_object *map) -{ - size_t map_size = map->via.map.size; - msgpack_object_kv *kv; - msgpack_object key; - msgpack_object val; - char *key_str = NULL; - char *val_str = NULL; - size_t key_str_size = 0; - size_t val_str_size = 0; - int j; - int check = FLB_FALSE; - int found = FLB_FALSE; - flb_sds_t tag; - - kv = map->via.map.ptr; - - for(j=0; j < map_size; j++) { - check = FLB_FALSE; - found = FLB_FALSE; - key = (kv+j)->key; - if (key.type == MSGPACK_OBJECT_BIN) { - key_str = (char *) key.via.bin.ptr; - key_str_size = key.via.bin.size; - check = FLB_TRUE; - } - if (key.type == MSGPACK_OBJECT_STR) { - key_str = (char *) key.via.str.ptr; - key_str_size = key.via.str.size; - check = FLB_TRUE; - } - - if (check == FLB_TRUE) { - if (strncmp(ctx->tag_key, key_str, key_str_size) == 0) { - val = (kv+j)->val; - if (val.type == MSGPACK_OBJECT_BIN) { - val_str = (char *) val.via.bin.ptr; - val_str_size = val.via.str.size; - found = FLB_TRUE; - break; - } - if (val.type == MSGPACK_OBJECT_STR) { - val_str = (char *) val.via.str.ptr; - val_str_size = val.via.str.size; - found = FLB_TRUE; - break; - } - } - } - } - - if (found == FLB_TRUE) { - tag = flb_sds_create_len(val_str, val_str_size); - if (!tag) { - flb_errno(); - return NULL; - } - return tag; - } - - - flb_plg_error(ctx->ins, "Could not find tag_key %s in record", ctx->tag_key); - return NULL; -} - -static int get_write_op(struct flb_in_elasticsearch *ctx, msgpack_object *map, flb_sds_t *out_write_op, size_t *out_key_size) -{ - char *op_str = NULL; - size_t op_str_size = 0; - msgpack_object_kv *kv; - msgpack_object key; - int check = FLB_FALSE; - - kv = map->via.map.ptr; - key = kv[0].key; - if (key.type == MSGPACK_OBJECT_BIN) { - op_str = (char *) key.via.bin.ptr; - op_str_size = key.via.bin.size; - check = FLB_TRUE; - } - if (key.type == MSGPACK_OBJECT_STR) { - op_str = (char *) key.via.str.ptr; - op_str_size = key.via.str.size; - check = FLB_TRUE; - } - - if (check == FLB_TRUE) { - *out_write_op = flb_sds_create_len(op_str, op_str_size); - *out_key_size = op_str_size; - } - - return check; -} - -static int status_buffer_avail(struct flb_in_elasticsearch *ctx, flb_sds_t bulk_statuses, size_t threshold) -{ - if (flb_sds_avail(bulk_statuses) < threshold) { - flb_plg_warn(ctx->ins, "left buffer for bulk status(es) is too small"); - - return FLB_FALSE; - } - - return FLB_TRUE; -} - -static int process_ndpack(struct flb_in_elasticsearch *ctx, flb_sds_t tag, char *buf, size_t size, flb_sds_t bulk_statuses) -{ - int ret; - size_t off = 0; - size_t map_copy_index; - msgpack_object_kv *map_copy_entry; - msgpack_unpacked result; - struct flb_time tm; - msgpack_object *obj; - flb_sds_t tag_from_record = NULL; - int idx = 0; - flb_sds_t write_op; - size_t op_str_size = 0; - int op_ret = FLB_FALSE; - int error_op = FLB_FALSE; - - flb_time_get(&tm); - - msgpack_unpacked_init(&result); - while (msgpack_unpack_next(&result, buf, size, &off) == MSGPACK_UNPACK_SUCCESS) { - if (result.data.type == MSGPACK_OBJECT_MAP) { - if (idx > 0 && idx % 2 == 0) { - flb_sds_cat(bulk_statuses, ",", 1); - } - if (status_buffer_avail(ctx, bulk_statuses, 50) == FLB_FALSE) { - break; - } - if (idx % 2 == 0) { - op_ret = get_write_op(ctx, &result.data, &write_op, &op_str_size); - - if (op_ret) { - if (flb_sds_cmp(write_op, "index", op_str_size) == 0) { - flb_sds_cat(bulk_statuses, "{\"index\":", 9); - error_op = FLB_FALSE; - } - else if (flb_sds_cmp(write_op, "create", op_str_size) == 0) { - flb_sds_cat(bulk_statuses, "{\"create\":", 10); - error_op = FLB_FALSE; - } - else if (flb_sds_cmp(write_op, "update", op_str_size) == 0) { - flb_sds_cat(bulk_statuses, "{\"update\":", 10); - error_op = FLB_TRUE; - } - else if (flb_sds_cmp(write_op, "delete", op_str_size) == 0) { - flb_sds_cat(bulk_statuses, "{\"delete\":{\"status\":404,\"result\":\"not_found\"}}", 46); - error_op = FLB_TRUE; - idx += 1; /* Prepare to adjust to multiple of two - * in the end of the loop. - * Due to delete actions include only one line. */ - flb_sds_destroy(write_op); - - goto proceed; - } - else { - flb_sds_cat(bulk_statuses, "{\"unknown\":{\"status\":400,\"result\":\"bad_request\"}}", 49); - error_op = FLB_TRUE; - - flb_sds_destroy(write_op); - - break; - } - } else { - flb_sds_destroy(write_op); - flb_plg_error(ctx->ins, "meta information line is missing"); - error_op = FLB_TRUE; - - break; - } - - if (error_op == FLB_FALSE) { - flb_log_event_encoder_reset(&ctx->log_encoder); - - ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); - - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_sds_destroy(write_op); - flb_plg_error(ctx->ins, "event encoder error : %d", ret); - error_op = FLB_TRUE; - - break; - } - - ret = flb_log_event_encoder_set_timestamp( - &ctx->log_encoder, - &tm); - - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_sds_destroy(write_op); - flb_plg_error(ctx->ins, "event encoder error : %d", ret); - error_op = FLB_TRUE; - - break; - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_values( - &ctx->log_encoder, - FLB_LOG_EVENT_CSTRING_VALUE((char *) ctx->meta_key), - FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&result.data)); - } - - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_sds_destroy(write_op); - flb_plg_error(ctx->ins, "event encoder error : %d", ret); - error_op = FLB_TRUE; - - break; - } - } - } - else if (idx % 2 == 1) { - if (error_op == FLB_FALSE) { - /* Pack body */ - - for (map_copy_index = 0 ; - map_copy_index < result.data.via.map.size && - ret == FLB_EVENT_ENCODER_SUCCESS ; - map_copy_index++) { - map_copy_entry = &result.data.via.map.ptr[map_copy_index]; - - ret = flb_log_event_encoder_append_body_values( - &ctx->log_encoder, - FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&map_copy_entry->key), - FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&map_copy_entry->val)); - } - - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_plg_error(ctx->ins, "event encoder error : %d", ret); - error_op = FLB_TRUE; - - break; - } - - ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); - - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_plg_error(ctx->ins, "event encoder error : %d", ret); - error_op = FLB_TRUE; - - break; - } - - tag_from_record = NULL; - - if (ctx->tag_key) { - obj = &result.data; - tag_from_record = tag_key(ctx, obj); - } - - if (tag_from_record) { - flb_input_log_append(ctx->ins, - tag_from_record, - flb_sds_len(tag_from_record), - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - - flb_sds_destroy(tag_from_record); - } - else if (tag) { - flb_input_log_append(ctx->ins, - tag, - flb_sds_len(tag), - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - } - else { - /* use default plugin Tag (it internal name, e.g: http.0 */ - flb_input_log_append(ctx->ins, NULL, 0, - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - } - - flb_log_event_encoder_reset(&ctx->log_encoder); - } - if (op_ret) { - if (flb_sds_cmp(write_op, "index", op_str_size) == 0) { - flb_sds_cat(bulk_statuses, "{\"status\":201,\"result\":\"created\"}}", 34); - } - else if (flb_sds_cmp(write_op, "create", op_str_size) == 0) { - flb_sds_cat(bulk_statuses, "{\"status\":201,\"result\":\"created\"}}", 34); - } - else if (flb_sds_cmp(write_op, "update", op_str_size) == 0) { - flb_sds_cat(bulk_statuses, "{\"status\":403,\"result\":\"forbidden\"}}", 36); - } - if (status_buffer_avail(ctx, bulk_statuses, 50) == FLB_FALSE) { - flb_sds_destroy(write_op); - - break; - } - } - flb_sds_destroy(write_op); - } - - proceed: - idx++; - } - else { - flb_plg_error(ctx->ins, "skip record from invalid type: %i", - result.data.type); - msgpack_unpacked_destroy(&result); - return -1; - } - } - - if (idx % 2 != 0) { - flb_plg_warn(ctx->ins, "decode payload of Bulk API is failed"); - msgpack_unpacked_destroy(&result); - if (error_op == FLB_FALSE) { - /* On lacking of body case in non-error case, there is no - * releasing memory code paths. We should proceed to do - * it here. */ - flb_sds_destroy(write_op); - } - - return -1; - } - - msgpack_unpacked_destroy(&result); - - return 0; -} - -static ssize_t parse_payload_ndjson(struct flb_in_elasticsearch *ctx, flb_sds_t tag, - char *payload, size_t size, flb_sds_t bulk_statuses) -{ - int ret; - int out_size; - char *pack; - struct flb_pack_state pack_state; - - /* Initialize packer */ - flb_pack_state_init(&pack_state); - - /* Pack JSON as msgpack */ - ret = flb_pack_json_state(payload, size, - &pack, &out_size, &pack_state); - flb_pack_state_reset(&pack_state); - - /* Handle exceptions */ - if (ret == FLB_ERR_JSON_PART) { - flb_plg_warn(ctx->ins, "JSON data is incomplete, skipping"); - return -1; - } - else if (ret == FLB_ERR_JSON_INVAL) { - flb_plg_warn(ctx->ins, "invalid JSON message, skipping"); - return -1; - } - else if (ret == -1) { - return -1; - } - - /* Process the packaged JSON and return the last byte used */ - process_ndpack(ctx, tag, pack, out_size, bulk_statuses); - flb_free(pack); - - return 0; -} - -static int process_payload(struct flb_in_elasticsearch *ctx, struct in_elasticsearch_bulk_conn *conn, - flb_sds_t tag, - struct mk_http_session *session, - struct mk_http_request *request, - flb_sds_t bulk_statuses) -{ - int type = -1; - int i = 0; - int ret = 0; - struct mk_http_header *header; - int extra_size = -1; - struct mk_http_header *headers_extra; - int gzip_compressed = FLB_FALSE; - void *gz_data = NULL; - size_t gz_size = -1; - - header = &session->parser.headers[MK_HEADER_CONTENT_TYPE]; - if (header->key.data == NULL) { - send_response(conn, 400, "error: header 'Content-Type' is not set\n"); - return -1; - } - - if (header->val.len >= 20 && - strncasecmp(header->val.data, "application/x-ndjson", 20) == 0) { - type = HTTP_CONTENT_NDJSON; - } - - if (header->val.len >= 16 && - strncasecmp(header->val.data, "application/json", 16) == 0) { - type = HTTP_CONTENT_JSON; - } - - if (type == -1) { - send_response(conn, 400, "error: invalid 'Content-Type'\n"); - return -1; - } - - if (request->data.len <= 0) { - send_response(conn, 400, "error: no payload found\n"); - return -1; - } - - extra_size = session->parser.headers_extra_count; - if (extra_size > 0) { - for (i = 0; i < extra_size; i++) { - headers_extra = &session->parser.headers_extra[i]; - if (headers_extra->key.len == 16 && - strncasecmp(headers_extra->key.data, "Content-Encoding", 16) == 0) { - if (headers_extra->val.len == 4 && - strncasecmp(headers_extra->val.data, "gzip", 4) == 0) { - flb_debug("[elasticsearch_bulk_prot] body is gzipped"); - gzip_compressed = FLB_TRUE; - } - } - } - } - - if (type == HTTP_CONTENT_NDJSON || type == HTTP_CONTENT_JSON) { - if (gzip_compressed == FLB_TRUE) { - ret = flb_gzip_uncompress((void *) request->data.data, request->data.len, - &gz_data, &gz_size); - if (ret == -1) { - flb_error("[elasticsearch_bulk_prot] gzip uncompress is failed"); - return -1; - } - parse_payload_ndjson(ctx, tag, gz_data, gz_size, bulk_statuses); - flb_free(gz_data); - } - else { - parse_payload_ndjson(ctx, tag, request->data.data, request->data.len, bulk_statuses); - } - } - - return 0; -} - -static inline int mk_http_point_header(mk_ptr_t *h, - struct mk_http_parser *parser, int key) -{ - struct mk_http_header *header; - - header = &parser->headers[key]; - if (header->type == key) { - h->data = header->val.data; - h->len = header->val.len; - return 0; - } - else { - h->data = NULL; - h->len = -1; - } - - return -1; -} - -/* - * Handle an incoming request. It perform extra checks over the request, if - * everything is OK, it enqueue the incoming payload. - */ -int in_elasticsearch_bulk_prot_handle(struct flb_in_elasticsearch *ctx, - struct in_elasticsearch_bulk_conn *conn, - struct mk_http_session *session, - struct mk_http_request *request) -{ - int i; - int ret; - int len; - char *uri; - char *qs; - off_t diff; - flb_sds_t tag; - struct mk_http_header *header; - flb_sds_t bulk_statuses = NULL; - flb_sds_t bulk_response = NULL; - char *error_str = NULL; - - if (request->uri.data[0] != '/') { - send_response(conn, 400, "error: invalid request\n"); - return -1; - } - - /* Decode URI */ - uri = mk_utils_url_decode(request->uri); - if (!uri) { - uri = mk_mem_alloc_z(request->uri.len + 1); - if (!uri) { - return -1; - } - memcpy(uri, request->uri.data, request->uri.len); - uri[request->uri.len] = '\0'; - } - - /* Try to match a query string so we can remove it */ - qs = strchr(uri, '?'); - if (qs) { - /* remove the query string part */ - diff = qs - uri; - uri[diff] = '\0'; - } - - /* Refer the tag at first*/ - if (ctx->ins->tag && !ctx->ins->tag_default) { - tag = flb_sds_create(ctx->ins->tag); - if (tag == NULL) { - return -1; - } - } - else { - /* Compose the query string using the URI */ - len = strlen(uri); - - if (len == 1) { - tag = NULL; /* use default tag */ - } - else { - /* New tag skipping the URI '/' */ - tag = flb_sds_create_len(&uri[1], len - 1); - if (!tag) { - mk_mem_free(uri); - return -1; - } - - /* Sanitize, only allow alphanum chars */ - for (i = 0; i < flb_sds_len(tag); i++) { - if (!isalnum(tag[i]) && tag[i] != '_' && tag[i] != '.') { - tag[i] = '_'; - } - } - } - } - - /* Check if we have a Host header: Hostname ; port */ - mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST); - - /* Header: Connection */ - mk_http_point_header(&request->connection, &session->parser, - MK_HEADER_CONNECTION); - - /* HTTP/1.1 needs Host header */ - if (!request->host.data && request->protocol == MK_HTTP_PROTOCOL_11) { - flb_sds_destroy(tag); - mk_mem_free(uri); - return -1; - } - - /* Should we close the session after this request ? */ - mk_http_keepalive_check(session, request, ctx->server); - - /* Content Length */ - header = &session->parser.headers[MK_HEADER_CONTENT_LENGTH]; - if (header->type == MK_HEADER_CONTENT_LENGTH) { - request->_content_length.data = header->val.data; - request->_content_length.len = header->val.len; - } - else { - request->_content_length.data = NULL; - } - - if (request->method == MK_METHOD_HEAD) { - send_empty_response(conn, 200); - - flb_sds_destroy(tag); - mk_mem_free(uri); - - return 0; - } - - if (request->method == MK_METHOD_PUT) { - send_json_message_response(conn, 200, "{}"); - - flb_sds_destroy(tag); - mk_mem_free(uri); - - return 0; - } - - if (request->method == MK_METHOD_GET) { - if (strncmp(uri, "/_nodes/http", 12) == 0) { - send_dummy_sniffer_response(conn, 200, ctx); - } - else if (strlen(uri) == 1 && strncmp(uri, "/", 1) == 0) { - send_version_message_response(ctx, conn, 200); - } - else { - send_json_message_response(conn, 200, "{}"); - } - - flb_sds_destroy(tag); - mk_mem_free(uri); - - return 0; - } - - if (request->method == MK_METHOD_POST) { - if (strncmp(uri, "/_bulk", 6) == 0) { - bulk_statuses = flb_sds_create_size(ctx->buffer_max_size); - if (!bulk_statuses) { - flb_sds_destroy(tag); - mk_mem_free(uri); - return -1; - } - - bulk_response = flb_sds_create_size(ctx->buffer_max_size); - if (!bulk_response) { - flb_sds_destroy(bulk_statuses); - flb_sds_destroy(tag); - mk_mem_free(uri); - return -1; - } - } else { - flb_sds_destroy(tag); - mk_mem_free(uri); - - send_response(conn, 400, "error: invaild HTTP endpoint\n"); - - return -1; - } - } - - if (request->method != MK_METHOD_POST && - request->method != MK_METHOD_GET && - request->method != MK_METHOD_HEAD && - request->method != MK_METHOD_PUT) { - - if (bulk_statuses) { - flb_sds_destroy(bulk_statuses); - } - if (bulk_response) { - flb_sds_destroy(bulk_response); - } - - flb_sds_destroy(tag); - mk_mem_free(uri); - - send_response(conn, 400, "error: invalid HTTP method\n"); - return -1; - } - - ret = process_payload(ctx, conn, tag, session, request, bulk_statuses); - flb_sds_destroy(tag); - - len = flb_sds_len(bulk_statuses); - if (flb_sds_alloc(bulk_response) < len + 27) { - bulk_response = flb_sds_increase(bulk_response, len + 27 - flb_sds_alloc(bulk_response)); - } - error_str = strstr(bulk_statuses, "\"status\":40"); - if (error_str){ - flb_sds_cat(bulk_response, "{\"errors\":true,\"items\":[", 24); - } - else { - flb_sds_cat(bulk_response, "{\"errors\":false,\"items\":[", 25); - } - flb_sds_cat(bulk_response, bulk_statuses, flb_sds_len(bulk_statuses)); - flb_sds_cat(bulk_response, "]}", 2); - send_response(conn, 200, bulk_response); - - mk_mem_free(uri); - flb_sds_destroy(bulk_statuses); - flb_sds_destroy(bulk_response); - - return ret; -} - -/* - * Handle an incoming request which has resulted in an http parser error. - */ -int in_elasticsearch_bulk_prot_handle_error(struct flb_in_elasticsearch *ctx, - struct in_elasticsearch_bulk_conn *conn, - struct mk_http_session *session, - struct mk_http_request *request) -{ - send_response(conn, 400, "error: invalid request\n"); - return -1; -} |