path: root/src/ring.c
author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-06-03 05:11:10 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-06-03 05:11:10 +0000
commit     cff6d757e3ba609c08ef2aaa00f07e53551e5bf6 (patch)
tree       08c4fc3255483ad397d712edb4214ded49149fd9 /src/ring.c
parent     Adding upstream version 2.9.7. (diff)
Adding upstream version 3.0.0. (upstream/3.0.0)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/ring.c')
-rw-r--r--  src/ring.c  682
1 file changed, 493 insertions, 189 deletions
diff --git a/src/ring.c b/src/ring.c
index 849221e..a580050 100644
--- a/src/ring.c
+++ b/src/ring.c
@@ -22,11 +22,13 @@
#include <haproxy/api.h>
#include <haproxy/applet.h>
#include <haproxy/buf.h>
+#include <haproxy/cfgparse.h>
#include <haproxy/cli.h>
#include <haproxy/ring.h>
#include <haproxy/sc_strm.h>
#include <haproxy/stconn.h>
#include <haproxy/thread.h>
+#include <haproxy/vecpair.h>
/* context used to dump the contents of a ring via "show events" or "show errors" */
struct show_ring_ctx {
@@ -35,117 +37,120 @@ struct show_ring_ctx {
uint flags; /* set of RING_WF_* */
};
-/* Initialize a pre-allocated ring with the buffer area
- * of size */
-void ring_init(struct ring *ring, void *area, size_t size)
+/* Initialize a pre-allocated ring with the buffer area of size <size>.
+ * Makes the storage point to the indicated area and adjusts the declared
+ * ring size according to the position of the area in the storage. If <reset>
+ * is non-zero, the storage area is reset, otherwise it's left intact (except
+ * for the area origin pointer which is updated so that the area can come from
+ * an mmap()).
+ */
+void ring_init(struct ring *ring, void *area, size_t size, int reset)
{
- HA_RWLOCK_INIT(&ring->lock);
- LIST_INIT(&ring->waiters);
+ MT_LIST_INIT(&ring->waiters);
ring->readers_count = 0;
- ring->buf = b_make(area, size, 0, 0);
- /* write the initial RC byte */
- b_putchr(&ring->buf, 0);
+ ring->flags = 0;
+ ring->storage = area;
+ ring->pending = 0;
+ ring->waking = 0;
+ memset(&ring->queue, 0, sizeof(ring->queue));
+
+ if (reset) {
+ ring->storage->size = size - sizeof(*ring->storage);
+ ring->storage->rsvd = sizeof(*ring->storage);
+ ring->storage->head = 0;
+ ring->storage->tail = 0;
+
+ /* write the initial RC byte */
+ *ring->storage->area = 0;
+ ring->storage->tail = 1;
+ }
}
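/* Illustration (not part of the patch): after ring_init(ring, area, size, 1)
 * on a freshly allocated area, the layout of <area> is assumed to be:
 *
 *    [ struct ring_storage (size, rsvd, head, tail) | data: RC=0 | unused ]
 *                                                     ^head=0  ^tail=1
 *
 * i.e. the usable space is size - sizeof(struct ring_storage), and the single
 * null byte written at the data origin is the initial readers-count byte.
 */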
-/* Creates and returns a ring buffer of size <size> bytes. Returns NULL on
- * allocation failure.
+/* Creates a ring and its storage area at address <area> for <size> bytes.
+ * If <area> is null, then it's allocated of the requested size. The ring
+ * storage struct is part of the area so the usable area is slightly reduced.
+ * However the storage is immediately adjacent to the struct so that the ring
+ * remains consistent on-disk. ring_free() will ignore such ring storages and
+ * will only release the ring part, so the caller is responsible for releasing
+ * them. If <reset> is non-zero, the storage area is reset, otherwise it's left
+ * intact.
*/
-struct ring *ring_new(size_t size)
+struct ring *ring_make_from_area(void *area, size_t size, int reset)
{
struct ring *ring = NULL;
- void *area = NULL;
+ uint flags = 0;
- if (size < 2)
- goto fail;
+ if (size < sizeof(*ring->storage) + 2)
+ return NULL;
ring = malloc(sizeof(*ring));
if (!ring)
goto fail;
- area = malloc(size);
+ if (!area)
+ area = malloc(size);
+ else
+ flags |= RING_FL_MAPPED;
+
if (!area)
goto fail;
- ring_init(ring, area, size);
+ ring_init(ring, area, size, reset);
+ ring->flags |= flags;
return ring;
fail:
- free(area);
free(ring);
return NULL;
}
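/* Usage sketch (illustrative, not part of the patch): attach a ring to an
 * existing memory-mapped file, optionally without resetting it so that the
 * messages already stored there are preserved. The 64kB size, the already
 * open <fd> and the function name are assumptions made for the example.
 */
#include <sys/mman.h>
#include <haproxy/ring.h>

static struct ring *example_open_mapped_ring(int fd, int reuse_contents)
{
	size_t size = 65536;
	void *map;

	map = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
	if (map == MAP_FAILED)
		return NULL;

	/* reset=0 keeps the existing storage header and messages intact */
	return ring_make_from_area(map, size, reuse_contents ? 0 : 1);
}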
-/* Creates a unified ring + storage area at address <area> for <size> bytes.
- * If <area> is null, then it's allocated of the requested size. The ring
- * struct is part of the area so the usable area is slightly reduced. However
- * the ring storage is immediately adjacent to the struct. ring_free() will
- * ignore such rings, so the caller is responsible for releasing them.
- */
-struct ring *ring_make_from_area(void *area, size_t size)
-{
- struct ring *ring = NULL;
-
- if (size < sizeof(*ring))
- return NULL;
-
- if (!area)
- area = malloc(size);
- if (!area)
- return NULL;
-
- ring = area;
- area += sizeof(*ring);
- ring_init(ring, area, size - sizeof(*ring));
- return ring;
-}
-
-/* Cast an unified ring + storage area to a ring from <area>, without
- * reinitializing the data buffer.
- *
- * Reinitialize the waiters and the lock.
+/* Creates and returns a ring buffer of size <size> bytes. Returns NULL on
+ * allocation failure. The size is the area size, not the usable size.
*/
-struct ring *ring_cast_from_area(void *area)
+struct ring *ring_new(size_t size)
{
- struct ring *ring = NULL;
-
- ring = area;
- ring->buf.area = area + sizeof(*ring);
-
- HA_RWLOCK_INIT(&ring->lock);
- LIST_INIT(&ring->waiters);
- ring->readers_count = 0;
-
- return ring;
+ return ring_make_from_area(NULL, size, 1);
}
/* Resizes existing ring <ring> to <size> which must be larger, without losing
* its contents. The new size must be at least as large as the previous one or
* no change will be performed. The pointer to the ring is returned on success,
- * or NULL on allocation failure. This will lock the ring for writes.
+ * or NULL on allocation failure. This will lock the ring for writes. The size
+ * is the allocated area size, and includes the ring_storage header.
*/
struct ring *ring_resize(struct ring *ring, size_t size)
{
- void *area;
+ struct ring_storage *old, *new;
- if (b_size(&ring->buf) >= size)
+ if (size <= ring_data(ring) + sizeof(*ring->storage))
return ring;
- area = malloc(size);
- if (!area)
+ old = ring->storage;
+ new = malloc(size);
+ if (!new)
return NULL;
- HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
+ thread_isolate();
- /* recheck the buffer's size, it may have changed during the malloc */
- if (b_size(&ring->buf) < size) {
+ /* recheck the ring's size, it may have changed during the malloc */
+ if (size > ring_data(ring) + sizeof(*ring->storage)) {
/* copy old contents */
- b_getblk(&ring->buf, area, ring->buf.data, 0);
- area = HA_ATOMIC_XCHG(&ring->buf.area, area);
- ring->buf.size = size;
+ struct ist v1, v2;
+ size_t len;
+
+ vp_ring_to_data(&v1, &v2, old->area, old->size, old->head, old->tail);
+ len = vp_size(v1, v2);
+ vp_peek_ofs(v1, v2, 0, new->area, len);
+ new->size = size - sizeof(*ring->storage);
+ new->rsvd = sizeof(*ring->storage);
+ new->head = 0;
+ new->tail = len;
+ new = HA_ATOMIC_XCHG(&ring->storage, new);
}
- HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
+ thread_release();
- free(area);
+ /* free the unused one */
+ free(new);
return ring;
}
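/* Conceptual sketch (illustrative, not the real haproxy/vecpair.h API) of the
 * mapping performed by vp_ring_to_data() above: the bytes stored between
 * <head> and <tail> in a circular area of <size> bytes are described by at
 * most two contiguous chunks, so they can be copied without caring about the
 * wrap point.
 */
struct example_chunk { char *ptr; size_t len; };

static void example_ring_to_data(struct example_chunk *v1, struct example_chunk *v2,
                                 char *area, size_t size, size_t head, size_t tail)
{
	if (head <= tail) {
		/* data is contiguous: a single chunk, second one empty */
		v1->ptr = area + head; v1->len = tail - head;
		v2->ptr = area + tail; v2->len = 0;
	} else {
		/* data wraps: from head to the end, then from 0 to tail */
		v1->ptr = area + head; v1->len = size - head;
		v2->ptr = area;        v2->len = tail;
	}
}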
@@ -156,10 +161,8 @@ void ring_free(struct ring *ring)
return;
/* make sure it was not allocated by ring_make_from_area */
- if (ring->buf.area == (void *)ring + sizeof(*ring))
- return;
-
- free(ring->buf.area);
+ if (!(ring->flags & RING_FL_MAPPED))
+ free(ring->storage);
free(ring);
}
@@ -173,12 +176,20 @@ void ring_free(struct ring *ring)
*/
ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
{
- struct buffer *buf = &ring->buf;
- struct appctx *appctx;
- size_t totlen = 0;
+ struct ring_wait_cell **ring_queue_ptr = DISGUISE(&ring->queue[ti->ring_queue].ptr);
+ struct ring_wait_cell cell, *next_cell, *curr_cell;
+ size_t *tail_ptr = &ring->storage->tail;
+ size_t head_ofs, tail_ofs, new_tail_ofs;
+ size_t ring_size;
+ char *ring_area;
+ struct ist v1, v2;
+ size_t msglen = 0;
size_t lenlen;
+ size_t needed;
uint64_t dellen;
int dellenlen;
+ uint8_t *lock_ptr;
+ uint8_t readers;
ssize_t sent = 0;
int i;
@@ -191,20 +202,125 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
* copying due to the varint encoding of the length.
*/
for (i = 0; i < npfx; i++)
- totlen += pfx[i].len;
+ msglen += pfx[i].len;
for (i = 0; i < nmsg; i++)
- totlen += msg[i].len;
+ msglen += msg[i].len;
- if (totlen > maxlen)
- totlen = maxlen;
+ if (msglen > maxlen)
+ msglen = maxlen;
- lenlen = varint_bytes(totlen);
+ lenlen = varint_bytes(msglen);
- HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
- if (lenlen + totlen + 1 + 1 > b_size(buf))
- goto done_buf;
+ /* We need:
+ * - lenlen bytes for the size encoding
+ * - msglen for the message
+ * - one byte for the new marker
+ *
+ * Note that we'll also reserve one extra byte to make sure we never
+ * leave a full buffer (the vec-to-ring conversion cannot be done if
+ * both areas are of size 0).
+ */
+ needed = lenlen + msglen + 1;
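	/* Worked example (illustrative, not part of the patch): a 100-byte
	 * message whose length fits in a one-byte varint gives
	 * needed = 1 + 100 + 1 = 102; with the extra byte reserved above, such
	 * a message cannot be stored at all in a ring smaller than 103 bytes.
	 */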
- while (b_room(buf) < lenlen + totlen + 1) {
+ /* these ones do not change under us (only resize affects them and it
+ * must be done under thread isolation).
+ */
+ ring_area = ring->storage->area;
+ ring_size = ring->storage->size;
+
+ if (needed + 1 > ring_size)
+ goto leave;
+
+ cell.to_send_self = needed;
+ cell.needed_tot = 0; // only when non-zero the cell is considered ready.
+ cell.maxlen = msglen;
+ cell.pfx = pfx;
+ cell.npfx = npfx;
+ cell.msg = msg;
+ cell.nmsg = nmsg;
+
+ /* insert our cell into the queue before the previous one. We may have
+ * to wait a bit if the queue's leader is attempting an election to win
+ * the tail, hence the busy value (should be rare enough).
+ */
+ next_cell = HA_ATOMIC_XCHG(ring_queue_ptr, &cell);
+
+ /* let's add the cumulated size of pending messages to ours */
+ cell.next = next_cell;
+ if (next_cell) {
+ size_t next_needed;
+
+ while ((next_needed = HA_ATOMIC_LOAD(&next_cell->needed_tot)) == 0)
+ __ha_cpu_relax_for_read();
+ needed += next_needed;
+ }
+
+ /* now <needed> will represent the size to store *all* messages. The
+ * atomic store may unlock a subsequent thread waiting for this one.
+ */
+ HA_ATOMIC_STORE(&cell.needed_tot, needed);
+
+ /* OK now we're the queue leader, it's our job to try to get ownership
+ * of the tail, if we succeeded above, we don't even enter the loop. If
+ * we failed, we set ourselves at the top of the queue, waiting for the
+ * tail to be unlocked again. We stop doing that if another thread
+ * comes in and becomes the leader in turn.
+ */
+
+ /* Wait for another thread to take the lead or for the tail to
+ * be available again. It's critical to be read-only in this
+ * loop so as not to lose time synchronizing cache lines. Also,
+ * we must detect a new leader ASAP so that the fewest possible
+ * threads check the tail.
+ */
+
+ while (1) {
+ if ((curr_cell = HA_ATOMIC_LOAD(ring_queue_ptr)) != &cell)
+ goto wait_for_flush;
+ __ha_cpu_relax_for_read();
+
+#if !defined(__ARM_FEATURE_ATOMICS)
+ /* ARMv8.1-a has a true atomic OR and doesn't need the preliminary read */
+ if ((tail_ofs = HA_ATOMIC_LOAD(tail_ptr)) & RING_TAIL_LOCK) {
+ __ha_cpu_relax_for_read();
+ continue;
+ }
+#endif
+ /* OK the queue is locked, let's attempt to get the tail lock */
+ tail_ofs = HA_ATOMIC_FETCH_OR(tail_ptr, RING_TAIL_LOCK);
+
+ /* did we get it ? */
+ if (!(tail_ofs & RING_TAIL_LOCK)) {
+ /* Here we own the tail. We can go on if we're still the leader,
+ * which we'll confirm by trying to reset the queue. If we're
+ * still the leader, we're done.
+ */
+ if (HA_ATOMIC_CAS(ring_queue_ptr, &curr_cell, NULL))
+ break; // Won!
+
+ /* oops, no, let's give it back to another thread and wait.
+ * This does not happen often enough to warrant more complex
+ * approaches (tried already).
+ */
+ HA_ATOMIC_STORE(tail_ptr, tail_ofs);
+ goto wait_for_flush;
+ }
+ __ha_cpu_relax_for_read();
+ }
+
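	/* Note (illustrative, not part of the patch): at this point the winning
	 * thread holds the tail lock and has reset the queue head, so it acts
	 * as the leader: it will write its own message plus those of every cell
	 * linked behind it through <cell.next> (the threads that queued
	 * earlier), then release each cell so that the threads parked in
	 * wait_for_flush below can return.
	 */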
+ head_ofs = HA_ATOMIC_LOAD(&ring->storage->head);
+
+ /* this is the byte before tail, it contains the users count */
+ lock_ptr = (uint8_t*)ring_area + (tail_ofs > 0 ? tail_ofs - 1 : ring_size - 1);
+
+ /* Take the lock on the area. We're guaranteed to be the only writer
+ * here.
+ */
+ readers = HA_ATOMIC_XCHG(lock_ptr, RING_WRITING_SIZE);
+
+ vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
+
+ while (vp_size(v1, v2) > ring_size - needed - 1 - 1) {
/* we need to delete the oldest message (from the end),
* and we have to stop if there's a reader stuck there.
* Unless there's corruption in the buffer it's guaranteed
@@ -212,50 +328,142 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
* varint-encoded length (1 byte min) and the message
* payload (0 bytes min).
*/
- if (*b_head(buf))
- goto done_buf;
- dellenlen = b_peek_varint(buf, 1, &dellen);
+ if (*_vp_head(v1, v2))
+ break;
+ dellenlen = vp_peek_varint_ofs(v1, v2, 1, &dellen);
if (!dellenlen)
- goto done_buf;
- BUG_ON(b_data(buf) < 1 + dellenlen + dellen);
-
- b_del(buf, 1 + dellenlen + dellen);
+ break;
+ BUG_ON_HOT(vp_size(v1, v2) < 1 + dellenlen + dellen);
+ vp_skip(&v1, &v2, 1 + dellenlen + dellen);
}
- /* OK now we do have room */
- __b_put_varint(buf, totlen);
+ /* now let's update the buffer with the new tail if our message will fit */
+ new_tail_ofs = tail_ofs;
+ if (vp_size(v1, v2) <= ring_size - needed - 1 - 1) {
+ vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
+
+ /* update the new space in the buffer */
+ HA_ATOMIC_STORE(&ring->storage->head, head_ofs);
- totlen = 0;
- for (i = 0; i < npfx; i++) {
- size_t len = pfx[i].len;
+ /* calculate next tail pointer */
+ new_tail_ofs += needed;
+ if (new_tail_ofs >= ring_size)
+ new_tail_ofs -= ring_size;
- if (len + totlen > maxlen)
- len = maxlen - totlen;
- if (len)
- __b_putblk(buf, pfx[i].ptr, len);
- totlen += len;
+ /* reset next read counter before releasing writers */
+ HA_ATOMIC_STORE(ring_area + (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), 0);
+ }
+ else {
+ /* release readers right now, before writing the tail, so as
+ * not to expose the readers count byte to another writer.
+ */
+ HA_ATOMIC_STORE(lock_ptr, readers);
}
- for (i = 0; i < nmsg; i++) {
- size_t len = msg[i].len;
+ /* and release other writers */
+ HA_ATOMIC_STORE(tail_ptr, new_tail_ofs);
+
+ vp_ring_to_room(&v1, &v2, ring_area, ring_size, (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), tail_ofs);
+
+ if (likely(tail_ofs != new_tail_ofs)) {
+ /* the list stops on a NULL */
+ for (curr_cell = &cell; curr_cell; curr_cell = HA_ATOMIC_LOAD(&curr_cell->next)) {
+ maxlen = curr_cell->maxlen;
+ pfx = curr_cell->pfx;
+ npfx = curr_cell->npfx;
+ msg = curr_cell->msg;
+ nmsg = curr_cell->nmsg;
+
+ /* let's write the message size */
+ vp_put_varint(&v1, &v2, maxlen);
+
+ /* then write the messages */
+ msglen = 0;
+ for (i = 0; i < npfx; i++) {
+ size_t len = pfx[i].len;
+
+ if (len + msglen > maxlen)
+ len = maxlen - msglen;
+ if (len)
+ vp_putblk(&v1, &v2, pfx[i].ptr, len);
+ msglen += len;
+ }
+
+ for (i = 0; i < nmsg; i++) {
+ size_t len = msg[i].len;
+
+ if (len + msglen > maxlen)
+ len = maxlen - msglen;
+ if (len)
+ vp_putblk(&v1, &v2, msg[i].ptr, len);
+ msglen += len;
+ }
+
+ /* for all but the last message we need to write the
+ * readers count byte.
+ */
+ if (curr_cell->next)
+ vp_putchr(&v1, &v2, 0);
+ }
+
+ /* now release */
+ for (curr_cell = &cell; curr_cell; curr_cell = next_cell) {
+ next_cell = HA_ATOMIC_LOAD(&curr_cell->next);
+ _HA_ATOMIC_STORE(&curr_cell->next, curr_cell);
+ }
- if (len + totlen > maxlen)
- len = maxlen - totlen;
- if (len)
- __b_putblk(buf, msg[i].ptr, len);
- totlen += len;
+ /* unlock the message area */
+ HA_ATOMIC_STORE(lock_ptr, readers);
+ } else {
+ /* messages were dropped, notify about this and release them */
+ for (curr_cell = &cell; curr_cell; curr_cell = next_cell) {
+ next_cell = HA_ATOMIC_LOAD(&curr_cell->next);
+ HA_ATOMIC_STORE(&curr_cell->to_send_self, 0);
+ _HA_ATOMIC_STORE(&curr_cell->next, curr_cell);
+ }
}
- *b_tail(buf) = 0; buf->data++; // new read counter
- sent = lenlen + totlen + 1;
+ /* we must not write the trailing read counter, it was already done,
+ * plus we could ruin the one of the next writer. And the front was
+ * unlocked either at the top if the ring was full, or just above if it
+ * could be properly filled.
+ */
+
+ sent = cell.to_send_self;
/* notify potential readers */
- list_for_each_entry(appctx, &ring->waiters, wait_entry)
- appctx_wakeup(appctx);
+ if (sent && HA_ATOMIC_LOAD(&ring->readers_count)) {
+ HA_ATOMIC_INC(&ring->pending);
+ while (HA_ATOMIC_LOAD(&ring->pending) && HA_ATOMIC_XCHG(&ring->waking, 1) == 0) {
+ struct mt_list *elt1, elt2;
+ struct appctx *appctx;
+
+ HA_ATOMIC_STORE(&ring->pending, 0);
+ mt_list_for_each_entry_safe(appctx, &ring->waiters, wait_entry, elt1, elt2)
+ appctx_wakeup(appctx);
+ HA_ATOMIC_STORE(&ring->waking, 0);
+ }
+ }
- done_buf:
- HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
+ leave:
return sent;
+
+ wait_for_flush:
+ /* if we arrive here, it means we found another leader */
+
+ /* The leader will write our own pointer in the cell's next to
+ * mark it as released. Let's wait for this.
+ */
+ do {
+ next_cell = HA_ATOMIC_LOAD(&cell.next);
+ } while (next_cell != &cell && __ha_cpu_relax_for_read());
+
+ /* OK our message was queued. Retrieving the sent size in the ring cell
+ * allows another leader thread to zero it if it finally couldn't send
+ * it (should only happen when using too small ring buffers to store
+ * all competing threads' messages at once).
+ */
+ return HA_ATOMIC_LOAD(&cell.to_send_self);
}
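/* Usage sketch (illustrative, not part of the patch): emitting one message
 * made of a prefix and a payload. The example_emit() name and the
 * single-element arrays are assumptions for the example; per the code above
 * the return value is the room consumed by the message, or 0 when it was
 * dropped.
 */
#include <haproxy/ring.h>
#include <import/ist.h>

static ssize_t example_emit(struct ring *ring, const char *tag, const char *text)
{
	struct ist pfx[1] = { ist(tag)  };
	struct ist msg[1] = { ist(text) };

	return ring_write(ring, ring_max_payload(ring), pfx, 1, msg, 1);
}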
/* Tries to attach appctx <appctx> as a new reader on ring <ring>. This is
@@ -270,7 +478,7 @@ int ring_attach(struct ring *ring)
int users = ring->readers_count;
do {
- if (users >= 255)
+ if (users >= RING_MAX_READERS)
return 0;
} while (!_HA_ATOMIC_CAS(&ring->readers_count, &users, users + 1));
return 1;
@@ -285,20 +493,22 @@ void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs)
if (!ring)
return;
- HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
+ HA_ATOMIC_DEC(&ring->readers_count);
+
if (ofs != ~0) {
/* reader was still attached */
- if (ofs < b_head_ofs(&ring->buf))
- ofs += b_size(&ring->buf) - b_head_ofs(&ring->buf);
- else
- ofs -= b_head_ofs(&ring->buf);
-
- BUG_ON(ofs >= b_size(&ring->buf));
- LIST_DEL_INIT(&appctx->wait_entry);
- HA_ATOMIC_DEC(b_peek(&ring->buf, ofs));
+ uint8_t *area = (uint8_t *)ring_area(ring);
+ uint8_t readers;
+
+ BUG_ON(ofs >= ring_size(ring));
+ MT_LIST_DELETE(&appctx->wait_entry);
+
+ /* dec readers count */
+ do {
+ readers = _HA_ATOMIC_LOAD(area + ofs);
+ } while ((readers > RING_MAX_READERS ||
+ !_HA_ATOMIC_CAS(area + ofs, &readers, readers - 1)) && __ha_cpu_relax());
}
- HA_ATOMIC_DEC(&ring->readers_count);
- HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
}
/* Tries to attach CLI handler <appctx> as a new reader on ring <ring>. This is
@@ -313,7 +523,7 @@ int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags)
if (!ring_attach(ring))
return cli_err(appctx,
- "Sorry, too many watchers (255) on this ring buffer. "
+ "Sorry, too many watchers (" TOSTR(RING_MAX_READERS) ") on this ring buffer. "
"What could it have so interesting to attract so many watchers ?");
if (!appctx->io_handler)
@@ -328,36 +538,29 @@ int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags)
return 0;
}
-/* This function dumps all events from the ring whose pointer is in <p0> into
- * the appctx's output buffer, and takes from <o0> the seek offset into the
- * buffer's history (0 for oldest known event). It looks at <i0> for boolean
- * options: bit0 means it must wait for new data or any key to be pressed. Bit1
- * means it must seek directly to the end to wait for new contents. It returns
- * 0 if the output buffer or events are missing is full and it needs to be
- * called again, otherwise non-zero. It is meant to be used with
- * cli_release_show_ring() to clean up.
+
+/* parses as many messages as possible from ring <ring>, starting at the offset
+ * stored at *ofs_ptr, with RING_WF_* flags in <flags>, and passes them to
+ * the message handler <msg_handler>. If <last_ofs_ptr> is not NULL, a copy of
+ * the last known tail offset is stored there so that the caller may use it
+ * to detect whether new data have arrived since we left the function. Returns 0
+ * if it needs to pause, 1 once finished.
*/
-int cli_io_handler_show_ring(struct appctx *appctx)
+int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
+ ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len))
{
- struct show_ring_ctx *ctx = appctx->svcctx;
- struct stconn *sc = appctx_sc(appctx);
- struct ring *ring = ctx->ring;
- struct buffer *buf = &ring->buf;
- size_t ofs;
- size_t last_ofs;
+ size_t head_ofs, tail_ofs, prev_ofs;
+ size_t ring_size;
+ uint8_t *ring_area;
+ struct ist v1, v2;
uint64_t msg_len;
size_t len, cnt;
+ ssize_t copied;
+ uint8_t readers;
int ret;
- /* FIXME: Don't watch the other side !*/
- if (unlikely(sc_opposite(sc)->flags & SC_FL_SHUT_DONE))
- return 1;
-
- HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
- LIST_DEL_INIT(&appctx->wait_entry);
- HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
-
- HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
+ ring_area = (uint8_t *)ring->storage->area;
+ ring_size = ring->storage->size;
/* explanation for the initialization below: it would be better to do
* this in the parsing function but this would occasionally result in
@@ -365,59 +568,134 @@ int cli_io_handler_show_ring(struct appctx *appctx)
* and keep it while being scheduled. Thus instead let's take it the
* first time we enter here so that we have a chance to pass many
* existing messages before grabbing a reference to a location. This
- * value cannot be produced after initialization.
+ * value cannot be produced after initialization. The first offset
+ * needs to be taken under isolation as it must not move while we're
+ * trying to catch it.
*/
- if (unlikely(ctx->ofs == ~0)) {
- /* going to the end means looking at tail-1 */
- ctx->ofs = b_peek_ofs(buf, (ctx->flags & RING_WF_SEEK_NEW) ? b_data(buf) - 1 : 0);
- HA_ATOMIC_INC(b_orig(buf) + ctx->ofs);
+ if (unlikely(*ofs_ptr == ~0)) {
+ thread_isolate();
+
+ head_ofs = HA_ATOMIC_LOAD(&ring->storage->head);
+ tail_ofs = ring_tail(ring);
+
+ if (flags & RING_WF_SEEK_NEW) {
+ /* going to the end means looking at tail-1 */
+ head_ofs = tail_ofs + ring_size - 1;
+ if (head_ofs >= ring_size)
+ head_ofs -= ring_size;
+ }
+
+ /* reserve our slot here (inc readers count) */
+ do {
+ readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
+ } while ((readers > RING_MAX_READERS ||
+ !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax());
+
+ thread_release();
+
+ /* store this precious offset in our context, and we're done */
+ *ofs_ptr = head_ofs;
}
- /* we were already there, adjust the offset to be relative to
- * the buffer's head and remove us from the counter.
+ /* we have the guarantee we can restart from our own head */
+ head_ofs = *ofs_ptr;
+ BUG_ON(head_ofs >= ring_size);
+
+ /* the tail will continue to move but we're getting a safe value
+ * here that will continue to work.
*/
- ofs = ctx->ofs - b_head_ofs(buf);
- if (ctx->ofs < b_head_ofs(buf))
- ofs += b_size(buf);
+ tail_ofs = ring_tail(ring);
- BUG_ON(ofs >= buf->size);
- HA_ATOMIC_DEC(b_peek(buf, ofs));
+ /* we keep track of where we were and we don't release it before
+ * we've protected the next place.
+ */
+ prev_ofs = head_ofs;
- /* in this loop, ofs always points to the counter byte that precedes
+ /* in this loop, head_ofs always points to the counter byte that precedes
* the message so that we can take our reference there if we have to
- * stop before the end (ret=0).
+ * stop before the end (ret=0). The reference is relative to the ring's
+ * origin, while pos is relative to the ring's head.
*/
ret = 1;
- while (ofs + 1 < b_data(buf)) {
+ vp_ring_to_data(&v1, &v2, (char *)ring_area, ring_size, head_ofs, tail_ofs);
+
+ while (1) {
+ if (vp_size(v1, v2) <= 1) {
+ /* no more data */
+ break;
+ }
+
+ readers = _HA_ATOMIC_LOAD(_vp_addr(v1, v2, 0));
+ if (readers > RING_MAX_READERS) {
+ /* we just met a writer which hasn't finished */
+ break;
+ }
+
cnt = 1;
- len = b_peek_varint(buf, ofs + cnt, &msg_len);
+ len = vp_peek_varint_ofs(v1, v2, cnt, &msg_len);
if (!len)
break;
cnt += len;
- BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
- if (unlikely(msg_len + 1 > b_size(&trash))) {
+ BUG_ON(msg_len + cnt + 1 > vp_size(v1, v2));
+
+ copied = msg_handler(ctx, v1, v2, cnt, msg_len);
+ if (copied == -2) {
/* too large a message to ever fit, let's skip it */
- ofs += cnt + msg_len;
- continue;
+ goto skip;
}
-
- chunk_reset(&trash);
- len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
- trash.data += len;
- trash.area[trash.data++] = '\n';
-
- if (applet_putchk(appctx, &trash) == -1) {
+ else if (copied == -1) {
+ /* output full */
ret = 0;
break;
}
- ofs += cnt + msg_len;
+ skip:
+ vp_skip(&v1, &v2, cnt + msg_len);
+ }
+
+ vp_data_to_ring(v1, v2, (char *)ring_area, ring_size, &head_ofs, &tail_ofs);
+
+ if (head_ofs != prev_ofs) {
+ /* inc readers count on new place */
+ do {
+ readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
+ } while ((readers > RING_MAX_READERS ||
+ !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax());
+
+ /* dec readers count on old place */
+ do {
+ readers = _HA_ATOMIC_LOAD(ring_area + prev_ofs);
+ } while ((readers > RING_MAX_READERS ||
+ !_HA_ATOMIC_CAS(ring_area + prev_ofs, &readers, readers - 1)) && __ha_cpu_relax());
}
- HA_ATOMIC_INC(b_peek(buf, ofs));
- last_ofs = b_tail_ofs(buf);
- ctx->ofs = b_peek_ofs(buf, ofs);
- HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock);
+ if (last_ofs_ptr)
+ *last_ofs_ptr = tail_ofs;
+ *ofs_ptr = head_ofs;
+ return ret;
+}
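/* Sketch of a message handler usable with ring_dispatch_messages() above (an
 * illustration, not the real applet_append_line() used by the CLI): it copies
 * one message into a caller-provided struct buffer, following the -2 ("will
 * never fit, skip it") and -1 ("output full, retry later") conventions the
 * dispatch loop checks for. The handler name and the use of <ctx> as a
 * struct buffer pointer are assumptions for the example.
 */
#include <haproxy/buf.h>
#include <haproxy/vecpair.h>

static ssize_t example_line_handler(void *ctx, struct ist v1, struct ist v2,
                                    size_t ofs, size_t len)
{
	struct buffer *out = ctx;

	if (len + 1 > b_size(out))
		return -2;      /* can never fit even in an empty buffer: skip */

	if (len + 1 > b_room(out))
		return -1;      /* no room right now: pause and be called again */

	/* the message body starts <ofs> bytes into the vec pair */
	vp_peek_ofs(v1, v2, ofs, b_tail(out), len);
	b_add(out, len);
	b_putchr(out, '\n');
	return len + 1;
}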
+
+/* This function dumps all events from the ring whose pointer is in <p0> into
+ * the appctx's output buffer, and takes from <o0> the seek offset into the
+ * buffer's history (0 for oldest known event). It looks at <i0> for boolean
+ * options: bit0 means it must wait for new data or any key to be pressed. Bit1
+ * means it must seek directly to the end to wait for new contents. It returns
+ * 0 if the output buffer is full or events are missing and it needs to be
+ * called again, otherwise non-zero. It is meant to be used with
+ * cli_release_show_ring() to clean up.
+ */
+int cli_io_handler_show_ring(struct appctx *appctx)
+{
+ struct show_ring_ctx *ctx = appctx->svcctx;
+ struct stconn *sc = appctx_sc(appctx);
+ struct ring *ring = ctx->ring;
+ size_t last_ofs;
+ size_t ofs;
+ int ret;
+
+ MT_LIST_DELETE(&appctx->wait_entry);
+
+ ret = ring_dispatch_messages(ring, appctx, &ctx->ofs, &last_ofs, ctx->flags, applet_append_line);
if (ret && (ctx->flags & RING_WF_WAIT_MODE)) {
/* we've drained everything and are configured to wait for more
@@ -425,10 +703,8 @@ int cli_io_handler_show_ring(struct appctx *appctx)
*/
if (!sc_oc(sc)->output && !(sc->flags & SC_FL_SHUT_DONE)) {
/* let's be woken up once new data arrive */
- HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
- LIST_APPEND(&ring->waiters, &appctx->wait_entry);
- ofs = b_tail_ofs(&ring->buf);
- HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
+ MT_LIST_APPEND(&ring->waiters, &appctx->wait_entry);
+ ofs = ring_tail(ring);
if (ofs != last_ofs) {
/* more data was added into the ring between the
* unlock and the lock, and the writer might not
@@ -467,13 +743,41 @@ size_t ring_max_payload(const struct ring *ring)
size_t max;
/* initial max = bufsize - 1 (initial RC) - 1 (payload RC) */
- max = b_size(&ring->buf) - 1 - 1;
+ max = ring_size(ring) - 1 - 1;
/* subtract payload VI (varint-encoded size) */
max -= varint_bytes(max);
return max;
}
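/* Worked example (illustrative, not part of the patch): for a usable ring
 * size of 200 bytes, the starting point is 200 - 1 - 1 = 198 bytes; assuming
 * a one-byte varint for such a small length, the maximum payload of a single
 * message is 197 bytes.
 */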
+/* config parser for global "tune.ring.queues", accepts a number from 0 to RING_WAIT_QUEUES */
+static int cfg_parse_tune_ring_queues(char **args, int section_type, struct proxy *curpx,
+ const struct proxy *defpx, const char *file, int line,
+ char **err)
+{
+ int queues;
+
+ if (too_many_args(1, args, err, NULL))
+ return -1;
+
+ queues = atoi(args[1]);
+ if (queues < 0 || queues > RING_WAIT_QUEUES) {
+ memprintf(err, "'%s' expects a number between 0 and %d but got '%s'.", args[0], RING_WAIT_QUEUES, args[1]);
+ return -1;
+ }
+
+ global.tune.ring_queues = queues;
+ return 0;
+}
+
+/* config keyword parsers */
+static struct cfg_kw_list cfg_kws = {ILH, {
+ { CFG_GLOBAL, "tune.ring.queues", cfg_parse_tune_ring_queues },
+ { 0, NULL, NULL }
+}};
+
+INITCALL1(STG_REGISTER, cfg_register_keywords, &cfg_kws);
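/* Example (illustrative, not part of the patch) of setting the keyword parsed
 * above in the global section; per the parser the value must stay between 0
 * and RING_WAIT_QUEUES:
 *
 *     global
 *         tune.ring.queues 4
 */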
+
/*
* Local variables:
* c-indent-level: 8