diff options
Diffstat (limited to 'src/bio.c')
-rw-r--r-- | src/bio.c | 318 |
1 files changed, 318 insertions, 0 deletions
diff --git a/src/bio.c b/src/bio.c new file mode 100644 index 0000000..9242e51 --- /dev/null +++ b/src/bio.c @@ -0,0 +1,318 @@ +/* Background I/O service for Redis. + * + * This file implements operations that we need to perform in the background. + * Currently there is only a single operation, that is a background close(2) + * system call. This is needed as when the process is the last owner of a + * reference to a file closing it means unlinking it, and the deletion of the + * file is slow, blocking the server. + * + * In the future we'll either continue implementing new things we need or + * we'll switch to libeio. However there are probably long term uses for this + * file as we may want to put here Redis specific background tasks (for instance + * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL + * implementation). + * + * DESIGN + * ------ + * + * The design is trivial, we have a structure representing a job to perform + * and a different thread and job queue for every job type. + * Every thread waits for new jobs in its queue, and process every job + * sequentially. + * + * Jobs of the same type are guaranteed to be processed from the least + * recently inserted to the most recently inserted (older jobs processed + * first). + * + * Currently there is no way for the creator of the job to be notified about + * the completion of the operation, this will only be added when/if needed. + * + * ---------------------------------------------------------------------------- + * + * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + + +#include "server.h" +#include "bio.h" + +static pthread_t bio_threads[BIO_NUM_OPS]; +static pthread_mutex_t bio_mutex[BIO_NUM_OPS]; +static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS]; +static pthread_cond_t bio_step_cond[BIO_NUM_OPS]; +static list *bio_jobs[BIO_NUM_OPS]; +/* The following array is used to hold the number of pending jobs for every + * OP type. This allows us to export the bioPendingJobsOfType() API that is + * useful when the main thread wants to perform some operation that may involve + * objects shared with the background thread. The main thread will just wait + * that there are no longer jobs of this type to be executed before performing + * the sensible operation. This data is also useful for reporting. */ +static unsigned long long bio_pending[BIO_NUM_OPS]; + +/* This structure represents a background Job. It is only used locally to this + * file as the API does not expose the internals at all. */ +typedef union bio_job { + /* Job specific arguments.*/ + struct { + int fd; /* Fd for file based background jobs */ + unsigned need_fsync:1; /* A flag to indicate that a fsync is required before + * the file is closed. */ + } fd_args; + + struct { + lazy_free_fn *free_fn; /* Function that will free the provided arguments */ + void *free_args[]; /* List of arguments to be passed to the free function */ + } free_args; +} bio_job; + +void *bioProcessBackgroundJobs(void *arg); + +/* Make sure we have enough stack to perform all the things we do in the + * main thread. */ +#define REDIS_THREAD_STACK_SIZE (1024*1024*4) + +/* Initialize the background system, spawning the thread. */ +void bioInit(void) { + pthread_attr_t attr; + pthread_t thread; + size_t stacksize; + int j; + + /* Initialization of state vars and objects */ + for (j = 0; j < BIO_NUM_OPS; j++) { + pthread_mutex_init(&bio_mutex[j],NULL); + pthread_cond_init(&bio_newjob_cond[j],NULL); + pthread_cond_init(&bio_step_cond[j],NULL); + bio_jobs[j] = listCreate(); + bio_pending[j] = 0; + } + + /* Set the stack size as by default it may be small in some system */ + pthread_attr_init(&attr); + pthread_attr_getstacksize(&attr,&stacksize); + if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ + while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; + pthread_attr_setstacksize(&attr, stacksize); + + /* Ready to spawn our threads. We use the single argument the thread + * function accepts in order to pass the job ID the thread is + * responsible of. */ + for (j = 0; j < BIO_NUM_OPS; j++) { + void *arg = (void*)(unsigned long) j; + if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { + serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs."); + exit(1); + } + bio_threads[j] = thread; + } +} + +void bioSubmitJob(int type, bio_job *job) { + pthread_mutex_lock(&bio_mutex[type]); + listAddNodeTail(bio_jobs[type],job); + bio_pending[type]++; + pthread_cond_signal(&bio_newjob_cond[type]); + pthread_mutex_unlock(&bio_mutex[type]); +} + +void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) { + va_list valist; + /* Allocate memory for the job structure and all required + * arguments */ + bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count)); + job->free_args.free_fn = free_fn; + + va_start(valist, arg_count); + for (int i = 0; i < arg_count; i++) { + job->free_args.free_args[i] = va_arg(valist, void *); + } + va_end(valist); + bioSubmitJob(BIO_LAZY_FREE, job); +} + +void bioCreateCloseJob(int fd, int need_fsync) { + bio_job *job = zmalloc(sizeof(*job)); + job->fd_args.fd = fd; + job->fd_args.need_fsync = need_fsync; + + bioSubmitJob(BIO_CLOSE_FILE, job); +} + +void bioCreateFsyncJob(int fd) { + bio_job *job = zmalloc(sizeof(*job)); + job->fd_args.fd = fd; + + bioSubmitJob(BIO_AOF_FSYNC, job); +} + +void *bioProcessBackgroundJobs(void *arg) { + bio_job *job; + unsigned long type = (unsigned long) arg; + sigset_t sigset; + + /* Check that the type is within the right interval. */ + if (type >= BIO_NUM_OPS) { + serverLog(LL_WARNING, + "Warning: bio thread started with wrong type %lu",type); + return NULL; + } + + switch (type) { + case BIO_CLOSE_FILE: + redis_set_thread_title("bio_close_file"); + break; + case BIO_AOF_FSYNC: + redis_set_thread_title("bio_aof_fsync"); + break; + case BIO_LAZY_FREE: + redis_set_thread_title("bio_lazy_free"); + break; + } + + redisSetCpuAffinity(server.bio_cpulist); + + makeThreadKillable(); + + pthread_mutex_lock(&bio_mutex[type]); + /* Block SIGALRM so we are sure that only the main thread will + * receive the watchdog signal. */ + sigemptyset(&sigset); + sigaddset(&sigset, SIGALRM); + if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) + serverLog(LL_WARNING, + "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno)); + + while(1) { + listNode *ln; + + /* The loop always starts with the lock hold. */ + if (listLength(bio_jobs[type]) == 0) { + pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]); + continue; + } + /* Pop the job from the queue. */ + ln = listFirst(bio_jobs[type]); + job = ln->value; + /* It is now possible to unlock the background system as we know have + * a stand alone job structure to process.*/ + pthread_mutex_unlock(&bio_mutex[type]); + + /* Process the job accordingly to its type. */ + if (type == BIO_CLOSE_FILE) { + if (job->fd_args.need_fsync) { + redis_fsync(job->fd_args.fd); + } + close(job->fd_args.fd); + } else if (type == BIO_AOF_FSYNC) { + /* The fd may be closed by main thread and reused for another + * socket, pipe, or file. We just ignore these errno because + * aof fsync did not really fail. */ + if (redis_fsync(job->fd_args.fd) == -1 && + errno != EBADF && errno != EINVAL) + { + int last_status; + atomicGet(server.aof_bio_fsync_status,last_status); + atomicSet(server.aof_bio_fsync_status,C_ERR); + atomicSet(server.aof_bio_fsync_errno,errno); + if (last_status == C_OK) { + serverLog(LL_WARNING, + "Fail to fsync the AOF file: %s",strerror(errno)); + } + } else { + atomicSet(server.aof_bio_fsync_status,C_OK); + } + } else if (type == BIO_LAZY_FREE) { + job->free_args.free_fn(job->free_args.free_args); + } else { + serverPanic("Wrong job type in bioProcessBackgroundJobs()."); + } + zfree(job); + + /* Lock again before reiterating the loop, if there are no longer + * jobs to process we'll block again in pthread_cond_wait(). */ + pthread_mutex_lock(&bio_mutex[type]); + listDelNode(bio_jobs[type],ln); + bio_pending[type]--; + + /* Unblock threads blocked on bioWaitStepOfType() if any. */ + pthread_cond_broadcast(&bio_step_cond[type]); + } +} + +/* Return the number of pending jobs of the specified type. */ +unsigned long long bioPendingJobsOfType(int type) { + unsigned long long val; + pthread_mutex_lock(&bio_mutex[type]); + val = bio_pending[type]; + pthread_mutex_unlock(&bio_mutex[type]); + return val; +} + +/* If there are pending jobs for the specified type, the function blocks + * and waits that the next job was processed. Otherwise the function + * does not block and returns ASAP. + * + * The function returns the number of jobs still to process of the + * requested type. + * + * This function is useful when from another thread, we want to wait + * a bio.c thread to do more work in a blocking way. + */ +unsigned long long bioWaitStepOfType(int type) { + unsigned long long val; + pthread_mutex_lock(&bio_mutex[type]); + val = bio_pending[type]; + if (val != 0) { + pthread_cond_wait(&bio_step_cond[type],&bio_mutex[type]); + val = bio_pending[type]; + } + pthread_mutex_unlock(&bio_mutex[type]); + return val; +} + +/* Kill the running bio threads in an unclean way. This function should be + * used only when it's critical to stop the threads for some reason. + * Currently Redis does this only on crash (for instance on SIGSEGV) in order + * to perform a fast memory check without other threads messing with memory. */ +void bioKillThreads(void) { + int err, j; + + for (j = 0; j < BIO_NUM_OPS; j++) { + if (bio_threads[j] == pthread_self()) continue; + if (bio_threads[j] && pthread_cancel(bio_threads[j]) == 0) { + if ((err = pthread_join(bio_threads[j],NULL)) != 0) { + serverLog(LL_WARNING, + "Bio thread for job type #%d can not be joined: %s", + j, strerror(err)); + } else { + serverLog(LL_WARNING, + "Bio thread for job type #%d terminated",j); + } + } + } +} |