From 5da14042f70711ea5cf66e034699730335462f66 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 5 May 2024 14:08:03 +0200 Subject: Merging upstream version 1.45.3+dfsg. Signed-off-by: Daniel Baumann --- src/fluent-bit/examples/kafka_filter/.env | 4 ++ src/fluent-bit/examples/kafka_filter/.gitignore | 2 + src/fluent-bit/examples/kafka_filter/Dockerfile | 44 ++++++++++++++ src/fluent-bit/examples/kafka_filter/Makefile | 11 ++++ .../examples/kafka_filter/docker-compose.yml | 68 ++++++++++++++++++++++ src/fluent-bit/examples/kafka_filter/kafka.conf | 22 +++++++ src/fluent-bit/examples/kafka_filter/kafka.lua | 8 +++ .../examples/kafka_filter/scripts/common.sh | 17 ++++++ .../examples/kafka_filter/scripts/create-topics.sh | 9 +++ .../examples/kafka_filter/scripts/flb-start.sh | 8 +++ .../examples/kafka_filter/scripts/kafka-consume.sh | 9 +++ .../examples/kafka_filter/scripts/kafka-produce.sh | 13 +++++ 12 files changed, 215 insertions(+) create mode 100644 src/fluent-bit/examples/kafka_filter/.env create mode 100644 src/fluent-bit/examples/kafka_filter/.gitignore create mode 100644 src/fluent-bit/examples/kafka_filter/Dockerfile create mode 100644 src/fluent-bit/examples/kafka_filter/Makefile create mode 100644 src/fluent-bit/examples/kafka_filter/docker-compose.yml create mode 100644 src/fluent-bit/examples/kafka_filter/kafka.conf create mode 100644 src/fluent-bit/examples/kafka_filter/kafka.lua create mode 100644 src/fluent-bit/examples/kafka_filter/scripts/common.sh create mode 100755 src/fluent-bit/examples/kafka_filter/scripts/create-topics.sh create mode 100755 src/fluent-bit/examples/kafka_filter/scripts/flb-start.sh create mode 100755 src/fluent-bit/examples/kafka_filter/scripts/kafka-consume.sh create mode 100755 src/fluent-bit/examples/kafka_filter/scripts/kafka-produce.sh (limited to 'src/fluent-bit/examples/kafka_filter') diff --git a/src/fluent-bit/examples/kafka_filter/.env b/src/fluent-bit/examples/kafka_filter/.env new file mode 100644 index 000000000..61c27af84 --- /dev/null +++ b/src/fluent-bit/examples/kafka_filter/.env @@ -0,0 +1,4 @@ +KAFKA_HOST=kafka-broker +KAFKA_PORT=9092 +ZOOKEEPER_HOST=zookeeper +ZOOKEEPER_PORT=2181 diff --git a/src/fluent-bit/examples/kafka_filter/.gitignore b/src/fluent-bit/examples/kafka_filter/.gitignore new file mode 100644 index 000000000..f8edd2f2e --- /dev/null +++ b/src/fluent-bit/examples/kafka_filter/.gitignore @@ -0,0 +1,2 @@ +build/ +kafka/ diff --git a/src/fluent-bit/examples/kafka_filter/Dockerfile b/src/fluent-bit/examples/kafka_filter/Dockerfile new file mode 100644 index 000000000..5a18e63e6 --- /dev/null +++ b/src/fluent-bit/examples/kafka_filter/Dockerfile @@ -0,0 +1,44 @@ +FROM debian:bullseye-slim as builder +ENV DEBIAN_FRONTEND noninteractive +ENV KAFKA_URL https://downloads.apache.org/kafka/3.4.1/kafka_2.13-3.4.1.tgz +ENV KAFKA_SHA256 a76f17a52b8f2cd31de11571ff366a714820b6e4e02893b0159d09db0edaf998 + +# hadolint ignore=DL3008 +RUN apt-get update && \ + apt-get upgrade -y && \ + apt-get install -y --no-install-recommends \ + build-essential \ + curl \ + ca-certificates \ + cmake \ + pkg-config \ + libsasl2-dev \ + libssl-dev \ + flex \ + openjdk-11-jre-headless \ + bison \ + netcat-openbsd \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +# Download kafka to access "kafka-topics" script +WORKDIR /kafka +RUN bash -c 'curl -L $KAFKA_URL | tee kafka.tgz | sha256sum -c <(echo "$KAFKA_SHA256 -")' \ + && tar --strip-components=1 -xf kafka.tgz \ + && mv /kafka/bin/kafka-topics.sh /kafka/bin/kafka-topics +ENV PATH="${PATH}:/kafka/bin" + +WORKDIR /build/ +COPY . /source +RUN cmake -DFLB_DEV=On \ + -DFLB_IN_KAFKA=On \ + -DFLB_OUT_KAFKA=On \ + -DFLB_CONFIG_YAML=Off \ + /source && \ + cmake --build . --parallel + +FROM builder as runner +COPY --from=builder /build/bin/fluent-bit /usr/local/bin/fluent-bit +COPY examples/kafka_filter/kafka.conf /etc/kafka.conf +COPY examples/kafka_filter/kafka.lua /etc/kafka.lua +CMD ["/usr/local/bin/fluent-bit", "-c", "/etc/kafka.conf"] diff --git a/src/fluent-bit/examples/kafka_filter/Makefile b/src/fluent-bit/examples/kafka_filter/Makefile new file mode 100644 index 000000000..75b4b3a8c --- /dev/null +++ b/src/fluent-bit/examples/kafka_filter/Makefile @@ -0,0 +1,11 @@ +build: + docker compose build + +start: + docker compose up -d + docker compose logs -f fluent-bit kafka-consumer + +stop: + docker compose down + +.PHONY: build start stop diff --git a/src/fluent-bit/examples/kafka_filter/docker-compose.yml b/src/fluent-bit/examples/kafka_filter/docker-compose.yml new file mode 100644 index 000000000..dc758c9ec --- /dev/null +++ b/src/fluent-bit/examples/kafka_filter/docker-compose.yml @@ -0,0 +1,68 @@ +version: '3.4' + +x-service-common-fields: &service-common-fields + volumes: + - ./scripts:/scripts + env_file: + - ./.env + + +services: + fluent-bit: + <<: *service-common-fields + build: + context: ../.. + target: runner + dockerfile: examples/kafka_filter/Dockerfile + command: /scripts/flb-start.sh + depends_on: + - kafka-consumer + + + kafka-create-topics: + <<: *service-common-fields + image: confluentinc/cp-server:7.0.1 + command: /scripts/create-topics.sh + depends_on: + - kafka-broker + + + kafka-producer: + <<: *service-common-fields + image: confluentinc/cp-server:7.0.1 + command: /scripts/kafka-produce.sh + depends_on: + - fluent-bit + + + kafka-consumer: + <<: *service-common-fields + image: confluentinc/cp-server:7.0.1 + command: /scripts/kafka-consume.sh + depends_on: + - kafka-create-topics + + + kafka-broker: + image: confluentinc/cp-server:7.0.1 + hostname: broker + container_name: kafka-broker + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: "${ZOOKEEPER_HOST}:${ZOOKEEPER_PORT}" + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT" + KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://${KAFKA_HOST}:${KAFKA_PORT}" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1 + + + zookeeper: + image: confluentinc/cp-zookeeper:7.0.1 + hostname: zookeeper + container_name: zookeeper + environment: + ZOOKEEPER_CLIENT_PORT: "${ZOOKEEPER_PORT}" + ZOOKEEPER_TICK_TIME: 2000 diff --git a/src/fluent-bit/examples/kafka_filter/kafka.conf b/src/fluent-bit/examples/kafka_filter/kafka.conf new file mode 100644 index 000000000..2c3daf782 --- /dev/null +++ b/src/fluent-bit/examples/kafka_filter/kafka.conf @@ -0,0 +1,22 @@ +[SERVICE] + Flush 0.1 + Grace 2 + Log_Level info + +[INPUT] + Name kafka + brokers kafka-broker:9092 + topics fb-source + poll_ms 100 + format json + +[FILTER] + Name lua + Match * + script kafka.lua + call modify_kafka_message + +[OUTPUT] + Name kafka + brokers kafka-broker:9092 + topics fb-sink diff --git a/src/fluent-bit/examples/kafka_filter/kafka.lua b/src/fluent-bit/examples/kafka_filter/kafka.lua new file mode 100644 index 000000000..3c6d9dc17 --- /dev/null +++ b/src/fluent-bit/examples/kafka_filter/kafka.lua @@ -0,0 +1,8 @@ +local count = 0 +function modify_kafka_message(tag, timestamp, record) + count = count + 1 + local payload = record.payload + payload.topic = record.topic + payload.status = 'processed by fluent-bit, total records: '..tostring(count) + return 1, timestamp, payload +end diff --git a/src/fluent-bit/examples/kafka_filter/scripts/common.sh b/src/fluent-bit/examples/kafka_filter/scripts/common.sh new file mode 100644 index 000000000..d9a2bc6f4 --- /dev/null +++ b/src/fluent-bit/examples/kafka_filter/scripts/common.sh @@ -0,0 +1,17 @@ +#!/bin/bash -ue + +wait_kafka() { + while ! nc -z "$KAFKA_HOST" "$KAFKA_PORT"; do + sleep 0.1 + done +} + +wait_topic() { + wait_kafka + local topic=$1 + [ -z "$topic" ] && return 1 + while true; do + kafka-topics --list --bootstrap-server "$KAFKA_HOST:$KAFKA_PORT" | grep -q "^$topic$" && break + sleep 0.1 + done +} diff --git a/src/fluent-bit/examples/kafka_filter/scripts/create-topics.sh b/src/fluent-bit/examples/kafka_filter/scripts/create-topics.sh new file mode 100755 index 000000000..a7adbddf1 --- /dev/null +++ b/src/fluent-bit/examples/kafka_filter/scripts/create-topics.sh @@ -0,0 +1,9 @@ +#!/bin/bash -ue + +# shellcheck disable=SC1091 +. /scripts/common.sh + +wait_kafka + +kafka-topics --create --partitions 1 --replication-factor 1 --topic fb-source --bootstrap-server "$KAFKA_HOST:$KAFKA_PORT" +kafka-topics --create --partitions 1 --replication-factor 1 --topic fb-sink --bootstrap-server "$KAFKA_HOST:$KAFKA_PORT" diff --git a/src/fluent-bit/examples/kafka_filter/scripts/flb-start.sh b/src/fluent-bit/examples/kafka_filter/scripts/flb-start.sh new file mode 100755 index 000000000..56e5ee0d6 --- /dev/null +++ b/src/fluent-bit/examples/kafka_filter/scripts/flb-start.sh @@ -0,0 +1,8 @@ +#!/bin/bash -ue + +# shellcheck disable=SC1091 +. /scripts/common.sh + +wait_topic fb-sink + +exec /usr/local/bin/fluent-bit -c /etc/kafka.conf diff --git a/src/fluent-bit/examples/kafka_filter/scripts/kafka-consume.sh b/src/fluent-bit/examples/kafka_filter/scripts/kafka-consume.sh new file mode 100755 index 000000000..729c4876d --- /dev/null +++ b/src/fluent-bit/examples/kafka_filter/scripts/kafka-consume.sh @@ -0,0 +1,9 @@ +#!/bin/bash -ue + +# shellcheck disable=SC1091 +. /scripts/common.sh + +wait_topic fb-sink + +kafka-console-consumer --topic fb-sink --bootstrap-server \ + "$KAFKA_HOST:$KAFKA_PORT" diff --git a/src/fluent-bit/examples/kafka_filter/scripts/kafka-produce.sh b/src/fluent-bit/examples/kafka_filter/scripts/kafka-produce.sh new file mode 100755 index 000000000..eea99ca27 --- /dev/null +++ b/src/fluent-bit/examples/kafka_filter/scripts/kafka-produce.sh @@ -0,0 +1,13 @@ +#!/bin/bash -ue + +# shellcheck disable=SC1091 +. /scripts/common.sh + +wait_topic fb-sink + +for i in $(seq 1 100); do + sleep 1 + echo "{ \"name\": \"object-$i\" }" | \ + kafka-console-producer --topic fb-source \ + --broker-list "$KAFKA_HOST:$KAFKA_PORT" +done -- cgit v1.2.3