diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 11:19:16 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 12:07:37 +0000 |
commit | b485aab7e71c1625cfc27e0f92c9509f42378458 (patch) | |
tree | ae9abe108601079d1679194de237c9a435ae5b55 /fluent-bit/src/flb_pack.c | |
parent | Adding upstream version 1.44.3. (diff) | |
download | netdata-b485aab7e71c1625cfc27e0f92c9509f42378458.tar.xz netdata-b485aab7e71c1625cfc27e0f92c9509f42378458.zip |
Adding upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/src/flb_pack.c')
-rw-r--r-- | fluent-bit/src/flb_pack.c | 1270 |
1 files changed, 0 insertions, 1270 deletions
diff --git a/fluent-bit/src/flb_pack.c b/fluent-bit/src/flb_pack.c deleted file mode 100644 index adcaa22c9..000000000 --- a/fluent-bit/src/flb_pack.c +++ /dev/null @@ -1,1270 +0,0 @@ -/*-*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <stdlib.h> -#include <string.h> - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_mem.h> -#include <fluent-bit/flb_sds.h> -#include <fluent-bit/flb_error.h> -#include <fluent-bit/flb_utils.h> -#include <fluent-bit/flb_sds.h> -#include <fluent-bit/flb_time.h> -#include <fluent-bit/flb_pack.h> -#include <fluent-bit/flb_unescape.h> - -/* cmetrics */ -#include <cmetrics/cmetrics.h> -#include <cmetrics/cmt_decode_msgpack.h> -#include <cmetrics/cmt_encode_text.h> - -#include <msgpack.h> -#include <math.h> -#include <jsmn/jsmn.h> - -#define try_to_write_str flb_utils_write_str - -static int convert_nan_to_null = FLB_FALSE; - -static int flb_pack_set_null_as_nan(int b) { - if (b == FLB_TRUE || b == FLB_FALSE) { - convert_nan_to_null = b; - } - return convert_nan_to_null; -} - -int flb_json_tokenise(const char *js, size_t len, - struct flb_pack_state *state) -{ - int ret; - int new_tokens = 256; - size_t old_size; - size_t new_size; - void *tmp; - - ret = jsmn_parse(&state->parser, js, len, - state->tokens, state->tokens_size); - while (ret == JSMN_ERROR_NOMEM) { - /* Get current size of the array in bytes */ - old_size = state->tokens_size * sizeof(jsmntok_t); - - /* New size: add capacity for new 256 entries */ - new_size = old_size + (sizeof(jsmntok_t) * new_tokens); - - tmp = flb_realloc(state->tokens, new_size); - if (!tmp) { - flb_errno(); - return -1; - } - state->tokens = tmp; - state->tokens_size += new_tokens; - - ret = jsmn_parse(&state->parser, js, len, - state->tokens, state->tokens_size); - } - - if (ret == JSMN_ERROR_INVAL) { - return FLB_ERR_JSON_INVAL; - } - - if (ret == JSMN_ERROR_PART) { - /* This is a partial JSON message, just stop */ - flb_trace("[json tokenise] incomplete"); - return FLB_ERR_JSON_PART; - } - - state->tokens_count += ret; - return 0; -} - -static inline int is_float(const char *buf, int len) -{ - const char *end = buf + len; - const char *p = buf; - - while (p <= end) { - if (*p == 'e' && p < end && *(p + 1) == '-') { - return 1; - } - else if (*p == '.') { - return 1; - } - p++; - } - - return 0; -} - -/* Sanitize incoming JSON string */ -static inline int pack_string_token(struct flb_pack_state *state, - const char *str, int len, - msgpack_packer *pck) -{ - int s; - int out_len; - char *tmp; - char *out_buf; - - if (state->buf_size < len + 1) { - s = len + 1; - tmp = flb_realloc(state->buf_data, s); - if (!tmp) { - flb_errno(); - return -1; - } - else { - state->buf_data = tmp; - state->buf_size = s; - } - } - out_buf = state->buf_data; - - /* Always decode any UTF-8 or special characters */ - out_len = flb_unescape_string_utf8(str, len, out_buf); - - /* Pack decoded text */ - msgpack_pack_str(pck, out_len); - msgpack_pack_str_body(pck, out_buf, out_len); - - return out_len; -} - -/* Receive a tokenized JSON message and convert it to MsgPack */ -static char *tokens_to_msgpack(struct flb_pack_state *state, - const char *js, - int *out_size, int *last_byte, - int *out_records) -{ - int i; - int flen; - int arr_size; - int records = 0; - const char *p; - char *buf; - const jsmntok_t *t; - msgpack_packer pck; - msgpack_sbuffer sbuf; - jsmntok_t *tokens; - - tokens = state->tokens; - arr_size = state->tokens_count; - - if (arr_size == 0) { - return NULL; - } - - /* initialize buffers */ - msgpack_sbuffer_init(&sbuf); - msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write); - - for (i = 0; i < arr_size ; i++) { - t = &tokens[i]; - - if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)) { - break; - } - - if (t->parent == -1) { - *last_byte = t->end; - records++; - } - - flen = (t->end - t->start); - switch (t->type) { - case JSMN_OBJECT: - msgpack_pack_map(&pck, t->size); - break; - case JSMN_ARRAY: - msgpack_pack_array(&pck, t->size); - break; - case JSMN_STRING: - pack_string_token(state, js + t->start, flen, &pck); - break; - case JSMN_PRIMITIVE: - p = js + t->start; - if (*p == 'f') { - msgpack_pack_false(&pck); - } - else if (*p == 't') { - msgpack_pack_true(&pck); - } - else if (*p == 'n') { - msgpack_pack_nil(&pck); - } - else { - if (is_float(p, flen)) { - msgpack_pack_double(&pck, atof(p)); - } - else { - msgpack_pack_int64(&pck, atoll(p)); - } - } - break; - case JSMN_UNDEFINED: - msgpack_sbuffer_destroy(&sbuf); - return NULL; - } - } - - *out_size = sbuf.size; - *out_records = records; - buf = sbuf.data; - - return buf; -} - -/* - * It parse a JSON string and convert it to MessagePack format, this packer is - * useful when a complete JSON message exists, otherwise it will fail until - * the message is complete. - * - * This routine do not keep a state in the parser, do not use it for big - * JSON messages. - */ -static int pack_json_to_msgpack(const char *js, size_t len, char **buffer, - size_t *size, int *root_type, int *records, - size_t *consumed) -{ - int ret = -1; - int n_records; - int out; - int last; - char *buf = NULL; - struct flb_pack_state state; - - ret = flb_pack_state_init(&state); - if (ret != 0) { - return -1; - } - ret = flb_json_tokenise(js, len, &state); - if (ret != 0) { - ret = -1; - goto flb_pack_json_end; - } - - if (state.tokens_count == 0) { - ret = -1; - goto flb_pack_json_end; - } - - buf = tokens_to_msgpack(&state, js, &out, &last, &n_records); - if (!buf) { - ret = -1; - goto flb_pack_json_end; - } - - *root_type = state.tokens[0].type; - *size = out; - *buffer = buf; - *records = n_records; - - if (consumed != NULL) { - *consumed = last; - } - - ret = 0; - - flb_pack_json_end: - flb_pack_state_reset(&state); - return ret; -} - -/* Pack unlimited serialized JSON messages into msgpack */ -int flb_pack_json(const char *js, size_t len, char **buffer, size_t *size, - int *root_type, size_t *consumed) -{ - int records; - - return pack_json_to_msgpack(js, len, buffer, size, root_type, &records, consumed); -} - -/* - * Pack unlimited serialized JSON messages into msgpack, finally it writes on - * 'out_records' the number of messages. - */ -int flb_pack_json_recs(const char *js, size_t len, char **buffer, size_t *size, - int *root_type, int *out_records, size_t *consumed) -{ - return pack_json_to_msgpack(js, len, buffer, size, root_type, out_records, consumed); -} - -/* Initialize a JSON packer state */ -int flb_pack_state_init(struct flb_pack_state *s) -{ - int tokens = 256; - size_t size = 256; - - jsmn_init(&s->parser); - - size = sizeof(jsmntok_t) * tokens; - s->tokens = flb_malloc(size); - if (!s->tokens) { - flb_errno(); - return -1; - } - s->tokens_size = tokens; - s->tokens_count = 0; - s->last_byte = 0; - s->multiple = FLB_FALSE; - - s->buf_data = flb_malloc(size); - if (!s->buf_data) { - flb_errno(); - flb_free(s->tokens); - s->tokens = NULL; - return -1; - } - s->buf_size = size; - s->buf_len = 0; - - return 0; -} - -void flb_pack_state_reset(struct flb_pack_state *s) -{ - flb_free(s->tokens); - s->tokens = NULL; - s->tokens_size = 0; - s->tokens_count = 0; - s->last_byte = 0; - s->buf_size = 0; - flb_free(s->buf_data); - s->buf_data = NULL; -} - - -/* - * It parse a JSON string and convert it to MessagePack format. The main - * difference of this function and the previous flb_pack_json() is that it - * keeps a parser and tokens state, allowing to process big messages and - * resume the parsing process instead of start from zero. - */ -int flb_pack_json_state(const char *js, size_t len, - char **buffer, int *size, - struct flb_pack_state *state) -{ - int ret; - int out; - int delim = 0; - int last = 0; - int records; - char *buf; - jsmntok_t *t; - - ret = flb_json_tokenise(js, len, state); - state->multiple = FLB_TRUE; - if (ret == FLB_ERR_JSON_PART && state->multiple == FLB_TRUE) { - /* - * If the caller enabled 'multiple' flag, it means that the incoming - * JSON message may have multiple messages concatenated and likely - * the last one is only incomplete. - * - * The following routine aims to determinate how many JSON messages - * are OK in the array of tokens, if any, process them and adjust - * the JSMN context/buffers. - */ - - /* - * jsmn_parse updates jsmn_parser members. (state->parser) - * A member 'toknext' points next incomplete object token. - * We use toknext - 1 as an index of last member of complete JSON. - */ - int i; - int found = 0; - - if (state->parser.toknext == 0) { - return ret; - } - - for (i = (int)state->parser.toknext - 1; i >= 1; i--) { - t = &state->tokens[i]; - - if (t->parent == -1 && (t->end != 0)) { - found++; - delim = i; - break; - } - } - - if (found == 0) { - return ret; /* FLB_ERR_JSON_PART */ - } - state->tokens_count += delim; - } - else if (ret != 0) { - return ret; - } - - if (state->tokens_count == 0 || state->tokens == NULL) { - state->last_byte = last; - return FLB_ERR_JSON_INVAL; - } - - buf = tokens_to_msgpack(state, js, &out, &last, &records); - if (!buf) { - return -1; - } - - *size = out; - *buffer = buf; - state->last_byte = last; - - return 0; -} - -int flb_metadata_pop_from_msgpack(msgpack_object **metadata, msgpack_unpacked *upk, - msgpack_object **map) -{ - if (metadata == NULL || upk == NULL) { - return -1; - } - - if (upk->data.type != MSGPACK_OBJECT_ARRAY) { - return -1; - } - - *metadata = &upk->data.via.array.ptr[0].via.array.ptr[1]; - *map = &upk->data.via.array.ptr[1]; - - return 0; -} - -static int pack_print_fluent_record(size_t cnt, msgpack_unpacked result) -{ - msgpack_object *metadata; - msgpack_object root; - msgpack_object *obj; - struct flb_time tms; - msgpack_object o; - - root = result.data; - if (root.type != MSGPACK_OBJECT_ARRAY) { - return -1; - } - - o = root.via.array.ptr[0]; - if (o.type != MSGPACK_OBJECT_ARRAY) { - return -1; - } - - /* decode expected timestamp only (integer, float or ext) */ - o = o.via.array.ptr[0]; - if (o.type != MSGPACK_OBJECT_POSITIVE_INTEGER && - o.type != MSGPACK_OBJECT_FLOAT && - o.type != MSGPACK_OBJECT_EXT) { - return -1; - } - - /* This is a Fluent Bit record, just do the proper unpacking/printing */ - flb_time_pop_from_msgpack(&tms, &result, &obj); - flb_metadata_pop_from_msgpack(&metadata, &result, &obj); - - fprintf(stdout, "[%zd] [%"PRIu32".%09lu, ", cnt, - (uint32_t) tms.tm.tv_sec, tms.tm.tv_nsec); - - msgpack_object_print(stdout, *metadata); - - fprintf(stdout, ", "); - - msgpack_object_print(stdout, *obj); - - fprintf(stdout, "]\n"); - - return 0; -} - -void flb_pack_print(const char *data, size_t bytes) -{ - int ret; - msgpack_unpacked result; - size_t off = 0, cnt = 0; - - msgpack_unpacked_init(&result); - while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) { - /* Check if we are processing an internal Fluent Bit record */ - ret = pack_print_fluent_record(cnt, result); - if (ret == 0) { - continue; - } - - printf("[%zd] ", cnt++); - msgpack_object_print(stdout, result.data); - printf("\n"); - } - msgpack_unpacked_destroy(&result); -} - -void flb_pack_print_metrics(const char *data, size_t bytes) -{ - int ret; - size_t off = 0; - cfl_sds_t text; - struct cmt *cmt = NULL; - - /* get cmetrics context */ - ret = cmt_decode_msgpack_create(&cmt, (char *) data, bytes, &off); - if (ret != 0) { - flb_error("could not process metrics payload"); - return; - } - - /* convert to text representation */ - text = cmt_encode_text_create(cmt); - - /* destroy cmt context */ - cmt_destroy(cmt); - - printf("%s", text); - fflush(stdout); - - cmt_encode_text_destroy(text); -} - -static inline int try_to_write(char *buf, int *off, size_t left, - const char *str, size_t str_len) -{ - if (str_len <= 0){ - str_len = strlen(str); - } - if (left <= *off+str_len) { - return FLB_FALSE; - } - memcpy(buf+*off, str, str_len); - *off += str_len; - return FLB_TRUE; -} - - -/* - * Check if a key exists in the map using the 'offset' as an index to define - * which element needs to start looking from - */ -static inline int key_exists_in_map(msgpack_object key, msgpack_object map, int offset) -{ - int i; - msgpack_object p; - - if (key.type != MSGPACK_OBJECT_STR) { - return FLB_FALSE; - } - - for (i = offset; i < map.via.map.size; i++) { - p = map.via.map.ptr[i].key; - if (p.type != MSGPACK_OBJECT_STR) { - continue; - } - - if (key.via.str.size != p.via.str.size) { - continue; - } - - if (memcmp(key.via.str.ptr, p.via.str.ptr, p.via.str.size) == 0) { - return FLB_TRUE; - } - } - - return FLB_FALSE; -} - -static int msgpack2json(char *buf, int *off, size_t left, - const msgpack_object *o) -{ - int i; - int dup; - int ret = FLB_FALSE; - int loop; - int packed; - - switch(o->type) { - case MSGPACK_OBJECT_NIL: - ret = try_to_write(buf, off, left, "null", 4); - break; - - case MSGPACK_OBJECT_BOOLEAN: - ret = try_to_write(buf, off, left, - (o->via.boolean ? "true":"false"),0); - - break; - - case MSGPACK_OBJECT_POSITIVE_INTEGER: - { - char temp[32] = {0}; - i = snprintf(temp, sizeof(temp)-1, "%"PRIu64, o->via.u64); - ret = try_to_write(buf, off, left, temp, i); - } - break; - - case MSGPACK_OBJECT_NEGATIVE_INTEGER: - { - char temp[32] = {0}; - i = snprintf(temp, sizeof(temp)-1, "%"PRId64, o->via.i64); - ret = try_to_write(buf, off, left, temp, i); - } - break; - case MSGPACK_OBJECT_FLOAT32: - case MSGPACK_OBJECT_FLOAT64: - { - char temp[512] = {0}; - if (o->via.f64 == (double)(long long int)o->via.f64) { - i = snprintf(temp, sizeof(temp)-1, "%.1f", o->via.f64); - } - else if (convert_nan_to_null && isnan(o->via.f64) ) { - i = snprintf(temp, sizeof(temp)-1, "null"); - } - else { - i = snprintf(temp, sizeof(temp)-1, "%.16g", o->via.f64); - } - ret = try_to_write(buf, off, left, temp, i); - } - break; - - case MSGPACK_OBJECT_STR: - if (try_to_write(buf, off, left, "\"", 1) && - (o->via.str.size > 0 ? - try_to_write_str(buf, off, left, o->via.str.ptr, o->via.str.size) - : 1/* nothing to do */) && - try_to_write(buf, off, left, "\"", 1)) { - ret = FLB_TRUE; - } - break; - - case MSGPACK_OBJECT_BIN: - if (try_to_write(buf, off, left, "\"", 1) && - (o->via.bin.size > 0 ? - try_to_write_str(buf, off, left, o->via.bin.ptr, o->via.bin.size) - : 1 /* nothing to do */) && - try_to_write(buf, off, left, "\"", 1)) { - ret = FLB_TRUE; - } - break; - - case MSGPACK_OBJECT_EXT: - if (!try_to_write(buf, off, left, "\"", 1)) { - goto msg2json_end; - } - /* ext body. fortmat is similar to printf(1) */ - { - char temp[32] = {0}; - int len; - loop = o->via.ext.size; - for(i=0; i<loop; i++) { - len = snprintf(temp, sizeof(temp)-1, "\\x%02x", (char)o->via.ext.ptr[i]); - if (!try_to_write(buf, off, left, temp, len)) { - goto msg2json_end; - } - } - } - if (!try_to_write(buf, off, left, "\"", 1)) { - goto msg2json_end; - } - ret = FLB_TRUE; - break; - - case MSGPACK_OBJECT_ARRAY: - loop = o->via.array.size; - - if (!try_to_write(buf, off, left, "[", 1)) { - goto msg2json_end; - } - if (loop != 0) { - msgpack_object* p = o->via.array.ptr; - if (!msgpack2json(buf, off, left, p)) { - goto msg2json_end; - } - for (i=1; i<loop; i++) { - if (!try_to_write(buf, off, left, ",", 1) || - !msgpack2json(buf, off, left, p+i)) { - goto msg2json_end; - } - } - } - - ret = try_to_write(buf, off, left, "]", 1); - break; - - case MSGPACK_OBJECT_MAP: - loop = o->via.map.size; - if (!try_to_write(buf, off, left, "{", 1)) { - goto msg2json_end; - } - if (loop != 0) { - msgpack_object k; - msgpack_object_kv *p = o->via.map.ptr; - - packed = 0; - dup = FLB_FALSE; - - k = o->via.map.ptr[0].key; - for (i = 0; i < loop; i++) { - k = o->via.map.ptr[i].key; - dup = key_exists_in_map(k, *o, i + 1); - if (dup == FLB_TRUE) { - continue; - } - - if (packed > 0) { - if (!try_to_write(buf, off, left, ",", 1)) { - goto msg2json_end; - } - } - - if ( - !msgpack2json(buf, off, left, &(p+i)->key) || - !try_to_write(buf, off, left, ":", 1) || - !msgpack2json(buf, off, left, &(p+i)->val) ) { - goto msg2json_end; - } - packed++; - } - } - - ret = try_to_write(buf, off, left, "}", 1); - break; - - default: - flb_warn("[%s] unknown msgpack type %i", __FUNCTION__, o->type); - } - - msg2json_end: - return ret; -} - -/** - * convert msgpack to JSON string. - * This API is similar to snprintf. - * - * @param json_str The buffer to fill JSON string. - * @param json_size The size of json_str. - * @param data The msgpack_unpacked data. - * @return success ? a number characters filled : negative value - */ -int flb_msgpack_to_json(char *json_str, size_t json_size, - const msgpack_object *obj) -{ - int ret = -1; - int off = 0; - - if (json_str == NULL || obj == NULL) { - return -1; - } - - ret = msgpack2json(json_str, &off, json_size - 1, obj); - json_str[off] = '\0'; - return ret ? off: ret; -} - -flb_sds_t flb_msgpack_raw_to_json_sds(const void *in_buf, size_t in_size) -{ - int ret; - size_t off = 0; - size_t out_size; - size_t realloc_size; - - msgpack_unpacked result; - msgpack_object *root; - flb_sds_t out_buf; - flb_sds_t tmp_buf; - - /* buffer size strategy */ - out_size = in_size * FLB_MSGPACK_TO_JSON_INIT_BUFFER_SIZE; - realloc_size = in_size * FLB_MSGPACK_TO_JSON_REALLOC_BUFFER_SIZE; - if (realloc_size < 256) { - realloc_size = 256; - } - - out_buf = flb_sds_create_size(out_size); - if (!out_buf) { - flb_errno(); - return NULL; - } - - msgpack_unpacked_init(&result); - ret = msgpack_unpack_next(&result, in_buf, in_size, &off); - if (ret != MSGPACK_UNPACK_SUCCESS) { - flb_sds_destroy(out_buf); - msgpack_unpacked_destroy(&result); - return NULL; - } - - root = &result.data; - while (1) { - ret = flb_msgpack_to_json(out_buf, out_size, root); - if (ret <= 0) { - tmp_buf = flb_sds_increase(out_buf, realloc_size); - if (tmp_buf) { - out_buf = tmp_buf; - out_size += realloc_size; - } - else { - flb_errno(); - flb_sds_destroy(out_buf); - msgpack_unpacked_destroy(&result); - return NULL; - } - } - else { - break; - } - } - - msgpack_unpacked_destroy(&result); - flb_sds_len_set(out_buf, ret); - - return out_buf; -} - -/* - * Given a 'format' string type, return it integer representation. This - * is used by output plugins that uses pack functions to convert - * msgpack records to JSON. - */ -int flb_pack_to_json_format_type(const char *str) -{ - if (strcasecmp(str, "msgpack") == 0) { - return FLB_PACK_JSON_FORMAT_NONE; - } - else if (strcasecmp(str, "json") == 0) { - return FLB_PACK_JSON_FORMAT_JSON; - } - else if (strcasecmp(str, "json_stream") == 0) { - return FLB_PACK_JSON_FORMAT_STREAM; - } - else if (strcasecmp(str, "json_lines") == 0) { - return FLB_PACK_JSON_FORMAT_LINES; - } - - return -1; -} - -/* Given a 'date string type', return it integer representation */ -int flb_pack_to_json_date_type(const char *str) -{ - if (strcasecmp(str, "double") == 0) { - return FLB_PACK_JSON_DATE_DOUBLE; - } - else if (strcasecmp(str, "java_sql_timestamp") == 0) { - return FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP; - } - else if (strcasecmp(str, "iso8601") == 0) { - return FLB_PACK_JSON_DATE_ISO8601; - } - else if (strcasecmp(str, "epoch") == 0) { - return FLB_PACK_JSON_DATE_EPOCH; - } - else if (strcasecmp(str, "epoch_ms") == 0 || - strcasecmp(str, "epoch_millis") == 0 || - strcasecmp(str, "epoch_milliseconds") == 0) { - return FLB_PACK_JSON_DATE_EPOCH_MS; - } - - return -1; -} - - -static int msgpack_pack_formatted_datetime(flb_sds_t out_buf, char time_formatted[], int max_len, - msgpack_packer* tmp_pck, struct flb_time* tms, - const char *date_format, - const char *time_format) -{ - int len; - size_t s; - struct tm tm; - - gmtime_r(&tms->tm.tv_sec, &tm); - - s = strftime(time_formatted, max_len, - date_format, &tm); - if (!s) { - flb_debug("strftime failed in flb_pack_msgpack_to_json_format"); - return 1; - } - - /* Format the time, use microsecond precision not nanoseconds */ - max_len -= s; - len = snprintf(&time_formatted[s], - max_len, - time_format, - (uint64_t) tms->tm.tv_nsec / 1000); - if (len >= max_len) { - flb_debug("snprintf: %d >= %d in flb_pack_msgpack_to_json_format", len, max_len); - return 2; - } - s += len; - msgpack_pack_str(tmp_pck, s); - msgpack_pack_str_body(tmp_pck, time_formatted, s); - return 0; -} - -flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes, - int json_format, int date_format, - flb_sds_t date_key) -{ - int i; - int ok = MSGPACK_UNPACK_SUCCESS; - int records = 0; - int map_size; - size_t off = 0; - char time_formatted[38]; - flb_sds_t out_tmp; - flb_sds_t out_js; - flb_sds_t out_buf = NULL; - msgpack_unpacked result; - msgpack_object root; - msgpack_object map; - msgpack_sbuffer tmp_sbuf; - msgpack_packer tmp_pck; - msgpack_object *obj; - msgpack_object *k; - msgpack_object *v; - struct flb_time tms; - - /* For json lines and streams mode we need a pre-allocated buffer */ - if (json_format == FLB_PACK_JSON_FORMAT_LINES || - json_format == FLB_PACK_JSON_FORMAT_STREAM) { - out_buf = flb_sds_create_size(bytes + bytes / 4); - if (!out_buf) { - flb_errno(); - return NULL; - } - } - - /* Create temporary msgpack buffer */ - msgpack_sbuffer_init(&tmp_sbuf); - msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); - - /* - * If the format is the original msgpack style of one big array, - * registrate the array, otherwise is not necessary. FYI, original format: - * - * [ - * [timestamp, map], - * [timestamp, map], - * [T, M]... - * ] - */ - if (json_format == FLB_PACK_JSON_FORMAT_JSON) { - records = flb_mp_count(data, bytes); - if (records <= 0) { - msgpack_sbuffer_destroy(&tmp_sbuf); - return NULL; - } - msgpack_pack_array(&tmp_pck, records); - } - - msgpack_unpacked_init(&result); - while (msgpack_unpack_next(&result, data, bytes, &off) == ok) { - /* Each array must have two entries: time and record */ - root = result.data; - if (root.type != MSGPACK_OBJECT_ARRAY) { - continue; - } - if (root.via.array.size != 2) { - continue; - } - - /* Unpack time */ - flb_time_pop_from_msgpack(&tms, &result, &obj); - - /* Get the record/map */ - map = root.via.array.ptr[1]; - if (map.type != MSGPACK_OBJECT_MAP) { - continue; - } - map_size = map.via.map.size; - - if (date_key != NULL) { - msgpack_pack_map(&tmp_pck, map_size + 1); - } - else { - msgpack_pack_map(&tmp_pck, map_size); - } - - if (date_key != NULL) { - /* Append date key */ - msgpack_pack_str(&tmp_pck, flb_sds_len(date_key)); - msgpack_pack_str_body(&tmp_pck, date_key, flb_sds_len(date_key)); - - /* Append date value */ - switch (date_format) { - case FLB_PACK_JSON_DATE_DOUBLE: - msgpack_pack_double(&tmp_pck, flb_time_to_double(&tms)); - break; - case FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP: - if (msgpack_pack_formatted_datetime(out_buf, time_formatted, sizeof(time_formatted), &tmp_pck, &tms, - FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP_FMT, ".%06" PRIu64)) { - flb_sds_destroy(out_buf); - msgpack_sbuffer_destroy(&tmp_sbuf); - msgpack_unpacked_destroy(&result); - return NULL; - } - break; - case FLB_PACK_JSON_DATE_ISO8601: - if (msgpack_pack_formatted_datetime(out_buf, time_formatted, sizeof(time_formatted), &tmp_pck, &tms, - FLB_PACK_JSON_DATE_ISO8601_FMT, ".%06" PRIu64 "Z")) { - flb_sds_destroy(out_buf); - msgpack_sbuffer_destroy(&tmp_sbuf); - msgpack_unpacked_destroy(&result); - return NULL; - } - break; - case FLB_PACK_JSON_DATE_EPOCH: - msgpack_pack_uint64(&tmp_pck, (long long unsigned)(tms.tm.tv_sec)); - break; - case FLB_PACK_JSON_DATE_EPOCH_MS: - msgpack_pack_uint64(&tmp_pck, flb_time_to_millisec(&tms)); - break; - } - } - - /* Append remaining keys/values */ - for (i = 0; i < map_size; i++) { - k = &map.via.map.ptr[i].key; - v = &map.via.map.ptr[i].val; - msgpack_pack_object(&tmp_pck, *k); - msgpack_pack_object(&tmp_pck, *v); - } - - /* - * If the format is the original msgpack style, just continue since - * we don't care about separator or JSON convertion at this point. - */ - if (json_format == FLB_PACK_JSON_FORMAT_JSON) { - continue; - } - - /* - * Here we handle two types of records concatenation: - * - * FLB_PACK_JSON_FORMAT_LINES: add breakline (\n) after each record - * - * - * {'ts':abc,'k1':1} - * {'ts':abc,'k1':2} - * {N} - * - * FLB_PACK_JSON_FORMAT_STREAM: no separators, e.g: - * - * {'ts':abc,'k1':1}{'ts':abc,'k1':2}{N} - */ - if (json_format == FLB_PACK_JSON_FORMAT_LINES || - json_format == FLB_PACK_JSON_FORMAT_STREAM) { - - /* Encode current record into JSON in a temporary variable */ - out_js = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size); - if (!out_js) { - flb_sds_destroy(out_buf); - msgpack_sbuffer_destroy(&tmp_sbuf); - msgpack_unpacked_destroy(&result); - return NULL; - } - - /* - * One map record has been converted, now append it to the - * outgoing out_buf sds variable. - */ - out_tmp = flb_sds_cat(out_buf, out_js, flb_sds_len(out_js)); - if (!out_tmp) { - flb_sds_destroy(out_js); - flb_sds_destroy(out_buf); - msgpack_sbuffer_destroy(&tmp_sbuf); - msgpack_unpacked_destroy(&result); - return NULL; - } - - /* Release temporary json sds buffer */ - flb_sds_destroy(out_js); - - /* If a realloc happened, check the returned address */ - if (out_tmp != out_buf) { - out_buf = out_tmp; - } - - /* Append the breakline only for json lines mode */ - if (json_format == FLB_PACK_JSON_FORMAT_LINES) { - out_tmp = flb_sds_cat(out_buf, "\n", 1); - if (!out_tmp) { - flb_sds_destroy(out_buf); - msgpack_sbuffer_destroy(&tmp_sbuf); - msgpack_unpacked_destroy(&result); - return NULL; - } - if (out_tmp != out_buf) { - out_buf = out_tmp; - } - } - msgpack_sbuffer_clear(&tmp_sbuf); - } - } - - /* Release the unpacker */ - msgpack_unpacked_destroy(&result); - - /* Format to JSON */ - if (json_format == FLB_PACK_JSON_FORMAT_JSON) { - out_buf = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size); - msgpack_sbuffer_destroy(&tmp_sbuf); - - if (!out_buf) { - return NULL; - } - } - else { - msgpack_sbuffer_destroy(&tmp_sbuf); - } - - if (out_buf && flb_sds_len(out_buf) == 0) { - flb_sds_destroy(out_buf); - return NULL; - } - - return out_buf; -} - -/** - * convert msgpack to JSON string. - * This API is similar to snprintf. - * @param size Estimated length of json str. - * @param data The msgpack_unpacked data. - * @return success ? allocated json str ptr : NULL - */ -char *flb_msgpack_to_json_str(size_t size, const msgpack_object *obj) -{ - int ret; - char *buf = NULL; - char *tmp; - - if (obj == NULL) { - return NULL; - } - - if (size <= 0) { - size = 128; - } - - buf = flb_malloc(size); - if (!buf) { - flb_errno(); - return NULL; - } - - while (1) { - ret = flb_msgpack_to_json(buf, size, obj); - if (ret <= 0) { - /* buffer is small. retry.*/ - size += 128; - tmp = flb_realloc(buf, size); - if (tmp) { - buf = tmp; - } - else { - flb_free(buf); - flb_errno(); - return NULL; - } - } - else { - break; - } - } - - return buf; -} - -int flb_pack_time_now(msgpack_packer *pck) -{ - int ret; - struct flb_time t; - - flb_time_get(&t); - ret = flb_time_append_to_msgpack(&t, pck, 0); - - return ret; -} - -int flb_msgpack_expand_map(char *map_data, size_t map_size, - msgpack_object_kv **kv_arr, int kv_arr_len, - char** out_buf, int* out_size) -{ - msgpack_sbuffer sbuf; - msgpack_packer pck; - msgpack_unpacked result; - size_t off = 0; - char *ret_buf; - int map_num; - int i; - int len; - - if (map_data == NULL){ - return -1; - } - - msgpack_unpacked_init(&result); - if ((i=msgpack_unpack_next(&result, map_data, map_size, &off)) != - MSGPACK_UNPACK_SUCCESS ) { - msgpack_unpacked_destroy(&result); - return -1; - } - if (result.data.type != MSGPACK_OBJECT_MAP) { - msgpack_unpacked_destroy(&result); - return -1; - } - - len = result.data.via.map.size; - map_num = kv_arr_len + len; - - msgpack_sbuffer_init(&sbuf); - msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write); - msgpack_pack_map(&pck, map_num); - - for (i=0; i<len; i++) { - msgpack_pack_object(&pck, result.data.via.map.ptr[i].key); - msgpack_pack_object(&pck, result.data.via.map.ptr[i].val); - } - for (i=0; i<kv_arr_len; i++){ - msgpack_pack_object(&pck, kv_arr[i]->key); - msgpack_pack_object(&pck, kv_arr[i]->val); - } - msgpack_unpacked_destroy(&result); - - *out_size = sbuf.size; - ret_buf = flb_malloc(sbuf.size); - *out_buf = ret_buf; - if (*out_buf == NULL) { - flb_errno(); - msgpack_sbuffer_destroy(&sbuf); - return -1; - } - memcpy(*out_buf, sbuf.data, sbuf.size); - msgpack_sbuffer_destroy(&sbuf); - - return 0; -} - -int flb_pack_init(struct flb_config *config) -{ - int ret; - - if (config == NULL) { - return -1; - } - ret = flb_pack_set_null_as_nan(config->convert_nan_to_null); - - return ret; -} |