summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_log_event_encoder_dynamic_field.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/flb_log_event_encoder_dynamic_field.c')
-rw-r--r--fluent-bit/src/flb_log_event_encoder_dynamic_field.c272
1 files changed, 272 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_log_event_encoder_dynamic_field.c b/fluent-bit/src/flb_log_event_encoder_dynamic_field.c
new file mode 100644
index 000000000..fd8be44a7
--- /dev/null
+++ b/fluent-bit/src/flb_log_event_encoder_dynamic_field.c
@@ -0,0 +1,272 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_log_event_encoder.h>
+#include <fluent-bit/flb_log_event_encoder_dynamic_field.h>
+
+struct flb_log_event_encoder_dynamic_field_scope *
+ flb_log_event_encoder_dynamic_field_scope_current(
+ struct flb_log_event_encoder_dynamic_field *field)
+{
+ if (cfl_list_is_empty(&field->scopes)) {
+ return NULL;
+ }
+
+ return cfl_list_entry_first(
+ &field->scopes,
+ struct flb_log_event_encoder_dynamic_field_scope,
+ _head);
+}
+
+int flb_log_event_encoder_dynamic_field_scope_enter(
+ struct flb_log_event_encoder_dynamic_field *field,
+ int type)
+{
+ int result;
+ struct flb_log_event_encoder_dynamic_field_scope *scope;
+
+ if (type != MSGPACK_OBJECT_MAP &&
+ type != MSGPACK_OBJECT_ARRAY) {
+ return FLB_EVENT_ENCODER_ERROR_INVALID_ARGUMENT;
+ }
+
+ result = flb_log_event_encoder_dynamic_field_append(field);
+
+ if (result != FLB_EVENT_ENCODER_SUCCESS) {
+ return result;
+ }
+
+ scope = flb_calloc(1,
+ sizeof(struct flb_log_event_encoder_dynamic_field_scope));
+
+ if (scope == NULL) {
+ return FLB_EVENT_ENCODER_ERROR_ALLOCATION_ERROR;
+ }
+
+ cfl_list_entry_init(&scope->_head);
+
+ scope->type = type;
+ scope->offset = field->buffer.size;
+
+ cfl_list_prepend(&scope->_head, &field->scopes);
+
+ if (type == MSGPACK_OBJECT_MAP) {
+ flb_mp_map_header_init(&scope->header, &field->packer);
+ }
+ else if (type == MSGPACK_OBJECT_ARRAY) {
+ flb_mp_array_header_init(&scope->header, &field->packer);
+ }
+
+ return FLB_EVENT_ENCODER_SUCCESS;
+}
+
+int flb_log_event_encoder_dynamic_field_scope_leave(
+ struct flb_log_event_encoder_dynamic_field *field,
+ struct flb_log_event_encoder_dynamic_field_scope *scope,
+ int commit)
+{
+ if (scope == NULL) {
+ return FLB_EVENT_ENCODER_ERROR_INVALID_ARGUMENT;
+ }
+
+ if (commit) {
+ /* We increment the entry count on each append because
+ * we don't discriminate based on the scope type so
+ * we need to divide the entry count by two for maps
+ * to ensure the entry count matches the kv pair count
+ */
+
+ if (scope->type == MSGPACK_OBJECT_MAP) {
+ scope->header.entries /= 2;
+ flb_mp_map_header_end(&scope->header);
+ }
+ else {
+ flb_mp_array_header_end(&scope->header);
+ }
+ }
+ else {
+ field->buffer.size = scope->offset;
+ }
+
+ cfl_list_del(&scope->_head);
+
+ flb_free(scope);
+
+ return FLB_EVENT_ENCODER_SUCCESS;
+}
+
+int flb_log_event_encoder_dynamic_field_begin_map(
+ struct flb_log_event_encoder_dynamic_field *field)
+{
+ return flb_log_event_encoder_dynamic_field_scope_enter(field,
+ MSGPACK_OBJECT_MAP);
+}
+
+int flb_log_event_encoder_dynamic_field_begin_array(
+ struct flb_log_event_encoder_dynamic_field *field)
+{
+ return flb_log_event_encoder_dynamic_field_scope_enter(field,
+ MSGPACK_OBJECT_ARRAY);
+}
+
+int flb_log_event_encoder_dynamic_field_commit_map(
+ struct flb_log_event_encoder_dynamic_field *field)
+{
+ struct flb_log_event_encoder_dynamic_field_scope *scope;
+
+ scope = flb_log_event_encoder_dynamic_field_scope_current(field);
+
+ return flb_log_event_encoder_dynamic_field_scope_leave(field,
+ scope,
+ FLB_TRUE);
+}
+
+int flb_log_event_encoder_dynamic_field_commit_array(
+ struct flb_log_event_encoder_dynamic_field *field)
+{
+ struct flb_log_event_encoder_dynamic_field_scope *scope;
+
+ scope = flb_log_event_encoder_dynamic_field_scope_current(field);
+
+ return flb_log_event_encoder_dynamic_field_scope_leave(field,
+ scope,
+ FLB_TRUE);
+}
+
+int flb_log_event_encoder_dynamic_field_rollback_map(
+ struct flb_log_event_encoder_dynamic_field *field)
+{
+ struct flb_log_event_encoder_dynamic_field_scope *scope;
+
+ scope = flb_log_event_encoder_dynamic_field_scope_current(field);
+
+ return flb_log_event_encoder_dynamic_field_scope_leave(field,
+ scope,
+ FLB_FALSE);
+}
+
+int flb_log_event_encoder_dynamic_field_rollback_array(
+ struct flb_log_event_encoder_dynamic_field *field)
+{
+ struct flb_log_event_encoder_dynamic_field_scope *scope;
+
+ scope = flb_log_event_encoder_dynamic_field_scope_current(field);
+
+ return flb_log_event_encoder_dynamic_field_scope_leave(field,
+ scope,
+ FLB_TRUE);
+}
+
+int flb_log_event_encoder_dynamic_field_append(
+ struct flb_log_event_encoder_dynamic_field *field)
+{
+ struct flb_log_event_encoder_dynamic_field_scope *scope;
+
+ scope = flb_log_event_encoder_dynamic_field_scope_current(field);
+
+ if (scope == NULL) {
+ if (cfl_list_is_empty(&field->scopes)) {
+ return FLB_EVENT_ENCODER_SUCCESS;
+ }
+
+ return FLB_EVENT_ENCODER_ERROR_INVALID_ARGUMENT;
+ }
+
+ flb_mp_map_header_append(&scope->header);
+
+ return FLB_EVENT_ENCODER_SUCCESS;
+}
+
+
+static int flb_log_event_encoder_dynamic_field_flush_scopes(
+ struct flb_log_event_encoder_dynamic_field *field,
+ int commit)
+{
+ int result;
+ struct flb_log_event_encoder_dynamic_field_scope *scope;
+
+ result = FLB_EVENT_ENCODER_SUCCESS;
+
+ do {
+ scope = flb_log_event_encoder_dynamic_field_scope_current(field);
+
+ if (scope != NULL) {
+ result = flb_log_event_encoder_dynamic_field_scope_leave(field,
+ scope,
+ commit);
+ }
+ } while (scope != NULL &&
+ result == FLB_EVENT_ENCODER_SUCCESS);
+
+ return result;
+}
+
+int flb_log_event_encoder_dynamic_field_flush(
+ struct flb_log_event_encoder_dynamic_field *field)
+{
+ int result;
+
+ result = flb_log_event_encoder_dynamic_field_flush_scopes(field, FLB_TRUE);
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ field->data = field->buffer.data;
+ field->size = field->buffer.size;
+ }
+
+ return result;
+}
+
+int flb_log_event_encoder_dynamic_field_reset(
+ struct flb_log_event_encoder_dynamic_field *field)
+{
+ msgpack_sbuffer_clear(&field->buffer);
+
+ flb_log_event_encoder_dynamic_field_flush_scopes(field, FLB_FALSE);
+
+ field->data = NULL;
+ field->size = 0;
+
+ return FLB_EVENT_ENCODER_SUCCESS;
+}
+
+int flb_log_event_encoder_dynamic_field_init(
+ struct flb_log_event_encoder_dynamic_field *field,
+ int type)
+{
+ msgpack_sbuffer_init(&field->buffer);
+ msgpack_packer_init(&field->packer,
+ &field->buffer,
+ msgpack_sbuffer_write);
+
+ field->initialized = FLB_TRUE;
+ field->type = type;
+
+ cfl_list_init(&field->scopes);
+ flb_log_event_encoder_dynamic_field_reset(field);
+
+ return FLB_EVENT_ENCODER_SUCCESS;
+}
+
+void flb_log_event_encoder_dynamic_field_destroy(
+ struct flb_log_event_encoder_dynamic_field *field)
+{
+ msgpack_sbuffer_destroy(&field->buffer);
+
+ field->initialized = FLB_FALSE;
+}