diff options
Diffstat (limited to 'lib/async_req/async_sock.c')
-rw-r--r-- | lib/async_req/async_sock.c | 786 |
1 files changed, 786 insertions, 0 deletions
diff --git a/lib/async_req/async_sock.c b/lib/async_req/async_sock.c new file mode 100644 index 0000000..ae1d325 --- /dev/null +++ b/lib/async_req/async_sock.c @@ -0,0 +1,786 @@ +/* + Unix SMB/CIFS implementation. + async socket syscalls + Copyright (C) Volker Lendecke 2008 + + ** NOTE! The following LGPL license applies to the async_sock + ** library. This does NOT imply that all of Samba is released + ** under the LGPL + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 3 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Library General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/network.h" +#include "system/filesys.h" +#include <talloc.h> +#include <tevent.h> +#include "lib/async_req/async_sock.h" +#include "lib/util/iov_buf.h" +#include "lib/util/util_net.h" + +/* Note: lib/util/ is currently GPL */ +#include "lib/util/tevent_unix.h" +#include "lib/util/samba_util.h" + +struct async_connect_state { + int fd; + struct tevent_fd *fde; + int result; + long old_sockflags; + socklen_t address_len; + struct sockaddr_storage address; + + void (*before_connect)(void *private_data); + void (*after_connect)(void *private_data); + void *private_data; +}; + +static void async_connect_cleanup(struct tevent_req *req, + enum tevent_req_state req_state); +static void async_connect_connected(struct tevent_context *ev, + struct tevent_fd *fde, uint16_t flags, + void *priv); + +/** + * @brief async version of connect(2) + * @param[in] mem_ctx The memory context to hang the result off + * @param[in] ev The event context to work from + * @param[in] fd The socket to recv from + * @param[in] address Where to connect? + * @param[in] address_len Length of *address + * @retval The async request + * + * This function sets the socket into non-blocking state to be able to call + * connect in an async state. This will be reset when the request is finished. + */ + +struct tevent_req *async_connect_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd, + const struct sockaddr *address, socklen_t address_len, + void (*before_connect)(void *private_data), + void (*after_connect)(void *private_data), + void *private_data) +{ + struct tevent_req *req; + struct async_connect_state *state; + int ret; + + req = tevent_req_create(mem_ctx, &state, struct async_connect_state); + if (req == NULL) { + return NULL; + } + + /** + * We have to set the socket to nonblocking for async connect(2). Keep + * the old sockflags around. + */ + + state->fd = fd; + state->before_connect = before_connect; + state->after_connect = after_connect; + state->private_data = private_data; + + state->old_sockflags = fcntl(fd, F_GETFL, 0); + if (state->old_sockflags == -1) { + tevent_req_error(req, errno); + return tevent_req_post(req, ev); + } + + tevent_req_set_cleanup_fn(req, async_connect_cleanup); + + state->address_len = address_len; + if (address_len > sizeof(state->address)) { + tevent_req_error(req, EINVAL); + return tevent_req_post(req, ev); + } + memcpy(&state->address, address, address_len); + + ret = set_blocking(fd, false); + if (ret == -1) { + tevent_req_error(req, errno); + return tevent_req_post(req, ev); + } + + if (state->before_connect != NULL) { + state->before_connect(state->private_data); + } + + state->result = connect(fd, address, address_len); + + if (state->after_connect != NULL) { + state->after_connect(state->private_data); + } + + if (state->result == 0) { + tevent_req_done(req); + return tevent_req_post(req, ev); + } + + /* + * The only errno indicating that an initial connect is still + * in flight is EINPROGRESS. + * + * This allows callers like open_socket_out_send() to reuse + * fds and call us with an fd for which the connect is still + * in flight. The proper thing to do for callers would be + * closing the fd and starting from scratch with a fresh + * socket. + */ + + if (errno != EINPROGRESS) { + tevent_req_error(req, errno); + return tevent_req_post(req, ev); + } + + /* + * Note for historic reasons TEVENT_FD_WRITE is not enough + * to get notified for POLLERR or EPOLLHUP even if they + * come together with POLLOUT. That means we need to + * use TEVENT_FD_READ in addition until we have + * TEVENT_FD_ERROR. + */ + state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ|TEVENT_FD_WRITE, + async_connect_connected, req); + if (state->fde == NULL) { + tevent_req_error(req, ENOMEM); + return tevent_req_post(req, ev); + } + return req; +} + +static void async_connect_cleanup(struct tevent_req *req, + enum tevent_req_state req_state) +{ + struct async_connect_state *state = + tevent_req_data(req, struct async_connect_state); + + TALLOC_FREE(state->fde); + if (state->fd != -1) { + int ret; + + ret = fcntl(state->fd, F_SETFL, state->old_sockflags); + if (ret == -1) { + abort(); + } + + state->fd = -1; + } +} + +/** + * fde event handler for connect(2) + * @param[in] ev The event context that sent us here + * @param[in] fde The file descriptor event associated with the connect + * @param[in] flags Indicate read/writeability of the socket + * @param[in] priv private data, "struct async_req *" in this case + */ + +static void async_connect_connected(struct tevent_context *ev, + struct tevent_fd *fde, uint16_t flags, + void *priv) +{ + struct tevent_req *req = talloc_get_type_abort( + priv, struct tevent_req); + struct async_connect_state *state = + tevent_req_data(req, struct async_connect_state); + int ret; + int socket_error = 0; + socklen_t slen = sizeof(socket_error); + + ret = getsockopt(state->fd, SOL_SOCKET, SO_ERROR, + &socket_error, &slen); + + if (ret != 0) { + /* + * According to Stevens this is the Solaris behaviour + * in case the connection encountered an error: + * getsockopt() fails, error is in errno + */ + tevent_req_error(req, errno); + return; + } + + if (socket_error != 0) { + /* + * Berkeley derived implementations (including) Linux + * return the pending error via socket_error. + */ + tevent_req_error(req, socket_error); + return; + } + + tevent_req_done(req); + return; +} + +int async_connect_recv(struct tevent_req *req, int *perrno) +{ + int err = tevent_req_simple_recv_unix(req); + + if (err != 0) { + *perrno = err; + return -1; + } + + return 0; +} + +struct writev_state { + struct tevent_context *ev; + struct tevent_queue_entry *queue_entry; + int fd; + struct tevent_fd *fde; + struct iovec *iov; + int count; + size_t total_size; + uint16_t flags; + bool err_on_readability; +}; + +static void writev_cleanup(struct tevent_req *req, + enum tevent_req_state req_state); +static bool writev_cancel(struct tevent_req *req); +static void writev_trigger(struct tevent_req *req, void *private_data); +static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde, + uint16_t flags, void *private_data); + +struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct tevent_queue *queue, int fd, + bool err_on_readability, + struct iovec *iov, int count) +{ + struct tevent_req *req; + struct writev_state *state; + + req = tevent_req_create(mem_ctx, &state, struct writev_state); + if (req == NULL) { + return NULL; + } + state->ev = ev; + state->fd = fd; + state->total_size = 0; + state->count = count; + state->iov = (struct iovec *)talloc_memdup( + state, iov, sizeof(struct iovec) * count); + if (tevent_req_nomem(state->iov, req)) { + return tevent_req_post(req, ev); + } + state->flags = TEVENT_FD_WRITE|TEVENT_FD_READ; + state->err_on_readability = err_on_readability; + + tevent_req_set_cleanup_fn(req, writev_cleanup); + tevent_req_set_cancel_fn(req, writev_cancel); + + if (queue == NULL) { + state->fde = tevent_add_fd(state->ev, state, state->fd, + state->flags, writev_handler, req); + if (tevent_req_nomem(state->fde, req)) { + return tevent_req_post(req, ev); + } + return req; + } + + /* + * writev_trigger tries a nonblocking write. If that succeeds, + * we can't directly notify the callback to call + * writev_recv. The callback would TALLOC_FREE(req) after + * calling writev_recv even before writev_trigger can inspect + * it for success. + */ + tevent_req_defer_callback(req, ev); + + state->queue_entry = tevent_queue_add_optimize_empty( + queue, ev, req, writev_trigger, NULL); + if (tevent_req_nomem(state->queue_entry, req)) { + return tevent_req_post(req, ev); + } + if (!tevent_req_is_in_progress(req)) { + return tevent_req_post(req, ev); + } + return req; +} + +static void writev_cleanup(struct tevent_req *req, + enum tevent_req_state req_state) +{ + struct writev_state *state = tevent_req_data(req, struct writev_state); + + TALLOC_FREE(state->queue_entry); + TALLOC_FREE(state->fde); +} + +static bool writev_cancel(struct tevent_req *req) +{ + struct writev_state *state = tevent_req_data(req, struct writev_state); + + if (state->total_size > 0) { + /* + * We've already started to write :-( + */ + return false; + } + + TALLOC_FREE(state->queue_entry); + TALLOC_FREE(state->fde); + + tevent_req_defer_callback(req, state->ev); + tevent_req_error(req, ECANCELED); + return true; +} + +static void writev_do(struct tevent_req *req, struct writev_state *state) +{ + ssize_t written; + bool ok; + + written = writev(state->fd, state->iov, state->count); + if ((written == -1) && + ((errno == EINTR) || + (errno == EAGAIN) || + (errno == EWOULDBLOCK))) { + /* retry after going through the tevent loop */ + return; + } + if (written == -1) { + tevent_req_error(req, errno); + return; + } + if (written == 0) { + tevent_req_error(req, EPIPE); + return; + } + state->total_size += written; + + ok = iov_advance(&state->iov, &state->count, written); + if (!ok) { + tevent_req_error(req, EIO); + return; + } + + if (state->count == 0) { + tevent_req_done(req); + return; + } +} + +static void writev_trigger(struct tevent_req *req, void *private_data) +{ + struct writev_state *state = tevent_req_data(req, struct writev_state); + + state->queue_entry = NULL; + + writev_do(req, state); + if (!tevent_req_is_in_progress(req)) { + return; + } + + state->fde = tevent_add_fd(state->ev, state, state->fd, state->flags, + writev_handler, req); + if (tevent_req_nomem(state->fde, req)) { + return; + } +} + +static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde, + uint16_t flags, void *private_data) +{ + struct tevent_req *req = talloc_get_type_abort( + private_data, struct tevent_req); + struct writev_state *state = + tevent_req_data(req, struct writev_state); + + if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) { + int ret, value; + + if (state->err_on_readability) { + /* Readable and the caller wants an error on read. */ + tevent_req_error(req, EPIPE); + return; + } + + /* Might be an error. Check if there are bytes to read */ + ret = ioctl(state->fd, FIONREAD, &value); + /* FIXME - should we also check + for ret == 0 and value == 0 here ? */ + if (ret == -1) { + /* There's an error. */ + tevent_req_error(req, EPIPE); + return; + } + /* A request for TEVENT_FD_READ will succeed from now and + forevermore until the bytes are read so if there was + an error we'll wait until we do read, then get it in + the read callback function. Until then, remove TEVENT_FD_READ + from the flags we're waiting for. */ + state->flags &= ~TEVENT_FD_READ; + TEVENT_FD_NOT_READABLE(fde); + + /* If not writable, we're done. */ + if (!(flags & TEVENT_FD_WRITE)) { + return; + } + } + + writev_do(req, state); +} + +ssize_t writev_recv(struct tevent_req *req, int *perrno) +{ + struct writev_state *state = + tevent_req_data(req, struct writev_state); + ssize_t ret; + + if (tevent_req_is_unix_error(req, perrno)) { + tevent_req_received(req); + return -1; + } + ret = state->total_size; + tevent_req_received(req); + return ret; +} + +struct read_packet_state { + int fd; + struct tevent_fd *fde; + uint8_t *buf; + size_t nread; + ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data); + void *private_data; +}; + +static void read_packet_cleanup(struct tevent_req *req, + enum tevent_req_state req_state); +static void read_packet_handler(struct tevent_context *ev, + struct tevent_fd *fde, + uint16_t flags, void *private_data); + +struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + int fd, size_t initial, + ssize_t (*more)(uint8_t *buf, + size_t buflen, + void *private_data), + void *private_data) +{ + struct tevent_req *req; + struct read_packet_state *state; + + req = tevent_req_create(mem_ctx, &state, struct read_packet_state); + if (req == NULL) { + return NULL; + } + state->fd = fd; + state->nread = 0; + state->more = more; + state->private_data = private_data; + + tevent_req_set_cleanup_fn(req, read_packet_cleanup); + + state->buf = talloc_array(state, uint8_t, initial); + if (tevent_req_nomem(state->buf, req)) { + return tevent_req_post(req, ev); + } + + state->fde = tevent_add_fd(ev, state, fd, + TEVENT_FD_READ, read_packet_handler, + req); + if (tevent_req_nomem(state->fde, req)) { + return tevent_req_post(req, ev); + } + return req; +} + +static void read_packet_cleanup(struct tevent_req *req, + enum tevent_req_state req_state) +{ + struct read_packet_state *state = + tevent_req_data(req, struct read_packet_state); + + TALLOC_FREE(state->fde); +} + +static void read_packet_handler(struct tevent_context *ev, + struct tevent_fd *fde, + uint16_t flags, void *private_data) +{ + struct tevent_req *req = talloc_get_type_abort( + private_data, struct tevent_req); + struct read_packet_state *state = + tevent_req_data(req, struct read_packet_state); + size_t total = talloc_get_size(state->buf); + ssize_t nread, more; + uint8_t *tmp; + + nread = recv(state->fd, state->buf+state->nread, total-state->nread, + 0); + if ((nread == -1) && (errno == ENOTSOCK)) { + nread = read(state->fd, state->buf+state->nread, + total-state->nread); + } + if ((nread == -1) && (errno == EINTR)) { + /* retry */ + return; + } + if (nread == -1) { + tevent_req_error(req, errno); + return; + } + if (nread == 0) { + tevent_req_error(req, EPIPE); + return; + } + + state->nread += nread; + if (state->nread < total) { + /* Come back later */ + return; + } + + /* + * We got what was initially requested. See if "more" asks for -- more. + */ + if (state->more == NULL) { + /* Nobody to ask, this is a async read_data */ + tevent_req_done(req); + return; + } + + more = state->more(state->buf, total, state->private_data); + if (more == -1) { + /* We got an invalid packet, tell the caller */ + tevent_req_error(req, EIO); + return; + } + if (more == 0) { + /* We're done, full packet received */ + tevent_req_done(req); + return; + } + + if (total + more < total) { + tevent_req_error(req, EMSGSIZE); + return; + } + + tmp = talloc_realloc(state, state->buf, uint8_t, total+more); + if (tevent_req_nomem(tmp, req)) { + return; + } + state->buf = tmp; +} + +ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx, + uint8_t **pbuf, int *perrno) +{ + struct read_packet_state *state = + tevent_req_data(req, struct read_packet_state); + + if (tevent_req_is_unix_error(req, perrno)) { + tevent_req_received(req); + return -1; + } + *pbuf = talloc_move(mem_ctx, &state->buf); + tevent_req_received(req); + return talloc_get_size(*pbuf); +} + +struct wait_for_read_state { + struct tevent_fd *fde; + int fd; + bool check_errors; +}; + +static void wait_for_read_cleanup(struct tevent_req *req, + enum tevent_req_state req_state); +static void wait_for_read_done(struct tevent_context *ev, + struct tevent_fd *fde, + uint16_t flags, + void *private_data); + +struct tevent_req *wait_for_read_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, int fd, + bool check_errors) +{ + struct tevent_req *req; + struct wait_for_read_state *state; + + req = tevent_req_create(mem_ctx, &state, struct wait_for_read_state); + if (req == NULL) { + return NULL; + } + + tevent_req_set_cleanup_fn(req, wait_for_read_cleanup); + + state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, + wait_for_read_done, req); + if (tevent_req_nomem(state->fde, req)) { + return tevent_req_post(req, ev); + } + + state->fd = fd; + state->check_errors = check_errors; + return req; +} + +static void wait_for_read_cleanup(struct tevent_req *req, + enum tevent_req_state req_state) +{ + struct wait_for_read_state *state = + tevent_req_data(req, struct wait_for_read_state); + + TALLOC_FREE(state->fde); +} + +static void wait_for_read_done(struct tevent_context *ev, + struct tevent_fd *fde, + uint16_t flags, + void *private_data) +{ + struct tevent_req *req = talloc_get_type_abort( + private_data, struct tevent_req); + struct wait_for_read_state *state = + tevent_req_data(req, struct wait_for_read_state); + int ret, available; + + if ((flags & TEVENT_FD_READ) == 0) { + return; + } + + if (!state->check_errors) { + tevent_req_done(req); + return; + } + + ret = ioctl(state->fd, FIONREAD, &available); + + if ((ret == -1) && (errno == EINTR)) { + /* come back later */ + return; + } + + if (ret == -1) { + tevent_req_error(req, errno); + return; + } + + if (available == 0) { + tevent_req_error(req, EPIPE); + return; + } + + tevent_req_done(req); +} + +bool wait_for_read_recv(struct tevent_req *req, int *perr) +{ + int err = tevent_req_simple_recv_unix(req); + + if (err != 0) { + *perr = err; + return false; + } + + return true; +} + +struct accept_state { + struct tevent_fd *fde; + int listen_sock; + struct samba_sockaddr addr; + int sock; +}; + +static void accept_handler(struct tevent_context *ev, struct tevent_fd *fde, + uint16_t flags, void *private_data); + +struct tevent_req *accept_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + int listen_sock) +{ + struct tevent_req *req; + struct accept_state *state; + + req = tevent_req_create(mem_ctx, &state, struct accept_state); + if (req == NULL) { + return NULL; + } + + state->listen_sock = listen_sock; + + state->fde = tevent_add_fd(ev, state, listen_sock, TEVENT_FD_READ, + accept_handler, req); + if (tevent_req_nomem(state->fde, req)) { + return tevent_req_post(req, ev); + } + return req; +} + +static void accept_handler(struct tevent_context *ev, struct tevent_fd *fde, + uint16_t flags, void *private_data) +{ + struct tevent_req *req = talloc_get_type_abort( + private_data, struct tevent_req); + struct accept_state *state = tevent_req_data(req, struct accept_state); + int ret; + + TALLOC_FREE(state->fde); + + if ((flags & TEVENT_FD_READ) == 0) { + tevent_req_error(req, EIO); + return; + } + + state->addr.sa_socklen = sizeof(state->addr.u); + + ret = accept(state->listen_sock, + &state->addr.u.sa, + &state->addr.sa_socklen); + if ((ret == -1) && (errno == EINTR)) { + /* retry */ + return; + } + if (ret == -1) { + tevent_req_error(req, errno); + return; + } + smb_set_close_on_exec(ret); + state->sock = ret; + tevent_req_done(req); +} + +int accept_recv(struct tevent_req *req, + int *listen_sock, + struct samba_sockaddr *paddr, + int *perr) +{ + struct accept_state *state = tevent_req_data(req, struct accept_state); + int sock = state->sock; + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + tevent_req_received(req); + return -1; + } + if (listen_sock != NULL) { + *listen_sock = state->listen_sock; + } + if (paddr != NULL) { + *paddr = state->addr; + } + tevent_req_received(req); + return sock; +} |