/* * Copyright (c) 2019, Redis Labs * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * * Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * Neither the name of Redis nor the names of its contributors may be used * to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include "server.h" #include "connhelpers.h" /* The connections module provides a lean abstraction of network connections * to avoid direct socket and async event management across the Redis code base. * * It does NOT provide advanced connection features commonly found in similar * libraries such as complete in/out buffer management, throttling, etc. These * functions remain in networking.c. * * The primary goal is to allow transparent handling of TCP and TLS based * connections. To do so, connections have the following properties: * * 1. A connection may live before its corresponding socket exists. This * allows various context and configuration setting to be handled before * establishing the actual connection. * 2. The caller may register/unregister logical read/write handlers to be * called when the connection has data to read from/can accept writes. * These logical handlers may or may not correspond to actual AE events, * depending on the implementation (for TCP they are; for TLS they aren't). */ static ConnectionType CT_Socket; /* When a connection is created we must know its type already, but the * underlying socket may or may not exist: * * - For accepted connections, it exists as we do not model the listen/accept * part; So caller calls connCreateSocket() followed by connAccept(). * - For outgoing connections, the socket is created by the connection module * itself; So caller calls connCreateSocket() followed by connConnect(), * which registers a connect callback that fires on connected/error state * (and after any transport level handshake was done). * * NOTE: An earlier version relied on connections being part of other structs * and not independently allocated. This could lead to further optimizations * like using container_of(), etc. However it was discontinued in favor of * this approach for these reasons: * * 1. In some cases conns are created/handled outside the context of the * containing struct, in which case it gets a bit awkward to copy them. * 2. Future implementations may wish to allocate arbitrary data for the * connection. * 3. The container_of() approach is anyway risky because connections may * be embedded in different structs, not just client. */ static connection *connCreateSocket(void) { connection *conn = zcalloc(sizeof(connection)); conn->type = &CT_Socket; conn->fd = -1; conn->iovcnt = IOV_MAX; return conn; } /* Create a new socket-type connection that is already associated with * an accepted connection. * * The socket is not ready for I/O until connAccept() was called and * invoked the connection-level accept handler. * * Callers should use connGetState() and verify the created connection * is not in an error state (which is not possible for a socket connection, * but could but possible with other protocols). */ static connection *connCreateAcceptedSocket(int fd, void *priv) { UNUSED(priv); connection *conn = connCreateSocket(); conn->fd = fd; conn->state = CONN_STATE_ACCEPTING; return conn; } static int connSocketConnect(connection *conn, const char *addr, int port, const char *src_addr, ConnectionCallbackFunc connect_handler) { int fd = anetTcpNonBlockBestEffortBindConnect(NULL,addr,port,src_addr); if (fd == -1) { conn->state = CONN_STATE_ERROR; conn->last_errno = errno; return C_ERR; } conn->fd = fd; conn->state = CONN_STATE_CONNECTING; conn->conn_handler = connect_handler; aeCreateFileEvent(server.el, conn->fd, AE_WRITABLE, conn->type->ae_handler, conn); return C_OK; } /* ------ Pure socket connections ------- */ /* A very incomplete list of implementation-specific calls. Much of the above shall * move here as we implement additional connection types. */ static void connSocketShutdown(connection *conn) { if (conn->fd == -1) return; shutdown(conn->fd, SHUT_RDWR); } /* Close the connection and free resources. */ static void connSocketClose(connection *conn) { if (conn->fd != -1) { aeDeleteFileEvent(server.el,conn->fd, AE_READABLE | AE_WRITABLE); close(conn->fd); conn->fd = -1; } /* If called from within a handler, schedule the close but * keep the connection until the handler returns. */ if (connHasRefs(conn)) { conn->flags |= CONN_FLAG_CLOSE_SCHEDULED; return; } zfree(conn); } static int connSocketWrite(connection *conn, const void *data, size_t data_len) { int ret = write(conn->fd, data, data_len); if (ret < 0 && errno != EAGAIN) { conn->last_errno = errno; /* Don't overwrite the state of a connection that is not already * connected, not to mess with handler callbacks. */ if (errno != EINTR && conn->state == CONN_STATE_CONNECTED) conn->state = CONN_STATE_ERROR; } return ret; } static int connSocketWritev(connection *conn, const struct iovec *iov, int iovcnt) { int ret = writev(conn->fd, iov, iovcnt); if (ret < 0 && errno != EAGAIN) { conn->last_errno = errno; /* Don't overwrite the state of a connection that is not already * connected, not to mess with handler callbacks. */ if (errno != EINTR && conn->state == CONN_STATE_CONNECTED) conn->state = CONN_STATE_ERROR; } return ret; } static int connSocketRead(connection *conn, void *buf, size_t buf_len) { int ret = read(conn->fd, buf, buf_len); if (!ret) { conn->state = CONN_STATE_CLOSED; } else if (ret < 0 && errno != EAGAIN) { conn->last_errno = errno; /* Don't overwrite the state of a connection that is not already * connected, not to mess with handler callbacks. */ if (errno != EINTR && conn->state == CONN_STATE_CONNECTED) conn->state = CONN_STATE_ERROR; } return ret; } static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) { int ret = C_OK; if (conn->state != CONN_STATE_ACCEPTING) return C_ERR; conn->state = CONN_STATE_CONNECTED; connIncrRefs(conn); if (!callHandler(conn, accept_handler)) ret = C_ERR; connDecrRefs(conn); return ret; } /* Register a write handler, to be called when the connection is writable. * If NULL, the existing handler is removed. * * The barrier flag indicates a write barrier is requested, resulting with * CONN_FLAG_WRITE_BARRIER set. This will ensure that the write handler is * always called before and not after the read handler in a single event * loop. */ static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) { if (func == conn->write_handler) return C_OK; conn->write_handler = func; if (barrier) conn->flags |= CONN_FLAG_WRITE_BARRIER; else conn->flags &= ~CONN_FLAG_WRITE_BARRIER; if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE); else if (aeCreateFileEvent(server.el,conn->fd,AE_WRITABLE, conn->type->ae_handler,conn) == AE_ERR) return C_ERR; return C_OK; } /* Register a read handler, to be called when the connection is readable. * If NULL, the existing handler is removed. */ static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) { if (func == conn->read_handler) return C_OK; conn->read_handler = func; if (!conn->read_handler) aeDeleteFileEvent(server.el,conn->fd,AE_READABLE); else if (aeCreateFileEvent(server.el,conn->fd, AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR; return C_OK; } static const char *connSocketGetLastError(connection *conn) { return strerror(conn->last_errno); } static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) { UNUSED(el); UNUSED(fd); connection *conn = clientData; if (conn->state == CONN_STATE_CONNECTING && (mask & AE_WRITABLE) && conn->conn_handler) { int conn_error = anetGetError(conn->fd); if (conn_error) { conn->last_errno = conn_error; conn->state = CONN_STATE_ERROR; } else { conn->state = CONN_STATE_CONNECTED; } if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE); if (!callHandler(conn, conn->conn_handler)) return; conn->conn_handler = NULL; } /* Normally we execute the readable event first, and the writable * event later. This is useful as sometimes we may be able * to serve the reply of a query immediately after processing the * query. * * However if WRITE_BARRIER is set in the mask, our application is * asking us to do the reverse: never fire the writable event * after the readable. In such a case, we invert the calls. * This is useful when, for instance, we want to do things * in the beforeSleep() hook, like fsync'ing a file to disk, * before replying to a client. */ int invert = conn->flags & CONN_FLAG_WRITE_BARRIER; int call_write = (mask & AE_WRITABLE) && conn->write_handler; int call_read = (mask & AE_READABLE) && conn->read_handler; /* Handle normal I/O flows */ if (!invert && call_read) { if (!callHandler(conn, conn->read_handler)) return; } /* Fire the writable event. */ if (call_write) { if (!callHandler(conn, conn->write_handler)) return; } /* If we have to invert the call, fire the readable event now * after the writable one. */ if (invert && call_read) { if (!callHandler(conn, conn->read_handler)) return; } } static void connSocketAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; UNUSED(el); UNUSED(mask); UNUSED(privdata); while(max--) { cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr); return; } serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); acceptCommonHandler(connCreateAcceptedSocket(cfd, NULL),0,cip); } } static int connSocketAddr(connection *conn, char *ip, size_t ip_len, int *port, int remote) { if (anetFdToString(conn->fd, ip, ip_len, port, remote) == 0) return C_OK; conn->last_errno = errno; return C_ERR; } static int connSocketIsLocal(connection *conn) { char cip[NET_IP_STR_LEN + 1] = { 0 }; if (connSocketAddr(conn, cip, sizeof(cip) - 1, NULL, 1) == C_ERR) return -1; return !strncmp(cip, "127.", 4) || !strcmp(cip, "::1"); } static int connSocketListen(connListener *listener) { return listenToPort(listener); } static int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) { int fd = anetTcpNonBlockConnect(NULL,addr,port); if (fd == -1) { conn->state = CONN_STATE_ERROR; conn->last_errno = errno; return C_ERR; } if ((aeWait(fd, AE_WRITABLE, timeout) & AE_WRITABLE) == 0) { conn->state = CONN_STATE_ERROR; conn->last_errno = ETIMEDOUT; } conn->fd = fd; conn->state = CONN_STATE_CONNECTED; return C_OK; } /* Connection-based versions of syncio.c functions. * NOTE: This should ideally be refactored out in favor of pure async work. */ static ssize_t connSocketSyncWrite(connection *conn, char *ptr, ssize_t size, long long timeout) { return syncWrite(conn->fd, ptr, size, timeout); } static ssize_t connSocketSyncRead(connection *conn, char *ptr, ssize_t size, long long timeout) { return syncRead(conn->fd, ptr, size, timeout); } static ssize_t connSocketSyncReadLine(connection *conn, char *ptr, ssize_t size, long long timeout) { return syncReadLine(conn->fd, ptr, size, timeout); } static const char *connSocketGetType(connection *conn) { (void) conn; return CONN_TYPE_SOCKET; } static ConnectionType CT_Socket = { /* connection type */ .get_type = connSocketGetType, /* connection type initialize & finalize & configure */ .init = NULL, .cleanup = NULL, .configure = NULL, /* ae & accept & listen & error & address handler */ .ae_handler = connSocketEventHandler, .accept_handler = connSocketAcceptHandler, .addr = connSocketAddr, .is_local = connSocketIsLocal, .listen = connSocketListen, /* create/shutdown/close connection */ .conn_create = connCreateSocket, .conn_create_accepted = connCreateAcceptedSocket, .shutdown = connSocketShutdown, .close = connSocketClose, /* connect & accept */ .connect = connSocketConnect, .blocking_connect = connSocketBlockingConnect, .accept = connSocketAccept, /* IO */ .write = connSocketWrite, .writev = connSocketWritev, .read = connSocketRead, .set_write_handler = connSocketSetWriteHandler, .set_read_handler = connSocketSetReadHandler, .get_last_error = connSocketGetLastError, .sync_write = connSocketSyncWrite, .sync_read = connSocketSyncRead, .sync_readline = connSocketSyncReadLine, /* pending data */ .has_pending_data = NULL, .process_pending_data = NULL, }; int connBlock(connection *conn) { if (conn->fd == -1) return C_ERR; return anetBlock(NULL, conn->fd); } int connNonBlock(connection *conn) { if (conn->fd == -1) return C_ERR; return anetNonBlock(NULL, conn->fd); } int connEnableTcpNoDelay(connection *conn) { if (conn->fd == -1) return C_ERR; return anetEnableTcpNoDelay(NULL, conn->fd); } int connDisableTcpNoDelay(connection *conn) { if (conn->fd == -1) return C_ERR; return anetDisableTcpNoDelay(NULL, conn->fd); } int connKeepAlive(connection *conn, int interval) { if (conn->fd == -1) return C_ERR; return anetKeepAlive(NULL, conn->fd, interval); } int connSendTimeout(connection *conn, long long ms) { return anetSendTimeout(NULL, conn->fd, ms); } int connRecvTimeout(connection *conn, long long ms) { return anetRecvTimeout(NULL, conn->fd, ms); } int RedisRegisterConnectionTypeSocket(void) { return connTypeRegister(&CT_Socket); }