/*-*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ /* Fluent Bit * ========== * Copyright (C) 2015-2022 The Fluent Bit Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include #include #include #include #include #include /* cmetrics */ #include #include #include #include #include #include #define try_to_write_str flb_utils_write_str static int convert_nan_to_null = FLB_FALSE; static int flb_pack_set_null_as_nan(int b) { if (b == FLB_TRUE || b == FLB_FALSE) { convert_nan_to_null = b; } return convert_nan_to_null; } int flb_json_tokenise(const char *js, size_t len, struct flb_pack_state *state) { int ret; int new_tokens = 256; size_t old_size; size_t new_size; void *tmp; ret = jsmn_parse(&state->parser, js, len, state->tokens, state->tokens_size); while (ret == JSMN_ERROR_NOMEM) { /* Get current size of the array in bytes */ old_size = state->tokens_size * sizeof(jsmntok_t); /* New size: add capacity for new 256 entries */ new_size = old_size + (sizeof(jsmntok_t) * new_tokens); tmp = flb_realloc(state->tokens, new_size); if (!tmp) { flb_errno(); return -1; } state->tokens = tmp; state->tokens_size += new_tokens; ret = jsmn_parse(&state->parser, js, len, state->tokens, state->tokens_size); } if (ret == JSMN_ERROR_INVAL) { return FLB_ERR_JSON_INVAL; } if (ret == JSMN_ERROR_PART) { /* This is a partial JSON message, just stop */ flb_trace("[json tokenise] incomplete"); return FLB_ERR_JSON_PART; } state->tokens_count += ret; return 0; } static inline int is_float(const char *buf, int len) { const char *end = buf + len; const char *p = buf; while (p <= end) { if (*p == 'e' && p < end && *(p + 1) == '-') { return 1; } else if (*p == '.') { return 1; } p++; } return 0; } /* Sanitize incoming JSON string */ static inline int pack_string_token(struct flb_pack_state *state, const char *str, int len, msgpack_packer *pck) { int s; int out_len; char *tmp; char *out_buf; if (state->buf_size < len + 1) { s = len + 1; tmp = flb_realloc(state->buf_data, s); if (!tmp) { flb_errno(); return -1; } else { state->buf_data = tmp; state->buf_size = s; } } out_buf = state->buf_data; /* Always decode any UTF-8 or special characters */ out_len = flb_unescape_string_utf8(str, len, out_buf); /* Pack decoded text */ msgpack_pack_str(pck, out_len); msgpack_pack_str_body(pck, out_buf, out_len); return out_len; } /* Receive a tokenized JSON message and convert it to MsgPack */ static char *tokens_to_msgpack(struct flb_pack_state *state, const char *js, int *out_size, int *last_byte, int *out_records) { int i; int flen; int arr_size; int records = 0; const char *p; char *buf; const jsmntok_t *t; msgpack_packer pck; msgpack_sbuffer sbuf; jsmntok_t *tokens; tokens = state->tokens; arr_size = state->tokens_count; if (arr_size == 0) { return NULL; } /* initialize buffers */ msgpack_sbuffer_init(&sbuf); msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write); for (i = 0; i < arr_size ; i++) { t = &tokens[i]; if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)) { break; } if (t->parent == -1) { *last_byte = t->end; records++; } flen = (t->end - t->start); switch (t->type) { case JSMN_OBJECT: msgpack_pack_map(&pck, t->size); break; case JSMN_ARRAY: msgpack_pack_array(&pck, t->size); break; case JSMN_STRING: pack_string_token(state, js + t->start, flen, &pck); break; case JSMN_PRIMITIVE: p = js + t->start; if (*p == 'f') { msgpack_pack_false(&pck); } else if (*p == 't') { msgpack_pack_true(&pck); } else if (*p == 'n') { msgpack_pack_nil(&pck); } else { if (is_float(p, flen)) { msgpack_pack_double(&pck, atof(p)); } else { msgpack_pack_int64(&pck, atoll(p)); } } break; case JSMN_UNDEFINED: msgpack_sbuffer_destroy(&sbuf); return NULL; } } *out_size = sbuf.size; *out_records = records; buf = sbuf.data; return buf; } /* * It parse a JSON string and convert it to MessagePack format, this packer is * useful when a complete JSON message exists, otherwise it will fail until * the message is complete. * * This routine do not keep a state in the parser, do not use it for big * JSON messages. */ static int pack_json_to_msgpack(const char *js, size_t len, char **buffer, size_t *size, int *root_type, int *records, size_t *consumed) { int ret = -1; int n_records; int out; int last; char *buf = NULL; struct flb_pack_state state; ret = flb_pack_state_init(&state); if (ret != 0) { return -1; } ret = flb_json_tokenise(js, len, &state); if (ret != 0) { ret = -1; goto flb_pack_json_end; } if (state.tokens_count == 0) { ret = -1; goto flb_pack_json_end; } buf = tokens_to_msgpack(&state, js, &out, &last, &n_records); if (!buf) { ret = -1; goto flb_pack_json_end; } *root_type = state.tokens[0].type; *size = out; *buffer = buf; *records = n_records; if (consumed != NULL) { *consumed = last; } ret = 0; flb_pack_json_end: flb_pack_state_reset(&state); return ret; } /* Pack unlimited serialized JSON messages into msgpack */ int flb_pack_json(const char *js, size_t len, char **buffer, size_t *size, int *root_type, size_t *consumed) { int records; return pack_json_to_msgpack(js, len, buffer, size, root_type, &records, consumed); } /* * Pack unlimited serialized JSON messages into msgpack, finally it writes on * 'out_records' the number of messages. */ int flb_pack_json_recs(const char *js, size_t len, char **buffer, size_t *size, int *root_type, int *out_records, size_t *consumed) { return pack_json_to_msgpack(js, len, buffer, size, root_type, out_records, consumed); } /* Initialize a JSON packer state */ int flb_pack_state_init(struct flb_pack_state *s) { int tokens = 256; size_t size = 256; jsmn_init(&s->parser); size = sizeof(jsmntok_t) * tokens; s->tokens = flb_malloc(size); if (!s->tokens) { flb_errno(); return -1; } s->tokens_size = tokens; s->tokens_count = 0; s->last_byte = 0; s->multiple = FLB_FALSE; s->buf_data = flb_malloc(size); if (!s->buf_data) { flb_errno(); flb_free(s->tokens); s->tokens = NULL; return -1; } s->buf_size = size; s->buf_len = 0; return 0; } void flb_pack_state_reset(struct flb_pack_state *s) { flb_free(s->tokens); s->tokens = NULL; s->tokens_size = 0; s->tokens_count = 0; s->last_byte = 0; s->buf_size = 0; flb_free(s->buf_data); s->buf_data = NULL; } /* * It parse a JSON string and convert it to MessagePack format. The main * difference of this function and the previous flb_pack_json() is that it * keeps a parser and tokens state, allowing to process big messages and * resume the parsing process instead of start from zero. */ int flb_pack_json_state(const char *js, size_t len, char **buffer, int *size, struct flb_pack_state *state) { int ret; int out; int delim = 0; int last = 0; int records; char *buf; jsmntok_t *t; ret = flb_json_tokenise(js, len, state); state->multiple = FLB_TRUE; if (ret == FLB_ERR_JSON_PART && state->multiple == FLB_TRUE) { /* * If the caller enabled 'multiple' flag, it means that the incoming * JSON message may have multiple messages concatenated and likely * the last one is only incomplete. * * The following routine aims to determinate how many JSON messages * are OK in the array of tokens, if any, process them and adjust * the JSMN context/buffers. */ /* * jsmn_parse updates jsmn_parser members. (state->parser) * A member 'toknext' points next incomplete object token. * We use toknext - 1 as an index of last member of complete JSON. */ int i; int found = 0; if (state->parser.toknext == 0) { return ret; } for (i = (int)state->parser.toknext - 1; i >= 1; i--) { t = &state->tokens[i]; if (t->parent == -1 && (t->end != 0)) { found++; delim = i; break; } } if (found == 0) { return ret; /* FLB_ERR_JSON_PART */ } state->tokens_count += delim; } else if (ret != 0) { return ret; } if (state->tokens_count == 0 || state->tokens == NULL) { state->last_byte = last; return FLB_ERR_JSON_INVAL; } buf = tokens_to_msgpack(state, js, &out, &last, &records); if (!buf) { return -1; } *size = out; *buffer = buf; state->last_byte = last; return 0; } int flb_metadata_pop_from_msgpack(msgpack_object **metadata, msgpack_unpacked *upk, msgpack_object **map) { if (metadata == NULL || upk == NULL) { return -1; } if (upk->data.type != MSGPACK_OBJECT_ARRAY) { return -1; } *metadata = &upk->data.via.array.ptr[0].via.array.ptr[1]; *map = &upk->data.via.array.ptr[1]; return 0; } static int pack_print_fluent_record(size_t cnt, msgpack_unpacked result) { msgpack_object *metadata; msgpack_object root; msgpack_object *obj; struct flb_time tms; msgpack_object o; root = result.data; if (root.type != MSGPACK_OBJECT_ARRAY) { return -1; } o = root.via.array.ptr[0]; if (o.type != MSGPACK_OBJECT_ARRAY) { return -1; } /* decode expected timestamp only (integer, float or ext) */ o = o.via.array.ptr[0]; if (o.type != MSGPACK_OBJECT_POSITIVE_INTEGER && o.type != MSGPACK_OBJECT_FLOAT && o.type != MSGPACK_OBJECT_EXT) { return -1; } /* This is a Fluent Bit record, just do the proper unpacking/printing */ flb_time_pop_from_msgpack(&tms, &result, &obj); flb_metadata_pop_from_msgpack(&metadata, &result, &obj); fprintf(stdout, "[%zd] [%"PRIu32".%09lu, ", cnt, (uint32_t) tms.tm.tv_sec, tms.tm.tv_nsec); msgpack_object_print(stdout, *metadata); fprintf(stdout, ", "); msgpack_object_print(stdout, *obj); fprintf(stdout, "]\n"); return 0; } void flb_pack_print(const char *data, size_t bytes) { int ret; msgpack_unpacked result; size_t off = 0, cnt = 0; msgpack_unpacked_init(&result); while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) { /* Check if we are processing an internal Fluent Bit record */ ret = pack_print_fluent_record(cnt, result); if (ret == 0) { continue; } printf("[%zd] ", cnt++); msgpack_object_print(stdout, result.data); printf("\n"); } msgpack_unpacked_destroy(&result); } void flb_pack_print_metrics(const char *data, size_t bytes) { int ret; size_t off = 0; cfl_sds_t text; struct cmt *cmt = NULL; /* get cmetrics context */ ret = cmt_decode_msgpack_create(&cmt, (char *) data, bytes, &off); if (ret != 0) { flb_error("could not process metrics payload"); return; } /* convert to text representation */ text = cmt_encode_text_create(cmt); /* destroy cmt context */ cmt_destroy(cmt); printf("%s", text); fflush(stdout); cmt_encode_text_destroy(text); } static inline int try_to_write(char *buf, int *off, size_t left, const char *str, size_t str_len) { if (str_len <= 0){ str_len = strlen(str); } if (left <= *off+str_len) { return FLB_FALSE; } memcpy(buf+*off, str, str_len); *off += str_len; return FLB_TRUE; } /* * Check if a key exists in the map using the 'offset' as an index to define * which element needs to start looking from */ static inline int key_exists_in_map(msgpack_object key, msgpack_object map, int offset) { int i; msgpack_object p; if (key.type != MSGPACK_OBJECT_STR) { return FLB_FALSE; } for (i = offset; i < map.via.map.size; i++) { p = map.via.map.ptr[i].key; if (p.type != MSGPACK_OBJECT_STR) { continue; } if (key.via.str.size != p.via.str.size) { continue; } if (memcmp(key.via.str.ptr, p.via.str.ptr, p.via.str.size) == 0) { return FLB_TRUE; } } return FLB_FALSE; } static int msgpack2json(char *buf, int *off, size_t left, const msgpack_object *o) { int i; int dup; int ret = FLB_FALSE; int loop; int packed; switch(o->type) { case MSGPACK_OBJECT_NIL: ret = try_to_write(buf, off, left, "null", 4); break; case MSGPACK_OBJECT_BOOLEAN: ret = try_to_write(buf, off, left, (o->via.boolean ? "true":"false"),0); break; case MSGPACK_OBJECT_POSITIVE_INTEGER: { char temp[32] = {0}; i = snprintf(temp, sizeof(temp)-1, "%"PRIu64, o->via.u64); ret = try_to_write(buf, off, left, temp, i); } break; case MSGPACK_OBJECT_NEGATIVE_INTEGER: { char temp[32] = {0}; i = snprintf(temp, sizeof(temp)-1, "%"PRId64, o->via.i64); ret = try_to_write(buf, off, left, temp, i); } break; case MSGPACK_OBJECT_FLOAT32: case MSGPACK_OBJECT_FLOAT64: { char temp[512] = {0}; if (o->via.f64 == (double)(long long int)o->via.f64) { i = snprintf(temp, sizeof(temp)-1, "%.1f", o->via.f64); } else if (convert_nan_to_null && isnan(o->via.f64) ) { i = snprintf(temp, sizeof(temp)-1, "null"); } else { i = snprintf(temp, sizeof(temp)-1, "%.16g", o->via.f64); } ret = try_to_write(buf, off, left, temp, i); } break; case MSGPACK_OBJECT_STR: if (try_to_write(buf, off, left, "\"", 1) && (o->via.str.size > 0 ? try_to_write_str(buf, off, left, o->via.str.ptr, o->via.str.size) : 1/* nothing to do */) && try_to_write(buf, off, left, "\"", 1)) { ret = FLB_TRUE; } break; case MSGPACK_OBJECT_BIN: if (try_to_write(buf, off, left, "\"", 1) && (o->via.bin.size > 0 ? try_to_write_str(buf, off, left, o->via.bin.ptr, o->via.bin.size) : 1 /* nothing to do */) && try_to_write(buf, off, left, "\"", 1)) { ret = FLB_TRUE; } break; case MSGPACK_OBJECT_EXT: if (!try_to_write(buf, off, left, "\"", 1)) { goto msg2json_end; } /* ext body. fortmat is similar to printf(1) */ { char temp[32] = {0}; int len; loop = o->via.ext.size; for(i=0; ivia.ext.ptr[i]); if (!try_to_write(buf, off, left, temp, len)) { goto msg2json_end; } } } if (!try_to_write(buf, off, left, "\"", 1)) { goto msg2json_end; } ret = FLB_TRUE; break; case MSGPACK_OBJECT_ARRAY: loop = o->via.array.size; if (!try_to_write(buf, off, left, "[", 1)) { goto msg2json_end; } if (loop != 0) { msgpack_object* p = o->via.array.ptr; if (!msgpack2json(buf, off, left, p)) { goto msg2json_end; } for (i=1; ivia.map.size; if (!try_to_write(buf, off, left, "{", 1)) { goto msg2json_end; } if (loop != 0) { msgpack_object k; msgpack_object_kv *p = o->via.map.ptr; packed = 0; dup = FLB_FALSE; k = o->via.map.ptr[0].key; for (i = 0; i < loop; i++) { k = o->via.map.ptr[i].key; dup = key_exists_in_map(k, *o, i + 1); if (dup == FLB_TRUE) { continue; } if (packed > 0) { if (!try_to_write(buf, off, left, ",", 1)) { goto msg2json_end; } } if ( !msgpack2json(buf, off, left, &(p+i)->key) || !try_to_write(buf, off, left, ":", 1) || !msgpack2json(buf, off, left, &(p+i)->val) ) { goto msg2json_end; } packed++; } } ret = try_to_write(buf, off, left, "}", 1); break; default: flb_warn("[%s] unknown msgpack type %i", __FUNCTION__, o->type); } msg2json_end: return ret; } /** * convert msgpack to JSON string. * This API is similar to snprintf. * * @param json_str The buffer to fill JSON string. * @param json_size The size of json_str. * @param data The msgpack_unpacked data. * @return success ? a number characters filled : negative value */ int flb_msgpack_to_json(char *json_str, size_t json_size, const msgpack_object *obj) { int ret = -1; int off = 0; if (json_str == NULL || obj == NULL) { return -1; } ret = msgpack2json(json_str, &off, json_size - 1, obj); json_str[off] = '\0'; return ret ? off: ret; } flb_sds_t flb_msgpack_raw_to_json_sds(const void *in_buf, size_t in_size) { int ret; size_t off = 0; size_t out_size; size_t realloc_size; msgpack_unpacked result; msgpack_object *root; flb_sds_t out_buf; flb_sds_t tmp_buf; /* buffer size strategy */ out_size = in_size * FLB_MSGPACK_TO_JSON_INIT_BUFFER_SIZE; realloc_size = in_size * FLB_MSGPACK_TO_JSON_REALLOC_BUFFER_SIZE; if (realloc_size < 256) { realloc_size = 256; } out_buf = flb_sds_create_size(out_size); if (!out_buf) { flb_errno(); return NULL; } msgpack_unpacked_init(&result); ret = msgpack_unpack_next(&result, in_buf, in_size, &off); if (ret != MSGPACK_UNPACK_SUCCESS) { flb_sds_destroy(out_buf); msgpack_unpacked_destroy(&result); return NULL; } root = &result.data; while (1) { ret = flb_msgpack_to_json(out_buf, out_size, root); if (ret <= 0) { tmp_buf = flb_sds_increase(out_buf, realloc_size); if (tmp_buf) { out_buf = tmp_buf; out_size += realloc_size; } else { flb_errno(); flb_sds_destroy(out_buf); msgpack_unpacked_destroy(&result); return NULL; } } else { break; } } msgpack_unpacked_destroy(&result); flb_sds_len_set(out_buf, ret); return out_buf; } /* * Given a 'format' string type, return it integer representation. This * is used by output plugins that uses pack functions to convert * msgpack records to JSON. */ int flb_pack_to_json_format_type(const char *str) { if (strcasecmp(str, "msgpack") == 0) { return FLB_PACK_JSON_FORMAT_NONE; } else if (strcasecmp(str, "json") == 0) { return FLB_PACK_JSON_FORMAT_JSON; } else if (strcasecmp(str, "json_stream") == 0) { return FLB_PACK_JSON_FORMAT_STREAM; } else if (strcasecmp(str, "json_lines") == 0) { return FLB_PACK_JSON_FORMAT_LINES; } return -1; } /* Given a 'date string type', return it integer representation */ int flb_pack_to_json_date_type(const char *str) { if (strcasecmp(str, "double") == 0) { return FLB_PACK_JSON_DATE_DOUBLE; } else if (strcasecmp(str, "java_sql_timestamp") == 0) { return FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP; } else if (strcasecmp(str, "iso8601") == 0) { return FLB_PACK_JSON_DATE_ISO8601; } else if (strcasecmp(str, "epoch") == 0) { return FLB_PACK_JSON_DATE_EPOCH; } else if (strcasecmp(str, "epoch_ms") == 0 || strcasecmp(str, "epoch_millis") == 0 || strcasecmp(str, "epoch_milliseconds") == 0) { return FLB_PACK_JSON_DATE_EPOCH_MS; } return -1; } static int msgpack_pack_formatted_datetime(flb_sds_t out_buf, char time_formatted[], int max_len, msgpack_packer* tmp_pck, struct flb_time* tms, const char *date_format, const char *time_format) { int len; size_t s; struct tm tm; gmtime_r(&tms->tm.tv_sec, &tm); s = strftime(time_formatted, max_len, date_format, &tm); if (!s) { flb_debug("strftime failed in flb_pack_msgpack_to_json_format"); return 1; } /* Format the time, use microsecond precision not nanoseconds */ max_len -= s; len = snprintf(&time_formatted[s], max_len, time_format, (uint64_t) tms->tm.tv_nsec / 1000); if (len >= max_len) { flb_debug("snprintf: %d >= %d in flb_pack_msgpack_to_json_format", len, max_len); return 2; } s += len; msgpack_pack_str(tmp_pck, s); msgpack_pack_str_body(tmp_pck, time_formatted, s); return 0; } flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes, int json_format, int date_format, flb_sds_t date_key) { int i; int ok = MSGPACK_UNPACK_SUCCESS; int records = 0; int map_size; size_t off = 0; char time_formatted[38]; flb_sds_t out_tmp; flb_sds_t out_js; flb_sds_t out_buf = NULL; msgpack_unpacked result; msgpack_object root; msgpack_object map; msgpack_sbuffer tmp_sbuf; msgpack_packer tmp_pck; msgpack_object *obj; msgpack_object *k; msgpack_object *v; struct flb_time tms; /* For json lines and streams mode we need a pre-allocated buffer */ if (json_format == FLB_PACK_JSON_FORMAT_LINES || json_format == FLB_PACK_JSON_FORMAT_STREAM) { out_buf = flb_sds_create_size(bytes + bytes / 4); if (!out_buf) { flb_errno(); return NULL; } } /* Create temporary msgpack buffer */ msgpack_sbuffer_init(&tmp_sbuf); msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); /* * If the format is the original msgpack style of one big array, * registrate the array, otherwise is not necessary. FYI, original format: * * [ * [timestamp, map], * [timestamp, map], * [T, M]... * ] */ if (json_format == FLB_PACK_JSON_FORMAT_JSON) { records = flb_mp_count(data, bytes); if (records <= 0) { msgpack_sbuffer_destroy(&tmp_sbuf); return NULL; } msgpack_pack_array(&tmp_pck, records); } msgpack_unpacked_init(&result); while (msgpack_unpack_next(&result, data, bytes, &off) == ok) { /* Each array must have two entries: time and record */ root = result.data; if (root.type != MSGPACK_OBJECT_ARRAY) { continue; } if (root.via.array.size != 2) { continue; } /* Unpack time */ flb_time_pop_from_msgpack(&tms, &result, &obj); /* Get the record/map */ map = root.via.array.ptr[1]; if (map.type != MSGPACK_OBJECT_MAP) { continue; } map_size = map.via.map.size; if (date_key != NULL) { msgpack_pack_map(&tmp_pck, map_size + 1); } else { msgpack_pack_map(&tmp_pck, map_size); } if (date_key != NULL) { /* Append date key */ msgpack_pack_str(&tmp_pck, flb_sds_len(date_key)); msgpack_pack_str_body(&tmp_pck, date_key, flb_sds_len(date_key)); /* Append date value */ switch (date_format) { case FLB_PACK_JSON_DATE_DOUBLE: msgpack_pack_double(&tmp_pck, flb_time_to_double(&tms)); break; case FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP: if (msgpack_pack_formatted_datetime(out_buf, time_formatted, sizeof(time_formatted), &tmp_pck, &tms, FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP_FMT, ".%06" PRIu64)) { flb_sds_destroy(out_buf); msgpack_sbuffer_destroy(&tmp_sbuf); msgpack_unpacked_destroy(&result); return NULL; } break; case FLB_PACK_JSON_DATE_ISO8601: if (msgpack_pack_formatted_datetime(out_buf, time_formatted, sizeof(time_formatted), &tmp_pck, &tms, FLB_PACK_JSON_DATE_ISO8601_FMT, ".%06" PRIu64 "Z")) { flb_sds_destroy(out_buf); msgpack_sbuffer_destroy(&tmp_sbuf); msgpack_unpacked_destroy(&result); return NULL; } break; case FLB_PACK_JSON_DATE_EPOCH: msgpack_pack_uint64(&tmp_pck, (long long unsigned)(tms.tm.tv_sec)); break; case FLB_PACK_JSON_DATE_EPOCH_MS: msgpack_pack_uint64(&tmp_pck, flb_time_to_millisec(&tms)); break; } } /* Append remaining keys/values */ for (i = 0; i < map_size; i++) { k = &map.via.map.ptr[i].key; v = &map.via.map.ptr[i].val; msgpack_pack_object(&tmp_pck, *k); msgpack_pack_object(&tmp_pck, *v); } /* * If the format is the original msgpack style, just continue since * we don't care about separator or JSON convertion at this point. */ if (json_format == FLB_PACK_JSON_FORMAT_JSON) { continue; } /* * Here we handle two types of records concatenation: * * FLB_PACK_JSON_FORMAT_LINES: add breakline (\n) after each record * * * {'ts':abc,'k1':1} * {'ts':abc,'k1':2} * {N} * * FLB_PACK_JSON_FORMAT_STREAM: no separators, e.g: * * {'ts':abc,'k1':1}{'ts':abc,'k1':2}{N} */ if (json_format == FLB_PACK_JSON_FORMAT_LINES || json_format == FLB_PACK_JSON_FORMAT_STREAM) { /* Encode current record into JSON in a temporary variable */ out_js = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size); if (!out_js) { flb_sds_destroy(out_buf); msgpack_sbuffer_destroy(&tmp_sbuf); msgpack_unpacked_destroy(&result); return NULL; } /* * One map record has been converted, now append it to the * outgoing out_buf sds variable. */ out_tmp = flb_sds_cat(out_buf, out_js, flb_sds_len(out_js)); if (!out_tmp) { flb_sds_destroy(out_js); flb_sds_destroy(out_buf); msgpack_sbuffer_destroy(&tmp_sbuf); msgpack_unpacked_destroy(&result); return NULL; } /* Release temporary json sds buffer */ flb_sds_destroy(out_js); /* If a realloc happened, check the returned address */ if (out_tmp != out_buf) { out_buf = out_tmp; } /* Append the breakline only for json lines mode */ if (json_format == FLB_PACK_JSON_FORMAT_LINES) { out_tmp = flb_sds_cat(out_buf, "\n", 1); if (!out_tmp) { flb_sds_destroy(out_buf); msgpack_sbuffer_destroy(&tmp_sbuf); msgpack_unpacked_destroy(&result); return NULL; } if (out_tmp != out_buf) { out_buf = out_tmp; } } msgpack_sbuffer_clear(&tmp_sbuf); } } /* Release the unpacker */ msgpack_unpacked_destroy(&result); /* Format to JSON */ if (json_format == FLB_PACK_JSON_FORMAT_JSON) { out_buf = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size); msgpack_sbuffer_destroy(&tmp_sbuf); if (!out_buf) { return NULL; } } else { msgpack_sbuffer_destroy(&tmp_sbuf); } if (out_buf && flb_sds_len(out_buf) == 0) { flb_sds_destroy(out_buf); return NULL; } return out_buf; } /** * convert msgpack to JSON string. * This API is similar to snprintf. * @param size Estimated length of json str. * @param data The msgpack_unpacked data. * @return success ? allocated json str ptr : NULL */ char *flb_msgpack_to_json_str(size_t size, const msgpack_object *obj) { int ret; char *buf = NULL; char *tmp; if (obj == NULL) { return NULL; } if (size <= 0) { size = 128; } buf = flb_malloc(size); if (!buf) { flb_errno(); return NULL; } while (1) { ret = flb_msgpack_to_json(buf, size, obj); if (ret <= 0) { /* buffer is small. retry.*/ size += 128; tmp = flb_realloc(buf, size); if (tmp) { buf = tmp; } else { flb_free(buf); flb_errno(); return NULL; } } else { break; } } return buf; } int flb_pack_time_now(msgpack_packer *pck) { int ret; struct flb_time t; flb_time_get(&t); ret = flb_time_append_to_msgpack(&t, pck, 0); return ret; } int flb_msgpack_expand_map(char *map_data, size_t map_size, msgpack_object_kv **kv_arr, int kv_arr_len, char** out_buf, int* out_size) { msgpack_sbuffer sbuf; msgpack_packer pck; msgpack_unpacked result; size_t off = 0; char *ret_buf; int map_num; int i; int len; if (map_data == NULL){ return -1; } msgpack_unpacked_init(&result); if ((i=msgpack_unpack_next(&result, map_data, map_size, &off)) != MSGPACK_UNPACK_SUCCESS ) { msgpack_unpacked_destroy(&result); return -1; } if (result.data.type != MSGPACK_OBJECT_MAP) { msgpack_unpacked_destroy(&result); return -1; } len = result.data.via.map.size; map_num = kv_arr_len + len; msgpack_sbuffer_init(&sbuf); msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write); msgpack_pack_map(&pck, map_num); for (i=0; ikey); msgpack_pack_object(&pck, kv_arr[i]->val); } msgpack_unpacked_destroy(&result); *out_size = sbuf.size; ret_buf = flb_malloc(sbuf.size); *out_buf = ret_buf; if (*out_buf == NULL) { flb_errno(); msgpack_sbuffer_destroy(&sbuf); return -1; } memcpy(*out_buf, sbuf.data, sbuf.size); msgpack_sbuffer_destroy(&sbuf); return 0; } int flb_pack_init(struct flb_config *config) { int ret; if (config == NULL) { return -1; } ret = flb_pack_set_null_as_nan(config->convert_nan_to_null); return ret; }