summaryrefslogtreecommitdiffstats
path: root/src/sink.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/sink.c283
1 files changed, 105 insertions, 178 deletions
diff --git a/src/sink.c b/src/sink.c
index 58adfcd..1a9165e 100644
--- a/src/sink.c
+++ b/src/sink.c
@@ -87,7 +87,6 @@ static struct sink *__sink_new(const char *name, const char *desc, int fmt)
/* address will be filled by the caller if needed */
sink->ctx.fd = -1;
sink->ctx.dropped = 0;
- HA_RWLOCK_INIT(&sink->ctx.lock);
LIST_APPEND(&sink_list, &sink->sink_list);
end:
return sink;
@@ -206,30 +205,79 @@ send:
* here with the only difference that we override the log level. This is
* possible since the announce message will be sent from the same context.
*
- * In case of success, the amount of drops is reduced by as much. It's supposed
- * to be called under an exclusive lock on the sink to avoid multiple producers
- * doing the same. On success, >0 is returned, otherwise <=0 on failure.
+ * In case of success, the amount of drops is reduced by as much.
+ * The function ensures that a single thread will do that work at once, other
+ * ones will only report a failure if a thread is dumping, so that no thread
+ * waits. A pair od atomic OR and AND is performed around the code so the
+ * caller would be advised to only call this function AFTER having verified
+ * that sink->ctx.dropped is not zero in order to avoid a memory write. On
+ * success, >0 is returned, otherwise <=0 on failure, indicating that it could
+ * not eliminate the pending drop counter. It may loop up to 10 times trying
+ * to catch up with failing competing threads.
*/
int sink_announce_dropped(struct sink *sink, struct log_header hdr)
{
- unsigned int dropped;
- struct buffer msg;
+ static THREAD_LOCAL char msg_dropped1[] = "1 event dropped";
+ static THREAD_LOCAL char msg_dropped2[] = "0000000000 events dropped";
+ uint dropped, last_dropped;
struct ist msgvec[1];
- char logbuf[64];
+ uint retries = 10;
+ int ret = 0;
+
+ /* Explanation. ctx.dropped is made of:
+ * bit0 = 1 if dropped dump in progress
+ * bit1..31 = dropped counter
+ * If non-zero there have been some drops. If not &1, it means
+ * nobody's taking care of them and we'll have to, otherwise
+ * another thread is already on them and we can just pass and
+ * count another drop (hence add 2).
+ */
+ dropped = HA_ATOMIC_FETCH_OR(&sink->ctx.dropped, 1);
+ if (dropped & 1) {
+ /* another thread was already on it */
+ goto leave;
+ }
- while (unlikely((dropped = sink->ctx.dropped) > 0)) {
- chunk_init(&msg, logbuf, sizeof(logbuf));
- chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
- msgvec[0] = ist2(msg.area, msg.data);
+ last_dropped = 0;
+ dropped >>= 1;
+ while (1) {
+ while (unlikely(dropped > last_dropped) && retries-- > 0) {
+ /* try to aggregate multiple messages if other threads arrive while
+ * we're producing the dropped message.
+ */
+ uint msglen = sizeof(msg_dropped1);
+ const char *msg = msg_dropped1;
+
+ last_dropped = dropped;
+ if (dropped > 1) {
+ msg = ultoa_r(dropped, msg_dropped2, 11);
+ msg_dropped2[10] = ' ';
+ msglen = msg_dropped2 + sizeof(msg_dropped2) - msg;
+ }
+ msgvec[0] = ist2(msg, msglen);
+ dropped = HA_ATOMIC_LOAD(&sink->ctx.dropped) >> 1;
+ }
+ if (!dropped)
+ break;
+
+ last_dropped = 0;
hdr.level = LOG_NOTICE; /* override level but keep original log header data */
if (__sink_write(sink, hdr, 0, msgvec, 1) <= 0)
- return 0;
+ goto done;
+
/* success! */
- HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
+ HA_ATOMIC_SUB(&sink->ctx.dropped, dropped << 1);
}
- return 1;
+
+ /* done! */
+ ret = 1;
+done:
+ /* unlock the counter */
+ HA_ATOMIC_AND(&sink->ctx.dropped, ~1);
+leave:
+ return ret;
}
/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
@@ -284,7 +332,7 @@ static int cli_parse_show_events(char **args, char *payload, struct appctx *appc
/* Pre-configures a ring proxy to emit connections */
void sink_setup_proxy(struct proxy *px)
{
- px->last_change = ns_to_sec(now_ns);
+ px->be_counters.last_change = ns_to_sec(now_ns);
px->cap = PR_CAP_BE;
px->maxconn = 0;
px->conn_retries = 1;
@@ -307,13 +355,12 @@ static void sink_forward_io_handler(struct appctx *appctx)
struct sink_forward_target *sft = appctx->svcctx;
struct sink *sink = sft->sink;
struct ring *ring = sink->ctx.ring;
- struct buffer *buf = &ring->buf;
- uint64_t msg_len;
- size_t len, cnt, ofs, last_ofs;
+ size_t ofs, last_ofs;
int ret = 0;
- if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR|SE_FL_SHR|SE_FL_SHW))))
+ if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) {
goto out;
+ }
/* if stopping was requested, close immediately */
if (unlikely(stopping))
@@ -335,77 +382,14 @@ static void sink_forward_io_handler(struct appctx *appctx)
goto close;
}
- HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
- LIST_DEL_INIT(&appctx->wait_entry);
- HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
-
- HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
-
- /* explanation for the initialization below: it would be better to do
- * this in the parsing function but this would occasionally result in
- * dropped events because we'd take a reference on the oldest message
- * and keep it while being scheduled. Thus instead let's take it the
- * first time we enter here so that we have a chance to pass many
- * existing messages before grabbing a reference to a location. This
- * value cannot be produced after initialization.
- */
- if (unlikely(sft->ofs == ~0)) {
- sft->ofs = b_peek_ofs(buf, 0);
- HA_ATOMIC_INC(b_orig(buf) + sft->ofs);
- }
-
- /* we were already there, adjust the offset to be relative to
- * the buffer's head and remove us from the counter.
- */
- ofs = sft->ofs - b_head_ofs(buf);
- if (sft->ofs < b_head_ofs(buf))
- ofs += b_size(buf);
- BUG_ON(ofs >= buf->size);
- HA_ATOMIC_DEC(b_peek(buf, ofs));
-
- /* in this loop, ofs always points to the counter byte that precedes
- * the message so that we can take our reference there if we have to
- * stop before the end (ret=0).
- */
- ret = 1;
- while (ofs + 1 < b_data(buf)) {
- cnt = 1;
- len = b_peek_varint(buf, ofs + cnt, &msg_len);
- if (!len)
- break;
- cnt += len;
- BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
-
- if (unlikely(msg_len + 1 > b_size(&trash))) {
- /* too large a message to ever fit, let's skip it */
- ofs += cnt + msg_len;
- continue;
- }
-
- chunk_reset(&trash);
- len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
- trash.data += len;
- trash.area[trash.data++] = '\n';
-
- if (applet_putchk(appctx, &trash) == -1) {
- ret = 0;
- break;
- }
- ofs += cnt + msg_len;
- }
-
- HA_ATOMIC_INC(b_peek(buf, ofs));
- last_ofs = b_tail_ofs(buf);
- sft->ofs = b_peek_ofs(buf, ofs);
+ MT_LIST_DELETE(&appctx->wait_entry);
- HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock);
+ ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, applet_append_line);
if (ret) {
/* let's be woken up once new data arrive */
- HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
- LIST_APPEND(&ring->waiters, &appctx->wait_entry);
- ofs = b_tail_ofs(buf);
- HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
+ MT_LIST_APPEND(&ring->waiters, &appctx->wait_entry);
+ ofs = ring_tail(ring);
if (ofs != last_ofs) {
/* more data was added into the ring between the
* unlock and the lock, and the writer might not
@@ -437,11 +421,8 @@ static void sink_forward_oc_io_handler(struct appctx *appctx)
struct sink_forward_target *sft = appctx->svcctx;
struct sink *sink = sft->sink;
struct ring *ring = sink->ctx.ring;
- struct buffer *buf = &ring->buf;
- uint64_t msg_len;
- size_t len, cnt, ofs, last_ofs;
+ size_t ofs, last_ofs;
int ret = 0;
- char *p;
if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR|SE_FL_SHR|SE_FL_SHW))))
goto out;
@@ -466,80 +447,13 @@ static void sink_forward_oc_io_handler(struct appctx *appctx)
goto close;
}
- HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
- LIST_DEL_INIT(&appctx->wait_entry);
- HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
-
- HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
-
- /* explanation for the initialization below: it would be better to do
- * this in the parsing function but this would occasionally result in
- * dropped events because we'd take a reference on the oldest message
- * and keep it while being scheduled. Thus instead let's take it the
- * first time we enter here so that we have a chance to pass many
- * existing messages before grabbing a reference to a location. This
- * value cannot be produced after initialization.
- */
- if (unlikely(sft->ofs == ~0)) {
- sft->ofs = b_peek_ofs(buf, 0);
- HA_ATOMIC_INC(b_orig(buf) + sft->ofs);
- }
-
- /* we were already there, adjust the offset to be relative to
- * the buffer's head and remove us from the counter.
- */
- ofs = sft->ofs - b_head_ofs(buf);
- if (sft->ofs < b_head_ofs(buf))
- ofs += b_size(buf);
- BUG_ON(ofs >= buf->size);
- HA_ATOMIC_DEC(b_peek(buf, ofs));
-
- /* in this loop, ofs always points to the counter byte that precedes
- * the message so that we can take our reference there if we have to
- * stop before the end (ret=0).
- */
- ret = 1;
- while (ofs + 1 < b_data(buf)) {
- cnt = 1;
- len = b_peek_varint(buf, ofs + cnt, &msg_len);
- if (!len)
- break;
- cnt += len;
- BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
-
- chunk_reset(&trash);
- p = ulltoa(msg_len, trash.area, b_size(&trash));
- if (p) {
- trash.data = (p - trash.area) + 1;
- *p = ' ';
- }
-
- if (!p || (trash.data + msg_len > b_size(&trash))) {
- /* too large a message to ever fit, let's skip it */
- ofs += cnt + msg_len;
- continue;
- }
-
- trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
-
- if (applet_putchk(appctx, &trash) == -1) {
- ret = 0;
- break;
- }
- ofs += cnt + msg_len;
- }
-
- HA_ATOMIC_INC(b_peek(buf, ofs));
- last_ofs = b_tail_ofs(buf);
- sft->ofs = b_peek_ofs(buf, ofs);
- HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock);
+ MT_LIST_DELETE(&appctx->wait_entry);
+ ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, syslog_applet_append_event);
if (ret) {
/* let's be woken up once new data arrive */
- HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
- LIST_APPEND(&ring->waiters, &appctx->wait_entry);
- ofs = b_tail_ofs(buf);
- HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
+ MT_LIST_APPEND(&ring->waiters, &appctx->wait_entry);
+ ofs = ring_tail(ring);
if (ofs != last_ofs) {
/* more data was added into the ring between the
* unlock and the lock, and the writer might not
@@ -569,9 +483,7 @@ void __sink_forward_session_deinit(struct sink_forward_target *sft)
if (!sink)
return;
- HA_RWLOCK_WRLOCK(RING_LOCK, &sink->ctx.ring->lock);
- LIST_DEL_INIT(&sft->appctx->wait_entry);
- HA_RWLOCK_WRUNLOCK(RING_LOCK, &sink->ctx.ring->lock);
+ MT_LIST_DELETE(&sft->appctx->wait_entry);
sft->appctx = NULL;
task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
@@ -728,7 +640,7 @@ int sink_init_forward(struct sink *sink)
*/
void sink_rotate_file_backed_ring(const char *name)
{
- struct ring ring;
+ struct ring_storage storage;
char *oldback;
int ret;
int fd;
@@ -738,16 +650,20 @@ void sink_rotate_file_backed_ring(const char *name)
return;
/* check for contents validity */
- ret = read(fd, &ring, sizeof(ring));
+ ret = read(fd, &storage, sizeof(storage));
close(fd);
- if (ret != sizeof(ring))
+ if (ret != sizeof(storage))
goto rotate;
+ /* check that it's the expected format before touching it */
+ if (storage.rsvd != sizeof(storage))
+ return;
+
/* contents are present, we want to keep them => rotate. Note that
* an empty ring buffer has one byte (the marker).
*/
- if (ring.buf.data > 1)
+ if (storage.head != 0 || storage.tail != 1)
goto rotate;
/* nothing to keep, let's scratch the file and preserve the backup */
@@ -779,15 +695,14 @@ static void sink_free(struct sink *sink)
return;
if (sink->type == SINK_TYPE_BUFFER) {
if (sink->store) {
- size_t size = (sink->ctx.ring->buf.size + 4095UL) & -4096UL;
- void *area = (sink->ctx.ring->buf.area - sizeof(*sink->ctx.ring));
+ size_t size = (ring_allocated_size(sink->ctx.ring) + 4095UL) & -4096UL;
+ void *area = ring_allocated_area(sink->ctx.ring);
msync(area, size, MS_SYNC);
munmap(area, size);
ha_free(&sink->store);
}
- else
- ring_free(sink->ctx.ring);
+ ring_free(sink->ctx.ring);
}
LIST_DEL_INIT(&sink->sink_list); // remove from parent list
task_destroy(sink->forward_task);
@@ -914,6 +829,12 @@ static int sink_finalize(struct sink *sink)
ha_alert("error when trying to initialize sink buffer forwarding.\n");
err_code |= ERR_ALERT | ERR_FATAL;
}
+ if (!sink->store) {
+ /* virtual memory backed sink */
+ vma_set_name(ring_allocated_area(sink->ctx.ring),
+ ring_allocated_size(sink->ctx.ring),
+ "ring", sink->name);
+ }
}
return err_code;
}
@@ -979,22 +900,28 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
goto err;
}
+ if (size > RING_TAIL_LOCK) {
+ ha_alert("parsing [%s:%d] : too large size '%llu' for new sink buffer, the limit on this platform is %llu bytes.\n", file, linenum, (ullong)size, (ullong)RING_TAIL_LOCK);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto err;
+ }
+
if (cfg_sink->store) {
ha_alert("parsing [%s:%d] : cannot resize an already mapped file, please specify 'size' before 'backing-file'.\n", file, linenum);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
- if (size < cfg_sink->ctx.ring->buf.size) {
- ha_warning("parsing [%s:%d] : ignoring new size '%llu' that is smaller than current size '%llu' for ring '%s'.\n",
- file, linenum, (ullong)size, (ullong)cfg_sink->ctx.ring->buf.size, cfg_sink->name);
+ if (size < ring_data(cfg_sink->ctx.ring)) {
+ ha_warning("parsing [%s:%d] : ignoring new size '%llu' that is smaller than contents '%llu' for ring '%s'.\n",
+ file, linenum, (ullong)size, (ullong)ring_data(cfg_sink->ctx.ring), cfg_sink->name);
err_code |= ERR_WARN;
goto err;
}
if (!ring_resize(cfg_sink->ctx.ring, size)) {
ha_alert("parsing [%s:%d] : fail to set sink buffer size '%llu' for ring '%s'.\n", file, linenum,
- (ullong)cfg_sink->ctx.ring->buf.size, cfg_sink->name);
+ (ullong)ring_size(cfg_sink->ctx.ring), cfg_sink->name);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
@@ -1034,7 +961,7 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
goto err;
}
- size = (cfg_sink->ctx.ring->buf.size + 4095UL) & -4096UL;
+ size = (ring_size(cfg_sink->ctx.ring) + 4095UL) & -4096UL;
if (ftruncate(fd, size) != 0) {
close(fd);
ha_alert("parsing [%s:%d] : could not adjust size of backing-file for ring '%s': %s.\n", file, linenum, cfg_sink->name, strerror(errno));
@@ -1056,7 +983,7 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
/* never fails */
ring_free(cfg_sink->ctx.ring);
- cfg_sink->ctx.ring = ring_make_from_area(area, size);
+ cfg_sink->ctx.ring = ring_make_from_area(area, size, 1);
}
else if (strcmp(args[0],"server") == 0) {
if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) {