# Introduction to librdkafka - the Apache Kafka C/C++ client library


librdkafka is a high performance C implementation of the Apache
Kafka client, providing a reliable and performant client for production use.
librdkafka also provides a native C++ interface.

<!-- markdown-toc start - Don't edit this section. Run M-x markdown-toc-refresh-toc -->
**Table of Contents**

- [Introduction to librdkafka - the Apache Kafka C/C++ client library](#introduction-to-librdkafka---the-apache-kafka-cc-client-library)
    - [Performance](#performance)
        - [High throughput](#high-throughput)
        - [Low latency](#low-latency)
            - [Latency measurement](#latency-measurement)
        - [Compression](#compression)
    - [Message reliability](#message-reliability)
        - [Producer message delivery success](#producer-message-delivery-success)
        - [Producer message delivery failure](#producer-message-delivery-failure)
            - [Error: Timed out in transmission queue](#error-timed-out-in-transmission-queue)
            - [Error: Timed out in flight to/from broker](#error-timed-out-in-flight-tofrom-broker)
            - [Error: Temporary broker-side error](#error-temporary-broker-side-error)
            - [Error: Temporary errors due to stale metadata](#error-temporary-errors-due-to-stale-metadata)
            - [Error: Local time out](#error-local-time-out)
            - [Error: Permanent errors](#error-permanent-errors)
        - [Producer retries](#producer-retries)
        - [Reordering](#reordering)
        - [Idempotent Producer](#idempotent-producer)
            - [Guarantees](#guarantees)
            - [Ordering and message sequence numbers](#ordering-and-message-sequence-numbers)
            - [Partitioner considerations](#partitioner-considerations)
            - [Message timeout considerations](#message-timeout-considerations)
            - [Leader change](#leader-change)
            - [Error handling](#error-handling)
                - [RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER](#rdkafkaresperroutofordersequencenumber)
                - [RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER](#rdkafkaresperrduplicatesequencenumber)
                - [RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID](#rdkafkaresperrunknownproducerid)
                - [Standard errors](#standard-errors)
                - [Message persistence status](#message-persistence-status)
        - [Transactional Producer](#transactional-producer)
            - [Error handling](#error-handling-1)
            - [Old producer fencing](#old-producer-fencing)
            - [Configuration considerations](#configuration-considerations)
        - [Exactly Once Semantics (EOS) and transactions](#exactly-once-semantics-eos-and-transactions)
    - [Usage](#usage)
        - [Documentation](#documentation)
        - [Initialization](#initialization)
        - [Configuration](#configuration)
            - [Example](#example)
        - [Termination](#termination)
            - [High-level KafkaConsumer](#high-level-kafkaconsumer)
            - [Producer](#producer)
            - [Admin API client](#admin-api-client)
            - [Speeding up termination](#speeding-up-termination)
        - [Threads and callbacks](#threads-and-callbacks)
        - [Brokers](#brokers)
            - [SSL](#ssl)
            - [OAUTHBEARER with support for OIDC](#oauthbearer-with-support-for-oidc)
            - [Sparse connections](#sparse-connections)
                - [Random broker selection](#random-broker-selection)
                - [Persistent broker connections](#persistent-broker-connections)
            - [Connection close](#connection-close)
            - [Fetch From Follower](#fetch-from-follower)
        - [Logging](#logging)
            - [Debug contexts](#debug-contexts)
        - [Feature discovery](#feature-discovery)
        - [Producer API](#producer-api)
        - [Simple Consumer API (legacy)](#simple-consumer-api-legacy)
            - [Offset management](#offset-management)
                - [Auto offset commit](#auto-offset-commit)
                - [At-least-once processing](#at-least-once-processing)
                - [Auto offset reset](#auto-offset-reset)
        - [Consumer groups](#consumer-groups)
            - [Static consumer groups](#static-consumer-groups)
        - [Topics](#topics)
            - [Unknown or unauthorized topics](#unknown-or-unauthorized-topics)
            - [Topic metadata propagation for newly created topics](#topic-metadata-propagation-for-newly-created-topics)
            - [Topic auto creation](#topic-auto-creation)
        - [Metadata](#metadata)
            - [< 0.9.3](#-093)
            - [> 0.9.3](#-093)
            - [Query reasons](#query-reasons)
            - [Caching strategy](#caching-strategy)
        - [Fatal errors](#fatal-errors)
            - [Fatal producer errors](#fatal-producer-errors)
            - [Fatal consumer errors](#fatal-consumer-errors)
    - [Compatibility](#compatibility)
        - [Broker version compatibility](#broker-version-compatibility)
            - [Broker version >= 0.10.0.0 (or trunk)](#broker-version--01000-or-trunk)
            - [Broker versions 0.9.0.x](#broker-versions-090x)
            - [Broker versions 0.8.x.y](#broker-versions-08xy)
            - [Detailed description](#detailed-description)
        - [Supported KIPs](#supported-kips)
        - [Supported protocol versions](#supported-protocol-versions)
- [Recommendations for language binding developers](#recommendations-for-language-binding-developers)
    - [Expose the configuration interface pass-thru](#expose-the-configuration-interface-pass-thru)
    - [Error constants](#error-constants)
    - [Reporting client software name and version to broker](#reporting-client-software-name-and-version-to-broker)
    - [Documentation reuse](#documentation-reuse)
    - [Community support](#community-support)

<!-- markdown-toc end -->


## Performance

librdkafka is a multi-threaded library designed for use on modern hardware and
it attempts to keep memory copying to a minimum. The payload of produced or
consumed messages may pass through without any copying
(if so desired by the application) putting no limit on message sizes.

librdkafka allows you to decide if high throughput is the name of the game,
or if a low latency service is required, or a balance between the two, all
through the configuration property interface.

The single most important configuration property for performance tuning is
`linger.ms` - how long to wait for `batch.num.messages` or `batch.size` to
fill up in the local per-partition queue before sending the batch of messages
to the broker.

In low throughput scenarios, a lower value improves latency.
As throughput increases, the cost of each broker request becomes significant,
affecting both maximum throughput and latency. For higher throughput
applications, latency will typically be lower with a higher `linger.ms`, since
larger batches result in fewer requests and a lower per-message load on the
broker. A good general purpose setting is 5ms.
For applications seeking maximum throughput, the recommended value is >= 50ms.


### High throughput

The key to high throughput is message batching - waiting for a certain amount
of messages to accumulate in the local queue before sending them off in
one large message set or batch to the peer. This amortizes the messaging
overhead and eliminates the adverse effect of the round trip time (rtt).

`linger.ms` (also called `queue.buffering.max.ms`) allows librdkafka to
wait up to the specified amount of time to accumulate up to
`batch.num.messages` or `batch.size` in a single batch (MessageSet) before
sending to the broker. The larger the batch the higher the throughput.
Enabling `msg` debugging (set `debug` property to `msg`) will emit log
messages for the accumulation process which lets you see what batch sizes
are being produced.

Example using `linger.ms=1`:

```
... test [0]: MessageSet with 1514 message(s) delivered
... test [3]: MessageSet with 1690 message(s) delivered
... test [0]: MessageSet with 1720 message(s) delivered
... test [3]: MessageSet with 2 message(s) delivered
... test [3]: MessageSet with 4 message(s) delivered
... test [0]: MessageSet with 4 message(s) delivered
... test [3]: MessageSet with 11 message(s) delivered
```

Example using `linger.ms=1000`:
```
... test [0]: MessageSet with 10000 message(s) delivered
... test [0]: MessageSet with 10000 message(s) delivered
... test [0]: MessageSet with 4667 message(s) delivered
... test [3]: MessageSet with 10000 message(s) delivered
... test [3]: MessageSet with 10000 message(s) delivered
... test [3]: MessageSet with 4476 message(s) delivered

```
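
The relationship between `linger.ms` and batch size can be approximated with a
small back-of-the-envelope model. The sketch below is not librdkafka code: the
function names are hypothetical, a constant per-partition message rate is
assumed, and only the `batch.num.messages` cap is considered (`batch.size` is
ignored).

```c
#include <stdint.h>

/* Hypothetical helper, not part of librdkafka: estimate how many messages
 * accumulate in one per-partition batch, assuming a constant message rate. */
static uint64_t estimated_batch_msgs(uint64_t msgs_per_sec,
                                     uint64_t linger_ms,
                                     uint64_t batch_num_messages) {
        uint64_t accumulated = msgs_per_sec * linger_ms / 1000;

        if (accumulated < 1)
                accumulated = 1; /* a batch holds at least one message */
        if (accumulated > batch_num_messages)
                accumulated = batch_num_messages; /* cap reached first */
        return accumulated;
}

/* Hypothetical helper: requests per second needed to carry that rate,
 * rounding up so that a partially filled batch still costs one request. */
static uint64_t estimated_requests_per_sec(uint64_t msgs_per_sec,
                                           uint64_t linger_ms,
                                           uint64_t batch_num_messages) {
        uint64_t per_batch = estimated_batch_msgs(msgs_per_sec, linger_ms,
                                                  batch_num_messages);

        return (msgs_per_sec + per_batch - 1) / per_batch;
}
```

Under these assumptions, 100000 messages per second on one partition with a cap
of 10000 gives roughly 100 messages per batch at `linger.ms=1` and full batches
of 10000 at `linger.ms=1000`, which is the same trend the log excerpts show.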


The default setting of `linger.ms=5` is not suitable for
high throughput; it is recommended to set this value to >50ms, with
throughput leveling out somewhere around 100-1000ms depending on
the message produce pattern and message sizes.

These settings are set globally (`rd_kafka_conf_t`) but apply on a
per topic+partition basis.


### Low latency

When low latency messaging is required, `linger.ms` should be
tuned to the maximum permitted producer-side latency.
Setting `linger.ms` to 0 or 0.1 will make sure messages are sent as
soon as possible.
Lower buffering time leads to smaller batches and larger per-message overheads,
increasing network, memory and CPU usage for producers, brokers and consumers.

See [How to decrease message latency](https://github.com/edenhill/librdkafka/wiki/How-to-decrease-message-latency) for more info.


#### Latency measurement

End-to-end latency is preferably measured by synchronizing clocks on producers
and consumers and using the message timestamp on the consumer to calculate
the full latency. Make sure the topic's `log.message.timestamp.type` is set to
the default `CreateTime` (Kafka topic configuration, not librdkafka topic).

Latencies are typically incurred by the producer, network and broker; the
consumer's effect on end-to-end latency is minimal.

To break down the end-to-end latencies and find where latencies are adding up,
there are a number of metrics available through librdkafka statistics
on the producer:

 * `brokers[].int_latency` is the time, per message, between produce()
   and the message being written to a MessageSet and ProduceRequest.
   High `int_latency` indicates CPU core contention: check CPU load and
   involuntary context switches (`/proc/<..>/status`).
   Consider using a machine/instance with more CPU cores.
   This metric is only relevant on the producer.

 * `brokers[].outbuf_latency` is the time, per protocol request
   (such as ProduceRequest), between the request being enqueued (which happens
   right after int_latency) and the time the request is written to the
   TCP socket connected to the broker.
   High `outbuf_latency` indicates CPU core contention or network congestion:
   check CPU load and socket SendQ (`netstat -anp | grep :9092`).

 * `brokers[].rtt` is the time, per protocol request, between the request being
   written to the TCP socket and the time the response is received from
   the broker.
   High `rtt` indicates broker load or network congestion:
   check broker metrics, local socket SendQ, network performance, etc.

 * `brokers[].throttle` is the time, per throttled protocol request, the
   broker throttled/delayed handling of a request due to usage quotas.
   The throttle time will also be reflected in `rtt`.

 * `topics[].batchsize` is the size of individual Producer MessageSet batches.
   See below.

 * `topics[].batchcnt` is the number of messages in individual Producer
   MessageSet batches. Due to Kafka protocol overhead a batch with few messages
   will have a higher relative processing and size overhead than a batch
   with many messages.
   Use the `linger.ms` client configuration property to set the maximum
   amount of time allowed for accumulating a single batch; the larger the
   value, the larger the batches will grow, thus increasing efficiency.
   When producing messages at a high rate it is recommended to increase
   `linger.ms`, which will improve throughput and in some cases also latency.


See [STATISTICS.md](STATISTICS.md) for the full definition of metrics.
A JSON schema for the statistics is available in
[statistics-schema.json](src/statistics-schema.json).
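
As a rough illustration of how the three per-broker timing metrics above can be
combined, the following sketch picks the stage that contributes most to
producer-side latency and returns the corresponding hint from the list. The
struct, enum and function names are hypothetical, the values are assumed to be
in microseconds, and parsing the statistics JSON is left out. Note that
`int_latency` is measured per message while the other two are per request, so
the sum is only an approximation.

```c
#include <stdint.h>

/* Hypothetical container for one reading of the brokers[] timing metrics. */
struct latency_sample {
        int64_t int_latency_us;    /* produce() until written to a MessageSet */
        int64_t outbuf_latency_us; /* request enqueued until written to socket */
        int64_t rtt_us;            /* socket write until response received */
};

enum latency_stage { STAGE_INT_LATENCY, STAGE_OUTBUF_LATENCY, STAGE_RTT };

/* Approximate produce()-to-ack time as the sum of the three stages. */
static int64_t producer_side_total_us(const struct latency_sample *s) {
        return s->int_latency_us + s->outbuf_latency_us + s->rtt_us;
}

/* Return the stage with the largest contribution. */
static enum latency_stage dominant_stage(const struct latency_sample *s) {
        enum latency_stage stage = STAGE_INT_LATENCY;
        int64_t worst = s->int_latency_us;

        if (s->outbuf_latency_us > worst) {
                stage = STAGE_OUTBUF_LATENCY;
                worst = s->outbuf_latency_us;
        }
        if (s->rtt_us > worst)
                stage = STAGE_RTT;
        return stage;
}

/* Map a stage to the troubleshooting hint given in the list above. */
static const char *stage_hint(enum latency_stage stage) {
        switch (stage) {
        case STAGE_INT_LATENCY:
                return "check CPU load and involuntary context switches";
        case STAGE_OUTBUF_LATENCY:
                return "check CPU load and socket SendQ";
        case STAGE_RTT:
                return "check broker metrics, socket SendQ and network";
        }
        return "unknown stage";
}
```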


### Compression

Producer message compression is enabled through the `compression.codec`
configuration property.

Compression is performed on the batch of messages in the local queue; the
larger the batch, the higher the likelihood of a better compression ratio.
The local batch queue size is controlled through the `batch.num.messages`,
`batch.size`, and `linger.ms` configuration properties as described in the
**High throughput** chapter above.



## Message reliability

Message reliability is an important factor of librdkafka - an application
can rely fully on librdkafka to deliver a message according to the specified
configuration (`request.required.acks` and `message.send.max.retries`, etc).

If the topic configuration property `request.required.acks` is set to wait
for message commit acknowledgements from brokers (any value but 0, see
[`CONFIGURATION.md`](CONFIGURATION.md)
for specifics) then librdkafka will hold on to the message until
all expected acks have been received, gracefully handling the following events:

  * Broker connection failure
  * Topic leader change
  * Produce errors signaled by the broker
  * Network problems

We recommend `request.required.acks` to be set to `all` to make sure
produced messages are acknowledged by all in-sync replica brokers.

This is handled automatically by librdkafka and the application does not need
to take any action at any of the above events.
The message will be resent up to `message.send.max.retries` times before
reporting a failure back to the application.

The delivery report callback is used by librdkafka to signal the status of
a message back to the application. It is called once for each message
to report the status of message delivery:

  * If `error_code` is non-zero the message delivery failed and the error_code
    indicates the nature of the failure (`rd_kafka_resp_err_t` enum).
  * If `error_code` is zero the message has been successfully delivered.

See the Producer API chapter for more details on delivery report callback usage.

The delivery report callback is optional but highly recommended.


### Producer message delivery success

When a ProduceRequest is successfully handled by the broker and a
ProduceResponse is received (also called the ack) without an error code
the messages from the ProduceRequest are enqueued on the delivery report
queue (if a delivery report callback has been set) and will be passed to
the application on the next invocation of `rd_kafka_poll()`.


### Producer message delivery failure

The following sub-chapters explain how different produce errors
are handled.

If the error is retryable and there are remaining retry attempts for
the given message(s), an automatic retry will be scheduled by librdkafka.
These retries are not visible to the application.

Only permanent errors and temporary errors that have reached their maximum
retry count will generate a delivery report event to the application with an
error code set.

The application should typically not attempt to retry producing the message
on failure, but instead configure librdkafka to perform these retries
using the `retries` and `retry.backoff.ms` configuration properties.


#### Error: Timed out in transmission queue

Internal error ERR__TIMED_OUT_QUEUE.

The connectivity to the broker may be stalled due to networking contention,
local or remote system issues, etc, and the request has not yet been sent.

The producer can be certain that the message has not been sent to the broker.

This is a retryable error, but is not counted as a retry attempt
since the message was never actually transmitted.

A retry by librdkafka at this point will not cause duplicate messages.


#### Error: Timed out in flight to/from broker

Internal error ERR__TIMED_OUT, ERR__TRANSPORT.

Same reasons as for `Timed out in transmission queue` above, with the
difference that the message may have been sent to the broker and might
be stalling waiting for broker replicas to ack the message, or the response
could be stalled due to networking issues.
At this point the producer can't know if the message reached the broker,
nor if the broker wrote the message to disk and replicas.

This is a retryable error.

A retry by librdkafka at this point may cause duplicate messages.


#### Error: Temporary broker-side error

Broker errors ERR_REQUEST_TIMED_OUT, ERR_NOT_ENOUGH_REPLICAS,
ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND.

These errors are considered temporary and librdkafka will retry them
if permitted by configuration.


#### Error: Temporary errors due to stale metadata

Broker errors ERR_LEADER_NOT_AVAILABLE, ERR_NOT_LEADER_FOR_PARTITION.

These errors are considered temporary and a retry is warranted; a metadata
request is automatically sent to find a new leader for the partition.

A retry by librdkafka at this point will not cause duplicate messages.


#### Error: Local time out

Internal error ERR__MSG_TIMED_OUT.

The message could not be successfully transmitted before `message.timeout.ms`
expired, typically due to no leader being available or no broker connection.
The message may have been retried due to other errors but
those error messages are abstracted by the ERR__MSG_TIMED_OUT error code.

Since the `message.timeout.ms` has passed there will be no more retries
by librdkafka.


#### Error: Permanent errors

Any other error is considered a permanent error and the message
will fail immediately, generating a delivery report event with the
distinctive error code.

The full list of permanent errors depends on the broker version and
will likely grow in the future.

Typical permanent broker errors are:
 * ERR_CORRUPT_MESSAGE
 * ERR_MSG_SIZE_TOO_LARGE  - adjust client's or broker's `message.max.bytes`.
 * ERR_UNKNOWN_TOPIC_OR_PART - topic or partition does not exist,
                               automatic topic creation is disabled on the
                               broker or the application is specifying a
                               partition that does not exist.
 * ERR_RECORD_LIST_TOO_LARGE
 * ERR_INVALID_REQUIRED_ACKS
 * ERR_TOPIC_AUTHORIZATION_FAILED
 * ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT
 * ERR_CLUSTER_AUTHORIZATION_FAILED
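
The error classes described in the sub-chapters above can be summarised as a
small lookup. This is a sketch for illustration only: the enum is a local
stand-in for the error names used in this chapter (a real application would
use `rd_kafka_resp_err_t`), and all type and function names are hypothetical.
The text only states explicitly that a queue time-out is not counted as a
retry attempt; the sketch assumes that all other retryable errors are counted.

```c
#include <stdbool.h>

/* Local stand-ins for the error names used in this chapter (hypothetical). */
enum produce_err {
        PERR_TIMED_OUT_QUEUE,                  /* ERR__TIMED_OUT_QUEUE */
        PERR_TIMED_OUT,                        /* ERR__TIMED_OUT */
        PERR_TRANSPORT,                        /* ERR__TRANSPORT */
        PERR_REQUEST_TIMED_OUT,                /* ERR_REQUEST_TIMED_OUT */
        PERR_NOT_ENOUGH_REPLICAS,              /* ERR_NOT_ENOUGH_REPLICAS */
        PERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND, /* ..._AFTER_APPEND */
        PERR_LEADER_NOT_AVAILABLE,             /* ERR_LEADER_NOT_AVAILABLE */
        PERR_NOT_LEADER_FOR_PARTITION,         /* ERR_NOT_LEADER_FOR_PARTITION */
        PERR_MSG_TIMED_OUT,                    /* ERR__MSG_TIMED_OUT */
        PERR_MSG_SIZE_TOO_LARGE                /* example of a permanent error */
};

/* Whether a retry may produce a duplicate, as far as the text states it. */
enum dup_risk { DUP_NONE, DUP_POSSIBLE, DUP_UNSTATED };

struct err_handling {
        bool retryable;       /* librdkafka may retry automatically */
        bool counts_as_retry; /* consumes one of the configured retries */
        enum dup_risk dup;    /* duplicate risk of such a retry */
};

static struct err_handling classify_produce_err(enum produce_err err) {
        switch (err) {
        case PERR_TIMED_OUT_QUEUE:
                /* Never transmitted: retry is free and cannot duplicate. */
                return (struct err_handling){true, false, DUP_NONE};
        case PERR_TIMED_OUT:
        case PERR_TRANSPORT:
                /* May have reached the broker: retry can duplicate. */
                return (struct err_handling){true, true, DUP_POSSIBLE};
        case PERR_REQUEST_TIMED_OUT:
        case PERR_NOT_ENOUGH_REPLICAS:
        case PERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND:
                /* Temporary broker-side errors; duplicate risk not stated. */
                return (struct err_handling){true, true, DUP_UNSTATED};
        case PERR_LEADER_NOT_AVAILABLE:
        case PERR_NOT_LEADER_FOR_PARTITION:
                /* Stale metadata: retried after a metadata refresh. */
                return (struct err_handling){true, true, DUP_NONE};
        case PERR_MSG_TIMED_OUT:
        default:
                /* Local time-out or permanent error: no further retries. */
                return (struct err_handling){false, false, DUP_NONE};
        }
}
```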


### Producer retries

The ProduceRequest itself is not retried; instead, the messages
are put back on the internal partition queue by an insert sort
that maintains their original position (the message order is defined
at the time a message is initially appended to a partition queue, i.e., after
partitioning).
A backoff time (`retry.backoff.ms`) is set on the retried messages which
effectively blocks retry attempts until the backoff time has expired.
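
The re-insertion step can be pictured with a simplified model. The sketch
below is not librdkafka's internal queue implementation: it uses a fixed-size
array, hypothetical type and function names, and a per-message id that
increases in produce order, and it assumes that a backoff on the head of the
queue also holds back the messages behind it.

```c
#include <stddef.h>
#include <stdint.h>

#define QUEUE_CAP 16

/* Hypothetical message record: an id that grows in produce order and the
 * earliest time (in ms) at which a retried message may be sent again. */
struct msg {
        uint64_t msgid;
        int64_t backoff_until_ms;
};

/* Hypothetical per-partition queue, kept sorted by ascending msgid. */
struct part_queue {
        struct msg msgs[QUEUE_CAP];
        size_t len;
};

/* Insert a message at its original position (ascending msgid).
 * Returns 0 on success or -1 if the queue is full. */
static int queue_insert_sorted(struct part_queue *q, struct msg m) {
        size_t pos;

        if (q->len == QUEUE_CAP)
                return -1;
        pos = q->len;
        /* Shift later messages one slot to make room for the retried one. */
        while (pos > 0 && q->msgs[pos - 1].msgid > m.msgid) {
                q->msgs[pos] = q->msgs[pos - 1];
                pos--;
        }
        q->msgs[pos] = m;
        q->len++;
        return 0;
}

/* The head may only be sent once its backoff time has expired. */
static int queue_head_ready(const struct part_queue *q, int64_t now_ms) {
        return q->len > 0 && q->msgs[0].backoff_until_ms <= now_ms;
}
```

In this model, re-inserting failed messages 3 and 4 into a queue that already
holds 5 and 6 yields the order 3, 4, 5, 6, and nothing is sent until the
backoff of message 3 has passed.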


### Reordering

As for all retries, if `max.in.flight` > 1 and `retries` > 0, retried messages
may be produced out of order, since a subsequent message in a subsequent
ProduceRequest may already be in-flight (and accepted by the broker)
by the time the retry for the failing message is sent.

Using the Idempotent Producer prevents reordering even with `max.in.flight` > 1,
see [Idempotent Producer](#idempotent-producer) below for more information.
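
A toy simulation makes the reordering scenario concrete. It assumes two
batches in flight at once without the idempotent producer: batch A carries
messages 1 and 2, batch B carries messages 3 and 4. The function names and the
fixed batch contents are illustrative only.

```c
#include <stddef.h>

/* Append `count` consecutive message numbers starting at `first`. */
static size_t append_batch(int *log, size_t len, int first, int count) {
        int i;

        for (i = 0; i < count; i++)
                log[len++] = first + i;
        return len;
}

/* Simulate the broker log for two in-flight batches. If the first attempt
 * of batch A fails, its retry is only written after batch B was accepted.
 * `log` must have room for 4 entries; returns the number written. */
static size_t simulate_two_in_flight(int first_attempt_fails, int *log) {
        size_t len = 0;

        if (!first_attempt_fails)
                len = append_batch(log, len, 1, 2); /* batch A accepted */
        len = append_batch(log, len, 3, 2);         /* batch B accepted */
        if (first_attempt_fails)
                len = append_batch(log, len, 1, 2); /* retry of batch A */
        return len;
}
```

With a failing first attempt the simulated log order is 3, 4, 1, 2 instead of
1, 2, 3, 4.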


### Idempotent Producer

librdkafka supports the idempotent producer which provides strict ordering
and exactly-once producer guarantees.
The idempotent producer is enabled by setting the `enable.idempotence`
configuration property to `true`. This automatically adjusts a number of
other configuration properties to adhere to the idempotency requirements;
see the documentation of `enable.idempotence` in [CONFIGURATION.md](CONFIGURATION.md) for
more information.
Producer instantiation will fail if the user supplied an incompatible value
for any of the automatically adjusted properties, e.g., it is an error to
explicitly set `acks=1` when `enable.idempotence=true` is set.


#### Guarantees

There are three types of guarantees that the idempotent producer can satisfy:

 * Exactly-once - a message is only written to the log once.
                  Does NOT cover the exactly-once consumer case.
 * Ordering - a series of messages are written to the log in the
              order they were produced.
 * Gap-less - **EXPERIMENTAL** a series of messages are written once and
              in order without risk of skipping messages. The sequence
              of messages may be cut short and fail before all
              messages are written, but may not fail individual
              messages in the series.
              This guarantee is disabled by default, but may be enabled
              by setting `enable.gapless.guarantee` if individual message
              failure is a concern.
              Messages that fail due to exceeded timeout (`message.timeout.ms`),
              are permitted by the gap-less guarantee and may cause
              gaps in the message series without raising a fatal error.
              See **Message timeout considerations** below for more info.
              **WARNING**: This is an experimental property subject to
                           change or removal.

All three guarantees are in effect when idempotence is enabled, only
gap-less may be disabled individually.


#### Ordering and message sequence numbers

librdkafka maintains the original produce() ordering per-partition for all
messages produced, using an internal per-partition 64-bit counter
called the msgid which starts at 1. This msgid allows messages to be
re-inserted in the partition message queue in the original order in the
case of retries.

The Idempotent Producer functionality in the Kafka protocol also has
a per-message sequence number, which is a signed 32-bit wrapping counter that is
reset each time the Producer's ID (PID) or Epoch changes.

The librdkafka msgid is used, along with a base msgid value stored
at the time the PID/Epoch was bumped, to calculate the Kafka protocol's
message sequence number.

With Idempotent Producer enabled there is no risk of reordering despite
`max.in.flight` > 1 (capped at 5).

**Note**: "MsgId" in log messages refers to the librdkafka msgid, while "seq"
          refers to the protocol message sequence; "baseseq" is the seq of
          the first message in a batch.
          MsgId starts at 1, while message seqs start at 0.
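
The mapping from msgid to protocol sequence number can be sketched as follows.
This is an illustration, not the library's actual code: the function name is
hypothetical, and the base value is assumed here to be the msgid of the first
message produced under the current PID/Epoch, so that this message gets
sequence 0. The sequence is kept within the non-negative signed 32-bit range
and wraps back to 0 after `INT32_MAX`.

```c
#include <stdint.h>

/* Hypothetical helper: derive the protocol sequence number for a message.
 * `epoch_first_msgid` is assumed to be the msgid of the first message
 * produced under the current PID/Epoch (must be <= msgid). */
static int32_t seq_for_msgid(uint64_t msgid, uint64_t epoch_first_msgid) {
        uint64_t offset = msgid - epoch_first_msgid;

        /* Keep the low 31 bits: the sequence wraps from INT32_MAX to 0. */
        return (int32_t)(offset & (uint64_t)INT32_MAX);
}
```

For a fresh producer (`epoch_first_msgid` of 1), msgid 1 maps to seq 0 and
msgid 5 maps to seq 4. After an Epoch bump with msgid 8 first in the queue,
msgid 10 maps to seq 2.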


The producer statistics also maintain two metrics for tracking the next
expected response sequence:

 * `next_ack_seq` - the next sequence to expect an acknowledgement for, which
                    is the last successfully produced MessageSet's last
                    sequence + 1.
 * `next_err_seq` - the next sequence to expect an error for, which is typically
                    the same as `next_ack_seq` until an error occurs, in which
                    case the `next_ack_seq` can't be incremented (since no
                    messages were acked on error). `next_err_seq` is used to
                    properly handle subsequent errors due to a failing
                    first request.

**Note**: Both are exposed in partition statistics.



#### Partitioner considerations

Strict ordering is guaranteed on a **per partition** basis.

An application utilizing the idempotent producer should not mix
producing to explicit partitions with partitioner-based partitions
since messages produced for the latter are queued separately until
a topic's partition count is known, which would insert these messages
after the partition-explicit messages regardless of produce order.


#### Message timeout considerations

If messages time out (due to `message.timeout.ms`) while in the producer queue
there will be gaps in the series of produced messages.

E.g., Messages 1,2,3,4,5 are produced by the application.
      While messages 2,3,4 are transmitted to the broker the connection to
      the broker goes down.
      While the broker is down the message timeout expires for message 2 and 3.
      As the connection comes back up messages 4, 5 are transmitted to the
      broker, resulting in a final written message sequence of 1, 4, 5.

The producer gracefully handles this case by draining the in-flight requests
for a given partition when one or more of its queued (not transmitted)
messages are timed out. When all requests are drained the Epoch is bumped and
the base sequence number is reset to the first message in the queue, effectively
skipping the timed out messages as if they had never existed from the
broker's point of view.
The message status for timed out queued messages will be
`RD_KAFKA_MSG_STATUS_NOT_PERSISTED`.

If messages time out while in-flight to the broker (also due to
`message.timeout.ms`), the protocol request will fail, the broker
connection will be closed by the client, and the timed out messages will be
removed from the producer queue. In this case the in-flight messages may be
written to the topic log by the broker, even though
a delivery report with error `ERR__MSG_TIMED_OUT` will be raised, since
the producer timed out the request before getting an acknowledgement back
from the broker.
The message status for timed out in-flight messages will be
`RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED`, indicating that the producer
does not know if the messages were written and acked by the broker,
or dropped in-flight.

An application may inspect the message status by calling
`rd_kafka_message_status()` on the message in the delivery report callback,
to see if the message was (possibly) persisted (written to the topic log) by
the broker or not.

Despite the graceful handling of timeouts, we recommend using a
large `message.timeout.ms` to minimize the risk of timeouts.

**Warning**: `enable.gapless.guarantee` does not apply to timed-out messages.

**Note**: `delivery.timeout.ms` is an alias for `message.timeout.ms`.


#### Leader change

There are corner cases where an Idempotent Producer has outstanding
ProduceRequests in-flight to the previous leader while a new leader is elected.

A leader change is typically triggered by the original leader
failing or terminating, which has the risk of also failing (some of) the
in-flight ProduceRequests to that broker. To recover the producer to a
consistent state it will not send any ProduceRequests for these partitions to
the new leader broker until all responses for any outstanding ProduceRequests
to the previous partition leader have been received, or these requests have
timed out.
This drain may take up to `min(socket.timeout.ms, message.timeout.ms)`.
If the connection to the previous broker goes down the outstanding requests
are failed immediately.


#### Error handling

Background:
The error handling for the Idempotent Producer, as initially proposed
in the [EOS design document](https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8),
missed some corner cases which are now being addressed in [KIP-360](https://cwiki.apache.org/confluence/display/KAFKA/KIP-360%3A+Improve+handling+of+unknown+producer).
There were some intermediate fixes and workarounds prior to KIP-360 that proved
to be incomplete and made the error handling in the client overly complex.
With the benefit of hindsight the librdkafka implementation will attempt
to provide correctness from the lessons learned in the Java client and
provide stricter and less complex error handling.

The following sections describe librdkafka's handling of the
Idempotent Producer specific errors that may be returned by the broker.


##### RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER

This error is returned by the broker when the sequence number in the
ProduceRequest is larger than the expected next sequence
for the given PID+Epoch+Partition (last BaseSeq + msgcount + 1).
Note: sequence 0 is always accepted.

If the failed request is the head-of-line (next expected sequence to be acked)
it indicates desynchronization between the client and broker:
the client thinks the sequence number is correct but the broker disagrees.
There is no way for the client to recover from this scenario without
risking message loss or duplication, and it is not safe for the
application to manually retry messages.
A fatal error (`RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER`) is raised.

When the request is not head-of-line, a previous request has failed
(for any reason), which means the messages in the current request
can be retried after waiting for all outstanding requests for this
partition to drain, then resetting the Producer ID and starting over.
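
The two cases can be summarized in a small decision function. This is a
sketch only: the enum, the function name and the idea of detecting
head-of-line by comparing the request's base sequence with `next_ack_seq`
are assumptions made for illustration, not librdkafka internals.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical action codes for this sketch. */
typedef enum {
    OOO_ACTION_FATAL,          /* desynchronized: raise a fatal error */
    OOO_ACTION_DRAIN_AND_RETRY /* drain, reset the Producer ID, retry */
} ooo_action_t;

/* Decide how to react to OUT_OF_ORDER_SEQUENCE_NUMBER for a request
 * whose first message has sequence req_base_seq. */
static ooo_action_t handle_out_of_order_seq(int32_t req_base_seq,
                                            int32_t next_ack_seq) {
    assert(req_base_seq >= 0 && next_ack_seq >= 0);

    /* Head-of-line: the client believed this was the next sequence
     * to be acked, but the broker disagrees. Not recoverable. */
    if (req_base_seq == next_ack_seq)
        return OOO_ACTION_FATAL;

    /* An earlier request failed, so this one was rejected as a
     * consequence and its messages can be retried. */
    return OOO_ACTION_DRAIN_AND_RETRY;
}
```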


**Java Producer behaviour**:
Fail the batch, reset the pid, and then continue producing
(and retrying subsequent) messages. This will lead to gaps
in the message series.



##### RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER

Returned by broker when the request's base sequence number is
less than the expected sequence number (which is the last written
sequence + msgcount).
Note: sequence 0 is always accepted.

This error is typically benign and occurs upon retrying a previously successful
send that was not acknowledged.

The messages will be considered successfully produced but will have neither
timestamp nor offset set.


**Java Producer behaviour:**
Treats the message as successfully delivered.


##### RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID

Returned by broker when the PID+Epoch is unknown, which may occur when
the PID's state has expired (due to topic retention, DeleteRecords,
or compaction).

The Java producer added quite a bit of error handling for this case,
extending the ProduceRequest protocol to return the logStartOffset
to give the producer a chance to differentiate between an actual
UNKNOWN_PRODUCER_ID or topic retention having deleted the last
message for this producer (effectively voiding the Producer ID cache).
This workaround proved to be error prone (see explanation in KIP-360)
when the partition leader changed.

KIP-360 suggests removing this error checking in favour of failing fast;
librdkafka follows suit.


If the response is for the first ProduceRequest in-flight
and there are no messages waiting to be retried nor any ProduceRequests
unaccounted for, then the error is ignored and the epoch is incremented.
This is likely to happen for an idle producer whose last written
message has been deleted from the log, and with it its PID state.
Otherwise the producer raises a fatal error
(`RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID`) since the delivery guarantees can't
be satisfied.
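
The condition above can be written out as a short sketch. The function,
parameter and enum names are hypothetical and only mirror the wording of the
preceding paragraph; they are not taken from the librdkafka source.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical action codes for this sketch. */
typedef enum {
    UPID_ACTION_BUMP_EPOCH, /* ignore the error and increment the epoch */
    UPID_ACTION_FATAL       /* delivery guarantees can't be satisfied */
} upid_action_t;

/* Decide how to react to UNKNOWN_PRODUCER_ID. */
static upid_action_t handle_unknown_producer_id(bool first_inflight_request,
                                                int retry_msg_cnt,
                                                int unaccounted_request_cnt) {
    assert(retry_msg_cnt >= 0 && unaccounted_request_cnt >= 0);

    /* Safe only when nothing else is pending for the partition,
     * e.g., an idle producer whose PID state expired on the broker. */
    if (first_inflight_request && retry_msg_cnt == 0 &&
        unaccounted_request_cnt == 0)
        return UPID_ACTION_BUMP_EPOCH;

    return UPID_ACTION_FATAL;
}
```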


**Java Producer behaviour:**
Retries the send in some cases (but KIP-360 will change this).
Not a fatal error in any case.


##### Standard errors

All the standard Produce errors are handled in the usual way,
permanent errors will fail the messages in the batch, while
temporary errors will be retried (if retry count permits).

If a permanent error is returned for a batch in a series of in-flight batches,
the subsequent batches will fail with
`RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER` since the sequence number of the
failed batch was never written to the topic log and the next expected sequence
was thus not incremented on the broker.

A fatal error (RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE) is raised to satisfy
the gap-less guarantee (if `enable.gapless.guarantee` is set) by failing all
queued messages.


##### Message persistence status

To help the application decide what to do in these error cases, a new
per-message API is introduced, `rd_kafka_message_status()`,
which returns one of the following values:

 * `RD_KAFKA_MSG_STATUS_NOT_PERSISTED` - the message has never
   been transmitted to the broker, or failed with an error indicating
   it was not written to the log.
   Application retry will risk ordering, but not duplication.
 * `RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED` - the message was transmitted
   to the broker, but no acknowledgement was received.
   Application retry will risk ordering and duplication.
 * `RD_KAFKA_MSG_STATUS_PERSISTED` - the message was written to the log by
   the broker and fully acknowledged.
   No reason for application to retry.

This method should be called by the application on delivery report error.
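
As a rough illustration of how an application might act on the status, the
sketch below maps each of the three states to the retry risks listed above.
It uses a local stand-in enum (`sketch_msg_status_t`) instead of the real
librdkafka type so that it compiles on its own; in a real delivery report
callback the value would come from `rd_kafka_message_status()`. All names in
the block are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/* Local stand-in for the three persistence states described above. */
typedef enum {
    SKETCH_STATUS_NOT_PERSISTED,
    SKETCH_STATUS_POSSIBLY_PERSISTED,
    SKETCH_STATUS_PERSISTED
} sketch_msg_status_t;

/* What an application-level retry of a failed message would risk. */
typedef struct {
    bool retry_useful;      /* is there any reason to retry? */
    bool risks_reordering;  /* retry may break produce order */
    bool risks_duplication; /* retry may write the message twice */
} retry_risk_t;

static retry_risk_t retry_risk(sketch_msg_status_t status) {
    retry_risk_t none = {false, false, false};

    switch (status) {
    case SKETCH_STATUS_NOT_PERSISTED: {
        retry_risk_t r = {true, true, false};
        return r;
    }
    case SKETCH_STATUS_POSSIBLY_PERSISTED: {
        retry_risk_t r = {true, true, true};
        return r;
    }
    case SKETCH_STATUS_PERSISTED:
        return none; /* already written and acknowledged */
    }

    assert(0 && "unknown message status");
    return none;
}
```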


### Transactional Producer


#### Error handling

Using the transactional producer simplifies error handling compared to the
standard or idempotent producer. A transactional application only needs
to care about these different types of errors:

 * Retriable errors - the operation failed due to temporary problems,
   such as network timeouts, the operation may be safely retried.
   Use `rd_kafka_error_is_retriable()` to distinguish this case.
 * Abortable errors - if any of the transactional APIs return a non-fatal
   error code the current transaction has failed and the application
   must call `rd_kafka_abort_transaction()`, rewind its input to the
   point before the current transaction started, and attempt a new transaction
   by calling `rd_kafka_begin_transaction()`, etc.
   Use `rd_kafka_error_txn_requires_abort()` to distinguish this case.
 * Fatal errors - the application must cease operations and destroy the
   producer instance.
   Use `rd_kafka_error_is_fatal()` to distinguish this case.
 * For all other errors returned from the transactional API: the current
   recommendation is to treat any error that has none of retriable, abortable,
   or fatal set, as a fatal error.

While the application should log the actual fatal or abortable errors, there
is no need for the application to handle the underlying errors specifically.
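
The classification above boils down to a short decision chain. The sketch
below models an error as three flags so that it is self-contained; in an
application the flags would be obtained from `rd_kafka_error_is_retriable()`,
`rd_kafka_error_txn_requires_abort()` and `rd_kafka_error_is_fatal()`.
The struct, enum and function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-in for the three properties of a transactional API error. */
typedef struct {
    bool is_retriable;
    bool txn_requires_abort;
    bool is_fatal;
} sketch_txn_error_t;

/* What the application should do next. */
typedef enum {
    TXN_ACTION_RETRY,    /* retry the failed operation */
    TXN_ACTION_ABORT,    /* abort, rewind input, begin a new transaction */
    TXN_ACTION_TERMINATE /* stop and destroy the producer instance */
} txn_action_t;

static txn_action_t classify_txn_error(const sketch_txn_error_t *err) {
    assert(err != NULL);

    if (err->is_retriable)
        return TXN_ACTION_RETRY;
    if (err->txn_requires_abort)
        return TXN_ACTION_ABORT;

    /* Fatal errors, and errors with none of the flags set,
     * are both treated as fatal. */
    return TXN_ACTION_TERMINATE;
}
```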



#### Old producer fencing

If a new transactional producer instance is started with the same
`transactional.id`, any previous still running producer
instance will be fenced off at the next produce, commit or abort attempt, by
raising a fatal error with the error code set to
`RD_KAFKA_RESP_ERR__FENCED`.


#### Configuration considerations

To make sure messages time out (in case of connectivity problems, etc) within
the transaction, the `message.timeout.ms` configuration property must be
set lower than the `transaction.timeout.ms`. This is enforced when
creating the producer instance.
If `message.timeout.ms` is not explicitly configured it will be adjusted
automatically.




### Exactly Once Semantics (EOS) and transactions

librdkafka supports Exactly Once Semantics (EOS) as defined in [KIP-98](https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging).
For more on the use of transactions, see [Transactions in Apache Kafka](https://www.confluent.io/blog/transactions-apache-kafka/).

See [examples/transactions.c](examples/transactions.c) for an example
transactional EOS application.

**Warning**
If the broker version is older than Apache Kafka 2.5.0 then one transactional
producer instance per consumed input partition is required.
For 2.5.0 and later a single producer instance may be used regardless of
the number of input partitions.
See KIP-447 for more information.


## Usage

### Documentation

The librdkafka API is documented in the [`rdkafka.h`](src/rdkafka.h)
header file, the configuration properties are documented in
[`CONFIGURATION.md`](CONFIGURATION.md).

### Initialization

The application needs to instantiate a top-level object `rd_kafka_t` which is
the base container, providing global configuration and shared state.
It is created by calling `rd_kafka_new()`.

It also needs to instantiate one or more topics (`rd_kafka_topic_t`) to be used
for producing to or consuming from. The topic object holds topic-specific
configuration and will be internally populated with a mapping of all available
partitions and their leader brokers.
It is created by calling `rd_kafka_topic_new()`.

Both `rd_kafka_t` and `rd_kafka_topic_t` come with a configuration API which
is optional.
Not using the API will cause librdkafka to use its default values which are
documented in [`CONFIGURATION.md`](CONFIGURATION.md).

**Note**: An application may create multiple `rd_kafka_t` objects and
	they share no state.

**Note**: An `rd_kafka_topic_t` object may only be used with the `rd_kafka_t`
	object it was created from.



### Configuration

To ease integration with the official Apache Kafka software and lower
the learning curve, librdkafka implements identical configuration
properties as found in the official clients of Apache Kafka.

Configuration is applied prior to object creation using the
`rd_kafka_conf_set()` and `rd_kafka_topic_conf_set()` APIs.

**Note**: The `rd_kafka.._conf_t` objects are not reusable after they have been
	passed to `rd_kafka.._new()`.
	The application does not need to free any config resources after a
	`rd_kafka.._new()` call.

#### Example

```c
    rd_kafka_conf_t *conf;
    rd_kafka_conf_res_t res;
    rd_kafka_t *rk;
    char errstr[512];

    conf = rd_kafka_conf_new();

    res = rd_kafka_conf_set(conf, "compression.codec", "snappy",
                            errstr, sizeof(errstr));
    if (res != RD_KAFKA_CONF_OK)
        fail("%s\n", errstr);

    res = rd_kafka_conf_set(conf, "batch.num.messages", "100",
                            errstr, sizeof(errstr));
    if (res != RD_KAFKA_CONF_OK)
        fail("%s\n", errstr);

    rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
        /* On failure the application still owns conf and must free it. */
        rd_kafka_conf_destroy(conf);
        fail("Failed to create producer: %s\n", errstr);
    }

    /* Note: librdkafka takes ownership of the conf object on success */
```

Configuration properties may be set in any order (except for interceptors) and
may be overwritten before being passed to `rd_kafka_new()`.
`rd_kafka_new()` will verify that the passed configuration is consistent
and will fail and return an error if incompatible configuration properties
are detected. It will also emit log warnings for deprecated and problematic
configuration properties.


### Termination

librdkafka is asynchronous in its nature and performs most operations in its
background threads.

Calling the librdkafka handle destructor tells the librdkafka background
threads to finalize their work, close network connections, clean up, etc, and
may thus take some time. The destructor (`rd_kafka_destroy()`) will block
until all background threads have terminated.

If the destructor blocks indefinitely it typically means there is an outstanding
object reference, such as a message or topic object, that was not destroyed
prior to destroying the client handle.

All objects except for the handle (C: `rd_kafka_t`,
C++: `Consumer,KafkaConsumer,Producer`), such as topic objects, messages,
`topic_partition_t`, `TopicPartition`, events, etc, **MUST** be
destroyed/deleted prior to destroying or closing the handle.

For C, make sure the following objects are destroyed prior to calling
`rd_kafka_consumer_close()` and `rd_kafka_destroy()`:
 * `rd_kafka_message_t`
 * `rd_kafka_topic_t`
 * `rd_kafka_topic_partition_t`
 * `rd_kafka_topic_partition_list_t`
 * `rd_kafka_event_t`
 * `rd_kafka_queue_t`

For C++ make sure the following objects are deleted prior to
calling `KafkaConsumer::close()` and delete on the Consumer, KafkaConsumer or
Producer handle:
 * `Message`
 * `Topic`
 * `TopicPartition`
 * `Event`
 * `Queue`


#### High-level KafkaConsumer

Proper termination sequence for the high-level KafkaConsumer is:
```c
     /* 1) Leave the consumer group, commit final offsets, etc. */
     rd_kafka_consumer_close(rk);

     /* 2) Destroy handle object */
     rd_kafka_destroy(rk);
```

**NOTE**: There is no need to unsubscribe prior to calling `rd_kafka_consumer_close()`.

**NOTE**: Any topic objects created must be destroyed prior to `rd_kafka_destroy()`.

Effects of not doing the above, for:
 1. Final offsets are not committed and the consumer will not actively leave
    the group, it will be kicked out of the group after the `session.timeout.ms`
    expires. It is okay to omit the `rd_kafka_consumer_close()` call in case
    the application does not want to wait for the blocking close call.
 2. librdkafka will continue to operate on the handle. Actual memory leaks.


#### Producer

The proper termination sequence for Producers is:

```c
     /* 1) Make sure all outstanding requests are transmitted and handled. */
     rd_kafka_flush(rk, 60*1000); /* One minute timeout */

     /* 2) Destroy the topic and handle objects */
     rd_kafka_topic_destroy(rkt);  /* Repeat for all topic objects held */
     rd_kafka_destroy(rk);
```

Effects of not doing the above, for:
 1. Messages in-queue or in-flight will be dropped.
 2. librdkafka will continue to operate on the handle. Actual memory leaks.


#### Admin API client

Unlike the Java Admin client, the Admin APIs in librdkafka are available
on any type of client instance and can be used in combination with the
client type's main functionality, e.g., it is perfectly fine to call
`CreateTopics()` in your running producer, or `DeleteRecords()` in your
consumer.

If you need a client instance to only perform Admin API operations the
recommendation is to create a producer instance since it requires less
configuration (no `group.id`) than the consumer and is generally more cost
efficient.
We do recommend that you set `allow.auto.create.topics=false` to avoid
topic metadata lookups to unexpectedly have the broker create topics.



#### Speeding up termination
To speed up the termination of librdkafka an application can set a
termination signal that will be used internally by librdkafka to quickly
cancel any outstanding I/O waits.
Make sure you block this signal in your application.

```c
   char tmp[16];
   snprintf(tmp, sizeof(tmp), "%i", SIGIO);  /* Or whatever signal you decide */
   rd_kafka_conf_set(rk_conf, "internal.termination.signal", tmp, errstr, sizeof(errstr));
```


### Threads and callbacks

librdkafka uses multiple threads internally to fully utilize modern hardware.
The API is completely thread-safe and the calling application may call any
of the API functions from any of its own threads at any time.

A poll-based API is used to provide signaling back to the application;
the application should call `rd_kafka_poll()` at regular intervals.
The poll API will call the following configured callbacks (optional):

  * `dr_msg_cb` - Message delivery report callback - signals that a message has
    been delivered or failed delivery, allowing the application to take action
    and to release any application resources used in the message.
  * `error_cb` - Error callback - signals an error. These errors are usually of
    an informational nature, i.e., failure to connect to a broker, and the
    application usually does not need to take any action.
    The type of error is passed as a rd_kafka_resp_err_t enum value,
    including both remote broker errors as well as local failures.
    An application typically does not have to perform any action when
    an error is raised through the error callback, the client will
    automatically try to recover from all errors, given that the
    client and cluster are correctly configured.
    In some specific cases a fatal error may occur which will render
    the client more or less inoperable for further use:
    if the error code in the error callback is set to
    `RD_KAFKA_RESP_ERR__FATAL` the application should retrieve the
    underlying fatal error and reason using the `rd_kafka_fatal_error()` call,
    and then begin terminating the instance.
    The Event API's EVENT_ERROR has a `rd_kafka_event_error_is_fatal()`
    function, and the C++ EventCb has a `fatal()` method, to help the
    application determine if an error is fatal or not.
  * `stats_cb` - Statistics callback - triggered if `statistics.interval.ms`
    is configured to a non-zero value, emitting metrics and internal state
    in JSON format, see [STATISTICS.md](STATISTICS.md).
  * `throttle_cb` - Throttle callback - triggered whenever a broker has
    throttled (delayed) a request.

These callbacks will also be triggered by `rd_kafka_flush()`,
`rd_kafka_consumer_poll()`, and any other functions that serve queues.


Optional callbacks not triggered by poll, these may be called spontaneously
from any thread at any time:

  * `log_cb` - Logging callback - allows the application to output log messages
    generated by librdkafka.
  * `partitioner_cb` - Partitioner callback - application provided message partitioner.
    The partitioner may be called in any thread at any time, it may be
    called multiple times for the same key.
    Partitioner function constraints:
      - MUST NOT call any rd_kafka_*() functions
      - MUST NOT block or execute for prolonged periods of time.
      - MUST return a value between 0 and partition_cnt-1, or the
        special RD_KAFKA_PARTITION_UA value if partitioning
        could not be performed.
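
A partitioner that respects these constraints is essentially a pure function
of the key and the partition count. The sketch below uses a simplified,
hypothetical signature (the real `partitioner_cb` takes additional topic and
opaque arguments) and an FNV-1a hash chosen only for illustration.
`SKETCH_PARTITION_UA` is a local stand-in for `RD_KAFKA_PARTITION_UA`, assumed
here to be -1.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Local stand-in for RD_KAFKA_PARTITION_UA (assumed value). */
#define SKETCH_PARTITION_UA ((int32_t)-1)

/* Map a key to a partition in [0, partition_cnt-1] without blocking
 * and without calling back into the client library. */
static int32_t sketch_partitioner(const void *key, size_t keylen,
                                  int32_t partition_cnt) {
    const unsigned char *p = key;
    uint32_t hash = 2166136261u; /* FNV-1a offset basis */
    size_t i;

    assert(key != NULL || keylen == 0);

    /* Partitioning can't be performed without any partitions. */
    if (partition_cnt <= 0)
        return SKETCH_PARTITION_UA;

    for (i = 0; i < keylen; i++) {
        hash ^= p[i];
        hash *= 16777619u; /* FNV-1a prime */
    }

    return (int32_t)(hash % (uint32_t)partition_cnt);
}
```

Since the function is deterministic, being called multiple times for the
same key yields the same partition as long as the partition count is
unchanged.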



### Brokers

On initialization, librdkafka only needs a partial list of
brokers (at least one), called the bootstrap brokers.
The client will connect to the bootstrap brokers specified by the
`bootstrap.servers` configuration property and query cluster Metadata
information which contains the full list of brokers, topics, partitions and their
leaders in the Kafka cluster.

Broker names are specified as `host[:port]` where the port is optional
(default 9092) and the host is either a resolvable hostname or an IPv4 or IPv6
address.
If host resolves to multiple addresses librdkafka will round-robin the
addresses for each connection attempt.
A DNS record containing all broker addresses can thus be used to provide a
reliable bootstrap broker.


#### SSL

If the client is to connect to a broker's SSL endpoints/listeners the client
needs to be configured with `security.protocol=SSL` for just SSL transport or
`security.protocol=SASL_SSL` for SASL authentication and SSL transport.
The client will try to verify the broker's certificate by checking the
CA root certificates, if the broker's certificate can't be verified
the connection is closed (and retried). This is to protect the client
from connecting to rogue brokers.

The CA root certificate defaults are system specific:
 * On Linux, Mac OSX, and other Unix-like systems the OpenSSL default
   CA path will be used, also called the OPENSSLDIR, which is typically
   `/etc/ssl/certs` (on Linux, typically in the `ca-certificates` package) and
   `/usr/local/etc/openssl` on Mac OSX (Homebrew).
 * On Windows the Root certificate store is used, unless
   `ssl.ca.certificate.stores` is configured in which case certificates are
   read from the specified stores.
 * If OpenSSL is linked statically, librdkafka will set the default CA
   location to the first of a series of probed paths (see below).

If the system-provided default CA root certificates are not sufficient to
verify the broker's certificate, such as when a self-signed certificate
or a local CA authority is used, the CA certificate must be specified
explicitly so that the client can find it.
This can be done either by providing a PEM file (e.g., `cacert.pem`)
as the `ssl.ca.location` configuration property, or by passing an in-memory
PEM, X.509/DER or PKCS#12 certificate to `rd_kafka_conf_set_ssl_cert()`.

It is also possible to disable broker certificate verification completely
by setting `enable.ssl.certificate.verification=false`, but this is not
recommended since it allows for rogue brokers and man-in-the-middle attacks,
and should only be used for testing and troubleshooting purposes.

CA location probe paths (see [rdkafka_ssl.c](src/rdkafka_ssl.c) for full list)
used when OpenSSL is statically linked:

    "/etc/pki/tls/certs/ca-bundle.crt",
    "/etc/ssl/certs/ca-bundle.crt",
    "/etc/pki/tls/certs/ca-bundle.trust.crt",
    "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",
    "/etc/ssl/ca-bundle.pem",
    "/etc/pki/tls/cacert.pem",
    "/etc/ssl/cert.pem",
    "/etc/ssl/cacert.pem",
    "/etc/certs/ca-certificates.crt",
    "/etc/ssl/certs/ca-certificates.crt",
    "/etc/ssl/certs",
    "/usr/local/etc/ssl/cert.pem",
    "/usr/local/etc/ssl/cacert.pem",
    "/usr/local/etc/ssl/certs/cert.pem",
    "/usr/local/etc/ssl/certs/cacert.pem",
    etc..


On **Windows** the Root certificate store is read by default, but any number
of certificate stores can be read by setting the `ssl.ca.certificate.stores`
configuration property to a comma-separated list of certificate store names.
The predefined system store names are:

 * `MY` - User certificates
 * `Root` - System CA certificates (default)
 * `CA` - Intermediate CA certificates
 * `Trust` - Trusted publishers

For example, to read both intermediate and root CAs, set
`ssl.ca.certificate.stores=CA,Root`.


#### OAUTHBEARER with support for OIDC

OAUTHBEARER with OIDC provides a method for the client to authenticate to the
Kafka cluster by requesting an authentication token from an issuing server
and passing the retrieved token to brokers during connection setup.

To use this authentication method the client needs to be configured as follows:

  * `security.protocol` - set to `SASL_SSL` or `SASL_PLAINTEXT`.
  * `sasl.mechanism` - set to `OAUTHBEARER`.
  * `sasl.oauthbearer.method` - set to `OIDC`.
  * `sasl.oauthbearer.token.endpoint.url` - OAUTH issuer token
     endpoint HTTP(S) URI used to retrieve the token.
  * `sasl.oauthbearer.client.id` - public identifier for the application.
    It must be unique across all clients that the authorization server handles.
  * `sasl.oauthbearer.client.secret` - secret known only to the
    application and the authorization server.
    This should be a sufficiently random string that is not guessable.
  * `sasl.oauthbearer.scope` - clients use this to specify the scope of the
    access request to the broker.
  * `sasl.oauthbearer.extensions` - (optional) additional information to be
    provided to the broker. A comma-separated list of key=value pairs.
    For example:
    `supportFeatureX=true,organizationId=sales-emea`


#### Sparse connections

The client will only connect to brokers it needs to communicate with, and
only when necessary.

Examples of needed broker connections are:

 * leaders for partitions being consumed from
 * leaders for partitions being produced to
 * consumer group coordinator broker
 * cluster controller for Admin API operations


##### Random broker selection

When there is no broker connection and a connection to any broker
is needed, such as on startup to retrieve metadata, the client randomly selects
a broker from its list of brokers, which includes both the configured bootstrap
brokers (including brokers manually added with `rd_kafka_brokers_add()`), as
well as the brokers discovered from cluster metadata.
Brokers with no prior connection attempt are tried first.

If there is already an available broker connection to any broker it is used,
rather than connecting to a new one.

The random broker selection and connection scheduling is triggered when:
 * bootstrap servers are configured (`rd_kafka_new()`)
 * brokers are manually added (`rd_kafka_brokers_add()`).
 * a consumer group coordinator needs to be found.
 * acquiring a ProducerID for the Idempotent Producer.
 * cluster or topic metadata is being refreshed.

A single connection attempt will be performed, and the broker will
return to an idle INIT state on failure to connect.

The random broker selection is rate-limited to:
10 < `reconnect.backoff.ms`/2 < 1000 milliseconds.

**Note**: The broker connection will be maintained until it is closed
          by the broker (idle connection reaper).

##### Persistent broker connections

While the random broker selection is useful for one-off queries, there
is need for the client to maintain persistent connections to certain brokers:
 * Consumer: the group coordinator.
 * Consumer: partition leader for topics being fetched from.
 * Producer: partition leader for topics being produced to.

These dependencies are discovered and maintained automatically, marking
matching brokers as persistent, which will make the client maintain connections
to these brokers at all times, reconnecting as necessary.


#### Connection close

A broker connection may be closed by the broker, intermediary network gear,
due to network errors, timeouts, etc.
When a broker connection is closed, librdkafka will back off the next reconnect
attempt (to the given broker) for `reconnect.backoff.ms` -25% to +50% jitter,
this value is increased exponentially for each connect attempt until
`reconnect.backoff.max.ms` is reached, at which time the value is reset
to `reconnect.backoff.ms`.
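
The jitter window and the exponential growth can be illustrated with two
small helpers. This is a simplified model under stated assumptions, not the
library's implementation: the function names are hypothetical, "increased
exponentially" is assumed to mean doubling, and the reset back to
`reconnect.backoff.ms` is not modelled.

```c
#include <assert.h>
#include <stddef.h>

/* Compute the window the jittered backoff falls in:
 * -25% .. +50% of the current backoff. */
static void backoff_jitter_range(int backoff_ms, int *min_ms, int *max_ms) {
    assert(min_ms != NULL && max_ms != NULL);
    *min_ms = backoff_ms - backoff_ms / 4;
    *max_ms = backoff_ms + backoff_ms / 2;
}

/* Grow the backoff for the next attempt (assumed doubling),
 * capped at backoff_max_ms. */
static int backoff_next(int backoff_ms, int backoff_max_ms) {
    if (backoff_ms > backoff_max_ms / 2)
        return backoff_max_ms;
    return backoff_ms * 2;
}
```

For a current backoff of 100 ms the actual wait would land somewhere between
75 and 150 ms, and the following attempt would start from 200 ms.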

The broker will disconnect clients that have not sent any protocol requests
within `connections.max.idle.ms` (broker configuration property, defaults
to 10 minutes), but there is no foolproof way for the client to know that it
was a deliberate close by the broker and not an error. To avoid logging these
deliberate idle disconnects as errors the client employs some logic to try to
classify a disconnect as an idle disconnect if no requests have been sent in
the last `socket.timeout.ms` or there are no outstanding, or
queued, requests waiting to be sent. In this case the standard "Disconnect"
error log is silenced (will only be seen with debug enabled).

Otherwise, if a connection is closed while there are requests in-flight
the logging level will be LOG_WARNING (4), else LOG_INFO (6).

`log.connection.close=false` may be used to silence all disconnect logs,
but it is recommended to instead rely on the above heuristics.


#### Fetch From Follower

librdkafka supports consuming messages from follower replicas
([KIP-392](https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica)).
This is enabled by setting the `client.rack` configuration property which
corresponds to `broker.rack` on the broker. The actual assignment of
consumers to replicas is determined by the configured `replica.selector.class`
on the broker.


### Logging

#### Debug contexts

Extensive debugging of librdkafka can be enabled by setting the
`debug` configuration property to a CSV string of debug contexts:

Debug context | Type     | Description
--------------|----------|----------------------
generic       | *        | General client instance level debugging. Includes initialization and termination debugging.
broker        | *        | Broker and connection state debugging.
topic         | *        | Topic and partition state debugging. Includes leader changes.
metadata      | *        | Cluster and topic metadata retrieval debugging.
feature       | *        | Kafka protocol feature support as negotiated with the broker.
queue         | producer | Message queue debugging.
msg           | *        | Message debugging. Includes information about batching, compression, sizes, etc.
protocol      | *        | Kafka protocol request/response debugging. Includes latency (rtt) printouts.
cgrp          | consumer | Low-level consumer group state debugging.
security      | *        | Security and authentication debugging.
fetch         | consumer | Consumer message fetch debugging. Includes decisions on when and why messages are fetched.
interceptor   | *        | Interceptor interface debugging.
plugin        | *        | Plugin loading debugging.
consumer      | consumer | High-level consumer debugging.
admin         | admin    | Admin API debugging.
eos           | producer | Idempotent Producer debugging.
mock          | *        | Mock cluster functionality debugging.
assignor      | consumer | Detailed consumer group partition assignor debugging.
conf          | *        | Display set configuration properties on startup.
all           | *        | All of the above.


Suggested debugging settings for troubleshooting:

Problem space          | Type     | Debug setting
-----------------------|----------|-------------------
Producer not delivering messages to broker | producer | `broker,topic,msg`
Consumer not fetching messages | consumer | Start with `consumer`, or use `cgrp,fetch` for detailed information.
Consumer starts reading at unexpected offset | consumer | `consumer` or `cgrp,fetch`
Authentication or connectivity issues | * | `broker,security`
Protocol handling or latency | * | `broker,protocol`
Topic leader and state | * | `topic,metadata`
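
For example, a producer that is not delivering messages could be configured
with the contexts suggested in the table above:

```
debug=broker,topic,msg
```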




### Feature discovery

Apache Kafka broker version 0.10.0 added support for the ApiVersionRequest API
which allows a client to query a broker for its range of supported API versions.

librdkafka supports this functionality and will query each broker on connect
for this information (if `api.version.request=true`) and use it to enable or disable
various protocol features, such as MessageVersion 1 (timestamps), KafkaConsumer, etc.

If the broker fails to respond to the ApiVersionRequest librdkafka will
assume the broker is too old to support the API and fall back to an older
broker version's API. These fallback versions are hardcoded in librdkafka
and are selected by the `broker.version.fallback` configuration property.



### Producer API

After setting up the `rd_kafka_t` object with type `RD_KAFKA_PRODUCER` and one
or more `rd_kafka_topic_t` objects librdkafka is ready for accepting messages
to be produced and sent to brokers.

The `rd_kafka_produce()` function takes the following arguments:

  * `rkt` - the topic to produce to, previously created with
	  `rd_kafka_topic_new()`
  * `partition` - partition to produce to. If this is set to
	  `RD_KAFKA_PARTITION_UA` (UnAssigned) then the configured partitioner
		  function will be used to select a target partition.
  * `msgflags` - 0, or one of:
	  * `RD_KAFKA_MSG_F_COPY` - librdkafka will immediately make a copy of
	    the payload. Use this when the payload is in non-persistent
	    memory, such as the stack.
	  * `RD_KAFKA_MSG_F_FREE` - let librdkafka free the payload using
	    `free(3)` when it is done with it.

	These two flags are mutually exclusive and neither needs to be set, in
	which case the payload is neither copied nor freed by librdkafka.

	If the `RD_KAFKA_MSG_F_COPY` flag is not set no data copying will be
	performed and librdkafka will hold on to the payload pointer until
	the message has been delivered or fails.
	The delivery report callback will be called when librdkafka is done
	with the message to let the application regain ownership of the
	payload memory.
	The application must not free the payload in the delivery report
	callback if `RD_KAFKA_MSG_F_FREE` is set.
  * `payload`,`len` - the message payload
  * `key`,`keylen` - an optional message key which can be used for partitioning.
	  It will be passed to the topic partitioner callback, if any, and
	  will be attached to the message when sending to the broker.
  * `msg_opaque` - an optional application-provided per-message opaque pointer
	  that will be provided in the message delivery callback to let
	  the application reference a specific message.


`rd_kafka_produce()` is a non-blocking API: it enqueues the message
on an internal queue and returns immediately.
If the new message would cause the internal queue to exceed
`queue.buffering.max.messages` or `queue.buffering.max.kbytes`
configuration properties, `rd_kafka_produce()` returns -1 and sets errno
to `ENOBUFS` and last_error to `RD_KAFKA_RESP_ERR__QUEUE_FULL`, thus
providing a backpressure mechanism.
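
One common way for an application to react to this backpressure is to serve
the client for a while and then retry. The sketch below shows only the shape
of such a loop and deliberately does not link against librdkafka: the callback
types and the toy queue are hypothetical. In a real application `try_enqueue`
would wrap `rd_kafka_produce()` (reporting a full queue as `false`) and
`drain` would wrap `rd_kafka_poll()` so that delivery reports are served and
queue space is freed.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical callbacks standing in for the produce and poll calls. */
typedef bool (*try_enqueue_fn)(void *ctx);
typedef void (*drain_fn)(void *ctx);

/* Retry an enqueue up to max_attempts times, draining between attempts.
 * Returns the number of attempts used, or -1 if the queue stayed full. */
int enqueue_with_backpressure(try_enqueue_fn try_enqueue, drain_fn drain,
                              void *ctx, int max_attempts) {
    for (int attempt = 1; attempt <= max_attempts; attempt++) {
        if (try_enqueue(ctx))
            return attempt;
        drain(ctx); /* give the queue a chance to shrink before retrying */
    }
    return -1;
}

/* Toy bounded queue used only to exercise the loop above. */
struct toy_queue {
    int used;
    int capacity;
};

bool toy_try_enqueue(void *ctx) {
    struct toy_queue *q = ctx;
    if (q->used >= q->capacity)
        return false; /* queue full */
    q->used++;
    return true;
}

void toy_drain(void *ctx) {
    struct toy_queue *q = ctx;
    if (q->used > 0)
        q->used--; /* pretend one message was delivered */
}
```

Bounding the number of attempts keeps the caller from spinning forever if the
queue never drains, for example while all brokers are down.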


`rd_kafka_producev()` provides an alternative produce API that does not
require a topic `rkt` object and also provides support for extended
message fields, such as timestamp and headers.


**Note**: See `examples/rdkafka_performance.c` for a producer implementation.


### Simple Consumer API (legacy)

NOTE: For the high-level KafkaConsumer interface see `rd_kafka_subscribe()` (rdkafka.h) or `KafkaConsumer` (rdkafkacpp.h).

The consumer API is a bit more stateful than the producer API.
After creating `rd_kafka_t` with type `RD_KAFKA_CONSUMER` and
`rd_kafka_topic_t` instances the application must also start the consumer
for a given partition by calling `rd_kafka_consume_start()`.

`rd_kafka_consume_start()` arguments:

  * `rkt` - the topic to start consuming from, previously created with
    	  `rd_kafka_topic_new()`.
  * `partition` - partition to consume from.
  * `offset` - message offset to start consuming from. This may either be an
    	     absolute message offset or one of the three special offsets:
	     `RD_KAFKA_OFFSET_BEGINNING` to start consuming from the beginning
	     of the partition's queue (oldest message), or
	     `RD_KAFKA_OFFSET_END` to start consuming at the next message to be
	     produced to the partition, or
	     `RD_KAFKA_OFFSET_STORED` to use the offset store.

After a topic+partition consumer has been started librdkafka will attempt
to keep `queued.min.messages` messages in the local queue by repeatedly
fetching batches of messages from the broker. librdkafka will fetch all
consumed partitions for which that broker is a leader, through a single
request.

This local message queue is then served to the application through three
different consume APIs:

  * `rd_kafka_consume()` - consumes a single message
  * `rd_kafka_consume_batch()` - consumes one or more messages
  * `rd_kafka_consume_callback()` - consumes all messages in the local
    queue and calls a callback function for each one.

These three APIs are listed above in ascending order of performance,
`rd_kafka_consume()` being the slowest and `rd_kafka_consume_callback()` being
the fastest. The different consume variants are provided to cater for different
application needs.

A consumed message, as provided or returned by each of the consume functions,
is represented by the `rd_kafka_message_t` type.

`rd_kafka_message_t` members:

  * `err` - Error signaling back to the application. If this field is non-zero
    	  the `payload` field should be considered an error message and
	  `err` is an error code (`rd_kafka_resp_err_t`).
	  If `err` is zero then the message is a proper fetched message
	  and `payload` et al. contain message payload data.
  * `rkt`,`partition` - Topic and partition for this message or error.
  * `payload`,`len` - Message payload data or error message (err!=0).
  * `key`,`key_len` - Optional message key as specified by the producer
  * `offset` - Message offset

Both the `payload` and `key` memory, as well as the message as a whole, are
owned by librdkafka and must not be used after an `rd_kafka_message_destroy()`
call. librdkafka will share the same messageset receive buffer memory for all
message payloads of that messageset to avoid excessive copying, which means
that if the application decides to hang on to a single `rd_kafka_message_t`
it will prevent the backing memory from being released for all other messages
from the same messageset.

When the application is done consuming messages from a topic+partition it
should call `rd_kafka_consume_stop()` to stop the consumer. This will also
purge any messages currently in the local queue.


**Note**: See `examples/rdkafka_performance.c` for a consumer implementation.


#### Offset management

Broker based offset management is available for broker version >= 0.9.0
in conjunction with using the high-level KafkaConsumer interface (see
rdkafka.h or rdkafkacpp.h).

Offset management is also available through a deprecated local offset file,
where the offset is periodically written to a local file for each
topic+partition according to the following topic configuration properties:

  * `enable.auto.commit`
  * `auto.commit.interval.ms`
  * `offset.store.path`
  * `offset.store.sync.interval.ms`

The legacy `auto.commit.enable` topic configuration property is only to be used
with the legacy low-level consumer.
Use `enable.auto.commit` with the modern KafkaConsumer.


##### Auto offset commit

The consumer will automatically commit offsets every `auto.commit.interval.ms`
when `enable.auto.commit` is enabled (default).

Offsets to be committed are kept in a local in-memory offset store.
This offset store is updated by `consumer_poll()` (et al.) to
store the offset of the last message passed to the application
(per topic+partition).

##### At-least-once processing

Since auto commits are performed in a background thread this may result in
the offset for the latest message being committed before the application has
finished processing the message. If the application were to crash or exit
prior to finishing processing, and the offset had been auto committed,
the next incarnation of the consumer application would start at the next
message, effectively missing the message that was being processed when the
application crashed.
To avoid this scenario the application can disable the automatic
offset **store** by setting `enable.auto.offset.store` to false
and manually **storing** offsets after processing by calling
`rd_kafka_offsets_store()`.
This gives an application fine-grained control on when a message
is eligible for committing without having to perform the commit itself.
`enable.auto.commit` should be set to true when using manual offset storing.
The latest stored offset will be automatically committed every
`auto.commit.interval.ms`.
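
A configuration along these lines matches the description above. The interval
value is only illustrative:

```
enable.auto.commit=true
enable.auto.offset.store=false
auto.commit.interval.ms=5000
```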

**Note**: Only greater offsets are committed, e.g., if the latest committed
          offset was 10 and the application performs an offsets_store()
          with offset 9, that offset will not be committed.
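
The "only greater offsets are committed" rule can be modelled in a few lines.
This is an illustrative sketch with hypothetical names, not the library's
internal bookkeeping:

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical per-partition bookkeeping. */
struct offset_state {
    int64_t stored;    /* latest offset stored by the application */
    int64_t committed; /* latest offset committed to the broker */
};

/* Models the application storing an offset after processing a message. */
void store_offset(struct offset_state *s, int64_t offset) {
    s->stored = offset;
}

/* Models one periodic auto commit: the stored offset is committed only if
 * it is greater than the last committed one. Returns true if a commit
 * took place. */
bool auto_commit_tick(struct offset_state *s) {
    if (s->stored <= s->committed)
        return false;
    s->committed = s->stored;
    return true;
}
```

With a committed offset of 10, storing 9 leaves the committed offset at 10,
while storing 12 moves it to 12 on the next tick.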


##### Auto offset reset

The consumer will by default try to acquire the last committed offsets for
each topic+partition it is assigned using its configured `group.id`.
If there is no committed offset available, or the consumer is unable to
fetch the committed offsets, the policy of `auto.offset.reset` will kick in.
This configuration property may be set to one of the following values:

 * `earliest` - start consuming the earliest message of the partition.
 * `latest` - start consuming the next message to be produced to the partition.
 * `error` - don't start consuming but instead raise a consumer error
              with error-code `RD_KAFKA_RESP_ERR__AUTO_OFFSET_RESET` for
              the topic+partition. This allows the application to decide what
              to do in case there is no committed start offset.
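
The start-offset decision described above can be summarised as a small
function. The enums and struct here are hypothetical and exist only to make
the policy explicit. They are not part of the librdkafka API:

```c
#include <stdbool.h>
#include <stdint.h>

/* Mirrors the three auto.offset.reset values. */
enum reset_policy { RESET_EARLIEST, RESET_LATEST, RESET_ERROR };

/* Hypothetical outcome of resolving where to start consuming. */
enum start_kind {
    START_AT_COMMITTED, /* resume from the committed offset */
    START_AT_EARLIEST,  /* oldest message in the partition */
    START_AT_LATEST,    /* next message to be produced */
    START_FAILED        /* raise a consumer error instead of consuming */
};

struct start_decision {
    enum start_kind kind;
    int64_t offset; /* only meaningful for START_AT_COMMITTED */
};

/* A committed offset always wins; the reset policy only applies when no
 * committed offset is available. */
struct start_decision resolve_start(bool have_committed, int64_t committed,
                                    enum reset_policy policy) {
    struct start_decision d = {START_FAILED, -1};
    if (have_committed) {
        d.kind = START_AT_COMMITTED;
        d.offset = committed;
        return d;
    }
    switch (policy) {
    case RESET_EARLIEST: d.kind = START_AT_EARLIEST; break;
    case RESET_LATEST:   d.kind = START_AT_LATEST;   break;
    case RESET_ERROR:    d.kind = START_FAILED;      break;
    }
    return d;
}
```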


### Consumer groups

Broker based consumer groups (requires Apache Kafka broker >=0.9) are supported,
see KafkaConsumer in rdkafka.h or rdkafkacpp.h.

The following diagram visualizes the high-level balanced consumer group state
flow and synchronization between the application, librdkafka consumer,
group coordinator, and partition leader(s).

![Consumer group state diagram](src/librdkafka_cgrp_synch.png)


#### Static consumer groups

By default Kafka consumers are rebalanced each time a new consumer joins
the group or an existing member leaves. This is known as dynamic
membership. Apache Kafka >= 2.3.0 introduces static membership.
Unlike dynamic members, static members can leave and rejoin a group
within the `session.timeout.ms` without triggering a rebalance, retaining
their existing partition assignment.

To enable static group membership configure each consumer instance
in the group with a unique `group.instance.id`.
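
For instance, one member of a statically configured group might use settings
like these. The group name, instance id and timeout are placeholder values:

```
group.id=my-group
group.instance.id=consumer-1
session.timeout.ms=45000
```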

Consumers with `group.instance.id` set will not send a leave group request on
close - session timeout, change of subscription, or a new group member joining
the group, are the only mechanisms that will trigger a group rebalance for
static consumer groups.

If a new consumer joins the group with the same `group.instance.id` as an
existing consumer, the existing consumer will be fenced and raise a fatal error.
The fatal error is propagated as a consumer error with error code
`RD_KAFKA_RESP_ERR__FATAL`; use `rd_kafka_fatal_error()` to retrieve
the original fatal error code and reason.

To read more about static group membership, see [KIP-345](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances).


### Note on Batch consume APIs

Using multiple instances of `rd_kafka_consume_batch()` and/or `rd_kafka_consume_batch_queue()`
APIs concurrently is not thread safe and will result in undefined behaviour. We strongly recommend
using only a single instance of these APIs at any given time. Concurrent use is not supported and
will not be supported in the future. There are different ways to achieve a similar result:

* Create multiple consumers reading from different partitions. In this way, different partitions
  are read by different consumers and each consumer can run its own batch call.
* Create multiple consumers in the same consumer group. In this way, partitions are assigned to
  different consumers and each consumer can run its own batch call.
* Create a single consumer, read data with a single batch call, and process this data in parallel.

If you still need to use multiple instances of these APIs for the same consumer
concurrently, do not use any of the **seek**, **pause**, **resume** or **rebalancing** operations
in conjunction with these API calls. For the **rebalancing** operation to work in a sequential manner,
set the `rebalance_cb` configuration property for the consumer (see
[examples/rdkafka_complex_consumer_example.c](examples/rdkafka_complex_consumer_example.c) for usage).


### Topics

#### Unknown or unauthorized topics

If a consumer application subscribes to non-existent or unauthorized topics
a consumer error will be propagated for each unavailable topic with the
error code set to either `RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART` or a
broker-specific error code, such as
`RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED`.

As the topic metadata is refreshed every `topic.metadata.refresh.interval.ms`
the unavailable topics are re-checked for availability, but the same error
will not be raised again for the same topic.

If a consumer has Describe (ACL) permissions for a topic but not Read it will
be able to join a consumer group and start consuming the topic, but the Fetch
requests to retrieve messages from the broker will fail with
`RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED`.
This error will be raised to the application once per partition and
assign()/seek() call, and the fetcher will back off the next fetch for 10 times
the `fetch.error.backoff.ms` (but at least 1 second).
It is recommended that the application takes appropriate action when this
occurs, for instance adjusting its subscription or assignment to exclude the
unauthorized topic.
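
The back-off rule for unauthorized fetches is simple arithmetic. A minimal
sketch with a hypothetical function name:

```c
#include <stdint.h>

/* Back-off applied after a topic authorization failure on Fetch:
 * 10 times fetch.error.backoff.ms, but never less than 1000 ms. */
int64_t auth_failure_backoff_ms(int64_t fetch_error_backoff_ms) {
    int64_t backoff = 10 * fetch_error_backoff_ms;
    return backoff < 1000 ? 1000 : backoff;
}
```

With `fetch.error.backoff.ms=500` the next fetch is delayed by 5000 ms. With
`fetch.error.backoff.ms=50` the 1 second floor applies.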


#### Topic metadata propagation for newly created topics

Due to the asynchronous nature of topic creation in Apache Kafka it may
take some time for a newly created topic to be known by all brokers in the
cluster.
If a client tries to use a topic after topic creation but before the topic
has been fully propagated in the cluster it will seem as if the topic does not
exist, which would raise `RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC` (et al.)
errors to the application.
To avoid these temporary errors being raised, the client will not flag
a topic as non-existent until a propagation time has elapsed. This propagation
time defaults to 30 seconds and can be configured with
`topic.metadata.propagation.max.ms`.
The per-topic max propagation time starts ticking as soon as the topic is
referenced (e.g., by produce()).

If messages are produced to unknown topics during the propagation time, the
messages will be queued for later delivery to the broker when the topic
metadata has propagated.
Should the topic propagation time expire without the topic being seen the
produced messages will fail with `RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC`.
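
The handling of a produced message during the propagation window can be
sketched as follows. The names are hypothetical, and treating the exact
expiry instant as "expired" is an assumption of this sketch:

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical outcomes for a message produced to a topic. */
enum produce_outcome {
    PRODUCE_DELIVER,      /* topic is known: send to the broker */
    PRODUCE_QUEUE,        /* topic unknown, still within the window */
    PRODUCE_FAIL_UNKNOWN  /* window expired: fail as unknown topic */
};

/* Times are milliseconds on any monotonic clock. first_referenced_ms is
 * when the client first referenced the topic; propagation_max_ms
 * corresponds to topic.metadata.propagation.max.ms. */
enum produce_outcome classify_produce(bool topic_seen_in_metadata,
                                      int64_t now_ms,
                                      int64_t first_referenced_ms,
                                      int64_t propagation_max_ms) {
    if (topic_seen_in_metadata)
        return PRODUCE_DELIVER;
    if (now_ms - first_referenced_ms < propagation_max_ms)
        return PRODUCE_QUEUE;
    return PRODUCE_FAIL_UNKNOWN;
}
```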

**Note**: The propagation time will not take effect if a topic is known to
          the client and then deleted. In this case the topic will immediately
          be marked as non-existent and remain non-existent until a topic
          metadata refresh sees the topic again (after the topic has been
          re-created).


#### Topic auto creation

librdkafka supports topic auto creation: if a non-existent topic is
referenced by the client (by producing to, or consuming from, the topic, etc.)
the broker will automatically create the topic (with default partition counts
and replication factor) if the broker configuration property
`auto.create.topics.enable=true` is set.

*Note*: A topic that is undergoing automatic creation may be reported as
unavailable, with e.g., `RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART`, during the
time the topic is being created and partition leaders are elected.

While topic auto creation may be useful for producer applications, it is not
particularly valuable for consumer applications since even if the topic
to consume is auto created there is nothing writing messages to the topic.
To avoid consumers automatically creating topics the
`allow.auto.create.topics` consumer configuration property is set to
`false` by default, preventing the consumer from triggering automatic topic
creation on the broker. This requires broker version v0.11.0.0 or later.
The `allow.auto.create.topics` property may be set to `true` to allow
auto topic creation, which also requires `auto.create.topics.enable=true` to
be configured on the broker.



### Metadata

#### < 0.9.3

Prior to the 0.9.3 release librdkafka's metadata handling
was chatty and excessive, which usually isn't a problem in small
to medium-sized clusters, but in large clusters with a large number
of librdkafka clients the metadata requests could hog broker CPU and bandwidth.

#### >= 0.9.3

The remaining Metadata sections describe the current behaviour.

**Note:** "Known topics" in the following section means topics for
          locally created `rd_kafka_topic_t` objects.


#### Query reasons

There are four reasons to query metadata:

 * brokers - update/populate cluster broker list, so the client can
             find and connect to any new brokers added.

 * specific topic - find leader or partition count for specific topic

 * known topics - same, but for all locally known topics.

 * all topics - get topic names for consumer group wildcard subscription
                matching

The above list is sorted so that each subsequent entry also contains the
information of the entries above it, e.g., 'known topics' contains enough
information to also satisfy 'specific topic' and 'brokers'.


#### Caching strategy

The prevalent cache timeout is `metadata.max.age.ms`; any cached entry
will remain authoritative for this long or until a relevant broker error
is returned.


 * brokers - eternally cached, the broker list is additive.

 * topics - cached for `metadata.max.age.ms`



### Fatal errors

If an unrecoverable error occurs, a fatal error is triggered in one
or more of the following ways depending on what APIs the application is using:

 * C: the `error_cb` is triggered with error code `RD_KAFKA_RESP_ERR__FATAL`;
   the application should call `rd_kafka_fatal_error()` to retrieve the
   underlying fatal error code and error string.
 * C: an `RD_KAFKA_EVENT_ERROR` event is triggered and
   `rd_kafka_event_error_is_fatal()` returns true: the fatal error code
   and string are available through `rd_kafka_event_error()` and
   `rd_kafka_event_error_string()`.
 * C and C++: any API call may return `RD_KAFKA_RESP_ERR__FATAL`; use
   `rd_kafka_fatal_error()` to retrieve the underlying fatal error code
   and error string.
 * C++: an `EVENT_ERROR` event is triggered and `event.fatal()` returns true:
   the fatal error code and string are available through `event.err()` and
   `event.str()`.


An application may call `rd_kafka_fatal_error()` at any time to check if
a fatal error has been raised.


#### Fatal producer errors

The idempotent producer guarantees of ordering and no duplicates also
require a way for the client to fail gracefully when these guarantees
can't be satisfied.

If a fatal error has been raised, subsequent use of the following API calls
will fail:

 * `rd_kafka_produce()`
 * `rd_kafka_producev()`
 * `rd_kafka_produce_batch()`

The underlying fatal error code will be returned, depending on the error
reporting scheme for each of those APIs.


When a fatal error has occurred the application should call `rd_kafka_flush()`
to wait for all outstanding and queued messages to drain before terminating
the application.
`rd_kafka_purge(RD_KAFKA_PURGE_F_QUEUE)` is automatically called by the client
when a producer fatal error has occurred; messages in-flight are not purged
automatically to allow waiting for the proper acknowledgement from the broker.
The purged messages in queue will fail with error code set to
`RD_KAFKA_RESP_ERR__PURGE_QUEUE`.


#### Fatal consumer errors

A consumer configured for static group membership (`group.instance.id`) may
raise a fatal error if a new consumer instance is started with the same
instance id, causing the existing consumer to be fenced by the new consumer.

This fatal error is propagated on the fenced existing consumer in multiple ways:
 * `error_cb` (if configured) is triggered.
 * `rd_kafka_consumer_poll()` (et al.) will return a message object
   with the `err` field set to `RD_KAFKA_RESP_ERR__FATAL`.
 * any subsequent calls to state-changing consumer calls will
   return `RD_KAFKA_RESP_ERR__FATAL`.
   This includes `rd_kafka_subscribe()`, `rd_kafka_assign()`,
   `rd_kafka_consumer_close()`, `rd_kafka_commit*()`, etc.

The consumer will automatically stop consuming when a fatal error has occurred
and no further subscription, assignment, consumption or offset committing
will be possible. At this point the application should simply destroy the
consumer instance and terminate the application since it has been replaced
by a newer instance.


## Compatibility

### Broker version compatibility

librdkafka supports all released Apache Kafka broker versions since 0.8.0,
but not all features may be available on all broker versions since some
features rely on newer broker functionality.

**Current defaults:**
 * `api.version.request=true`
 * `broker.version.fallback=0.10.0`
 * `api.version.fallback.ms=0` (never revert to `broker.version.fallback`)

Depending on what broker version you are using, please configure your
librdkafka based client as follows:

#### Broker version >= 0.10.0.0 (or trunk)

For librdkafka >= v1.0.0 there is no need to set any api.version-related
configuration parameters; the defaults are tailored for broker version 0.10.0.0
or later.

For librdkafka < v1.0.0, please specify:
```
api.version.request=true
api.version.fallback.ms=0
```


#### Broker versions 0.9.0.x

```
api.version.request=false
broker.version.fallback=0.9.0.x  (the exact 0.9.0.. version you are using)
```

#### Broker versions 0.8.x.y

```
api.version.request=false
broker.version.fallback=0.8.x.y  (your exact 0.8... broker version)
```

#### Detailed description

Apache Kafka version 0.10.0.0 added support for
[KIP-35](https://cwiki.apache.org/confluence/display/KAFKA/KIP-35+-+Retrieving+protocol+version) -
querying the broker for supported API request types and versions -
allowing the client to figure out what features it can use.
But for older broker versions there is no way for the client to reliably know
what protocol features the broker supports.

To alleviate this situation librdkafka has three configuration properties:
 * `api.version.request=true|false` - enables the API version request,
   this requires a >= 0.10.0.0 broker and will cause a disconnect on
   brokers 0.8.x - this disconnect is recognized by librdkafka and on the next
   connection attempt (which is immediate) it will disable the API version
   request and use `broker.version.fallback` as a basis of available features.
   **NOTE**: Due to a bug in broker version 0.9.0.0 & 0.9.0.1 the broker will
   not close the connection when receiving the API version request, instead
   the request will time out in librdkafka after 10 seconds and it will fall
   back to `broker.version.fallback` on the next immediate connection attempt.
 * `broker.version.fallback=X.Y.Z.N` - if the API version request fails
   (if `api.version.request=true`) or API version requests are disabled
   (`api.version.request=false`) then this tells librdkafka what version the
   broker is running and adapts its feature set accordingly.
 * `api.version.fallback.ms=MS` - In the case where `api.version.request=true`
   and the API version request fails, this property dictates for how long
   librdkafka will use `broker.version.fallback` instead of
   `api.version.request=true`. After `MS` has passed the API version request
   will be sent on any new connections made for the broker in question.
   This allows upgrading the Kafka broker to a new version with extended
   feature set without needing to restart or reconfigure the client
   (given that `api.version.request=true`).

*Note: These properties apply per broker.*
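
The interaction between the three properties can be condensed into a single
per-broker decision made when a new connection is set up. This is a
simplified sketch with hypothetical names and does not reproduce the
library's actual state handling:

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical per-broker state. */
struct broker_api_state {
    bool request_enabled;     /* api.version.request */
    bool last_request_failed; /* did the latest ApiVersionRequest fail? */
    int64_t failed_at_ms;     /* when that failure happened */
    int64_t fallback_ms;      /* api.version.fallback.ms */
};

/* Returns true if the ApiVersionRequest should be sent on a new connection,
 * false if the features implied by broker.version.fallback should be
 * assumed instead. */
bool should_send_api_version_request(const struct broker_api_state *b,
                                     int64_t now_ms) {
    if (!b->request_enabled)
        return false; /* requests disabled: always use the fallback */
    if (!b->last_request_failed)
        return true;  /* nothing went wrong so far: keep asking */
    /* After a failure, use the fallback until fallback_ms has passed.
     * A value of 0 therefore never reverts to the fallback. */
    return now_ms - b->failed_at_ms >= b->fallback_ms;
}
```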

The API version query was disabled by default (`api.version.request=false`) in
librdkafka up to and including v0.9.5 due to the aforementioned bug in
broker version 0.9.0.0 & 0.9.0.1, but was changed to `true` in
librdkafka v0.11.0.


### Supported KIPs

The [Apache Kafka Improvement Proposals (KIPs)](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals) supported by librdkafka.


| KIP                                                                      | Kafka release               | Status                                                                                        |
|--------------------------------------------------------------------------|-----------------------------|-----------------------------------------------------------------------------------------------|
| KIP-1 - Stop accepting request.required.acks > 1                         | 0.9.0.0                     | Not enforced on client (due to backwards compat with brokers  <0.8.3)                         |
| KIP-4 - Metadata protocol changes                                        | 0.9.0.0, 0.10.0.0, 0.10.1.0 | Supported                                                                                     |
| KIP-8 - Producer flush()                                                 | 0.9.0.0                     | Supported                                                                                     |
| KIP-12 - SASL Kerberos                                                   | 0.9.0.0                     | Supported (uses SSPI/logged-on-user on Windows, full KRB5 keytabs on Unix)                    |
| KIP-13 - Protocol request throttling (enforced on broker)                | 0.9.0.0                     | Supported                                                                                     |
| KIP-15 - Producer close with timeout                                     | 0.9.0.0                     | Supported (through flush() + destroy())                                                       |
| KIP-19 - Request timeouts                                                | 0.9.0.0                     | Supported                                                                                     |
| KIP-22 - Producer pluggable partitioner                                  | 0.9.0.0                     | Supported (not supported by Go, .NET and Python)                                              |
| KIP-31 - Relative offsets in messagesets                                 | 0.10.0.0                    | Supported                                                                                     |
| KIP-35 - ApiVersionRequest                                               | 0.10.0.0                    | Supported                                                                                     |
| KIP-40 - ListGroups and DescribeGroups                                   | 0.9.0.0                     | Supported                                                                                     |
| KIP-41 - max.poll.records                                                | 0.10.0.0                    | Supported through batch consumption interface (not supported by .NET and Go)                  |
| KIP-42 - Producer and Consumer interceptors                              | 0.10.0.0                    | Supported (not supported by Go, .NET and Python)                                              |
| KIP-43 - SASL PLAIN and handshake                                        | 0.10.0.0                    | Supported                                                                                     |
| KIP-48 - Delegation tokens                                               | 1.1.0                       | Not supported                                                                                 |
| KIP-54 - Sticky partition assignment strategy                            | 0.11.0.0                    | Supported but not available, use KIP-429 instead.                                             |
| KIP-57 - Interoperable LZ4 framing                                       | 0.10.0.0                    | Supported                                                                                     |
| KIP-62 - max.poll.interval and background heartbeats                     | 0.10.1.0                    | Supported                                                                                     |
| KIP-70 - Proper client rebalance event on unsubscribe/subscribe          | 0.10.1.0                    | Supported                                                                                     |
| KIP-74 - max.partition.fetch.bytes                                       | 0.10.1.0                    | Supported                                                                                     |
| KIP-78 - Retrieve Cluster Id                                             | 0.10.1.0                    | Supported (not supported by .NET)                                                             |
| KIP-79 - OffsetsForTimes                                                 | 0.10.1.0                    | Supported                                                                                     |
| KIP-81 - Consumer pre-fetch buffer size                                  | 2.4.0 (WIP)                 | Supported                                                                                     |
| KIP-82 - Record Headers                                                  | 0.11.0.0                    | Supported                                                                                     |
| KIP-84 - SASL SCRAM                                                      | 0.10.2.0                    | Supported                                                                                     |
| KIP-85 - SASL config properties                                          | 0.10.2.0                    | Supported                                                                                     |
| KIP-86 - Configurable SASL callbacks                                     | 2.0.0                       | Not supported                                                                                 |
| KIP-88 - AdminAPI: ListGroupOffsets                                      | 0.10.2.0                    | Supported                                                                                     |
| KIP-91 - Intuitive timeouts in Producer                                  | 2.1.0                       | Supported                                                                                     |
| KIP-92 - Per-partition lag metrics in Consumer                           | 0.10.2.0                    | Supported                                                                                     |
| KIP-97 - Backwards compatibility with older brokers                      | 0.10.2.0                    | Supported                                                                                     |
| KIP-98 - EOS                                                             | 0.11.0.0                    | Supported                                                                                     |
| KIP-102 - Close with timeout in consumer                                 | 0.10.2.0                    | Not supported                                                                                 |
| KIP-107 - AdminAPI: DeleteRecordsBefore                                  | 0.11.0.0                    | Supported                                                                                     |
| KIP-110 - ZStd compression                                               | 2.1.0                       | Supported                                                                                     |
| KIP-117 - AdminClient                                                    | 0.11.0.0                    | Supported                                                                                     |
| KIP-124 - Request rate quotas                                            | 0.11.0.0                    | Partially supported (depending on protocol request)                                           |
| KIP-126 - Producer ensure proper batch size after compression            | 0.11.0.0                    | Supported                                                                                     |
| KIP-133 - AdminAPI: DescribeConfigs and AlterConfigs                     | 0.11.0.0                    | Supported                                                                                     |
| KIP-140 - AdminAPI: ACLs                                                 | 0.11.0.0                    | Supported                                                                                     |
| KIP-144 - Broker reconnect backoff                                       | 0.11.0.0                    | Supported                                                                                     |
| KIP-152 - Improved SASL auth error messages                              | 1.0.0                       | Supported                                                                                     |
| KIP-192 - Cleaner idempotence semantics                                  | 1.0.0                       | Not supported (superseded by KIP-360)                                                         |
| KIP-195 - AdminAPI: CreatePartitions                                     | 1.0.0                       | Supported                                                                                     |
| KIP-204 - AdminAPI: DeleteRecords                                        | 1.1.0                       | Supported                                                                                     |
| KIP-219 - Client-side throttling                                         | 2.0.0                       | Not supported                                                                                 |
| KIP-222 - AdminAPI: Consumer group operations                            | 2.0.0                       | Supported                                                                                     |
| KIP-223 - Consumer partition lead metric                                 | 2.0.0                       | Not supported                                                                                 |
| KIP-226 - AdminAPI: Dynamic broker config                                | 1.1.0                       | Supported                                                                                     |
| KIP-227 - Consumer Incremental Fetch                                     | 1.1.0                       | Not supported                                                                                 |
| KIP-229 - AdminAPI: DeleteGroups                                         | 1.1.0                       | Supported                                                                                     |
| KIP-235 - DNS alias for secure connections                               | 2.1.0                       | Not supported                                                                                 |
| KIP-249 - AdminAPI: Delegation Tokens                                    | 2.0.0                       | Not supported                                                                                 |
| KIP-255 - SASL OAUTHBEARER                                               | 2.0.0                       | Supported                                                                                     |
| KIP-266 - Fix indefinite consumer timeouts                               | 2.0.0                       | Supported (bound by session.timeout.ms and max.poll.interval.ms)                              |
| KIP-289 - Consumer group.id default to NULL                              | 2.2.0                       | Supported                                                                                     |
| KIP-294 - SSL endpoint verification                                      | 2.0.0                       | Supported                                                                                     |
| KIP-302 - Use all addresses for resolved broker hostname                 | 2.1.0                       | Supported                                                                                     |
| KIP-320 - Consumer: handle log truncation                                | 2.1.0, 2.2.0                | Supported                                                                                     |
| KIP-322 - DeleteTopics disabled error code                               | 2.1.0                       | Supported                                                                                     |
| KIP-339 - AdminAPI: incrementalAlterConfigs                              | 2.3.0                       | Not supported                                                                                 |
| KIP-341 - Update Sticky partition assignment data                        | 2.3.0                       | Not supported (superseded by KIP-429)                                                         |
| KIP-342 - Custom SASL OAUTHBEARER extensions                             | 2.1.0                       | Supported                                                                                     |
| KIP-345 - Consumer: Static membership                                    | 2.4.0                       | Supported                                                                                     |
| KIP-357 - AdminAPI: list ACLs per principal                              | 2.1.0                       | Not supported                                                                                 |
| KIP-359 - Producer: use EpochLeaderId                                    | 2.4.0                       | Not supported                                                                                 |
| KIP-360 - Improve handling of unknown Idempotent Producer                | 2.5.0                       | Supported                                                                                     |
| KIP-361 - Consumer: add config to disable auto topic creation            | 2.3.0                       | Supported                                                                                     |
| KIP-368 - SASL periodic reauth                                           | 2.2.0                       | Not supported                                                                                 |
| KIP-369 - Always roundRobin partitioner                                  | 2.4.0                       | Not supported                                                                                 |
| KIP-389 - Consumer group max size                                        | 2.2.0                       | Supported (error is propagated to application, but the consumer does not raise a fatal error) |
| KIP-392 - Allow consumers to fetch from closest replica                  | 2.4.0                       | Supported                                                                                     |
| KIP-394 - Consumer: require member.id in JoinGroupRequest                | 2.2.0                       | Supported                                                                                     |
| KIP-396 - AdminAPI: commit/list offsets                                  | 2.4.0                       | Partially supported (remaining APIs available outside Admin client)                           |
| KIP-412 - AdminAPI: adjust log levels                                    | 2.4.0                       | Not supported                                                                                 |
| KIP-421 - Variables in client config files                               | 2.3.0                       | Not applicable (librdkafka, et al., does not provide a config file interface, and shouldn't)  |
| KIP-429 - Consumer: incremental rebalance protocol                       | 2.4.0                       | Supported                                                                                     |
| KIP-430 - AdminAPI: return authorized operations in Describe.. responses | 2.3.0                       | Not supported                                                                                 |
| KIP-436 - Start time in stats                                            | 2.3.0                       | Supported                                                                                     |
| KIP-447 - Producer scalability for EOS                                   | 2.5.0                       | Supported                                                                                     |
| KIP-455 - AdminAPI: Replica assignment                                   | 2.4.0 (WIP)                 | Not supported                                                                                 |
| KIP-460 - AdminAPI: electPreferredLeader                                 | 2.4.0                       | Not supported                                                                                 |
| KIP-464 - AdminAPI: defaults for createTopics                            | 2.4.0                       | Supported                                                                                     |
| KIP-467 - Per-message (sort of) error codes in ProduceResponse           | 2.4.0 (WIP)                 | Not supported                                                                                 |
| KIP-480 - Sticky partitioner                                             | 2.4.0                       | Supported                                                                                     |
| KIP-482 - Optional fields in Kafka protocol                              | 2.4.0                       | Partially supported (ApiVersionRequest)                                                       |
| KIP-496 - AdminAPI: delete offsets                                       | 2.4.0                       | Supported                                                                                     |
| KIP-511 - Collect Client's Name and Version                              | 2.4.0                       | Supported                                                                                     |
| KIP-514 - Bounded flush()                                                | 2.4.0                       | Supported                                                                                     |
| KIP-517 - Consumer poll() metrics                                        | 2.4.0                       | Not supported                                                                                 |
| KIP-518 - Allow listing consumer groups per state                        | 2.6.0                       | Supported                                                                                     |
| KIP-519 - Make SSL engine configurable                                   | 2.6.0                       | Supported                                                                                     |
| KIP-525 - Return topic metadata and configs in CreateTopics response     | 2.4.0                       | Not supported                                                                                 |
| KIP-526 - Reduce Producer Metadata Lookups for Large Number of Topics    | 2.5.0                       | Not supported                                                                                 |
| KIP-533 - Add default API timeout to AdminClient                         | 2.5.0                       | Not supported                                                                                 |
| KIP-546 - Add Client Quota APIs to AdminClient                           | 2.6.0                       | Not supported                                                                                 |
| KIP-559 - Make the Kafka Protocol Friendlier with L7 Proxies             | 2.5.0                       | Not supported                                                                                 |
| KIP-568 - Explicit rebalance triggering on the Consumer                  | 2.6.0                       | Not supported                                                                                 |
| KIP-569 - Add metadata to DescribeConfigsResponse                        | 2.6.0                       | Not supported                                                                                 |
| KIP-580 - Exponential backoff for Kafka clients                          | WIP                         | Partially supported                                                                           |
| KIP-584 - Versioning scheme for features                                 | WIP                         | Not supported                                                                                 |
| KIP-588 - Allow producers to recover gracefully from txn timeouts        | 2.8.0 (WIP)                 | Not supported                                                                                 |
| KIP-601 - Configurable socket connection timeout                         | 2.7.0                       | Supported                                                                                     |
| KIP-602 - Use all resolved addresses by default                          | 2.6.0                       | Supported                                                                                     |
| KIP-651 - Support PEM format for SSL certs and keys                      | 2.7.0                       | Supported                                                                                     |
| KIP-654 - Aborted txns with non-flushed msgs should not be fatal         | 2.7.0                       | Supported                                                                                     |
| KIP-735 - Increase default consumer session timeout                      | 3.0.0                       | Supported                                                                                     |
| KIP-768 - SASL/OAUTHBEARER OIDC support                                  | 3.0                         | Supported                                                                                     |




### Supported protocol versions

"Kafka max" is the maximum ApiVersion supported in Apache Kafka 3.3.1, while
"librdkafka max" is the maximum ApiVersion supported in the latest
release of librdkafka.


| ApiKey  | Request name        | Kafka max   | librdkafka max          |
| ------- | ------------------- | ----------- | ----------------------- |
| 0       | Produce             | 9           | 7                       |
| 1       | Fetch               | 13          | 11                      |
| 2       | ListOffsets         | 7           | 2                       |
| 3       | Metadata            | 12          | 9                       |
| 8       | OffsetCommit        | 8           | 7                       |
| 9       | OffsetFetch         | 8           | 7                       |
| 10      | FindCoordinator     | 4           | 2                       |
| 11      | JoinGroup           | 9           | 5                       |
| 12      | Heartbeat           | 4           | 3                       |
| 13      | LeaveGroup          | 5           | 1                       |
| 14      | SyncGroup           | 5           | 3                       |
| 15      | DescribeGroups      | 5           | 4                       |
| 16      | ListGroups          | 4           | 4                       |
| 17      | SaslHandshake       | 1           | 1                       |
| 18      | ApiVersions         | 3           | 3                       |
| 19      | CreateTopics        | 7           | 4                       |
| 20      | DeleteTopics        | 6           | 1                       |
| 21      | DeleteRecords       | 2           | 1                       |
| 22      | InitProducerId      | 4           | 4                       |
| 24      | AddPartitionsToTxn  | 3           | 0                       |
| 25      | AddOffsetsToTxn     | 3           | 0                       |
| 26      | EndTxn              | 3           | 1                       |
| 28      | TxnOffsetCommit     | 3           | 3                       |
| 32      | DescribeConfigs     | 4           | 1                       |
| 33      | AlterConfigs        | 2           | 1                       |
| 36      | SaslAuthenticate    | 2           | 0                       |
| 37      | CreatePartitions    | 3           | 0                       |
| 42      | DeleteGroups        | 2           | 1                       |
| 47      | OffsetDelete        | 0           | 0                       |



# Recommendations for language binding developers

These recommendations are aimed at developers who wrap librdkafka in bindings
for high-level languages, such as confluent-kafka-go or node-rdkafka.

## Expose the configuration interface pass-thru

librdkafka's string-based key=value configuration property interface controls
most runtime behaviour and evolves over time.
Most features are purely configuration-based, meaning they do not require a
new API (SSL and SASL are two good examples, enabled entirely through
configuration properties) and thus need no changes to the binding or
application code.

If your language binding or application lets configuration properties be set
in a pass-through fashion, without any pre-checking by the binding code, then
simply upgrading the underlying librdkafka library (without touching the
bindings) makes new features available to the user.
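
As an illustration, a binding's configuration layer can be as thin as the
sketch below. It is not taken from any existing binding: `conf_setter_t`,
`struct conf_prop`, `binding_apply_conf()` and `demo_setter()` are hypothetical
names, and the setter is a stand-in so that the example compiles without
librdkafka. In a real binding the setter would be a thin wrapper around
`rd_kafka_conf_set()`.

```c
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical setter signature. In a real binding this would wrap
 * rd_kafka_conf_set(): 0 on success, -1 on failure with a human-readable
 * reason written to errstr. */
typedef int (*conf_setter_t)(void *conf, const char *name, const char *value,
                             char *errstr, size_t errstr_size);

/* One user-supplied configuration property (hypothetical type). */
struct conf_prop {
        const char *name;
        const char *value;
};

/* Forward every property unchanged: no allow-list and no validation in the
 * binding, so properties added by newer library versions work without
 * binding changes. Stops at the first property the setter rejects and
 * leaves the setter's error message in errstr. */
static int binding_apply_conf(void *conf, conf_setter_t set,
                              const struct conf_prop *props, size_t cnt,
                              char *errstr, size_t errstr_size) {
        for (size_t i = 0; i < cnt; i++) {
                if (set(conf, props[i].name, props[i].value,
                        errstr, errstr_size) != 0)
                        return -1;
        }
        return 0;
}

/* Demo stand-in for the library: accepts every property except names
 * starting with "bogus", and counts accepted properties in *conf. */
static int demo_setter(void *conf, const char *name, const char *value,
                       char *errstr, size_t errstr_size) {
        size_t *accepted = conf;
        (void)value;
        if (strncmp(name, "bogus", 5) == 0) {
                snprintf(errstr, errstr_size,
                         "No such configuration property: \"%s\"", name);
                return -1;
        }
        (*accepted)++;
        return 0;
}
```

The design point is that the only validation happens in the library, and its
error string is handed back to the user as-is.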

## Error constants

The error constants, both the official (value >= 0) errors and the
internal (value < 0) errors, evolve constantly.
So that bindings do not need to hard-code them, librdkafka provides an API
to extract the full list programmatically, either at runtime or for
code generation; see `rd_kafka_get_err_descs()`.
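
The code-generation use case could look roughly like the sketch below. To keep
it compilable without librdkafka, it uses a stand-in `struct err_desc` that is
assumed to mirror the `code`, `name` and `desc` fields of librdkafka's
`struct rd_kafka_err_desc`. The function name `emit_err_constants()` and the
`ERR_` prefix are hypothetical, as is the assumption that unassigned slots in
the returned array carry a NULL name.

```c
#include <stddef.h>
#include <stdio.h>

/* Stand-in assumed to mirror struct rd_kafka_err_desc (code, name, desc). */
struct err_desc {
        int code;
        const char *name;
        const char *desc;
};

/* Hypothetical generator: writes one "ERR_<name> = <code>," line per named
 * entry to `out` and returns the number of lines written.
 * Entries without a name (assumed to be unassigned codes) are skipped. */
static size_t emit_err_constants(FILE *out, const struct err_desc *descs,
                                 size_t cnt) {
        size_t emitted = 0;

        for (size_t i = 0; i < cnt; i++) {
                if (!descs[i].name)
                        continue;
                fprintf(out, "ERR_%s = %d, /* %s */\n", descs[i].name,
                        descs[i].code, descs[i].desc ? descs[i].desc : "");
                emitted++;
        }
        return emitted;
}
```

In a real generator the array and its length would come from
`rd_kafka_get_err_descs(&descs, &cnt)` instead of a hand-written table, and the
output format would follow the target language's conventions.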

## Reporting client software name and version to broker

[KIP-511](https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers) introduces a means for a
Kafka client to report its implementation name and version to the broker.
The broker then exposes this as metrics (e.g., through JMX) to help Kafka
operators troubleshoot problematic clients, understand the impact of broker
and client upgrades, etc.
This requires broker version 2.4.0 or later (metrics added in 2.5.0).

librdkafka will send its name (`librdkafka`) and version (e.g., `v1.3.0`)
upon connect to a supporting broker.
To help distinguish high-level client bindings on top of librdkafka, a client
binding should configure the following two properties:
 * `client.software.name` - set to the binding name, e.g.,
   `confluent-kafka-go` or `node-rdkafka`.
 * `client.software.version` - the version of the binding and the version
   of librdkafka, e.g., `v1.3.0-librdkafka-v1.3.0` or
   `1.2.0-librdkafka-v1.3.0`.
   It is **highly recommended** to include the librdkafka version in this
   version string.

These configuration properties are hidden (from CONFIGURATION.md et al.) as
they should typically not be modified by the user.
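
A binding might compose the `client.software.version` value described above
with a small helper such as the following sketch. The helper name
`binding_software_version()` is hypothetical; only the
`<binding version>-librdkafka-<librdkafka version>` layout is taken from the
examples above.

```c
#include <stddef.h>
#include <stdio.h>

/* Hypothetical helper: builds
 * "<binding version>-librdkafka-<librdkafka version>" in buf.
 * Returns 0 on success, -1 if the result did not fit in buf. */
static int binding_software_version(char *buf, size_t size,
                                    const char *binding_version,
                                    const char *librdkafka_version) {
        int n = snprintf(buf, size, "%s-librdkafka-%s",
                         binding_version, librdkafka_version);

        return (n < 0 || (size_t)n >= size) ? -1 : 0;
}
```

The resulting string would then be set once by the binding, before the client
is created, with
`rd_kafka_conf_set(conf, "client.software.version", buf, errstr, sizeof(errstr))`,
alongside a matching `client.software.name`. The librdkafka version can be
obtained at runtime, for instance from `rd_kafka_version_str()`.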

## Documentation reuse

You are free to reuse the librdkafka API and CONFIGURATION documentation in
your project, but please contribute any documentation improvements back to
librdkafka (file a GitHub pull request).

## Community support

You are welcome to direct your users to
[librdkafka's Gitter chat room](http://gitter.im/edenhill/librdkafka) as long as
you monitor the conversations there to pick up questions specific to your
bindings.
For the most part, though, user questions are generic enough to apply to all
librdkafka bindings.