/* * librdkafka - Apache Kafka C library * * Copyright (c) 2017 Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include "rd.h" #include "rdbuf.h" #include "rdunittest.h" #include "rdlog.h" #include "rdcrc32.h" #include "crc32c.h" static size_t rd_buf_get_writable0(rd_buf_t *rbuf, rd_segment_t **segp, void **p); /** * @brief Destroy the segment and free its payload. * * @remark Will NOT unlink from buffer. */ static void rd_segment_destroy(rd_segment_t *seg) { /* Free payload */ if (seg->seg_free && seg->seg_p) seg->seg_free(seg->seg_p); if (seg->seg_flags & RD_SEGMENT_F_FREE) rd_free(seg); } /** * @brief Initialize segment with absolute offset, backing memory pointer, * and backing memory size. * @remark The segment is NOT linked. */ static void rd_segment_init(rd_segment_t *seg, void *mem, size_t size) { memset(seg, 0, sizeof(*seg)); seg->seg_p = mem; seg->seg_size = size; } /** * @brief Append segment to buffer * * @remark Will set the buffer position to the new \p seg if no existing wpos. * @remark Will set the segment seg_absof to the current length of the buffer. */ static rd_segment_t *rd_buf_append_segment(rd_buf_t *rbuf, rd_segment_t *seg) { TAILQ_INSERT_TAIL(&rbuf->rbuf_segments, seg, seg_link); rbuf->rbuf_segment_cnt++; seg->seg_absof = rbuf->rbuf_len; rbuf->rbuf_len += seg->seg_of; rbuf->rbuf_size += seg->seg_size; /* Update writable position */ if (!rbuf->rbuf_wpos) rbuf->rbuf_wpos = seg; else rd_buf_get_writable0(rbuf, NULL, NULL); return seg; } /** * @brief Attempt to allocate \p size bytes from the buffers extra buffers. * @returns the allocated pointer which MUST NOT be freed, or NULL if * not enough memory. * @remark the returned pointer is memory-aligned to be safe. */ static void *extra_alloc(rd_buf_t *rbuf, size_t size) { size_t of = RD_ROUNDUP(rbuf->rbuf_extra_len, 8); /* FIXME: 32-bit */ void *p; if (of + size > rbuf->rbuf_extra_size) return NULL; p = rbuf->rbuf_extra + of; /* Aligned pointer */ rbuf->rbuf_extra_len = of + size; return p; } /** * @brief Get a pre-allocated segment if available, or allocate a new * segment with the extra amount of \p size bytes allocated for payload. * * Will not append the segment to the buffer. */ static rd_segment_t *rd_buf_alloc_segment0(rd_buf_t *rbuf, size_t size) { rd_segment_t *seg; /* See if there is enough room in the extra buffer for * allocating the segment header and the buffer, * or just the segment header, else fall back to malloc. */ if ((seg = extra_alloc(rbuf, sizeof(*seg) + size))) { rd_segment_init(seg, size > 0 ? seg + 1 : NULL, size); } else if ((seg = extra_alloc(rbuf, sizeof(*seg)))) { rd_segment_init(seg, size > 0 ? rd_malloc(size) : NULL, size); if (size > 0) seg->seg_free = rd_free; } else if ((seg = rd_malloc(sizeof(*seg) + size))) { rd_segment_init(seg, size > 0 ? seg + 1 : NULL, size); seg->seg_flags |= RD_SEGMENT_F_FREE; } else rd_assert(!*"segment allocation failure"); return seg; } /** * @brief Allocate between \p min_size .. \p max_size of backing memory * and add it as a new segment to the buffer. * * The buffer position is updated to point to the new segment. * * The segment will be over-allocated if permitted by max_size * (max_size == 0 or max_size > min_size). */ static rd_segment_t * rd_buf_alloc_segment(rd_buf_t *rbuf, size_t min_size, size_t max_size) { rd_segment_t *seg; /* Over-allocate if allowed. */ if (min_size != max_size || max_size == 0) max_size = RD_MAX(sizeof(*seg) * 4, RD_MAX(min_size * 2, rbuf->rbuf_size / 2)); seg = rd_buf_alloc_segment0(rbuf, max_size); rd_buf_append_segment(rbuf, seg); return seg; } /** * @brief Ensures that \p size bytes will be available * for writing and the position will be updated to point to the * start of this contiguous block. */ void rd_buf_write_ensure_contig(rd_buf_t *rbuf, size_t size) { rd_segment_t *seg = rbuf->rbuf_wpos; if (seg) { void *p; size_t remains = rd_segment_write_remains(seg, &p); if (remains >= size) return; /* Existing segment has enough space. */ /* Future optimization: * If existing segment has enough remaining space to warrant * a split, do it, before allocating a new one. */ } /* Allocate new segment */ rbuf->rbuf_wpos = rd_buf_alloc_segment(rbuf, size, size); } /** * @brief Ensures that at least \p size bytes will be available for * a future write. * * Typically used prior to a call to rd_buf_get_write_iov() */ void rd_buf_write_ensure(rd_buf_t *rbuf, size_t min_size, size_t max_size) { size_t remains; while ((remains = rd_buf_write_remains(rbuf)) < min_size) rd_buf_alloc_segment(rbuf, min_size - remains, max_size ? max_size - remains : 0); } /** * @returns the segment at absolute offset \p absof, or NULL if out of range. * * @remark \p hint is an optional segment where to start looking, such as * the current write or read position. */ rd_segment_t *rd_buf_get_segment_at_offset(const rd_buf_t *rbuf, const rd_segment_t *hint, size_t absof) { const rd_segment_t *seg = hint; if (unlikely(absof >= rbuf->rbuf_len)) return NULL; /* Only use current write position if possible and if it helps */ if (!seg || absof < seg->seg_absof) seg = TAILQ_FIRST(&rbuf->rbuf_segments); do { if (absof >= seg->seg_absof && absof < seg->seg_absof + seg->seg_of) { rd_dassert(seg->seg_absof <= rd_buf_len(rbuf)); return (rd_segment_t *)seg; } } while ((seg = TAILQ_NEXT(seg, seg_link))); return NULL; } /** * @brief Split segment \p seg at absolute offset \p absof, appending * a new segment after \p seg with its memory pointing to the * memory starting at \p absof. * \p seg 's memory will be shorted to the \p absof. * * The new segment is NOT appended to the buffer. * * @warning MUST ONLY be used on the LAST segment * * @warning if a segment is inserted between these two splitted parts * it is imperative that the later segment's absof is corrected. * * @remark The seg_free callback is retained on the original \p seg * and is not copied to the new segment, but flags are copied. */ static rd_segment_t * rd_segment_split(rd_buf_t *rbuf, rd_segment_t *seg, size_t absof) { rd_segment_t *newseg; size_t relof; rd_assert(seg == rbuf->rbuf_wpos); rd_assert(absof >= seg->seg_absof && absof <= seg->seg_absof + seg->seg_of); relof = absof - seg->seg_absof; newseg = rd_buf_alloc_segment0(rbuf, 0); /* Add later part of split bytes to new segment */ newseg->seg_p = seg->seg_p + relof; newseg->seg_of = seg->seg_of - relof; newseg->seg_size = seg->seg_size - relof; newseg->seg_absof = SIZE_MAX; /* Invalid */ newseg->seg_flags |= seg->seg_flags; /* Remove earlier part of split bytes from previous segment */ seg->seg_of = relof; seg->seg_size = relof; /* newseg's length will be added to rbuf_len in append_segment(), * so shave it off here from seg's perspective. */ rbuf->rbuf_len -= newseg->seg_of; rbuf->rbuf_size -= newseg->seg_size; return newseg; } /** * @brief Unlink and destroy a segment, updating the \p rbuf * with the decrease in length and capacity. */ static void rd_buf_destroy_segment(rd_buf_t *rbuf, rd_segment_t *seg) { rd_assert(rbuf->rbuf_segment_cnt > 0 && rbuf->rbuf_len >= seg->seg_of && rbuf->rbuf_size >= seg->seg_size); TAILQ_REMOVE(&rbuf->rbuf_segments, seg, seg_link); rbuf->rbuf_segment_cnt--; rbuf->rbuf_len -= seg->seg_of; rbuf->rbuf_size -= seg->seg_size; if (rbuf->rbuf_wpos == seg) rbuf->rbuf_wpos = NULL; rd_segment_destroy(seg); } /** * @brief Free memory associated with the \p rbuf, but not the rbuf itself. * Segments will be destroyed. */ void rd_buf_destroy(rd_buf_t *rbuf) { rd_segment_t *seg, *tmp; #if ENABLE_DEVEL /* FIXME */ if (rbuf->rbuf_len > 0 && 0) { size_t overalloc = rbuf->rbuf_size - rbuf->rbuf_len; float fill_grade = (float)rbuf->rbuf_len / (float)rbuf->rbuf_size; printf("fill grade: %.2f%% (%" PRIusz " bytes over-allocated)\n", fill_grade * 100.0f, overalloc); } #endif TAILQ_FOREACH_SAFE(seg, &rbuf->rbuf_segments, seg_link, tmp) { rd_segment_destroy(seg); } if (rbuf->rbuf_extra) rd_free(rbuf->rbuf_extra); } /** * @brief Same as rd_buf_destroy() but also frees the \p rbuf itself. */ void rd_buf_destroy_free(rd_buf_t *rbuf) { rd_buf_destroy(rbuf); rd_free(rbuf); } /** * @brief Initialize buffer, pre-allocating \p fixed_seg_cnt segments * where the first segment will have a \p buf_size of backing memory. * * The caller may rearrange the backing memory as it see fits. */ void rd_buf_init(rd_buf_t *rbuf, size_t fixed_seg_cnt, size_t buf_size) { size_t totalloc = 0; memset(rbuf, 0, sizeof(*rbuf)); TAILQ_INIT(&rbuf->rbuf_segments); if (!fixed_seg_cnt) { assert(!buf_size); return; } /* Pre-allocate memory for a fixed set of segments that are known * before-hand, to minimize the number of extra allocations * needed for well-known layouts (such as headers, etc) */ totalloc += RD_ROUNDUP(sizeof(rd_segment_t), 8) * fixed_seg_cnt; /* Pre-allocate extra space for the backing buffer. */ totalloc += buf_size; rbuf->rbuf_extra_size = totalloc; rbuf->rbuf_extra = rd_malloc(rbuf->rbuf_extra_size); } /** * @brief Allocates a buffer object and initializes it. * @sa rd_buf_init() */ rd_buf_t *rd_buf_new(size_t fixed_seg_cnt, size_t buf_size) { rd_buf_t *rbuf = rd_malloc(sizeof(*rbuf)); rd_buf_init(rbuf, fixed_seg_cnt, buf_size); return rbuf; } /** * @brief Convenience writer iterator interface. * * After writing to \p p the caller must update the written length * by calling rd_buf_write(rbuf, NULL, written_length) * * @returns the number of contiguous writable bytes in segment * and sets \p *p to point to the start of the memory region. */ static size_t rd_buf_get_writable0(rd_buf_t *rbuf, rd_segment_t **segp, void **p) { rd_segment_t *seg; for (seg = rbuf->rbuf_wpos; seg; seg = TAILQ_NEXT(seg, seg_link)) { size_t len = rd_segment_write_remains(seg, p); /* Even though the write offset hasn't changed we * avoid future segment scans by adjusting the * wpos here to the first writable segment. */ rbuf->rbuf_wpos = seg; if (segp) *segp = seg; if (unlikely(len == 0)) continue; /* Also adjust absof if the segment was allocated * before the previous segment's memory was exhausted * and thus now might have a lower absolute offset * than the previos segment's now higher relative offset. */ if (seg->seg_of == 0 && seg->seg_absof < rbuf->rbuf_len) seg->seg_absof = rbuf->rbuf_len; return len; } return 0; } size_t rd_buf_get_writable(rd_buf_t *rbuf, void **p) { rd_segment_t *seg; return rd_buf_get_writable0(rbuf, &seg, p); } /** * @brief Write \p payload of \p size bytes to current position * in buffer. A new segment will be allocated and appended * if needed. * * @returns the write position where payload was written (pre-write). * Returning the pre-positition allows write_update() to later * update the same location, effectively making write()s * also a place-holder mechanism. * * @remark If \p payload is NULL only the write position is updated, * in this mode it is required for the buffer to have enough * memory for the NULL write (as it would otherwise cause * uninitialized memory in any new segments allocated from this * function). */ size_t rd_buf_write(rd_buf_t *rbuf, const void *payload, size_t size) { size_t remains = size; size_t initial_absof; const char *psrc = (const char *)payload; initial_absof = rbuf->rbuf_len; /* Ensure enough space by pre-allocating segments. */ rd_buf_write_ensure(rbuf, size, 0); while (remains > 0) { void *p = NULL; rd_segment_t *seg = NULL; size_t segremains = rd_buf_get_writable0(rbuf, &seg, &p); size_t wlen = RD_MIN(remains, segremains); rd_dassert(seg == rbuf->rbuf_wpos); rd_dassert(wlen > 0); rd_dassert(seg->seg_p + seg->seg_of <= (char *)p && (char *)p < seg->seg_p + seg->seg_size); if (payload) { memcpy(p, psrc, wlen); psrc += wlen; } seg->seg_of += wlen; rbuf->rbuf_len += wlen; remains -= wlen; } rd_assert(remains == 0); return initial_absof; } /** * @brief Write \p slice to \p rbuf * * @remark The slice position will be updated. * * @returns the number of bytes witten (always slice length) */ size_t rd_buf_write_slice(rd_buf_t *rbuf, rd_slice_t *slice) { const void *p; size_t rlen; size_t sum = 0; while ((rlen = rd_slice_reader(slice, &p))) { size_t r; r = rd_buf_write(rbuf, p, rlen); rd_dassert(r != 0); sum += r; } return sum; } /** * @brief Write \p payload of \p size at absolute offset \p absof * WITHOUT updating the total buffer length. * * This is used to update a previously written region, such * as updating the header length. * * @returns the number of bytes written, which may be less than \p size * if the update spans multiple segments. */ static size_t rd_segment_write_update(rd_segment_t *seg, size_t absof, const void *payload, size_t size) { size_t relof; size_t wlen; rd_dassert(absof >= seg->seg_absof); relof = absof - seg->seg_absof; rd_assert(relof <= seg->seg_of); wlen = RD_MIN(size, seg->seg_of - relof); rd_dassert(relof + wlen <= seg->seg_of); memcpy(seg->seg_p + relof, payload, wlen); return wlen; } /** * @brief Write \p payload of \p size at absolute offset \p absof * WITHOUT updating the total buffer length. * * This is used to update a previously written region, such * as updating the header length. */ size_t rd_buf_write_update(rd_buf_t *rbuf, size_t absof, const void *payload, size_t size) { rd_segment_t *seg; const char *psrc = (const char *)payload; size_t of; /* Find segment for offset */ seg = rd_buf_get_segment_at_offset(rbuf, rbuf->rbuf_wpos, absof); rd_assert(seg && *"invalid absolute offset"); for (of = 0; of < size; seg = TAILQ_NEXT(seg, seg_link)) { rd_assert(seg->seg_absof <= rd_buf_len(rbuf)); size_t wlen = rd_segment_write_update(seg, absof + of, psrc + of, size - of); of += wlen; } rd_dassert(of == size); return of; } /** * @brief Push reference memory segment to current write position. */ void rd_buf_push0(rd_buf_t *rbuf, const void *payload, size_t size, void (*free_cb)(void *), rd_bool_t writable) { rd_segment_t *prevseg, *seg, *tailseg = NULL; if ((prevseg = rbuf->rbuf_wpos) && rd_segment_write_remains(prevseg, NULL) > 0) { /* If the current segment still has room in it split it * and insert the pushed segment in the middle (below). */ tailseg = rd_segment_split( rbuf, prevseg, prevseg->seg_absof + prevseg->seg_of); } seg = rd_buf_alloc_segment0(rbuf, 0); seg->seg_p = (char *)payload; seg->seg_size = size; seg->seg_of = size; seg->seg_free = free_cb; if (!writable) seg->seg_flags |= RD_SEGMENT_F_RDONLY; rd_buf_append_segment(rbuf, seg); if (tailseg) rd_buf_append_segment(rbuf, tailseg); } /** * @brief Erase \p size bytes at \p absof from buffer. * * @returns the number of bytes erased. * * @remark This is costly since it forces a memory move. */ size_t rd_buf_erase(rd_buf_t *rbuf, size_t absof, size_t size) { rd_segment_t *seg, *next = NULL; size_t of; /* Find segment for offset */ seg = rd_buf_get_segment_at_offset(rbuf, NULL, absof); /* Adjust segments until size is exhausted, then continue scanning to * update the absolute offset. */ for (of = 0; seg && of < size; seg = next) { /* Example: * seg_absof = 10 * seg_of = 7 * absof = 12 * of = 1 * size = 4 * * rof = 3 relative segment offset where to erase * eraseremains = 3 remaining bytes to erase * toerase = 3 available bytes to erase in segment * segremains = 1 remaining bytes in segment after to * the right of the erased part, i.e., * the memory that needs to be moved to the * left. */ /** Relative offset in segment for the absolute offset */ size_t rof = (absof + of) - seg->seg_absof; /** How much remains to be erased */ size_t eraseremains = size - of; /** How much can be erased from this segment */ size_t toerase = RD_MIN(seg->seg_of - rof, eraseremains); /** How much remains in the segment after the erased part */ size_t segremains = seg->seg_of - (rof + toerase); next = TAILQ_NEXT(seg, seg_link); seg->seg_absof -= of; if (unlikely(toerase == 0)) continue; if (unlikely((seg->seg_flags & RD_SEGMENT_F_RDONLY))) RD_BUG("rd_buf_erase() called on read-only segment"); if (likely(segremains > 0)) memmove(seg->seg_p + rof, seg->seg_p + rof + toerase, segremains); seg->seg_of -= toerase; rbuf->rbuf_len -= toerase; of += toerase; /* If segment is now empty, remove it */ if (seg->seg_of == 0) rd_buf_destroy_segment(rbuf, seg); } /* Update absolute offset of remaining segments */ for (seg = next; seg; seg = TAILQ_NEXT(seg, seg_link)) { rd_assert(seg->seg_absof >= of); seg->seg_absof -= of; } rbuf->rbuf_erased += of; return of; } /** * @brief Do a write-seek, updating the write position to the given * absolute \p absof. * * @warning Any sub-sequent segments will be destroyed. * * @returns -1 if the offset is out of bounds, else 0. */ int rd_buf_write_seek(rd_buf_t *rbuf, size_t absof) { rd_segment_t *seg, *next; size_t relof; seg = rd_buf_get_segment_at_offset(rbuf, rbuf->rbuf_wpos, absof); if (unlikely(!seg)) return -1; relof = absof - seg->seg_absof; if (unlikely(relof > seg->seg_of)) return -1; /* Destroy sub-sequent segments in reverse order so that * destroy_segment() length checks are correct. * Will decrement rbuf_len et.al. */ for (next = TAILQ_LAST(&rbuf->rbuf_segments, rd_segment_head); next != seg;) { rd_segment_t *this = next; next = TAILQ_PREV(this, rd_segment_head, seg_link); rd_buf_destroy_segment(rbuf, this); } /* Update relative write offset */ seg->seg_of = relof; rbuf->rbuf_wpos = seg; rbuf->rbuf_len = seg->seg_absof + seg->seg_of; rd_assert(rbuf->rbuf_len == absof); return 0; } /** * @brief Set up the iovecs in \p iovs (of size \p iov_max) with the writable * segments from the buffer's current write position. * * @param iovcntp will be set to the number of populated \p iovs[] * @param size_max limits the total number of bytes made available. * Note: this value may be overshot with the size of one * segment. * * @returns the total number of bytes in the represented segments. * * @remark the write position will NOT be updated. */ size_t rd_buf_get_write_iov(const rd_buf_t *rbuf, struct iovec *iovs, size_t *iovcntp, size_t iov_max, size_t size_max) { const rd_segment_t *seg; size_t iovcnt = 0; size_t sum = 0; for (seg = rbuf->rbuf_wpos; seg && iovcnt < iov_max && sum < size_max; seg = TAILQ_NEXT(seg, seg_link)) { size_t len; void *p; len = rd_segment_write_remains(seg, &p); if (unlikely(len == 0)) continue; iovs[iovcnt].iov_base = p; iovs[iovcnt++].iov_len = len; sum += len; } *iovcntp = iovcnt; return sum; } /** * @name Slice reader interface * * @{ */ /** * @brief Initialize a new slice of \p size bytes starting at \p seg with * relative offset \p rof. * * @returns 0 on success or -1 if there is not at least \p size bytes available * in the buffer. */ int rd_slice_init_seg(rd_slice_t *slice, const rd_buf_t *rbuf, const rd_segment_t *seg, size_t rof, size_t size) { /* Verify that \p size bytes are indeed available in the buffer. */ if (unlikely(rbuf->rbuf_len < (seg->seg_absof + rof + size))) return -1; slice->buf = rbuf; slice->seg = seg; slice->rof = rof; slice->start = seg->seg_absof + rof; slice->end = slice->start + size; rd_assert(seg->seg_absof + rof >= slice->start && seg->seg_absof + rof <= slice->end); rd_assert(slice->end <= rd_buf_len(rbuf)); return 0; } /** * @brief Initialize new slice of \p size bytes starting at offset \p absof * * @returns 0 on success or -1 if there is not at least \p size bytes available * in the buffer. */ int rd_slice_init(rd_slice_t *slice, const rd_buf_t *rbuf, size_t absof, size_t size) { const rd_segment_t *seg = rd_buf_get_segment_at_offset(rbuf, NULL, absof); if (unlikely(!seg)) return -1; return rd_slice_init_seg(slice, rbuf, seg, absof - seg->seg_absof, size); } /** * @brief Initialize new slice covering the full buffer \p rbuf */ void rd_slice_init_full(rd_slice_t *slice, const rd_buf_t *rbuf) { int r = rd_slice_init(slice, rbuf, 0, rd_buf_len(rbuf)); rd_assert(r == 0); } /** * @sa rd_slice_reader() rd_slice_peeker() */ size_t rd_slice_reader0(rd_slice_t *slice, const void **p, int update_pos) { size_t rof = slice->rof; size_t rlen; const rd_segment_t *seg; /* Find segment with non-zero payload */ for (seg = slice->seg; seg && seg->seg_absof + rof < slice->end && seg->seg_of == rof; seg = TAILQ_NEXT(seg, seg_link)) rof = 0; if (unlikely(!seg || seg->seg_absof + rof >= slice->end)) return 0; *p = (const void *)(seg->seg_p + rof); rlen = RD_MIN(seg->seg_of - rof, rd_slice_remains(slice)); if (update_pos) { if (slice->seg != seg) { rd_assert(seg->seg_absof + rof >= slice->start && seg->seg_absof + rof + rlen <= slice->end); slice->seg = seg; slice->rof = rlen; } else { slice->rof += rlen; } } return rlen; } /** * @brief Convenience reader iterator interface. * * Call repeatedly from while loop until it returns 0. * * @param slice slice to read from, position will be updated. * @param p will be set to the start of \p *rlenp contiguous bytes of memory * @param rlenp will be set to the number of bytes available in \p p * * @returns the number of bytes read, or 0 if slice is empty. */ size_t rd_slice_reader(rd_slice_t *slice, const void **p) { return rd_slice_reader0(slice, p, 1 /*update_pos*/); } /** * @brief Identical to rd_slice_reader() but does NOT update the read position */ size_t rd_slice_peeker(const rd_slice_t *slice, const void **p) { return rd_slice_reader0((rd_slice_t *)slice, p, 0 /*dont update_pos*/); } /** * @brief Read \p size bytes from current read position, * advancing the read offset by the number of bytes copied to \p dst. * * If there are less than \p size remaining in the buffer * then 0 is returned and no bytes are copied. * * @returns \p size, or 0 if \p size bytes are not available in buffer. * * @remark This performs a complete read, no partitial reads. * * @remark If \p dst is NULL only the read position is updated. */ size_t rd_slice_read(rd_slice_t *slice, void *dst, size_t size) { size_t remains = size; char *d = (char *)dst; /* Possibly NULL */ size_t rlen; const void *p; size_t orig_end = slice->end; if (unlikely(rd_slice_remains(slice) < size)) return 0; /* Temporarily shrink slice to offset + \p size */ slice->end = rd_slice_abs_offset(slice) + size; while ((rlen = rd_slice_reader(slice, &p))) { rd_dassert(remains >= rlen); if (dst) { memcpy(d, p, rlen); d += rlen; } remains -= rlen; } rd_dassert(remains == 0); /* Restore original size */ slice->end = orig_end; return size; } /** * @brief Read \p size bytes from absolute slice offset \p offset * and store in \p dst, without updating the slice read position. * * @returns \p size if the offset and size was within the slice, else 0. */ size_t rd_slice_peek(const rd_slice_t *slice, size_t offset, void *dst, size_t size) { rd_slice_t sub = *slice; if (unlikely(rd_slice_seek(&sub, offset) == -1)) return 0; return rd_slice_read(&sub, dst, size); } /** * @brief Read a varint-encoded unsigned integer from \p slice, * storing the decoded number in \p nump on success (return value > 0). * * @returns the number of bytes read on success or 0 in case of * buffer underflow. */ size_t rd_slice_read_uvarint(rd_slice_t *slice, uint64_t *nump) { uint64_t num = 0; int shift = 0; size_t rof = slice->rof; const rd_segment_t *seg; /* Traverse segments, byte for byte, until varint is decoded * or no more segments available (underflow). */ for (seg = slice->seg; seg; seg = TAILQ_NEXT(seg, seg_link)) { for (; rof < seg->seg_of; rof++) { unsigned char oct; if (unlikely(seg->seg_absof + rof >= slice->end)) return 0; /* Underflow */ oct = *(const unsigned char *)(seg->seg_p + rof); num |= (uint64_t)(oct & 0x7f) << shift; shift += 7; if (!(oct & 0x80)) { /* Done: no more bytes expected */ *nump = num; /* Update slice's read pointer and offset */ if (slice->seg != seg) slice->seg = seg; slice->rof = rof + 1; /* including the +1 byte * that was just read */ return shift / 7; } } rof = 0; } return 0; /* Underflow */ } /** * @returns a pointer to \p size contiguous bytes at the current read offset. * If there isn't \p size contiguous bytes available NULL will * be returned. * * @remark The read position is updated to point past \p size. */ const void *rd_slice_ensure_contig(rd_slice_t *slice, size_t size) { void *p; if (unlikely(rd_slice_remains(slice) < size || slice->rof + size > slice->seg->seg_of)) return NULL; p = slice->seg->seg_p + slice->rof; rd_slice_read(slice, NULL, size); return p; } /** * @brief Sets the slice's read position. The offset is the slice offset, * not buffer offset. * * @returns 0 if offset was within range, else -1 in which case the position * is not changed. */ int rd_slice_seek(rd_slice_t *slice, size_t offset) { const rd_segment_t *seg; size_t absof = slice->start + offset; if (unlikely(absof >= slice->end)) return -1; seg = rd_buf_get_segment_at_offset(slice->buf, slice->seg, absof); rd_assert(seg); slice->seg = seg; slice->rof = absof - seg->seg_absof; rd_assert(seg->seg_absof + slice->rof >= slice->start && seg->seg_absof + slice->rof <= slice->end); return 0; } /** * @brief Narrow the current slice to \p size, saving * the original slice state info \p save_slice. * * Use rd_slice_widen() to restore the saved slice * with the read count updated from the narrowed slice. * * This is useful for reading a sub-slice of a larger slice * without having to pass the lesser length around. * * @returns 1 if enough underlying slice buffer memory is available, else 0. */ int rd_slice_narrow(rd_slice_t *slice, rd_slice_t *save_slice, size_t size) { if (unlikely(slice->start + size > slice->end)) return 0; *save_slice = *slice; slice->end = slice->start + size; rd_assert(rd_slice_abs_offset(slice) <= slice->end); return 1; } /** * @brief Same as rd_slice_narrow() but using a relative size \p relsize * from the current read position. */ int rd_slice_narrow_relative(rd_slice_t *slice, rd_slice_t *save_slice, size_t relsize) { return rd_slice_narrow(slice, save_slice, rd_slice_offset(slice) + relsize); } /** * @brief Restore the original \p save_slice size from a previous call to * rd_slice_narrow(), while keeping the updated read pointer from * \p slice. */ void rd_slice_widen(rd_slice_t *slice, const rd_slice_t *save_slice) { slice->end = save_slice->end; } /** * @brief Copy the original slice \p orig to \p new_slice and adjust * the new slice length to \p size. * * This is a side-effect free form of rd_slice_narrow() which is not to * be used with rd_slice_widen(). * * @returns 1 if enough underlying slice buffer memory is available, else 0. */ int rd_slice_narrow_copy(const rd_slice_t *orig, rd_slice_t *new_slice, size_t size) { if (unlikely(orig->start + size > orig->end)) return 0; *new_slice = *orig; new_slice->end = orig->start + size; rd_assert(rd_slice_abs_offset(new_slice) <= new_slice->end); return 1; } /** * @brief Same as rd_slice_narrow_copy() but with a relative size from * the current read position. */ int rd_slice_narrow_copy_relative(const rd_slice_t *orig, rd_slice_t *new_slice, size_t relsize) { return rd_slice_narrow_copy(orig, new_slice, rd_slice_offset(orig) + relsize); } /** * @brief Set up the iovec \p iovs (of size \p iov_max) with the readable * segments from the slice's current read position. * * @param iovcntp will be set to the number of populated \p iovs[] * @param size_max limits the total number of bytes made available. * Note: this value may be overshot with the size of one * segment. * * @returns the total number of bytes in the represented segments. * * @remark will NOT update the read position. */ size_t rd_slice_get_iov(const rd_slice_t *slice, struct iovec *iovs, size_t *iovcntp, size_t iov_max, size_t size_max) { const void *p; size_t rlen; size_t iovcnt = 0; size_t sum = 0; rd_slice_t copy = *slice; /* Use a copy of the slice so we dont * update the position for the caller. */ while (sum < size_max && iovcnt < iov_max && (rlen = rd_slice_reader(©, &p))) { iovs[iovcnt].iov_base = (void *)p; iovs[iovcnt++].iov_len = rlen; sum += rlen; } *iovcntp = iovcnt; return sum; } /** * @brief CRC32 calculation of slice. * * @returns the calculated CRC * * @remark the slice's position is updated. */ uint32_t rd_slice_crc32(rd_slice_t *slice) { rd_crc32_t crc; const void *p; size_t rlen; crc = rd_crc32_init(); while ((rlen = rd_slice_reader(slice, &p))) crc = rd_crc32_update(crc, p, rlen); return (uint32_t)rd_crc32_finalize(crc); } /** * @brief Compute CRC-32C of segments starting at at buffer position \p absof, * also supporting the case where the position/offset is not at the * start of the first segment. * * @remark the slice's position is updated. */ uint32_t rd_slice_crc32c(rd_slice_t *slice) { const void *p; size_t rlen; uint32_t crc = 0; while ((rlen = rd_slice_reader(slice, &p))) crc = rd_crc32c(crc, (const char *)p, rlen); return crc; } /** * @name Debugging dumpers * * */ static void rd_segment_dump(const rd_segment_t *seg, const char *ind, size_t relof, int do_hexdump) { fprintf(stderr, "%s((rd_segment_t *)%p): " "p %p, of %" PRIusz ", " "absof %" PRIusz ", size %" PRIusz ", free %p, flags 0x%x\n", ind, seg, seg->seg_p, seg->seg_of, seg->seg_absof, seg->seg_size, seg->seg_free, seg->seg_flags); rd_assert(relof <= seg->seg_of); if (do_hexdump) rd_hexdump(stderr, "segment", seg->seg_p + relof, seg->seg_of - relof); } void rd_buf_dump(const rd_buf_t *rbuf, int do_hexdump) { const rd_segment_t *seg; fprintf(stderr, "((rd_buf_t *)%p):\n" " len %" PRIusz " size %" PRIusz ", %" PRIusz "/%" PRIusz " extra memory used\n", rbuf, rbuf->rbuf_len, rbuf->rbuf_size, rbuf->rbuf_extra_len, rbuf->rbuf_extra_size); if (rbuf->rbuf_wpos) { fprintf(stderr, " wpos:\n"); rd_segment_dump(rbuf->rbuf_wpos, " ", 0, 0); } if (rbuf->rbuf_segment_cnt > 0) { size_t segcnt = 0; fprintf(stderr, " %" PRIusz " linked segments:\n", rbuf->rbuf_segment_cnt); TAILQ_FOREACH(seg, &rbuf->rbuf_segments, seg_link) { rd_segment_dump(seg, " ", 0, do_hexdump); segcnt++; rd_assert(segcnt <= rbuf->rbuf_segment_cnt); } } } void rd_slice_dump(const rd_slice_t *slice, int do_hexdump) { const rd_segment_t *seg; size_t relof; fprintf(stderr, "((rd_slice_t *)%p):\n" " buf %p (len %" PRIusz "), seg %p (absof %" PRIusz "), " "rof %" PRIusz ", start %" PRIusz ", end %" PRIusz ", size %" PRIusz ", offset %" PRIusz "\n", slice, slice->buf, rd_buf_len(slice->buf), slice->seg, slice->seg ? slice->seg->seg_absof : 0, slice->rof, slice->start, slice->end, rd_slice_size(slice), rd_slice_offset(slice)); relof = slice->rof; for (seg = slice->seg; seg; seg = TAILQ_NEXT(seg, seg_link)) { rd_segment_dump(seg, " ", relof, do_hexdump); relof = 0; } } /** * @name Unit-tests * * * */ /** * @brief Basic write+read test */ static int do_unittest_write_read(void) { rd_buf_t b; char ones[1024]; char twos[1024]; char threes[1024]; char fiftyfives[100]; /* 0x55 indicates "untouched" memory */ char buf[1024 * 3]; rd_slice_t slice; size_t r, pos; memset(ones, 0x1, sizeof(ones)); memset(twos, 0x2, sizeof(twos)); memset(threes, 0x3, sizeof(threes)); memset(fiftyfives, 0x55, sizeof(fiftyfives)); memset(buf, 0x55, sizeof(buf)); rd_buf_init(&b, 2, 1000); /* * Verify write */ r = rd_buf_write(&b, ones, 200); RD_UT_ASSERT(r == 0, "write() returned position %" PRIusz, r); pos = rd_buf_write_pos(&b); RD_UT_ASSERT(pos == 200, "pos() returned position %" PRIusz, pos); r = rd_buf_write(&b, twos, 800); RD_UT_ASSERT(r == 200, "write() returned position %" PRIusz, r); pos = rd_buf_write_pos(&b); RD_UT_ASSERT(pos == 200 + 800, "pos() returned position %" PRIusz, pos); /* Buffer grows here */ r = rd_buf_write(&b, threes, 1); RD_UT_ASSERT(pos == 200 + 800, "write() returned position %" PRIusz, r); pos = rd_buf_write_pos(&b); RD_UT_ASSERT(pos == 200 + 800 + 1, "pos() returned position %" PRIusz, pos); /* * Verify read */ /* Get full slice. */ rd_slice_init_full(&slice, &b); r = rd_slice_read(&slice, buf, 200 + 800 + 2); RD_UT_ASSERT(r == 0, "read() > remaining should have failed, gave %" PRIusz, r); r = rd_slice_read(&slice, buf, 200 + 800 + 1); RD_UT_ASSERT(r == 200 + 800 + 1, "read() returned %" PRIusz " (%" PRIusz " remains)", r, rd_slice_remains(&slice)); RD_UT_ASSERT(!memcmp(buf, ones, 200), "verify ones"); RD_UT_ASSERT(!memcmp(buf + 200, twos, 800), "verify twos"); RD_UT_ASSERT(!memcmp(buf + 200 + 800, threes, 1), "verify threes"); RD_UT_ASSERT(!memcmp(buf + 200 + 800 + 1, fiftyfives, 100), "verify 55s"); rd_buf_destroy(&b); RD_UT_PASS(); } /** * @brief Helper read verifier, not a unit-test itself. */ #define do_unittest_read_verify(b, absof, len, verify) \ do { \ int __fail = do_unittest_read_verify0(b, absof, len, verify); \ RD_UT_ASSERT(!__fail, \ "read_verify(absof=%" PRIusz ",len=%" PRIusz \ ") " \ "failed", \ (size_t)absof, (size_t)len); \ } while (0) static int do_unittest_read_verify0(const rd_buf_t *b, size_t absof, size_t len, const char *verify) { rd_slice_t slice, sub; char buf[1024]; size_t half; size_t r; int i; rd_assert(sizeof(buf) >= len); /* Get reader slice */ i = rd_slice_init(&slice, b, absof, len); RD_UT_ASSERT(i == 0, "slice_init() failed: %d", i); r = rd_slice_read(&slice, buf, len); RD_UT_ASSERT(r == len, "read() returned %" PRIusz " expected %" PRIusz " (%" PRIusz " remains)", r, len, rd_slice_remains(&slice)); RD_UT_ASSERT(!memcmp(buf, verify, len), "verify"); r = rd_slice_offset(&slice); RD_UT_ASSERT(r == len, "offset() returned %" PRIusz ", not %" PRIusz, r, len); half = len / 2; i = rd_slice_seek(&slice, half); RD_UT_ASSERT(i == 0, "seek(%" PRIusz ") returned %d", half, i); r = rd_slice_offset(&slice); RD_UT_ASSERT(r == half, "offset() returned %" PRIusz ", not %" PRIusz, r, half); /* Get a sub-slice covering the later half. */ sub = rd_slice_pos(&slice); r = rd_slice_offset(&sub); RD_UT_ASSERT(r == 0, "sub: offset() returned %" PRIusz ", not %" PRIusz, r, (size_t)0); r = rd_slice_size(&sub); RD_UT_ASSERT(r == half, "sub: size() returned %" PRIusz ", not %" PRIusz, r, half); r = rd_slice_remains(&sub); RD_UT_ASSERT(r == half, "sub: remains() returned %" PRIusz ", not %" PRIusz, r, half); /* Read half */ r = rd_slice_read(&sub, buf, half); RD_UT_ASSERT(r == half, "sub read() returned %" PRIusz " expected %" PRIusz " (%" PRIusz " remains)", r, len, rd_slice_remains(&sub)); RD_UT_ASSERT(!memcmp(buf, verify, len), "verify"); r = rd_slice_offset(&sub); RD_UT_ASSERT(r == rd_slice_size(&sub), "sub offset() returned %" PRIusz ", not %" PRIusz, r, rd_slice_size(&sub)); r = rd_slice_remains(&sub); RD_UT_ASSERT(r == 0, "sub: remains() returned %" PRIusz ", not %" PRIusz, r, (size_t)0); return 0; } /** * @brief write_seek() and split() test */ static int do_unittest_write_split_seek(void) { rd_buf_t b; char ones[1024]; char twos[1024]; char threes[1024]; char fiftyfives[100]; /* 0x55 indicates "untouched" memory */ char buf[1024 * 3]; size_t r, pos; rd_segment_t *seg, *newseg; memset(ones, 0x1, sizeof(ones)); memset(twos, 0x2, sizeof(twos)); memset(threes, 0x3, sizeof(threes)); memset(fiftyfives, 0x55, sizeof(fiftyfives)); memset(buf, 0x55, sizeof(buf)); rd_buf_init(&b, 0, 0); /* * Verify write */ r = rd_buf_write(&b, ones, 400); RD_UT_ASSERT(r == 0, "write() returned position %" PRIusz, r); pos = rd_buf_write_pos(&b); RD_UT_ASSERT(pos == 400, "pos() returned position %" PRIusz, pos); do_unittest_read_verify(&b, 0, 400, ones); /* * Seek and re-write */ r = rd_buf_write_seek(&b, 200); RD_UT_ASSERT(r == 0, "seek() failed"); pos = rd_buf_write_pos(&b); RD_UT_ASSERT(pos == 200, "pos() returned position %" PRIusz, pos); r = rd_buf_write(&b, twos, 100); RD_UT_ASSERT(pos == 200, "write() returned position %" PRIusz, r); pos = rd_buf_write_pos(&b); RD_UT_ASSERT(pos == 200 + 100, "pos() returned position %" PRIusz, pos); do_unittest_read_verify(&b, 0, 200, ones); do_unittest_read_verify(&b, 200, 100, twos); /* Make sure read() did not modify the write position. */ pos = rd_buf_write_pos(&b); RD_UT_ASSERT(pos == 200 + 100, "pos() returned position %" PRIusz, pos); /* Split buffer, write position is now at split where writes * are not allowed (mid buffer). */ seg = rd_buf_get_segment_at_offset(&b, NULL, 50); RD_UT_ASSERT(seg->seg_of != 0, "assumed mid-segment"); newseg = rd_segment_split(&b, seg, 50); rd_buf_append_segment(&b, newseg); seg = rd_buf_get_segment_at_offset(&b, NULL, 50); RD_UT_ASSERT(seg != NULL, "seg"); RD_UT_ASSERT(seg == newseg, "newseg %p, seg %p", newseg, seg); RD_UT_ASSERT(seg->seg_of > 0, "assumed beginning of segment, got %" PRIusz, seg->seg_of); pos = rd_buf_write_pos(&b); RD_UT_ASSERT(pos == 200 + 100, "pos() returned position %" PRIusz, pos); /* Re-verify that nothing changed */ do_unittest_read_verify(&b, 0, 200, ones); do_unittest_read_verify(&b, 200, 100, twos); /* Do a write seek at buffer boundary, sub-sequent buffers should * be destroyed. */ r = rd_buf_write_seek(&b, 50); RD_UT_ASSERT(r == 0, "seek() failed"); do_unittest_read_verify(&b, 0, 50, ones); rd_buf_destroy(&b); RD_UT_PASS(); } /** * @brief Unittest to verify payload is correctly written and read. * Each written u32 word is the running CRC of the word count. */ static int do_unittest_write_read_payload_correctness(void) { uint32_t crc; uint32_t write_crc, read_crc; const int seed = 12345; rd_buf_t b; const size_t max_cnt = 20000; rd_slice_t slice; size_t r; size_t i; int pass; crc = rd_crc32_init(); crc = rd_crc32_update(crc, (void *)&seed, sizeof(seed)); rd_buf_init(&b, 0, 0); for (i = 0; i < max_cnt; i++) { crc = rd_crc32_update(crc, (void *)&i, sizeof(i)); rd_buf_write(&b, &crc, sizeof(crc)); } write_crc = rd_crc32_finalize(crc); r = rd_buf_len(&b); RD_UT_ASSERT(r == max_cnt * sizeof(crc), "expected length %" PRIusz ", not %" PRIusz, r, max_cnt * sizeof(crc)); /* * Now verify the contents with a reader. */ rd_slice_init_full(&slice, &b); r = rd_slice_remains(&slice); RD_UT_ASSERT(r == rd_buf_len(&b), "slice remains %" PRIusz ", should be %" PRIusz, r, rd_buf_len(&b)); for (pass = 0; pass < 2; pass++) { /* Two passes: * - pass 1: using peek() * - pass 2: using read() */ const char *pass_str = pass == 0 ? "peek" : "read"; crc = rd_crc32_init(); crc = rd_crc32_update(crc, (void *)&seed, sizeof(seed)); for (i = 0; i < max_cnt; i++) { uint32_t buf_crc; crc = rd_crc32_update(crc, (void *)&i, sizeof(i)); if (pass == 0) r = rd_slice_peek(&slice, i * sizeof(buf_crc), &buf_crc, sizeof(buf_crc)); else r = rd_slice_read(&slice, &buf_crc, sizeof(buf_crc)); RD_UT_ASSERT(r == sizeof(buf_crc), "%s() at #%" PRIusz " failed: " "r is %" PRIusz " not %" PRIusz, pass_str, i, r, sizeof(buf_crc)); RD_UT_ASSERT(buf_crc == crc, "%s: invalid crc at #%" PRIusz ": expected %" PRIu32 ", read %" PRIu32, pass_str, i, crc, buf_crc); } read_crc = rd_crc32_finalize(crc); RD_UT_ASSERT(read_crc == write_crc, "%s: finalized read crc %" PRIu32 " != write crc %" PRIu32, pass_str, read_crc, write_crc); } r = rd_slice_remains(&slice); RD_UT_ASSERT(r == 0, "slice remains %" PRIusz ", should be %" PRIusz, r, (size_t)0); rd_buf_destroy(&b); RD_UT_PASS(); } #define do_unittest_iov_verify(...) \ do { \ int __fail = do_unittest_iov_verify0(__VA_ARGS__); \ RD_UT_ASSERT(!__fail, "iov_verify() failed"); \ } while (0) static int do_unittest_iov_verify0(rd_buf_t *b, size_t exp_iovcnt, size_t exp_totsize) { #define MY_IOV_MAX 16 struct iovec iov[MY_IOV_MAX]; size_t iovcnt; size_t i; size_t totsize, sum; rd_assert(exp_iovcnt <= MY_IOV_MAX); totsize = rd_buf_get_write_iov(b, iov, &iovcnt, MY_IOV_MAX, exp_totsize); RD_UT_ASSERT(totsize >= exp_totsize, "iov total size %" PRIusz " expected >= %" PRIusz, totsize, exp_totsize); RD_UT_ASSERT(iovcnt >= exp_iovcnt && iovcnt <= MY_IOV_MAX, "iovcnt %" PRIusz ", expected %" PRIusz " < x <= MY_IOV_MAX", iovcnt, exp_iovcnt); sum = 0; for (i = 0; i < iovcnt; i++) { RD_UT_ASSERT(iov[i].iov_base, "iov #%" PRIusz " iov_base not set", i); RD_UT_ASSERT(iov[i].iov_len, "iov #%" PRIusz " iov_len %" PRIusz " out of range", i, iov[i].iov_len); sum += iov[i].iov_len; RD_UT_ASSERT(sum <= totsize, "sum %" PRIusz " > totsize %" PRIusz, sum, totsize); } RD_UT_ASSERT(sum == totsize, "sum %" PRIusz " != totsize %" PRIusz, sum, totsize); return 0; } /** * @brief Verify that buffer to iovec conversion works. */ static int do_unittest_write_iov(void) { rd_buf_t b; rd_buf_init(&b, 0, 0); rd_buf_write_ensure(&b, 100, 100); do_unittest_iov_verify(&b, 1, 100); /* Add a secondary buffer */ rd_buf_write_ensure(&b, 30000, 0); do_unittest_iov_verify(&b, 2, 100 + 30000); rd_buf_destroy(&b); RD_UT_PASS(); } /** * @brief Verify that erasing parts of the buffer works. */ static int do_unittest_erase(void) { static const struct { const char *segs[4]; const char *writes[4]; struct { size_t of; size_t size; size_t retsize; } erasures[4]; const char *expect; } in[] = {/* 12|3|45 * x x xx */ { .segs = {"12", "3", "45"}, .erasures = {{1, 4, 4}}, .expect = "1", }, /* 12|3|45 * xx */ { .segs = {"12", "3", "45"}, .erasures = {{0, 2, 2}}, .expect = "345", }, /* 12|3|45 * xx */ { .segs = {"12", "3", "45"}, .erasures = {{3, 2, 2}}, .expect = "123", }, /* 12|3|45 * x * 1 |3|45 * x * 1 | 45 * x */ { .segs = {"12", "3", "45"}, .erasures = {{1, 1, 1}, {1, 1, 1}, {2, 1, 1}}, .expect = "14", }, /* 12|3|45 * xxxxxxx */ { .segs = {"12", "3", "45"}, .erasures = {{0, 5, 5}}, .expect = "", }, /* 12|3|45 * x */ { .segs = {"12", "3", "45"}, .erasures = {{0, 1, 1}}, .expect = "2345", }, /* 12|3|45 * x */ { .segs = {"12", "3", "45"}, .erasures = {{4, 1, 1}}, .expect = "1234", }, /* 12|3|45 * x */ { .segs = {"12", "3", "45"}, .erasures = {{5, 10, 0}}, .expect = "12345", }, /* 12|3|45 * xxx */ { .segs = {"12", "3", "45"}, .erasures = {{4, 3, 1}, {4, 3, 0}, {4, 3, 0}}, .expect = "1234", }, /* 1 * xxx */ { .segs = {"1"}, .erasures = {{0, 3, 1}}, .expect = "", }, /* 123456 * xxxxxx */ { .segs = {"123456"}, .erasures = {{0, 6, 6}}, .expect = "", }, /* 123456789a * xxx */ { .segs = {"123456789a"}, .erasures = {{4, 3, 3}}, .expect = "123489a", }, /* 1234|5678 * x xx */ {.segs = {"1234", "5678"}, .erasures = {{3, 3, 3}}, .writes = {"9abc"}, .expect = "123789abc"}, {.expect = NULL}}; int i; for (i = 0; in[i].expect; i++) { rd_buf_t b; rd_slice_t s; size_t expsz = strlen(in[i].expect); char *out; int j; size_t r; int r2; rd_buf_init(&b, 0, 0); /* Write segments to buffer */ for (j = 0; in[i].segs[j]; j++) rd_buf_push_writable(&b, rd_strdup(in[i].segs[j]), strlen(in[i].segs[j]), rd_free); /* Perform erasures */ for (j = 0; in[i].erasures[j].retsize; j++) { r = rd_buf_erase(&b, in[i].erasures[j].of, in[i].erasures[j].size); RD_UT_ASSERT(r == in[i].erasures[j].retsize, "expected retsize %" PRIusz " for i=%d,j=%d" ", not %" PRIusz, in[i].erasures[j].retsize, i, j, r); } /* Perform writes */ for (j = 0; in[i].writes[j]; j++) rd_buf_write(&b, in[i].writes[j], strlen(in[i].writes[j])); RD_UT_ASSERT(expsz == rd_buf_len(&b), "expected buffer to be %" PRIusz " bytes, not " "%" PRIusz " for i=%d", expsz, rd_buf_len(&b), i); /* Read back and verify */ r2 = rd_slice_init(&s, &b, 0, rd_buf_len(&b)); RD_UT_ASSERT((r2 == -1 && rd_buf_len(&b) == 0) || (r2 == 0 && rd_buf_len(&b) > 0), "slice_init(%" PRIusz ") returned %d for i=%d", rd_buf_len(&b), r2, i); if (r2 == -1) continue; /* Empty buffer */ RD_UT_ASSERT(expsz == rd_slice_size(&s), "expected slice to be %" PRIusz " bytes, not %" PRIusz " for i=%d", expsz, rd_slice_size(&s), i); out = rd_malloc(expsz); r = rd_slice_read(&s, out, expsz); RD_UT_ASSERT(r == expsz, "expected to read %" PRIusz " bytes, not %" PRIusz " for i=%d", expsz, r, i); RD_UT_ASSERT(!memcmp(out, in[i].expect, expsz), "Expected \"%.*s\", not \"%.*s\" for i=%d", (int)expsz, in[i].expect, (int)r, out, i); rd_free(out); RD_UT_ASSERT(rd_slice_remains(&s) == 0, "expected no remaining bytes in slice, but got " "%" PRIusz " for i=%d", rd_slice_remains(&s), i); rd_buf_destroy(&b); } RD_UT_PASS(); } int unittest_rdbuf(void) { int fails = 0; fails += do_unittest_write_read(); fails += do_unittest_write_split_seek(); fails += do_unittest_write_read_payload_correctness(); fails += do_unittest_write_iov(); fails += do_unittest_erase(); return fails; }