summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/src/rdbuf.h
blob: 1ef30e4a95e68b0df376883695bebb26cc6a20d2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2017 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _RDBUF_H
#define _RDBUF_H

#ifndef _WIN32
/* for struct iovec */
#include <sys/socket.h>
#include <sys/types.h>
#endif

#include "rdsysqueue.h"


/**
 * @name Generic byte buffers
 *
 * @{
 *
 * A buffer is a list of segments, each segment having a memory pointer,
 * write offset, and capacity.
 *
 * The main buffer and segment structure is tailored for append-writing
 * or append-pushing foreign memory.
 *
 * Updates of previously written memory regions are possible through the
 * use of write_update() that takes an absolute offset.
 *
 * The write position is part of the buffer and segment structures, while
 * read is a separate object (rd_slice_t) that does not affect the buffer.
 */


/**
 * @brief Buffer segment
 */
typedef struct rd_segment_s {
        TAILQ_ENTRY(rd_segment_s) seg_link; /*<< rbuf_segments Link */
        char *seg_p;                        /**< Backing-store memory */
        size_t seg_of;                      /**< Current relative write-position
                                             *   (length of payload in this segment) */
        size_t seg_size;                    /**< Allocated size of seg_p */
        size_t seg_absof;          /**< Absolute offset of this segment's
                                    *   beginning in the grand rd_buf_t */
        void (*seg_free)(void *p); /**< Optional free function for seg_p */
        int seg_flags;             /**< Segment flags */
#define RD_SEGMENT_F_RDONLY 0x1    /**< Read-only segment */
#define RD_SEGMENT_F_FREE                                                      \
        0x2 /**< Free segment on destroy,                                      \
             *   e.g, not a fixed segment. */
} rd_segment_t;



TAILQ_HEAD(rd_segment_head, rd_segment_s);

/**
 * @brief Buffer, containing a list of segments.
 */
typedef struct rd_buf_s {
        struct rd_segment_head rbuf_segments; /**< TAILQ list of segments */
        size_t rbuf_segment_cnt;              /**< Number of segments */

        rd_segment_t *rbuf_wpos; /**< Current write position seg */
        size_t rbuf_len;         /**< Current (written) length */
        size_t rbuf_erased;      /**< Total number of bytes
                                  *   erased from segments.
                                  *   This amount is taken into
                                  *   account when checking for
                                  *   writable space which is
                                  *   always at the end of the
                                  *   buffer and thus can't make
                                  *   use of the erased parts. */
        size_t rbuf_size;        /**< Total allocated size of
                                  *   all segments. */

        char *rbuf_extra;       /* Extra memory allocated for
                                 * use by segment structs,
                                 * buffer memory, etc. */
        size_t rbuf_extra_len;  /* Current extra memory used */
        size_t rbuf_extra_size; /* Total size of extra memory */
} rd_buf_t;



/**
 * @brief A read-only slice of a buffer.
 */
typedef struct rd_slice_s {
        const rd_buf_t *buf;     /**< Pointer to buffer */
        const rd_segment_t *seg; /**< Current read position segment.
                                  *   Will point to NULL when end of
                                  *   slice is reached. */
        size_t rof;              /**< Relative read offset in segment */
        size_t start;            /**< Slice start offset in buffer */
        size_t end;              /**< Slice end offset in buffer+1 */
} rd_slice_t;



/**
 * @returns the current write position (absolute offset)
 */
static RD_INLINE RD_UNUSED size_t rd_buf_write_pos(const rd_buf_t *rbuf) {
        const rd_segment_t *seg = rbuf->rbuf_wpos;

        if (unlikely(!seg)) {
#if ENABLE_DEVEL
                rd_assert(rbuf->rbuf_len == 0);
#endif
                return 0;
        }
#if ENABLE_DEVEL
        rd_assert(seg->seg_absof + seg->seg_of == rbuf->rbuf_len);
#endif
        return seg->seg_absof + seg->seg_of;
}


