summaryrefslogtreecommitdiffstats
path: root/fluent-bit/examples/kafka_filter
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/examples/kafka_filter')
-rw-r--r--fluent-bit/examples/kafka_filter/.env4
-rw-r--r--fluent-bit/examples/kafka_filter/.gitignore2
-rw-r--r--fluent-bit/examples/kafka_filter/Dockerfile44
-rw-r--r--fluent-bit/examples/kafka_filter/Makefile11
-rw-r--r--fluent-bit/examples/kafka_filter/docker-compose.yml68
-rw-r--r--fluent-bit/examples/kafka_filter/kafka.conf22
-rw-r--r--fluent-bit/examples/kafka_filter/kafka.lua8
-rw-r--r--fluent-bit/examples/kafka_filter/scripts/common.sh17
-rwxr-xr-xfluent-bit/examples/kafka_filter/scripts/create-topics.sh9
-rwxr-xr-xfluent-bit/examples/kafka_filter/scripts/flb-start.sh8
-rwxr-xr-xfluent-bit/examples/kafka_filter/scripts/kafka-consume.sh9
-rwxr-xr-xfluent-bit/examples/kafka_filter/scripts/kafka-produce.sh13
12 files changed, 215 insertions, 0 deletions
diff --git a/fluent-bit/examples/kafka_filter/.env b/fluent-bit/examples/kafka_filter/.env
new file mode 100644
index 000000000..61c27af84
--- /dev/null
+++ b/fluent-bit/examples/kafka_filter/.env
@@ -0,0 +1,4 @@
+KAFKA_HOST=kafka-broker
+KAFKA_PORT=9092
+ZOOKEEPER_HOST=zookeeper
+ZOOKEEPER_PORT=2181
diff --git a/fluent-bit/examples/kafka_filter/.gitignore b/fluent-bit/examples/kafka_filter/.gitignore
new file mode 100644
index 000000000..f8edd2f2e
--- /dev/null
+++ b/fluent-bit/examples/kafka_filter/.gitignore
@@ -0,0 +1,2 @@
+build/
+kafka/
diff --git a/fluent-bit/examples/kafka_filter/Dockerfile b/fluent-bit/examples/kafka_filter/Dockerfile
new file mode 100644
index 000000000..5a18e63e6
--- /dev/null
+++ b/fluent-bit/examples/kafka_filter/Dockerfile
@@ -0,0 +1,44 @@
+FROM debian:bullseye-slim as builder
+ENV DEBIAN_FRONTEND noninteractive
+ENV KAFKA_URL https://downloads.apache.org/kafka/3.4.1/kafka_2.13-3.4.1.tgz
+ENV KAFKA_SHA256 a76f17a52b8f2cd31de11571ff366a714820b6e4e02893b0159d09db0edaf998
+
+# hadolint ignore=DL3008
+RUN apt-get update && \
+ apt-get upgrade -y && \
+ apt-get install -y --no-install-recommends \
+ build-essential \
+ curl \
+ ca-certificates \
+ cmake \
+ pkg-config \
+ libsasl2-dev \
+ libssl-dev \
+ flex \
+ openjdk-11-jre-headless \
+ bison \
+ netcat-openbsd \
+ && apt-get clean \
+ && rm -rf /var/lib/apt/lists/*
+
+# Download kafka to access "kafka-topics" script
+WORKDIR /kafka
+RUN bash -c 'curl -L $KAFKA_URL | tee kafka.tgz | sha256sum -c <(echo "$KAFKA_SHA256 -")' \
+ && tar --strip-components=1 -xf kafka.tgz \
+ && mv /kafka/bin/kafka-topics.sh /kafka/bin/kafka-topics
+ENV PATH="${PATH}:/kafka/bin"
+
+WORKDIR /build/
+COPY . /source
+RUN cmake -DFLB_DEV=On \
+ -DFLB_IN_KAFKA=On \
+ -DFLB_OUT_KAFKA=On \
+ -DFLB_CONFIG_YAML=Off \
+ /source && \
+ cmake --build . --parallel
+
+FROM builder as runner
+COPY --from=builder /build/bin/fluent-bit /usr/local/bin/fluent-bit
+COPY examples/kafka_filter/kafka.conf /etc/kafka.conf
+COPY examples/kafka_filter/kafka.lua /etc/kafka.lua
+CMD ["/usr/local/bin/fluent-bit", "-c", "/etc/kafka.conf"]
diff --git a/fluent-bit/examples/kafka_filter/Makefile b/fluent-bit/examples/kafka_filter/Makefile
new file mode 100644
index 000000000..75b4b3a8c
--- /dev/null
+++ b/fluent-bit/examples/kafka_filter/Makefile
@@ -0,0 +1,11 @@
+build:
+ docker compose build
+
+start:
+ docker compose up -d
+ docker compose logs -f fluent-bit kafka-consumer
+
+stop:
+ docker compose down
+
+.PHONY: build start stop
diff --git a/fluent-bit/examples/kafka_filter/docker-compose.yml b/fluent-bit/examples/kafka_filter/docker-compose.yml
new file mode 100644
index 000000000..dc758c9ec
--- /dev/null
+++ b/fluent-bit/examples/kafka_filter/docker-compose.yml
@@ -0,0 +1,68 @@
+version: '3.4'
+
+x-service-common-fields: &service-common-fields
+ volumes:
+ - ./scripts:/scripts
+ env_file:
+ - ./.env
+
+
+services:
+ fluent-bit:
+ <<: *service-common-fields
+ build:
+ context: ../..
+ target: runner
+ dockerfile: examples/kafka_filter/Dockerfile
+ command: /scripts/flb-start.sh
+ depends_on:
+ - kafka-consumer
+
+
+ kafka-create-topics:
+ <<: *service-common-fields
+ image: confluentinc/cp-server:7.0.1
+ command: /scripts/create-topics.sh
+ depends_on:
+ - kafka-broker
+
+
+ kafka-producer:
+ <<: *service-common-fields
+ image: confluentinc/cp-server:7.0.1
+ command: /scripts/kafka-produce.sh
+ depends_on:
+ - fluent-bit
+
+
+ kafka-consumer:
+ <<: *service-common-fields
+ image: confluentinc/cp-server:7.0.1
+ command: /scripts/kafka-consume.sh
+ depends_on:
+ - kafka-create-topics
+
+
+ kafka-broker:
+ image: confluentinc/cp-server:7.0.1
+ hostname: broker
+ container_name: kafka-broker
+ depends_on:
+ - zookeeper
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: "${ZOOKEEPER_HOST}:${ZOOKEEPER_PORT}"
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT"
+ KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://${KAFKA_HOST}:${KAFKA_PORT}"
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+ KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
+
+
+ zookeeper:
+ image: confluentinc/cp-zookeeper:7.0.1
+ hostname: zookeeper
+ container_name: zookeeper
+ environment:
+ ZOOKEEPER_CLIENT_PORT: "${ZOOKEEPER_PORT}"
+ ZOOKEEPER_TICK_TIME: 2000
diff --git a/fluent-bit/examples/kafka_filter/kafka.conf b/fluent-bit/examples/kafka_filter/kafka.conf
new file mode 100644
index 000000000..2c3daf782
--- /dev/null
+++ b/fluent-bit/examples/kafka_filter/kafka.conf
@@ -0,0 +1,22 @@
+[SERVICE]
+ Flush 0.1
+ Grace 2
+ Log_Level info
+
+[INPUT]
+ Name kafka
+ brokers kafka-broker:9092
+ topics fb-source
+ poll_ms 100
+ format json
+
+[FILTER]
+ Name lua
+ Match *
+ script kafka.lua
+ call modify_kafka_message
+
+[OUTPUT]
+ Name kafka
+ brokers kafka-broker:9092
+ topics fb-sink
diff --git a/fluent-bit/examples/kafka_filter/kafka.lua b/fluent-bit/examples/kafka_filter/kafka.lua
new file mode 100644
index 000000000..3c6d9dc17
--- /dev/null
+++ b/fluent-bit/examples/kafka_filter/kafka.lua
@@ -0,0 +1,8 @@
+local count = 0
+function modify_kafka_message(tag, timestamp, record)
+ count = count + 1
+ local payload = record.payload
+ payload.topic = record.topic
+ payload.status = 'processed by fluent-bit, total records: '..tostring(count)
+ return 1, timestamp, payload
+end
diff --git a/fluent-bit/examples/kafka_filter/scripts/common.sh b/fluent-bit/examples/kafka_filter/scripts/common.sh
new file mode 100644
index 000000000..d9a2bc6f4
--- /dev/null
+++ b/fluent-bit/examples/kafka_filter/scripts/common.sh
@@ -0,0 +1,17 @@
+#!/bin/bash -ue
+
+wait_kafka() {
+ while ! nc -z "$KAFKA_HOST" "$KAFKA_PORT"; do
+ sleep 0.1
+ done
+}
+
+wait_topic() {
+ wait_kafka
+ local topic=$1
+ [ -z "$topic" ] && return 1
+ while true; do
+ kafka-topics --list --bootstrap-server "$KAFKA_HOST:$KAFKA_PORT" | grep -q "^$topic$" && break
+ sleep 0.1
+ done
+}
diff --git a/fluent-bit/examples/kafka_filter/scripts/create-topics.sh b/fluent-bit/examples/kafka_filter/scripts/create-topics.sh
new file mode 100755
index 000000000..a7adbddf1
--- /dev/null
+++ b/fluent-bit/examples/kafka_filter/scripts/create-topics.sh
@@ -0,0 +1,9 @@
+#!/bin/bash -ue
+
+# shellcheck disable=SC1091
+. /scripts/common.sh
+
+wait_kafka
+
+kafka-topics --create --partitions 1 --replication-factor 1 --topic fb-source --bootstrap-server "$KAFKA_HOST:$KAFKA_PORT"
+kafka-topics --create --partitions 1 --replication-factor 1 --topic fb-sink --bootstrap-server "$KAFKA_HOST:$KAFKA_PORT"
diff --git a/fluent-bit/examples/kafka_filter/scripts/flb-start.sh b/fluent-bit/examples/kafka_filter/scripts/flb-start.sh
new file mode 100755
index 000000000..56e5ee0d6
--- /dev/null
+++ b/fluent-bit/examples/kafka_filter/scripts/flb-start.sh
@@ -0,0 +1,8 @@
+#!/bin/bash -ue
+
+# shellcheck disable=SC1091
+. /scripts/common.sh
+
+wait_topic fb-sink
+
+exec /usr/local/bin/fluent-bit -c /etc/kafka.conf
diff --git a/fluent-bit/examples/kafka_filter/scripts/kafka-consume.sh b/fluent-bit/examples/kafka_filter/scripts/kafka-consume.sh
new file mode 100755
index 000000000..729c4876d
--- /dev/null
+++ b/fluent-bit/examples/kafka_filter/scripts/kafka-consume.sh
@@ -0,0 +1,9 @@
+#!/bin/bash -ue
+
+# shellcheck disable=SC1091
+. /scripts/common.sh
+
+wait_topic fb-sink
+
+kafka-console-consumer --topic fb-sink --bootstrap-server \
+ "$KAFKA_HOST:$KAFKA_PORT"
diff --git a/fluent-bit/examples/kafka_filter/scripts/kafka-produce.sh b/fluent-bit/examples/kafka_filter/scripts/kafka-produce.sh
new file mode 100755
index 000000000..eea99ca27
--- /dev/null
+++ b/fluent-bit/examples/kafka_filter/scripts/kafka-produce.sh
@@ -0,0 +1,13 @@
+#!/bin/bash -ue
+
+# shellcheck disable=SC1091
+. /scripts/common.sh
+
+wait_topic fb-sink
+
+for i in $(seq 1 100); do
+ sleep 1
+ echo "{ \"name\": \"object-$i\" }" | \
+ kafka-console-producer --topic fb-source \
+ --broker-list "$KAFKA_HOST:$KAFKA_PORT"
+done