Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_proto.h')
-rw-r--r--  fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_proto.h  655
1 file changed, 655 insertions(+), 0 deletions(-)
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_proto.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_proto.h
new file mode 100644
index 000000000..396765857
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_proto.h
@@ -0,0 +1,655 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012,2013 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RDKAFKA_PROTO_H_
+#define _RDKAFKA_PROTO_H_
+
+
+#include "rdendian.h"
+#include "rdvarint.h"
+
+/* Protocol defines */
+#include "rdkafka_protocol.h"
+
+
+
+/** Default generic retry count for failed requests.
+ * This may be overridden for specific request types. */
+#define RD_KAFKA_REQUEST_DEFAULT_RETRIES 2
+
+/** Max (practically infinite) retry count */
+#define RD_KAFKA_REQUEST_MAX_RETRIES INT_MAX
+
+/** Do not retry request */
+#define RD_KAFKA_REQUEST_NO_RETRIES 0
+
+
+/**
+ * Request types
+ */
+struct rd_kafkap_reqhdr {
+ int32_t Size;
+ int16_t ApiKey;
+ int16_t ApiVersion;
+ int32_t CorrId;
+ /* ClientId follows */
+};
+
+#define RD_KAFKAP_REQHDR_SIZE (4 + 2 + 2 + 4)
+#define RD_KAFKAP_RESHDR_SIZE (4 + 4)
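+
+/*
+ * Illustrative sketch (an editor's example, not part of the upstream
+ * header): how the fixed request header above maps onto the wire.
+ * All integer fields are big-endian; htobe16()/htobe32() come from
+ * rdendian.h and memcpy() from the usual includes pulled in via rd.h.
+ */
+static RD_UNUSED size_t
+example_reqhdr_write(char *buf, const struct rd_kafkap_reqhdr *hdr) {
+        int32_t Size       = htobe32(hdr->Size);
+        int16_t ApiKey     = htobe16(hdr->ApiKey);
+        int16_t ApiVersion = htobe16(hdr->ApiVersion);
+        int32_t CorrId     = htobe32(hdr->CorrId);
+
+        memcpy(buf, &Size, 4);
+        memcpy(buf + 4, &ApiKey, 2);
+        memcpy(buf + 6, &ApiVersion, 2);
+        memcpy(buf + 8, &CorrId, 4);
+
+        return RD_KAFKAP_REQHDR_SIZE; /* 4 + 2 + 2 + 4 = 12 bytes */
+}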
+
+/**
+ * Response header
+ */
+struct rd_kafkap_reshdr {
+ int32_t Size;
+ int32_t CorrId;
+};
+
+
+/**
+ * Request type v1 (flexible version)
+ *
+ * i32 Size
+ * i16 ApiKey
+ * i16 ApiVersion
+ * i32 CorrId
+ * string ClientId (2-byte encoding, not compact string)
+ * uvarint Tags
+ * <Request payload>
+ * uvarint EndTags
+ *
+ * Any struct-type (non-primitive or array type) field in the request payload
+ * must also have a trailing tags list; this applies to structs in arrays as
+ * well.
+ */
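+
+/*
+ * Illustrative sketch (an editor's example, not part of the upstream
+ * header): the unsigned varint ("uvarint") encoding used for the Tags
+ * sections above: 7 payload bits per byte, least-significant group
+ * first, with the high bit set on all but the last byte. librdkafka's
+ * actual varint routines live in rdvarint.h, included above.
+ */
+static RD_UNUSED size_t example_uvarint_enc(uint64_t v, char *dst) {
+        size_t of = 0;
+
+        while (v >= 0x80) {
+                dst[of++] = (char)((v & 0x7f) | 0x80);
+                v >>= 7;
+        }
+        dst[of++] = (char)v;
+
+        return of; /* number of bytes written (1..10 for a uint64_t) */
+}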
+
+/**
+ * @brief Protocol request type (ApiKey) to name/string.
+ *
+ * Generate updates to this list with generate_proto.sh.
+ */
+static RD_UNUSED const char *rd_kafka_ApiKey2str(int16_t ApiKey) {
+ static const char *names[] = {
+ [RD_KAFKAP_Produce] = "Produce",
+ [RD_KAFKAP_Fetch] = "Fetch",
+ [RD_KAFKAP_ListOffsets] = "ListOffsets",
+ [RD_KAFKAP_Metadata] = "Metadata",
+ [RD_KAFKAP_LeaderAndIsr] = "LeaderAndIsr",
+ [RD_KAFKAP_StopReplica] = "StopReplica",
+ [RD_KAFKAP_UpdateMetadata] = "UpdateMetadata",
+ [RD_KAFKAP_ControlledShutdown] = "ControlledShutdown",
+ [RD_KAFKAP_OffsetCommit] = "OffsetCommit",
+ [RD_KAFKAP_OffsetFetch] = "OffsetFetch",
+ [RD_KAFKAP_FindCoordinator] = "FindCoordinator",
+ [RD_KAFKAP_JoinGroup] = "JoinGroup",
+ [RD_KAFKAP_Heartbeat] = "Heartbeat",
+ [RD_KAFKAP_LeaveGroup] = "LeaveGroup",
+ [RD_KAFKAP_SyncGroup] = "SyncGroup",
+ [RD_KAFKAP_DescribeGroups] = "DescribeGroups",
+ [RD_KAFKAP_ListGroups] = "ListGroups",
+ [RD_KAFKAP_SaslHandshake] = "SaslHandshake",
+ [RD_KAFKAP_ApiVersion] = "ApiVersion",
+ [RD_KAFKAP_CreateTopics] = "CreateTopics",
+ [RD_KAFKAP_DeleteTopics] = "DeleteTopics",
+ [RD_KAFKAP_DeleteRecords] = "DeleteRecords",
+ [RD_KAFKAP_InitProducerId] = "InitProducerId",
+ [RD_KAFKAP_OffsetForLeaderEpoch] = "OffsetForLeaderEpoch",
+ [RD_KAFKAP_AddPartitionsToTxn] = "AddPartitionsToTxn",
+ [RD_KAFKAP_AddOffsetsToTxn] = "AddOffsetsToTxn",
+ [RD_KAFKAP_EndTxn] = "EndTxn",
+ [RD_KAFKAP_WriteTxnMarkers] = "WriteTxnMarkers",
+ [RD_KAFKAP_TxnOffsetCommit] = "TxnOffsetCommit",
+ [RD_KAFKAP_DescribeAcls] = "DescribeAcls",
+ [RD_KAFKAP_CreateAcls] = "CreateAcls",
+ [RD_KAFKAP_DeleteAcls] = "DeleteAcls",
+ [RD_KAFKAP_DescribeConfigs] = "DescribeConfigs",
+ [RD_KAFKAP_AlterConfigs] = "AlterConfigs",
+ [RD_KAFKAP_AlterReplicaLogDirs] = "AlterReplicaLogDirs",
+ [RD_KAFKAP_DescribeLogDirs] = "DescribeLogDirs",
+ [RD_KAFKAP_SaslAuthenticate] = "SaslAuthenticate",
+ [RD_KAFKAP_CreatePartitions] = "CreatePartitions",
+ [RD_KAFKAP_CreateDelegationToken] = "CreateDelegationToken",
+ [RD_KAFKAP_RenewDelegationToken] = "RenewDelegationToken",
+ [RD_KAFKAP_ExpireDelegationToken] = "ExpireDelegationToken",
+ [RD_KAFKAP_DescribeDelegationToken] = "DescribeDelegationToken",
+ [RD_KAFKAP_DeleteGroups] = "DeleteGroups",
+ [RD_KAFKAP_ElectLeaders] = "ElectLeadersRequest",
+ [RD_KAFKAP_IncrementalAlterConfigs] =
+ "IncrementalAlterConfigsRequest",
+ [RD_KAFKAP_AlterPartitionReassignments] =
+ "AlterPartitionReassignmentsRequest",
+ [RD_KAFKAP_ListPartitionReassignments] =
+ "ListPartitionReassignmentsRequest",
+ [RD_KAFKAP_OffsetDelete] = "OffsetDeleteRequest",
+ [RD_KAFKAP_DescribeClientQuotas] = "DescribeClientQuotasRequest",
+ [RD_KAFKAP_AlterClientQuotas] = "AlterClientQuotasRequest",
+ [RD_KAFKAP_DescribeUserScramCredentials] =
+ "DescribeUserScramCredentialsRequest",
+ [RD_KAFKAP_AlterUserScramCredentials] =
+ "AlterUserScramCredentialsRequest",
+ [RD_KAFKAP_Vote] = "VoteRequest",
+ [RD_KAFKAP_BeginQuorumEpoch] = "BeginQuorumEpochRequest",
+ [RD_KAFKAP_EndQuorumEpoch] = "EndQuorumEpochRequest",
+ [RD_KAFKAP_DescribeQuorum] = "DescribeQuorumRequest",
+ [RD_KAFKAP_AlterIsr] = "AlterIsrRequest",
+ [RD_KAFKAP_UpdateFeatures] = "UpdateFeaturesRequest",
+ [RD_KAFKAP_Envelope] = "EnvelopeRequest",
+ [RD_KAFKAP_FetchSnapshot] = "FetchSnapshot",
+ [RD_KAFKAP_DescribeCluster] = "DescribeCluster",
+ [RD_KAFKAP_DescribeProducers] = "DescribeProducers",
+ [RD_KAFKAP_BrokerHeartbeat] = "BrokerHeartbeat",
+ [RD_KAFKAP_UnregisterBroker] = "UnregisterBroker",
+ [RD_KAFKAP_DescribeTransactions] = "DescribeTransactions",
+ [RD_KAFKAP_ListTransactions] = "ListTransactions",
+ [RD_KAFKAP_AllocateProducerIds] = "AllocateProducerIds",
+ };
+ static RD_TLS char ret[64];
+
+ if (ApiKey < 0 || ApiKey >= (int)RD_ARRAYSIZE(names) ||
+ !names[ApiKey]) {
+ rd_snprintf(ret, sizeof(ret), "Unknown-%hd?", ApiKey);
+ return ret;
+ }
+
+ return names[ApiKey];
+}
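+
+/*
+ * Usage sketch (an editor's example, not part of the upstream header):
+ * ApiKey names are handy in log lines. Out-of-range keys are rendered
+ * as "Unknown-<n>?" in a thread-local buffer, so the returned pointer
+ * must not be freed. printf() is assumed available via rd.h.
+ */
+static RD_UNUSED void example_log_apikey(int16_t ApiKey) {
+        /* e.g. ApiKey == RD_KAFKAP_Produce prints "Sent Produce request" */
+        printf("Sent %s request\n", rd_kafka_ApiKey2str(ApiKey));
+}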
+
+
+
+/**
+ * @brief ApiKey version support tuple.
+ */
+struct rd_kafka_ApiVersion {
+ int16_t ApiKey;
+ int16_t MinVer;
+ int16_t MaxVer;
+};
+
+/**
+ * @brief ApiVersion.ApiKey comparator.
+ */
+static RD_UNUSED int rd_kafka_ApiVersion_key_cmp(const void *_a,
+ const void *_b) {
+ const struct rd_kafka_ApiVersion *a =
+ (const struct rd_kafka_ApiVersion *)_a;
+ const struct rd_kafka_ApiVersion *b =
+ (const struct rd_kafka_ApiVersion *)_b;
+ return RD_CMP(a->ApiKey, b->ApiKey);
+}
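+
+/*
+ * Usage sketch (an editor's example, not part of the upstream header):
+ * the comparator above is intended for bsearch()/qsort() over an
+ * ApiKey-sorted version table; bsearch() is from <stdlib.h>.
+ */
+static RD_UNUSED const struct rd_kafka_ApiVersion *
+example_find_ApiVersion(const struct rd_kafka_ApiVersion *apis,
+                        size_t cnt,
+                        int16_t ApiKey) {
+        struct rd_kafka_ApiVersion skel = {ApiKey, 0, 0};
+
+        /* apis[] must be sorted on ApiKey for bsearch() to work. */
+        return (const struct rd_kafka_ApiVersion *)bsearch(
+            &skel, apis, cnt, sizeof(*apis), rd_kafka_ApiVersion_key_cmp);
+}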
+
+
+
+typedef enum {
+ RD_KAFKA_READ_UNCOMMITTED = 0,
+ RD_KAFKA_READ_COMMITTED = 1
+} rd_kafka_isolation_level_t;
+
+
+
+#define RD_KAFKA_CTRL_MSG_ABORT 0
+#define RD_KAFKA_CTRL_MSG_COMMIT 1
+
+
+/**
+ * @enum Coordinator type, used with FindCoordinatorRequest
+ */
+typedef enum rd_kafka_coordtype_t {
+ RD_KAFKA_COORD_GROUP = 0,
+ RD_KAFKA_COORD_TXN = 1
+} rd_kafka_coordtype_t;
+
+
+/**
+ *
+ * Kafka protocol string representation prefixed with a convenience header
+ *
+ * Serialized format:
+ * { uint16, data.. }
+ *
+ */
+typedef struct rd_kafkap_str_s {
+ /* convenience header (aligned access, host endian) */
+ int len; /* Kafka string length (-1=NULL, 0=empty, >0=string) */
+ const char *str; /* points into data[] or other memory,
+ * not NULL-terminated */
+} rd_kafkap_str_t;
+
+
+#define RD_KAFKAP_STR_LEN_NULL -1
+#define RD_KAFKAP_STR_IS_NULL(kstr) ((kstr)->len == RD_KAFKAP_STR_LEN_NULL)
+
+/* Returns the length of the string of a kafka protocol string representation */
+#define RD_KAFKAP_STR_LEN0(len) ((len) == RD_KAFKAP_STR_LEN_NULL ? 0 : (len))
+#define RD_KAFKAP_STR_LEN(kstr) RD_KAFKAP_STR_LEN0((kstr)->len)
+
+/* Returns the actual size of a kafka protocol string representation. */
+#define RD_KAFKAP_STR_SIZE0(len) (2 + RD_KAFKAP_STR_LEN0(len))
+#define RD_KAFKAP_STR_SIZE(kstr) RD_KAFKAP_STR_SIZE0((kstr)->len)
+
+
+/** @returns true if kstr is pre-serialized through .._new() */
+#define RD_KAFKAP_STR_IS_SERIALIZED(kstr) \
+ (((const char *)((kstr) + 1)) + 2 == (const char *)((kstr)->str))
+
+/* Serialized Kafka string: only works for _new() kstrs.
+ * Check with RD_KAFKAP_STR_IS_SERIALIZED */
+#define RD_KAFKAP_STR_SER(kstr) ((kstr) + 1)
+
+/* Macro suitable for "%.*s" printing. */
+#define RD_KAFKAP_STR_PR(kstr) \
+ (int)((kstr)->len == RD_KAFKAP_STR_LEN_NULL ? 0 : (kstr)->len), \
+ (kstr)->str
+
+/* strndupa() a Kafka string */
+#define RD_KAFKAP_STR_DUPA(destptr, kstr) \
+ rd_strndupa((destptr), (kstr)->str, RD_KAFKAP_STR_LEN(kstr))
+
+/* strndup() a Kafka string */
+#define RD_KAFKAP_STR_DUP(kstr) rd_strndup((kstr)->str, RD_KAFKAP_STR_LEN(kstr))
+
+#define RD_KAFKAP_STR_INITIALIZER \
+ { .len = RD_KAFKAP_STR_LEN_NULL, .str = NULL }
+
+/**
+ * Frees a Kafka string previously allocated with `rd_kafkap_str_new()`
+ */
+static RD_UNUSED void rd_kafkap_str_destroy(rd_kafkap_str_t *kstr) {
+ rd_free(kstr);
+}
+
+
+
+/**
+ * Allocate a new Kafka string and make a copy of 'str'.
+ * If 'len' is -1 the length will be calculated.
+ * Supports Kafka NULL strings.
+ * Nul-terminates the string, but the trailing \0 is not part of
+ * the serialized string.
+ */
+static RD_INLINE RD_UNUSED rd_kafkap_str_t *rd_kafkap_str_new(const char *str,
+ int len) {
+ rd_kafkap_str_t *kstr;
+ int16_t klen;
+
+ if (!str)
+ len = RD_KAFKAP_STR_LEN_NULL;
+ else if (len == -1)
+ len = (int)strlen(str);
+
+ kstr = (rd_kafkap_str_t *)rd_malloc(
+ sizeof(*kstr) + 2 + (len == RD_KAFKAP_STR_LEN_NULL ? 0 : len + 1));
+ kstr->len = len;
+
+ /* Serialised format: 16-bit string length */
+ klen = htobe16(len);
+ memcpy(kstr + 1, &klen, 2);
+
+ /* Pre-Serialised format: non null-terminated string */
+ if (len == RD_KAFKAP_STR_LEN_NULL)
+ kstr->str = NULL;
+ else {
+ kstr->str = ((const char *)(kstr + 1)) + 2;
+ memcpy((void *)kstr->str, str, len);
+ ((char *)kstr->str)[len] = '\0';
+ }
+
+ return kstr;
+}
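+
+/*
+ * Usage sketch (an editor's example, not part of the upstream header):
+ * create a Kafka string, print it with the "%.*s" helper and query its
+ * serialized size, then free it. printf() is assumed available via rd.h.
+ */
+static RD_UNUSED void example_kstr_roundtrip(void) {
+        rd_kafkap_str_t *kstr = rd_kafkap_str_new("my-client", -1);
+
+        /* RD_KAFKAP_STR_PR expands to a (length, pointer) pair;
+         * the wire size is the 2-byte length prefix plus the payload. */
+        printf("ClientId=%.*s, wire size %d bytes\n",
+               RD_KAFKAP_STR_PR(kstr), RD_KAFKAP_STR_SIZE(kstr));
+
+        rd_kafkap_str_destroy(kstr);
+}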
+
+
+/**
+ * Makes a copy of `src`. The copy will be fully allocated and should
+ * be freed with rd_kafkap_str_destroy().
+ */
+static RD_INLINE RD_UNUSED rd_kafkap_str_t *
+rd_kafkap_str_copy(const rd_kafkap_str_t *src) {
+ return rd_kafkap_str_new(src->str, src->len);
+}
+
+static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp(const rd_kafkap_str_t *a,
+ const rd_kafkap_str_t *b) {
+ int minlen = RD_MIN(a->len, b->len);
+ int r = memcmp(a->str, b->str, minlen);
+ if (r)
+ return r;
+ else
+ return RD_CMP(a->len, b->len);
+}
+
+static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str(const rd_kafkap_str_t *a,
+ const char *str) {
+ int len = (int)strlen(str);
+ int minlen = RD_MIN(a->len, len);
+ int r = memcmp(a->str, str, minlen);
+ if (r)
+ return r;
+ else
+ return RD_CMP(a->len, len);
+}
+
+static RD_INLINE RD_UNUSED int
+rd_kafkap_str_cmp_str2(const char *str, const rd_kafkap_str_t *b) {
+ int len = (int)strlen(str);
+ int minlen = RD_MIN(b->len, len);
+ int r = memcmp(str, b->str, minlen);
+ if (r)
+ return r;
+ else
+ return RD_CMP(len, b->len);
+}
+
+
+
+/**
+ *
+ * Kafka protocol bytes array representation prefixed with a convenience header
+ *
+ * Serialized format:
+ * { uint32, data.. }
+ *
+ */
+typedef struct rd_kafkap_bytes_s {
+ /* convenience header (aligned access, host endian) */
+ int32_t len; /* Kafka bytes length (-1=NULL, 0=empty, >0=data) */
+ const void *data; /* points just past the struct, or other memory,
+ * not NULL-terminated */
+ const char _data[1]; /* Bytes following struct when new()ed */
+} rd_kafkap_bytes_t;
+
+
+#define RD_KAFKAP_BYTES_LEN_NULL -1
+#define RD_KAFKAP_BYTES_IS_NULL(kbytes) \
+ ((kbytes)->len == RD_KAFKAP_BYTES_LEN_NULL)
+
+/* Returns the length of the bytes of a kafka protocol bytes representation */
+#define RD_KAFKAP_BYTES_LEN0(len) \
+ ((len) == RD_KAFKAP_BYTES_LEN_NULL ? 0 : (len))
+#define RD_KAFKAP_BYTES_LEN(kbytes) RD_KAFKAP_BYTES_LEN0((kbytes)->len)
+
+/* Returns the actual size of a kafka protocol bytes representation. */
+#define RD_KAFKAP_BYTES_SIZE0(len) (4 + RD_KAFKAP_BYTES_LEN0(len))
+#define RD_KAFKAP_BYTES_SIZE(kbytes) RD_KAFKAP_BYTES_SIZE0((kbytes)->len)
+
+/** @returns true if kbytes is pre-serialized through .._new() */
+#define RD_KAFKAP_BYTES_IS_SERIALIZED(kbytes) \
+ (((const char *)((kbytes) + 1)) + 4 == (const char *)((kbytes)->data))
+
+/* Serialized Kafka bytes: only works for _new() kbytes */
+#define RD_KAFKAP_BYTES_SER(kbytes) ((kbytes) + 1)
+
+
+/**
+ * Frees a Kafka bytes previously allocated with `rd_kafkap_bytes_new()`
+ */
+static RD_UNUSED void rd_kafkap_bytes_destroy(rd_kafkap_bytes_t *kbytes) {
+ rd_free(kbytes);
+}
+
+
+/**
+ * @brief Allocate a new Kafka bytes and make a copy of 'bytes'.
+ * If \p len > 0 but \p bytes is NULL, no copying is performed but
+ * the bytes structure will be allocated to fit \p len bytes.
+ *
+ * Supports:
+ * - Kafka NULL bytes (bytes==NULL,len==0),
+ * - Empty bytes (bytes!=NULL,len==0)
+ * - Copy data (bytes!=NULL,len>0)
+ * - No-copy, just alloc (bytes==NULL,len>0)
+ */
+static RD_INLINE RD_UNUSED rd_kafkap_bytes_t *
+rd_kafkap_bytes_new(const char *bytes, int32_t len) {
+ rd_kafkap_bytes_t *kbytes;
+ int32_t klen;
+
+ if (!bytes && !len)
+ len = RD_KAFKAP_BYTES_LEN_NULL;
+
+ kbytes = (rd_kafkap_bytes_t *)rd_malloc(
+ sizeof(*kbytes) + 4 + (len == RD_KAFKAP_BYTES_LEN_NULL ? 0 : len));
+ kbytes->len = len;
+
+ klen = htobe32(len);
+ memcpy((void *)(kbytes + 1), &klen, 4);
+
+ if (len == RD_KAFKAP_BYTES_LEN_NULL)
+ kbytes->data = NULL;
+ else {
+ kbytes->data = ((const char *)(kbytes + 1)) + 4;
+ if (bytes)
+ memcpy((void *)kbytes->data, bytes, len);
+ }
+
+ return kbytes;
+}
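+
+/*
+ * Usage sketch (an editor's example, not part of the upstream header):
+ * the four input combinations rd_kafkap_bytes_new() supports.
+ */
+static RD_UNUSED void example_kbytes_modes(void) {
+        rd_kafkap_bytes_t *knull  = rd_kafkap_bytes_new(NULL, 0);  /* NULL  */
+        rd_kafkap_bytes_t *kempty = rd_kafkap_bytes_new("", 0);    /* empty */
+        rd_kafkap_bytes_t *kcopy  = rd_kafkap_bytes_new("abc", 3); /* copy  */
+        rd_kafkap_bytes_t *kalloc = rd_kafkap_bytes_new(NULL, 3);  /* alloc */
+
+        /* Alloc-only: the caller fills in the payload afterwards. */
+        memcpy((void *)kalloc->data, "xyz", 3);
+
+        rd_kafkap_bytes_destroy(knull);
+        rd_kafkap_bytes_destroy(kempty);
+        rd_kafkap_bytes_destroy(kcopy);
+        rd_kafkap_bytes_destroy(kalloc);
+}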
+
+
+/**
+ * Makes a copy of `src`. The copy will be fully allocated and should
+ * be freed with rd_kafkap_bytes_destroy()
+ */
+static RD_INLINE RD_UNUSED rd_kafkap_bytes_t *
+rd_kafkap_bytes_copy(const rd_kafkap_bytes_t *src) {
+ return rd_kafkap_bytes_new((const char *)src->data, src->len);
+}
+
+
+static RD_INLINE RD_UNUSED int rd_kafkap_bytes_cmp(const rd_kafkap_bytes_t *a,
+ const rd_kafkap_bytes_t *b) {
+ int minlen = RD_MIN(a->len, b->len);
+ int r = memcmp(a->data, b->data, minlen);
+ if (r)
+ return r;
+ else
+ return RD_CMP(a->len, b->len);
+}
+
+static RD_INLINE RD_UNUSED int
+rd_kafkap_bytes_cmp_data(const rd_kafkap_bytes_t *a,
+ const char *data,
+ int len) {
+ int minlen = RD_MIN(a->len, len);
+ int r = memcmp(a->data, data, minlen);
+ if (r)
+ return r;
+ else
+ return RD_CMP(a->len, len);
+}
+
+
+
+typedef struct rd_kafka_buf_s rd_kafka_buf_t;
+
+
+#define RD_KAFKA_NODENAME_SIZE 256
+
+
+
+/**
+ * @brief Message overheads (worst-case)
+ */
+
+/**
+ * MsgVersion v0..v1
+ */
+/* Offset + MessageSize */
+#define RD_KAFKAP_MESSAGESET_V0_HDR_SIZE (8 + 4)
+/* CRC + Magic + Attr + KeyLen + ValueLen */
+#define RD_KAFKAP_MESSAGE_V0_HDR_SIZE (4 + 1 + 1 + 4 + 4)
+/* CRC + Magic + Attr + Timestamp + KeyLen + ValueLen */
+#define RD_KAFKAP_MESSAGE_V1_HDR_SIZE (4 + 1 + 1 + 8 + 4 + 4)
+/* Maximum per-message overhead */
+#define RD_KAFKAP_MESSAGE_V0_OVERHEAD \
+ (RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V0_HDR_SIZE)
+#define RD_KAFKAP_MESSAGE_V1_OVERHEAD \
+ (RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V1_HDR_SIZE)
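+
+/*
+ * Illustrative sketch (an editor's example, not part of the upstream
+ * header): worst-case wire size of a single MsgVersion v1 message,
+ * e.g. for sizing produce buffers: the 12-byte MessageSet entry header
+ * plus the 22-byte message header plus the key and value payloads.
+ */
+static RD_UNUSED size_t example_msg_v1_wire_size(size_t key_len,
+                                                 size_t value_len) {
+        return RD_KAFKAP_MESSAGE_V1_OVERHEAD + key_len + value_len;
+}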
+
+/**
+ * MsgVersion v2
+ */
+#define RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD \
+ ( /* Length (varint) */ \
+ RD_UVARINT_ENC_SIZEOF(int32_t) + /* Attributes */ \
+ 1 + /* TimestampDelta (varint) */ \
+ RD_UVARINT_ENC_SIZEOF(int64_t) + /* OffsetDelta (varint) */ \
+ RD_UVARINT_ENC_SIZEOF(int32_t) + /* KeyLen (varint) */ \
+ RD_UVARINT_ENC_SIZEOF(int32_t) + /* ValueLen (varint) */ \
+ RD_UVARINT_ENC_SIZEOF(int32_t) + /* HeaderCnt (varint): */ \
+ RD_UVARINT_ENC_SIZEOF(int32_t))
+
+#define RD_KAFKAP_MESSAGE_V2_MIN_OVERHEAD \
+ ( /* Length (varint) */ \
+ RD_UVARINT_ENC_SIZE_0() + /* Attributes */ \
+ 1 + /* TimestampDelta (varint) */ \
+ RD_UVARINT_ENC_SIZE_0() + /* OffsetDelta (varint) */ \
+ RD_UVARINT_ENC_SIZE_0() + /* KeyLen (varint) */ \
+ RD_UVARINT_ENC_SIZE_0() + /* ValueLen (varint) */ \
+ RD_UVARINT_ENC_SIZE_0() + /* HeaderCnt (varint): */ \
+ RD_UVARINT_ENC_SIZE_0())
+
+
+/**
+ * @brief MessageSets are not explicitly versioned but depend on the
+ * Produce/Fetch API version and the encompassed Message versions.
+ * We use the Message version (MsgVersion, aka MagicByte) to describe
+ * the MessageSet version; that is, MsgVersion <= 1 uses the legacy
+ * MessageSet format (v0/v1) while MsgVersion 2 uses MessageSet version v2.
+ */
+
+/* Old MessageSet header: none */
+#define RD_KAFKAP_MSGSET_V0_SIZE 0
+
+/* MessageSet v2 header */
+#define RD_KAFKAP_MSGSET_V2_SIZE \
+ (8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4 + 4)
+
+/* Byte offsets for MessageSet fields */
+#define RD_KAFKAP_MSGSET_V2_OF_Length (8)
+#define RD_KAFKAP_MSGSET_V2_OF_MagicByte (8 + 4 + 4)
+#define RD_KAFKAP_MSGSET_V2_OF_CRC (8 + 4 + 4 + 1)
+#define RD_KAFKAP_MSGSET_V2_OF_Attributes (8 + 4 + 4 + 1 + 4)
+#define RD_KAFKAP_MSGSET_V2_OF_LastOffsetDelta (8 + 4 + 4 + 1 + 4 + 2)
+#define RD_KAFKAP_MSGSET_V2_OF_BaseTimestamp (8 + 4 + 4 + 1 + 4 + 2 + 4)
+#define RD_KAFKAP_MSGSET_V2_OF_MaxTimestamp (8 + 4 + 4 + 1 + 4 + 2 + 4 + 8)
+#define RD_KAFKAP_MSGSET_V2_OF_ProducerId (8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8)
+#define RD_KAFKAP_MSGSET_V2_OF_ProducerEpoch \
+ (8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8)
+#define RD_KAFKAP_MSGSET_V2_OF_BaseSequence \
+ (8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2)
+#define RD_KAFKAP_MSGSET_V2_OF_RecordCount \
+ (8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4)
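+
+/*
+ * Illustrative sketch (an editor's example, not part of the upstream
+ * header): using the fixed offsets above to read two MessageSet v2
+ * header fields from a raw big-endian buffer \p p of at least
+ * RD_KAFKAP_MSGSET_V2_SIZE bytes; be32toh()/be64toh() come from
+ * rdendian.h. BaseOffset is the first field, at implicit offset 0.
+ */
+static RD_UNUSED void example_read_msgset_v2_hdr(const char *p,
+                                                 int64_t *BaseOffsetp,
+                                                 int32_t *RecordCountp) {
+        int64_t BaseOffset;
+        int32_t RecordCount;
+
+        memcpy(&BaseOffset, p, 8);
+        memcpy(&RecordCount, p + RD_KAFKAP_MSGSET_V2_OF_RecordCount, 4);
+
+        *BaseOffsetp  = (int64_t)be64toh(BaseOffset);
+        *RecordCountp = (int32_t)be32toh(RecordCount);
+}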
+
+
+
+/**
+ * @name Producer ID and Epoch for the Idempotent Producer
+ * @{
+ *
+ */
+
+/**
+ * @brief Producer ID and Epoch
+ */
+typedef struct rd_kafka_pid_s {
+ int64_t id; /**< Producer Id */
+ int16_t epoch; /**< Producer Epoch */
+} rd_kafka_pid_t;
+
+#define RD_KAFKA_PID_INITIALIZER \
+ { -1, -1 }
+
+/**
+ * @returns true if \p PID is valid
+ */
+#define rd_kafka_pid_valid(PID) ((PID).id != -1)
+
+/**
+ * @brief Check two pids for equality
+ */
+static RD_UNUSED RD_INLINE int rd_kafka_pid_eq(const rd_kafka_pid_t a,
+ const rd_kafka_pid_t b) {
+ return a.id == b.id && a.epoch == b.epoch;
+}
+
+/**
+ * @brief Pid+epoch comparator
+ */
+static RD_UNUSED int rd_kafka_pid_cmp(const void *_a, const void *_b) {
+ const rd_kafka_pid_t *a = _a, *b = _b;
+
+ if (a->id < b->id)
+ return -1;
+ else if (a->id > b->id)
+ return 1;
+
+ return (int)a->epoch - (int)b->epoch;
+}
+
+
+/**
+ * @returns the string representation of a PID in a thread-safe
+ * static buffer.
+ */
+static RD_UNUSED const char *rd_kafka_pid2str(const rd_kafka_pid_t pid) {
+ static RD_TLS char buf[2][64];
+ static RD_TLS int i;
+
+ if (!rd_kafka_pid_valid(pid))
+ return "PID{Invalid}";
+
+ i = (i + 1) % 2;
+
+ rd_snprintf(buf[i], sizeof(buf[i]), "PID{Id:%" PRId64 ",Epoch:%hd}",
+ pid.id, pid.epoch);
+
+ return buf[i];
+}
+
+/**
+ * @brief Reset the PID to invalid/init state
+ */
+static RD_UNUSED RD_INLINE void rd_kafka_pid_reset(rd_kafka_pid_t *pid) {
+ pid->id = -1;
+ pid->epoch = -1;
+}
+
+
+/**
+ * @brief Bump the epoch of a valid PID
+ */
+static RD_UNUSED RD_INLINE rd_kafka_pid_t
+rd_kafka_pid_bump(const rd_kafka_pid_t old) {
+ rd_kafka_pid_t new_pid = {
+ old.id, (int16_t)(((int)old.epoch + 1) & (int)INT16_MAX)};
+ return new_pid;
+}
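+
+/*
+ * Usage sketch (an editor's example, not part of the upstream header):
+ * bumping a PID's epoch; the masking above makes the epoch wrap back
+ * to 0 once it passes INT16_MAX. printf() is assumed available via rd.h.
+ */
+static RD_UNUSED void example_pid_bump(void) {
+        rd_kafka_pid_t pid = {1000, INT16_MAX};
+
+        pid = rd_kafka_pid_bump(pid);
+        /* rd_kafka_pid2str(pid) now renders "PID{Id:1000,Epoch:0}" */
+        printf("%s\n", rd_kafka_pid2str(pid));
+}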
+
+/**@}*/
+
+
+#endif /* _RDKAFKA_PROTO_H_ */