/**
 * @returns the number of bytes available for writing (before growing).
 */
static RD_INLINE RD_UNUSED size_t rd_buf_write_remains(const rd_buf_t *rbuf) {
        return rbuf->rbuf_size - (rbuf->rbuf_len + rbuf->rbuf_erased);
}



/**
 * @returns the number of bytes remaining to write to the given segment,
 *          and sets the \p *p pointer (unless NULL) to the start of
 *          the contiguous memory.
 */
static RD_INLINE RD_UNUSED size_t
rd_segment_write_remains(const rd_segment_t *seg, void **p) {
        if (unlikely((seg->seg_flags & RD_SEGMENT_F_RDONLY)))
                return 0;
        if (p)
                *p = (void *)(seg->seg_p + seg->seg_of);
        return seg->seg_size - seg->seg_of;
}



/**
 * @returns the last segment for the buffer.
 */
static RD_INLINE RD_UNUSED rd_segment_t *rd_buf_last(const rd_buf_t *rbuf) {
        return TAILQ_LAST(&rbuf->rbuf_segments, rd_segment_head);
}


/**
 * @returns the total written buffer length
 */
static RD_INLINE RD_UNUSED size_t rd_buf_len(const rd_buf_t *rbuf) {
        return rbuf->rbuf_len;
}


int rd_buf_write_seek(rd_buf_t *rbuf, size_t absof);


size_t rd_buf_write(rd_buf_t *rbuf, const void *payload, size_t size);
size_t rd_buf_write_slice(rd_buf_t *rbuf, rd_slice_t *slice);
size_t rd_buf_write_update(rd_buf_t *rbuf,
                           size_t absof,
                           const void *payload,
                           size_t size);
void rd_buf_push0(rd_buf_t *rbuf,
                  const void *payload,
                  size_t size,
                  void (*free_cb)(void *),
                  rd_bool_t writable);
#define rd_buf_push(rbuf, payload, size, free_cb)                              \
        rd_buf_push0(rbuf, payload, size, free_cb, rd_false /*not-writable*/)
#define rd_buf_push_writable(rbuf, payload, size, free_cb)                     \
        rd_buf_push0(rbuf, payload, size, free_cb, rd_true /*writable*/)

size_t rd_buf_erase(rd_buf_t *rbuf, size_t absof, size_t size);

size_t rd_buf_get_writable(rd_buf_t *rbuf, void **p);

void rd_buf_write_ensure_contig(rd_buf_t *rbuf, size_t size);

void rd_buf_write_ensure(rd_buf_t *rbuf, size_t min_size, size_t max_size);

size_t rd_buf_get_write_iov(const rd_buf_t *rbuf,
                            struct iovec *iovs,
                            size_t *iovcntp,
                            size_t iov_max,
                            size_t size_max);

void rd_buf_init(rd_buf_t *rbuf, size_t fixed_seg_cnt, size_t buf_size);
rd_buf_t *rd_buf_new(size_t fixed_seg_cnt, size_t buf_size);

void rd_buf_destroy(rd_buf_t *rbuf);
void rd_buf_destroy_free(rd_buf_t *rbuf);

void rd_buf_dump(const rd_buf_t *rbuf, int do_hexdump);

int unittest_rdbuf(void);


/**@}*/



/**
 * @name Buffer reads operate on slices of an rd_buf_t and does not
 *       modify the underlying rd_buf_t itself.
 *
 * @warning A slice will not be valid/safe after the buffer or
 *          segments have been modified by a buf write operation
 *          (write, update, write_seek, etc).
 * @{
 */


/**
 * @returns the remaining length in the slice
 */
#define rd_slice_remains(slice) ((slice)->end - rd_slice_abs_offset(slice))

/**
 * @returns the total size of the slice, regardless of current position.
 */
#define rd_slice_size(slice) ((slice)->end - (slice)->start)

/**
 * @returns the read position in the slice as a new slice.
 */
