summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/stream_processor/flb_sp_stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/stream_processor/flb_sp_stream.c')
-rw-r--r--fluent-bit/src/stream_processor/flb_sp_stream.c168
1 files changed, 168 insertions, 0 deletions
diff --git a/fluent-bit/src/stream_processor/flb_sp_stream.c b/fluent-bit/src/stream_processor/flb_sp_stream.c
new file mode 100644
index 000000000..b4f8a37a2
--- /dev/null
+++ b/fluent-bit/src/stream_processor/flb_sp_stream.c
@@ -0,0 +1,168 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_metrics.h>
+#include <fluent-bit/flb_storage.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/stream_processor/flb_sp.h>
+#include <fluent-bit/stream_processor/flb_sp_parser.h>
+#include <fluent-bit/stream_processor/flb_sp_stream.h>
+
+/* Function defined in plugins/in_stream_processor/sp.c */
+int in_stream_processor_add_chunk(const char *buf_data, size_t buf_size,
+ struct flb_input_instance *in);
+
+int flb_sp_stream_create(const char *name, struct flb_sp_task *task,
+ struct flb_sp *sp)
+{
+ int ret;
+ const char *tmp;
+ struct flb_input_instance *in;
+ struct flb_sp_stream *stream;
+
+ /* The name must be different than an input plugin instance name or alias */
+ ret = flb_input_name_exists(name, sp->config);
+ if (ret == FLB_TRUE) {
+ flb_error("[sp] stream name '%s' already exists", name);
+ return -1;
+ }
+
+ /* Create stream context for 'stream processor' */
+ stream = flb_calloc(1, sizeof(struct flb_sp_stream));
+ if (!stream) {
+ flb_errno();
+ return -1;
+ }
+ stream->name = flb_sds_create(name);
+ if (!stream->name) {
+ flb_free(stream);
+ return -1;
+ }
+
+ /*
+ * Register an input plugin instance using 'in_stream_processor', that one
+ * is used as the parent plugin to ingest data back into Fluent Bit
+ * data pipeline.
+ */
+ in = flb_input_new(sp->config, "stream_processor", NULL, FLB_FALSE);
+ if (!in) {
+ flb_error("[sp] cannot create instance of in_stream_processor");
+ flb_free(stream);
+ return -1;
+ }
+
+ /* Set an alias, otherwise the stream will be called stream_processor.N */
+ ret = flb_input_set_property(in, "alias", name);
+ if (ret == -1) {
+ flb_warn("[sp] cannot set stream name, using fallback name %s",
+ in->name);
+ }
+
+ /*
+ * Lookup for Stream properties: at this point we only care about a
+ * possible Tag defined in the query, e.g:
+ *
+ * CREATE STREAM data WITH(tag='mydata') as SELECT * FROM STREAM:apache;
+ */
+ tmp = flb_sp_cmd_stream_prop_get(task->cmd, "tag");
+ if (tmp) {
+ /*
+ * Duplicate value in a new variable since input instance property
+ * will be released upon plugin exit.
+ */
+ stream->tag = flb_sds_create(tmp);
+ if (!stream->tag) {
+ flb_error("[sp] cannot set Tag property");
+ flb_sp_stream_destroy(stream, sp);
+ return -1;
+ }
+
+ /* Tag property is just an assignation, cannot fail */
+ flb_input_set_property(in, "tag", stream->tag);
+ }
+
+ /*
+ * Check if the new stream is 'routable' or not
+ */
+ tmp = flb_sp_cmd_stream_prop_get(task->cmd, "routable");
+ if (tmp) {
+ stream->routable = flb_utils_bool(tmp);
+ flb_input_set_property(in, "routable", tmp);
+ }
+
+ /*
+ * Set storage type
+ */
+ tmp = flb_sp_cmd_stream_prop_get(task->cmd, "storage.type");
+ if (tmp) {
+ flb_input_set_property(in, "storage.type", tmp);
+ }
+
+ /* Initialize instance */
+ ret = flb_input_instance_init(in, sp->config);
+ if (ret == -1) {
+ flb_error("[sp] cannot initialize instance of in_stream_processor");
+ flb_input_instance_exit(in, sp->config);
+ flb_input_instance_destroy(in);
+ }
+ stream->in = in;
+
+ /* Initialize plugin collector (event callback) */
+ flb_input_collector_start(0, in);
+
+#ifdef FLB_HAVE_METRICS
+ /* Override Metrics title */
+ ret = flb_metrics_title(name, in->metrics);
+ if (ret == -1) {
+ flb_warn("[sp] cannot set metrics title, using fallback name %s",
+ in->name);
+ }
+#endif
+
+ /* Storage context */
+ ret = flb_storage_input_create(sp->config->cio, in);
+ if (ret == -1) {
+ flb_error("[sp] cannot initialize storage for stream '%s'",
+ name);
+ flb_sp_stream_destroy(stream, sp);
+ return -1;
+ }
+
+ task->stream = stream;
+ return 0;
+}
+
+int flb_sp_stream_append_data(const char *buf_data, size_t buf_size,
+ struct flb_sp_stream *stream)
+{
+ return in_stream_processor_add_chunk(buf_data, buf_size, stream->in);
+}
+
+void flb_sp_stream_destroy(struct flb_sp_stream *stream, struct flb_sp *sp)
+{
+ flb_sds_destroy(stream->name);
+ flb_sds_destroy(stream->tag);
+ flb_input_instance_exit(stream->in, sp->config);
+ flb_input_instance_destroy(stream->in);
+ flb_free(stream);
+}