diff options
Diffstat (limited to 'src/sink.c')
-rw-r--r-- | src/sink.c | 283 |
1 files changed, 105 insertions, 178 deletions
@@ -87,7 +87,6 @@ static struct sink *__sink_new(const char *name, const char *desc, int fmt) /* address will be filled by the caller if needed */ sink->ctx.fd = -1; sink->ctx.dropped = 0; - HA_RWLOCK_INIT(&sink->ctx.lock); LIST_APPEND(&sink_list, &sink->sink_list); end: return sink; @@ -206,30 +205,79 @@ send: * here with the only difference that we override the log level. This is * possible since the announce message will be sent from the same context. * - * In case of success, the amount of drops is reduced by as much. It's supposed - * to be called under an exclusive lock on the sink to avoid multiple producers - * doing the same. On success, >0 is returned, otherwise <=0 on failure. + * In case of success, the amount of drops is reduced by as much. + * The function ensures that a single thread will do that work at once, other + * ones will only report a failure if a thread is dumping, so that no thread + * waits. A pair od atomic OR and AND is performed around the code so the + * caller would be advised to only call this function AFTER having verified + * that sink->ctx.dropped is not zero in order to avoid a memory write. On + * success, >0 is returned, otherwise <=0 on failure, indicating that it could + * not eliminate the pending drop counter. It may loop up to 10 times trying + * to catch up with failing competing threads. */ int sink_announce_dropped(struct sink *sink, struct log_header hdr) { - unsigned int dropped; - struct buffer msg; + static THREAD_LOCAL char msg_dropped1[] = "1 event dropped"; + static THREAD_LOCAL char msg_dropped2[] = "0000000000 events dropped"; + uint dropped, last_dropped; struct ist msgvec[1]; - char logbuf[64]; + uint retries = 10; + int ret = 0; + + /* Explanation. ctx.dropped is made of: + * bit0 = 1 if dropped dump in progress + * bit1..31 = dropped counter + * If non-zero there have been some drops. If not &1, it means + * nobody's taking care of them and we'll have to, otherwise + * another thread is already on them and we can just pass and + * count another drop (hence add 2). + */ + dropped = HA_ATOMIC_FETCH_OR(&sink->ctx.dropped, 1); + if (dropped & 1) { + /* another thread was already on it */ + goto leave; + } - while (unlikely((dropped = sink->ctx.dropped) > 0)) { - chunk_init(&msg, logbuf, sizeof(logbuf)); - chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : ""); - msgvec[0] = ist2(msg.area, msg.data); + last_dropped = 0; + dropped >>= 1; + while (1) { + while (unlikely(dropped > last_dropped) && retries-- > 0) { + /* try to aggregate multiple messages if other threads arrive while + * we're producing the dropped message. + */ + uint msglen = sizeof(msg_dropped1); + const char *msg = msg_dropped1; + + last_dropped = dropped; + if (dropped > 1) { + msg = ultoa_r(dropped, msg_dropped2, 11); + msg_dropped2[10] = ' '; + msglen = msg_dropped2 + sizeof(msg_dropped2) - msg; + } + msgvec[0] = ist2(msg, msglen); + dropped = HA_ATOMIC_LOAD(&sink->ctx.dropped) >> 1; + } + if (!dropped) + break; + + last_dropped = 0; hdr.level = LOG_NOTICE; /* override level but keep original log header data */ if (__sink_write(sink, hdr, 0, msgvec, 1) <= 0) - return 0; + goto done; + /* success! */ - HA_ATOMIC_SUB(&sink->ctx.dropped, dropped); + HA_ATOMIC_SUB(&sink->ctx.dropped, dropped << 1); } - return 1; + + /* done! */ + ret = 1; +done: + /* unlock the counter */ + HA_ATOMIC_AND(&sink->ctx.dropped, ~1); +leave: + return ret; } /* parse the "show events" command, returns 1 if a message is returned, otherwise zero */ @@ -284,7 +332,7 @@ static int cli_parse_show_events(char **args, char *payload, struct appctx *appc /* Pre-configures a ring proxy to emit connections */ void sink_setup_proxy(struct proxy *px) { - px->last_change = ns_to_sec(now_ns); + px->be_counters.last_change = ns_to_sec(now_ns); px->cap = PR_CAP_BE; px->maxconn = 0; px->conn_retries = 1; @@ -307,13 +355,12 @@ static void sink_forward_io_handler(struct appctx *appctx) struct sink_forward_target *sft = appctx->svcctx; struct sink *sink = sft->sink; struct ring *ring = sink->ctx.ring; - struct buffer *buf = &ring->buf; - uint64_t msg_len; - size_t len, cnt, ofs, last_ofs; + size_t ofs, last_ofs; int ret = 0; - if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR|SE_FL_SHR|SE_FL_SHW)))) + if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) { goto out; + } /* if stopping was requested, close immediately */ if (unlikely(stopping)) @@ -335,77 +382,14 @@ static void sink_forward_io_handler(struct appctx *appctx) goto close; } - HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); - LIST_DEL_INIT(&appctx->wait_entry); - HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); - - HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock); - - /* explanation for the initialization below: it would be better to do - * this in the parsing function but this would occasionally result in - * dropped events because we'd take a reference on the oldest message - * and keep it while being scheduled. Thus instead let's take it the - * first time we enter here so that we have a chance to pass many - * existing messages before grabbing a reference to a location. This - * value cannot be produced after initialization. - */ - if (unlikely(sft->ofs == ~0)) { - sft->ofs = b_peek_ofs(buf, 0); - HA_ATOMIC_INC(b_orig(buf) + sft->ofs); - } - - /* we were already there, adjust the offset to be relative to - * the buffer's head and remove us from the counter. - */ - ofs = sft->ofs - b_head_ofs(buf); - if (sft->ofs < b_head_ofs(buf)) - ofs += b_size(buf); - BUG_ON(ofs >= buf->size); - HA_ATOMIC_DEC(b_peek(buf, ofs)); - - /* in this loop, ofs always points to the counter byte that precedes - * the message so that we can take our reference there if we have to - * stop before the end (ret=0). - */ - ret = 1; - while (ofs + 1 < b_data(buf)) { - cnt = 1; - len = b_peek_varint(buf, ofs + cnt, &msg_len); - if (!len) - break; - cnt += len; - BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); - - if (unlikely(msg_len + 1 > b_size(&trash))) { - /* too large a message to ever fit, let's skip it */ - ofs += cnt + msg_len; - continue; - } - - chunk_reset(&trash); - len = b_getblk(buf, trash.area, msg_len, ofs + cnt); - trash.data += len; - trash.area[trash.data++] = '\n'; - - if (applet_putchk(appctx, &trash) == -1) { - ret = 0; - break; - } - ofs += cnt + msg_len; - } - - HA_ATOMIC_INC(b_peek(buf, ofs)); - last_ofs = b_tail_ofs(buf); - sft->ofs = b_peek_ofs(buf, ofs); + MT_LIST_DELETE(&appctx->wait_entry); - HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock); + ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, applet_append_line); if (ret) { /* let's be woken up once new data arrive */ - HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); - LIST_APPEND(&ring->waiters, &appctx->wait_entry); - ofs = b_tail_ofs(buf); - HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); + MT_LIST_APPEND(&ring->waiters, &appctx->wait_entry); + ofs = ring_tail(ring); if (ofs != last_ofs) { /* more data was added into the ring between the * unlock and the lock, and the writer might not @@ -437,11 +421,8 @@ static void sink_forward_oc_io_handler(struct appctx *appctx) struct sink_forward_target *sft = appctx->svcctx; struct sink *sink = sft->sink; struct ring *ring = sink->ctx.ring; - struct buffer *buf = &ring->buf; - uint64_t msg_len; - size_t len, cnt, ofs, last_ofs; + size_t ofs, last_ofs; int ret = 0; - char *p; if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR|SE_FL_SHR|SE_FL_SHW)))) goto out; @@ -466,80 +447,13 @@ static void sink_forward_oc_io_handler(struct appctx *appctx) goto close; } - HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); - LIST_DEL_INIT(&appctx->wait_entry); - HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); - - HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock); - - /* explanation for the initialization below: it would be better to do - * this in the parsing function but this would occasionally result in - * dropped events because we'd take a reference on the oldest message - * and keep it while being scheduled. Thus instead let's take it the - * first time we enter here so that we have a chance to pass many - * existing messages before grabbing a reference to a location. This - * value cannot be produced after initialization. - */ - if (unlikely(sft->ofs == ~0)) { - sft->ofs = b_peek_ofs(buf, 0); - HA_ATOMIC_INC(b_orig(buf) + sft->ofs); - } - - /* we were already there, adjust the offset to be relative to - * the buffer's head and remove us from the counter. - */ - ofs = sft->ofs - b_head_ofs(buf); - if (sft->ofs < b_head_ofs(buf)) - ofs += b_size(buf); - BUG_ON(ofs >= buf->size); - HA_ATOMIC_DEC(b_peek(buf, ofs)); - - /* in this loop, ofs always points to the counter byte that precedes - * the message so that we can take our reference there if we have to - * stop before the end (ret=0). - */ - ret = 1; - while (ofs + 1 < b_data(buf)) { - cnt = 1; - len = b_peek_varint(buf, ofs + cnt, &msg_len); - if (!len) - break; - cnt += len; - BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); - - chunk_reset(&trash); - p = ulltoa(msg_len, trash.area, b_size(&trash)); - if (p) { - trash.data = (p - trash.area) + 1; - *p = ' '; - } - - if (!p || (trash.data + msg_len > b_size(&trash))) { - /* too large a message to ever fit, let's skip it */ - ofs += cnt + msg_len; - continue; - } - - trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt); - - if (applet_putchk(appctx, &trash) == -1) { - ret = 0; - break; - } - ofs += cnt + msg_len; - } - - HA_ATOMIC_INC(b_peek(buf, ofs)); - last_ofs = b_tail_ofs(buf); - sft->ofs = b_peek_ofs(buf, ofs); - HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock); + MT_LIST_DELETE(&appctx->wait_entry); + ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, syslog_applet_append_event); if (ret) { /* let's be woken up once new data arrive */ - HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); - LIST_APPEND(&ring->waiters, &appctx->wait_entry); - ofs = b_tail_ofs(buf); - HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); + MT_LIST_APPEND(&ring->waiters, &appctx->wait_entry); + ofs = ring_tail(ring); if (ofs != last_ofs) { /* more data was added into the ring between the * unlock and the lock, and the writer might not @@ -569,9 +483,7 @@ void __sink_forward_session_deinit(struct sink_forward_target *sft) if (!sink) return; - HA_RWLOCK_WRLOCK(RING_LOCK, &sink->ctx.ring->lock); - LIST_DEL_INIT(&sft->appctx->wait_entry); - HA_RWLOCK_WRUNLOCK(RING_LOCK, &sink->ctx.ring->lock); + MT_LIST_DELETE(&sft->appctx->wait_entry); sft->appctx = NULL; task_wakeup(sink->forward_task, TASK_WOKEN_MSG); @@ -728,7 +640,7 @@ int sink_init_forward(struct sink *sink) */ void sink_rotate_file_backed_ring(const char *name) { - struct ring ring; + struct ring_storage storage; char *oldback; int ret; int fd; @@ -738,16 +650,20 @@ void sink_rotate_file_backed_ring(const char *name) return; /* check for contents validity */ - ret = read(fd, &ring, sizeof(ring)); + ret = read(fd, &storage, sizeof(storage)); close(fd); - if (ret != sizeof(ring)) + if (ret != sizeof(storage)) goto rotate; + /* check that it's the expected format before touching it */ + if (storage.rsvd != sizeof(storage)) + return; + /* contents are present, we want to keep them => rotate. Note that * an empty ring buffer has one byte (the marker). */ - if (ring.buf.data > 1) + if (storage.head != 0 || storage.tail != 1) goto rotate; /* nothing to keep, let's scratch the file and preserve the backup */ @@ -779,15 +695,14 @@ static void sink_free(struct sink *sink) return; if (sink->type == SINK_TYPE_BUFFER) { if (sink->store) { - size_t size = (sink->ctx.ring->buf.size + 4095UL) & -4096UL; - void *area = (sink->ctx.ring->buf.area - sizeof(*sink->ctx.ring)); + size_t size = (ring_allocated_size(sink->ctx.ring) + 4095UL) & -4096UL; + void *area = ring_allocated_area(sink->ctx.ring); msync(area, size, MS_SYNC); munmap(area, size); ha_free(&sink->store); } - else - ring_free(sink->ctx.ring); + ring_free(sink->ctx.ring); } LIST_DEL_INIT(&sink->sink_list); // remove from parent list task_destroy(sink->forward_task); @@ -914,6 +829,12 @@ static int sink_finalize(struct sink *sink) ha_alert("error when trying to initialize sink buffer forwarding.\n"); err_code |= ERR_ALERT | ERR_FATAL; } + if (!sink->store) { + /* virtual memory backed sink */ + vma_set_name(ring_allocated_area(sink->ctx.ring), + ring_allocated_size(sink->ctx.ring), + "ring", sink->name); + } } return err_code; } @@ -979,22 +900,28 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm) goto err; } + if (size > RING_TAIL_LOCK) { + ha_alert("parsing [%s:%d] : too large size '%llu' for new sink buffer, the limit on this platform is %llu bytes.\n", file, linenum, (ullong)size, (ullong)RING_TAIL_LOCK); + err_code |= ERR_ALERT | ERR_FATAL; + goto err; + } + if (cfg_sink->store) { ha_alert("parsing [%s:%d] : cannot resize an already mapped file, please specify 'size' before 'backing-file'.\n", file, linenum); err_code |= ERR_ALERT | ERR_FATAL; goto err; } - if (size < cfg_sink->ctx.ring->buf.size) { - ha_warning("parsing [%s:%d] : ignoring new size '%llu' that is smaller than current size '%llu' for ring '%s'.\n", - file, linenum, (ullong)size, (ullong)cfg_sink->ctx.ring->buf.size, cfg_sink->name); + if (size < ring_data(cfg_sink->ctx.ring)) { + ha_warning("parsing [%s:%d] : ignoring new size '%llu' that is smaller than contents '%llu' for ring '%s'.\n", + file, linenum, (ullong)size, (ullong)ring_data(cfg_sink->ctx.ring), cfg_sink->name); err_code |= ERR_WARN; goto err; } if (!ring_resize(cfg_sink->ctx.ring, size)) { ha_alert("parsing [%s:%d] : fail to set sink buffer size '%llu' for ring '%s'.\n", file, linenum, - (ullong)cfg_sink->ctx.ring->buf.size, cfg_sink->name); + (ullong)ring_size(cfg_sink->ctx.ring), cfg_sink->name); err_code |= ERR_ALERT | ERR_FATAL; goto err; } @@ -1034,7 +961,7 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm) goto err; } - size = (cfg_sink->ctx.ring->buf.size + 4095UL) & -4096UL; + size = (ring_size(cfg_sink->ctx.ring) + 4095UL) & -4096UL; if (ftruncate(fd, size) != 0) { close(fd); ha_alert("parsing [%s:%d] : could not adjust size of backing-file for ring '%s': %s.\n", file, linenum, cfg_sink->name, strerror(errno)); @@ -1056,7 +983,7 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm) /* never fails */ ring_free(cfg_sink->ctx.ring); - cfg_sink->ctx.ring = ring_make_from_area(area, size); + cfg_sink->ctx.ring = ring_make_from_area(area, size, 1); } else if (strcmp(args[0],"server") == 0) { if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) { |