diff options
Diffstat (limited to 'src/ring.c')
-rw-r--r-- | src/ring.c | 682 |
1 files changed, 493 insertions, 189 deletions
@@ -22,11 +22,13 @@ #include <haproxy/api.h> #include <haproxy/applet.h> #include <haproxy/buf.h> +#include <haproxy/cfgparse.h> #include <haproxy/cli.h> #include <haproxy/ring.h> #include <haproxy/sc_strm.h> #include <haproxy/stconn.h> #include <haproxy/thread.h> +#include <haproxy/vecpair.h> /* context used to dump the contents of a ring via "show events" or "show errors" */ struct show_ring_ctx { @@ -35,117 +37,120 @@ struct show_ring_ctx { uint flags; /* set of RING_WF_* */ }; -/* Initialize a pre-allocated ring with the buffer area - * of size */ -void ring_init(struct ring *ring, void *area, size_t size) +/* Initialize a pre-allocated ring with the buffer area of size <size>. + * Makes the storage point to the indicated area and adjusts the declared + * ring size according to the position of the area in the storage. If <reset> + * is non-zero, the storage area is reset, otherwise it's left intact (except + * for the area origin pointer which is updated so that the area can come from + * an mmap()). + */ +void ring_init(struct ring *ring, void *area, size_t size, int reset) { - HA_RWLOCK_INIT(&ring->lock); - LIST_INIT(&ring->waiters); + MT_LIST_INIT(&ring->waiters); ring->readers_count = 0; - ring->buf = b_make(area, size, 0, 0); - /* write the initial RC byte */ - b_putchr(&ring->buf, 0); + ring->flags = 0; + ring->storage = area; + ring->pending = 0; + ring->waking = 0; + memset(&ring->queue, 0, sizeof(ring->queue)); + + if (reset) { + ring->storage->size = size - sizeof(*ring->storage); + ring->storage->rsvd = sizeof(*ring->storage); + ring->storage->head = 0; + ring->storage->tail = 0; + + /* write the initial RC byte */ + *ring->storage->area = 0; + ring->storage->tail = 1; + } } -/* Creates and returns a ring buffer of size <size> bytes. Returns NULL on - * allocation failure. +/* Creates a ring and its storage area at address <area> for <size> bytes. + * If <area> is null, then it's allocated of the requested size. The ring + * storage struct is part of the area so the usable area is slightly reduced. + * However the storage is immediately adjacent to the struct so that the ring + * remains consistent on-disk. ring_free() will ignore such ring storages and + * will only release the ring part, so the caller is responsible for releasing + * them. If <reset> is non-zero, the storage area is reset, otherwise it's left + * intact. */ -struct ring *ring_new(size_t size) +struct ring *ring_make_from_area(void *area, size_t size, int reset) { struct ring *ring = NULL; - void *area = NULL; + uint flags = 0; - if (size < 2) - goto fail; + if (size < sizeof(*ring->storage) + 2) + return NULL; ring = malloc(sizeof(*ring)); if (!ring) goto fail; - area = malloc(size); + if (!area) + area = malloc(size); + else + flags |= RING_FL_MAPPED; + if (!area) goto fail; - ring_init(ring, area, size); + ring_init(ring, area, size, reset); + ring->flags |= flags; return ring; fail: - free(area); free(ring); return NULL; } -/* Creates a unified ring + storage area at address <area> for <size> bytes. - * If <area> is null, then it's allocated of the requested size. The ring - * struct is part of the area so the usable area is slightly reduced. However - * the ring storage is immediately adjacent to the struct. ring_free() will - * ignore such rings, so the caller is responsible for releasing them. - */ -struct ring *ring_make_from_area(void *area, size_t size) -{ - struct ring *ring = NULL; - - if (size < sizeof(*ring)) - return NULL; - - if (!area) - area = malloc(size); - if (!area) - return NULL; - - ring = area; - area += sizeof(*ring); - ring_init(ring, area, size - sizeof(*ring)); - return ring; -} - -/* Cast an unified ring + storage area to a ring from <area>, without - * reinitializing the data buffer. - * - * Reinitialize the waiters and the lock. +/* Creates and returns a ring buffer of size <size> bytes. Returns NULL on + * allocation failure. The size is the area size, not the usable size. */ -struct ring *ring_cast_from_area(void *area) +struct ring *ring_new(size_t size) { - struct ring *ring = NULL; - - ring = area; - ring->buf.area = area + sizeof(*ring); - - HA_RWLOCK_INIT(&ring->lock); - LIST_INIT(&ring->waiters); - ring->readers_count = 0; - - return ring; + return ring_make_from_area(NULL, size, 1); } /* Resizes existing ring <ring> to <size> which must be larger, without losing * its contents. The new size must be at least as large as the previous one or * no change will be performed. The pointer to the ring is returned on success, - * or NULL on allocation failure. This will lock the ring for writes. + * or NULL on allocation failure. This will lock the ring for writes. The size + * is the allocated area size, and includes the ring_storage header. */ struct ring *ring_resize(struct ring *ring, size_t size) { - void *area; + struct ring_storage *old, *new; - if (b_size(&ring->buf) >= size) + if (size <= ring_data(ring) + sizeof(*ring->storage)) return ring; - area = malloc(size); - if (!area) + old = ring->storage; + new = malloc(size); + if (!new) return NULL; - HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); + thread_isolate(); - /* recheck the buffer's size, it may have changed during the malloc */ - if (b_size(&ring->buf) < size) { + /* recheck the ring's size, it may have changed during the malloc */ + if (size > ring_data(ring) + sizeof(*ring->storage)) { /* copy old contents */ - b_getblk(&ring->buf, area, ring->buf.data, 0); - area = HA_ATOMIC_XCHG(&ring->buf.area, area); - ring->buf.size = size; + struct ist v1, v2; + size_t len; + + vp_ring_to_data(&v1, &v2, old->area, old->size, old->head, old->tail); + len = vp_size(v1, v2); + vp_peek_ofs(v1, v2, 0, new->area, len); + new->size = size - sizeof(*ring->storage); + new->rsvd = sizeof(*ring->storage); + new->head = 0; + new->tail = len; + new = HA_ATOMIC_XCHG(&ring->storage, new); } - HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); + thread_release(); - free(area); + /* free the unused one */ + free(new); return ring; } @@ -156,10 +161,8 @@ void ring_free(struct ring *ring) return; /* make sure it was not allocated by ring_make_from_area */ - if (ring->buf.area == (void *)ring + sizeof(*ring)) - return; - - free(ring->buf.area); + if (!(ring->flags & RING_FL_MAPPED)) + free(ring->storage); free(ring); } @@ -173,12 +176,20 @@ void ring_free(struct ring *ring) */ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg) { - struct buffer *buf = &ring->buf; - struct appctx *appctx; - size_t totlen = 0; + struct ring_wait_cell **ring_queue_ptr = DISGUISE(&ring->queue[ti->ring_queue].ptr); + struct ring_wait_cell cell, *next_cell, *curr_cell; + size_t *tail_ptr = &ring->storage->tail; + size_t head_ofs, tail_ofs, new_tail_ofs; + size_t ring_size; + char *ring_area; + struct ist v1, v2; + size_t msglen = 0; size_t lenlen; + size_t needed; uint64_t dellen; int dellenlen; + uint8_t *lock_ptr; + uint8_t readers; ssize_t sent = 0; int i; @@ -191,20 +202,125 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz * copying due to the varint encoding of the length. */ for (i = 0; i < npfx; i++) - totlen += pfx[i].len; + msglen += pfx[i].len; for (i = 0; i < nmsg; i++) - totlen += msg[i].len; + msglen += msg[i].len; - if (totlen > maxlen) - totlen = maxlen; + if (msglen > maxlen) + msglen = maxlen; - lenlen = varint_bytes(totlen); + lenlen = varint_bytes(msglen); - HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); - if (lenlen + totlen + 1 + 1 > b_size(buf)) - goto done_buf; + /* We need: + * - lenlen bytes for the size encoding + * - msglen for the message + * - one byte for the new marker + * + * Note that we'll also reserve one extra byte to make sure we never + * leave a full buffer (the vec-to-ring conversion cannot be done if + * both areas are of size 0). + */ + needed = lenlen + msglen + 1; - while (b_room(buf) < lenlen + totlen + 1) { + /* these ones do not change under us (only resize affects them and it + * must be done under thread isolation). + */ + ring_area = ring->storage->area; + ring_size = ring->storage->size; + + if (needed + 1 > ring_size) + goto leave; + + cell.to_send_self = needed; + cell.needed_tot = 0; // only when non-zero the cell is considered ready. + cell.maxlen = msglen; + cell.pfx = pfx; + cell.npfx = npfx; + cell.msg = msg; + cell.nmsg = nmsg; + + /* insert our cell into the queue before the previous one. We may have + * to wait a bit if the queue's leader is attempting an election to win + * the tail, hence the busy value (should be rare enough). + */ + next_cell = HA_ATOMIC_XCHG(ring_queue_ptr, &cell); + + /* let's add the cumulated size of pending messages to ours */ + cell.next = next_cell; + if (next_cell) { + size_t next_needed; + + while ((next_needed = HA_ATOMIC_LOAD(&next_cell->needed_tot)) == 0) + __ha_cpu_relax_for_read(); + needed += next_needed; + } + + /* now <needed> will represent the size to store *all* messages. The + * atomic store may unlock a subsequent thread waiting for this one. + */ + HA_ATOMIC_STORE(&cell.needed_tot, needed); + + /* OK now we're the queue leader, it's our job to try to get ownership + * of the tail, if we succeeded above, we don't even enter the loop. If + * we failed, we set ourselves at the top the queue, waiting for the + * tail to be unlocked again. We stop doing that if another thread + * comes in and becomes the leader in turn. + */ + + /* Wait for another thread to take the lead or for the tail to + * be available again. It's critical to be read-only in this + * loop so as not to lose time synchronizing cache lines. Also, + * we must detect a new leader ASAP so that the fewest possible + * threads check the tail. + */ + + while (1) { + if ((curr_cell = HA_ATOMIC_LOAD(ring_queue_ptr)) != &cell) + goto wait_for_flush; + __ha_cpu_relax_for_read(); + +#if !defined(__ARM_FEATURE_ATOMICS) + /* ARMv8.1-a has a true atomic OR and doesn't need the preliminary read */ + if ((tail_ofs = HA_ATOMIC_LOAD(tail_ptr)) & RING_TAIL_LOCK) { + __ha_cpu_relax_for_read(); + continue; + } +#endif + /* OK the queue is locked, let's attempt to get the tail lock */ + tail_ofs = HA_ATOMIC_FETCH_OR(tail_ptr, RING_TAIL_LOCK); + + /* did we get it ? */ + if (!(tail_ofs & RING_TAIL_LOCK)) { + /* Here we own the tail. We can go on if we're still the leader, + * which we'll confirm by trying to reset the queue. If we're + * still the leader, we're done. + */ + if (HA_ATOMIC_CAS(ring_queue_ptr, &curr_cell, NULL)) + break; // Won! + + /* oops, no, let's give it back to another thread and wait. + * This does not happen often enough to warrant more complex + * approaches (tried already). + */ + HA_ATOMIC_STORE(tail_ptr, tail_ofs); + goto wait_for_flush; + } + __ha_cpu_relax_for_read(); + } + + head_ofs = HA_ATOMIC_LOAD(&ring->storage->head); + + /* this is the byte before tail, it contains the users count */ + lock_ptr = (uint8_t*)ring_area + (tail_ofs > 0 ? tail_ofs - 1 : ring_size - 1); + + /* Take the lock on the area. We're guaranteed to be the only writer + * here. + */ + readers = HA_ATOMIC_XCHG(lock_ptr, RING_WRITING_SIZE); + + vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs); + + while (vp_size(v1, v2) > ring_size - needed - 1 - 1) { /* we need to delete the oldest message (from the end), * and we have to stop if there's a reader stuck there. * Unless there's corruption in the buffer it's guaranteed @@ -212,50 +328,142 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz * varint-encoded length (1 byte min) and the message * payload (0 bytes min). */ - if (*b_head(buf)) - goto done_buf; - dellenlen = b_peek_varint(buf, 1, &dellen); + if (*_vp_head(v1, v2)) + break; + dellenlen = vp_peek_varint_ofs(v1, v2, 1, &dellen); if (!dellenlen) - goto done_buf; - BUG_ON(b_data(buf) < 1 + dellenlen + dellen); - - b_del(buf, 1 + dellenlen + dellen); + break; + BUG_ON_HOT(vp_size(v1, v2) < 1 + dellenlen + dellen); + vp_skip(&v1, &v2, 1 + dellenlen + dellen); } - /* OK now we do have room */ - __b_put_varint(buf, totlen); + /* now let's update the buffer with the new tail if our message will fit */ + new_tail_ofs = tail_ofs; + if (vp_size(v1, v2) <= ring_size - needed - 1 - 1) { + vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs); + + /* update the new space in the buffer */ + HA_ATOMIC_STORE(&ring->storage->head, head_ofs); - totlen = 0; - for (i = 0; i < npfx; i++) { - size_t len = pfx[i].len; + /* calculate next tail pointer */ + new_tail_ofs += needed; + if (new_tail_ofs >= ring_size) + new_tail_ofs -= ring_size; - if (len + totlen > maxlen) - len = maxlen - totlen; - if (len) - __b_putblk(buf, pfx[i].ptr, len); - totlen += len; + /* reset next read counter before releasing writers */ + HA_ATOMIC_STORE(ring_area + (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), 0); + } + else { + /* release readers right now, before writing the tail, so as + * not to expose the readers count byte to another writer. + */ + HA_ATOMIC_STORE(lock_ptr, readers); } - for (i = 0; i < nmsg; i++) { - size_t len = msg[i].len; + /* and release other writers */ + HA_ATOMIC_STORE(tail_ptr, new_tail_ofs); + + vp_ring_to_room(&v1, &v2, ring_area, ring_size, (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), tail_ofs); + + if (likely(tail_ofs != new_tail_ofs)) { + /* the list stops on a NULL */ + for (curr_cell = &cell; curr_cell; curr_cell = HA_ATOMIC_LOAD(&curr_cell->next)) { + maxlen = curr_cell->maxlen; + pfx = curr_cell->pfx; + npfx = curr_cell->npfx; + msg = curr_cell->msg; + nmsg = curr_cell->nmsg; + + /* let's write the message size */ + vp_put_varint(&v1, &v2, maxlen); + + /* then write the messages */ + msglen = 0; + for (i = 0; i < npfx; i++) { + size_t len = pfx[i].len; + + if (len + msglen > maxlen) + len = maxlen - msglen; + if (len) + vp_putblk(&v1, &v2, pfx[i].ptr, len); + msglen += len; + } + + for (i = 0; i < nmsg; i++) { + size_t len = msg[i].len; + + if (len + msglen > maxlen) + len = maxlen - msglen; + if (len) + vp_putblk(&v1, &v2, msg[i].ptr, len); + msglen += len; + } + + /* for all but the last message we need to write the + * readers count byte. + */ + if (curr_cell->next) + vp_putchr(&v1, &v2, 0); + } + + /* now release */ + for (curr_cell = &cell; curr_cell; curr_cell = next_cell) { + next_cell = HA_ATOMIC_LOAD(&curr_cell->next); + _HA_ATOMIC_STORE(&curr_cell->next, curr_cell); + } - if (len + totlen > maxlen) - len = maxlen - totlen; - if (len) - __b_putblk(buf, msg[i].ptr, len); - totlen += len; + /* unlock the message area */ + HA_ATOMIC_STORE(lock_ptr, readers); + } else { + /* messages were dropped, notify about this and release them */ + for (curr_cell = &cell; curr_cell; curr_cell = next_cell) { + next_cell = HA_ATOMIC_LOAD(&curr_cell->next); + HA_ATOMIC_STORE(&curr_cell->to_send_self, 0); + _HA_ATOMIC_STORE(&curr_cell->next, curr_cell); + } } - *b_tail(buf) = 0; buf->data++; // new read counter - sent = lenlen + totlen + 1; + /* we must not write the trailing read counter, it was already done, + * plus we could ruin the one of the next writer. And the front was + * unlocked either at the top if the ring was full, or just above if it + * could be properly filled. + */ + + sent = cell.to_send_self; /* notify potential readers */ - list_for_each_entry(appctx, &ring->waiters, wait_entry) - appctx_wakeup(appctx); + if (sent && HA_ATOMIC_LOAD(&ring->readers_count)) { + HA_ATOMIC_INC(&ring->pending); + while (HA_ATOMIC_LOAD(&ring->pending) && HA_ATOMIC_XCHG(&ring->waking, 1) == 0) { + struct mt_list *elt1, elt2; + struct appctx *appctx; + + HA_ATOMIC_STORE(&ring->pending, 0); + mt_list_for_each_entry_safe(appctx, &ring->waiters, wait_entry, elt1, elt2) + appctx_wakeup(appctx); + HA_ATOMIC_STORE(&ring->waking, 0); + } + } - done_buf: - HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); + leave: return sent; + + wait_for_flush: + /* if we arrive here, it means we found another leader */ + + /* The leader will write our own pointer in the cell's next to + * mark it as released. Let's wait for this. + */ + do { + next_cell = HA_ATOMIC_LOAD(&cell.next); + } while (next_cell != &cell && __ha_cpu_relax_for_read()); + + /* OK our message was queued. Retrieving the sent size in the ring cell + * allows another leader thread to zero it if it finally couldn't send + * it (should only happen when using too small ring buffers to store + * all competing threads' messages at once). + */ + return HA_ATOMIC_LOAD(&cell.to_send_self); } /* Tries to attach appctx <appctx> as a new reader on ring <ring>. This is @@ -270,7 +478,7 @@ int ring_attach(struct ring *ring) int users = ring->readers_count; do { - if (users >= 255) + if (users >= RING_MAX_READERS) return 0; } while (!_HA_ATOMIC_CAS(&ring->readers_count, &users, users + 1)); return 1; @@ -285,20 +493,22 @@ void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs) if (!ring) return; - HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); + HA_ATOMIC_DEC(&ring->readers_count); + if (ofs != ~0) { /* reader was still attached */ - if (ofs < b_head_ofs(&ring->buf)) - ofs += b_size(&ring->buf) - b_head_ofs(&ring->buf); - else - ofs -= b_head_ofs(&ring->buf); - - BUG_ON(ofs >= b_size(&ring->buf)); - LIST_DEL_INIT(&appctx->wait_entry); - HA_ATOMIC_DEC(b_peek(&ring->buf, ofs)); + uint8_t *area = (uint8_t *)ring_area(ring); + uint8_t readers; + + BUG_ON(ofs >= ring_size(ring)); + MT_LIST_DELETE(&appctx->wait_entry); + + /* dec readers count */ + do { + readers = _HA_ATOMIC_LOAD(area + ofs); + } while ((readers > RING_MAX_READERS || + !_HA_ATOMIC_CAS(area + ofs, &readers, readers - 1)) && __ha_cpu_relax()); } - HA_ATOMIC_DEC(&ring->readers_count); - HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); } /* Tries to attach CLI handler <appctx> as a new reader on ring <ring>. This is @@ -313,7 +523,7 @@ int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags) if (!ring_attach(ring)) return cli_err(appctx, - "Sorry, too many watchers (255) on this ring buffer. " + "Sorry, too many watchers (" TOSTR(RING_MAX_READERS) ") on this ring buffer. " "What could it have so interesting to attract so many watchers ?"); if (!appctx->io_handler) @@ -328,36 +538,29 @@ int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags) return 0; } -/* This function dumps all events from the ring whose pointer is in <p0> into - * the appctx's output buffer, and takes from <o0> the seek offset into the - * buffer's history (0 for oldest known event). It looks at <i0> for boolean - * options: bit0 means it must wait for new data or any key to be pressed. Bit1 - * means it must seek directly to the end to wait for new contents. It returns - * 0 if the output buffer or events are missing is full and it needs to be - * called again, otherwise non-zero. It is meant to be used with - * cli_release_show_ring() to clean up. + +/* parses as many messages as possible from ring <ring>, starting at the offset + * stored at *ofs_ptr, with RING_WF_* flags in <flags>, and passes them to + * the message handler <msg_handler>. If <last_of_ptr> is not NULL, a copy of + * the last known tail pointer will be copied there so that the caller may use + * this to detect new data have arrived since we left the function. Returns 0 + * if it needs to pause, 1 once finished. */ -int cli_io_handler_show_ring(struct appctx *appctx) +int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags, + ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len)) { - struct show_ring_ctx *ctx = appctx->svcctx; - struct stconn *sc = appctx_sc(appctx); - struct ring *ring = ctx->ring; - struct buffer *buf = &ring->buf; - size_t ofs; - size_t last_ofs; + size_t head_ofs, tail_ofs, prev_ofs; + size_t ring_size; + uint8_t *ring_area; + struct ist v1, v2; uint64_t msg_len; size_t len, cnt; + ssize_t copied; + uint8_t readers; int ret; - /* FIXME: Don't watch the other side !*/ - if (unlikely(sc_opposite(sc)->flags & SC_FL_SHUT_DONE)) - return 1; - - HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); - LIST_DEL_INIT(&appctx->wait_entry); - HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); - - HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock); + ring_area = (uint8_t *)ring->storage->area; + ring_size = ring->storage->size; /* explanation for the initialization below: it would be better to do * this in the parsing function but this would occasionally result in @@ -365,59 +568,134 @@ int cli_io_handler_show_ring(struct appctx *appctx) * and keep it while being scheduled. Thus instead let's take it the * first time we enter here so that we have a chance to pass many * existing messages before grabbing a reference to a location. This - * value cannot be produced after initialization. + * value cannot be produced after initialization. The first offset + * needs to be taken under isolation as it must not move while we're + * trying to catch it. */ - if (unlikely(ctx->ofs == ~0)) { - /* going to the end means looking at tail-1 */ - ctx->ofs = b_peek_ofs(buf, (ctx->flags & RING_WF_SEEK_NEW) ? b_data(buf) - 1 : 0); - HA_ATOMIC_INC(b_orig(buf) + ctx->ofs); + if (unlikely(*ofs_ptr == ~0)) { + thread_isolate(); + + head_ofs = HA_ATOMIC_LOAD(&ring->storage->head); + tail_ofs = ring_tail(ring); + + if (flags & RING_WF_SEEK_NEW) { + /* going to the end means looking at tail-1 */ + head_ofs = tail_ofs + ring_size - 1; + if (head_ofs >= ring_size) + head_ofs -= ring_size; + } + + /* reserve our slot here (inc readers count) */ + do { + readers = _HA_ATOMIC_LOAD(ring_area + head_ofs); + } while ((readers > RING_MAX_READERS || + !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax()); + + thread_release(); + + /* store this precious offset in our context, and we're done */ + *ofs_ptr = head_ofs; } - /* we were already there, adjust the offset to be relative to - * the buffer's head and remove us from the counter. + /* we have the guarantee we can restart from our own head */ + head_ofs = *ofs_ptr; + BUG_ON(head_ofs >= ring_size); + + /* the tail will continue to move but we're getting a safe value + * here that will continue to work. */ - ofs = ctx->ofs - b_head_ofs(buf); - if (ctx->ofs < b_head_ofs(buf)) - ofs += b_size(buf); + tail_ofs = ring_tail(ring); - BUG_ON(ofs >= buf->size); - HA_ATOMIC_DEC(b_peek(buf, ofs)); + /* we keep track of where we were and we don't release it before + * we've protected the next place. + */ + prev_ofs = head_ofs; - /* in this loop, ofs always points to the counter byte that precedes + /* in this loop, head_ofs always points to the counter byte that precedes * the message so that we can take our reference there if we have to - * stop before the end (ret=0). + * stop before the end (ret=0). The reference is relative to the ring's + * origin, while pos is relative to the ring's head. */ ret = 1; - while (ofs + 1 < b_data(buf)) { + vp_ring_to_data(&v1, &v2, (char *)ring_area, ring_size, head_ofs, tail_ofs); + + while (1) { + if (vp_size(v1, v2) <= 1) { + /* no more data */ + break; + } + + readers = _HA_ATOMIC_LOAD(_vp_addr(v1, v2, 0)); + if (readers > RING_MAX_READERS) { + /* we just met a writer which hasn't finished */ + break; + } + cnt = 1; - len = b_peek_varint(buf, ofs + cnt, &msg_len); + len = vp_peek_varint_ofs(v1, v2, cnt, &msg_len); if (!len) break; cnt += len; - BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); - if (unlikely(msg_len + 1 > b_size(&trash))) { + BUG_ON(msg_len + cnt + 1 > vp_size(v1, v2)); + + copied = msg_handler(ctx, v1, v2, cnt, msg_len); + if (copied == -2) { /* too large a message to ever fit, let's skip it */ - ofs += cnt + msg_len; - continue; + goto skip; } - - chunk_reset(&trash); - len = b_getblk(buf, trash.area, msg_len, ofs + cnt); - trash.data += len; - trash.area[trash.data++] = '\n'; - - if (applet_putchk(appctx, &trash) == -1) { + else if (copied == -1) { + /* output full */ ret = 0; break; } - ofs += cnt + msg_len; + skip: + vp_skip(&v1, &v2, cnt + msg_len); + } + + vp_data_to_ring(v1, v2, (char *)ring_area, ring_size, &head_ofs, &tail_ofs); + + if (head_ofs != prev_ofs) { + /* inc readers count on new place */ + do { + readers = _HA_ATOMIC_LOAD(ring_area + head_ofs); + } while ((readers > RING_MAX_READERS || + !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax()); + + /* dec readers count on old place */ + do { + readers = _HA_ATOMIC_LOAD(ring_area + prev_ofs); + } while ((readers > RING_MAX_READERS || + !_HA_ATOMIC_CAS(ring_area + prev_ofs, &readers, readers - 1)) && __ha_cpu_relax()); } - HA_ATOMIC_INC(b_peek(buf, ofs)); - last_ofs = b_tail_ofs(buf); - ctx->ofs = b_peek_ofs(buf, ofs); - HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock); + if (last_ofs_ptr) + *last_ofs_ptr = tail_ofs; + *ofs_ptr = head_ofs; + return ret; +} + +/* This function dumps all events from the ring whose pointer is in <p0> into + * the appctx's output buffer, and takes from <o0> the seek offset into the + * buffer's history (0 for oldest known event). It looks at <i0> for boolean + * options: bit0 means it must wait for new data or any key to be pressed. Bit1 + * means it must seek directly to the end to wait for new contents. It returns + * 0 if the output buffer or events are missing is full and it needs to be + * called again, otherwise non-zero. It is meant to be used with + * cli_release_show_ring() to clean up. + */ +int cli_io_handler_show_ring(struct appctx *appctx) +{ + struct show_ring_ctx *ctx = appctx->svcctx; + struct stconn *sc = appctx_sc(appctx); + struct ring *ring = ctx->ring; + size_t last_ofs; + size_t ofs; + int ret; + + MT_LIST_DELETE(&appctx->wait_entry); + + ret = ring_dispatch_messages(ring, appctx, &ctx->ofs, &last_ofs, ctx->flags, applet_append_line); if (ret && (ctx->flags & RING_WF_WAIT_MODE)) { /* we've drained everything and are configured to wait for more @@ -425,10 +703,8 @@ int cli_io_handler_show_ring(struct appctx *appctx) */ if (!sc_oc(sc)->output && !(sc->flags & SC_FL_SHUT_DONE)) { /* let's be woken up once new data arrive */ - HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); - LIST_APPEND(&ring->waiters, &appctx->wait_entry); - ofs = b_tail_ofs(&ring->buf); - HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); + MT_LIST_APPEND(&ring->waiters, &appctx->wait_entry); + ofs = ring_tail(ring); if (ofs != last_ofs) { /* more data was added into the ring between the * unlock and the lock, and the writer might not @@ -467,13 +743,41 @@ size_t ring_max_payload(const struct ring *ring) size_t max; /* initial max = bufsize - 1 (initial RC) - 1 (payload RC) */ - max = b_size(&ring->buf) - 1 - 1; + max = ring_size(ring) - 1 - 1; /* subtract payload VI (varint-encoded size) */ max -= varint_bytes(max); return max; } +/* config parser for global "tune.ring.queues", accepts a number from 0 to RING_WAIT_QUEUES */ +static int cfg_parse_tune_ring_queues(char **args, int section_type, struct proxy *curpx, + const struct proxy *defpx, const char *file, int line, + char **err) +{ + int queues; + + if (too_many_args(1, args, err, NULL)) + return -1; + + queues = atoi(args[1]); + if (queues < 0 || queues > RING_WAIT_QUEUES) { + memprintf(err, "'%s' expects a number between 0 and %d but got '%s'.", args[0], RING_WAIT_QUEUES, args[1]); + return -1; + } + + global.tune.ring_queues = queues; + return 0; +} + +/* config keyword parsers */ +static struct cfg_kw_list cfg_kws = {ILH, { + { CFG_GLOBAL, "tune.ring.queues", cfg_parse_tune_ring_queues }, + { 0, NULL, NULL } +}}; + +INITCALL1(STG_REGISTER, cfg_register_keywords, &cfg_kws); + /* * Local variables: * c-indent-level: 8 |