/* * Event sink management * * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation, version 2.1 * exclusively. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include struct list sink_list = LIST_HEAD_INIT(sink_list); /* sink proxies list */ struct proxy *sink_proxies_list; struct sink *cfg_sink; struct sink *sink_find(const char *name) { struct sink *sink; list_for_each_entry(sink, &sink_list, sink_list) if (strcmp(sink->name, name) == 0) return sink; return NULL; } /* creates a new sink and adds it to the list, it's still generic and not fully * initialized. Returns NULL on allocation failure. If another one already * exists with the same name, it will be returned. The caller can detect it as * a newly created one has type SINK_TYPE_NEW. 
*/ static struct sink *__sink_new(const char *name, const char *desc, int fmt) { struct sink *sink; sink = sink_find(name); if (sink) goto end; sink = calloc(1, sizeof(*sink)); if (!sink) goto end; sink->name = strdup(name); if (!sink->name) goto err; sink->desc = strdup(desc); if (!sink->desc) goto err; sink->fmt = fmt; sink->type = SINK_TYPE_NEW; sink->maxlen = BUFSIZE; /* address will be filled by the caller if needed */ sink->ctx.fd = -1; sink->ctx.dropped = 0; HA_RWLOCK_INIT(&sink->ctx.lock); LIST_APPEND(&sink_list, &sink->sink_list); end: return sink; err: ha_free(&sink->name); ha_free(&sink->desc); ha_free(&sink); return NULL; } /* creates a sink called of type FD associated to fd , format , * and description . Returns NULL on allocation failure or conflict. * Perfect duplicates are merged (same type, fd, and name). */ struct sink *sink_new_fd(const char *name, const char *desc, enum log_fmt fmt, int fd) { struct sink *sink; sink = __sink_new(name, desc, fmt); if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd)) goto end; if (sink->type != SINK_TYPE_NEW) { sink = NULL; goto end; } sink->type = SINK_TYPE_FD; sink->ctx.fd = fd; end: return sink; } /* creates a sink called of type BUF of size , format , * and description . Returns NULL on allocation failure or conflict. * Perfect duplicates are merged (same type and name). If sizes differ, the * largest one is kept. 
*/ struct sink *sink_new_buf(const char *name, const char *desc, enum log_fmt fmt, size_t size) { struct sink *sink; sink = __sink_new(name, desc, fmt); if (!sink) goto fail; if (sink->type == SINK_TYPE_BUFFER) { /* such a buffer already exists, we may have to resize it */ if (!ring_resize(sink->ctx.ring, size)) goto fail; goto end; } if (sink->type != SINK_TYPE_NEW) { /* already exists of another type */ goto fail; } sink->ctx.ring = ring_new(size); if (!sink->ctx.ring) { LIST_DELETE(&sink->sink_list); free(sink->name); free(sink->desc); free(sink); goto fail; } sink->type = SINK_TYPE_BUFFER; end: return sink; fail: return NULL; } /* tries to send message parts from message array to sink . * Formatting according to the sink's preference is done here, unless sink->fmt * is unspecified, in which case the caller formatting will be used instead. * Lost messages are NOT accounted for. It is preferable to call sink_write() * instead which will also try to emit the number of dropped messages when there * are any. * * It will stop writing at instead of sink->maxlen if is * positive and inferior to sink->maxlen. * * It returns >0 if it could write anything, <=0 otherwise. */ ssize_t __sink_write(struct sink *sink, struct log_header hdr, size_t maxlen, const struct ist msg[], size_t nmsg) { struct ist *pfx = NULL; size_t npfx = 0; if (sink->fmt == LOG_FORMAT_RAW) goto send; if (sink->fmt != LOG_FORMAT_UNSPEC) hdr.format = sink->fmt; /* sink format prevails over log one */ pfx = build_log_header(hdr, &npfx); send: if (!maxlen) maxlen = ~0; if (sink->type == SINK_TYPE_FD) { return fd_write_frag_line(sink->ctx.fd, MIN(maxlen, sink->maxlen), pfx, npfx, msg, nmsg, 1); } else if (sink->type == SINK_TYPE_BUFFER) { return ring_write(sink->ctx.ring, MIN(maxlen, sink->maxlen), pfx, npfx, msg, nmsg); } return 0; } /* Tries to emit a message indicating the number of dropped events. 
* The log header of the original message that we tried to emit is reused * here with the only difference that we override the log level. This is * possible since the announce message will be sent from the same context. * * In case of success, the amount of drops is reduced by as much. It's supposed * to be called under an exclusive lock on the sink to avoid multiple producers * doing the same. On success, >0 is returned, otherwise <=0 on failure. */ int sink_announce_dropped(struct sink *sink, struct log_header hdr) { unsigned int dropped; struct buffer msg; struct ist msgvec[1]; char logbuf[64]; while (unlikely((dropped = sink->ctx.dropped) > 0)) { chunk_init(&msg, logbuf, sizeof(logbuf)); chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : ""); msgvec[0] = ist2(msg.area, msg.data); hdr.level = LOG_NOTICE; /* override level but keep original log header data */ if (__sink_write(sink, hdr, 0, msgvec, 1) <= 0) return 0; /* success! */ HA_ATOMIC_SUB(&sink->ctx.dropped, dropped); } return 1; } /* parse the "show events" command, returns 1 if a message is returned, otherwise zero */ static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private) { struct sink *sink; uint ring_flags; int arg; args++; // make args[1] the 1st arg if (!*args[1]) { /* no arg => report the list of supported sink */ chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n"); list_for_each_entry(sink, &sink_list, sink_list) { chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n", sink->name, sink->type == SINK_TYPE_NEW ? "init" : sink->type == SINK_TYPE_FD ? "fd" : sink->type == SINK_TYPE_BUFFER ? 
"buffer" : "?", sink->ctx.dropped, sink->desc); } trash.area[trash.data] = 0; return cli_msg(appctx, LOG_WARNING, trash.area); } if (!cli_has_level(appctx, ACCESS_LVL_OPER)) return 1; sink = sink_find(args[1]); if (!sink) return cli_err(appctx, "No such event sink"); if (sink->type != SINK_TYPE_BUFFER) return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink"); ring_flags = 0; for (arg = 2; *args[arg]; arg++) { if (strcmp(args[arg], "-w") == 0) ring_flags |= RING_WF_WAIT_MODE; else if (strcmp(args[arg], "-n") == 0) ring_flags |= RING_WF_SEEK_NEW; else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0) ring_flags |= RING_WF_WAIT_MODE | RING_WF_SEEK_NEW; else return cli_err(appctx, "unknown option"); } return ring_attach_cli(sink->ctx.ring, appctx, ring_flags); } /* Pre-configures a ring proxy to emit connections */ void sink_setup_proxy(struct proxy *px) { px->last_change = ns_to_sec(now_ns); px->cap = PR_CAP_BE; px->maxconn = 0; px->conn_retries = 1; px->timeout.server = TICK_ETERNITY; px->timeout.client = TICK_ETERNITY; px->timeout.connect = TICK_ETERNITY; px->accept = NULL; px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC; px->next = sink_proxies_list; sink_proxies_list = px; } /* * IO Handler to handle message push to syslog tcp server. * It takes its context from appctx->svcctx. */ static void sink_forward_io_handler(struct appctx *appctx) { struct stconn *sc = appctx_sc(appctx); struct sink_forward_target *sft = appctx->svcctx; struct sink *sink = sft->sink; struct ring *ring = sink->ctx.ring; struct buffer *buf = &ring->buf; uint64_t msg_len; size_t len, cnt, ofs, last_ofs; int ret = 0; if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR|SE_FL_SHR|SE_FL_SHW)))) goto out; /* if stopping was requested, close immediately */ if (unlikely(stopping)) goto close; /* if the connection is not established, inform the stream that we want * to be notified whenever the connection completes. 
*/ if (sc_opposite(sc)->state < SC_ST_EST) { applet_need_more_data(appctx); se_need_remote_conn(appctx->sedesc); applet_have_more_data(appctx); goto out; } HA_SPIN_LOCK(SFT_LOCK, &sft->lock); if (appctx != sft->appctx) { HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); goto close; } HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); LIST_DEL_INIT(&appctx->wait_entry); HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock); /* explanation for the initialization below: it would be better to do * this in the parsing function but this would occasionally result in * dropped events because we'd take a reference on the oldest message * and keep it while being scheduled. Thus instead let's take it the * first time we enter here so that we have a chance to pass many * existing messages before grabbing a reference to a location. This * value cannot be produced after initialization. */ if (unlikely(sft->ofs == ~0)) { sft->ofs = b_peek_ofs(buf, 0); HA_ATOMIC_INC(b_orig(buf) + sft->ofs); } /* we were already there, adjust the offset to be relative to * the buffer's head and remove us from the counter. */ ofs = sft->ofs - b_head_ofs(buf); if (sft->ofs < b_head_ofs(buf)) ofs += b_size(buf); BUG_ON(ofs >= buf->size); HA_ATOMIC_DEC(b_peek(buf, ofs)); /* in this loop, ofs always points to the counter byte that precedes * the message so that we can take our reference there if we have to * stop before the end (ret=0). 
*/ ret = 1; while (ofs + 1 < b_data(buf)) { cnt = 1; len = b_peek_varint(buf, ofs + cnt, &msg_len); if (!len) break; cnt += len; BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); if (unlikely(msg_len + 1 > b_size(&trash))) { /* too large a message to ever fit, let's skip it */ ofs += cnt + msg_len; continue; } chunk_reset(&trash); len = b_getblk(buf, trash.area, msg_len, ofs + cnt); trash.data += len; trash.area[trash.data++] = '\n'; if (applet_putchk(appctx, &trash) == -1) { ret = 0; break; } ofs += cnt + msg_len; } HA_ATOMIC_INC(b_peek(buf, ofs)); last_ofs = b_tail_ofs(buf); sft->ofs = b_peek_ofs(buf, ofs); HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock); if (ret) { /* let's be woken up once new data arrive */ HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); LIST_APPEND(&ring->waiters, &appctx->wait_entry); ofs = b_tail_ofs(buf); HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); if (ofs != last_ofs) { /* more data was added into the ring between the * unlock and the lock, and the writer might not * have seen us. We need to reschedule a read. */ applet_have_more_data(appctx); } else applet_have_no_more_data(appctx); } HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); out: /* always drain data from server */ co_skip(sc_oc(sc), sc_oc(sc)->output); return; close: se_fl_set(appctx->sedesc, SE_FL_EOS|SE_FL_EOI); } /* * IO Handler to handle message push to syslog tcp server * using octet counting frames * It takes its context from appctx->svcctx. 
*/
static void sink_forward_oc_io_handler(struct appctx *appctx)
{
	struct stconn *sc = appctx_sc(appctx);
	struct sink_forward_target *sft = appctx->svcctx;
	struct sink *sink = sft->sink;
	struct ring *ring = sink->ctx.ring;
	struct buffer *buf = &ring->buf;
	uint64_t msg_len;
	size_t len, cnt, ofs, last_ofs;
	int ret = 0;
	char *p;

	if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR|SE_FL_SHR|SE_FL_SHW))))
		goto out;

	/* if stopping was requested, close immediately */
	if (unlikely(stopping))
		goto close;

	/* if the connection is not established, inform the stream that we want
	 * to be notified whenever the connection completes.
	 */
	if (sc_opposite(sc)->state < SC_ST_EST) {
		applet_need_more_data(appctx);
		se_need_remote_conn(appctx->sedesc);
		applet_have_more_data(appctx);
		goto out;
	}

	HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
	if (appctx != sft->appctx) {
		/* this applet is not the one registered on the target anymore */
		HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
		goto close;
	}

	/* leave the ring's waiters list while we are actively reading */
	HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
	LIST_DEL_INIT(&appctx->wait_entry);
	HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);

	HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);

	/* explanation for the initialization below: it would be better to do
	 * this in the parsing function but this would occasionally result in
	 * dropped events because we'd take a reference on the oldest message
	 * and keep it while being scheduled. Thus instead let's take it the
	 * first time we enter here so that we have a chance to pass many
	 * existing messages before grabbing a reference to a location. This
	 * value cannot be produced after initialization.
	 */
	if (unlikely(sft->ofs == ~0)) {
		sft->ofs = b_peek_ofs(buf, 0);
		HA_ATOMIC_INC(b_orig(buf) + sft->ofs);
	}

	/* we were already there, adjust the offset to be relative to
	 * the buffer's head and remove us from the counter.
	 */
	ofs = sft->ofs - b_head_ofs(buf);
	if (sft->ofs < b_head_ofs(buf))
		ofs += b_size(buf);

	BUG_ON(ofs >= buf->size);
	HA_ATOMIC_DEC(b_peek(buf, ofs));

	/* in this loop, ofs always points to the counter byte that precedes
	 * the message so that we can take our reference there if we have to
	 * stop before the end (ret=0).
	 */
	ret = 1;
	while (ofs + 1 < b_data(buf)) {
		cnt = 1;
		len = b_peek_varint(buf, ofs + cnt, &msg_len);
		if (!len)
			break;
		cnt += len;
		BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));

		/* the frame starts with the message length in decimal followed
		 * by a single space.
		 * NOTE(review): presumably ulltoa() returns the position right
		 * after the last digit written, or NULL when it does not fit
		 * -- confirm against its definition.
		 */
		chunk_reset(&trash);
		p = ulltoa(msg_len, trash.area, b_size(&trash));
		if (p) {
			trash.data = (p - trash.area) + 1;
			*p = ' ';
		}

		if (!p || (trash.data + msg_len > b_size(&trash))) {
			/* too large a message to ever fit, let's skip it */
			ofs += cnt + msg_len;
			continue;
		}

		/* the payload is copied right after the space separator */
		trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);

		if (applet_putchk(appctx, &trash) == -1) {
			/* no room left in the channel, resume from here later */
			ret = 0;
			break;
		}
		ofs += cnt + msg_len;
	}

	/* take our reference on the location we stopped at */
	HA_ATOMIC_INC(b_peek(buf, ofs));
	last_ofs = b_tail_ofs(buf);
	sft->ofs = b_peek_ofs(buf, ofs);

	HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock);

	if (ret) {
		/* let's be woken up once new data arrive */
		HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
		LIST_APPEND(&ring->waiters, &appctx->wait_entry);
		ofs = b_tail_ofs(buf);
		HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
		if (ofs != last_ofs) {
			/* more data was added into the ring between the
			 * unlock and the lock, and the writer might not
			 * have seen us. We need to reschedule a read.
			 */
			applet_have_more_data(appctx);
		} else
			applet_have_no_more_data(appctx);
	}
	HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);

 out:
	/* always drain data from server */
	co_skip(sc_oc(sc), sc_oc(sc)->output);
	return;

 close:
	se_fl_set(appctx->sedesc, SE_FL_EOS|SE_FL_EOI);
	goto out;
}

