/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012,2013 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _RDKAFKA_PROTO_H_
#define _RDKAFKA_PROTO_H_


#include "rdendian.h"
#include "rdvarint.h"

/* Protocol defines */
#include "rdkafka_protocol.h"



/** Default generic retry count for failed requests.
 *  This may be overridden for specific request types.
*/ #define RD_KAFKA_REQUEST_DEFAULT_RETRIES 2 /** Max (practically infinite) retry count */ #define RD_KAFKA_REQUEST_MAX_RETRIES INT_MAX /** Do not retry request */ #define RD_KAFKA_REQUEST_NO_RETRIES 0 /** * Request types */ struct rd_kafkap_reqhdr { int32_t Size; int16_t ApiKey; int16_t ApiVersion; int32_t CorrId; /* ClientId follows */ }; #define RD_KAFKAP_REQHDR_SIZE (4 + 2 + 2 + 4) #define RD_KAFKAP_RESHDR_SIZE (4 + 4) /** * Response header */ struct rd_kafkap_reshdr { int32_t Size; int32_t CorrId; }; /** * Request type v1 (flexible version) * * i32 Size * i16 ApiKey * i16 ApiVersion * i32 CorrId * string ClientId (2-byte encoding, not compact string) * uvarint Tags * * uvarint EndTags * * Any struct-type (non-primitive or array type) field in the request payload * must also have a trailing tags list, this goes for structs in arrays as well. */ /** * @brief Protocol request type (ApiKey) to name/string. * * Generate updates to this list with generate_proto.sh. */ static RD_UNUSED const char *rd_kafka_ApiKey2str(int16_t ApiKey) { static const char *names[] = { [RD_KAFKAP_Produce] = "Produce", [RD_KAFKAP_Fetch] = "Fetch", [RD_KAFKAP_ListOffsets] = "ListOffsets", [RD_KAFKAP_Metadata] = "Metadata", [RD_KAFKAP_LeaderAndIsr] = "LeaderAndIsr", [RD_KAFKAP_StopReplica] = "StopReplica", [RD_KAFKAP_UpdateMetadata] = "UpdateMetadata", [RD_KAFKAP_ControlledShutdown] = "ControlledShutdown", [RD_KAFKAP_OffsetCommit] = "OffsetCommit", [RD_KAFKAP_OffsetFetch] = "OffsetFetch", [RD_KAFKAP_FindCoordinator] = "FindCoordinator", [RD_KAFKAP_JoinGroup] = "JoinGroup", [RD_KAFKAP_Heartbeat] = "Heartbeat", [RD_KAFKAP_LeaveGroup] = "LeaveGroup", [RD_KAFKAP_SyncGroup] = "SyncGroup", [RD_KAFKAP_DescribeGroups] = "DescribeGroups", [RD_KAFKAP_ListGroups] = "ListGroups", [RD_KAFKAP_SaslHandshake] = "SaslHandshake", [RD_KAFKAP_ApiVersion] = "ApiVersion", [RD_KAFKAP_CreateTopics] = "CreateTopics", [RD_KAFKAP_DeleteTopics] = "DeleteTopics", [RD_KAFKAP_DeleteRecords] = "DeleteRecords", 
[RD_KAFKAP_InitProducerId] = "InitProducerId", [RD_KAFKAP_OffsetForLeaderEpoch] = "OffsetForLeaderEpoch", [RD_KAFKAP_AddPartitionsToTxn] = "AddPartitionsToTxn", [RD_KAFKAP_AddOffsetsToTxn] = "AddOffsetsToTxn", [RD_KAFKAP_EndTxn] = "EndTxn", [RD_KAFKAP_WriteTxnMarkers] = "WriteTxnMarkers", [RD_KAFKAP_TxnOffsetCommit] = "TxnOffsetCommit", [RD_KAFKAP_DescribeAcls] = "DescribeAcls", [RD_KAFKAP_CreateAcls] = "CreateAcls", [RD_KAFKAP_DeleteAcls] = "DeleteAcls", [RD_KAFKAP_DescribeConfigs] = "DescribeConfigs", [RD_KAFKAP_AlterConfigs] = "AlterConfigs", [RD_KAFKAP_AlterReplicaLogDirs] = "AlterReplicaLogDirs", [RD_KAFKAP_DescribeLogDirs] = "DescribeLogDirs", [RD_KAFKAP_SaslAuthenticate] = "SaslAuthenticate", [RD_KAFKAP_CreatePartitions] = "CreatePartitions", [RD_KAFKAP_CreateDelegationToken] = "CreateDelegationToken", [RD_KAFKAP_RenewDelegationToken] = "RenewDelegationToken", [RD_KAFKAP_ExpireDelegationToken] = "ExpireDelegationToken", [RD_KAFKAP_DescribeDelegationToken] = "DescribeDelegationToken", [RD_KAFKAP_DeleteGroups] = "DeleteGroups", [RD_KAFKAP_ElectLeaders] = "ElectLeadersRequest", [RD_KAFKAP_IncrementalAlterConfigs] = "IncrementalAlterConfigsRequest", [RD_KAFKAP_AlterPartitionReassignments] = "AlterPartitionReassignmentsRequest", [RD_KAFKAP_ListPartitionReassignments] = "ListPartitionReassignmentsRequest", [RD_KAFKAP_OffsetDelete] = "OffsetDeleteRequest", [RD_KAFKAP_DescribeClientQuotas] = "DescribeClientQuotasRequest", [RD_KAFKAP_AlterClientQuotas] = "AlterClientQuotasRequest", [RD_KAFKAP_DescribeUserScramCredentials] = "DescribeUserScramCredentialsRequest", [RD_KAFKAP_AlterUserScramCredentials] = "AlterUserScramCredentialsRequest", [RD_KAFKAP_Vote] = "VoteRequest", [RD_KAFKAP_BeginQuorumEpoch] = "BeginQuorumEpochRequest", [RD_KAFKAP_EndQuorumEpoch] = "EndQuorumEpochRequest", [RD_KAFKAP_DescribeQuorum] = "DescribeQuorumRequest", [RD_KAFKAP_AlterIsr] = "AlterIsrRequest", [RD_KAFKAP_UpdateFeatures] = "UpdateFeaturesRequest", [RD_KAFKAP_Envelope] = 
"EnvelopeRequest", [RD_KAFKAP_FetchSnapshot] = "FetchSnapshot", [RD_KAFKAP_DescribeCluster] = "DescribeCluster", [RD_KAFKAP_DescribeProducers] = "DescribeProducers", [RD_KAFKAP_BrokerHeartbeat] = "BrokerHeartbeat", [RD_KAFKAP_UnregisterBroker] = "UnregisterBroker", [RD_KAFKAP_DescribeTransactions] = "DescribeTransactions", [RD_KAFKAP_ListTransactions] = "ListTransactions", [RD_KAFKAP_AllocateProducerIds] = "AllocateProducerIds", }; static RD_TLS char ret[64]; if (ApiKey < 0 || ApiKey >= (int)RD_ARRAYSIZE(names) || !names[ApiKey]) { rd_snprintf(ret, sizeof(ret), "Unknown-%hd?", ApiKey); return ret; } return names[ApiKey]; } /** * @brief ApiKey version support tuple. */ struct rd_kafka_ApiVersion { int16_t ApiKey; int16_t MinVer; int16_t MaxVer; }; /** * @brief ApiVersion.ApiKey comparator. */ static RD_UNUSED int rd_kafka_ApiVersion_key_cmp(const void *_a, const void *_b) { const struct rd_kafka_ApiVersion *a = (const struct rd_kafka_ApiVersion *)_a; const struct rd_kafka_ApiVersion *b = (const struct rd_kafka_ApiVersion *)_b; return RD_CMP(a->ApiKey, b->ApiKey); } typedef enum { RD_KAFKA_READ_UNCOMMITTED = 0, RD_KAFKA_READ_COMMITTED = 1 } rd_kafka_isolation_level_t; #define RD_KAFKA_CTRL_MSG_ABORT 0 #define RD_KAFKA_CTRL_MSG_COMMIT 1 /** * @enum Coordinator type, used with FindCoordinatorRequest */ typedef enum rd_kafka_coordtype_t { RD_KAFKA_COORD_GROUP = 0, RD_KAFKA_COORD_TXN = 1 } rd_kafka_coordtype_t; /** * * Kafka protocol string representation prefixed with a convenience header * * Serialized format: * { uint16, data.. 
} * */ typedef struct rd_kafkap_str_s { /* convenience header (aligned access, host endian) */ int len; /* Kafka string length (-1=NULL, 0=empty, >0=string) */ const char *str; /* points into data[] or other memory, * not NULL-terminated */ } rd_kafkap_str_t; #define RD_KAFKAP_STR_LEN_NULL -1 #define RD_KAFKAP_STR_IS_NULL(kstr) ((kstr)->len == RD_KAFKAP_STR_LEN_NULL) /* Returns the length of the string of a kafka protocol string representation */ #define RD_KAFKAP_STR_LEN0(len) ((len) == RD_KAFKAP_STR_LEN_NULL ? 0 : (len)) #define RD_KAFKAP_STR_LEN(kstr) RD_KAFKAP_STR_LEN0((kstr)->len) /* Returns the actual size of a kafka protocol string representation. */ #define RD_KAFKAP_STR_SIZE0(len) (2 + RD_KAFKAP_STR_LEN0(len)) #define RD_KAFKAP_STR_SIZE(kstr) RD_KAFKAP_STR_SIZE0((kstr)->len) /** @returns true if kstr is pre-serialized through .._new() */ #define RD_KAFKAP_STR_IS_SERIALIZED(kstr) \ (((const char *)((kstr) + 1)) + 2 == (const char *)((kstr)->str)) /* Serialized Kafka string: only works for _new() kstrs. * Check with RD_KAFKAP_STR_IS_SERIALIZED */ #define RD_KAFKAP_STR_SER(kstr) ((kstr) + 1) /* Macro suitable for "%.*s" printing. */ #define RD_KAFKAP_STR_PR(kstr) \ (int)((kstr)->len == RD_KAFKAP_STR_LEN_NULL ? 0 : (kstr)->len), \ (kstr)->str /* strndupa() a Kafka string */ #define RD_KAFKAP_STR_DUPA(destptr, kstr) \ rd_strndupa((destptr), (kstr)->str, RD_KAFKAP_STR_LEN(kstr)) /* strndup() a Kafka string */ #define RD_KAFKAP_STR_DUP(kstr) rd_strndup((kstr)->str, RD_KAFKAP_STR_LEN(kstr)) #define RD_KAFKAP_STR_INITIALIZER \ { .len = RD_KAFKAP_STR_LEN_NULL, .str = NULL } /** * Frees a Kafka string previously allocated with `rd_kafkap_str_new()` */ static RD_UNUSED void rd_kafkap_str_destroy(rd_kafkap_str_t *kstr) { rd_free(kstr); } /** * Allocate a new Kafka string and make a copy of 'str'. * If 'len' is -1 the length will be calculated. * Supports Kafka NULL strings. * Nul-terminates the string, but the trailing \0 is not part of * the serialized string. 
*/ static RD_INLINE RD_UNUSED rd_kafkap_str_t *rd_kafkap_str_new(const char *str, int len) { rd_kafkap_str_t *kstr; int16_t klen; if (!str) len = RD_KAFKAP_STR_LEN_NULL; else if (len == -1) len = (int)strlen(str); kstr = (rd_kafkap_str_t *)rd_malloc( sizeof(*kstr) + 2 + (len == RD_KAFKAP_STR_LEN_NULL ? 0 : len + 1)); kstr->len = len; /* Serialised format: 16-bit string length */ klen = htobe16(len); memcpy(kstr + 1, &klen, 2); /* Pre-Serialised format: non null-terminated string */ if (len == RD_KAFKAP_STR_LEN_NULL) kstr->str = NULL; else { kstr->str = ((const char *)(kstr + 1)) + 2; memcpy((void *)kstr->str, str, len); ((char *)kstr->str)[len] = '\0'; } return kstr; } /** * Makes a copy of `src`. The copy will be fully allocated and should * be freed with rd_kafka_pstr_destroy() */ static RD_INLINE RD_UNUSED rd_kafkap_str_t * rd_kafkap_str_copy(const rd_kafkap_str_t *src) { return rd_kafkap_str_new(src->str, src->len); } static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp(const rd_kafkap_str_t *a, const rd_kafkap_str_t *b) { int minlen = RD_MIN(a->len, b->len); int r = memcmp(a->str, b->str, minlen); if (r) return r; else return RD_CMP(a->len, b->len); } static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str(const rd_kafkap_str_t *a, const char *str) { int len = (int)strlen(str); int minlen = RD_MIN(a->len, len); int r = memcmp(a->str, str, minlen); if (r) return r; else return RD_CMP(a->len, len); } static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str2(const char *str, const rd_kafkap_str_t *b) { int len = (int)strlen(str); int minlen = RD_MIN(b->len, len); int r = memcmp(str, b->str, minlen); if (r) return r; else return RD_CMP(len, b->len); } /** * * Kafka protocol bytes array representation prefixed with a convenience header * * Serialized format: * { uint32, data.. 
} * */ typedef struct rd_kafkap_bytes_s { /* convenience header (aligned access, host endian) */ int32_t len; /* Kafka bytes length (-1=NULL, 0=empty, >0=data) */ const void *data; /* points just past the struct, or other memory, * not NULL-terminated */ const char _data[1]; /* Bytes following struct when new()ed */ } rd_kafkap_bytes_t; #define RD_KAFKAP_BYTES_LEN_NULL -1 #define RD_KAFKAP_BYTES_IS_NULL(kbytes) \ ((kbytes)->len == RD_KAFKAP_BYTES_LEN_NULL) /* Returns the length of the bytes of a kafka protocol bytes representation */ #define RD_KAFKAP_BYTES_LEN0(len) \ ((len) == RD_KAFKAP_BYTES_LEN_NULL ? 0 : (len)) #define RD_KAFKAP_BYTES_LEN(kbytes) RD_KAFKAP_BYTES_LEN0((kbytes)->len) /* Returns the actual size of a kafka protocol bytes representation. */ #define RD_KAFKAP_BYTES_SIZE0(len) (4 + RD_KAFKAP_BYTES_LEN0(len)) #define RD_KAFKAP_BYTES_SIZE(kbytes) RD_KAFKAP_BYTES_SIZE0((kbytes)->len) /** @returns true if kbyes is pre-serialized through .._new() */ #define RD_KAFKAP_BYTES_IS_SERIALIZED(kstr) \ (((const char *)((kbytes) + 1)) + 2 == (const char *)((kbytes)->data)) /* Serialized Kafka bytes: only works for _new() kbytes */ #define RD_KAFKAP_BYTES_SER(kbytes) ((kbytes) + 1) /** * Frees a Kafka bytes previously allocated with `rd_kafkap_bytes_new()` */ static RD_UNUSED void rd_kafkap_bytes_destroy(rd_kafkap_bytes_t *kbytes) { rd_free(kbytes); } /** * @brief Allocate a new Kafka bytes and make a copy of 'bytes'. * If \p len > 0 but \p bytes is NULL no copying is performed by * the bytes structure will be allocated to fit \p size bytes. 
* * Supports: * - Kafka NULL bytes (bytes==NULL,len==0), * - Empty bytes (bytes!=NULL,len==0) * - Copy data (bytes!=NULL,len>0) * - No-copy, just alloc (bytes==NULL,len>0) */ static RD_INLINE RD_UNUSED rd_kafkap_bytes_t * rd_kafkap_bytes_new(const char *bytes, int32_t len) { rd_kafkap_bytes_t *kbytes; int32_t klen; if (!bytes && !len) len = RD_KAFKAP_BYTES_LEN_NULL; kbytes = (rd_kafkap_bytes_t *)rd_malloc( sizeof(*kbytes) + 4 + (len == RD_KAFKAP_BYTES_LEN_NULL ? 0 : len)); kbytes->len = len; klen = htobe32(len); memcpy((void *)(kbytes + 1), &klen, 4); if (len == RD_KAFKAP_BYTES_LEN_NULL) kbytes->data = NULL; else { kbytes->data = ((const char *)(kbytes + 1)) + 4; if (bytes) memcpy((void *)kbytes->data, bytes, len); } return kbytes; } /** * Makes a copy of `src`. The copy will be fully allocated and should * be freed with rd_kafkap_bytes_destroy() */ static RD_INLINE RD_UNUSED rd_kafkap_bytes_t * rd_kafkap_bytes_copy(const rd_kafkap_bytes_t *src) { return rd_kafkap_bytes_new((const char *)src->data, src->len); } static RD_INLINE RD_UNUSED int rd_kafkap_bytes_cmp(const rd_kafkap_bytes_t *a, const rd_kafkap_bytes_t *b) { int minlen = RD_MIN(a->len, b->len); int r = memcmp(a->data, b->data, minlen); if (r) return r; else return RD_CMP(a->len, b->len); } static RD_INLINE RD_UNUSED int rd_kafkap_bytes_cmp_data(const rd_kafkap_bytes_t *a, const char *data, int len) { int minlen = RD_MIN(a->len, len); int r = memcmp(a->data, data, minlen); if (r) return r; else return RD_CMP(a->len, len); } typedef struct rd_kafka_buf_s rd_kafka_buf_t; #define RD_KAFKA_NODENAME_SIZE 256 /** * @brief Message overheads (worst-case) */ /** * MsgVersion v0..v1 */ /* Offset + MessageSize */ #define RD_KAFKAP_MESSAGESET_V0_HDR_SIZE (8 + 4) /* CRC + Magic + Attr + KeyLen + ValueLen */ #define RD_KAFKAP_MESSAGE_V0_HDR_SIZE (4 + 1 + 1 + 4 + 4) /* CRC + Magic + Attr + Timestamp + KeyLen + ValueLen */ #define RD_KAFKAP_MESSAGE_V1_HDR_SIZE (4 + 1 + 1 + 8 + 4 + 4) /* Maximum per-message overhead */ 
#define RD_KAFKAP_MESSAGE_V0_OVERHEAD                                          \
        (RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V0_HDR_SIZE)
