diff options
Diffstat (limited to '')
-rw-r--r-- | src/ae.c | 512 |
1 files changed, 512 insertions, 0 deletions
diff --git a/src/ae.c b/src/ae.c new file mode 100644 index 0000000..b05cf56 --- /dev/null +++ b/src/ae.c @@ -0,0 +1,512 @@ +/* A simple event-driven programming library. Originally I wrote this code + * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated + * it in form of a library for easy reuse. + * + * Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "ae.h" +#include "anet.h" +#include "redisassert.h" + +#include <stdio.h> +#include <sys/time.h> +#include <sys/types.h> +#include <unistd.h> +#include <stdlib.h> +#include <poll.h> +#include <string.h> +#include <time.h> +#include <errno.h> + +#include "zmalloc.h" +#include "config.h" + +/* Include the best multiplexing layer supported by this system. + * The following should be ordered by performances, descending. */ +#ifdef HAVE_EVPORT +#include "ae_evport.c" +#else + #ifdef HAVE_EPOLL + #include "ae_epoll.c" + #else + #ifdef HAVE_KQUEUE + #include "ae_kqueue.c" + #else + #include "ae_select.c" + #endif + #endif +#endif + + +aeEventLoop *aeCreateEventLoop(int setsize) { + aeEventLoop *eventLoop; + int i; + + monotonicInit(); /* just in case the calling app didn't initialize */ + + if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; + eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); + eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); + if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; + eventLoop->setsize = setsize; + eventLoop->timeEventHead = NULL; + eventLoop->timeEventNextId = 0; + eventLoop->stop = 0; + eventLoop->maxfd = -1; + eventLoop->beforesleep = NULL; + eventLoop->aftersleep = NULL; + eventLoop->flags = 0; + if (aeApiCreate(eventLoop) == -1) goto err; + /* Events with mask == AE_NONE are not set. So let's initialize the + * vector with it. */ + for (i = 0; i < setsize; i++) + eventLoop->events[i].mask = AE_NONE; + return eventLoop; + +err: + if (eventLoop) { + zfree(eventLoop->events); + zfree(eventLoop->fired); + zfree(eventLoop); + } + return NULL; +} + +/* Return the current set size. */ +int aeGetSetSize(aeEventLoop *eventLoop) { + return eventLoop->setsize; +} + +/* Tells the next iteration/s of the event processing to set timeout of 0. */ +void aeSetDontWait(aeEventLoop *eventLoop, int noWait) { + if (noWait) + eventLoop->flags |= AE_DONT_WAIT; + else + eventLoop->flags &= ~AE_DONT_WAIT; +} + +/* Resize the maximum set size of the event loop. + * If the requested set size is smaller than the current set size, but + * there is already a file descriptor in use that is >= the requested + * set size minus one, AE_ERR is returned and the operation is not + * performed at all. + * + * Otherwise AE_OK is returned and the operation is successful. */ +int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) { + int i; + + if (setsize == eventLoop->setsize) return AE_OK; + if (eventLoop->maxfd >= setsize) return AE_ERR; + if (aeApiResize(eventLoop,setsize) == -1) return AE_ERR; + + eventLoop->events = zrealloc(eventLoop->events,sizeof(aeFileEvent)*setsize); + eventLoop->fired = zrealloc(eventLoop->fired,sizeof(aeFiredEvent)*setsize); + eventLoop->setsize = setsize; + + /* Make sure that if we created new slots, they are initialized with + * an AE_NONE mask. */ + for (i = eventLoop->maxfd+1; i < setsize; i++) + eventLoop->events[i].mask = AE_NONE; + return AE_OK; +} + +void aeDeleteEventLoop(aeEventLoop *eventLoop) { + aeApiFree(eventLoop); + zfree(eventLoop->events); + zfree(eventLoop->fired); + + /* Free the time events list. */ + aeTimeEvent *next_te, *te = eventLoop->timeEventHead; + while (te) { + next_te = te->next; + zfree(te); + te = next_te; + } + zfree(eventLoop); +} + +void aeStop(aeEventLoop *eventLoop) { + eventLoop->stop = 1; +} + +int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, + aeFileProc *proc, void *clientData) +{ + if (fd >= eventLoop->setsize) { + errno = ERANGE; + return AE_ERR; + } + aeFileEvent *fe = &eventLoop->events[fd]; + + if (aeApiAddEvent(eventLoop, fd, mask) == -1) + return AE_ERR; + fe->mask |= mask; + if (mask & AE_READABLE) fe->rfileProc = proc; + if (mask & AE_WRITABLE) fe->wfileProc = proc; + fe->clientData = clientData; + if (fd > eventLoop->maxfd) + eventLoop->maxfd = fd; + return AE_OK; +} + +void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) +{ + if (fd >= eventLoop->setsize) return; + aeFileEvent *fe = &eventLoop->events[fd]; + if (fe->mask == AE_NONE) return; + + /* We want to always remove AE_BARRIER if set when AE_WRITABLE + * is removed. */ + if (mask & AE_WRITABLE) mask |= AE_BARRIER; + + aeApiDelEvent(eventLoop, fd, mask); + fe->mask = fe->mask & (~mask); + if (fd == eventLoop->maxfd && fe->mask == AE_NONE) { + /* Update the max fd */ + int j; + + for (j = eventLoop->maxfd-1; j >= 0; j--) + if (eventLoop->events[j].mask != AE_NONE) break; + eventLoop->maxfd = j; + } +} + +void *aeGetFileClientData(aeEventLoop *eventLoop, int fd) { + if (fd >= eventLoop->setsize) return NULL; + aeFileEvent *fe = &eventLoop->events[fd]; + if (fe->mask == AE_NONE) return NULL; + + return fe->clientData; +} + +int aeGetFileEvents(aeEventLoop *eventLoop, int fd) { + if (fd >= eventLoop->setsize) return 0; + aeFileEvent *fe = &eventLoop->events[fd]; + + return fe->mask; +} + +long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, + aeTimeProc *proc, void *clientData, + aeEventFinalizerProc *finalizerProc) +{ + long long id = eventLoop->timeEventNextId++; + aeTimeEvent *te; + + te = zmalloc(sizeof(*te)); + if (te == NULL) return AE_ERR; + te->id = id; + te->when = getMonotonicUs() + milliseconds * 1000; + te->timeProc = proc; + te->finalizerProc = finalizerProc; + te->clientData = clientData; + te->prev = NULL; + te->next = eventLoop->timeEventHead; + te->refcount = 0; + if (te->next) + te->next->prev = te; + eventLoop->timeEventHead = te; + return id; +} + +int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id) +{ + aeTimeEvent *te = eventLoop->timeEventHead; + while(te) { + if (te->id == id) { + te->id = AE_DELETED_EVENT_ID; + return AE_OK; + } + te = te->next; + } + return AE_ERR; /* NO event with the specified ID found */ +} + +/* How many microseconds until the first timer should fire. + * If there are no timers, -1 is returned. + * + * Note that's O(N) since time events are unsorted. + * Possible optimizations (not needed by Redis so far, but...): + * 1) Insert the event in order, so that the nearest is just the head. + * Much better but still insertion or deletion of timers is O(N). + * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)). + */ +static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) { + aeTimeEvent *te = eventLoop->timeEventHead; + if (te == NULL) return -1; + + aeTimeEvent *earliest = NULL; + while (te) { + if (!earliest || te->when < earliest->when) + earliest = te; + te = te->next; + } + + monotime now = getMonotonicUs(); + return (now >= earliest->when) ? 0 : earliest->when - now; +} + +/* Process time events */ +static int processTimeEvents(aeEventLoop *eventLoop) { + int processed = 0; + aeTimeEvent *te; + long long maxId; + + te = eventLoop->timeEventHead; + maxId = eventLoop->timeEventNextId-1; + monotime now = getMonotonicUs(); + while(te) { + long long id; + + /* Remove events scheduled for deletion. */ + if (te->id == AE_DELETED_EVENT_ID) { + aeTimeEvent *next = te->next; + /* If a reference exists for this timer event, + * don't free it. This is currently incremented + * for recursive timerProc calls */ + if (te->refcount) { + te = next; + continue; + } + if (te->prev) + te->prev->next = te->next; + else + eventLoop->timeEventHead = te->next; + if (te->next) + te->next->prev = te->prev; + if (te->finalizerProc) { + te->finalizerProc(eventLoop, te->clientData); + now = getMonotonicUs(); + } + zfree(te); + te = next; + continue; + } + + /* Make sure we don't process time events created by time events in + * this iteration. Note that this check is currently useless: we always + * add new timers on the head, however if we change the implementation + * detail, this check may be useful again: we keep it here for future + * defense. */ + if (te->id > maxId) { + te = te->next; + continue; + } + + if (te->when <= now) { + int retval; + + id = te->id; + te->refcount++; + retval = te->timeProc(eventLoop, id, te->clientData); + te->refcount--; + processed++; + now = getMonotonicUs(); + if (retval != AE_NOMORE) { + te->when = now + retval * 1000; + } else { + te->id = AE_DELETED_EVENT_ID; + } + } + te = te->next; + } + return processed; +} + +/* Process every pending time event, then every pending file event + * (that may be registered by time event callbacks just processed). + * Without special flags the function sleeps until some file event + * fires, or when the next time event occurs (if any). + * + * If flags is 0, the function does nothing and returns. + * if flags has AE_ALL_EVENTS set, all the kind of events are processed. + * if flags has AE_FILE_EVENTS set, file events are processed. + * if flags has AE_TIME_EVENTS set, time events are processed. + * if flags has AE_DONT_WAIT set, the function returns ASAP once all + * the events that can be handled without a wait are processed. + * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called. + * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called. + * + * The function returns the number of events processed. */ +int aeProcessEvents(aeEventLoop *eventLoop, int flags) +{ + int processed = 0, numevents; + + /* Nothing to do? return ASAP */ + if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; + + /* Note that we want to call select() even if there are no + * file events to process as long as we want to process time + * events, in order to sleep until the next time event is ready + * to fire. */ + if (eventLoop->maxfd != -1 || + ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { + int j; + struct timeval tv, *tvp; + int64_t usUntilTimer = -1; + + if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) + usUntilTimer = usUntilEarliestTimer(eventLoop); + + if (usUntilTimer >= 0) { + tv.tv_sec = usUntilTimer / 1000000; + tv.tv_usec = usUntilTimer % 1000000; + tvp = &tv; + } else { + /* If we have to check for events but need to return + * ASAP because of AE_DONT_WAIT we need to set the timeout + * to zero */ + if (flags & AE_DONT_WAIT) { + tv.tv_sec = tv.tv_usec = 0; + tvp = &tv; + } else { + /* Otherwise we can block */ + tvp = NULL; /* wait forever */ + } + } + + if (eventLoop->flags & AE_DONT_WAIT) { + tv.tv_sec = tv.tv_usec = 0; + tvp = &tv; + } + + if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) + eventLoop->beforesleep(eventLoop); + + /* Call the multiplexing API, will return only on timeout or when + * some event fires. */ + numevents = aeApiPoll(eventLoop, tvp); + + /* After sleep callback. */ + if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) + eventLoop->aftersleep(eventLoop); + + for (j = 0; j < numevents; j++) { + int fd = eventLoop->fired[j].fd; + aeFileEvent *fe = &eventLoop->events[fd]; + int mask = eventLoop->fired[j].mask; + int fired = 0; /* Number of events fired for current fd. */ + + /* Normally we execute the readable event first, and the writable + * event later. This is useful as sometimes we may be able + * to serve the reply of a query immediately after processing the + * query. + * + * However if AE_BARRIER is set in the mask, our application is + * asking us to do the reverse: never fire the writable event + * after the readable. In such a case, we invert the calls. + * This is useful when, for instance, we want to do things + * in the beforeSleep() hook, like fsyncing a file to disk, + * before replying to a client. */ + int invert = fe->mask & AE_BARRIER; + + /* Note the "fe->mask & mask & ..." code: maybe an already + * processed event removed an element that fired and we still + * didn't processed, so we check if the event is still valid. + * + * Fire the readable event if the call sequence is not + * inverted. */ + if (!invert && fe->mask & mask & AE_READABLE) { + fe->rfileProc(eventLoop,fd,fe->clientData,mask); + fired++; + fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ + } + + /* Fire the writable event. */ + if (fe->mask & mask & AE_WRITABLE) { + if (!fired || fe->wfileProc != fe->rfileProc) { + fe->wfileProc(eventLoop,fd,fe->clientData,mask); + fired++; + } + } + + /* If we have to invert the call, fire the readable event now + * after the writable one. */ + if (invert) { + fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ + if ((fe->mask & mask & AE_READABLE) && + (!fired || fe->wfileProc != fe->rfileProc)) + { + fe->rfileProc(eventLoop,fd,fe->clientData,mask); + fired++; + } + } + + processed++; + } + } + /* Check time events */ + if (flags & AE_TIME_EVENTS) + processed += processTimeEvents(eventLoop); + + return processed; /* return the number of processed file/time events */ +} + +/* Wait for milliseconds until the given file descriptor becomes + * writable/readable/exception */ +int aeWait(int fd, int mask, long long milliseconds) { + struct pollfd pfd; + int retmask = 0, retval; + + memset(&pfd, 0, sizeof(pfd)); + pfd.fd = fd; + if (mask & AE_READABLE) pfd.events |= POLLIN; + if (mask & AE_WRITABLE) pfd.events |= POLLOUT; + + if ((retval = poll(&pfd, 1, milliseconds))== 1) { + if (pfd.revents & POLLIN) retmask |= AE_READABLE; + if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE; + if (pfd.revents & POLLERR) retmask |= AE_WRITABLE; + if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE; + return retmask; + } else { + return retval; + } +} + +void aeMain(aeEventLoop *eventLoop) { + eventLoop->stop = 0; + while (!eventLoop->stop) { + aeProcessEvents(eventLoop, AE_ALL_EVENTS| + AE_CALL_BEFORE_SLEEP| + AE_CALL_AFTER_SLEEP); + } +} + +char *aeGetApiName(void) { + return aeApiName(); +} + +void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) { + eventLoop->beforesleep = beforesleep; +} + +void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) { + eventLoop->aftersleep = aftersleep; +} |