summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_pack.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:19:48 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:20:02 +0000
commit58daab21cd043e1dc37024a7f99b396788372918 (patch)
tree96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/src/flb_pack.c
parentReleasing debian version 1.43.2-1. (diff)
downloadnetdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz
netdata-58daab21cd043e1dc37024a7f99b396788372918.zip
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/src/flb_pack.c')
-rw-r--r--fluent-bit/src/flb_pack.c1270
1 files changed, 1270 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_pack.c b/fluent-bit/src/flb_pack.c
new file mode 100644
index 00000000..adcaa22c
--- /dev/null
+++ b/fluent-bit/src/flb_pack.c
@@ -0,0 +1,1270 @@
+/*-*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdlib.h>
+#include <string.h>
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_error.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_unescape.h>
+
+/* cmetrics */
+#include <cmetrics/cmetrics.h>
+#include <cmetrics/cmt_decode_msgpack.h>
+#include <cmetrics/cmt_encode_text.h>
+
+#include <msgpack.h>
+#include <math.h>
+#include <jsmn/jsmn.h>
+
+#define try_to_write_str flb_utils_write_str
+
+static int convert_nan_to_null = FLB_FALSE;
+
+static int flb_pack_set_null_as_nan(int b) {
+ if (b == FLB_TRUE || b == FLB_FALSE) {
+ convert_nan_to_null = b;
+ }
+ return convert_nan_to_null;
+}
+
+int flb_json_tokenise(const char *js, size_t len,
+ struct flb_pack_state *state)
+{
+ int ret;
+ int new_tokens = 256;
+ size_t old_size;
+ size_t new_size;
+ void *tmp;
+
+ ret = jsmn_parse(&state->parser, js, len,
+ state->tokens, state->tokens_size);
+ while (ret == JSMN_ERROR_NOMEM) {
+ /* Get current size of the array in bytes */
+ old_size = state->tokens_size * sizeof(jsmntok_t);
+
+ /* New size: add capacity for new 256 entries */
+ new_size = old_size + (sizeof(jsmntok_t) * new_tokens);
+
+ tmp = flb_realloc(state->tokens, new_size);
+ if (!tmp) {
+ flb_errno();
+ return -1;
+ }
+ state->tokens = tmp;
+ state->tokens_size += new_tokens;
+
+ ret = jsmn_parse(&state->parser, js, len,
+ state->tokens, state->tokens_size);
+ }
+
+ if (ret == JSMN_ERROR_INVAL) {
+ return FLB_ERR_JSON_INVAL;
+ }
+
+ if (ret == JSMN_ERROR_PART) {
+ /* This is a partial JSON message, just stop */
+ flb_trace("[json tokenise] incomplete");
+ return FLB_ERR_JSON_PART;
+ }
+
+ state->tokens_count += ret;
+ return 0;
+}
+
+static inline int is_float(const char *buf, int len)
+{
+ const char *end = buf + len;
+ const char *p = buf;
+
+ while (p <= end) {
+ if (*p == 'e' && p < end && *(p + 1) == '-') {
+ return 1;
+ }
+ else if (*p == '.') {
+ return 1;
+ }
+ p++;
+ }
+
+ return 0;
+}
+
+/* Sanitize incoming JSON string */
+static inline int pack_string_token(struct flb_pack_state *state,
+ const char *str, int len,
+ msgpack_packer *pck)
+{
+ int s;
+ int out_len;
+ char *tmp;
+ char *out_buf;
+
+ if (state->buf_size < len + 1) {
+ s = len + 1;
+ tmp = flb_realloc(state->buf_data, s);
+ if (!tmp) {
+ flb_errno();
+ return -1;
+ }
+ else {
+ state->buf_data = tmp;
+ state->buf_size = s;
+ }
+ }
+ out_buf = state->buf_data;
+
+ /* Always decode any UTF-8 or special characters */
+ out_len = flb_unescape_string_utf8(str, len, out_buf);
+
+ /* Pack decoded text */
+ msgpack_pack_str(pck, out_len);
+ msgpack_pack_str_body(pck, out_buf, out_len);
+
+ return out_len;
+}
+
+/* Receive a tokenized JSON message and convert it to MsgPack */
+static char *tokens_to_msgpack(struct flb_pack_state *state,
+ const char *js,
+ int *out_size, int *last_byte,
+ int *out_records)
+{
+ int i;
+ int flen;
+ int arr_size;
+ int records = 0;
+ const char *p;
+ char *buf;
+ const jsmntok_t *t;
+ msgpack_packer pck;
+ msgpack_sbuffer sbuf;
+ jsmntok_t *tokens;
+
+ tokens = state->tokens;
+ arr_size = state->tokens_count;
+
+ if (arr_size == 0) {
+ return NULL;
+ }
+
+ /* initialize buffers */
+ msgpack_sbuffer_init(&sbuf);
+ msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);
+
+ for (i = 0; i < arr_size ; i++) {
+ t = &tokens[i];
+
+ if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)) {
+ break;
+ }
+
+ if (t->parent == -1) {
+ *last_byte = t->end;
+ records++;
+ }
+
+ flen = (t->end - t->start);
+ switch (t->type) {
+ case JSMN_OBJECT:
+ msgpack_pack_map(&pck, t->size);
+ break;
+ case JSMN_ARRAY:
+ msgpack_pack_array(&pck, t->size);
+ break;
+ case JSMN_STRING:
+ pack_string_token(state, js + t->start, flen, &pck);
+ break;
+ case JSMN_PRIMITIVE:
+ p = js + t->start;
+ if (*p == 'f') {
+ msgpack_pack_false(&pck);
+ }
+ else if (*p == 't') {
+ msgpack_pack_true(&pck);
+ }
+ else if (*p == 'n') {
+ msgpack_pack_nil(&pck);
+ }
+ else {
+ if (is_float(p, flen)) {
+ msgpack_pack_double(&pck, atof(p));
+ }
+ else {
+ msgpack_pack_int64(&pck, atoll(p));
+ }
+ }
+ break;
+ case JSMN_UNDEFINED:
+ msgpack_sbuffer_destroy(&sbuf);
+ return NULL;
+ }
+ }
+
+ *out_size = sbuf.size;
+ *out_records = records;
+ buf = sbuf.data;
+
+ return buf;
+}
+
+/*
+ * It parse a JSON string and convert it to MessagePack format, this packer is
+ * useful when a complete JSON message exists, otherwise it will fail until
+ * the message is complete.
+ *
+ * This routine do not keep a state in the parser, do not use it for big
+ * JSON messages.
+ */
+static int pack_json_to_msgpack(const char *js, size_t len, char **buffer,
+ size_t *size, int *root_type, int *records,
+ size_t *consumed)
+{
+ int ret = -1;
+ int n_records;
+ int out;
+ int last;
+ char *buf = NULL;
+ struct flb_pack_state state;
+
+ ret = flb_pack_state_init(&state);
+ if (ret != 0) {
+ return -1;
+ }
+ ret = flb_json_tokenise(js, len, &state);
+ if (ret != 0) {
+ ret = -1;
+ goto flb_pack_json_end;
+ }
+
+ if (state.tokens_count == 0) {
+ ret = -1;
+ goto flb_pack_json_end;
+ }
+
+ buf = tokens_to_msgpack(&state, js, &out, &last, &n_records);
+ if (!buf) {
+ ret = -1;
+ goto flb_pack_json_end;
+ }
+
+ *root_type = state.tokens[0].type;
+ *size = out;
+ *buffer = buf;
+ *records = n_records;
+
+ if (consumed != NULL) {
+ *consumed = last;
+ }
+
+ ret = 0;
+
+ flb_pack_json_end:
+ flb_pack_state_reset(&state);
+ return ret;
+}
+
+/* Pack unlimited serialized JSON messages into msgpack */
+int flb_pack_json(const char *js, size_t len, char **buffer, size_t *size,
+ int *root_type, size_t *consumed)
+{
+ int records;
+
+ return pack_json_to_msgpack(js, len, buffer, size, root_type, &records, consumed);
+}
+
+/*
+ * Pack unlimited serialized JSON messages into msgpack, finally it writes on
+ * 'out_records' the number of messages.
+ */
+int flb_pack_json_recs(const char *js, size_t len, char **buffer, size_t *size,
+ int *root_type, int *out_records, size_t *consumed)
+{
+ return pack_json_to_msgpack(js, len, buffer, size, root_type, out_records, consumed);
+}
+
+/* Initialize a JSON packer state */
+int flb_pack_state_init(struct flb_pack_state *s)
+{
+ int tokens = 256;
+ size_t size = 256;
+
+ jsmn_init(&s->parser);
+
+ size = sizeof(jsmntok_t) * tokens;
+ s->tokens = flb_malloc(size);
+ if (!s->tokens) {
+ flb_errno();
+ return -1;
+ }
+ s->tokens_size = tokens;
+ s->tokens_count = 0;
+ s->last_byte = 0;
+ s->multiple = FLB_FALSE;
+
+ s->buf_data = flb_malloc(size);
+ if (!s->buf_data) {
+ flb_errno();
+ flb_free(s->tokens);
+ s->tokens = NULL;
+ return -1;
+ }
+ s->buf_size = size;
+ s->buf_len = 0;
+
+ return 0;
+}
+
+void flb_pack_state_reset(struct flb_pack_state *s)
+{
+ flb_free(s->tokens);
+ s->tokens = NULL;
+ s->tokens_size = 0;
+ s->tokens_count = 0;
+ s->last_byte = 0;
+ s->buf_size = 0;
+ flb_free(s->buf_data);
+ s->buf_data = NULL;
+}
+
+
+/*
+ * It parse a JSON string and convert it to MessagePack format. The main
+ * difference of this function and the previous flb_pack_json() is that it
+ * keeps a parser and tokens state, allowing to process big messages and
+ * resume the parsing process instead of start from zero.
+ */
+int flb_pack_json_state(const char *js, size_t len,
+ char **buffer, int *size,
+ struct flb_pack_state *state)
+{
+ int ret;
+ int out;
+ int delim = 0;
+ int last = 0;
+ int records;
+ char *buf;
+ jsmntok_t *t;
+
+ ret = flb_json_tokenise(js, len, state);
+ state->multiple = FLB_TRUE;
+ if (ret == FLB_ERR_JSON_PART && state->multiple == FLB_TRUE) {
+ /*
+ * If the caller enabled 'multiple' flag, it means that the incoming
+ * JSON message may have multiple messages concatenated and likely
+ * the last one is only incomplete.
+ *
+ * The following routine aims to determinate how many JSON messages
+ * are OK in the array of tokens, if any, process them and adjust
+ * the JSMN context/buffers.
+ */
+
+ /*
+ * jsmn_parse updates jsmn_parser members. (state->parser)
+ * A member 'toknext' points next incomplete object token.
+ * We use toknext - 1 as an index of last member of complete JSON.
+ */
+ int i;
+ int found = 0;
+
+ if (state->parser.toknext == 0) {
+ return ret;
+ }
+
+ for (i = (int)state->parser.toknext - 1; i >= 1; i--) {
+ t = &state->tokens[i];
+
+ if (t->parent == -1 && (t->end != 0)) {
+ found++;
+ delim = i;
+ break;
+ }
+ }
+
+ if (found == 0) {
+ return ret; /* FLB_ERR_JSON_PART */
+ }
+ state->tokens_count += delim;
+ }
+ else if (ret != 0) {
+ return ret;
+ }
+
+ if (state->tokens_count == 0 || state->tokens == NULL) {
+ state->last_byte = last;
+ return FLB_ERR_JSON_INVAL;
+ }
+
+ buf = tokens_to_msgpack(state, js, &out, &last, &records);
+ if (!buf) {
+ return -1;
+ }
+
+ *size = out;
+ *buffer = buf;
+ state->last_byte = last;
+
+ return 0;
+}
+
+int flb_metadata_pop_from_msgpack(msgpack_object **metadata, msgpack_unpacked *upk,
+ msgpack_object **map)
+{
+ if (metadata == NULL || upk == NULL) {
+ return -1;
+ }
+
+ if (upk->data.type != MSGPACK_OBJECT_ARRAY) {
+ return -1;
+ }
+
+ *metadata = &upk->data.via.array.ptr[0].via.array.ptr[1];
+ *map = &upk->data.via.array.ptr[1];
+
+ return 0;
+}
+
+static int pack_print_fluent_record(size_t cnt, msgpack_unpacked result)
+{
+ msgpack_object *metadata;
+ msgpack_object root;
+ msgpack_object *obj;
+ struct flb_time tms;
+ msgpack_object o;
+
+ root = result.data;
+ if (root.type != MSGPACK_OBJECT_ARRAY) {
+ return -1;
+ }
+
+ o = root.via.array.ptr[0];
+ if (o.type != MSGPACK_OBJECT_ARRAY) {
+ return -1;
+ }
+
+ /* decode expected timestamp only (integer, float or ext) */
+ o = o.via.array.ptr[0];
+ if (o.type != MSGPACK_OBJECT_POSITIVE_INTEGER &&
+ o.type != MSGPACK_OBJECT_FLOAT &&
+ o.type != MSGPACK_OBJECT_EXT) {
+ return -1;
+ }
+
+ /* This is a Fluent Bit record, just do the proper unpacking/printing */
+ flb_time_pop_from_msgpack(&tms, &result, &obj);
+ flb_metadata_pop_from_msgpack(&metadata, &result, &obj);
+
+ fprintf(stdout, "[%zd] [%"PRIu32".%09lu, ", cnt,
+ (uint32_t) tms.tm.tv_sec, tms.tm.tv_nsec);
+
+ msgpack_object_print(stdout, *metadata);
+
+ fprintf(stdout, ", ");
+
+ msgpack_object_print(stdout, *obj);
+
+ fprintf(stdout, "]\n");
+
+ return 0;
+}
+
+void flb_pack_print(const char *data, size_t bytes)
+{
+ int ret;
+ msgpack_unpacked result;
+ size_t off = 0, cnt = 0;
+
+ msgpack_unpacked_init(&result);
+ while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
+ /* Check if we are processing an internal Fluent Bit record */
+ ret = pack_print_fluent_record(cnt, result);
+ if (ret == 0) {
+ continue;
+ }
+
+ printf("[%zd] ", cnt++);
+ msgpack_object_print(stdout, result.data);
+ printf("\n");
+ }
+ msgpack_unpacked_destroy(&result);
+}
+
+void flb_pack_print_metrics(const char *data, size_t bytes)
+{
+ int ret;
+ size_t off = 0;
+ cfl_sds_t text;
+ struct cmt *cmt = NULL;
+
+ /* get cmetrics context */
+ ret = cmt_decode_msgpack_create(&cmt, (char *) data, bytes, &off);
+ if (ret != 0) {
+ flb_error("could not process metrics payload");
+ return;
+ }
+
+ /* convert to text representation */
+ text = cmt_encode_text_create(cmt);
+
+ /* destroy cmt context */
+ cmt_destroy(cmt);
+
+ printf("%s", text);
+ fflush(stdout);
+
+ cmt_encode_text_destroy(text);
+}
+
+static inline int try_to_write(char *buf, int *off, size_t left,
+ const char *str, size_t str_len)
+{
+ if (str_len <= 0){
+ str_len = strlen(str);
+ }
+ if (left <= *off+str_len) {
+ return FLB_FALSE;
+ }
+ memcpy(buf+*off, str, str_len);
+ *off += str_len;
+ return FLB_TRUE;
+}
+
+
+/*
+ * Check if a key exists in the map using the 'offset' as an index to define
+ * which element needs to start looking from
+ */
+static inline int key_exists_in_map(msgpack_object key, msgpack_object map, int offset)
+{
+ int i;
+ msgpack_object p;
+
+ if (key.type != MSGPACK_OBJECT_STR) {
+ return FLB_FALSE;
+ }
+
+ for (i = offset; i < map.via.map.size; i++) {
+ p = map.via.map.ptr[i].key;
+ if (p.type != MSGPACK_OBJECT_STR) {
+ continue;
+ }
+
+ if (key.via.str.size != p.via.str.size) {
+ continue;
+ }
+
+ if (memcmp(key.via.str.ptr, p.via.str.ptr, p.via.str.size) == 0) {
+ return FLB_TRUE;
+ }
+ }
+
+ return FLB_FALSE;
+}
+
+static int msgpack2json(char *buf, int *off, size_t left,
+ const msgpack_object *o)
+{
+ int i;
+ int dup;
+ int ret = FLB_FALSE;
+ int loop;
+ int packed;
+
+ switch(o->type) {
+ case MSGPACK_OBJECT_NIL:
+ ret = try_to_write(buf, off, left, "null", 4);
+ break;
+
+ case MSGPACK_OBJECT_BOOLEAN:
+ ret = try_to_write(buf, off, left,
+ (o->via.boolean ? "true":"false"),0);
+
+ break;
+
+ case MSGPACK_OBJECT_POSITIVE_INTEGER:
+ {
+ char temp[32] = {0};
+ i = snprintf(temp, sizeof(temp)-1, "%"PRIu64, o->via.u64);
+ ret = try_to_write(buf, off, left, temp, i);
+ }
+ break;
+
+ case MSGPACK_OBJECT_NEGATIVE_INTEGER:
+ {
+ char temp[32] = {0};
+ i = snprintf(temp, sizeof(temp)-1, "%"PRId64, o->via.i64);
+ ret = try_to_write(buf, off, left, temp, i);
+ }
+ break;
+ case MSGPACK_OBJECT_FLOAT32:
+ case MSGPACK_OBJECT_FLOAT64:
+ {
+ char temp[512] = {0};
+ if (o->via.f64 == (double)(long long int)o->via.f64) {
+ i = snprintf(temp, sizeof(temp)-1, "%.1f", o->via.f64);
+ }
+ else if (convert_nan_to_null && isnan(o->via.f64) ) {
+ i = snprintf(temp, sizeof(temp)-1, "null");
+ }
+ else {
+ i = snprintf(temp, sizeof(temp)-1, "%.16g", o->via.f64);
+ }
+ ret = try_to_write(buf, off, left, temp, i);
+ }
+ break;
+
+ case MSGPACK_OBJECT_STR:
+ if (try_to_write(buf, off, left, "\"", 1) &&
+ (o->via.str.size > 0 ?
+ try_to_write_str(buf, off, left, o->via.str.ptr, o->via.str.size)
+ : 1/* nothing to do */) &&
+ try_to_write(buf, off, left, "\"", 1)) {
+ ret = FLB_TRUE;
+ }
+ break;
+
+ case MSGPACK_OBJECT_BIN:
+ if (try_to_write(buf, off, left, "\"", 1) &&
+ (o->via.bin.size > 0 ?
+ try_to_write_str(buf, off, left, o->via.bin.ptr, o->via.bin.size)
+ : 1 /* nothing to do */) &&
+ try_to_write(buf, off, left, "\"", 1)) {
+ ret = FLB_TRUE;
+ }
+ break;
+
+ case MSGPACK_OBJECT_EXT:
+ if (!try_to_write(buf, off, left, "\"", 1)) {
+ goto msg2json_end;
+ }
+ /* ext body. fortmat is similar to printf(1) */
+ {
+ char temp[32] = {0};
+ int len;
+ loop = o->via.ext.size;
+ for(i=0; i<loop; i++) {
+ len = snprintf(temp, sizeof(temp)-1, "\\x%02x", (char)o->via.ext.ptr[i]);
+ if (!try_to_write(buf, off, left, temp, len)) {
+ goto msg2json_end;
+ }
+ }
+ }
+ if (!try_to_write(buf, off, left, "\"", 1)) {
+ goto msg2json_end;
+ }
+ ret = FLB_TRUE;
+ break;
+
+ case MSGPACK_OBJECT_ARRAY:
+ loop = o->via.array.size;
+
+ if (!try_to_write(buf, off, left, "[", 1)) {
+ goto msg2json_end;
+ }
+ if (loop != 0) {
+ msgpack_object* p = o->via.array.ptr;
+ if (!msgpack2json(buf, off, left, p)) {
+ goto msg2json_end;
+ }
+ for (i=1; i<loop; i++) {
+ if (!try_to_write(buf, off, left, ",", 1) ||
+ !msgpack2json(buf, off, left, p+i)) {
+ goto msg2json_end;
+ }
+ }
+ }
+
+ ret = try_to_write(buf, off, left, "]", 1);
+ break;
+
+ case MSGPACK_OBJECT_MAP:
+ loop = o->via.map.size;
+ if (!try_to_write(buf, off, left, "{", 1)) {
+ goto msg2json_end;
+ }
+ if (loop != 0) {
+ msgpack_object k;
+ msgpack_object_kv *p = o->via.map.ptr;
+
+ packed = 0;
+ dup = FLB_FALSE;
+
+ k = o->via.map.ptr[0].key;
+ for (i = 0; i < loop; i++) {
+ k = o->via.map.ptr[i].key;
+ dup = key_exists_in_map(k, *o, i + 1);
+ if (dup == FLB_TRUE) {
+ continue;
+ }
+
+ if (packed > 0) {
+ if (!try_to_write(buf, off, left, ",", 1)) {
+ goto msg2json_end;
+ }
+ }
+
+ if (
+ !msgpack2json(buf, off, left, &(p+i)->key) ||
+ !try_to_write(buf, off, left, ":", 1) ||
+ !msgpack2json(buf, off, left, &(p+i)->val) ) {
+ goto msg2json_end;
+ }
+ packed++;
+ }
+ }
+
+ ret = try_to_write(buf, off, left, "}", 1);
+ break;
+
+ default:
+ flb_warn("[%s] unknown msgpack type %i", __FUNCTION__, o->type);
+ }
+
+ msg2json_end:
+ return ret;
+}
+
+/**
+ * convert msgpack to JSON string.
+ * This API is similar to snprintf.
+ *
+ * @param json_str The buffer to fill JSON string.
+ * @param json_size The size of json_str.
+ * @param data The msgpack_unpacked data.
+ * @return success ? a number characters filled : negative value
+ */
+int flb_msgpack_to_json(char *json_str, size_t json_size,
+ const msgpack_object *obj)
+{
+ int ret = -1;
+ int off = 0;
+
+ if (json_str == NULL || obj == NULL) {
+ return -1;
+ }
+
+ ret = msgpack2json(json_str, &off, json_size - 1, obj);
+ json_str[off] = '\0';
+ return ret ? off: ret;
+}
+
+flb_sds_t flb_msgpack_raw_to_json_sds(const void *in_buf, size_t in_size)
+{
+ int ret;
+ size_t off = 0;
+ size_t out_size;
+ size_t realloc_size;
+
+ msgpack_unpacked result;
+ msgpack_object *root;
+ flb_sds_t out_buf;
+ flb_sds_t tmp_buf;
+
+ /* buffer size strategy */
+ out_size = in_size * FLB_MSGPACK_TO_JSON_INIT_BUFFER_SIZE;
+ realloc_size = in_size * FLB_MSGPACK_TO_JSON_REALLOC_BUFFER_SIZE;
+ if (realloc_size < 256) {
+ realloc_size = 256;
+ }
+
+ out_buf = flb_sds_create_size(out_size);
+ if (!out_buf) {
+ flb_errno();
+ return NULL;
+ }
+
+ msgpack_unpacked_init(&result);
+ ret = msgpack_unpack_next(&result, in_buf, in_size, &off);
+ if (ret != MSGPACK_UNPACK_SUCCESS) {
+ flb_sds_destroy(out_buf);
+ msgpack_unpacked_destroy(&result);
+ return NULL;
+ }
+
+ root = &result.data;
+ while (1) {
+ ret = flb_msgpack_to_json(out_buf, out_size, root);
+ if (ret <= 0) {
+ tmp_buf = flb_sds_increase(out_buf, realloc_size);
+ if (tmp_buf) {
+ out_buf = tmp_buf;
+ out_size += realloc_size;
+ }
+ else {
+ flb_errno();
+ flb_sds_destroy(out_buf);
+ msgpack_unpacked_destroy(&result);
+ return NULL;
+ }
+ }
+ else {
+ break;
+ }
+ }
+
+ msgpack_unpacked_destroy(&result);
+ flb_sds_len_set(out_buf, ret);
+
+ return out_buf;
+}
+
+/*
+ * Given a 'format' string type, return it integer representation. This
+ * is used by output plugins that uses pack functions to convert
+ * msgpack records to JSON.
+ */
+int flb_pack_to_json_format_type(const char *str)
+{
+ if (strcasecmp(str, "msgpack") == 0) {
+ return FLB_PACK_JSON_FORMAT_NONE;
+ }
+ else if (strcasecmp(str, "json") == 0) {
+ return FLB_PACK_JSON_FORMAT_JSON;
+ }
+ else if (strcasecmp(str, "json_stream") == 0) {
+ return FLB_PACK_JSON_FORMAT_STREAM;
+ }
+ else if (strcasecmp(str, "json_lines") == 0) {
+ return FLB_PACK_JSON_FORMAT_LINES;
+ }
+
+ return -1;
+}
+
+/* Given a 'date string type', return it integer representation */
+int flb_pack_to_json_date_type(const char *str)
+{
+ if (strcasecmp(str, "double") == 0) {
+ return FLB_PACK_JSON_DATE_DOUBLE;
+ }
+ else if (strcasecmp(str, "java_sql_timestamp") == 0) {
+ return FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP;
+ }
+ else if (strcasecmp(str, "iso8601") == 0) {
+ return FLB_PACK_JSON_DATE_ISO8601;
+ }
+ else if (strcasecmp(str, "epoch") == 0) {
+ return FLB_PACK_JSON_DATE_EPOCH;
+ }
+ else if (strcasecmp(str, "epoch_ms") == 0 ||
+ strcasecmp(str, "epoch_millis") == 0 ||
+ strcasecmp(str, "epoch_milliseconds") == 0) {
+ return FLB_PACK_JSON_DATE_EPOCH_MS;
+ }
+
+ return -1;
+}
+
+
+static int msgpack_pack_formatted_datetime(flb_sds_t out_buf, char time_formatted[], int max_len,
+ msgpack_packer* tmp_pck, struct flb_time* tms,
+ const char *date_format,
+ const char *time_format)
+{
+ int len;
+ size_t s;
+ struct tm tm;
+
+ gmtime_r(&tms->tm.tv_sec, &tm);
+
+ s = strftime(time_formatted, max_len,
+ date_format, &tm);
+ if (!s) {
+ flb_debug("strftime failed in flb_pack_msgpack_to_json_format");
+ return 1;
+ }
+
+ /* Format the time, use microsecond precision not nanoseconds */
+ max_len -= s;
+ len = snprintf(&time_formatted[s],
+ max_len,
+ time_format,
+ (uint64_t) tms->tm.tv_nsec / 1000);
+ if (len >= max_len) {
+ flb_debug("snprintf: %d >= %d in flb_pack_msgpack_to_json_format", len, max_len);
+ return 2;
+ }
+ s += len;
+ msgpack_pack_str(tmp_pck, s);
+ msgpack_pack_str_body(tmp_pck, time_formatted, s);
+ return 0;
+}
+
+flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes,
+ int json_format, int date_format,
+ flb_sds_t date_key)
+{
+ int i;
+ int ok = MSGPACK_UNPACK_SUCCESS;
+ int records = 0;
+ int map_size;
+ size_t off = 0;
+ char time_formatted[38];
+ flb_sds_t out_tmp;
+ flb_sds_t out_js;
+ flb_sds_t out_buf = NULL;
+ msgpack_unpacked result;
+ msgpack_object root;
+ msgpack_object map;
+ msgpack_sbuffer tmp_sbuf;
+ msgpack_packer tmp_pck;
+ msgpack_object *obj;
+ msgpack_object *k;
+ msgpack_object *v;
+ struct flb_time tms;
+
+ /* For json lines and streams mode we need a pre-allocated buffer */
+ if (json_format == FLB_PACK_JSON_FORMAT_LINES ||
+ json_format == FLB_PACK_JSON_FORMAT_STREAM) {
+ out_buf = flb_sds_create_size(bytes + bytes / 4);
+ if (!out_buf) {
+ flb_errno();
+ return NULL;
+ }
+ }
+
+ /* Create temporary msgpack buffer */
+ msgpack_sbuffer_init(&tmp_sbuf);
+ msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
+
+ /*
+ * If the format is the original msgpack style of one big array,
+ * registrate the array, otherwise is not necessary. FYI, original format:
+ *
+ * [
+ * [timestamp, map],
+ * [timestamp, map],
+ * [T, M]...
+ * ]
+ */
+ if (json_format == FLB_PACK_JSON_FORMAT_JSON) {
+ records = flb_mp_count(data, bytes);
+ if (records <= 0) {
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ return NULL;
+ }
+ msgpack_pack_array(&tmp_pck, records);
+ }
+
+ msgpack_unpacked_init(&result);
+ while (msgpack_unpack_next(&result, data, bytes, &off) == ok) {
+ /* Each array must have two entries: time and record */
+ root = result.data;
+ if (root.type != MSGPACK_OBJECT_ARRAY) {
+ continue;
+ }
+ if (root.via.array.size != 2) {
+ continue;
+ }
+
+ /* Unpack time */
+ flb_time_pop_from_msgpack(&tms, &result, &obj);
+
+ /* Get the record/map */
+ map = root.via.array.ptr[1];
+ if (map.type != MSGPACK_OBJECT_MAP) {
+ continue;
+ }
+ map_size = map.via.map.size;
+
+ if (date_key != NULL) {
+ msgpack_pack_map(&tmp_pck, map_size + 1);
+ }
+ else {
+ msgpack_pack_map(&tmp_pck, map_size);
+ }
+
+ if (date_key != NULL) {
+ /* Append date key */
+ msgpack_pack_str(&tmp_pck, flb_sds_len(date_key));
+ msgpack_pack_str_body(&tmp_pck, date_key, flb_sds_len(date_key));
+
+ /* Append date value */
+ switch (date_format) {
+ case FLB_PACK_JSON_DATE_DOUBLE:
+ msgpack_pack_double(&tmp_pck, flb_time_to_double(&tms));
+ break;
+ case FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP:
+ if (msgpack_pack_formatted_datetime(out_buf, time_formatted, sizeof(time_formatted), &tmp_pck, &tms,
+ FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP_FMT, ".%06" PRIu64)) {
+ flb_sds_destroy(out_buf);
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ msgpack_unpacked_destroy(&result);
+ return NULL;
+ }
+ break;
+ case FLB_PACK_JSON_DATE_ISO8601:
+ if (msgpack_pack_formatted_datetime(out_buf, time_formatted, sizeof(time_formatted), &tmp_pck, &tms,
+ FLB_PACK_JSON_DATE_ISO8601_FMT, ".%06" PRIu64 "Z")) {
+ flb_sds_destroy(out_buf);
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ msgpack_unpacked_destroy(&result);
+ return NULL;
+ }
+ break;
+ case FLB_PACK_JSON_DATE_EPOCH:
+ msgpack_pack_uint64(&tmp_pck, (long long unsigned)(tms.tm.tv_sec));
+ break;
+ case FLB_PACK_JSON_DATE_EPOCH_MS:
+ msgpack_pack_uint64(&tmp_pck, flb_time_to_millisec(&tms));
+ break;
+ }
+ }
+
+ /* Append remaining keys/values */
+ for (i = 0; i < map_size; i++) {
+ k = &map.via.map.ptr[i].key;
+ v = &map.via.map.ptr[i].val;
+ msgpack_pack_object(&tmp_pck, *k);
+ msgpack_pack_object(&tmp_pck, *v);
+ }
+
+ /*
+ * If the format is the original msgpack style, just continue since
+ * we don't care about separator or JSON convertion at this point.
+ */
+ if (json_format == FLB_PACK_JSON_FORMAT_JSON) {
+ continue;
+ }
+
+ /*
+ * Here we handle two types of records concatenation:
+ *
+ * FLB_PACK_JSON_FORMAT_LINES: add breakline (\n) after each record
+ *
+ *
+ * {'ts':abc,'k1':1}
+ * {'ts':abc,'k1':2}
+ * {N}
+ *
+ * FLB_PACK_JSON_FORMAT_STREAM: no separators, e.g:
+ *
+ * {'ts':abc,'k1':1}{'ts':abc,'k1':2}{N}
+ */
+ if (json_format == FLB_PACK_JSON_FORMAT_LINES ||
+ json_format == FLB_PACK_JSON_FORMAT_STREAM) {
+
+ /* Encode current record into JSON in a temporary variable */
+ out_js = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size);
+ if (!out_js) {
+ flb_sds_destroy(out_buf);
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ msgpack_unpacked_destroy(&result);
+ return NULL;
+ }
+
+ /*
+ * One map record has been converted, now append it to the
+ * outgoing out_buf sds variable.
+ */
+ out_tmp = flb_sds_cat(out_buf, out_js, flb_sds_len(out_js));
+ if (!out_tmp) {
+ flb_sds_destroy(out_js);
+ flb_sds_destroy(out_buf);
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ msgpack_unpacked_destroy(&result);
+ return NULL;
+ }
+
+ /* Release temporary json sds buffer */
+ flb_sds_destroy(out_js);
+
+ /* If a realloc happened, check the returned address */
+ if (out_tmp != out_buf) {
+ out_buf = out_tmp;
+ }
+
+ /* Append the breakline only for json lines mode */
+ if (json_format == FLB_PACK_JSON_FORMAT_LINES) {
+ out_tmp = flb_sds_cat(out_buf, "\n", 1);
+ if (!out_tmp) {
+ flb_sds_destroy(out_buf);
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ msgpack_unpacked_destroy(&result);
+ return NULL;
+ }
+ if (out_tmp != out_buf) {
+ out_buf = out_tmp;
+ }
+ }
+ msgpack_sbuffer_clear(&tmp_sbuf);
+ }
+ }
+
+ /* Release the unpacker */
+ msgpack_unpacked_destroy(&result);
+
+ /* Format to JSON */
+ if (json_format == FLB_PACK_JSON_FORMAT_JSON) {
+ out_buf = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size);
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+
+ if (!out_buf) {
+ return NULL;
+ }
+ }
+ else {
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ }
+
+ if (out_buf && flb_sds_len(out_buf) == 0) {
+ flb_sds_destroy(out_buf);
+ return NULL;
+ }
+
+ return out_buf;
+}
+
+/**
+ * convert msgpack to JSON string.
+ * This API is similar to snprintf.
+ * @param size Estimated length of json str.
+ * @param data The msgpack_unpacked data.
+ * @return success ? allocated json str ptr : NULL
+ */
+char *flb_msgpack_to_json_str(size_t size, const msgpack_object *obj)
+{
+ int ret;
+ char *buf = NULL;
+ char *tmp;
+
+ if (obj == NULL) {
+ return NULL;
+ }
+
+ if (size <= 0) {
+ size = 128;
+ }
+
+ buf = flb_malloc(size);
+ if (!buf) {
+ flb_errno();
+ return NULL;
+ }
+
+ while (1) {
+ ret = flb_msgpack_to_json(buf, size, obj);
+ if (ret <= 0) {
+ /* buffer is small. retry.*/
+ size += 128;
+ tmp = flb_realloc(buf, size);
+ if (tmp) {
+ buf = tmp;
+ }
+ else {
+ flb_free(buf);
+ flb_errno();
+ return NULL;
+ }
+ }
+ else {
+ break;
+ }
+ }
+
+ return buf;
+}
+
+int flb_pack_time_now(msgpack_packer *pck)
+{
+ int ret;
+ struct flb_time t;
+
+ flb_time_get(&t);
+ ret = flb_time_append_to_msgpack(&t, pck, 0);
+
+ return ret;
+}
+
+int flb_msgpack_expand_map(char *map_data, size_t map_size,
+ msgpack_object_kv **kv_arr, int kv_arr_len,
+ char** out_buf, int* out_size)
+{
+ msgpack_sbuffer sbuf;
+ msgpack_packer pck;
+ msgpack_unpacked result;
+ size_t off = 0;
+ char *ret_buf;
+ int map_num;
+ int i;
+ int len;
+
+ if (map_data == NULL){
+ return -1;
+ }
+
+ msgpack_unpacked_init(&result);
+ if ((i=msgpack_unpack_next(&result, map_data, map_size, &off)) !=
+ MSGPACK_UNPACK_SUCCESS ) {
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+ if (result.data.type != MSGPACK_OBJECT_MAP) {
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+
+ len = result.data.via.map.size;
+ map_num = kv_arr_len + len;
+
+ msgpack_sbuffer_init(&sbuf);
+ msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);
+ msgpack_pack_map(&pck, map_num);
+
+ for (i=0; i<len; i++) {
+ msgpack_pack_object(&pck, result.data.via.map.ptr[i].key);
+ msgpack_pack_object(&pck, result.data.via.map.ptr[i].val);
+ }
+ for (i=0; i<kv_arr_len; i++){
+ msgpack_pack_object(&pck, kv_arr[i]->key);
+ msgpack_pack_object(&pck, kv_arr[i]->val);
+ }
+ msgpack_unpacked_destroy(&result);
+
+ *out_size = sbuf.size;
+ ret_buf = flb_malloc(sbuf.size);
+ *out_buf = ret_buf;
+ if (*out_buf == NULL) {
+ flb_errno();
+ msgpack_sbuffer_destroy(&sbuf);
+ return -1;
+ }
+ memcpy(*out_buf, sbuf.data, sbuf.size);
+ msgpack_sbuffer_destroy(&sbuf);
+
+ return 0;
+}
+
+int flb_pack_init(struct flb_config *config)
+{
+ int ret;
+
+ if (config == NULL) {
+ return -1;
+ }
+ ret = flb_pack_set_null_as_nan(config->convert_nan_to_null);
+
+ return ret;
+}