/* Detaches the applet currently attached to forward target <sft>: its wait
 * entry is removed from the ring's waiters under the ring write lock,
 * sft->appctx is reset to NULL and the sink's forward task is woken up.
 * Nothing is done when <sft> has no sink. sft->appctx is dereferenced
 * without a check, so it must be set when calling this function; the only
 * caller visible here does so while holding sft->lock.
 */
void __sink_forward_session_deinit(struct sink_forward_target *sft)
{
	struct sink *sink;

	sink = sft->sink;
	if (!sink)
		return;

	HA_RWLOCK_WRLOCK(RING_LOCK, &sink->ctx.ring->lock);
	LIST_DEL_INIT(&sft->appctx->wait_entry);
	HA_RWLOCK_WRUNLOCK(RING_LOCK, &sink->ctx.ring->lock);

	sft->appctx = NULL;
	task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
}

/* Init callback shared by both forwarding applets. It allocates the
 * destination address from the target server's address, overrides its port
 * with the server's svc_port, finalizes the applet startup on the server's
 * proxy, then configures the resulting stream and registers <appctx> on the
 * forward target. Returns 0 on success, -1 on failure (the allocated address
 * is released when the startup could not be finalized).
 */
static int sink_forward_session_init(struct appctx *appctx)
{
	struct sink_forward_target *sft = appctx->svcctx;
	struct stream *s;
	struct sockaddr_storage *addr = NULL;

	if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr)))
		goto out_error;
	/* srv port should be learned from srv->svc_port not from srv->addr */
	set_host_port(addr, sft->srv->svc_port);

	if (appctx_finalize_startup(appctx, sft->srv->proxy, &BUF_NULL) == -1)
		goto out_free_addr;

	s = appctx_strm(appctx);
	s->scb->dst = addr;
	s->scb->flags |= (SC_FL_RCV_ONCE|SC_FL_NOLINGER);

	s->target = &sft->srv->obj_type;
	s->flags = SF_ASSIGNED;

	s->do_log = NULL;
	s->uniq_id = 0;

	applet_expect_no_data(appctx);
	sft->appctx = appctx;

	return 0;

 out_free_addr:
	sockaddr_free(&addr);
 out_error:
	return -1;
}

