summaryrefslogtreecommitdiffstats
path: root/src/rio.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rio.c499
1 files changed, 499 insertions, 0 deletions
diff --git a/src/rio.c b/src/rio.c
new file mode 100644
index 0000000..bcda376
--- /dev/null
+++ b/src/rio.c
@@ -0,0 +1,499 @@
+/* rio.c is a simple stream-oriented I/O abstraction that provides an interface
+ * to write code that can consume/produce data using different concrete input
+ * and output devices. For instance the same rdb.c code using the rio
+ * abstraction can be used to read and write the RDB format using in-memory
+ * buffers or files.
+ *
+ * A rio object provides the following methods:
+ * read: read from stream.
+ * write: write to stream.
+ * tell: get the current offset.
+ *
+ * It is also possible to set a 'checksum' method that is used by rio.c in order
+ * to compute a checksum of the data written or read, or to query the rio object
+ * for the current checksum.
+ *
+ * ----------------------------------------------------------------------------
+ *
+ * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#include "fmacros.h"
+#include <string.h>
+#include <stdio.h>
+#include <unistd.h>
+#include "rio.h"
+#include "util.h"
+#include "crc64.h"
+#include "config.h"
+#include "server.h"
+
+/* ------------------------- Buffer I/O implementation ----------------------- */
+
+/* Returns 1 or 0 for success/failure. */
+static size_t rioBufferWrite(rio *r, const void *buf, size_t len) {
+ r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len);
+ r->io.buffer.pos += len;
+ return 1;
+}
+
+/* Returns 1 or 0 for success/failure. */
+static size_t rioBufferRead(rio *r, void *buf, size_t len) {
+ if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)
+ return 0; /* not enough buffer to return len bytes. */
+ memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);
+ r->io.buffer.pos += len;
+ return 1;
+}
+
+/* Returns read/write position in buffer. */
+static off_t rioBufferTell(rio *r) {
+ return r->io.buffer.pos;
+}
+
+/* Flushes any buffer to target device if applicable. Returns 1 on success
+ * and 0 on failures. */
+static int rioBufferFlush(rio *r) {
+ UNUSED(r);
+ return 1; /* Nothing to do, our write just appends to the buffer. */
+}
+
+static const rio rioBufferIO = {
+ rioBufferRead,
+ rioBufferWrite,
+ rioBufferTell,
+ rioBufferFlush,
+ NULL, /* update_checksum */
+ 0, /* current checksum */
+ 0, /* flags */
+ 0, /* bytes read or written */
+ 0, /* read/write chunk size */
+ { { NULL, 0 } } /* union for io-specific vars */
+};
+
+void rioInitWithBuffer(rio *r, sds s) {
+ *r = rioBufferIO;
+ r->io.buffer.ptr = s;
+ r->io.buffer.pos = 0;
+}
+
+/* --------------------- Stdio file pointer implementation ------------------- */
+
+/* Returns 1 or 0 for success/failure. */
+static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
+ if (!r->io.file.autosync) return fwrite(buf,len,1,r->io.file.fp);
+
+ size_t nwritten = 0;
+ /* Incrementally write data to the file, avoid a single write larger than
+ * the autosync threshold (so that the kernel's buffer cache never has too
+ * many dirty pages at once). */
+ while (len != nwritten) {
+ serverAssert(r->io.file.autosync > r->io.file.buffered);
+ size_t nalign = (size_t)(r->io.file.autosync - r->io.file.buffered);
+ size_t towrite = nalign > len-nwritten ? len-nwritten : nalign;
+
+ if (fwrite((char*)buf+nwritten,towrite,1,r->io.file.fp) == 0) return 0;
+ nwritten += towrite;
+ r->io.file.buffered += towrite;
+
+ if (r->io.file.buffered >= r->io.file.autosync) {
+ fflush(r->io.file.fp);
+
+ size_t processed = r->processed_bytes + nwritten;
+ serverAssert(processed % r->io.file.autosync == 0);
+ serverAssert(r->io.file.buffered == r->io.file.autosync);
+
+#if HAVE_SYNC_FILE_RANGE
+ /* Start writeout asynchronously. */
+ if (sync_file_range(fileno(r->io.file.fp),
+ processed - r->io.file.autosync, r->io.file.autosync,
+ SYNC_FILE_RANGE_WRITE) == -1)
+ return 0;
+
+ if (processed >= (size_t)r->io.file.autosync * 2) {
+ /* To keep the promise to 'autosync', we should make sure last
+ * asynchronous writeout persists into disk. This call may block
+ * if last writeout is not finished since disk is slow. */
+ if (sync_file_range(fileno(r->io.file.fp),
+ processed - r->io.file.autosync*2,
+ r->io.file.autosync, SYNC_FILE_RANGE_WAIT_BEFORE|
+ SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER) == -1)
+ return 0;
+ }
+#else
+ if (redis_fsync(fileno(r->io.file.fp)) == -1) return 0;
+#endif
+ r->io.file.buffered = 0;
+ }
+ }
+ return 1;
+}
+
+/* Returns 1 or 0 for success/failure. */
+static size_t rioFileRead(rio *r, void *buf, size_t len) {
+ return fread(buf,len,1,r->io.file.fp);
+}
+
+/* Returns read/write position in file. */
+static off_t rioFileTell(rio *r) {
+ return ftello(r->io.file.fp);
+}
+
+/* Flushes any buffer to target device if applicable. Returns 1 on success
+ * and 0 on failures. */
+static int rioFileFlush(rio *r) {
+ return (fflush(r->io.file.fp) == 0) ? 1 : 0;
+}
+
+static const rio rioFileIO = {
+ rioFileRead,
+ rioFileWrite,
+ rioFileTell,
+ rioFileFlush,
+ NULL, /* update_checksum */
+ 0, /* current checksum */
+ 0, /* flags */
+ 0, /* bytes read or written */
+ 0, /* read/write chunk size */
+ { { NULL, 0 } } /* union for io-specific vars */
+};
+
+void rioInitWithFile(rio *r, FILE *fp) {
+ *r = rioFileIO;
+ r->io.file.fp = fp;
+ r->io.file.buffered = 0;
+ r->io.file.autosync = 0;
+}
+
+/* ------------------- Connection implementation -------------------
+ * We use this RIO implementation when reading an RDB file directly from
+ * the connection to the memory via rdbLoadRio(), thus this implementation
+ * only implements reading from a connection that is, normally,
+ * just a socket. */
+
+static size_t rioConnWrite(rio *r, const void *buf, size_t len) {
+ UNUSED(r);
+ UNUSED(buf);
+ UNUSED(len);
+ return 0; /* Error, this target does not yet support writing. */
+}
+
+/* Returns 1 or 0 for success/failure. */
+static size_t rioConnRead(rio *r, void *buf, size_t len) {
+ size_t avail = sdslen(r->io.conn.buf)-r->io.conn.pos;
+
+ /* If the buffer is too small for the entire request: realloc. */
+ if (sdslen(r->io.conn.buf) + sdsavail(r->io.conn.buf) < len)
+ r->io.conn.buf = sdsMakeRoomFor(r->io.conn.buf, len - sdslen(r->io.conn.buf));
+
+ /* If the remaining unused buffer is not large enough: memmove so that we
+ * can read the rest. */
+ if (len > avail && sdsavail(r->io.conn.buf) < len - avail) {
+ sdsrange(r->io.conn.buf, r->io.conn.pos, -1);
+ r->io.conn.pos = 0;
+ }
+
+ /* Make sure the caller didn't request to read past the limit.
+ * If they didn't we'll buffer till the limit, if they did, we'll
+ * return an error. */
+ if (r->io.conn.read_limit != 0 && r->io.conn.read_limit < r->io.conn.read_so_far + len) {
+ errno = EOVERFLOW;
+ return 0;
+ }
+
+ /* If we don't already have all the data in the sds, read more */
+ while (len > sdslen(r->io.conn.buf) - r->io.conn.pos) {
+ size_t buffered = sdslen(r->io.conn.buf) - r->io.conn.pos;
+ size_t needs = len - buffered;
+ /* Read either what's missing, or PROTO_IOBUF_LEN, the bigger of
+ * the two. */
+ size_t toread = needs < PROTO_IOBUF_LEN ? PROTO_IOBUF_LEN: needs;
+ if (toread > sdsavail(r->io.conn.buf)) toread = sdsavail(r->io.conn.buf);
+ if (r->io.conn.read_limit != 0 &&
+ r->io.conn.read_so_far + buffered + toread > r->io.conn.read_limit)
+ {
+ toread = r->io.conn.read_limit - r->io.conn.read_so_far - buffered;
+ }
+ int retval = connRead(r->io.conn.conn,
+ (char*)r->io.conn.buf + sdslen(r->io.conn.buf),
+ toread);
+ if (retval == 0) {
+ return 0;
+ } else if (retval < 0) {
+ if (connLastErrorRetryable(r->io.conn.conn)) continue;
+ if (errno == EWOULDBLOCK) errno = ETIMEDOUT;
+ return 0;
+ }
+ sdsIncrLen(r->io.conn.buf, retval);
+ }
+
+ memcpy(buf, (char*)r->io.conn.buf + r->io.conn.pos, len);
+ r->io.conn.read_so_far += len;
+ r->io.conn.pos += len;
+ return len;
+}
+
+/* Returns read/write position in file. */
+static off_t rioConnTell(rio *r) {
+ return r->io.conn.read_so_far;
+}
+
+/* Flushes any buffer to target device if applicable. Returns 1 on success
+ * and 0 on failures. */
+static int rioConnFlush(rio *r) {
+ /* Our flush is implemented by the write method, that recognizes a
+ * buffer set to NULL with a count of zero as a flush request. */
+ return rioConnWrite(r,NULL,0);
+}
+
+static const rio rioConnIO = {
+ rioConnRead,
+ rioConnWrite,
+ rioConnTell,
+ rioConnFlush,
+ NULL, /* update_checksum */
+ 0, /* current checksum */
+ 0, /* flags */
+ 0, /* bytes read or written */
+ 0, /* read/write chunk size */
+ { { NULL, 0 } } /* union for io-specific vars */
+};
+
+/* Create an RIO that implements a buffered read from an fd
+ * read_limit argument stops buffering when the reaching the limit. */
+void rioInitWithConn(rio *r, connection *conn, size_t read_limit) {
+ *r = rioConnIO;
+ r->io.conn.conn = conn;
+ r->io.conn.pos = 0;
+ r->io.conn.read_limit = read_limit;
+ r->io.conn.read_so_far = 0;
+ r->io.conn.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN);
+ sdsclear(r->io.conn.buf);
+}
+
+/* Release the RIO stream. Optionally returns the unread buffered data
+ * when the SDS pointer 'remaining' is passed. */
+void rioFreeConn(rio *r, sds *remaining) {
+ if (remaining && (size_t)r->io.conn.pos < sdslen(r->io.conn.buf)) {
+ if (r->io.conn.pos > 0) sdsrange(r->io.conn.buf, r->io.conn.pos, -1);
+ *remaining = r->io.conn.buf;
+ } else {
+ sdsfree(r->io.conn.buf);
+ if (remaining) *remaining = NULL;
+ }
+ r->io.conn.buf = NULL;
+}
+
+/* ------------------- File descriptor implementation ------------------
+ * This target is used to write the RDB file to pipe, when the master just
+ * streams the data to the replicas without creating an RDB on-disk image
+ * (diskless replication option).
+ * It only implements writes. */
+
+/* Returns 1 or 0 for success/failure.
+ *
+ * When buf is NULL and len is 0, the function performs a flush operation
+ * if there is some pending buffer, so this function is also used in order
+ * to implement rioFdFlush(). */
+static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
+ ssize_t retval;
+ unsigned char *p = (unsigned char*) buf;
+ int doflush = (buf == NULL && len == 0);
+
+ /* For small writes, we rather keep the data in user-space buffer, and flush
+ * it only when it grows. however for larger writes, we prefer to flush
+ * any pre-existing buffer, and write the new one directly without reallocs
+ * and memory copying. */
+ if (len > PROTO_IOBUF_LEN) {
+ /* First, flush any pre-existing buffered data. */
+ if (sdslen(r->io.fd.buf)) {
+ if (rioFdWrite(r, NULL, 0) == 0)
+ return 0;
+ }
+ /* Write the new data, keeping 'p' and 'len' from the input. */
+ } else {
+ if (len) {
+ r->io.fd.buf = sdscatlen(r->io.fd.buf,buf,len);
+ if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN)
+ doflush = 1;
+ if (!doflush)
+ return 1;
+ }
+ /* Flushing the buffered data. set 'p' and 'len' accordingly. */
+ p = (unsigned char*) r->io.fd.buf;
+ len = sdslen(r->io.fd.buf);
+ }
+
+ size_t nwritten = 0;
+ while(nwritten != len) {
+ retval = write(r->io.fd.fd,p+nwritten,len-nwritten);
+ if (retval <= 0) {
+ if (retval == -1 && errno == EINTR) continue;
+ /* With blocking io, which is the sole user of this
+ * rio target, EWOULDBLOCK is returned only because of
+ * the SO_SNDTIMEO socket option, so we translate the error
+ * into one more recognizable by the user. */
+ if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
+ return 0; /* error. */
+ }
+ nwritten += retval;
+ }
+
+ r->io.fd.pos += len;
+ sdsclear(r->io.fd.buf);
+ return 1;
+}
+
+/* Returns 1 or 0 for success/failure. */
+static size_t rioFdRead(rio *r, void *buf, size_t len) {
+ UNUSED(r);
+ UNUSED(buf);
+ UNUSED(len);
+ return 0; /* Error, this target does not support reading. */
+}
+
+/* Returns read/write position in file. */
+static off_t rioFdTell(rio *r) {
+ return r->io.fd.pos;
+}
+
+/* Flushes any buffer to target device if applicable. Returns 1 on success
+ * and 0 on failures. */
+static int rioFdFlush(rio *r) {
+ /* Our flush is implemented by the write method, that recognizes a
+ * buffer set to NULL with a count of zero as a flush request. */
+ return rioFdWrite(r,NULL,0);
+}
+
+static const rio rioFdIO = {
+ rioFdRead,
+ rioFdWrite,
+ rioFdTell,
+ rioFdFlush,
+ NULL, /* update_checksum */
+ 0, /* current checksum */
+ 0, /* flags */
+ 0, /* bytes read or written */
+ 0, /* read/write chunk size */
+ { { NULL, 0 } } /* union for io-specific vars */
+};
+
+void rioInitWithFd(rio *r, int fd) {
+ *r = rioFdIO;
+ r->io.fd.fd = fd;
+ r->io.fd.pos = 0;
+ r->io.fd.buf = sdsempty();
+}
+
+/* release the rio stream. */
+void rioFreeFd(rio *r) {
+ sdsfree(r->io.fd.buf);
+}
+
+/* ---------------------------- Generic functions ---------------------------- */
+
+/* This function can be installed both in memory and file streams when checksum
+ * computation is needed. */
+void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
+ r->cksum = crc64(r->cksum,buf,len);
+}
+
+/* Set the file-based rio object to auto-fsync every 'bytes' file written.
+ * By default this is set to zero that means no automatic file sync is
+ * performed.
+ *
+ * This feature is useful in a few contexts since when we rely on OS write
+ * buffers sometimes the OS buffers way too much, resulting in too many
+ * disk I/O concentrated in very little time. When we fsync in an explicit
+ * way instead the I/O pressure is more distributed across time. */
+void rioSetAutoSync(rio *r, off_t bytes) {
+ if(r->write != rioFileIO.write) return;
+ r->io.file.autosync = bytes;
+}
+
+/* Check the type of rio. */
+uint8_t rioCheckType(rio *r) {
+ if (r->read == rioFileRead) {
+ return RIO_TYPE_FILE;
+ } else if (r->read == rioBufferRead) {
+ return RIO_TYPE_BUFFER;
+ } else if (r->read == rioConnRead) {
+ return RIO_TYPE_CONN;
+ } else {
+ /* r->read == rioFdRead */
+ return RIO_TYPE_FD;
+ }
+}
+
+/* --------------------------- Higher level interface --------------------------
+ *
+ * The following higher level functions use lower level rio.c functions to help
+ * generating the Redis protocol for the Append Only File. */
+
+/* Write multi bulk count in the format: "*<count>\r\n". */
+size_t rioWriteBulkCount(rio *r, char prefix, long count) {
+ char cbuf[128];
+ int clen;
+
+ cbuf[0] = prefix;
+ clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);
+ cbuf[clen++] = '\r';
+ cbuf[clen++] = '\n';
+ if (rioWrite(r,cbuf,clen) == 0) return 0;
+ return clen;
+}
+
+/* Write binary-safe string in the format: "$<count>\r\n<payload>\r\n". */
+size_t rioWriteBulkString(rio *r, const char *buf, size_t len) {
+ size_t nwritten;
+
+ if ((nwritten = rioWriteBulkCount(r,'$',len)) == 0) return 0;
+ if (len > 0 && rioWrite(r,buf,len) == 0) return 0;
+ if (rioWrite(r,"\r\n",2) == 0) return 0;
+ return nwritten+len+2;
+}
+
+/* Write a long long value in format: "$<count>\r\n<payload>\r\n". */
+size_t rioWriteBulkLongLong(rio *r, long long l) {
+ char lbuf[32];
+ unsigned int llen;
+
+ llen = ll2string(lbuf,sizeof(lbuf),l);
+ return rioWriteBulkString(r,lbuf,llen);
+}
+
+/* Write a double value in the format: "$<count>\r\n<payload>\r\n" */
+size_t rioWriteBulkDouble(rio *r, double d) {
+ char dbuf[128];
+ unsigned int dlen;
+
+ dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
+ return rioWriteBulkString(r,dbuf,dlen);
+}