diff options
Diffstat (limited to '')
-rw-r--r-- | src/rio.c | 499 |
1 files changed, 499 insertions, 0 deletions
diff --git a/src/rio.c b/src/rio.c new file mode 100644 index 0000000..bcda376 --- /dev/null +++ b/src/rio.c @@ -0,0 +1,499 @@ +/* rio.c is a simple stream-oriented I/O abstraction that provides an interface + * to write code that can consume/produce data using different concrete input + * and output devices. For instance the same rdb.c code using the rio + * abstraction can be used to read and write the RDB format using in-memory + * buffers or files. + * + * A rio object provides the following methods: + * read: read from stream. + * write: write to stream. + * tell: get the current offset. + * + * It is also possible to set a 'checksum' method that is used by rio.c in order + * to compute a checksum of the data written or read, or to query the rio object + * for the current checksum. + * + * ---------------------------------------------------------------------------- + * + * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com> + * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + + +#include "fmacros.h" +#include <string.h> +#include <stdio.h> +#include <unistd.h> +#include "rio.h" +#include "util.h" +#include "crc64.h" +#include "config.h" +#include "server.h" + +/* ------------------------- Buffer I/O implementation ----------------------- */ + +/* Returns 1 or 0 for success/failure. */ +static size_t rioBufferWrite(rio *r, const void *buf, size_t len) { + r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len); + r->io.buffer.pos += len; + return 1; +} + +/* Returns 1 or 0 for success/failure. */ +static size_t rioBufferRead(rio *r, void *buf, size_t len) { + if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len) + return 0; /* not enough buffer to return len bytes. */ + memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len); + r->io.buffer.pos += len; + return 1; +} + +/* Returns read/write position in buffer. */ +static off_t rioBufferTell(rio *r) { + return r->io.buffer.pos; +} + +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioBufferFlush(rio *r) { + UNUSED(r); + return 1; /* Nothing to do, our write just appends to the buffer. */ +} + +static const rio rioBufferIO = { + rioBufferRead, + rioBufferWrite, + rioBufferTell, + rioBufferFlush, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* flags */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + +void rioInitWithBuffer(rio *r, sds s) { + *r = rioBufferIO; + r->io.buffer.ptr = s; + r->io.buffer.pos = 0; +} + +/* --------------------- Stdio file pointer implementation ------------------- */ + +/* Returns 1 or 0 for success/failure. */ +static size_t rioFileWrite(rio *r, const void *buf, size_t len) { + if (!r->io.file.autosync) return fwrite(buf,len,1,r->io.file.fp); + + size_t nwritten = 0; + /* Incrementally write data to the file, avoid a single write larger than + * the autosync threshold (so that the kernel's buffer cache never has too + * many dirty pages at once). */ + while (len != nwritten) { + serverAssert(r->io.file.autosync > r->io.file.buffered); + size_t nalign = (size_t)(r->io.file.autosync - r->io.file.buffered); + size_t towrite = nalign > len-nwritten ? len-nwritten : nalign; + + if (fwrite((char*)buf+nwritten,towrite,1,r->io.file.fp) == 0) return 0; + nwritten += towrite; + r->io.file.buffered += towrite; + + if (r->io.file.buffered >= r->io.file.autosync) { + fflush(r->io.file.fp); + + size_t processed = r->processed_bytes + nwritten; + serverAssert(processed % r->io.file.autosync == 0); + serverAssert(r->io.file.buffered == r->io.file.autosync); + +#if HAVE_SYNC_FILE_RANGE + /* Start writeout asynchronously. */ + if (sync_file_range(fileno(r->io.file.fp), + processed - r->io.file.autosync, r->io.file.autosync, + SYNC_FILE_RANGE_WRITE) == -1) + return 0; + + if (processed >= (size_t)r->io.file.autosync * 2) { + /* To keep the promise to 'autosync', we should make sure last + * asynchronous writeout persists into disk. This call may block + * if last writeout is not finished since disk is slow. */ + if (sync_file_range(fileno(r->io.file.fp), + processed - r->io.file.autosync*2, + r->io.file.autosync, SYNC_FILE_RANGE_WAIT_BEFORE| + SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER) == -1) + return 0; + } +#else + if (redis_fsync(fileno(r->io.file.fp)) == -1) return 0; +#endif + r->io.file.buffered = 0; + } + } + return 1; +} + +/* Returns 1 or 0 for success/failure. */ +static size_t rioFileRead(rio *r, void *buf, size_t len) { + return fread(buf,len,1,r->io.file.fp); +} + +/* Returns read/write position in file. */ +static off_t rioFileTell(rio *r) { + return ftello(r->io.file.fp); +} + +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioFileFlush(rio *r) { + return (fflush(r->io.file.fp) == 0) ? 1 : 0; +} + +static const rio rioFileIO = { + rioFileRead, + rioFileWrite, + rioFileTell, + rioFileFlush, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* flags */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + +void rioInitWithFile(rio *r, FILE *fp) { + *r = rioFileIO; + r->io.file.fp = fp; + r->io.file.buffered = 0; + r->io.file.autosync = 0; +} + +/* ------------------- Connection implementation ------------------- + * We use this RIO implementation when reading an RDB file directly from + * the connection to the memory via rdbLoadRio(), thus this implementation + * only implements reading from a connection that is, normally, + * just a socket. */ + +static size_t rioConnWrite(rio *r, const void *buf, size_t len) { + UNUSED(r); + UNUSED(buf); + UNUSED(len); + return 0; /* Error, this target does not yet support writing. */ +} + +/* Returns 1 or 0 for success/failure. */ +static size_t rioConnRead(rio *r, void *buf, size_t len) { + size_t avail = sdslen(r->io.conn.buf)-r->io.conn.pos; + + /* If the buffer is too small for the entire request: realloc. */ + if (sdslen(r->io.conn.buf) + sdsavail(r->io.conn.buf) < len) + r->io.conn.buf = sdsMakeRoomFor(r->io.conn.buf, len - sdslen(r->io.conn.buf)); + + /* If the remaining unused buffer is not large enough: memmove so that we + * can read the rest. */ + if (len > avail && sdsavail(r->io.conn.buf) < len - avail) { + sdsrange(r->io.conn.buf, r->io.conn.pos, -1); + r->io.conn.pos = 0; + } + + /* Make sure the caller didn't request to read past the limit. + * If they didn't we'll buffer till the limit, if they did, we'll + * return an error. */ + if (r->io.conn.read_limit != 0 && r->io.conn.read_limit < r->io.conn.read_so_far + len) { + errno = EOVERFLOW; + return 0; + } + + /* If we don't already have all the data in the sds, read more */ + while (len > sdslen(r->io.conn.buf) - r->io.conn.pos) { + size_t buffered = sdslen(r->io.conn.buf) - r->io.conn.pos; + size_t needs = len - buffered; + /* Read either what's missing, or PROTO_IOBUF_LEN, the bigger of + * the two. */ + size_t toread = needs < PROTO_IOBUF_LEN ? PROTO_IOBUF_LEN: needs; + if (toread > sdsavail(r->io.conn.buf)) toread = sdsavail(r->io.conn.buf); + if (r->io.conn.read_limit != 0 && + r->io.conn.read_so_far + buffered + toread > r->io.conn.read_limit) + { + toread = r->io.conn.read_limit - r->io.conn.read_so_far - buffered; + } + int retval = connRead(r->io.conn.conn, + (char*)r->io.conn.buf + sdslen(r->io.conn.buf), + toread); + if (retval == 0) { + return 0; + } else if (retval < 0) { + if (connLastErrorRetryable(r->io.conn.conn)) continue; + if (errno == EWOULDBLOCK) errno = ETIMEDOUT; + return 0; + } + sdsIncrLen(r->io.conn.buf, retval); + } + + memcpy(buf, (char*)r->io.conn.buf + r->io.conn.pos, len); + r->io.conn.read_so_far += len; + r->io.conn.pos += len; + return len; +} + +/* Returns read/write position in file. */ +static off_t rioConnTell(rio *r) { + return r->io.conn.read_so_far; +} + +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioConnFlush(rio *r) { + /* Our flush is implemented by the write method, that recognizes a + * buffer set to NULL with a count of zero as a flush request. */ + return rioConnWrite(r,NULL,0); +} + +static const rio rioConnIO = { + rioConnRead, + rioConnWrite, + rioConnTell, + rioConnFlush, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* flags */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + +/* Create an RIO that implements a buffered read from an fd + * read_limit argument stops buffering when the reaching the limit. */ +void rioInitWithConn(rio *r, connection *conn, size_t read_limit) { + *r = rioConnIO; + r->io.conn.conn = conn; + r->io.conn.pos = 0; + r->io.conn.read_limit = read_limit; + r->io.conn.read_so_far = 0; + r->io.conn.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN); + sdsclear(r->io.conn.buf); +} + +/* Release the RIO stream. Optionally returns the unread buffered data + * when the SDS pointer 'remaining' is passed. */ +void rioFreeConn(rio *r, sds *remaining) { + if (remaining && (size_t)r->io.conn.pos < sdslen(r->io.conn.buf)) { + if (r->io.conn.pos > 0) sdsrange(r->io.conn.buf, r->io.conn.pos, -1); + *remaining = r->io.conn.buf; + } else { + sdsfree(r->io.conn.buf); + if (remaining) *remaining = NULL; + } + r->io.conn.buf = NULL; +} + +/* ------------------- File descriptor implementation ------------------ + * This target is used to write the RDB file to pipe, when the master just + * streams the data to the replicas without creating an RDB on-disk image + * (diskless replication option). + * It only implements writes. */ + +/* Returns 1 or 0 for success/failure. + * + * When buf is NULL and len is 0, the function performs a flush operation + * if there is some pending buffer, so this function is also used in order + * to implement rioFdFlush(). */ +static size_t rioFdWrite(rio *r, const void *buf, size_t len) { + ssize_t retval; + unsigned char *p = (unsigned char*) buf; + int doflush = (buf == NULL && len == 0); + + /* For small writes, we rather keep the data in user-space buffer, and flush + * it only when it grows. however for larger writes, we prefer to flush + * any pre-existing buffer, and write the new one directly without reallocs + * and memory copying. */ + if (len > PROTO_IOBUF_LEN) { + /* First, flush any pre-existing buffered data. */ + if (sdslen(r->io.fd.buf)) { + if (rioFdWrite(r, NULL, 0) == 0) + return 0; + } + /* Write the new data, keeping 'p' and 'len' from the input. */ + } else { + if (len) { + r->io.fd.buf = sdscatlen(r->io.fd.buf,buf,len); + if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN) + doflush = 1; + if (!doflush) + return 1; + } + /* Flushing the buffered data. set 'p' and 'len' accordingly. */ + p = (unsigned char*) r->io.fd.buf; + len = sdslen(r->io.fd.buf); + } + + size_t nwritten = 0; + while(nwritten != len) { + retval = write(r->io.fd.fd,p+nwritten,len-nwritten); + if (retval <= 0) { + if (retval == -1 && errno == EINTR) continue; + /* With blocking io, which is the sole user of this + * rio target, EWOULDBLOCK is returned only because of + * the SO_SNDTIMEO socket option, so we translate the error + * into one more recognizable by the user. */ + if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT; + return 0; /* error. */ + } + nwritten += retval; + } + + r->io.fd.pos += len; + sdsclear(r->io.fd.buf); + return 1; +} + +/* Returns 1 or 0 for success/failure. */ +static size_t rioFdRead(rio *r, void *buf, size_t len) { + UNUSED(r); + UNUSED(buf); + UNUSED(len); + return 0; /* Error, this target does not support reading. */ +} + +/* Returns read/write position in file. */ +static off_t rioFdTell(rio *r) { + return r->io.fd.pos; +} + +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioFdFlush(rio *r) { + /* Our flush is implemented by the write method, that recognizes a + * buffer set to NULL with a count of zero as a flush request. */ + return rioFdWrite(r,NULL,0); +} + +static const rio rioFdIO = { + rioFdRead, + rioFdWrite, + rioFdTell, + rioFdFlush, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* flags */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + +void rioInitWithFd(rio *r, int fd) { + *r = rioFdIO; + r->io.fd.fd = fd; + r->io.fd.pos = 0; + r->io.fd.buf = sdsempty(); +} + +/* release the rio stream. */ +void rioFreeFd(rio *r) { + sdsfree(r->io.fd.buf); +} + +/* ---------------------------- Generic functions ---------------------------- */ + +/* This function can be installed both in memory and file streams when checksum + * computation is needed. */ +void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { + r->cksum = crc64(r->cksum,buf,len); +} + +/* Set the file-based rio object to auto-fsync every 'bytes' file written. + * By default this is set to zero that means no automatic file sync is + * performed. + * + * This feature is useful in a few contexts since when we rely on OS write + * buffers sometimes the OS buffers way too much, resulting in too many + * disk I/O concentrated in very little time. When we fsync in an explicit + * way instead the I/O pressure is more distributed across time. */ +void rioSetAutoSync(rio *r, off_t bytes) { + if(r->write != rioFileIO.write) return; + r->io.file.autosync = bytes; +} + +/* Check the type of rio. */ +uint8_t rioCheckType(rio *r) { + if (r->read == rioFileRead) { + return RIO_TYPE_FILE; + } else if (r->read == rioBufferRead) { + return RIO_TYPE_BUFFER; + } else if (r->read == rioConnRead) { + return RIO_TYPE_CONN; + } else { + /* r->read == rioFdRead */ + return RIO_TYPE_FD; + } +} + +/* --------------------------- Higher level interface -------------------------- + * + * The following higher level functions use lower level rio.c functions to help + * generating the Redis protocol for the Append Only File. */ + +/* Write multi bulk count in the format: "*<count>\r\n". */ +size_t rioWriteBulkCount(rio *r, char prefix, long count) { + char cbuf[128]; + int clen; + + cbuf[0] = prefix; + clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count); + cbuf[clen++] = '\r'; + cbuf[clen++] = '\n'; + if (rioWrite(r,cbuf,clen) == 0) return 0; + return clen; +} + +/* Write binary-safe string in the format: "$<count>\r\n<payload>\r\n". */ +size_t rioWriteBulkString(rio *r, const char *buf, size_t len) { + size_t nwritten; + + if ((nwritten = rioWriteBulkCount(r,'$',len)) == 0) return 0; + if (len > 0 && rioWrite(r,buf,len) == 0) return 0; + if (rioWrite(r,"\r\n",2) == 0) return 0; + return nwritten+len+2; +} + +/* Write a long long value in format: "$<count>\r\n<payload>\r\n". */ +size_t rioWriteBulkLongLong(rio *r, long long l) { + char lbuf[32]; + unsigned int llen; + + llen = ll2string(lbuf,sizeof(lbuf),l); + return rioWriteBulkString(r,lbuf,llen); +} + +/* Write a double value in the format: "$<count>\r\n<payload>\r\n" */ +size_t rioWriteBulkDouble(rio *r, double d) { + char dbuf[128]; + unsigned int dlen; + + dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d); + return rioWriteBulkString(r,dbuf,dlen); +} |