/* Release callback shared by both forwarding applets. Under the target's
 * lock, the target is only detached when <appctx> is still the applet
 * registered on it. Nothing is done when the applet has no context.
 */
static void sink_forward_session_release(struct appctx *appctx)
{
	struct sink_forward_target *sft = appctx->svcctx;

	if (!sft)
		return;

	HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
	if (sft->appctx == appctx)
		__sink_forward_session_deinit(sft);
	HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
}

/* applet forwarding messages one per line (see sink_forward_io_handler) */
static struct applet sink_forward_applet = {
	.obj_type = OBJ_TYPE_APPLET,
	.name = "", /* used for logging */
	.fct = sink_forward_io_handler,
	.init = sink_forward_session_init,
	.release = sink_forward_session_release,
};

/* applet forwarding messages as octet-counted frames (see sink_forward_oc_io_handler) */
static struct applet sink_forward_oc_applet = {
	.obj_type = OBJ_TYPE_APPLET,
	.name = "", /* used for logging */
	.fct = sink_forward_oc_io_handler,
	.init = sink_forward_session_init,
	.release = sink_forward_session_release,
};

/*
 * Create a new peer session in assigned state (connect will start automatically)
 * It sets its context into appctx->svcctx.
 *
 * The octet-counting applet is used when the target server's log_proto is
 * SRV_LOG_PROTO_OCTET_COUNTING, the line-based one otherwise. Returns the new
 * appctx, or NULL on failure.
 */
