diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdbuf.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdbuf.c | 1880 |
1 files changed, 1880 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdbuf.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdbuf.c new file mode 100644 index 00000000..1392cf7b --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdbuf.c @@ -0,0 +1,1880 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2017 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + + +#include "rd.h" +#include "rdbuf.h" +#include "rdunittest.h" +#include "rdlog.h" +#include "rdcrc32.h" +#include "crc32c.h" + + +static size_t +rd_buf_get_writable0(rd_buf_t *rbuf, rd_segment_t **segp, void **p); + + +/** + * @brief Destroy the segment and free its payload. + * + * @remark Will NOT unlink from buffer. + */ +static void rd_segment_destroy(rd_segment_t *seg) { + /* Free payload */ + if (seg->seg_free && seg->seg_p) + seg->seg_free(seg->seg_p); + + if (seg->seg_flags & RD_SEGMENT_F_FREE) + rd_free(seg); +} + +/** + * @brief Initialize segment with absolute offset, backing memory pointer, + * and backing memory size. + * @remark The segment is NOT linked. + */ +static void rd_segment_init(rd_segment_t *seg, void *mem, size_t size) { + memset(seg, 0, sizeof(*seg)); + seg->seg_p = mem; + seg->seg_size = size; +} + + +/** + * @brief Append segment to buffer + * + * @remark Will set the buffer position to the new \p seg if no existing wpos. + * @remark Will set the segment seg_absof to the current length of the buffer. + */ +static rd_segment_t *rd_buf_append_segment(rd_buf_t *rbuf, rd_segment_t *seg) { + TAILQ_INSERT_TAIL(&rbuf->rbuf_segments, seg, seg_link); + rbuf->rbuf_segment_cnt++; + seg->seg_absof = rbuf->rbuf_len; + rbuf->rbuf_len += seg->seg_of; + rbuf->rbuf_size += seg->seg_size; + + /* Update writable position */ + if (!rbuf->rbuf_wpos) + rbuf->rbuf_wpos = seg; + else + rd_buf_get_writable0(rbuf, NULL, NULL); + + return seg; +} + + + +/** + * @brief Attempt to allocate \p size bytes from the buffers extra buffers. + * @returns the allocated pointer which MUST NOT be freed, or NULL if + * not enough memory. + * @remark the returned pointer is memory-aligned to be safe. + */ +static void *extra_alloc(rd_buf_t *rbuf, size_t size) { + size_t of = RD_ROUNDUP(rbuf->rbuf_extra_len, 8); /* FIXME: 32-bit */ + void *p; + + if (of + size > rbuf->rbuf_extra_size) + return NULL; + + p = rbuf->rbuf_extra + of; /* Aligned pointer */ + + rbuf->rbuf_extra_len = of + size; + + return p; +} + + + +/** + * @brief Get a pre-allocated segment if available, or allocate a new + * segment with the extra amount of \p size bytes allocated for payload. + * + * Will not append the segment to the buffer. + */ +static rd_segment_t *rd_buf_alloc_segment0(rd_buf_t *rbuf, size_t size) { + rd_segment_t *seg; + + /* See if there is enough room in the extra buffer for + * allocating the segment header and the buffer, + * or just the segment header, else fall back to malloc. */ + if ((seg = extra_alloc(rbuf, sizeof(*seg) + size))) { + rd_segment_init(seg, size > 0 ? seg + 1 : NULL, size); + + } else if ((seg = extra_alloc(rbuf, sizeof(*seg)))) { + rd_segment_init(seg, size > 0 ? rd_malloc(size) : NULL, size); + if (size > 0) + seg->seg_free = rd_free; + + } else if ((seg = rd_malloc(sizeof(*seg) + size))) { + rd_segment_init(seg, size > 0 ? seg + 1 : NULL, size); + seg->seg_flags |= RD_SEGMENT_F_FREE; + + } else + rd_assert(!*"segment allocation failure"); + + return seg; +} + +/** + * @brief Allocate between \p min_size .. \p max_size of backing memory + * and add it as a new segment to the buffer. + * + * The buffer position is updated to point to the new segment. + * + * The segment will be over-allocated if permitted by max_size + * (max_size == 0 or max_size > min_size). + */ +static rd_segment_t * +rd_buf_alloc_segment(rd_buf_t *rbuf, size_t min_size, size_t max_size) { + rd_segment_t *seg; + + /* Over-allocate if allowed. */ + if (min_size != max_size || max_size == 0) + max_size = RD_MAX(sizeof(*seg) * 4, + RD_MAX(min_size * 2, rbuf->rbuf_size / 2)); + + seg = rd_buf_alloc_segment0(rbuf, max_size); + + rd_buf_append_segment(rbuf, seg); + + return seg; +} + + +/** + * @brief Ensures that \p size bytes will be available + * for writing and the position will be updated to point to the + * start of this contiguous block. + */ +void rd_buf_write_ensure_contig(rd_buf_t *rbuf, size_t size) { + rd_segment_t *seg = rbuf->rbuf_wpos; + + if (seg) { + void *p; + size_t remains = rd_segment_write_remains(seg, &p); + + if (remains >= size) + return; /* Existing segment has enough space. */ + + /* Future optimization: + * If existing segment has enough remaining space to warrant + * a split, do it, before allocating a new one. */ + } + + /* Allocate new segment */ + rbuf->rbuf_wpos = rd_buf_alloc_segment(rbuf, size, size); +} + +/** + * @brief Ensures that at least \p size bytes will be available for + * a future write. + * + * Typically used prior to a call to rd_buf_get_write_iov() + */ +void rd_buf_write_ensure(rd_buf_t *rbuf, size_t min_size, size_t max_size) { + size_t remains; + while ((remains = rd_buf_write_remains(rbuf)) < min_size) + rd_buf_alloc_segment(rbuf, min_size - remains, + max_size ? max_size - remains : 0); +} + + +/** + * @returns the segment at absolute offset \p absof, or NULL if out of range. + * + * @remark \p hint is an optional segment where to start looking, such as + * the current write or read position. + */ +rd_segment_t *rd_buf_get_segment_at_offset(const rd_buf_t *rbuf, + const rd_segment_t *hint, + size_t absof) { + const rd_segment_t *seg = hint; + + if (unlikely(absof >= rbuf->rbuf_len)) + return NULL; + + /* Only use current write position if possible and if it helps */ + if (!seg || absof < seg->seg_absof) + seg = TAILQ_FIRST(&rbuf->rbuf_segments); + + do { + if (absof >= seg->seg_absof && + absof < seg->seg_absof + seg->seg_of) { + rd_dassert(seg->seg_absof <= rd_buf_len(rbuf)); + return (rd_segment_t *)seg; + } + } while ((seg = TAILQ_NEXT(seg, seg_link))); + + return NULL; +} + + +/** + * @brief Split segment \p seg at absolute offset \p absof, appending + * a new segment after \p seg with its memory pointing to the + * memory starting at \p absof. + * \p seg 's memory will be shorted to the \p absof. + * + * The new segment is NOT appended to the buffer. + * + * @warning MUST ONLY be used on the LAST segment + * + * @warning if a segment is inserted between these two splitted parts + * it is imperative that the later segment's absof is corrected. + * + * @remark The seg_free callback is retained on the original \p seg + * and is not copied to the new segment, but flags are copied. + */ +static rd_segment_t * +rd_segment_split(rd_buf_t *rbuf, rd_segment_t *seg, size_t absof) { + rd_segment_t *newseg; + size_t relof; + + rd_assert(seg == rbuf->rbuf_wpos); + rd_assert(absof >= seg->seg_absof && + absof <= seg->seg_absof + seg->seg_of); + + relof = absof - seg->seg_absof; + + newseg = rd_buf_alloc_segment0(rbuf, 0); + + /* Add later part of split bytes to new segment */ + newseg->seg_p = seg->seg_p + relof; + newseg->seg_of = seg->seg_of - relof; + newseg->seg_size = seg->seg_size - relof; + newseg->seg_absof = SIZE_MAX; /* Invalid */ + newseg->seg_flags |= seg->seg_flags; + + /* Remove earlier part of split bytes from previous segment */ + seg->seg_of = relof; + seg->seg_size = relof; + + /* newseg's length will be added to rbuf_len in append_segment(), + * so shave it off here from seg's perspective. */ + rbuf->rbuf_len -= newseg->seg_of; + rbuf->rbuf_size -= newseg->seg_size; + + return newseg; +} + + + +/** + * @brief Unlink and destroy a segment, updating the \p rbuf + * with the decrease in length and capacity. + */ +static void rd_buf_destroy_segment(rd_buf_t *rbuf, rd_segment_t *seg) { + rd_assert(rbuf->rbuf_segment_cnt > 0 && rbuf->rbuf_len >= seg->seg_of && + rbuf->rbuf_size >= seg->seg_size); + + TAILQ_REMOVE(&rbuf->rbuf_segments, seg, seg_link); + rbuf->rbuf_segment_cnt--; + rbuf->rbuf_len -= seg->seg_of; + rbuf->rbuf_size -= seg->seg_size; + if (rbuf->rbuf_wpos == seg) + rbuf->rbuf_wpos = NULL; + + rd_segment_destroy(seg); +} + + +/** + * @brief Free memory associated with the \p rbuf, but not the rbuf itself. + * Segments will be destroyed. + */ +void rd_buf_destroy(rd_buf_t *rbuf) { + rd_segment_t *seg, *tmp; + +#if ENABLE_DEVEL + /* FIXME */ + if (rbuf->rbuf_len > 0 && 0) { + size_t overalloc = rbuf->rbuf_size - rbuf->rbuf_len; + float fill_grade = + (float)rbuf->rbuf_len / (float)rbuf->rbuf_size; + + printf("fill grade: %.2f%% (%" PRIusz + " bytes over-allocated)\n", + fill_grade * 100.0f, overalloc); + } +#endif + + + TAILQ_FOREACH_SAFE(seg, &rbuf->rbuf_segments, seg_link, tmp) { + rd_segment_destroy(seg); + } + + if (rbuf->rbuf_extra) + rd_free(rbuf->rbuf_extra); +} + + +/** + * @brief Same as rd_buf_destroy() but also frees the \p rbuf itself. + */ +void rd_buf_destroy_free(rd_buf_t *rbuf) { + rd_buf_destroy(rbuf); + rd_free(rbuf); +} + +/** + * @brief Initialize buffer, pre-allocating \p fixed_seg_cnt segments + * where the first segment will have a \p buf_size of backing memory. + * + * The caller may rearrange the backing memory as it see fits. + */ +void rd_buf_init(rd_buf_t *rbuf, size_t fixed_seg_cnt, size_t buf_size) { + size_t totalloc = 0; + + memset(rbuf, 0, sizeof(*rbuf)); + TAILQ_INIT(&rbuf->rbuf_segments); + + if (!fixed_seg_cnt) { + assert(!buf_size); + return; + } + + /* Pre-allocate memory for a fixed set of segments that are known + * before-hand, to minimize the number of extra allocations + * needed for well-known layouts (such as headers, etc) */ + totalloc += RD_ROUNDUP(sizeof(rd_segment_t), 8) * fixed_seg_cnt; + + /* Pre-allocate extra space for the backing buffer. */ + totalloc += buf_size; + + rbuf->rbuf_extra_size = totalloc; + rbuf->rbuf_extra = rd_malloc(rbuf->rbuf_extra_size); +} + + +/** + * @brief Allocates a buffer object and initializes it. + * @sa rd_buf_init() + */ +rd_buf_t *rd_buf_new(size_t fixed_seg_cnt, size_t buf_size) { + rd_buf_t *rbuf = rd_malloc(sizeof(*rbuf)); + rd_buf_init(rbuf, fixed_seg_cnt, buf_size); + return rbuf; +} + + +/** + * @brief Convenience writer iterator interface. + * + * After writing to \p p the caller must update the written length + * by calling rd_buf_write(rbuf, NULL, written_length) + * + * @returns the number of contiguous writable bytes in segment + * and sets \p *p to point to the start of the memory region. + */ +static size_t +rd_buf_get_writable0(rd_buf_t *rbuf, rd_segment_t **segp, void **p) { + rd_segment_t *seg; + + for (seg = rbuf->rbuf_wpos; seg; seg = TAILQ_NEXT(seg, seg_link)) { + size_t len = rd_segment_write_remains(seg, p); + + /* Even though the write offset hasn't changed we + * avoid future segment scans by adjusting the + * wpos here to the first writable segment. */ + rbuf->rbuf_wpos = seg; + if (segp) + *segp = seg; + + if (unlikely(len == 0)) + continue; + + /* Also adjust absof if the segment was allocated + * before the previous segment's memory was exhausted + * and thus now might have a lower absolute offset + * than the previos segment's now higher relative offset. */ + if (seg->seg_of == 0 && seg->seg_absof < rbuf->rbuf_len) + seg->seg_absof = rbuf->rbuf_len; + + return len; + } + + return 0; +} + +size_t rd_buf_get_writable(rd_buf_t *rbuf, void **p) { + rd_segment_t *seg; + return rd_buf_get_writable0(rbuf, &seg, p); +} + + + +/** + * @brief Write \p payload of \p size bytes to current position + * in buffer. A new segment will be allocated and appended + * if needed. + * + * @returns the write position where payload was written (pre-write). + * Returning the pre-positition allows write_update() to later + * update the same location, effectively making write()s + * also a place-holder mechanism. + * + * @remark If \p payload is NULL only the write position is updated, + * in this mode it is required for the buffer to have enough + * memory for the NULL write (as it would otherwise cause + * uninitialized memory in any new segments allocated from this + * function). + */ +size_t rd_buf_write(rd_buf_t *rbuf, const void *payload, size_t size) { + size_t remains = size; + size_t initial_absof; + const char *psrc = (const char *)payload; + + initial_absof = rbuf->rbuf_len; + + /* Ensure enough space by pre-allocating segments. */ + rd_buf_write_ensure(rbuf, size, 0); + + while (remains > 0) { + void *p = NULL; + rd_segment_t *seg = NULL; + size_t segremains = rd_buf_get_writable0(rbuf, &seg, &p); + size_t wlen = RD_MIN(remains, segremains); + + rd_dassert(seg == rbuf->rbuf_wpos); + rd_dassert(wlen > 0); + rd_dassert(seg->seg_p + seg->seg_of <= (char *)p && + (char *)p < seg->seg_p + seg->seg_size); + + if (payload) { + memcpy(p, psrc, wlen); + psrc += wlen; + } + + seg->seg_of += wlen; + rbuf->rbuf_len += wlen; + remains -= wlen; + } + + rd_assert(remains == 0); + + return initial_absof; +} + + + +/** + * @brief Write \p slice to \p rbuf + * + * @remark The slice position will be updated. + * + * @returns the number of bytes witten (always slice length) + */ +size_t rd_buf_write_slice(rd_buf_t *rbuf, rd_slice_t *slice) { + const void *p; + size_t rlen; + size_t sum = 0; + + while ((rlen = rd_slice_reader(slice, &p))) { + size_t r; + r = rd_buf_write(rbuf, p, rlen); + rd_dassert(r != 0); + sum += r; + } + + return sum; +} + + + +/** + * @brief Write \p payload of \p size at absolute offset \p absof + * WITHOUT updating the total buffer length. + * + * This is used to update a previously written region, such + * as updating the header length. + * + * @returns the number of bytes written, which may be less than \p size + * if the update spans multiple segments. + */ +static size_t rd_segment_write_update(rd_segment_t *seg, + size_t absof, + const void *payload, + size_t size) { + size_t relof; + size_t wlen; + + rd_dassert(absof >= seg->seg_absof); + relof = absof - seg->seg_absof; + rd_assert(relof <= seg->seg_of); + wlen = RD_MIN(size, seg->seg_of - relof); + rd_dassert(relof + wlen <= seg->seg_of); + + memcpy(seg->seg_p + relof, payload, wlen); + + return wlen; +} + + + +/** + * @brief Write \p payload of \p size at absolute offset \p absof + * WITHOUT updating the total buffer length. + * + * This is used to update a previously written region, such + * as updating the header length. + */ +size_t rd_buf_write_update(rd_buf_t *rbuf, + size_t absof, + const void *payload, + size_t size) { + rd_segment_t *seg; + const char *psrc = (const char *)payload; + size_t of; + + /* Find segment for offset */ + seg = rd_buf_get_segment_at_offset(rbuf, rbuf->rbuf_wpos, absof); + rd_assert(seg && *"invalid absolute offset"); + + for (of = 0; of < size; seg = TAILQ_NEXT(seg, seg_link)) { + rd_assert(seg->seg_absof <= rd_buf_len(rbuf)); + size_t wlen = rd_segment_write_update(seg, absof + of, + psrc + of, size - of); + of += wlen; + } + + rd_dassert(of == size); + + return of; +} + + + +/** + * @brief Push reference memory segment to current write position. + */ +void rd_buf_push0(rd_buf_t *rbuf, + const void *payload, + size_t size, + void (*free_cb)(void *), + rd_bool_t writable) { + rd_segment_t *prevseg, *seg, *tailseg = NULL; + + if ((prevseg = rbuf->rbuf_wpos) && + rd_segment_write_remains(prevseg, NULL) > 0) { + /* If the current segment still has room in it split it + * and insert the pushed segment in the middle (below). */ + tailseg = rd_segment_split( + rbuf, prevseg, prevseg->seg_absof + prevseg->seg_of); + } + + seg = rd_buf_alloc_segment0(rbuf, 0); + seg->seg_p = (char *)payload; + seg->seg_size = size; + seg->seg_of = size; + seg->seg_free = free_cb; + if (!writable) + seg->seg_flags |= RD_SEGMENT_F_RDONLY; + + rd_buf_append_segment(rbuf, seg); + + if (tailseg) + rd_buf_append_segment(rbuf, tailseg); +} + + + +/** + * @brief Erase \p size bytes at \p absof from buffer. + * + * @returns the number of bytes erased. + * + * @remark This is costly since it forces a memory move. + */ +size_t rd_buf_erase(rd_buf_t *rbuf, size_t absof, size_t size) { + rd_segment_t *seg, *next = NULL; + size_t of; + + /* Find segment for offset */ + seg = rd_buf_get_segment_at_offset(rbuf, NULL, absof); + + /* Adjust segments until size is exhausted, then continue scanning to + * update the absolute offset. */ + for (of = 0; seg && of < size; seg = next) { + /* Example: + * seg_absof = 10 + * seg_of = 7 + * absof = 12 + * of = 1 + * size = 4 + * + * rof = 3 relative segment offset where to erase + * eraseremains = 3 remaining bytes to erase + * toerase = 3 available bytes to erase in segment + * segremains = 1 remaining bytes in segment after to + * the right of the erased part, i.e., + * the memory that needs to be moved to the + * left. + */ + /** Relative offset in segment for the absolute offset */ + size_t rof = (absof + of) - seg->seg_absof; + /** How much remains to be erased */ + size_t eraseremains = size - of; + /** How much can be erased from this segment */ + size_t toerase = RD_MIN(seg->seg_of - rof, eraseremains); + /** How much remains in the segment after the erased part */ + size_t segremains = seg->seg_of - (rof + toerase); + + next = TAILQ_NEXT(seg, seg_link); + + seg->seg_absof -= of; + + if (unlikely(toerase == 0)) + continue; + + if (unlikely((seg->seg_flags & RD_SEGMENT_F_RDONLY))) + RD_BUG("rd_buf_erase() called on read-only segment"); + + if (likely(segremains > 0)) + memmove(seg->seg_p + rof, seg->seg_p + rof + toerase, + segremains); + + seg->seg_of -= toerase; + rbuf->rbuf_len -= toerase; + + of += toerase; + + /* If segment is now empty, remove it */ + if (seg->seg_of == 0) + rd_buf_destroy_segment(rbuf, seg); + } + + /* Update absolute offset of remaining segments */ + for (seg = next; seg; seg = TAILQ_NEXT(seg, seg_link)) { + rd_assert(seg->seg_absof >= of); + seg->seg_absof -= of; + } + + rbuf->rbuf_erased += of; + + return of; +} + + + +/** + * @brief Do a write-seek, updating the write position to the given + * absolute \p absof. + * + * @warning Any sub-sequent segments will be destroyed. + * + * @returns -1 if the offset is out of bounds, else 0. + */ +int rd_buf_write_seek(rd_buf_t *rbuf, size_t absof) { + rd_segment_t *seg, *next; + size_t relof; + + seg = rd_buf_get_segment_at_offset(rbuf, rbuf->rbuf_wpos, absof); + if (unlikely(!seg)) + return -1; + + relof = absof - seg->seg_absof; + if (unlikely(relof > seg->seg_of)) + return -1; + + /* Destroy sub-sequent segments in reverse order so that + * destroy_segment() length checks are correct. + * Will decrement rbuf_len et.al. */ + for (next = TAILQ_LAST(&rbuf->rbuf_segments, rd_segment_head); + next != seg;) { + rd_segment_t *this = next; + next = TAILQ_PREV(this, rd_segment_head, seg_link); + rd_buf_destroy_segment(rbuf, this); + } + + /* Update relative write offset */ + seg->seg_of = relof; + rbuf->rbuf_wpos = seg; + rbuf->rbuf_len = seg->seg_absof + seg->seg_of; + + rd_assert(rbuf->rbuf_len == absof); + + return 0; +} + + +/** + * @brief Set up the iovecs in \p iovs (of size \p iov_max) with the writable + * segments from the buffer's current write position. + * + * @param iovcntp will be set to the number of populated \p iovs[] + * @param size_max limits the total number of bytes made available. + * Note: this value may be overshot with the size of one + * segment. + * + * @returns the total number of bytes in the represented segments. + * + * @remark the write position will NOT be updated. + */ +size_t rd_buf_get_write_iov(const rd_buf_t *rbuf, + struct iovec *iovs, + size_t *iovcntp, + size_t iov_max, + size_t size_max) { + const rd_segment_t *seg; + size_t iovcnt = 0; + size_t sum = 0; + + for (seg = rbuf->rbuf_wpos; seg && iovcnt < iov_max && sum < size_max; + seg = TAILQ_NEXT(seg, seg_link)) { + size_t len; + void *p; + + len = rd_segment_write_remains(seg, &p); + if (unlikely(len == 0)) + continue; + + iovs[iovcnt].iov_base = p; + iovs[iovcnt++].iov_len = len; + + sum += len; + } + + *iovcntp = iovcnt; + + return sum; +} + + + +/** + * @name Slice reader interface + * + * @{ + */ + +/** + * @brief Initialize a new slice of \p size bytes starting at \p seg with + * relative offset \p rof. + * + * @returns 0 on success or -1 if there is not at least \p size bytes available + * in the buffer. + */ +int rd_slice_init_seg(rd_slice_t *slice, + const rd_buf_t *rbuf, + const rd_segment_t *seg, + size_t rof, + size_t size) { + /* Verify that \p size bytes are indeed available in the buffer. */ + if (unlikely(rbuf->rbuf_len < (seg->seg_absof + rof + size))) + return -1; + + slice->buf = rbuf; + slice->seg = seg; + slice->rof = rof; + slice->start = seg->seg_absof + rof; + slice->end = slice->start + size; + + rd_assert(seg->seg_absof + rof >= slice->start && + seg->seg_absof + rof <= slice->end); + + rd_assert(slice->end <= rd_buf_len(rbuf)); + + return 0; +} + +/** + * @brief Initialize new slice of \p size bytes starting at offset \p absof + * + * @returns 0 on success or -1 if there is not at least \p size bytes available + * in the buffer. + */ +int rd_slice_init(rd_slice_t *slice, + const rd_buf_t *rbuf, + size_t absof, + size_t size) { + const rd_segment_t *seg = + rd_buf_get_segment_at_offset(rbuf, NULL, absof); + if (unlikely(!seg)) + return -1; + + return rd_slice_init_seg(slice, rbuf, seg, absof - seg->seg_absof, + size); +} + +/** + * @brief Initialize new slice covering the full buffer \p rbuf + */ +void rd_slice_init_full(rd_slice_t *slice, const rd_buf_t *rbuf) { + int r = rd_slice_init(slice, rbuf, 0, rd_buf_len(rbuf)); + rd_assert(r == 0); +} + + + +/** + * @sa rd_slice_reader() rd_slice_peeker() + */ +size_t rd_slice_reader0(rd_slice_t *slice, const void **p, int update_pos) { + size_t rof = slice->rof; + size_t rlen; + const rd_segment_t *seg; + + /* Find segment with non-zero payload */ + for (seg = slice->seg; + seg && seg->seg_absof + rof < slice->end && seg->seg_of == rof; + seg = TAILQ_NEXT(seg, seg_link)) + rof = 0; + + if (unlikely(!seg || seg->seg_absof + rof >= slice->end)) + return 0; + + *p = (const void *)(seg->seg_p + rof); + rlen = RD_MIN(seg->seg_of - rof, rd_slice_remains(slice)); + + if (update_pos) { + if (slice->seg != seg) { + rd_assert(seg->seg_absof + rof >= slice->start && + seg->seg_absof + rof + rlen <= slice->end); + slice->seg = seg; + slice->rof = rlen; + } else { + slice->rof += rlen; + } + } + + return rlen; +} + + +/** + * @brief Convenience reader iterator interface. + * + * Call repeatedly from while loop until it returns 0. + * + * @param slice slice to read from, position will be updated. + * @param p will be set to the start of \p *rlenp contiguous bytes of memory + * @param rlenp will be set to the number of bytes available in \p p + * + * @returns the number of bytes read, or 0 if slice is empty. + */ +size_t rd_slice_reader(rd_slice_t *slice, const void **p) { + return rd_slice_reader0(slice, p, 1 /*update_pos*/); +} + +/** + * @brief Identical to rd_slice_reader() but does NOT update the read position + */ +size_t rd_slice_peeker(const rd_slice_t *slice, const void **p) { + return rd_slice_reader0((rd_slice_t *)slice, p, 0 /*dont update_pos*/); +} + + + +/** + * @brief Read \p size bytes from current read position, + * advancing the read offset by the number of bytes copied to \p dst. + * + * If there are less than \p size remaining in the buffer + * then 0 is returned and no bytes are copied. + * + * @returns \p size, or 0 if \p size bytes are not available in buffer. + * + * @remark This performs a complete read, no partitial reads. + * + * @remark If \p dst is NULL only the read position is updated. + */ +size_t rd_slice_read(rd_slice_t *slice, void *dst, size_t size) { + size_t remains = size; + char *d = (char *)dst; /* Possibly NULL */ + size_t rlen; + const void *p; + size_t orig_end = slice->end; + + if (unlikely(rd_slice_remains(slice) < size)) + return 0; + + /* Temporarily shrink slice to offset + \p size */ + slice->end = rd_slice_abs_offset(slice) + size; + + while ((rlen = rd_slice_reader(slice, &p))) { + rd_dassert(remains >= rlen); + if (dst) { + memcpy(d, p, rlen); + d += rlen; + } + remains -= rlen; + } + + rd_dassert(remains == 0); + + /* Restore original size */ + slice->end = orig_end; + + return size; +} + + +/** + * @brief Read \p size bytes from absolute slice offset \p offset + * and store in \p dst, without updating the slice read position. + * + * @returns \p size if the offset and size was within the slice, else 0. + */ +size_t +rd_slice_peek(const rd_slice_t *slice, size_t offset, void *dst, size_t size) { + rd_slice_t sub = *slice; + + if (unlikely(rd_slice_seek(&sub, offset) == -1)) + return 0; + + return rd_slice_read(&sub, dst, size); +} + + +/** + * @brief Read a varint-encoded unsigned integer from \p slice, + * storing the decoded number in \p nump on success (return value > 0). + * + * @returns the number of bytes read on success or 0 in case of + * buffer underflow. + */ +size_t rd_slice_read_uvarint(rd_slice_t *slice, uint64_t *nump) { + uint64_t num = 0; + int shift = 0; + size_t rof = slice->rof; + const rd_segment_t *seg; + + /* Traverse segments, byte for byte, until varint is decoded + * or no more segments available (underflow). */ + for (seg = slice->seg; seg; seg = TAILQ_NEXT(seg, seg_link)) { + for (; rof < seg->seg_of; rof++) { + unsigned char oct; + + if (unlikely(seg->seg_absof + rof >= slice->end)) + return 0; /* Underflow */ + + oct = *(const unsigned char *)(seg->seg_p + rof); + + num |= (uint64_t)(oct & 0x7f) << shift; + shift += 7; + + if (!(oct & 0x80)) { + /* Done: no more bytes expected */ + *nump = num; + + /* Update slice's read pointer and offset */ + if (slice->seg != seg) + slice->seg = seg; + slice->rof = rof + 1; /* including the +1 byte + * that was just read */ + + return shift / 7; + } + } + + rof = 0; + } + + return 0; /* Underflow */ +} + + +/** + * @returns a pointer to \p size contiguous bytes at the current read offset. + * If there isn't \p size contiguous bytes available NULL will + * be returned. + * + * @remark The read position is updated to point past \p size. + */ +const void *rd_slice_ensure_contig(rd_slice_t *slice, size_t size) { + void *p; + + if (unlikely(rd_slice_remains(slice) < size || + slice->rof + size > slice->seg->seg_of)) + return NULL; + + p = slice->seg->seg_p + slice->rof; + + rd_slice_read(slice, NULL, size); + + return p; +} + + + +/** + * @brief Sets the slice's read position. The offset is the slice offset, + * not buffer offset. + * + * @returns 0 if offset was within range, else -1 in which case the position + * is not changed. + */ +int rd_slice_seek(rd_slice_t *slice, size_t offset) { + const rd_segment_t *seg; + size_t absof = slice->start + offset; + + if (unlikely(absof >= slice->end)) + return -1; + + seg = rd_buf_get_segment_at_offset(slice->buf, slice->seg, absof); + rd_assert(seg); + + slice->seg = seg; + slice->rof = absof - seg->seg_absof; + rd_assert(seg->seg_absof + slice->rof >= slice->start && + seg->seg_absof + slice->rof <= slice->end); + + return 0; +} + + +/** + * @brief Narrow the current slice to \p size, saving + * the original slice state info \p save_slice. + * + * Use rd_slice_widen() to restore the saved slice + * with the read count updated from the narrowed slice. + * + * This is useful for reading a sub-slice of a larger slice + * without having to pass the lesser length around. + * + * @returns 1 if enough underlying slice buffer memory is available, else 0. + */ +int rd_slice_narrow(rd_slice_t *slice, rd_slice_t *save_slice, size_t size) { + if (unlikely(slice->start + size > slice->end)) + return 0; + *save_slice = *slice; + slice->end = slice->start + size; + rd_assert(rd_slice_abs_offset(slice) <= slice->end); + return 1; +} + +/** + * @brief Same as rd_slice_narrow() but using a relative size \p relsize + * from the current read position. + */ +int rd_slice_narrow_relative(rd_slice_t *slice, + rd_slice_t *save_slice, + size_t relsize) { + return rd_slice_narrow(slice, save_slice, + rd_slice_offset(slice) + relsize); +} + + +/** + * @brief Restore the original \p save_slice size from a previous call to + * rd_slice_narrow(), while keeping the updated read pointer from + * \p slice. + */ +void rd_slice_widen(rd_slice_t *slice, const rd_slice_t *save_slice) { + slice->end = save_slice->end; +} + + +/** + * @brief Copy the original slice \p orig to \p new_slice and adjust + * the new slice length to \p size. + * + * This is a side-effect free form of rd_slice_narrow() which is not to + * be used with rd_slice_widen(). + * + * @returns 1 if enough underlying slice buffer memory is available, else 0. + */ +int rd_slice_narrow_copy(const rd_slice_t *orig, + rd_slice_t *new_slice, + size_t size) { + if (unlikely(orig->start + size > orig->end)) + return 0; + *new_slice = *orig; + new_slice->end = orig->start + size; + rd_assert(rd_slice_abs_offset(new_slice) <= new_slice->end); + return 1; +} + +/** + * @brief Same as rd_slice_narrow_copy() but with a relative size from + * the current read position. + */ +int rd_slice_narrow_copy_relative(const rd_slice_t *orig, + rd_slice_t *new_slice, + size_t relsize) { + return rd_slice_narrow_copy(orig, new_slice, + rd_slice_offset(orig) + relsize); +} + + + +/** + * @brief Set up the iovec \p iovs (of size \p iov_max) with the readable + * segments from the slice's current read position. + * + * @param iovcntp will be set to the number of populated \p iovs[] + * @param size_max limits the total number of bytes made available. + * Note: this value may be overshot with the size of one + * segment. + * + * @returns the total number of bytes in the represented segments. + * + * @remark will NOT update the read position. + */ +size_t rd_slice_get_iov(const rd_slice_t *slice, + struct iovec *iovs, + size_t *iovcntp, + size_t iov_max, + size_t size_max) { + const void *p; + size_t rlen; + size_t iovcnt = 0; + size_t sum = 0; + rd_slice_t copy = *slice; /* Use a copy of the slice so we dont + * update the position for the caller. */ + + while (sum < size_max && iovcnt < iov_max && + (rlen = rd_slice_reader(©, &p))) { + iovs[iovcnt].iov_base = (void *)p; + iovs[iovcnt++].iov_len = rlen; + + sum += rlen; + } + + *iovcntp = iovcnt; + + return sum; +} + + + +/** + * @brief CRC32 calculation of slice. + * + * @returns the calculated CRC + * + * @remark the slice's position is updated. + */ +uint32_t rd_slice_crc32(rd_slice_t *slice) { + rd_crc32_t crc; + const void *p; + size_t rlen; + + crc = rd_crc32_init(); + + while ((rlen = rd_slice_reader(slice, &p))) + crc = rd_crc32_update(crc, p, rlen); + + return (uint32_t)rd_crc32_finalize(crc); +} + +/** + * @brief Compute CRC-32C of segments starting at at buffer position \p absof, + * also supporting the case where the position/offset is not at the + * start of the first segment. + * + * @remark the slice's position is updated. + */ +uint32_t rd_slice_crc32c(rd_slice_t *slice) { + const void *p; + size_t rlen; + uint32_t crc = 0; + + while ((rlen = rd_slice_reader(slice, &p))) + crc = rd_crc32c(crc, (const char *)p, rlen); + + return crc; +} + + + +/** + * @name Debugging dumpers + * + * + */ + +static void rd_segment_dump(const rd_segment_t *seg, + const char *ind, + size_t relof, + int do_hexdump) { + fprintf(stderr, + "%s((rd_segment_t *)%p): " + "p %p, of %" PRIusz + ", " + "absof %" PRIusz ", size %" PRIusz ", free %p, flags 0x%x\n", + ind, seg, seg->seg_p, seg->seg_of, seg->seg_absof, + seg->seg_size, seg->seg_free, seg->seg_flags); + rd_assert(relof <= seg->seg_of); + if (do_hexdump) + rd_hexdump(stderr, "segment", seg->seg_p + relof, + seg->seg_of - relof); +} + +void rd_buf_dump(const rd_buf_t *rbuf, int do_hexdump) { + const rd_segment_t *seg; + + fprintf(stderr, + "((rd_buf_t *)%p):\n" + " len %" PRIusz " size %" PRIusz ", %" PRIusz "/%" PRIusz + " extra memory used\n", + rbuf, rbuf->rbuf_len, rbuf->rbuf_size, rbuf->rbuf_extra_len, + rbuf->rbuf_extra_size); + + if (rbuf->rbuf_wpos) { + fprintf(stderr, " wpos:\n"); + rd_segment_dump(rbuf->rbuf_wpos, " ", 0, 0); + } + + if (rbuf->rbuf_segment_cnt > 0) { + size_t segcnt = 0; + + fprintf(stderr, " %" PRIusz " linked segments:\n", + rbuf->rbuf_segment_cnt); + TAILQ_FOREACH(seg, &rbuf->rbuf_segments, seg_link) { + rd_segment_dump(seg, " ", 0, do_hexdump); + segcnt++; + rd_assert(segcnt <= rbuf->rbuf_segment_cnt); + } + } +} + +void rd_slice_dump(const rd_slice_t *slice, int do_hexdump) { + const rd_segment_t *seg; + size_t relof; + + fprintf(stderr, + "((rd_slice_t *)%p):\n" + " buf %p (len %" PRIusz "), seg %p (absof %" PRIusz + "), " + "rof %" PRIusz ", start %" PRIusz ", end %" PRIusz + ", size %" PRIusz ", offset %" PRIusz "\n", + slice, slice->buf, rd_buf_len(slice->buf), slice->seg, + slice->seg ? slice->seg->seg_absof : 0, slice->rof, + slice->start, slice->end, rd_slice_size(slice), + rd_slice_offset(slice)); + relof = slice->rof; + + for (seg = slice->seg; seg; seg = TAILQ_NEXT(seg, seg_link)) { + rd_segment_dump(seg, " ", relof, do_hexdump); + relof = 0; + } +} + + +/** + * @name Unit-tests + * + * + * + */ + + +/** + * @brief Basic write+read test + */ +static int do_unittest_write_read(void) { + rd_buf_t b; + char ones[1024]; + char twos[1024]; + char threes[1024]; + char fiftyfives[100]; /* 0x55 indicates "untouched" memory */ + char buf[1024 * 3]; + rd_slice_t slice; + size_t r, pos; + + memset(ones, 0x1, sizeof(ones)); + memset(twos, 0x2, sizeof(twos)); + memset(threes, 0x3, sizeof(threes)); + memset(fiftyfives, 0x55, sizeof(fiftyfives)); + memset(buf, 0x55, sizeof(buf)); + + rd_buf_init(&b, 2, 1000); + + /* + * Verify write + */ + r = rd_buf_write(&b, ones, 200); + RD_UT_ASSERT(r == 0, "write() returned position %" PRIusz, r); + pos = rd_buf_write_pos(&b); + RD_UT_ASSERT(pos == 200, "pos() returned position %" PRIusz, pos); + + r = rd_buf_write(&b, twos, 800); + RD_UT_ASSERT(r == 200, "write() returned position %" PRIusz, r); + pos = rd_buf_write_pos(&b); + RD_UT_ASSERT(pos == 200 + 800, "pos() returned position %" PRIusz, pos); + + /* Buffer grows here */ + r = rd_buf_write(&b, threes, 1); + RD_UT_ASSERT(pos == 200 + 800, "write() returned position %" PRIusz, r); + pos = rd_buf_write_pos(&b); + RD_UT_ASSERT(pos == 200 + 800 + 1, "pos() returned position %" PRIusz, + pos); + + /* + * Verify read + */ + /* Get full slice. */ + rd_slice_init_full(&slice, &b); + + r = rd_slice_read(&slice, buf, 200 + 800 + 2); + RD_UT_ASSERT(r == 0, + "read() > remaining should have failed, gave %" PRIusz, r); + r = rd_slice_read(&slice, buf, 200 + 800 + 1); + RD_UT_ASSERT(r == 200 + 800 + 1, + "read() returned %" PRIusz " (%" PRIusz " remains)", r, + rd_slice_remains(&slice)); + + RD_UT_ASSERT(!memcmp(buf, ones, 200), "verify ones"); + RD_UT_ASSERT(!memcmp(buf + 200, twos, 800), "verify twos"); + RD_UT_ASSERT(!memcmp(buf + 200 + 800, threes, 1), "verify threes"); + RD_UT_ASSERT(!memcmp(buf + 200 + 800 + 1, fiftyfives, 100), + "verify 55s"); + + rd_buf_destroy(&b); + + RD_UT_PASS(); +} + + +/** + * @brief Helper read verifier, not a unit-test itself. + */ +#define do_unittest_read_verify(b, absof, len, verify) \ + do { \ + int __fail = do_unittest_read_verify0(b, absof, len, verify); \ + RD_UT_ASSERT(!__fail, \ + "read_verify(absof=%" PRIusz ",len=%" PRIusz \ + ") " \ + "failed", \ + (size_t)absof, (size_t)len); \ + } while (0) + +static int do_unittest_read_verify0(const rd_buf_t *b, + size_t absof, + size_t len, + const char *verify) { + rd_slice_t slice, sub; + char buf[1024]; + size_t half; + size_t r; + int i; + + rd_assert(sizeof(buf) >= len); + + /* Get reader slice */ + i = rd_slice_init(&slice, b, absof, len); + RD_UT_ASSERT(i == 0, "slice_init() failed: %d", i); + + r = rd_slice_read(&slice, buf, len); + RD_UT_ASSERT(r == len, + "read() returned %" PRIusz " expected %" PRIusz + " (%" PRIusz " remains)", + r, len, rd_slice_remains(&slice)); + + RD_UT_ASSERT(!memcmp(buf, verify, len), "verify"); + + r = rd_slice_offset(&slice); + RD_UT_ASSERT(r == len, "offset() returned %" PRIusz ", not %" PRIusz, r, + len); + + half = len / 2; + i = rd_slice_seek(&slice, half); + RD_UT_ASSERT(i == 0, "seek(%" PRIusz ") returned %d", half, i); + r = rd_slice_offset(&slice); + RD_UT_ASSERT(r == half, "offset() returned %" PRIusz ", not %" PRIusz, + r, half); + + /* Get a sub-slice covering the later half. */ + sub = rd_slice_pos(&slice); + r = rd_slice_offset(&sub); + RD_UT_ASSERT(r == 0, "sub: offset() returned %" PRIusz ", not %" PRIusz, + r, (size_t)0); + r = rd_slice_size(&sub); + RD_UT_ASSERT(r == half, + "sub: size() returned %" PRIusz ", not %" PRIusz, r, half); + r = rd_slice_remains(&sub); + RD_UT_ASSERT(r == half, + "sub: remains() returned %" PRIusz ", not %" PRIusz, r, + half); + + /* Read half */ + r = rd_slice_read(&sub, buf, half); + RD_UT_ASSERT(r == half, + "sub read() returned %" PRIusz " expected %" PRIusz + " (%" PRIusz " remains)", + r, len, rd_slice_remains(&sub)); + + RD_UT_ASSERT(!memcmp(buf, verify, len), "verify"); + + r = rd_slice_offset(&sub); + RD_UT_ASSERT(r == rd_slice_size(&sub), + "sub offset() returned %" PRIusz ", not %" PRIusz, r, + rd_slice_size(&sub)); + r = rd_slice_remains(&sub); + RD_UT_ASSERT(r == 0, + "sub: remains() returned %" PRIusz ", not %" PRIusz, r, + (size_t)0); + + return 0; +} + + +/** + * @brief write_seek() and split() test + */ +static int do_unittest_write_split_seek(void) { + rd_buf_t b; + char ones[1024]; + char twos[1024]; + char threes[1024]; + char fiftyfives[100]; /* 0x55 indicates "untouched" memory */ + char buf[1024 * 3]; + size_t r, pos; + rd_segment_t *seg, *newseg; + + memset(ones, 0x1, sizeof(ones)); + memset(twos, 0x2, sizeof(twos)); + memset(threes, 0x3, sizeof(threes)); + memset(fiftyfives, 0x55, sizeof(fiftyfives)); + memset(buf, 0x55, sizeof(buf)); + + rd_buf_init(&b, 0, 0); + + /* + * Verify write + */ + r = rd_buf_write(&b, ones, 400); + RD_UT_ASSERT(r == 0, "write() returned position %" PRIusz, r); + pos = rd_buf_write_pos(&b); + RD_UT_ASSERT(pos == 400, "pos() returned position %" PRIusz, pos); + + do_unittest_read_verify(&b, 0, 400, ones); + + /* + * Seek and re-write + */ + r = rd_buf_write_seek(&b, 200); + RD_UT_ASSERT(r == 0, "seek() failed"); + pos = rd_buf_write_pos(&b); + RD_UT_ASSERT(pos == 200, "pos() returned position %" PRIusz, pos); + + r = rd_buf_write(&b, twos, 100); + RD_UT_ASSERT(pos == 200, "write() returned position %" PRIusz, r); + pos = rd_buf_write_pos(&b); + RD_UT_ASSERT(pos == 200 + 100, "pos() returned position %" PRIusz, pos); + + do_unittest_read_verify(&b, 0, 200, ones); + do_unittest_read_verify(&b, 200, 100, twos); + + /* Make sure read() did not modify the write position. */ + pos = rd_buf_write_pos(&b); + RD_UT_ASSERT(pos == 200 + 100, "pos() returned position %" PRIusz, pos); + + /* Split buffer, write position is now at split where writes + * are not allowed (mid buffer). */ + seg = rd_buf_get_segment_at_offset(&b, NULL, 50); + RD_UT_ASSERT(seg->seg_of != 0, "assumed mid-segment"); + newseg = rd_segment_split(&b, seg, 50); + rd_buf_append_segment(&b, newseg); + seg = rd_buf_get_segment_at_offset(&b, NULL, 50); + RD_UT_ASSERT(seg != NULL, "seg"); + RD_UT_ASSERT(seg == newseg, "newseg %p, seg %p", newseg, seg); + RD_UT_ASSERT(seg->seg_of > 0, + "assumed beginning of segment, got %" PRIusz, seg->seg_of); + + pos = rd_buf_write_pos(&b); + RD_UT_ASSERT(pos == 200 + 100, "pos() returned position %" PRIusz, pos); + + /* Re-verify that nothing changed */ + do_unittest_read_verify(&b, 0, 200, ones); + do_unittest_read_verify(&b, 200, 100, twos); + + /* Do a write seek at buffer boundary, sub-sequent buffers should + * be destroyed. */ + r = rd_buf_write_seek(&b, 50); + RD_UT_ASSERT(r == 0, "seek() failed"); + do_unittest_read_verify(&b, 0, 50, ones); + + rd_buf_destroy(&b); + + RD_UT_PASS(); +} + +/** + * @brief Unittest to verify payload is correctly written and read. + * Each written u32 word is the running CRC of the word count. + */ +static int do_unittest_write_read_payload_correctness(void) { + uint32_t crc; + uint32_t write_crc, read_crc; + const int seed = 12345; + rd_buf_t b; + const size_t max_cnt = 20000; + rd_slice_t slice; + size_t r; + size_t i; + int pass; + + crc = rd_crc32_init(); + crc = rd_crc32_update(crc, (void *)&seed, sizeof(seed)); + + rd_buf_init(&b, 0, 0); + for (i = 0; i < max_cnt; i++) { + crc = rd_crc32_update(crc, (void *)&i, sizeof(i)); + rd_buf_write(&b, &crc, sizeof(crc)); + } + + write_crc = rd_crc32_finalize(crc); + + r = rd_buf_len(&b); + RD_UT_ASSERT(r == max_cnt * sizeof(crc), + "expected length %" PRIusz ", not %" PRIusz, r, + max_cnt * sizeof(crc)); + + /* + * Now verify the contents with a reader. + */ + rd_slice_init_full(&slice, &b); + + r = rd_slice_remains(&slice); + RD_UT_ASSERT(r == rd_buf_len(&b), + "slice remains %" PRIusz ", should be %" PRIusz, r, + rd_buf_len(&b)); + + for (pass = 0; pass < 2; pass++) { + /* Two passes: + * - pass 1: using peek() + * - pass 2: using read() + */ + const char *pass_str = pass == 0 ? "peek" : "read"; + + crc = rd_crc32_init(); + crc = rd_crc32_update(crc, (void *)&seed, sizeof(seed)); + + for (i = 0; i < max_cnt; i++) { + uint32_t buf_crc; + + crc = rd_crc32_update(crc, (void *)&i, sizeof(i)); + + if (pass == 0) + r = rd_slice_peek(&slice, i * sizeof(buf_crc), + &buf_crc, sizeof(buf_crc)); + else + r = rd_slice_read(&slice, &buf_crc, + sizeof(buf_crc)); + RD_UT_ASSERT(r == sizeof(buf_crc), + "%s() at #%" PRIusz + " failed: " + "r is %" PRIusz " not %" PRIusz, + pass_str, i, r, sizeof(buf_crc)); + RD_UT_ASSERT(buf_crc == crc, + "%s: invalid crc at #%" PRIusz + ": expected %" PRIu32 ", read %" PRIu32, + pass_str, i, crc, buf_crc); + } + + read_crc = rd_crc32_finalize(crc); + + RD_UT_ASSERT(read_crc == write_crc, + "%s: finalized read crc %" PRIu32 + " != write crc %" PRIu32, + pass_str, read_crc, write_crc); + } + + r = rd_slice_remains(&slice); + RD_UT_ASSERT(r == 0, "slice remains %" PRIusz ", should be %" PRIusz, r, + (size_t)0); + + rd_buf_destroy(&b); + + RD_UT_PASS(); +} + +#define do_unittest_iov_verify(...) \ + do { \ + int __fail = do_unittest_iov_verify0(__VA_ARGS__); \ + RD_UT_ASSERT(!__fail, "iov_verify() failed"); \ + } while (0) +static int +do_unittest_iov_verify0(rd_buf_t *b, size_t exp_iovcnt, size_t exp_totsize) { +#define MY_IOV_MAX 16 + struct iovec iov[MY_IOV_MAX]; + size_t iovcnt; + size_t i; + size_t totsize, sum; + + rd_assert(exp_iovcnt <= MY_IOV_MAX); + + totsize = + rd_buf_get_write_iov(b, iov, &iovcnt, MY_IOV_MAX, exp_totsize); + RD_UT_ASSERT(totsize >= exp_totsize, + "iov total size %" PRIusz " expected >= %" PRIusz, totsize, + exp_totsize); + RD_UT_ASSERT(iovcnt >= exp_iovcnt && iovcnt <= MY_IOV_MAX, + "iovcnt %" PRIusz ", expected %" PRIusz + " < x <= MY_IOV_MAX", + iovcnt, exp_iovcnt); + + sum = 0; + for (i = 0; i < iovcnt; i++) { + RD_UT_ASSERT(iov[i].iov_base, + "iov #%" PRIusz " iov_base not set", i); + RD_UT_ASSERT(iov[i].iov_len, + "iov #%" PRIusz " iov_len %" PRIusz + " out of range", + i, iov[i].iov_len); + sum += iov[i].iov_len; + RD_UT_ASSERT(sum <= totsize, + "sum %" PRIusz " > totsize %" PRIusz, sum, + totsize); + } + + RD_UT_ASSERT(sum == totsize, "sum %" PRIusz " != totsize %" PRIusz, sum, + totsize); + + return 0; +} + + +/** + * @brief Verify that buffer to iovec conversion works. + */ +static int do_unittest_write_iov(void) { + rd_buf_t b; + + rd_buf_init(&b, 0, 0); + rd_buf_write_ensure(&b, 100, 100); + + do_unittest_iov_verify(&b, 1, 100); + + /* Add a secondary buffer */ + rd_buf_write_ensure(&b, 30000, 0); + + do_unittest_iov_verify(&b, 2, 100 + 30000); + + + rd_buf_destroy(&b); + + RD_UT_PASS(); +} + +/** + * @brief Verify that erasing parts of the buffer works. + */ +static int do_unittest_erase(void) { + static const struct { + const char *segs[4]; + const char *writes[4]; + struct { + size_t of; + size_t size; + size_t retsize; + } erasures[4]; + + const char *expect; + } in[] = {/* 12|3|45 + * x x xx */ + { + .segs = {"12", "3", "45"}, + .erasures = {{1, 4, 4}}, + .expect = "1", + }, + /* 12|3|45 + * xx */ + { + .segs = {"12", "3", "45"}, + .erasures = {{0, 2, 2}}, + .expect = "345", + }, + /* 12|3|45 + * xx */ + { + .segs = {"12", "3", "45"}, + .erasures = {{3, 2, 2}}, + .expect = "123", + }, + /* 12|3|45 + * x + * 1 |3|45 + * x + * 1 | 45 + * x */ + { + .segs = {"12", "3", "45"}, + .erasures = {{1, 1, 1}, {1, 1, 1}, {2, 1, 1}}, + .expect = "14", + }, + /* 12|3|45 + * xxxxxxx */ + { + .segs = {"12", "3", "45"}, + .erasures = {{0, 5, 5}}, + .expect = "", + }, + /* 12|3|45 + * x */ + { + .segs = {"12", "3", "45"}, + .erasures = {{0, 1, 1}}, + .expect = "2345", + }, + /* 12|3|45 + * x */ + { + .segs = {"12", "3", "45"}, + .erasures = {{4, 1, 1}}, + .expect = "1234", + }, + /* 12|3|45 + * x */ + { + .segs = {"12", "3", "45"}, + .erasures = {{5, 10, 0}}, + .expect = "12345", + }, + /* 12|3|45 + * xxx */ + { + .segs = {"12", "3", "45"}, + .erasures = {{4, 3, 1}, {4, 3, 0}, {4, 3, 0}}, + .expect = "1234", + }, + /* 1 + * xxx */ + { + .segs = {"1"}, + .erasures = {{0, 3, 1}}, + .expect = "", + }, + /* 123456 + * xxxxxx */ + { + .segs = {"123456"}, + .erasures = {{0, 6, 6}}, + .expect = "", + }, + /* 123456789a + * xxx */ + { + .segs = {"123456789a"}, + .erasures = {{4, 3, 3}}, + .expect = "123489a", + }, + /* 1234|5678 + * x xx */ + {.segs = {"1234", "5678"}, + .erasures = {{3, 3, 3}}, + .writes = {"9abc"}, + .expect = "123789abc"}, + + {.expect = NULL}}; + int i; + + for (i = 0; in[i].expect; i++) { + rd_buf_t b; + rd_slice_t s; + size_t expsz = strlen(in[i].expect); + char *out; + int j; + size_t r; + int r2; + + rd_buf_init(&b, 0, 0); + + /* Write segments to buffer */ + for (j = 0; in[i].segs[j]; j++) + rd_buf_push_writable(&b, rd_strdup(in[i].segs[j]), + strlen(in[i].segs[j]), rd_free); + + /* Perform erasures */ + for (j = 0; in[i].erasures[j].retsize; j++) { + r = rd_buf_erase(&b, in[i].erasures[j].of, + in[i].erasures[j].size); + RD_UT_ASSERT(r == in[i].erasures[j].retsize, + "expected retsize %" PRIusz + " for i=%d,j=%d" + ", not %" PRIusz, + in[i].erasures[j].retsize, i, j, r); + } + + /* Perform writes */ + for (j = 0; in[i].writes[j]; j++) + rd_buf_write(&b, in[i].writes[j], + strlen(in[i].writes[j])); + + RD_UT_ASSERT(expsz == rd_buf_len(&b), + "expected buffer to be %" PRIusz + " bytes, not " + "%" PRIusz " for i=%d", + expsz, rd_buf_len(&b), i); + + /* Read back and verify */ + r2 = rd_slice_init(&s, &b, 0, rd_buf_len(&b)); + RD_UT_ASSERT((r2 == -1 && rd_buf_len(&b) == 0) || + (r2 == 0 && rd_buf_len(&b) > 0), + "slice_init(%" PRIusz ") returned %d for i=%d", + rd_buf_len(&b), r2, i); + if (r2 == -1) + continue; /* Empty buffer */ + + RD_UT_ASSERT(expsz == rd_slice_size(&s), + "expected slice to be %" PRIusz + " bytes, not %" PRIusz " for i=%d", + expsz, rd_slice_size(&s), i); + + out = rd_malloc(expsz); + + r = rd_slice_read(&s, out, expsz); + RD_UT_ASSERT(r == expsz, + "expected to read %" PRIusz " bytes, not %" PRIusz + " for i=%d", + expsz, r, i); + + RD_UT_ASSERT(!memcmp(out, in[i].expect, expsz), + "Expected \"%.*s\", not \"%.*s\" for i=%d", + (int)expsz, in[i].expect, (int)r, out, i); + + rd_free(out); + + RD_UT_ASSERT(rd_slice_remains(&s) == 0, + "expected no remaining bytes in slice, but got " + "%" PRIusz " for i=%d", + rd_slice_remains(&s), i); + + rd_buf_destroy(&b); + } + + + RD_UT_PASS(); +} + + +int unittest_rdbuf(void) { + int fails = 0; + + fails += do_unittest_write_read(); + fails += do_unittest_write_split_seek(); + fails += do_unittest_write_read_payload_correctness(); + fails += do_unittest_write_iov(); + fails += do_unittest_erase(); + + return fails; +} |