static RD_INLINE RD_UNUSED rd_slice_t rd_slice_pos(const rd_slice_t *slice) {
        rd_slice_t newslice = *slice;

        if (!slice->seg)
                return newslice;

        newslice.start = slice->seg->seg_absof + slice->rof;

        return newslice;
}

/**
 * @returns the read position as an absolute buffer byte offset.
 * @remark this is the buffer offset, not the slice's local offset.
 */
static RD_INLINE RD_UNUSED size_t rd_slice_abs_offset(const rd_slice_t *slice) {
        if (unlikely(!slice->seg)) /* reader has reached the end */
                return slice->end;

        return slice->seg->seg_absof + slice->rof;
}

/**
 * @returns the read position as a byte offset.
 * @remark this is the slice-local offset, not the backing buffer's offset.
 */
static RD_INLINE RD_UNUSED size_t rd_slice_offset(const rd_slice_t *slice) {
        if (unlikely(!slice->seg)) /* reader has reached the end */
                return rd_slice_size(slice);

        return (slice->seg->seg_absof + slice->rof) - slice->start;
}



int rd_slice_init_seg(rd_slice_t *slice,
                      const rd_buf_t *rbuf,
                      const rd_segment_t *seg,
                      size_t rof,
                      size_t size);
int rd_slice_init(rd_slice_t *slice,
                  const rd_buf_t *rbuf,
                  size_t absof,
                  size_t size);
void rd_slice_init_full(rd_slice_t *slice, const rd_buf_t *rbuf);

size_t rd_slice_reader(rd_slice_t *slice, const void **p);
size_t rd_slice_peeker(const rd_slice_t *slice, const void **p);

size_t rd_slice_read(rd_slice_t *slice, void *dst, size_t size);
size_t
rd_slice_peek(const rd_slice_t *slice, size_t offset, void *dst, size_t size);

size_t rd_slice_read_uvarint(rd_slice_t *slice, uint64_t *nump);

/**
 * @brief Read a zig-zag varint-encoded signed integer from \p slice,
 *        storing the decoded number in \p nump on success (return value > 0).
 *
 * @returns the number of bytes read on success or 0 in case of
 *          buffer underflow.
 */
static RD_UNUSED RD_INLINE size_t rd_slice_read_varint(rd_slice_t *slice,
                                                       int64_t *nump) {
        size_t r;
        uint64_t unum;

        r = rd_slice_read_uvarint(slice, &unum);
        if (likely(r > 0)) {
                /* Zig-zag decoding */
                *nump = (int64_t)((unum >> 1) ^ -(int64_t)(unum & 1));
        }

        return r;
}



const void *rd_slice_ensure_contig(rd_slice_t *slice, size_t size);

int rd_slice_seek(rd_slice_t *slice, size_t offset);

size_t rd_slice_get_iov(const rd_slice_t *slice,
                        struct iovec *iovs,
                        size_t *iovcntp,
                        size_t iov_max,
                        size_t size_max);


uint32_t rd_slice_crc32(rd_slice_t *slice);
uint32_t rd_slice_crc32c(rd_slice_t *slice);


int rd_slice_narrow(rd_slice_t *slice,
                    rd_slice_t *save_slice,
                    size_t size) RD_WARN_UNUSED_RESULT;
int rd_slice_narrow_relative(rd_slice_t *slice,
                             rd_slice_t *save_slice,
                             size_t relsize) RD_WARN_UNUSED_RESULT;
void rd_slice_widen(rd_slice_t *slice, const rd_slice_t *save_slice);
int rd_slice_narrow_copy(const rd_slice_t *orig,
                         rd_slice_t *new_slice,
                         size_t size) RD_WARN_UNUSED_RESULT;
int rd_slice_narrow_copy_relative(const rd_slice_t *orig,
                                  rd_slice_t *new_slice,
                                  size_t relsize) RD_WARN_UNUSED_RESULT;

void rd_slice_dump(const rd_slice_t *slice, int do_hexdump);


/**@}*/



#endif /* _RDBUF_H */