#define RD_KAFKAP_MESSAGE_V1_OVERHEAD                                          \
        (RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V1_HDR_SIZE)

/**
 * MsgVersion v2
 *
 * MAX: every varint field sized through RD_UVARINT_ENC_SIZEOF() of its type.
 * MIN: every varint field sized through RD_UVARINT_ENC_SIZE_0().
 * The Attributes field is a fixed single byte in both.
 */
#define RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD                                      \
        (RD_UVARINT_ENC_SIZEOF(int32_t) +   /* Length (varint) */              \
         1 +                                /* Attributes */                   \
         RD_UVARINT_ENC_SIZEOF(int64_t) +   /* TimestampDelta (varint) */      \
         RD_UVARINT_ENC_SIZEOF(int32_t) +   /* OffsetDelta (varint) */         \
         RD_UVARINT_ENC_SIZEOF(int32_t) +   /* KeyLen (varint) */              \
         RD_UVARINT_ENC_SIZEOF(int32_t) +   /* ValueLen (varint) */            \
         RD_UVARINT_ENC_SIZEOF(int32_t))    /* HeaderCnt (varint) */

#define RD_KAFKAP_MESSAGE_V2_MIN_OVERHEAD                                      \
        (RD_UVARINT_ENC_SIZE_0() +          /* Length (varint) */              \
         1 +                                /* Attributes */                   \
         RD_UVARINT_ENC_SIZE_0() +          /* TimestampDelta (varint) */      \
         RD_UVARINT_ENC_SIZE_0() +          /* OffsetDelta (varint) */         \
         RD_UVARINT_ENC_SIZE_0() +          /* KeyLen (varint) */              \
         RD_UVARINT_ENC_SIZE_0() +          /* ValueLen (varint) */            \
         RD_UVARINT_ENC_SIZE_0())           /* HeaderCnt (varint) */