static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
{
	struct appctx *appctx;
	struct applet *applet = &sink_forward_applet;

	if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
		applet = &sink_forward_oc_applet;

	appctx = appctx_new_here(applet, NULL);
	if (!appctx)
		goto out_close;
	appctx->svcctx = (void *)sft;

	if (appctx_init(appctx) == -1)
		goto out_free_appctx;

	return appctx;

	/* Error unrolling */
 out_free_appctx:
	appctx_free_on_early_error(appctx);
 out_close:
	return NULL;
}

/*
 * Task to handle connections to forward servers
 *
 * <context> is the sink whose forward targets are walked. Unless the process
 * is stopping, a session is created for each target which has none. When
 * stopping, each target's applet is woken up instead. The task never expires
 * by itself and is always returned.
 */
static struct task *process_sink_forward(struct task * task, void *context, unsigned int state)
{
	struct sink *sink = (struct sink *)context;
	struct sink_forward_target *sft = sink->sft;

	task->expire = TICK_ETERNITY;

	if (!stopping) {
		while (sft) {
			HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
			/* if appctx is NULL, start a new session */
			if (!sft->appctx)
				sft->appctx = sink_forward_session_create(sink, sft);
			HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
			sft = sft->next;
		}
	}
	else {
		while (sft) {
			HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
			/* awake applet to perform a clean close */
			if (sft->appctx)
				appctx_wakeup(sft->appctx);
			HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
			sft = sft->next;
		}
	}

