diff options
Diffstat (limited to 'src/fluent-bit/lib/monkey/mk_core/mk_event_kqueue.c')
-rw-r--r-- | src/fluent-bit/lib/monkey/mk_core/mk_event_kqueue.c | 373 |
1 files changed, 373 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/monkey/mk_core/mk_event_kqueue.c b/src/fluent-bit/lib/monkey/mk_core/mk_event_kqueue.c new file mode 100644 index 000000000..46b4f3524 --- /dev/null +++ b/src/fluent-bit/lib/monkey/mk_core/mk_event_kqueue.c @@ -0,0 +1,373 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Monkey HTTP Server + * ================== + * Copyright 2001-2017 Eduardo Silva <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <unistd.h> + +#include <mk_core/mk_event.h> +#include <mk_core/mk_memory.h> +#include <mk_core/mk_utils.h> + +static inline int _mk_event_init() +{ + return 0; +} + +static inline void *_mk_event_loop_create(int size) +{ + struct mk_event_ctx *ctx; + + /* Main event context */ + ctx = mk_mem_alloc_z(sizeof(struct mk_event_ctx)); + if (!ctx) { + return NULL; + } + + /* Create the epoll instance */ + ctx->kfd = kqueue(); + if (ctx->kfd == -1) { + mk_libc_error("kqueue"); + mk_mem_free(ctx); + return NULL; + } + + /* Allocate space for events queue */ + ctx->events = mk_mem_alloc_z(sizeof(struct kevent) * size); + if (!ctx->events) { + close(ctx->kfd); + mk_mem_free(ctx); + return NULL; + } + ctx->queue_size = size; + return ctx; +} + +/* Close handlers and memory */ +static inline void _mk_event_loop_destroy(struct mk_event_ctx *ctx) +{ + close(ctx->kfd); + mk_mem_free(ctx->events); + mk_mem_free(ctx); +} + +static inline int _mk_event_add(struct mk_event_ctx *ctx, int fd, + int type, uint32_t events, void *data) +{ + int ret; + int set = MK_FALSE; + struct mk_event *event; + struct kevent ke = {0, 0, 0, 0, 0, 0}; + + mk_bug(ctx == NULL); + mk_bug(data == NULL); + + event = (struct mk_event *) data; + if (event->mask == MK_EVENT_EMPTY) { + event->fd = fd; + event->type = type; + event->status = MK_EVENT_REGISTERED; + } + else { + if (type != MK_EVENT_UNMODIFIED) { + event->type = type; + } + } + + /* Read flag */ + if ((event->mask ^ MK_EVENT_READ) && (events & MK_EVENT_READ)) { + EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, event); + set = MK_TRUE; + } + else if ((event->mask & MK_EVENT_READ) && (events ^ MK_EVENT_READ)) { + EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, event); + set = MK_TRUE; + } + + if (set == MK_TRUE) { + ret = kevent(ctx->kfd, &ke, 1, NULL, 0, NULL); + if (ret < 0) { + mk_libc_error("kevent"); + return ret; + } + } + + /* Write flag */ + set = MK_FALSE; + if ((event->mask ^ MK_EVENT_WRITE) && (events & MK_EVENT_WRITE)) { + EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, event); + set = MK_TRUE; + } + else if ((event->mask & MK_EVENT_WRITE) && (events ^ MK_EVENT_WRITE)) { + EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, event); + set = MK_TRUE; + } + + if (set == MK_TRUE) { + ret = kevent(ctx->kfd, &ke, 1, NULL, 0, NULL); + if (ret < 0) { + mk_libc_error("kevent"); + return ret; + } + } + + event->mask = events; + event->priority = MK_EVENT_PRIORITY_DEFAULT; + + /* Remove from priority queue */ + if (!mk_list_entry_is_orphan(&event->_priority_head)) { + mk_list_del(&event->_priority_head); + } + + return 0; +} + +static inline int _mk_event_del(struct mk_event_ctx *ctx, struct mk_event *event) +{ + int ret; + struct kevent ke = {0, 0, 0, 0, 0, 0}; + + mk_bug(ctx == NULL); + mk_bug(event == NULL); + + if (!MK_EVENT_IS_REGISTERED(event)) { + return 0; + } + + if (event->mask & MK_EVENT_READ) { + EV_SET(&ke, event->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + ret = kevent(ctx->kfd, &ke, 1, NULL, 0, NULL); + if (ret < 0) { + mk_libc_error("kevent"); + return ret; + } + } + + if (event->mask & MK_EVENT_WRITE) { + EV_SET(&ke, event->fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + ret = kevent(ctx->kfd, &ke, 1, NULL, 0, NULL); + if (ret < 0) { + mk_libc_error("kevent"); + return ret; + } + } + + /* Remove from priority queue */ + if (!mk_list_entry_is_orphan(&event->_priority_head)) { + mk_list_del(&event->_priority_head); + } + + MK_EVENT_NEW(event); + + return 0; +} + +static inline int _mk_event_timeout_create(struct mk_event_ctx *ctx, + time_t sec, long nsec, void *data) +{ + int fd; + int ret; + struct mk_event *event; + struct kevent ke; + + mk_bug(data == NULL); + + /* + * We just need a file descriptor number, we don't care from where it + * comes from. + */ + fd = open("/dev/null", 0); + if (fd == -1) { + mk_libc_error("open"); + return -1; + } + + event = data; + event->fd = fd; + event->status = MK_EVENT_REGISTERED; + event->type = MK_EVENT_NOTIFICATION; + event->mask = MK_EVENT_EMPTY; + + event->priority = MK_EVENT_PRIORITY_DEFAULT; + mk_list_entry_init(&event->_priority_head); + +#if defined(NOTE_SECONDS) && !defined(__APPLE__) + /* FreeBSD or LINUX_KQUEUE defined */ + /* TODO : high resolution interval support. */ + EV_SET(&ke, fd, EVFILT_TIMER, EV_ADD, NOTE_SECONDS, sec, event); +#else + /* Other BSD have no NOTE_SECONDS & specify milliseconds */ + /* Also, on macOS, NOTE_SECONDS has severe side effect that cause + * performance degradation. */ + EV_SET(&ke, fd, EVFILT_TIMER, EV_ADD, 0, (sec * 1000) + (nsec / 1000000) , event); +#endif + + ret = kevent(ctx->kfd, &ke, 1, NULL, 0, NULL); + if (ret < 0) { + close(fd); + mk_libc_error("kevent"); + return -1; + } + + /* + * FIXME: the timeout event is not triggered when using libkqueue, need + * to confirm how it behave on native OSX. + */ + event->mask = MK_EVENT_READ; + + return fd; +} + +static inline int _mk_event_timeout_destroy(struct mk_event_ctx *ctx, void *data) +{ + int ret; + struct mk_event *event; + struct kevent ke = {0, 0, 0, 0, 0, 0}; + + if (data == NULL) { + return 0; + } + + event = (struct mk_event *) data; + if (!MK_EVENT_IS_REGISTERED(event)) { + return 0; + } + EV_SET(&ke, event->fd, EVFILT_TIMER, EV_DELETE, 0,0, NULL); + + ret = kevent(ctx->kfd, &ke, 1, NULL, 0, NULL); + if (ret < 0) { + mk_libc_error("kevent"); + return ret; + } + + /* Remove from priority queue */ + if (!mk_list_entry_is_orphan(&event->_priority_head)) { + mk_list_del(&event->_priority_head); + } + + close(event->fd); + + MK_EVENT_NEW(event); + + return 0; +} + +static inline int _mk_event_channel_create(struct mk_event_ctx *ctx, + int *r_fd, int *w_fd, void *data) +{ + int ret; + int fd[2]; + struct mk_event *event; + + mk_bug(data == NULL); + + ret = pipe(fd); + if (ret < 0) { + mk_libc_error("pipe"); + return ret; + } + + event = data; + event->fd = fd[0]; + event->type = MK_EVENT_NOTIFICATION; + event->mask = MK_EVENT_EMPTY; + + ret = _mk_event_add(ctx, fd[0], + MK_EVENT_NOTIFICATION, MK_EVENT_READ, event); + if (ret != 0) { + close(fd[0]); + close(fd[1]); + return ret; + } + + *r_fd = fd[0]; + *w_fd = fd[1]; + + return 0; +} + +static inline int _mk_event_channel_destroy(struct mk_event_ctx *ctx, + int r_fd, int w_fd, void *data) +{ + struct mk_event *event; + int ret; + + + event = (struct mk_event *)data; + if (event->fd != r_fd) { + return -1; + } + + ret = _mk_event_del(ctx, event); + if (ret != 0) { + return ret; + } + + close(r_fd); + close(w_fd); + + return 0; +} + +static inline int _mk_event_inject(struct mk_event_loop *loop, + struct mk_event *event, + int mask, + int prevent_duplication) +{ + size_t index; + struct mk_event_ctx *ctx; + + ctx = loop->data; + + if (prevent_duplication) { + for (index = 0 ; index < loop->n_events ; index++) { + if (ctx->events[index].udata == event) { + return 0; + } + } + } + + event->mask = mask; + + ctx->events[loop->n_events].udata = event; + + loop->n_events++; + + return 0; +} + +static inline int _mk_event_wait_2(struct mk_event_loop *loop, int timeout) +{ + struct mk_event_ctx *ctx = loop->data; + + struct timespec timev = {timeout / 1000, (timeout % 1000) * 1000000}; + loop->n_events = kevent(ctx->kfd, NULL, 0, ctx->events, ctx->queue_size, + (timeout != -1) ? &timev : NULL); + return loop->n_events; +} + +static inline char *_mk_event_backend() +{ +#ifdef LINUX_KQUEUE + return "libkqueue"; +#else + return "kqueue"; +#endif +} |