diff options
Diffstat (limited to '')
-rw-r--r-- | ctdb/common/tmon.c | 602 |
1 files changed, 602 insertions, 0 deletions
diff --git a/ctdb/common/tmon.c b/ctdb/common/tmon.c new file mode 100644 index 0000000..04bad1f --- /dev/null +++ b/ctdb/common/tmon.c @@ -0,0 +1,602 @@ +/* + Trivial FD monitoring + + Copyright (C) Martin Schwenke & Amitay Isaacs, DataDirect Networks 2022 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" + +#include <ctype.h> + +#include "lib/util/blocking.h" +#include "lib/util/sys_rw.h" +#include "lib/util/tevent_unix.h" +#include "lib/util/util.h" +#include "lib/util/smb_strtox.h" + +#include "lib/async_req/async_sock.h" + +#include "common/tmon.h" + + +enum tmon_message_type { + TMON_MSG_EXIT = 1, + TMON_MSG_ERRNO, + TMON_MSG_PING, + TMON_MSG_ASCII, + TMON_MSG_CUSTOM, +}; + +struct tmon_pkt { + enum tmon_message_type type; + uint16_t val; +}; + +struct tmon_buf { + uint8_t data[4]; +}; + +static void tmon_packet_push(struct tmon_pkt *pkt, + struct tmon_buf *buf) +{ + uint16_t type_n, val_n; + + type_n = htons(pkt->type); + val_n = htons(pkt->val); + memcpy(&buf->data[0], &type_n, 2); + memcpy(&buf->data[2], &val_n, 2); +} + +static void tmon_packet_pull(struct tmon_buf *buf, + struct tmon_pkt *pkt) +{ + uint16_t type_n, val_n; + + memcpy(&type_n, &buf->data[0], 2); + memcpy(&val_n, &buf->data[2], 2); + + pkt->type = ntohs(type_n); + pkt->val = ntohs(val_n); +} + +static int tmon_packet_write(int fd, struct tmon_pkt *pkt) +{ + struct tmon_buf buf; + ssize_t n; + + tmon_packet_push(pkt, &buf); + + n = sys_write(fd, &buf.data[0], sizeof(buf.data)); + if (n == -1) { + return errno; + } + return 0; +} + +bool tmon_set_exit(struct tmon_pkt *pkt) +{ + *pkt = (struct tmon_pkt) { + .type = TMON_MSG_EXIT, + }; + + return true; +} + +bool tmon_set_errno(struct tmon_pkt *pkt, int err) +{ + if (err <= 0 || err > UINT16_MAX) { + return false; + } + + *pkt = (struct tmon_pkt) { + .type = TMON_MSG_ERRNO, + .val = (uint16_t)err, + }; + + return true; +} + +bool tmon_set_ping(struct tmon_pkt *pkt) +{ + *pkt = (struct tmon_pkt) { + .type = TMON_MSG_PING, + }; + + return true; +} + +bool tmon_set_ascii(struct tmon_pkt *pkt, char c) +{ + if (!isascii(c)) { + return false; + } + + *pkt = (struct tmon_pkt) { + .type = TMON_MSG_ASCII, + .val = (uint16_t)c, + }; + + return true; +} + +bool tmon_set_custom(struct tmon_pkt *pkt, uint16_t val) +{ + *pkt = (struct tmon_pkt) { + .type = TMON_MSG_CUSTOM, + .val = val, + }; + + return true; +} + +static bool tmon_parse_exit(struct tmon_pkt *pkt) +{ + if (pkt->type != TMON_MSG_EXIT) { + return false; + } + if (pkt->val != 0) { + return false; + } + + return true; +} + +static bool tmon_parse_errno(struct tmon_pkt *pkt, int *err) +{ + if (pkt->type != TMON_MSG_ERRNO) { + return false; + } + *err= (int)pkt->val; + + return true; +} + +bool tmon_parse_ping(struct tmon_pkt *pkt) +{ + if (pkt->type != TMON_MSG_PING) { + return false; + } + if (pkt->val != 0) { + return false; + } + + return true; +} + +bool tmon_parse_ascii(struct tmon_pkt *pkt, char *c) +{ + if (pkt->type != TMON_MSG_ASCII) { + return false; + } + if (!isascii((int)pkt->val)) { + return false; + } + *c = (char)pkt->val; + + return true; +} + +bool tmon_parse_custom(struct tmon_pkt *pkt, uint16_t *val) +{ + if (pkt->type != TMON_MSG_CUSTOM) { + return false; + } + *val = pkt->val; + + return true; +} + +struct tmon_state { + int fd; + int direction; + struct tevent_context *ev; + bool monitor_close; + unsigned long write_interval; + unsigned long read_timeout; + struct tmon_actions actions; + struct tevent_timer *timer; + void *private_data; +}; + +static void tmon_readable(struct tevent_req *subreq); +static bool tmon_set_timeout(struct tevent_req *req, + struct tevent_context *ev); +static void tmon_timedout(struct tevent_context *ev, + struct tevent_timer *te, + struct timeval now, + void *private_data); +static void tmon_write_loop(struct tevent_req *subreq); + +struct tevent_req *tmon_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + int fd, + int direction, + unsigned long read_timeout, + unsigned long write_interval, + struct tmon_actions *actions, + void *private_data) +{ + struct tevent_req *req, *subreq; + struct tmon_state *state; + bool status; + + req = tevent_req_create(mem_ctx, &state, struct tmon_state); + if (req == NULL) { + return NULL; + } + + if (actions != NULL) { + /* If FD isn't readable then read actions are invalid */ + if (!(direction & TMON_FD_READ) && + (actions->timeout_callback != NULL || + actions->read_callback != NULL || + read_timeout != 0)) { + tevent_req_error(req, EINVAL); + return tevent_req_post(req, ev); + } + /* If FD isn't writeable then write actions are invalid */ + if (!(direction & TMON_FD_WRITE) && + (actions->write_callback != NULL || + write_interval != 0)) { + tevent_req_error(req, EINVAL); + return tevent_req_post(req, ev); + } + /* Can't specify write interval without a callback */ + if (state->write_interval != 0 && + state->actions.write_callback == NULL) { + tevent_req_error(req, EINVAL); + return tevent_req_post(req, ev); + } + } + + state->fd = fd; + state->direction = direction; + state->ev = ev; + state->write_interval = write_interval; + state->read_timeout = read_timeout; + state->private_data = private_data; + + if (actions != NULL) { + state->actions = *actions; + } + + status = set_close_on_exec(fd); + if (!status) { + tevent_req_error(req, errno); + return tevent_req_post(req, ev); + } + + if (direction & TMON_FD_READ) { + subreq = wait_for_read_send(state, ev, fd, true); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, tmon_readable, req); + } + + if (state->read_timeout != 0) { + status = tmon_set_timeout(req, state->ev); + if (!status) { + tevent_req_error(req, ENOMEM); + return tevent_req_post(req, ev); + } + } + + if (state->write_interval != 0) { + subreq = tevent_wakeup_send( + state, + state->ev, + tevent_timeval_current_ofs(state->write_interval, 0)); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, state->ev); + } + tevent_req_set_callback(subreq, tmon_write_loop, req); + } + + return req; +} + +static void tmon_readable(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct tmon_state *state = tevent_req_data( req, struct tmon_state); + struct tmon_buf buf; + struct tmon_pkt pkt; + ssize_t nread; + bool status; + int err; + int ret; + + status = wait_for_read_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (!status) { + if (ret == EPIPE && state->actions.close_callback != NULL) { + ret = state->actions.close_callback(state->private_data); + if (ret == TMON_STATUS_EXIT) { + ret = 0; + } + } + if (ret == 0) { + tevent_req_done(req); + } else { + tevent_req_error(req, ret); + } + return; + } + + nread = sys_read(state->fd, buf.data, sizeof(buf.data)); + if (nread == -1) { + tevent_req_error(req, errno); + return; + } + if (nread == 0) { + /* Can't happen, treat like EPIPE, above */ + tevent_req_error(req, EPIPE); + return; + } + if (nread != sizeof(buf.data)) { + tevent_req_error(req, EPROTO); + return; + } + + tmon_packet_pull(&buf, &pkt); + + switch (pkt.type) { + case TMON_MSG_EXIT: + status = tmon_parse_exit(&pkt); + if (!status) { + tevent_req_error(req, EPROTO); + return; + } + tevent_req_done(req); + return; + case TMON_MSG_ERRNO: + status = tmon_parse_errno(&pkt, &err); + if (!status) { + err = EPROTO; + } + tevent_req_error(req, err); + return; + default: + break; + } + + if (state->actions.read_callback == NULL) { + /* Shouldn't happen, other end should not write */ + tevent_req_error(req, EIO); + return; + } + ret = state->actions.read_callback(state->private_data, &pkt); + if (ret == TMON_STATUS_EXIT) { + tevent_req_done(req); + return; + } + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + subreq = wait_for_read_send(state, state->ev, state->fd, true); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, tmon_readable, req); + + /* Reset read timeout */ + if (state->read_timeout != 0) { + status = tmon_set_timeout(req, state->ev); + if (!status) { + tevent_req_error(req, ENOMEM); + return; + } + } +} + +static bool tmon_set_timeout(struct tevent_req *req, + struct tevent_context *ev) +{ + struct tmon_state *state = tevent_req_data( + req, struct tmon_state); + struct timeval endtime = + tevent_timeval_current_ofs(state->read_timeout, 0); + + TALLOC_FREE(state->timer); + + state->timer = tevent_add_timer(ev, req, endtime, tmon_timedout, req); + if (tevent_req_nomem(state->timer, req)) { + return false; + } + + return true; +} + +static void tmon_timedout(struct tevent_context *ev, + struct tevent_timer *te, + struct timeval now, + void *private_data) +{ + struct tevent_req *req = talloc_get_type_abort( + private_data, struct tevent_req); + struct tmon_state *state = tevent_req_data(req, struct tmon_state); + int ret; + + TALLOC_FREE(state->timer); + + if (state->actions.timeout_callback != NULL) { + ret = state->actions.timeout_callback(state->private_data); + if (ret == TMON_STATUS_EXIT) { + ret = 0; + } + } else { + ret = ETIMEDOUT; + } + + if (ret == 0) { + tevent_req_done(req); + } else { + tevent_req_error(req, ret); + } +} + +static void tmon_write_loop(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct tmon_state *state = tevent_req_data( + req, struct tmon_state); + struct tmon_pkt pkt; + int ret; + bool status; + + status = tevent_wakeup_recv(subreq); + TALLOC_FREE(subreq); + if (!status) { + /* Ignore error */ + } + + ret = state->actions.write_callback(state->private_data, &pkt); + if (ret == TMON_STATUS_EXIT) { + tevent_req_done(req); + return; + } + if (ret == TMON_STATUS_SKIP) { + goto done; + } + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + status = tmon_write(req, &pkt); + if (!status) { + return; + } + +done: + subreq = tevent_wakeup_send( + state, + state->ev, + tevent_timeval_current_ofs(state->write_interval, 0)); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, tmon_write_loop, req); +} + +bool tmon_write(struct tevent_req *req, struct tmon_pkt *pkt) +{ + struct tmon_state *state = tevent_req_data( + req, struct tmon_state); + int ret; + + if (state->fd == -1) { + return false; + } + + if (!(state->direction & TMON_FD_WRITE)) { + tevent_req_error(req, EINVAL); + return false; + } + + ret = tmon_packet_write(state->fd, pkt); + if (ret != 0) { + if (ret == EPIPE && state->actions.close_callback != NULL) { + ret = state->actions.close_callback(state->private_data); + if (ret == TMON_STATUS_EXIT) { + ret = 0; + } + } + + if (ret == 0) { + tevent_req_done(req); + } else { + tevent_req_error(req, ret); + } + state->fd = -1; + return false; + } + + return true; +} + +bool tmon_recv(struct tevent_req *req, int *perr) +{ + if (tevent_req_is_unix_error(req, perr)) { + return false; + } + + return true; +} + +static int ping_writer(void *private_data, struct tmon_pkt *pkt) +{ + tmon_set_ping(pkt); + + return 0; +} + +static int ping_reader(void *private_data, struct tmon_pkt *pkt) +{ + bool status; + + /* Only expect pings */ + status = tmon_parse_ping(pkt); + if (!status) { + return EPROTO; + } + + return 0; +} + +struct tevent_req *tmon_ping_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + int fd, + int direction, + unsigned long timeout, + unsigned long interval) +{ + struct tevent_req *req; + struct tmon_actions actions = { + .write_callback = NULL, + }; + + if ((direction & TMON_FD_WRITE) && interval != 0) { + actions.write_callback = ping_writer; + } + if ((direction & TMON_FD_READ) && timeout != 0) { + actions.read_callback = ping_reader; + } + + req = tmon_send(mem_ctx, + ev, + fd, + direction, + timeout, + interval, + &actions, + NULL); + return req; +} + +bool tmon_ping_recv(struct tevent_req *req, int *perr) +{ + bool status; + + status = tmon_recv(req, perr); + + return status; +} |