summary | refs | log | tree | commit | diff | stats
path: root/fluent-bit/lib/monkey/mk_core/mk_event_select.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/monkey/mk_core/mk_event_select.c')
-rw-r--r-- fluent-bit/lib/monkey/mk_core/mk_event_select.c 439
1 files changed, 439 insertions, 0 deletions
diff --git a/fluent-bit/lib/monkey/mk_core/mk_event_select.c b/fluent-bit/lib/monkey/mk_core/mk_event_select.c
new file mode 100644
index 000000000..e326ba85b
--- /dev/null
+++ b/fluent-bit/lib/monkey/mk_core/mk_event_select.c
@@ -0,0 +1,439 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Monkey HTTP Server
+ * ==================
+ * Copyright 2001-2017 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef _WIN32
+#include <Winsock2.h>
+#else
+#include <sys/select.h>
+#endif
+
+#include <mk_core/mk_event.h>
+#include <time.h>
+
+/* I could be wrong, but I think the way in which this module
+ * handles the event array is flawed: the array is addressed
+ * by the file descriptor, which in some cases could cause an
+ * out-of-bounds write of a non-controllable QWORD
+ * (i.e. line 117).
+ *
+ * I'll leave it as is for the moment, but I think we might want
+ * to come back to it and refactor it to just use the event
+ * array as a list (at most implementing a hash-table-like
+ * addressing mechanism to make it faster).
+ */
+
+/*
+ * State shared between a timeout event and the worker thread that emulates
+ * the timer on top of a pipe (see _mk_event_timeout_create()).
+ */
+struct fd_timer {
+    int fd;          /* write end of the pipe, used by the worker to notify */
+    int run;         /* the worker keeps looping while this is MK_TRUE */
+    time_t sec;      /* seconds part of the interval between notifications */
+    long nsec;       /* nanoseconds part of the interval */
+    pthread_t tid;   /* worker thread id, joined when the timer is destroyed */
+};
+
+/* Backend initialization hook: this backend needs no setup, always returns 0 */
+static inline int _mk_event_init()
+{
+    const int status = 0;
+
+    return status;
+}
+
+/*
+ * Allocate a zeroed select(2) backend context together with its two arrays:
+ * 'events' (one struct mk_event pointer per slot) and 'fired' (one struct
+ * mk_event per slot).
+ *
+ * The caller-provided 'size' is ignored: every array holds FD_SETSIZE slots.
+ * Returns the new context, or NULL if any allocation fails (nothing is
+ * leaked in that case).
+ */
+static inline void *_mk_event_loop_create(int size)
+{
+    struct mk_event_ctx *ctx;
+
+    /* The capacity is fixed by select(2), not by the caller */
+    size = FD_SETSIZE;
+
+    ctx = mk_mem_alloc_z(sizeof(struct mk_event_ctx));
+    if (ctx == NULL) {
+        return NULL;
+    }
+
+    FD_ZERO(&ctx->rfds);
+    FD_ZERO(&ctx->wfds);
+
+    /* Table of registered events */
+    ctx->events = mk_mem_alloc_z(sizeof(struct mk_event *) * size);
+    if (ctx->events == NULL) {
+        goto error_ctx;
+    }
+
+    /* Scratch array filled with the ready events after select(2) returns */
+    ctx->fired = mk_mem_alloc_z(sizeof(struct mk_event) * size);
+    if (ctx->fired == NULL) {
+        goto error_events;
+    }
+
+    ctx->queue_size = size;
+    return ctx;
+
+error_events:
+    mk_mem_free(ctx->events);
+error_ctx:
+    mk_mem_free(ctx);
+    return NULL;
+}
+
+/*
+ * Release a context created by _mk_event_loop_create(): both arrays first,
+ * then the context itself. Registered events are not touched.
+ */
+static inline void _mk_event_loop_destroy(struct mk_event_ctx *ctx)
+{
+    mk_mem_free(ctx->fired);
+    mk_mem_free(ctx->events);
+    mk_mem_free(ctx);
+}
+
+/*
+ * Register (or re-register) a file descriptor in the select(2) sets.
+ *
+ * ctx    : backend context owning the fd sets and the lookup array.
+ * fd     : descriptor to monitor; it is also the index into ctx->events.
+ * type   : event type stored in the event, unless it is MK_EVENT_UNMODIFIED.
+ * events : bitmask of MK_EVENT_READ and/or MK_EVENT_WRITE.
+ * data   : the caller's struct mk_event; it is filled in and referenced from
+ *          ctx->events[fd], so it must outlive the registration.
+ *
+ * Returns 0 on success, or -1 when 'fd' is outside the range this backend
+ * can address.
+ */
+static inline int _mk_event_add(struct mk_event_ctx *ctx, int fd,
+                                int type, uint32_t events, void *data)
+{
+    struct mk_event *event;
+
+    mk_bug(ctx == NULL);
+    mk_bug(data == NULL);
+
+    /*
+     * Valid indexes are 0 .. FD_SETSIZE - 1: the fd sets and the ctx->events
+     * array (allocated with FD_SETSIZE slots) are both addressed by 'fd', so
+     * FD_SETSIZE itself and negative values would write out of bounds.
+     */
+    if (fd < 0 || fd >= FD_SETSIZE) {
+        return -1;
+    }
+
+    /*
+     * Make the sets mirror 'events' exactly. event->mask is overwritten
+     * below and _mk_event_del() only clears the bits found in that mask, so
+     * a bit left over from a previous, wider registration would never be
+     * removed from the set.
+     */
+    if (events & MK_EVENT_READ) {
+        FD_SET(fd, &ctx->rfds);
+    }
+    else {
+        FD_CLR(fd, &ctx->rfds);
+    }
+    if (events & MK_EVENT_WRITE) {
+        FD_SET(fd, &ctx->wfds);
+    }
+    else {
+        FD_CLR(fd, &ctx->wfds);
+    }
+
+    event = (struct mk_event *) data;
+    event->fd = fd;
+    event->mask = events;
+    event->status = MK_EVENT_REGISTERED;
+
+    event->priority = MK_EVENT_PRIORITY_DEFAULT;
+
+    /* Remove from priority queue */
+    if (!mk_list_entry_is_orphan(&event->_priority_head)) {
+        mk_list_del(&event->_priority_head);
+    }
+
+    if (type != MK_EVENT_UNMODIFIED) {
+        event->type = type;
+    }
+
+    ctx->events[fd] = event;
+
+    /* select(2) needs the highest registered descriptor (+ 1) as 'nfds' */
+    if (fd > ctx->max_fd) {
+        ctx->max_fd = fd;
+    }
+
+    return 0;
+}
+
+/*
+ * Unregister an event: clear its descriptor from the fd sets, recompute
+ * ctx->max_fd when the removed descriptor was the highest one, drop it from
+ * the lookup array and the priority queue, and reset it with MK_EVENT_NEW().
+ *
+ * Returns 0 always; an event that is not registered is left untouched.
+ */
+static inline int _mk_event_del(struct mk_event_ctx *ctx, struct mk_event *event)
+{
+    int fd;
+    int candidate;
+    struct mk_event *other;
+
+    mk_bug(ctx == NULL);
+    mk_bug(event == NULL);
+
+    if (!MK_EVENT_IS_REGISTERED(event)) {
+        return 0;
+    }
+
+    fd = event->fd;
+
+    if (event->mask & MK_EVENT_READ) {
+        FD_CLR(fd, &ctx->rfds);
+    }
+    if (event->mask & MK_EVENT_WRITE) {
+        FD_CLR(fd, &ctx->wfds);
+    }
+
+    if (fd == ctx->max_fd) {
+        /*
+         * Walk down from the old maximum looking for the closest slot that
+         * still holds an event with a non-empty mask. Slot 0 is not
+         * inspected: the walk stops there and its index becomes the result.
+         */
+        candidate = ctx->max_fd - 1;
+        while (candidate > 0) {
+            other = ctx->events[candidate];
+            if (other != NULL && other->mask != MK_EVENT_EMPTY) {
+                break;
+            }
+            candidate--;
+        }
+        ctx->max_fd = candidate;
+    }
+
+    ctx->events[fd] = NULL;
+
+    /* Detach from the priority queue if it is linked there */
+    if (!mk_list_entry_is_orphan(&event->_priority_head)) {
+        mk_list_del(&event->_priority_head);
+    }
+
+    MK_EVENT_NEW(event);
+
+    return 0;
+}
+
+/*
+ * Timeout worker: while timer->run is MK_TRUE it sleeps for the configured
+ * interval and then writes a 64-bit value (1) to timer->fd. It stops when
+ * the flag is cleared or when write(2) fails, then terminates its thread.
+ *
+ * 'arg' is the struct fd_timer describing the timer.
+ */
+void _timeout_worker(void *arg)
+{
+    const uint64_t tick = 1;
+    struct fd_timer *timer = (struct fd_timer *) arg;
+    struct timespec interval;
+
+    interval.tv_sec = timer->sec;
+    interval.tv_nsec = timer->nsec;
+
+    for (;;) {
+        if (timer->run != MK_TRUE) {
+            break;
+        }
+
+        /* wait for the next expiration */
+        nanosleep(&interval, NULL);
+
+        /* notify the reader side */
+        if (write(timer->fd, &tick, sizeof(uint64_t)) == -1) {
+            perror("write");
+            break;
+        }
+    }
+
+    pthread_exit(NULL);
+}
+
+/*
+ * This routine creates a timer. Since this select(2) backend aims to be
+ * usable on very old systems, we cannot trust that timerfd_create(2) will be
+ * available (e.g: Cygwin), so the workaround is a pipe(2) plus a thread: the
+ * thread writes to the pipe every time the expiration time is reached.
+ *
+ * ctx  : backend context where the read end of the pipe gets registered.
+ * sec  : seconds part of the interval.
+ * nsec : nanoseconds part of the interval.
+ * data : the caller's struct mk_event; on success event->data points to the
+ *        timer context.
+ *
+ * Returns the read end of the pipe on success, or a negative value on
+ * error. On error no descriptor, memory or registration is left behind.
+ */
+static inline int _mk_event_timeout_create(struct mk_event_ctx *ctx,
+                                           time_t sec, long nsec, void *data)
+{
+    int ret;
+    int fd[2];
+    struct mk_event *event;
+    struct fd_timer *timer;
+
+    timer = mk_mem_alloc(sizeof(struct fd_timer));
+    if (!timer) {
+        return -1;
+    }
+
+    ret = pipe(fd);
+    if (ret < 0) {
+        /* report first: releasing memory could overwrite errno */
+        mk_libc_error("pipe");
+        mk_mem_free(timer);
+        return ret;
+    }
+
+    event = (struct mk_event *) data;
+    event->fd = fd[0];
+    event->type = MK_EVENT_NOTIFICATION;
+    event->mask = MK_EVENT_EMPTY;
+
+    /* The registration fails if the descriptor cannot be used with select */
+    ret = _mk_event_add(ctx, fd[0], MK_EVENT_NOTIFICATION, MK_EVENT_READ, data);
+    if (ret != 0) {
+        goto error;
+    }
+    event->mask = MK_EVENT_READ;
+
+    /*
+     * Compose the timer context. It is shared with the worker thread and
+     * released by _mk_event_timeout_destroy() once the worker was joined.
+     */
+    timer->fd = fd[1];
+    timer->sec = sec;
+    timer->nsec = nsec;
+    timer->run = MK_TRUE;
+
+    event->data = timer;
+
+    /* Now the dirty workaround, create a thread */
+    ret = mk_utils_worker_spawn(_timeout_worker, timer, &timer->tid);
+    if (ret < 0) {
+        /*
+         * Undo the registration: otherwise the loop would keep selecting on
+         * a closed descriptor and the event would reference freed memory.
+         */
+        _mk_event_del(ctx, event);
+        event->data = NULL;
+        goto error;
+    }
+
+    return fd[0];
+
+error:
+    close(fd[0]);
+    close(fd[1]);
+    mk_mem_free(timer);
+    return -1;
+}
+
+/*
+ * Tear down a timer created by _mk_event_timeout_create(): unregister the
+ * event, ask the worker thread to stop and wait for it, then close both
+ * ends of the pipe and free the timer context.
+ *
+ * 'data' is the struct mk_event used when the timer was created.
+ * Returns 0 always.
+ */
+static inline int _mk_event_timeout_destroy(struct mk_event_ctx *ctx, void *data)
+{
+    int read_fd;
+    struct fd_timer *timer;
+    struct mk_event *event = (struct mk_event *) data;
+
+    /* Keep the descriptor at hand before the event gets unregistered */
+    read_fd = event->fd;
+    _mk_event_del(ctx, event);
+
+    /* Tell the worker to leave its loop and wait until it is gone */
+    timer = event->data;
+    timer->run = MK_FALSE;
+    pthread_join(timer->tid, NULL);
+
+    /* Write end first, then the read end, then the context itself */
+    close(timer->fd);
+    close(read_fd);
+    mk_mem_free(timer);
+
+    return 0;
+}
+
+/*
+ * Create a notification channel backed by a pipe(2) and register its read
+ * end for MK_EVENT_READ using the event passed in 'data'.
+ *
+ * On success returns 0 and stores the read and write descriptors in
+ * '*r_fd' and '*w_fd'. If pipe(2) fails its return value is handed back;
+ * if the registration fails both descriptors are closed and that error is
+ * returned. The output parameters are only written on success.
+ */
+static inline int _mk_event_channel_create(struct mk_event_ctx *ctx,
+                                           int *r_fd, int *w_fd, void *data)
+{
+    int rc;
+    int ends[2];
+    struct mk_event *event;
+
+    mk_bug(data == NULL);
+
+    rc = pipe(ends);
+    if (rc < 0) {
+        mk_libc_error("pipe");
+        return rc;
+    }
+
+    /* Prepare the event as an empty notification bound to the read end */
+    event = data;
+    event->fd = ends[0];
+    event->type = MK_EVENT_NOTIFICATION;
+    event->mask = MK_EVENT_EMPTY;
+
+    rc = _mk_event_add(ctx, ends[0],
+                       MK_EVENT_NOTIFICATION, MK_EVENT_READ, event);
+    if (rc != 0) {
+        close(ends[0]);
+        close(ends[1]);
+        return rc;
+    }
+    event->mask = MK_EVENT_READ;
+
+    *r_fd = ends[0];
+    *w_fd = ends[1];
+
+    return 0;
+}
+
+/*
+ * Destroy a notification channel: unregister its event and close both
+ * descriptors.
+ *
+ * Returns -1 if the event in 'data' is not bound to 'r_fd', the error from
+ * _mk_event_del() if the removal fails (descriptors stay open in both
+ * cases), or 0 on success.
+ */
+static inline int _mk_event_channel_destroy(struct mk_event_ctx *ctx,
+                                            int r_fd, int w_fd, void *data)
+{
+    int rc;
+    struct mk_event *event = (struct mk_event *) data;
+
+    /* The event must be the one registered for this channel */
+    if (event->fd != r_fd) {
+        return -1;
+    }
+
+    rc = _mk_event_del(ctx, event);
+    if (rc != 0) {
+        return rc;
+    }
+
+    close(r_fd);
+    close(w_fd);
+
+    return 0;
+}
+
+/*
+ * Append an event to the list of fired events of the current loop round.
+ *
+ * loop                : event loop whose backend context holds the list.
+ * event               : event to report.
+ * mask                : mask stored in the event and in the fired entry.
+ * prevent_duplication : when non-zero, nothing is appended if the list
+ *                       already has an entry for event->fd.
+ *
+ * ctx->fired is an array of struct mk_event (it is allocated that way in
+ * _mk_event_loop_create() and filled by value in _mk_event_wait_2()), so the
+ * entry is written with the same layout used there: 'fd', 'mask' and a
+ * pointer to the registered event in 'data'.
+ *
+ * Returns 0 on success or when a duplicate was skipped, -1 if the fired
+ * array is full.
+ */
+static inline int _mk_event_inject(struct mk_event_loop *loop,
+                                   struct mk_event *event,
+                                   int mask,
+                                   int prevent_duplication)
+{
+    int index;
+    int count;
+    struct mk_event *fired;
+    struct mk_event_ctx *ctx;
+
+    ctx = loop->data;
+
+    /* A failed wait leaves a negative counter behind: start from scratch */
+    count = loop->n_events;
+    if (count < 0) {
+        count = 0;
+    }
+
+    if (prevent_duplication) {
+        for (index = 0; index < count; index++) {
+            if (ctx->fired[index].fd == event->fd) {
+                return 0;
+            }
+        }
+    }
+
+    /* Never write past the end of the fired array */
+    if (count >= ctx->queue_size) {
+        return -1;
+    }
+
+    event->mask = mask;
+
+    fired = &ctx->fired[count];
+    fired->fd = event->fd;
+    fired->mask = mask;
+    fired->data = event;
+
+    loop->n_events = count + 1;
+
+    return 0;
+}
+
+/*
+ * Wait for activity on the registered descriptors.
+ *
+ * 'timeout' is expressed in milliseconds; -1 means wait with no time limit.
+ * The registered sets are copied into scratch sets before calling select(2),
+ * which overwrites the sets it receives. When select(2) reports activity,
+ * ctx->fired is filled with one entry per ready descriptor ('fd', 'mask'
+ * and, in 'data', the registered event).
+ *
+ * Returns the number of entries stored in ctx->fired (also saved in
+ * loop->n_events), or the value returned by select(2) when it is <= 0.
+ */
+static inline int _mk_event_wait_2(struct mk_event_loop *loop, int timeout)
+{
+    int fd;
+    int count = 0;
+    uint32_t ready;
+    struct mk_event *slot;
+    struct mk_event_ctx *ctx = loop->data;
+    struct timeval tv = {timeout / 1000, (timeout % 1000) * 1000};
+    struct timeval *limit = &tv;
+
+    if (timeout == -1) {
+        limit = NULL;
+    }
+
+    /* Work on copies, the registered sets must survive the call */
+    ctx->_rfds = ctx->rfds;
+    ctx->_wfds = ctx->wfds;
+
+    loop->n_events = select(ctx->max_fd + 1, &ctx->_rfds, &ctx->_wfds, NULL,
+                            limit);
+    if (loop->n_events <= 0) {
+        return loop->n_events;
+    }
+
+    /*
+     * Other backends (epoll, kqueue) get a ready-made array from the kernel;
+     * here it has to be built by checking every registered descriptor
+     * against the sets returned by select(2).
+     */
+    for (fd = 0; fd <= ctx->max_fd; fd++) {
+        if (ctx->events[fd] == NULL) {
+            continue;
+        }
+
+        ready = 0;
+        if (FD_ISSET(fd, &ctx->_rfds)) {
+            ready |= MK_EVENT_READ;
+        }
+        if (FD_ISSET(fd, &ctx->_wfds)) {
+            ready |= MK_EVENT_WRITE;
+        }
+        if (ready == 0) {
+            continue;
+        }
+
+        slot = &ctx->fired[count];
+        slot->fd = fd;
+        slot->mask = ready;
+        slot->data = ctx->events[fd];
+        count++;
+    }
+
+    loop->n_events = count;
+    return loop->n_events;
+}
+
+/* Name of this event backend; the returned string must not be modified */
+static inline char *_mk_event_backend()
+{
+    char *name = "select";
+
+    return name;
+}