diff options
Diffstat (limited to 'lib/stream.c')
-rw-r--r-- | lib/stream.c | 1389 |
1 files changed, 1389 insertions, 0 deletions
diff --git a/lib/stream.c b/lib/stream.c new file mode 100644 index 0000000..2de3abd --- /dev/null +++ b/lib/stream.c @@ -0,0 +1,1389 @@ +/* + * Packet interface + * Copyright (C) 1999 Kunihiro Ishiguro + * + * This file is part of GNU Zebra. + * + * GNU Zebra is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2, or (at your option) any + * later version. + * + * GNU Zebra is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; see the file COPYING; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include <zebra.h> +#include <stddef.h> +#include <pthread.h> + +#include "stream.h" +#include "memory.h" +#include "network.h" +#include "prefix.h" +#include "log.h" +#include "frr_pthread.h" +#include "lib_errors.h" + +DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream"); +DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO"); + +/* Tests whether a position is valid */ +#define GETP_VALID(S, G) ((G) <= (S)->endp) +#define PUT_AT_VALID(S,G) GETP_VALID(S,G) +#define ENDP_VALID(S, E) ((E) <= (S)->size) + +/* asserting sanity checks. Following must be true before + * stream functions are called: + * + * Following must always be true of stream elements + * before and after calls to stream functions: + * + * getp <= endp <= size + * + * Note that after a stream function is called following may be true: + * if (getp == endp) then stream is no longer readable + * if (endp == size) then stream is no longer writeable + * + * It is valid to put to anywhere within the size of the stream, but only + * using stream_put..._at() functions. + */ +#define STREAM_WARN_OFFSETS(S) \ + do { \ + flog_warn(EC_LIB_STREAM, \ + "&(struct stream): %p, size: %lu, getp: %lu, endp: %lu", \ + (void *)(S), (unsigned long)(S)->size, \ + (unsigned long)(S)->getp, (unsigned long)(S)->endp); \ + zlog_backtrace(LOG_WARNING); \ + } while (0) + +#define STREAM_VERIFY_SANE(S) \ + do { \ + if (!(GETP_VALID(S, (S)->getp) && ENDP_VALID(S, (S)->endp))) { \ + STREAM_WARN_OFFSETS(S); \ + } \ + assert(GETP_VALID(S, (S)->getp)); \ + assert(ENDP_VALID(S, (S)->endp)); \ + } while (0) + +#define STREAM_BOUND_WARN(S, WHAT) \ + do { \ + flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \ + __func__, (WHAT)); \ + STREAM_WARN_OFFSETS(S); \ + assert(0); \ + } while (0) + +#define STREAM_BOUND_WARN2(S, WHAT) \ + do { \ + flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \ + __func__, (WHAT)); \ + STREAM_WARN_OFFSETS(S); \ + } while (0) + +/* XXX: Deprecated macro: do not use */ +#define CHECK_SIZE(S, Z) \ + do { \ + if (((S)->endp + (Z)) > (S)->size) { \ + flog_warn( \ + EC_LIB_STREAM, \ + "CHECK_SIZE: truncating requested size %lu", \ + (unsigned long)(Z)); \ + STREAM_WARN_OFFSETS(S); \ + (Z) = (S)->size - (S)->endp; \ + } \ + } while (0); + +/* Make stream buffer. */ +struct stream *stream_new(size_t size) +{ + struct stream *s; + + assert(size > 0); + + s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size); + + s->getp = s->endp = 0; + s->next = NULL; + s->size = size; + return s; +} + +/* Free it now. */ +void stream_free(struct stream *s) +{ + if (!s) + return; + + XFREE(MTYPE_STREAM, s); +} + +struct stream *stream_copy(struct stream *dest, const struct stream *src) +{ + STREAM_VERIFY_SANE(src); + + assert(dest != NULL); + assert(STREAM_SIZE(dest) >= src->endp); + + dest->endp = src->endp; + dest->getp = src->getp; + + memcpy(dest->data, src->data, src->endp); + + return dest; +} + +struct stream *stream_dup(const struct stream *s) +{ + struct stream *snew; + + STREAM_VERIFY_SANE(s); + + snew = stream_new(s->endp); + + return (stream_copy(snew, s)); +} + +struct stream *stream_dupcat(const struct stream *s1, const struct stream *s2, + size_t offset) +{ + struct stream *new; + + STREAM_VERIFY_SANE(s1); + STREAM_VERIFY_SANE(s2); + + if ((new = stream_new(s1->endp + s2->endp)) == NULL) + return NULL; + + memcpy(new->data, s1->data, offset); + memcpy(new->data + offset, s2->data, s2->endp); + memcpy(new->data + offset + s2->endp, s1->data + offset, + (s1->endp - offset)); + new->endp = s1->endp + s2->endp; + return new; +} + +size_t stream_resize_inplace(struct stream **sptr, size_t newsize) +{ + struct stream *orig = *sptr; + + STREAM_VERIFY_SANE(orig); + + orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize); + + orig->size = newsize; + + if (orig->endp > orig->size) + orig->endp = orig->size; + if (orig->getp > orig->endp) + orig->getp = orig->endp; + + STREAM_VERIFY_SANE(orig); + + *sptr = orig; + return orig->size; +} + +size_t stream_get_getp(const struct stream *s) +{ + STREAM_VERIFY_SANE(s); + return s->getp; +} + +size_t stream_get_endp(const struct stream *s) +{ + STREAM_VERIFY_SANE(s); + return s->endp; +} + +size_t stream_get_size(const struct stream *s) +{ + STREAM_VERIFY_SANE(s); + return s->size; +} + +/* Stream structre' stream pointer related functions. */ +void stream_set_getp(struct stream *s, size_t pos) +{ + STREAM_VERIFY_SANE(s); + + if (!GETP_VALID(s, pos)) { + STREAM_BOUND_WARN(s, "set getp"); + pos = s->endp; + } + + s->getp = pos; +} + +void stream_set_endp(struct stream *s, size_t pos) +{ + STREAM_VERIFY_SANE(s); + + if (!ENDP_VALID(s, pos)) { + STREAM_BOUND_WARN(s, "set endp"); + return; + } + + /* + * Make sure the current read pointer is not beyond the new endp. + */ + if (s->getp > pos) { + STREAM_BOUND_WARN(s, "set endp"); + return; + } + + s->endp = pos; + STREAM_VERIFY_SANE(s); +} + +/* Forward pointer. */ +void stream_forward_getp(struct stream *s, size_t size) +{ + STREAM_VERIFY_SANE(s); + + if (!GETP_VALID(s, s->getp + size)) { + STREAM_BOUND_WARN(s, "seek getp"); + return; + } + + s->getp += size; +} + +bool stream_forward_getp2(struct stream *s, size_t size) +{ + STREAM_VERIFY_SANE(s); + + if (!GETP_VALID(s, s->getp + size)) + return false; + + s->getp += size; + + return true; +} + +void stream_rewind_getp(struct stream *s, size_t size) +{ + STREAM_VERIFY_SANE(s); + + if (size > s->getp || !GETP_VALID(s, s->getp - size)) { + STREAM_BOUND_WARN(s, "rewind getp"); + return; + } + + s->getp -= size; +} + +bool stream_rewind_getp2(struct stream *s, size_t size) +{ + STREAM_VERIFY_SANE(s); + + if (size > s->getp || !GETP_VALID(s, s->getp - size)) + return false; + + s->getp -= size; + + return true; +} + +void stream_forward_endp(struct stream *s, size_t size) +{ + STREAM_VERIFY_SANE(s); + + if (!ENDP_VALID(s, s->endp + size)) { + STREAM_BOUND_WARN(s, "seek endp"); + return; + } + + s->endp += size; +} + +bool stream_forward_endp2(struct stream *s, size_t size) +{ + STREAM_VERIFY_SANE(s); + + if (!ENDP_VALID(s, s->endp + size)) + return false; + + s->endp += size; + + return true; +} + +/* Copy from stream to destination. */ +bool stream_get2(void *dst, struct stream *s, size_t size) +{ + STREAM_VERIFY_SANE(s); + + if (STREAM_READABLE(s) < size) { + STREAM_BOUND_WARN2(s, "get"); + return false; + } + + memcpy(dst, s->data + s->getp, size); + s->getp += size; + + return true; +} + +void stream_get(void *dst, struct stream *s, size_t size) +{ + STREAM_VERIFY_SANE(s); + + if (STREAM_READABLE(s) < size) { + STREAM_BOUND_WARN(s, "get"); + return; + } + + memcpy(dst, s->data + s->getp, size); + s->getp += size; +} + +/* Get next character from the stream. */ +bool stream_getc2(struct stream *s, uint8_t *byte) +{ + STREAM_VERIFY_SANE(s); + + if (STREAM_READABLE(s) < sizeof(uint8_t)) { + STREAM_BOUND_WARN2(s, "get char"); + return false; + } + *byte = s->data[s->getp++]; + + return true; +} + +uint8_t stream_getc(struct stream *s) +{ + uint8_t c; + + STREAM_VERIFY_SANE(s); + + if (STREAM_READABLE(s) < sizeof(uint8_t)) { + STREAM_BOUND_WARN(s, "get char"); + return 0; + } + c = s->data[s->getp++]; + + return c; +} + +/* Get next character from the stream. */ +uint8_t stream_getc_from(struct stream *s, size_t from) +{ + uint8_t c; + + STREAM_VERIFY_SANE(s); + + if (!GETP_VALID(s, from + sizeof(uint8_t))) { + STREAM_BOUND_WARN(s, "get char"); + return 0; + } + + c = s->data[from]; + + return c; +} + +bool stream_getw2(struct stream *s, uint16_t *word) +{ + STREAM_VERIFY_SANE(s); + + if (STREAM_READABLE(s) < sizeof(uint16_t)) { + STREAM_BOUND_WARN2(s, "get "); + return false; + } + + *word = s->data[s->getp++] << 8; + *word |= s->data[s->getp++]; + + return true; +} + +/* Get next word from the stream. */ +uint16_t stream_getw(struct stream *s) +{ + uint16_t w; + + STREAM_VERIFY_SANE(s); + + if (STREAM_READABLE(s) < sizeof(uint16_t)) { + STREAM_BOUND_WARN(s, "get "); + return 0; + } + + w = s->data[s->getp++] << 8; + w |= s->data[s->getp++]; + + return w; +} + +/* Get next word from the stream. */ +uint16_t stream_getw_from(struct stream *s, size_t from) +{ + uint16_t w; + + STREAM_VERIFY_SANE(s); + + if (!GETP_VALID(s, from + sizeof(uint16_t))) { + STREAM_BOUND_WARN(s, "get "); + return 0; + } + + w = s->data[from++] << 8; + w |= s->data[from]; + + return w; +} + +/* Get next 3-byte from the stream. */ +uint32_t stream_get3_from(struct stream *s, size_t from) +{ + uint32_t l; + + STREAM_VERIFY_SANE(s); + + if (!GETP_VALID(s, from + 3)) { + STREAM_BOUND_WARN(s, "get 3byte"); + return 0; + } + + l = s->data[from++] << 16; + l |= s->data[from++] << 8; + l |= s->data[from]; + + return l; +} + +uint32_t stream_get3(struct stream *s) +{ + uint32_t l; + + STREAM_VERIFY_SANE(s); + + if (STREAM_READABLE(s) < 3) { + STREAM_BOUND_WARN(s, "get 3byte"); + return 0; + } + + l = s->data[s->getp++] << 16; + l |= s->data[s->getp++] << 8; + l |= s->data[s->getp++]; + + return l; +} + +/* Get next long word from the stream. */ +uint32_t stream_getl_from(struct stream *s, size_t from) +{ + uint32_t l; + + STREAM_VERIFY_SANE(s); + + if (!GETP_VALID(s, from + sizeof(uint32_t))) { + STREAM_BOUND_WARN(s, "get long"); + return 0; + } + + l = (unsigned)(s->data[from++]) << 24; + l |= s->data[from++] << 16; + l |= s->data[from++] << 8; + l |= s->data[from]; + + return l; +} + +/* Copy from stream at specific location to destination. */ +void stream_get_from(void *dst, struct stream *s, size_t from, size_t size) +{ + STREAM_VERIFY_SANE(s); + + if (!GETP_VALID(s, from + size)) { + STREAM_BOUND_WARN(s, "get from"); + return; + } + + memcpy(dst, s->data + from, size); +} + +bool stream_getl2(struct stream *s, uint32_t *l) +{ + STREAM_VERIFY_SANE(s); + + if (STREAM_READABLE(s) < sizeof(uint32_t)) { + STREAM_BOUND_WARN2(s, "get long"); + return false; + } + + *l = (unsigned int)(s->data[s->getp++]) << 24; + *l |= s->data[s->getp++] << 16; + *l |= s->data[s->getp++] << 8; + *l |= s->data[s->getp++]; + + return true; +} + +uint32_t stream_getl(struct stream *s) +{ + uint32_t l; + + STREAM_VERIFY_SANE(s); + + if (STREAM_READABLE(s) < sizeof(uint32_t)) { + STREAM_BOUND_WARN(s, "get long"); + return 0; + } + + l = (unsigned)(s->data[s->getp++]) << 24; + l |= s->data[s->getp++] << 16; + l |= s->data[s->getp++] << 8; + l |= s->data[s->getp++]; + + return l; +} + +/* Get next quad word from the stream. */ +uint64_t stream_getq_from(struct stream *s, size_t from) +{ + uint64_t q; + + STREAM_VERIFY_SANE(s); + + if (!GETP_VALID(s, from + sizeof(uint64_t))) { + STREAM_BOUND_WARN(s, "get quad"); + return 0; + } + + q = ((uint64_t)s->data[from++]) << 56; + q |= ((uint64_t)s->data[from++]) << 48; + q |= ((uint64_t)s->data[from++]) << 40; + q |= ((uint64_t)s->data[from++]) << 32; + q |= ((uint64_t)s->data[from++]) << 24; + q |= ((uint64_t)s->data[from++]) << 16; + q |= ((uint64_t)s->data[from++]) << 8; + q |= ((uint64_t)s->data[from++]); + + return q; +} + +uint64_t stream_getq(struct stream *s) +{ + uint64_t q; + + STREAM_VERIFY_SANE(s); + + if (STREAM_READABLE(s) < sizeof(uint64_t)) { + STREAM_BOUND_WARN(s, "get quad"); + return 0; + } + + q = ((uint64_t)s->data[s->getp++]) << 56; + q |= ((uint64_t)s->data[s->getp++]) << 48; + q |= ((uint64_t)s->data[s->getp++]) << 40; + q |= ((uint64_t)s->data[s->getp++]) << 32; + q |= ((uint64_t)s->data[s->getp++]) << 24; + q |= ((uint64_t)s->data[s->getp++]) << 16; + q |= ((uint64_t)s->data[s->getp++]) << 8; + q |= ((uint64_t)s->data[s->getp++]); + + return q; +} + +bool stream_getq2(struct stream *s, uint64_t *q) +{ + STREAM_VERIFY_SANE(s); + + if (STREAM_READABLE(s) < sizeof(uint64_t)) { + STREAM_BOUND_WARN2(s, "get uint64"); + return false; + } + + *q = ((uint64_t)s->data[s->getp++]) << 56; + *q |= ((uint64_t)s->data[s->getp++]) << 48; + *q |= ((uint64_t)s->data[s->getp++]) << 40; + *q |= ((uint64_t)s->data[s->getp++]) << 32; + *q |= ((uint64_t)s->data[s->getp++]) << 24; + *q |= ((uint64_t)s->data[s->getp++]) << 16; + *q |= ((uint64_t)s->data[s->getp++]) << 8; + *q |= ((uint64_t)s->data[s->getp++]); + + return true; +} + +/* Get next long word from the stream. */ +uint32_t stream_get_ipv4(struct stream *s) +{ + uint32_t l; + + STREAM_VERIFY_SANE(s); + + if (STREAM_READABLE(s) < sizeof(uint32_t)) { + STREAM_BOUND_WARN(s, "get ipv4"); + return 0; + } + + memcpy(&l, s->data + s->getp, sizeof(uint32_t)); + s->getp += sizeof(uint32_t); + + return l; +} + +bool stream_get_ipaddr(struct stream *s, struct ipaddr *ip) +{ + uint16_t ipa_len; + + STREAM_VERIFY_SANE(s); + + /* Get address type. */ + if (STREAM_READABLE(s) < sizeof(uint16_t)) { + STREAM_BOUND_WARN2(s, "get ipaddr"); + return false; + } + ip->ipa_type = stream_getw(s); + + /* Get address value. */ + switch (ip->ipa_type) { + case IPADDR_V4: + ipa_len = IPV4_MAX_BYTELEN; + break; + case IPADDR_V6: + ipa_len = IPV6_MAX_BYTELEN; + break; + default: + flog_err(EC_LIB_DEVELOPMENT, + "%s: unknown ip address-family: %u", __func__, + ip->ipa_type); + return false; + } + if (STREAM_READABLE(s) < ipa_len) { + STREAM_BOUND_WARN2(s, "get ipaddr"); + return false; + } + memcpy(&ip->ip, s->data + s->getp, ipa_len); + s->getp += ipa_len; + + return true; +} + +float stream_getf(struct stream *s) +{ + union { + float r; + uint32_t d; + } u; + u.d = stream_getl(s); + return u.r; +} + +double stream_getd(struct stream *s) +{ + union { + double r; + uint64_t d; + } u; + u.d = stream_getq(s); + return u.r; +} + +/* Copy from source to stream. + * + * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap + * around. This should be fixed once the stream updates are working. + * + * stream_write() is saner + */ +void stream_put(struct stream *s, const void *src, size_t size) +{ + + /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */ + CHECK_SIZE(s, size); + + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE(s) < size) { + STREAM_BOUND_WARN(s, "put"); + return; + } + + if (src) + memcpy(s->data + s->endp, src, size); + else + memset(s->data + s->endp, 0, size); + + s->endp += size; +} + +/* Put character to the stream. */ +int stream_putc(struct stream *s, uint8_t c) +{ + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) { + STREAM_BOUND_WARN(s, "put"); + return 0; + } + + s->data[s->endp++] = c; + return sizeof(uint8_t); +} + +/* Put word to the stream. */ +int stream_putw(struct stream *s, uint16_t w) +{ + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) { + STREAM_BOUND_WARN(s, "put"); + return 0; + } + + s->data[s->endp++] = (uint8_t)(w >> 8); + s->data[s->endp++] = (uint8_t)w; + + return 2; +} + +/* Put long word to the stream. */ +int stream_put3(struct stream *s, uint32_t l) +{ + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE(s) < 3) { + STREAM_BOUND_WARN(s, "put"); + return 0; + } + + s->data[s->endp++] = (uint8_t)(l >> 16); + s->data[s->endp++] = (uint8_t)(l >> 8); + s->data[s->endp++] = (uint8_t)l; + + return 3; +} + +/* Put long word to the stream. */ +int stream_putl(struct stream *s, uint32_t l) +{ + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) { + STREAM_BOUND_WARN(s, "put"); + return 0; + } + + s->data[s->endp++] = (uint8_t)(l >> 24); + s->data[s->endp++] = (uint8_t)(l >> 16); + s->data[s->endp++] = (uint8_t)(l >> 8); + s->data[s->endp++] = (uint8_t)l; + + return 4; +} + +/* Put quad word to the stream. */ +int stream_putq(struct stream *s, uint64_t q) +{ + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) { + STREAM_BOUND_WARN(s, "put quad"); + return 0; + } + + s->data[s->endp++] = (uint8_t)(q >> 56); + s->data[s->endp++] = (uint8_t)(q >> 48); + s->data[s->endp++] = (uint8_t)(q >> 40); + s->data[s->endp++] = (uint8_t)(q >> 32); + s->data[s->endp++] = (uint8_t)(q >> 24); + s->data[s->endp++] = (uint8_t)(q >> 16); + s->data[s->endp++] = (uint8_t)(q >> 8); + s->data[s->endp++] = (uint8_t)q; + + return 8; +} + +int stream_putf(struct stream *s, float f) +{ + union { + float i; + uint32_t o; + } u; + u.i = f; + return stream_putl(s, u.o); +} + +int stream_putd(struct stream *s, double d) +{ + union { + double i; + uint64_t o; + } u; + u.i = d; + return stream_putq(s, u.o); +} + +int stream_putc_at(struct stream *s, size_t putp, uint8_t c) +{ + STREAM_VERIFY_SANE(s); + + if (!PUT_AT_VALID(s, putp + sizeof(uint8_t))) { + STREAM_BOUND_WARN(s, "put"); + return 0; + } + + s->data[putp] = c; + + return 1; +} + +int stream_putw_at(struct stream *s, size_t putp, uint16_t w) +{ + STREAM_VERIFY_SANE(s); + + if (!PUT_AT_VALID(s, putp + sizeof(uint16_t))) { + STREAM_BOUND_WARN(s, "put"); + return 0; + } + + s->data[putp] = (uint8_t)(w >> 8); + s->data[putp + 1] = (uint8_t)w; + + return 2; +} + +int stream_put3_at(struct stream *s, size_t putp, uint32_t l) +{ + STREAM_VERIFY_SANE(s); + + if (!PUT_AT_VALID(s, putp + 3)) { + STREAM_BOUND_WARN(s, "put"); + return 0; + } + s->data[putp] = (uint8_t)(l >> 16); + s->data[putp + 1] = (uint8_t)(l >> 8); + s->data[putp + 2] = (uint8_t)l; + + return 3; +} + +int stream_putl_at(struct stream *s, size_t putp, uint32_t l) +{ + STREAM_VERIFY_SANE(s); + + if (!PUT_AT_VALID(s, putp + sizeof(uint32_t))) { + STREAM_BOUND_WARN(s, "put"); + return 0; + } + s->data[putp] = (uint8_t)(l >> 24); + s->data[putp + 1] = (uint8_t)(l >> 16); + s->data[putp + 2] = (uint8_t)(l >> 8); + s->data[putp + 3] = (uint8_t)l; + + return 4; +} + +int stream_putq_at(struct stream *s, size_t putp, uint64_t q) +{ + STREAM_VERIFY_SANE(s); + + if (!PUT_AT_VALID(s, putp + sizeof(uint64_t))) { + STREAM_BOUND_WARN(s, "put"); + return 0; + } + s->data[putp] = (uint8_t)(q >> 56); + s->data[putp + 1] = (uint8_t)(q >> 48); + s->data[putp + 2] = (uint8_t)(q >> 40); + s->data[putp + 3] = (uint8_t)(q >> 32); + s->data[putp + 4] = (uint8_t)(q >> 24); + s->data[putp + 5] = (uint8_t)(q >> 16); + s->data[putp + 6] = (uint8_t)(q >> 8); + s->data[putp + 7] = (uint8_t)q; + + return 8; +} + +/* Put long word to the stream. */ +int stream_put_ipv4(struct stream *s, uint32_t l) +{ + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) { + STREAM_BOUND_WARN(s, "put"); + return 0; + } + memcpy(s->data + s->endp, &l, sizeof(uint32_t)); + s->endp += sizeof(uint32_t); + + return sizeof(uint32_t); +} + +/* Put long word to the stream. */ +int stream_put_in_addr(struct stream *s, const struct in_addr *addr) +{ + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) { + STREAM_BOUND_WARN(s, "put"); + return 0; + } + + memcpy(s->data + s->endp, addr, sizeof(uint32_t)); + s->endp += sizeof(uint32_t); + + return sizeof(uint32_t); +} + +bool stream_put_ipaddr(struct stream *s, struct ipaddr *ip) +{ + stream_putw(s, ip->ipa_type); + + switch (ip->ipa_type) { + case IPADDR_V4: + stream_put_in_addr(s, &ip->ipaddr_v4); + break; + case IPADDR_V6: + stream_write(s, (uint8_t *)&ip->ipaddr_v6, 16); + break; + default: + flog_err(EC_LIB_DEVELOPMENT, + "%s: unknown ip address-family: %u", __func__, + ip->ipa_type); + return false; + } + + return true; +} + +/* Put in_addr at location in the stream. */ +int stream_put_in_addr_at(struct stream *s, size_t putp, + const struct in_addr *addr) +{ + STREAM_VERIFY_SANE(s); + + if (!PUT_AT_VALID(s, putp + 4)) { + STREAM_BOUND_WARN(s, "put"); + return 0; + } + + memcpy(&s->data[putp], addr, 4); + return 4; +} + +/* Put in6_addr at location in the stream. */ +int stream_put_in6_addr_at(struct stream *s, size_t putp, + const struct in6_addr *addr) +{ + STREAM_VERIFY_SANE(s); + + if (!PUT_AT_VALID(s, putp + 16)) { + STREAM_BOUND_WARN(s, "put"); + return 0; + } + + memcpy(&s->data[putp], addr, 16); + return 16; +} + +/* Put prefix by nlri type format. */ +int stream_put_prefix_addpath(struct stream *s, const struct prefix *p, + bool addpath_capable, uint32_t addpath_tx_id) +{ + size_t psize; + size_t psize_with_addpath; + + STREAM_VERIFY_SANE(s); + + psize = PSIZE(p->prefixlen); + + if (addpath_capable) + psize_with_addpath = psize + 4; + else + psize_with_addpath = psize; + + if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) { + STREAM_BOUND_WARN(s, "put"); + return 0; + } + + if (addpath_capable) { + s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24); + s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16); + s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8); + s->data[s->endp++] = (uint8_t)addpath_tx_id; + } + + s->data[s->endp++] = p->prefixlen; + memcpy(s->data + s->endp, &p->u.prefix, psize); + s->endp += psize; + + return psize; +} + +int stream_put_prefix(struct stream *s, const struct prefix *p) +{ + return stream_put_prefix_addpath(s, p, 0, 0); +} + +/* Put NLRI with label */ +int stream_put_labeled_prefix(struct stream *s, const struct prefix *p, + mpls_label_t *label, bool addpath_capable, + uint32_t addpath_tx_id) +{ + size_t psize; + size_t psize_with_addpath; + uint8_t *label_pnt = (uint8_t *)label; + + STREAM_VERIFY_SANE(s); + + psize = PSIZE(p->prefixlen); + psize_with_addpath = psize + (addpath_capable ? 4 : 0); + + if (STREAM_WRITEABLE(s) < (psize_with_addpath + 3)) { + STREAM_BOUND_WARN(s, "put"); + return 0; + } + + if (addpath_capable) { + s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24); + s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16); + s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8); + s->data[s->endp++] = (uint8_t)addpath_tx_id; + } + + stream_putc(s, (p->prefixlen + 24)); + stream_putc(s, label_pnt[0]); + stream_putc(s, label_pnt[1]); + stream_putc(s, label_pnt[2]); + memcpy(s->data + s->endp, &p->u.prefix, psize); + s->endp += psize; + + return (psize + 3); +} + +/* Read size from fd. */ +int stream_read(struct stream *s, int fd, size_t size) +{ + int nbytes; + + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE(s) < size) { + STREAM_BOUND_WARN(s, "put"); + return 0; + } + + nbytes = readn(fd, s->data + s->endp, size); + + if (nbytes > 0) + s->endp += nbytes; + + return nbytes; +} + +ssize_t stream_read_try(struct stream *s, int fd, size_t size) +{ + ssize_t nbytes; + + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE(s) < size) { + STREAM_BOUND_WARN(s, "put"); + /* Fatal (not transient) error, since retrying will not help + (stream is too small to contain the desired data). */ + return -1; + } + + nbytes = read(fd, s->data + s->endp, size); + if (nbytes >= 0) { + s->endp += nbytes; + return nbytes; + } + /* Error: was it transient (return -2) or fatal (return -1)? */ + if (ERRNO_IO_RETRY(errno)) + return -2; + flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd, + safe_strerror(errno)); + return -1; +} + +/* Read up to size bytes into the stream from the fd, using recvmsgfrom + * whose arguments match the remaining arguments to this function + */ +ssize_t stream_recvfrom(struct stream *s, int fd, size_t size, int flags, + struct sockaddr *from, socklen_t *fromlen) +{ + ssize_t nbytes; + + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE(s) < size) { + STREAM_BOUND_WARN(s, "put"); + /* Fatal (not transient) error, since retrying will not help + (stream is too small to contain the desired data). */ + return -1; + } + + nbytes = recvfrom(fd, s->data + s->endp, size, flags, from, fromlen); + if (nbytes >= 0) { + s->endp += nbytes; + return nbytes; + } + /* Error: was it transient (return -2) or fatal (return -1)? */ + if (ERRNO_IO_RETRY(errno)) + return -2; + flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd, + safe_strerror(errno)); + return -1; +} + +/* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting + * from endp. + * First iovec will be used to receive the data. + * Stream need not be empty. + */ +ssize_t stream_recvmsg(struct stream *s, int fd, struct msghdr *msgh, int flags, + size_t size) +{ + int nbytes; + struct iovec *iov; + + STREAM_VERIFY_SANE(s); + assert(msgh->msg_iovlen > 0); + + if (STREAM_WRITEABLE(s) < size) { + STREAM_BOUND_WARN(s, "put"); + /* This is a logic error in the calling code: the stream is too + small + to hold the desired data! */ + return -1; + } + + iov = &(msgh->msg_iov[0]); + iov->iov_base = (s->data + s->endp); + iov->iov_len = size; + + nbytes = recvmsg(fd, msgh, flags); + + if (nbytes > 0) + s->endp += nbytes; + + return nbytes; +} + +/* Write data to buffer. */ +size_t stream_write(struct stream *s, const void *ptr, size_t size) +{ + + CHECK_SIZE(s, size); + + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE(s) < size) { + STREAM_BOUND_WARN(s, "put"); + return 0; + } + + memcpy(s->data + s->endp, ptr, size); + s->endp += size; + + return size; +} + +/* Return current read pointer. + * DEPRECATED! + * Use stream_get_pnt_to if you must, but decoding streams properly + * is preferred + */ +uint8_t *stream_pnt(struct stream *s) +{ + STREAM_VERIFY_SANE(s); + return s->data + s->getp; +} + +/* Check does this stream empty? */ +int stream_empty(struct stream *s) +{ + STREAM_VERIFY_SANE(s); + + return (s->endp == 0); +} + +/* Reset stream. */ +void stream_reset(struct stream *s) +{ + STREAM_VERIFY_SANE(s); + + s->getp = s->endp = 0; +} + +/* Write stream contens to the file discriptor. */ +int stream_flush(struct stream *s, int fd) +{ + int nbytes; + + STREAM_VERIFY_SANE(s); + + nbytes = write(fd, s->data + s->getp, s->endp - s->getp); + + return nbytes; +} + +void stream_hexdump(const struct stream *s) +{ + zlog_hexdump(s->data, s->endp); +} + +/* Stream first in first out queue. */ + +struct stream_fifo *stream_fifo_new(void) +{ + struct stream_fifo *new; + + new = XMALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo)); + stream_fifo_init(new); + return new; +} + +void stream_fifo_init(struct stream_fifo *fifo) +{ + memset(fifo, 0, sizeof(struct stream_fifo)); + pthread_mutex_init(&fifo->mtx, NULL); +} + +/* Add new stream to fifo. */ +void stream_fifo_push(struct stream_fifo *fifo, struct stream *s) +{ +#if defined DEV_BUILD + size_t max, curmax; +#endif + + if (fifo->tail) + fifo->tail->next = s; + else + fifo->head = s; + + fifo->tail = s; + fifo->tail->next = NULL; +#if !defined DEV_BUILD + atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release); +#else + max = atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release); + curmax = atomic_load_explicit(&fifo->max_count, memory_order_relaxed); + if (max > curmax) + atomic_store_explicit(&fifo->max_count, max, + memory_order_relaxed); +#endif +} + +void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s) +{ + frr_with_mutex (&fifo->mtx) { + stream_fifo_push(fifo, s); + } +} + +/* Delete first stream from fifo. */ +struct stream *stream_fifo_pop(struct stream_fifo *fifo) +{ + struct stream *s; + + s = fifo->head; + + if (s) { + fifo->head = s->next; + + if (fifo->head == NULL) + fifo->tail = NULL; + + atomic_fetch_sub_explicit(&fifo->count, 1, + memory_order_release); + + /* ensure stream is scrubbed of references to this fifo */ + s->next = NULL; + } + + return s; +} + +struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo) +{ + struct stream *ret; + + frr_with_mutex (&fifo->mtx) { + ret = stream_fifo_pop(fifo); + } + + return ret; +} + +struct stream *stream_fifo_head(struct stream_fifo *fifo) +{ + return fifo->head; +} + +struct stream *stream_fifo_head_safe(struct stream_fifo *fifo) +{ + struct stream *ret; + + frr_with_mutex (&fifo->mtx) { + ret = stream_fifo_head(fifo); + } + + return ret; +} + +void stream_fifo_clean(struct stream_fifo *fifo) +{ + struct stream *s; + struct stream *next; + + for (s = fifo->head; s; s = next) { + next = s->next; + stream_free(s); + } + fifo->head = fifo->tail = NULL; + atomic_store_explicit(&fifo->count, 0, memory_order_release); +} + +void stream_fifo_clean_safe(struct stream_fifo *fifo) +{ + frr_with_mutex (&fifo->mtx) { + stream_fifo_clean(fifo); + } +} + +size_t stream_fifo_count_safe(struct stream_fifo *fifo) +{ + return atomic_load_explicit(&fifo->count, memory_order_acquire); +} + +void stream_fifo_deinit(struct stream_fifo *fifo) +{ + stream_fifo_clean(fifo); + pthread_mutex_destroy(&fifo->mtx); +} + +void stream_fifo_free(struct stream_fifo *fifo) +{ + stream_fifo_deinit(fifo); + XFREE(MTYPE_STREAM_FIFO, fifo); +} + +void stream_pulldown(struct stream *s) +{ + size_t rlen = STREAM_READABLE(s); + + /* No more data, so just move the pointers. */ + if (rlen == 0) { + stream_reset(s); + return; + } + + /* Move the available data to the beginning. */ + memmove(s->data, &s->data[s->getp], rlen); + s->getp = 0; + s->endp = rlen; +} |