summaryrefslogtreecommitdiffstats
path: root/src/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket.c')
-rw-r--r--src/socket.c471
1 files changed, 471 insertions, 0 deletions
diff --git a/src/socket.c b/src/socket.c
new file mode 100644
index 0000000..dad8e93
--- /dev/null
+++ b/src/socket.c
@@ -0,0 +1,471 @@
+/*
+ * Copyright (c) 2019, Redis Labs
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "server.h"
+#include "connhelpers.h"
+
+/* The connections module provides a lean abstraction of network connections
+ * to avoid direct socket and async event management across the Redis code base.
+ *
+ * It does NOT provide advanced connection features commonly found in similar
+ * libraries such as complete in/out buffer management, throttling, etc. These
+ * functions remain in networking.c.
+ *
+ * The primary goal is to allow transparent handling of TCP and TLS based
+ * connections. To do so, connections have the following properties:
+ *
+ * 1. A connection may live before its corresponding socket exists. This
+ * allows various context and configuration setting to be handled before
+ * establishing the actual connection.
+ * 2. The caller may register/unregister logical read/write handlers to be
+ * called when the connection has data to read from/can accept writes.
+ * These logical handlers may or may not correspond to actual AE events,
+ * depending on the implementation (for TCP they are; for TLS they aren't).
+ */
+
+static ConnectionType CT_Socket;
+
+/* When a connection is created we must know its type already, but the
+ * underlying socket may or may not exist:
+ *
+ * - For accepted connections, it exists as we do not model the listen/accept
+ * part; So caller calls connCreateSocket() followed by connAccept().
+ * - For outgoing connections, the socket is created by the connection module
+ * itself; So caller calls connCreateSocket() followed by connConnect(),
+ * which registers a connect callback that fires on connected/error state
+ * (and after any transport level handshake was done).
+ *
+ * NOTE: An earlier version relied on connections being part of other structs
+ * and not independently allocated. This could lead to further optimizations
+ * like using container_of(), etc. However it was discontinued in favor of
+ * this approach for these reasons:
+ *
+ * 1. In some cases conns are created/handled outside the context of the
+ * containing struct, in which case it gets a bit awkward to copy them.
+ * 2. Future implementations may wish to allocate arbitrary data for the
+ * connection.
+ * 3. The container_of() approach is anyway risky because connections may
+ * be embedded in different structs, not just client.
+ */
+
+static connection *connCreateSocket(void) {
+ connection *conn = zcalloc(sizeof(connection));
+ conn->type = &CT_Socket;
+ conn->fd = -1;
+ conn->iovcnt = IOV_MAX;
+
+ return conn;
+}
+
+/* Create a new socket-type connection that is already associated with
+ * an accepted connection.
+ *
+ * The socket is not ready for I/O until connAccept() was called and
+ * invoked the connection-level accept handler.
+ *
+ * Callers should use connGetState() and verify the created connection
+ * is not in an error state (which is not possible for a socket connection,
+ * but could but possible with other protocols).
+ */
+static connection *connCreateAcceptedSocket(int fd, void *priv) {
+ UNUSED(priv);
+ connection *conn = connCreateSocket();
+ conn->fd = fd;
+ conn->state = CONN_STATE_ACCEPTING;
+ return conn;
+}
+
+static int connSocketConnect(connection *conn, const char *addr, int port, const char *src_addr,
+ ConnectionCallbackFunc connect_handler) {
+ int fd = anetTcpNonBlockBestEffortBindConnect(NULL,addr,port,src_addr);
+ if (fd == -1) {
+ conn->state = CONN_STATE_ERROR;
+ conn->last_errno = errno;
+ return C_ERR;
+ }
+
+ conn->fd = fd;
+ conn->state = CONN_STATE_CONNECTING;
+
+ conn->conn_handler = connect_handler;
+ aeCreateFileEvent(server.el, conn->fd, AE_WRITABLE,
+ conn->type->ae_handler, conn);
+
+ return C_OK;
+}
+
+/* ------ Pure socket connections ------- */
+
+/* A very incomplete list of implementation-specific calls. Much of the above shall
+ * move here as we implement additional connection types.
+ */
+
+static void connSocketShutdown(connection *conn) {
+ if (conn->fd == -1) return;
+
+ shutdown(conn->fd, SHUT_RDWR);
+}
+
+/* Close the connection and free resources. */
+static void connSocketClose(connection *conn) {
+ if (conn->fd != -1) {
+ aeDeleteFileEvent(server.el,conn->fd, AE_READABLE | AE_WRITABLE);
+ close(conn->fd);
+ conn->fd = -1;
+ }
+
+ /* If called from within a handler, schedule the close but
+ * keep the connection until the handler returns.
+ */
+ if (connHasRefs(conn)) {
+ conn->flags |= CONN_FLAG_CLOSE_SCHEDULED;
+ return;
+ }
+
+ zfree(conn);
+}
+
+static int connSocketWrite(connection *conn, const void *data, size_t data_len) {
+ int ret = write(conn->fd, data, data_len);
+ if (ret < 0 && errno != EAGAIN) {
+ conn->last_errno = errno;
+
+ /* Don't overwrite the state of a connection that is not already
+ * connected, not to mess with handler callbacks.
+ */
+ if (errno != EINTR && conn->state == CONN_STATE_CONNECTED)
+ conn->state = CONN_STATE_ERROR;
+ }
+
+ return ret;
+}
+
+static int connSocketWritev(connection *conn, const struct iovec *iov, int iovcnt) {
+ int ret = writev(conn->fd, iov, iovcnt);
+ if (ret < 0 && errno != EAGAIN) {
+ conn->last_errno = errno;
+
+ /* Don't overwrite the state of a connection that is not already
+ * connected, not to mess with handler callbacks.
+ */
+ if (errno != EINTR && conn->state == CONN_STATE_CONNECTED)
+ conn->state = CONN_STATE_ERROR;
+ }
+
+ return ret;
+}
+
+static int connSocketRead(connection *conn, void *buf, size_t buf_len) {
+ int ret = read(conn->fd, buf, buf_len);
+ if (!ret) {
+ conn->state = CONN_STATE_CLOSED;
+ } else if (ret < 0 && errno != EAGAIN) {
+ conn->last_errno = errno;
+
+ /* Don't overwrite the state of a connection that is not already
+ * connected, not to mess with handler callbacks.
+ */
+ if (errno != EINTR && conn->state == CONN_STATE_CONNECTED)
+ conn->state = CONN_STATE_ERROR;
+ }
+
+ return ret;
+}
+
+static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
+ int ret = C_OK;
+
+ if (conn->state != CONN_STATE_ACCEPTING) return C_ERR;
+ conn->state = CONN_STATE_CONNECTED;
+
+ connIncrRefs(conn);
+ if (!callHandler(conn, accept_handler)) ret = C_ERR;
+ connDecrRefs(conn);
+
+ return ret;
+}
+
+/* Register a write handler, to be called when the connection is writable.
+ * If NULL, the existing handler is removed.
+ *
+ * The barrier flag indicates a write barrier is requested, resulting with
+ * CONN_FLAG_WRITE_BARRIER set. This will ensure that the write handler is
+ * always called before and not after the read handler in a single event
+ * loop.
+ */
+static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
+ if (func == conn->write_handler) return C_OK;
+
+ conn->write_handler = func;
+ if (barrier)
+ conn->flags |= CONN_FLAG_WRITE_BARRIER;
+ else
+ conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
+ if (!conn->write_handler)
+ aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
+ else
+ if (aeCreateFileEvent(server.el,conn->fd,AE_WRITABLE,
+ conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
+ return C_OK;
+}
+
+/* Register a read handler, to be called when the connection is readable.
+ * If NULL, the existing handler is removed.
+ */
+static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
+ if (func == conn->read_handler) return C_OK;
+
+ conn->read_handler = func;
+ if (!conn->read_handler)
+ aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
+ else
+ if (aeCreateFileEvent(server.el,conn->fd,
+ AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
+ return C_OK;
+}
+
+static const char *connSocketGetLastError(connection *conn) {
+ return strerror(conn->last_errno);
+}
+
+static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
+{
+ UNUSED(el);
+ UNUSED(fd);
+ connection *conn = clientData;
+
+ if (conn->state == CONN_STATE_CONNECTING &&
+ (mask & AE_WRITABLE) && conn->conn_handler) {
+
+ int conn_error = anetGetError(conn->fd);
+ if (conn_error) {
+ conn->last_errno = conn_error;
+ conn->state = CONN_STATE_ERROR;
+ } else {
+ conn->state = CONN_STATE_CONNECTED;
+ }
+
+ if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
+
+ if (!callHandler(conn, conn->conn_handler)) return;
+ conn->conn_handler = NULL;
+ }
+
+ /* Normally we execute the readable event first, and the writable
+ * event later. This is useful as sometimes we may be able
+ * to serve the reply of a query immediately after processing the
+ * query.
+ *
+ * However if WRITE_BARRIER is set in the mask, our application is
+ * asking us to do the reverse: never fire the writable event
+ * after the readable. In such a case, we invert the calls.
+ * This is useful when, for instance, we want to do things
+ * in the beforeSleep() hook, like fsync'ing a file to disk,
+ * before replying to a client. */
+ int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;
+
+ int call_write = (mask & AE_WRITABLE) && conn->write_handler;
+ int call_read = (mask & AE_READABLE) && conn->read_handler;
+
+ /* Handle normal I/O flows */
+ if (!invert && call_read) {
+ if (!callHandler(conn, conn->read_handler)) return;
+ }
+ /* Fire the writable event. */
+ if (call_write) {
+ if (!callHandler(conn, conn->write_handler)) return;
+ }
+ /* If we have to invert the call, fire the readable event now
+ * after the writable one. */
+ if (invert && call_read) {
+ if (!callHandler(conn, conn->read_handler)) return;
+ }
+}
+
+static void connSocketAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
+ int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
+ char cip[NET_IP_STR_LEN];
+ UNUSED(el);
+ UNUSED(mask);
+ UNUSED(privdata);
+
+ while(max--) {
+ cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
+ if (cfd == ANET_ERR) {
+ if (errno != EWOULDBLOCK)
+ serverLog(LL_WARNING,
+ "Accepting client connection: %s", server.neterr);
+ return;
+ }
+ serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
+ acceptCommonHandler(connCreateAcceptedSocket(cfd, NULL),0,cip);
+ }
+}
+
+static int connSocketAddr(connection *conn, char *ip, size_t ip_len, int *port, int remote) {
+ if (anetFdToString(conn->fd, ip, ip_len, port, remote) == 0)
+ return C_OK;
+
+ conn->last_errno = errno;
+ return C_ERR;
+}
+
+static int connSocketIsLocal(connection *conn) {
+ char cip[NET_IP_STR_LEN + 1] = { 0 };
+
+ if (connSocketAddr(conn, cip, sizeof(cip) - 1, NULL, 1) == C_ERR)
+ return -1;
+
+ return !strncmp(cip, "127.", 4) || !strcmp(cip, "::1");
+}
+
+static int connSocketListen(connListener *listener) {
+ return listenToPort(listener);
+}
+
+static int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) {
+ int fd = anetTcpNonBlockConnect(NULL,addr,port);
+ if (fd == -1) {
+ conn->state = CONN_STATE_ERROR;
+ conn->last_errno = errno;
+ return C_ERR;
+ }
+
+ if ((aeWait(fd, AE_WRITABLE, timeout) & AE_WRITABLE) == 0) {
+ conn->state = CONN_STATE_ERROR;
+ conn->last_errno = ETIMEDOUT;
+ }
+
+ conn->fd = fd;
+ conn->state = CONN_STATE_CONNECTED;
+ return C_OK;
+}
+
+/* Connection-based versions of syncio.c functions.
+ * NOTE: This should ideally be refactored out in favor of pure async work.
+ */
+
+static ssize_t connSocketSyncWrite(connection *conn, char *ptr, ssize_t size, long long timeout) {
+ return syncWrite(conn->fd, ptr, size, timeout);
+}
+
+static ssize_t connSocketSyncRead(connection *conn, char *ptr, ssize_t size, long long timeout) {
+ return syncRead(conn->fd, ptr, size, timeout);
+}
+
+static ssize_t connSocketSyncReadLine(connection *conn, char *ptr, ssize_t size, long long timeout) {
+ return syncReadLine(conn->fd, ptr, size, timeout);
+}
+
+static const char *connSocketGetType(connection *conn) {
+ (void) conn;
+
+ return CONN_TYPE_SOCKET;
+}
+
+static ConnectionType CT_Socket = {
+ /* connection type */
+ .get_type = connSocketGetType,
+
+ /* connection type initialize & finalize & configure */
+ .init = NULL,
+ .cleanup = NULL,
+ .configure = NULL,
+
+ /* ae & accept & listen & error & address handler */
+ .ae_handler = connSocketEventHandler,
+ .accept_handler = connSocketAcceptHandler,
+ .addr = connSocketAddr,
+ .is_local = connSocketIsLocal,
+ .listen = connSocketListen,
+
+ /* create/shutdown/close connection */
+ .conn_create = connCreateSocket,
+ .conn_create_accepted = connCreateAcceptedSocket,
+ .shutdown = connSocketShutdown,
+ .close = connSocketClose,
+
+ /* connect & accept */
+ .connect = connSocketConnect,
+ .blocking_connect = connSocketBlockingConnect,
+ .accept = connSocketAccept,
+
+ /* IO */
+ .write = connSocketWrite,
+ .writev = connSocketWritev,
+ .read = connSocketRead,
+ .set_write_handler = connSocketSetWriteHandler,
+ .set_read_handler = connSocketSetReadHandler,
+ .get_last_error = connSocketGetLastError,
+ .sync_write = connSocketSyncWrite,
+ .sync_read = connSocketSyncRead,
+ .sync_readline = connSocketSyncReadLine,
+
+ /* pending data */
+ .has_pending_data = NULL,
+ .process_pending_data = NULL,
+};
+
+int connBlock(connection *conn) {
+ if (conn->fd == -1) return C_ERR;
+ return anetBlock(NULL, conn->fd);
+}
+
+int connNonBlock(connection *conn) {
+ if (conn->fd == -1) return C_ERR;
+ return anetNonBlock(NULL, conn->fd);
+}
+
+int connEnableTcpNoDelay(connection *conn) {
+ if (conn->fd == -1) return C_ERR;
+ return anetEnableTcpNoDelay(NULL, conn->fd);
+}
+
+int connDisableTcpNoDelay(connection *conn) {
+ if (conn->fd == -1) return C_ERR;
+ return anetDisableTcpNoDelay(NULL, conn->fd);
+}
+
+int connKeepAlive(connection *conn, int interval) {
+ if (conn->fd == -1) return C_ERR;
+ return anetKeepAlive(NULL, conn->fd, interval);
+}
+
+int connSendTimeout(connection *conn, long long ms) {
+ return anetSendTimeout(NULL, conn->fd, ms);
+}
+
+int connRecvTimeout(connection *conn, long long ms) {
+ return anetRecvTimeout(NULL, conn->fd, ms);
+}
+
+int RedisRegisterConnectionTypeSocket(void)
+{
+ return connTypeRegister(&CT_Socket);
+}