	return task;
}

/*
 * Init task to manage connections to forward servers
 *
 * returns 0 in case of error.
*/ int sink_init_forward(struct sink *sink) { sink->forward_task = task_new_anywhere(); if (!sink->forward_task) return 0; sink->forward_task->process = process_sink_forward; sink->forward_task->context = (void *)sink; sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0); task_wakeup(sink->forward_task, TASK_WOKEN_INIT); return 1; } /* This tries to rotate a file-backed ring, but only if it contains contents. * This way empty rings will not cause backups to be overwritten and it's safe * to reload multiple times. That's only best effort, failures are silently * ignored. */ void sink_rotate_file_backed_ring(const char *name) { struct ring ring; char *oldback; int ret; int fd; fd = open(name, O_RDONLY); if (fd < 0) return; /* check for contents validity */ ret = read(fd, &ring, sizeof(ring)); close(fd); if (ret != sizeof(ring)) goto rotate; /* contents are present, we want to keep them => rotate. Note that * an empty ring buffer has one byte (the marker). */ if (ring.buf.data > 1) goto rotate; /* nothing to keep, let's scratch the file and preserve the backup */ return; rotate: oldback = NULL; memprintf(&oldback, "%s.bak", name); if (oldback) { /* try to rename any possibly existing ring file to * ".bak" and delete remains of older ones. This will * ensure we don't wipe useful debug info upon restart. 
*/ unlink(oldback); if (rename(name, oldback) < 0) unlink(oldback); ha_free(&oldback); } } /* helper function to completely deallocate a sink struct */ static void sink_free(struct sink *sink) { struct sink_forward_target *sft_next; if (!sink) return; if (sink->type == SINK_TYPE_BUFFER) { if (sink->store) { size_t size = (sink->ctx.ring->buf.size + 4095UL) & -4096UL; void *area = (sink->ctx.ring->buf.area - sizeof(*sink->ctx.ring)); msync(area, size, MS_SYNC); munmap(area, size); ha_free(&sink->store); } else ring_free(sink->ctx.ring); } LIST_DEL_INIT(&sink->sink_list); // remove from parent list task_destroy(sink->forward_task); free_proxy(sink->forward_px); ha_free(&sink->name); ha_free(&sink->desc); while (sink->sft) { sft_next = sink->sft->next; ha_free(&sink->sft); sink->sft = sft_next; } ha_free(&sink); } /* Helper function to create new high-level ring buffer (as in ring section from * the config): will create a new sink of buf type, and a new forward proxy, * which will be stored in forward_px to know that the sink is responsible for * it. 
* * Returns NULL on failure */ static struct sink *sink_new_ringbuf(const char *id, const char *description, const char *file, int linenum, char **err_msg) { struct sink *sink; struct proxy *p = NULL; // forward_px /* allocate new proxy to handle forwards */ p = calloc(1, sizeof(*p)); if (!p) { memprintf(err_msg, "out of memory"); goto err; } init_new_proxy(p); sink_setup_proxy(p); p->id = strdup(id); p->conf.args.file = p->conf.file = strdup(file); p->conf.args.line = p->conf.line = linenum; sink = sink_new_buf(id, description, LOG_FORMAT_RAW, BUFSIZE); if (!sink) { memprintf(err_msg, "unable to create a new sink buffer for ring '%s'", id); goto err; } /* link sink to proxy */ sink->forward_px = p; return sink; err: free_proxy(p); return NULL; } /* helper function: add a new server to an existing sink * * Returns 1 on success and 0 on failure */ static int sink_add_srv(struct sink *sink, struct server *srv) { struct sink_forward_target *sft; /* allocate new sink_forward_target descriptor */ sft = calloc(1, sizeof(*sft)); if (!sft) { ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n", srv->id, sink->name); return 0; } sft->srv = srv; sft->appctx = NULL; sft->ofs = ~0; /* init ring offset */ sft->sink = sink; sft->next = sink->sft; HA_SPIN_INIT(&sft->lock); /* mark server attached to the ring */ if (!ring_attach(sink->ctx.ring)) { ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, sink->name); ha_free(&sft); return 0; } sink->sft = sft; return 1; } /* Finalize sink struct to ensure configuration consistency and * allocate final struct members * * Returns ERR_NONE on success, ERR_WARN on warning * Returns a composition of ERR_ALERT, ERR_ABORT, ERR_FATAL on failure */ static int sink_finalize(struct sink *sink) { int err_code = ERR_NONE; struct server *srv; if (sink && (sink->type == SINK_TYPE_BUFFER)) { if (!sink->maxlen) sink->maxlen = ~0; // maxlen not set: no implicit truncation else if (sink->maxlen > 
ring_max_payload(sink->ctx.ring)) { /* maxlen set by user however it doesn't fit: set to max value */ ha_warning("ring '%s' event max length '%u' exceeds max payload size, forced to '%lu'.\n", sink->name, sink->maxlen, (unsigned long)ring_max_payload(sink->ctx.ring)); sink->maxlen = ring_max_payload(sink->ctx.ring); err_code |= ERR_WARN; } /* prepare forward server descriptors */ if (sink->forward_px) { /* sink proxy is set: register all servers from the proxy */ srv = sink->forward_px->srv; while (srv) { if (!sink_add_srv(sink, srv)) { err_code |= ERR_ALERT | ERR_FATAL; break; } srv = srv->next; } } /* init forwarding if at least one sft is registered */ if (sink->sft && sink_init_forward(sink) == 0) { ha_alert("error when trying to initialize sink buffer forwarding.\n"); err_code |= ERR_ALERT | ERR_FATAL; } } return err_code; } /* * Parse "ring" section and create corresponding sink buffer. * * The function returns 0 in success case, otherwise, it returns error * flags. */ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm) { int err_code = 0; char *err_msg = NULL; const char *inv; if (strcmp(args[0], "ring") == 0) { /* new ring section */ if (!*args[1]) { ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum); err_code |= ERR_ALERT | ERR_FATAL; goto err; } inv = invalid_char(args[1]); if (inv) { ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv); err_code |= ERR_ALERT | ERR_FATAL; goto err; } if (sink_find(args[1])) { ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]); err_code |= ERR_ALERT | ERR_FATAL; goto err; } cfg_sink = sink_new_ringbuf(args[1], args[1], file, linenum, &err_msg); if (!cfg_sink) { ha_alert("parsing [%s:%d] : %s.\n", file, linenum, err_msg); ha_free(&err_msg); err_code |= ERR_ALERT | ERR_FATAL; goto err; } /* set maxlen value to 0 for now, we rely on this in postparsing * to know if it was explicitly set 
using the "maxlen" parameter */ cfg_sink->maxlen = 0; } else if (strcmp(args[0], "size") == 0) { size_t size; if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) { ha_alert("parsing [%s:%d] : 'size' directive not usable with this type of sink.\n", file, linenum); err_code |= ERR_ALERT | ERR_FATAL; goto err; } size = atol(args[1]); if (!size) { ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]); err_code |= ERR_ALERT | ERR_FATAL; goto err; } if (cfg_sink->store) { ha_alert("parsing [%s:%d] : cannot resize an already mapped file, please specify 'size' before 'backing-file'.\n", file, linenum); err_code |= ERR_ALERT | ERR_FATAL; goto err; } if (size < cfg_sink->ctx.ring->buf.size) { ha_warning("parsing [%s:%d] : ignoring new size '%llu' that is smaller than current size '%llu' for ring '%s'.\n", file, linenum, (ullong)size, (ullong)cfg_sink->ctx.ring->buf.size, cfg_sink->name); err_code |= ERR_WARN; goto err; } if (!ring_resize(cfg_sink->ctx.ring, size)) { ha_alert("parsing [%s:%d] : fail to set sink buffer size '%llu' for ring '%s'.\n", file, linenum, (ullong)cfg_sink->ctx.ring->buf.size, cfg_sink->name); err_code |= ERR_ALERT | ERR_FATAL; goto err; } } else if (strcmp(args[0], "backing-file") == 0) { /* This tries to mmap file for size and to use it as a backing store * for ring . Existing data are delete. NULL is returned on error. */ const char *backing = args[1]; size_t size; void *area; int fd; if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) { ha_alert("parsing [%s:%d] : 'backing-file' only usable with existing rings.\n", file, linenum); err_code |= ERR_ALERT | ERR_FATAL; goto err; } if (cfg_sink->store) { ha_alert("parsing [%s:%d] : 'backing-file' already specified for ring '%s' (was '%s').\n", file, linenum, cfg_sink->name, cfg_sink->store); err_code |= ERR_ALERT | ERR_FATAL; goto err; } /* let's check if the file exists and is not empty. 
That's the * only condition under which we'll trigger a rotate, so that * config checks, reloads, or restarts that don't emit anything * do not rotate it again. */ sink_rotate_file_backed_ring(backing); fd = open(backing, O_RDWR | O_CREAT, 0600); if (fd < 0) { ha_alert("parsing [%s:%d] : cannot open backing-file '%s' for ring '%s': %s.\n", file, linenum, backing, cfg_sink->name, strerror(errno)); err_code |= ERR_ALERT | ERR_FATAL; goto err; } size = (cfg_sink->ctx.ring->buf.size + 4095UL) & -4096UL; if (ftruncate(fd, size) != 0) { close(fd); ha_alert("parsing [%s:%d] : could not adjust size of backing-file for ring '%s': %s.\n", file, linenum, cfg_sink->name, strerror(errno)); err_code |= ERR_ALERT | ERR_FATAL; goto err; } area = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (area == MAP_FAILED) { close(fd); ha_alert("parsing [%s:%d] : failed to use '%s' as a backing file for ring '%s': %s.\n", file, linenum, backing, cfg_sink->name, strerror(errno)); err_code |= ERR_ALERT | ERR_FATAL; goto err; } /* we don't need the file anymore */ close(fd); cfg_sink->store = strdup(backing); /* never fails */ ring_free(cfg_sink->ctx.ring); cfg_sink->ctx.ring = ring_make_from_area(area, size); } else if (strcmp(args[0],"server") == 0) { if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) { ha_alert("parsing [%s:%d] : unable to create server '%s'.\n", file, linenum, args[1]); err_code |= ERR_ALERT | ERR_FATAL; goto err; } err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL, SRV_PARSE_PARSE_ADDR|SRV_PARSE_INITIAL_RESOLVE); } else if (strcmp(args[0],"timeout") == 0) { if (!cfg_sink || !cfg_sink->forward_px) { ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]); err_code |= ERR_ALERT | ERR_FATAL; goto err; } if (strcmp(args[1], "connect") == 0 || strcmp(args[1], "server") == 0) { const char *res; unsigned int tout; if (!*args[2]) { ha_alert("parsing [%s:%d] : '%s %s' expects