/**
 * @brief MessageSets are not explicitly versioned but depend on the
 *        Produce/Fetch API version and the encompassed Message versions.
 *        We use the Message version (MsgVersion, aka MagicByte) to describe
 *        the MessageSet version, that is, MsgVersion <= 1 uses the old
 *        MessageSet version (v0?) 
while MsgVersion 2 uses MessageSet version v2 */ /* Old MessageSet header: none */ #define RD_KAFKAP_MSGSET_V0_SIZE 0 /* MessageSet v2 header */ #define RD_KAFKAP_MSGSET_V2_SIZE \ (8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4 + 4) /* Byte offsets for MessageSet fields */ #define RD_KAFKAP_MSGSET_V2_OF_Length (8) #define RD_KAFKAP_MSGSET_V2_OF_MagicByte (8 + 4 + 4) #define RD_KAFKAP_MSGSET_V2_OF_CRC (8 + 4 + 4 + 1) #define RD_KAFKAP_MSGSET_V2_OF_Attributes (8 + 4 + 4 + 1 + 4) #define RD_KAFKAP_MSGSET_V2_OF_LastOffsetDelta (8 + 4 + 4 + 1 + 4 + 2) #define RD_KAFKAP_MSGSET_V2_OF_BaseTimestamp (8 + 4 + 4 + 1 + 4 + 2 + 4) #define RD_KAFKAP_MSGSET_V2_OF_MaxTimestamp (8 + 4 + 4 + 1 + 4 + 2 + 4 + 8) #define RD_KAFKAP_MSGSET_V2_OF_ProducerId (8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8) #define RD_KAFKAP_MSGSET_V2_OF_ProducerEpoch \ (8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8) #define RD_KAFKAP_MSGSET_V2_OF_BaseSequence \ (8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2) #define RD_KAFKAP_MSGSET_V2_OF_RecordCount \ (8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4) /** * @name Producer ID and Epoch for the Idempotent Producer * @{ * */ /** * @brief Producer ID and Epoch */ typedef struct rd_kafka_pid_s { int64_t id; /**< Producer Id */ int16_t epoch; /**< Producer Epoch */ } rd_kafka_pid_t; #define RD_KAFKA_PID_INITIALIZER \ { -1, -1 } /** * @returns true if \p PID is valid */ #define rd_kafka_pid_valid(PID) ((PID).id != -1) /** * @brief Check two pids for equality */ static RD_UNUSED RD_INLINE int rd_kafka_pid_eq(const rd_kafka_pid_t a, const rd_kafka_pid_t b) { return a.id == b.id && a.epoch == b.epoch; } /** * @brief Pid+epoch comparator */ static RD_UNUSED int rd_kafka_pid_cmp(const void *_a, const void *_b) { const rd_kafka_pid_t *a = _a, *b = _b; if (a->id < b->id) return -1; else if (a->id > b->id) return 1; return (int)a->epoch - (int)b->epoch; } /** * @returns the string representation of a PID in a thread-safe * static buffer. 
*/ static RD_UNUSED const char *rd_kafka_pid2str(const rd_kafka_pid_t pid) { static RD_TLS char buf[2][64]; static RD_TLS int i; if (!rd_kafka_pid_valid(pid)) return "PID{Invalid}"; i = (i + 1) % 2; rd_snprintf(buf[i], sizeof(buf[i]), "PID{Id:%" PRId64 ",Epoch:%hd}", pid.id, pid.epoch); return buf[i]; } /** * @brief Reset the PID to invalid/init state */ static RD_UNUSED RD_INLINE void rd_kafka_pid_reset(rd_kafka_pid_t *pid) { pid->id = -1; pid->epoch = -1; } /** * @brief Bump the epoch of a valid PID */ static RD_UNUSED RD_INLINE rd_kafka_pid_t rd_kafka_pid_bump(const rd_kafka_pid_t old) { rd_kafka_pid_t new_pid = { old.id, (int16_t)(((int)old.epoch + 1) & (int)INT16_MAX)}; return new_pid; } /**@}*/ #endif /* _RDKAFKA_PROTO_H_ */