summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_ra_key.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/flb_ra_key.c')
-rw-r--r--fluent-bit/src/flb_ra_key.c808
1 files changed, 808 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_ra_key.c b/fluent-bit/src/flb_ra_key.c
new file mode 100644
index 00000000..f167a766
--- /dev/null
+++ b/fluent-bit/src/flb_ra_key.c
@@ -0,0 +1,808 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_regex.h>
+#include <fluent-bit/flb_ra_key.h>
+#include <fluent-bit/record_accessor/flb_ra_parser.h>
+#include <msgpack.h>
+#include <limits.h>
+
+/* Map msgpack object into flb_ra_value representation */
+static int msgpack_object_to_ra_value(msgpack_object o,
+ struct flb_ra_value *result)
+{
+ result->o = o;
+
+ /* Compose result with found value */
+ if (o.type == MSGPACK_OBJECT_BOOLEAN) {
+ result->type = FLB_RA_BOOL;
+ result->val.boolean = o.via.boolean;
+ return 0;
+ }
+ else if (o.type == MSGPACK_OBJECT_POSITIVE_INTEGER ||
+ o.type == MSGPACK_OBJECT_NEGATIVE_INTEGER) {
+ result->type = FLB_RA_INT;
+ result->val.i64 = o.via.i64;
+ return 0;
+ }
+ else if (o.type == MSGPACK_OBJECT_FLOAT32 ||
+ o.type == MSGPACK_OBJECT_FLOAT) {
+ result->type = FLB_RA_FLOAT;
+ result->val.f64 = o.via.f64;
+ return 0;
+ }
+ else if (o.type == MSGPACK_OBJECT_STR) {
+ result->type = FLB_RA_STRING;
+ result->val.string = flb_sds_create_len((char *) o.via.str.ptr,
+ o.via.str.size);
+
+ /* Handle cases where flb_sds_create_len fails */
+ if (result->val.string == NULL) {
+ return -1;
+ }
+ return 0;
+ }
+ else if (o.type == MSGPACK_OBJECT_MAP) {
+ /* return boolean 'true', just denoting the existence of the key */
+ result->type = FLB_RA_BOOL;
+ result->val.boolean = true;
+ return 0;
+ }
+ else if (o.type == MSGPACK_OBJECT_NIL) {
+ result->type = FLB_RA_NULL;
+ return 0;
+ }
+
+ return -1;
+}
+
+/* Return the entry position of key/val in the map */
+static int ra_key_val_id(flb_sds_t ckey, msgpack_object map)
+{
+ int i;
+ int map_size;
+ msgpack_object key;
+
+ if (map.type != MSGPACK_OBJECT_MAP) {
+ return -1;
+ }
+
+ map_size = map.via.map.size;
+ for (i = map_size - 1; i >= 0; i--) {
+ key = map.via.map.ptr[i].key;
+
+ if (key.type != MSGPACK_OBJECT_STR) {
+ continue;
+ }
+
+ /* Compare by length and by key name */
+ if (flb_sds_cmp(ckey, key.via.str.ptr, key.via.str.size) != 0) {
+ continue;
+ }
+
+ return i;
+ }
+
+ return -1;
+}
+
+static int msgpack_object_strcmp(msgpack_object o, char *str, int len)
+{
+ if (o.type != MSGPACK_OBJECT_STR) {
+ return -1;
+ }
+
+ if (o.via.str.size != len) {
+ return -1;
+ }
+
+ return strncmp(o.via.str.ptr, str, len);
+}
+
+/* Lookup perfect match of sub-keys and map content */
+static int subkey_to_object(msgpack_object *map, struct mk_list *subkeys,
+ msgpack_object **out_key, msgpack_object **out_val)
+{
+ int i = 0;
+ int levels;
+ int matched = 0;
+ msgpack_object *found = NULL;
+ msgpack_object *key = NULL;
+ msgpack_object *val = NULL;
+ msgpack_object cur;
+ struct mk_list *head;
+ struct flb_ra_subentry *entry;
+
+ /* Expected number of map levels in the map */
+ levels = mk_list_size(subkeys);
+
+ cur = *map;
+
+ mk_list_foreach(head, subkeys) {
+ /* expected entry */
+ entry = mk_list_entry(head, struct flb_ra_subentry, _head);
+
+ /* Array Handling */
+ if (entry->type == FLB_RA_PARSER_ARRAY_ID) {
+ /* check the current msgpack object is an array */
+ if (cur.type != MSGPACK_OBJECT_ARRAY) {
+ return -1;
+ }
+
+ /* Index limit and ensure no overflow */
+ if (entry->array_id == INT_MAX ||
+ cur.via.array.size < entry->array_id + 1) {
+ return -1;
+ }
+
+ val = &cur.via.array.ptr[entry->array_id];
+ cur = *val;
+ key = NULL; /* fill NULL since the type is array. */
+ goto next;
+ }
+
+ if (cur.type != MSGPACK_OBJECT_MAP) {
+ break;
+ }
+
+ i = ra_key_val_id(entry->str, cur);
+ if (i == -1) {
+ found = NULL;
+ continue;
+ }
+
+ key = &cur.via.map.ptr[i].key;
+ val = &cur.via.map.ptr[i].val;
+
+ /* A bit obvious, but it's better to validate data type */
+ if (key->type != MSGPACK_OBJECT_STR) {
+ found = NULL;
+ continue;
+ }
+
+ found = key;
+ cur = cur.via.map.ptr[i].val;
+
+ next:
+ matched++;
+
+ if (levels == matched) {
+ break;
+ }
+ }
+
+ /* No matches */
+ if (!found || (matched > 0 && levels != matched)) {
+ return -1;
+ }
+
+ *out_key = (msgpack_object *) key;
+ *out_val = (msgpack_object *) val;
+
+ return 0;
+}
+
+struct flb_ra_value *flb_ra_key_to_value(flb_sds_t ckey,
+ msgpack_object map,
+ struct mk_list *subkeys)
+{
+ int i;
+ int ret;
+ msgpack_object val;
+ msgpack_object *out_key;
+ msgpack_object *out_val;
+ struct flb_ra_value *result;
+
+ /* Get the key position in the map */
+ i = ra_key_val_id(ckey, map);
+ if (i == -1) {
+ return NULL;
+ }
+
+ /* Reference entries */
+ val = map.via.map.ptr[i].val;
+
+ /* Create the result context */
+ result = flb_calloc(1, sizeof(struct flb_ra_value));
+ if (!result) {
+ flb_errno();
+ return NULL;
+ }
+ result->o = val;
+
+ if ((val.type == MSGPACK_OBJECT_MAP || val.type == MSGPACK_OBJECT_ARRAY)
+ && subkeys != NULL && mk_list_size(subkeys) > 0) {
+
+ ret = subkey_to_object(&val, subkeys, &out_key, &out_val);
+ if (ret == 0) {
+ ret = msgpack_object_to_ra_value(*out_val, result);
+ if (ret == -1) {
+ flb_free(result);
+ return NULL;
+ }
+ return result;
+ }
+ else {
+ flb_free(result);
+ return NULL;
+ }
+ }
+ else {
+ ret = msgpack_object_to_ra_value(val, result);
+ if (ret == -1) {
+ flb_error("[ra key] cannot process key value");
+ flb_free(result);
+ return NULL;
+ }
+ }
+
+ return result;
+}
+
+int flb_ra_key_value_get(flb_sds_t ckey, msgpack_object map,
+ struct mk_list *subkeys,
+ msgpack_object **start_key,
+ msgpack_object **out_key, msgpack_object **out_val)
+{
+ int i;
+ int ret;
+ msgpack_object val;
+ msgpack_object *o_key;
+ msgpack_object *o_val;
+
+ /* Get the key position in the map */
+ i = ra_key_val_id(ckey, map);
+ if (i == -1) {
+ return -1;
+ }
+
+ /* Reference entries */
+ *start_key = &map.via.map.ptr[i].key;
+ val = map.via.map.ptr[i].val;
+
+ if ((val.type == MSGPACK_OBJECT_MAP || val.type == MSGPACK_OBJECT_ARRAY)
+ && subkeys != NULL && mk_list_size(subkeys) > 0) {
+ ret = subkey_to_object(&val, subkeys, &o_key, &o_val);
+ if (ret == 0) {
+ *out_key = o_key;
+ *out_val = o_val;
+ return 0;
+ }
+ }
+ else {
+ *out_key = &map.via.map.ptr[i].key;
+ *out_val = &map.via.map.ptr[i].val;
+ return 0;
+ }
+
+ return -1;
+}
+
+int flb_ra_key_strcmp(flb_sds_t ckey, msgpack_object map,
+ struct mk_list *subkeys, char *str, int len)
+{
+ int i;
+ int ret;
+ msgpack_object val;
+ msgpack_object *out_key;
+ msgpack_object *out_val;
+
+ /* Get the key position in the map */
+ i = ra_key_val_id(ckey, map);
+ if (i == -1) {
+ return -1;
+ }
+
+ /* Reference map value */
+ val = map.via.map.ptr[i].val;
+
+ if ((val.type == MSGPACK_OBJECT_MAP || val.type == MSGPACK_OBJECT_ARRAY)
+ && subkeys != NULL && mk_list_size(subkeys) > 0) {
+ ret = subkey_to_object(&val, subkeys, &out_key, &out_val);
+ if (ret == 0) {
+ return msgpack_object_strcmp(*out_val, str, len);
+ }
+ else {
+ return -1;
+ }
+ }
+
+ return msgpack_object_strcmp(val, str, len);
+}
+
+int flb_ra_key_regex_match(flb_sds_t ckey, msgpack_object map,
+ struct mk_list *subkeys, struct flb_regex *regex,
+ struct flb_regex_search *result)
+{
+ int i;
+ int ret;
+ msgpack_object val;
+ msgpack_object *out_key;
+ msgpack_object *out_val;
+
+ /* Get the key position in the map */
+ i = ra_key_val_id(ckey, map);
+ if (i == -1) {
+ return -1;
+ }
+
+ /* Reference map value */
+ val = map.via.map.ptr[i].val;
+
+ if ((val.type == MSGPACK_OBJECT_MAP || val.type == MSGPACK_OBJECT_ARRAY)
+ && subkeys != NULL && mk_list_size(subkeys) > 0) {
+ ret = subkey_to_object(&val, subkeys, &out_key, &out_val);
+ if (ret == 0) {
+ if (out_val->type != MSGPACK_OBJECT_STR) {
+ return -1;
+ }
+
+ if (result) {
+ /* Regex + capture mode */
+ return flb_regex_do(regex,
+ (char *) out_val->via.str.ptr,
+ out_val->via.str.size,
+ result);
+ }
+ else {
+ /* No capture */
+ return flb_regex_match(regex,
+ (unsigned char *) out_val->via.str.ptr,
+ out_val->via.str.size);
+ }
+ }
+ return -1;
+ }
+
+ if (val.type != MSGPACK_OBJECT_STR) {
+ return -1;
+ }
+
+ if (result) {
+ /* Regex + capture mode */
+ return flb_regex_do(regex, (char *) val.via.str.ptr, val.via.str.size,
+ result);
+ }
+ else {
+ /* No capture */
+ return flb_regex_match(regex, (unsigned char *) val.via.str.ptr,
+ val.via.str.size);
+ }
+
+ return -1;
+}
+
+static int update_subkey(msgpack_object *obj, struct mk_list *subkeys,
+ int levels, int *matched,
+ msgpack_object *in_key, msgpack_object *in_val,
+ msgpack_packer *mp_pck);
+
+
+static int update_subkey_array(msgpack_object *obj, struct mk_list *subkeys,
+ int levels, int *matched,
+ msgpack_object *in_key, msgpack_object *in_val,
+ msgpack_packer *mp_pck)
+{
+ struct flb_ra_subentry *entry;
+ int i;
+ int ret;
+ int size;
+
+ entry = mk_list_entry_first(subkeys, struct flb_ra_subentry, _head);
+
+ /* check the current msgpack object is an array */
+ if (obj->type != MSGPACK_OBJECT_ARRAY) {
+ flb_error("%s: object is not array", __FUNCTION__);
+ return -1;
+ }
+ size = obj->via.array.size;
+ /* Index limit and ensure no overflow */
+ if (entry->array_id == INT_MAX ||
+ size < entry->array_id + 1) {
+ flb_trace("%s: out of index", __FUNCTION__);
+ return -1;
+ }
+
+ msgpack_pack_array(mp_pck, size);
+ for (i=0; i<size; i++) {
+ if (i != entry->array_id) {
+ msgpack_pack_object(mp_pck, obj->via.array.ptr[i]);
+ continue;
+ }
+ *matched += 1;
+ if (levels == *matched) {
+ flb_trace("%s: update val matched=%d", __FUNCTION__, *matched);
+ /* update value */
+ msgpack_pack_object(mp_pck, *in_val);
+ continue;
+ }
+
+ if (subkeys->next == NULL) {
+ flb_trace("%s: end of subkey", __FUNCTION__);
+ return -1;
+ }
+ ret = update_subkey(&obj->via.array.ptr[i], subkeys->next,
+ levels, matched,
+ in_key, in_val, mp_pck);
+ if (ret < 0) {
+ return -1;
+ }
+ }
+ return 0;
+}
+
+static int update_subkey_map(msgpack_object *obj, struct mk_list *subkeys,
+ int levels, int *matched,
+ msgpack_object *in_key, msgpack_object *in_val,
+ msgpack_packer *mp_pck)
+{
+ struct flb_ra_subentry *entry;
+ int i;
+ int ret_id;
+ int size;
+ int ret;
+ msgpack_object_kv kv;
+
+ entry = mk_list_entry_first(subkeys, struct flb_ra_subentry, _head);
+ /* check the current msgpack object is a map */
+ if (obj->type != MSGPACK_OBJECT_MAP) {
+ flb_trace("%s: object is not map", __FUNCTION__);
+ return -1;
+ }
+ size = obj->via.map.size;
+
+ ret_id = ra_key_val_id(entry->str, *obj);
+ if (ret_id < 0) {
+ flb_trace("%s: not found", __FUNCTION__);
+ return -1;
+ }
+
+ msgpack_pack_map(mp_pck, size);
+ for (i=0; i<size; i++) {
+ if (i != ret_id) {
+ msgpack_pack_object(mp_pck, obj->via.map.ptr[i].key);
+ msgpack_pack_object(mp_pck, obj->via.map.ptr[i].val);
+ continue;
+ }
+ *matched += 1;
+ if (levels == *matched) {
+ flb_trace("%s update key/val matched=%d", __FUNCTION__, *matched);
+ /* update key/value */
+ kv = obj->via.map.ptr[i];
+ if (in_key != NULL) {
+ kv.key = *in_key;
+ }
+ msgpack_pack_object(mp_pck, kv.key);
+ if (in_val != NULL) {
+ kv.val = *in_val;
+ }
+ msgpack_pack_object(mp_pck, kv.val);
+
+ continue;
+ }
+ if (subkeys->next == NULL) {
+ flb_trace("%s: end of subkey", __FUNCTION__);
+ return -1;
+ }
+ msgpack_pack_object(mp_pck, obj->via.map.ptr[i].key);
+ ret = update_subkey(&(obj->via.map.ptr[i].val), subkeys->next,
+ levels, matched,
+ in_key, in_val, mp_pck);
+ if (ret < 0) {
+ return -1;
+ }
+ }
+ return 0;
+}
+
+static int update_subkey(msgpack_object *obj, struct mk_list *subkeys,
+ int levels, int *matched,
+ msgpack_object *in_key, msgpack_object *in_val,
+ msgpack_packer *mp_pck)
+{
+ struct flb_ra_subentry *entry;
+
+ entry = mk_list_entry_first(subkeys, struct flb_ra_subentry, _head);
+
+ if (entry->type == FLB_RA_PARSER_ARRAY_ID) {
+ return update_subkey_array(obj, subkeys,
+ levels, matched,
+ in_key, in_val, mp_pck);
+ }
+ return update_subkey_map(obj, subkeys, levels, matched, in_key, in_val, mp_pck);
+}
+
+int flb_ra_key_value_update(struct flb_ra_parser *rp, msgpack_object map,
+ msgpack_object *in_key, msgpack_object *in_val,
+ msgpack_packer *mp_pck)
+{
+ int kv_id;
+ int i;
+ int map_size;
+ int ret;
+ int levels;
+ int matched = 0;
+
+ /* Get the key position in the map */
+ kv_id = ra_key_val_id(rp->key->name, map);
+ if (kv_id == -1) {
+ return -1;
+ }
+
+ levels = mk_list_size(rp->key->subkeys);
+
+ map_size = map.via.map.size;
+
+ msgpack_pack_map(mp_pck, map_size);
+ if (levels == 0) {
+ /* no subkeys */
+ for (i=0; i<map_size; i++) {
+ if (i != kv_id) {
+ /* pack original key/val */
+ msgpack_pack_object(mp_pck, map.via.map.ptr[i].key);
+ msgpack_pack_object(mp_pck, map.via.map.ptr[i].val);
+ continue;
+ }
+
+ /* update key/val */
+ if (in_key != NULL) {
+ msgpack_pack_object(mp_pck, *in_key);
+ }
+ else {
+ msgpack_pack_object(mp_pck, map.via.map.ptr[i].key);
+ }
+ if (in_val != NULL) {
+ msgpack_pack_object(mp_pck, *in_val);
+ }
+ else {
+ msgpack_pack_object(mp_pck, map.via.map.ptr[i].val);
+ }
+ }
+ return 0;
+ }
+
+ for (i=0; i<map_size; i++) {
+ msgpack_pack_object(mp_pck, map.via.map.ptr[i].key);
+ if (i != kv_id) {
+ msgpack_pack_object(mp_pck, map.via.map.ptr[i].val);
+ continue;
+ }
+ ret = update_subkey(&(map.via.map.ptr[i].val), rp->key->subkeys,
+ levels, &matched,
+ in_key, in_val, mp_pck);
+ if (ret < 0) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static int append_subkey(msgpack_object *obj, struct mk_list *subkeys,
+ int levels, int *matched,
+ msgpack_object *in_val,
+ msgpack_packer *mp_pck);
+
+
+static int append_subkey_array(msgpack_object *obj, struct mk_list *subkeys,
+ int levels, int *matched,
+ msgpack_object *in_val,
+ msgpack_packer *mp_pck)
+{
+ struct flb_ra_subentry *entry;
+ int i;
+ int ret;
+ int size;
+
+ /* check the current msgpack object is an array */
+ if (obj->type != MSGPACK_OBJECT_ARRAY) {
+ flb_trace("%s: object is not array", __FUNCTION__);
+ return -1;
+ }
+ size = obj->via.array.size;
+ entry = mk_list_entry_first(subkeys, struct flb_ra_subentry, _head);
+
+ if (levels == *matched) {
+ /* append val */
+ msgpack_pack_array(mp_pck, size+1);
+ for (i=0; i<size; i++) {
+ msgpack_pack_object(mp_pck, obj->via.array.ptr[i]);
+ }
+ msgpack_pack_object(mp_pck, *in_val);
+
+ *matched = -1;
+ return 0;
+ }
+
+ /* Index limit and ensure no overflow */
+ if (entry->array_id == INT_MAX ||
+ size < entry->array_id + 1) {
+ flb_trace("%s: out of index", __FUNCTION__);
+ return -1;
+ }
+
+ msgpack_pack_array(mp_pck, size);
+ for (i=0; i<size; i++) {
+ if (i != entry->array_id) {
+ msgpack_pack_object(mp_pck, obj->via.array.ptr[i]);
+ continue;
+ }
+ if (*matched >= 0) {
+ *matched += 1;
+ }
+ if (subkeys->next == NULL) {
+ flb_trace("%s: end of subkey", __FUNCTION__);
+ return -1;
+ }
+ ret = append_subkey(&obj->via.array.ptr[i], subkeys->next,
+ levels, matched,
+ in_val, mp_pck);
+ if (ret < 0) {
+ return -1;
+ }
+ }
+ return 0;
+}
+
+static int append_subkey_map(msgpack_object *obj, struct mk_list *subkeys,
+ int levels, int *matched,
+ msgpack_object *in_val,
+ msgpack_packer *mp_pck)
+{
+ struct flb_ra_subentry *entry;
+ int i;
+ int ret_id;
+ int size;
+ int ret;
+
+ /* check the current msgpack object is a map */
+ if (obj->type != MSGPACK_OBJECT_MAP) {
+ flb_trace("%s: object is not map", __FUNCTION__);
+ return -1;
+ }
+ size = obj->via.map.size;
+ entry = mk_list_entry_first(subkeys, struct flb_ra_subentry, _head);
+
+ if (levels == *matched) {
+ /* append val */
+ msgpack_pack_map(mp_pck, size+1);
+ for (i=0; i<size; i++) {
+ msgpack_pack_object(mp_pck, obj->via.map.ptr[i].key);
+ msgpack_pack_object(mp_pck, obj->via.map.ptr[i].val);
+ }
+ msgpack_pack_str(mp_pck, flb_sds_len(entry->str));
+ msgpack_pack_str_body(mp_pck, entry->str, flb_sds_len(entry->str));
+ msgpack_pack_object(mp_pck, *in_val);
+
+ *matched = -1;
+ return 0;
+ }
+
+
+ ret_id = ra_key_val_id(entry->str, *obj);
+ if (ret_id < 0) {
+ flb_trace("%s: not found", __FUNCTION__);
+ return -1;
+ }
+
+ msgpack_pack_map(mp_pck, size);
+ for (i=0; i<size; i++) {
+ if (i != ret_id) {
+ msgpack_pack_object(mp_pck, obj->via.map.ptr[i].key);
+ msgpack_pack_object(mp_pck, obj->via.map.ptr[i].val);
+ continue;
+ }
+
+ if (*matched >= 0) {
+ *matched += 1;
+ }
+ if (subkeys->next == NULL) {
+ flb_trace("%s: end of subkey", __FUNCTION__);
+ return -1;
+ }
+ msgpack_pack_object(mp_pck, obj->via.map.ptr[i].key);
+ ret = append_subkey(&(obj->via.map.ptr[i].val), subkeys->next,
+ levels, matched,
+ in_val, mp_pck);
+ if (ret < 0) {
+ return -1;
+ }
+ }
+ return 0;
+}
+
+static int append_subkey(msgpack_object *obj, struct mk_list *subkeys,
+ int levels, int *matched,
+ msgpack_object *in_val,
+ msgpack_packer *mp_pck)
+{
+ struct flb_ra_subentry *entry;
+
+ entry = mk_list_entry_first(subkeys, struct flb_ra_subentry, _head);
+
+ if (entry->type == FLB_RA_PARSER_ARRAY_ID) {
+ return append_subkey_array(obj, subkeys,
+ levels, matched,
+ in_val, mp_pck);
+ }
+ return append_subkey_map(obj, subkeys, levels, matched, in_val, mp_pck);
+}
+
+int flb_ra_key_value_append(struct flb_ra_parser *rp, msgpack_object map,
+ msgpack_object *in_val, msgpack_packer *mp_pck)
+{
+ int ref_level;
+ int map_size;
+ int i;
+ int kv_id;
+ int ret;
+ int matched = 0;
+
+ map_size = map.via.map.size;
+
+ /* Decrement since the last key doesn't exist */
+ ref_level = mk_list_size(rp->key->subkeys) - 1;
+ if (ref_level < 0) {
+ /* no subkeys */
+ msgpack_pack_map(mp_pck, map_size+1);
+ for (i=0; i<map_size; i++) {
+ msgpack_pack_object(mp_pck, map.via.map.ptr[i].key);
+ msgpack_pack_object(mp_pck, map.via.map.ptr[i].val);
+ }
+ msgpack_pack_str(mp_pck, flb_sds_len(rp->key->name));
+ msgpack_pack_str_body(mp_pck, rp->key->name, flb_sds_len(rp->key->name));
+ msgpack_pack_object(mp_pck, *in_val);
+ return 0;
+ }
+
+ /* Get the key position in the map */
+ kv_id = ra_key_val_id(rp->key->name, map);
+ if (kv_id == -1) {
+ return -1;
+ }
+
+ msgpack_pack_map(mp_pck, map_size);
+ for (i=0; i<map_size; i++) {
+ msgpack_pack_object(mp_pck, map.via.map.ptr[i].key);
+ if (i != kv_id) {
+ msgpack_pack_object(mp_pck, map.via.map.ptr[i].val);
+ continue;
+ }
+ ret = append_subkey(&(map.via.map.ptr[i].val), rp->key->subkeys,
+ ref_level, &matched,
+ in_val, mp_pck);
+ if (ret < 0) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+void flb_ra_key_value_destroy(struct flb_ra_value *v)
+{
+ if (v->type == FLB_RA_STRING) {
+ flb_sds_destroy(v->val.string);
+ }
+ flb_free(v);
+}