Diffstat (limited to 'src/fluent-bit/lib/monkey/include/monkey/mk_scheduler.h')
-rw-r--r--  src/fluent-bit/lib/monkey/include/monkey/mk_scheduler.h  353
1 file changed, 353 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/monkey/include/monkey/mk_scheduler.h b/src/fluent-bit/lib/monkey/include/monkey/mk_scheduler.h
new file mode 100644
index 000000000..f749cba54
--- /dev/null
+++ b/src/fluent-bit/lib/monkey/include/monkey/mk_scheduler.h
@@ -0,0 +1,353 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Monkey HTTP Server
+ * ==================
+ * Copyright 2001-2017 Eduardo Silva <eduardo@monkey.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <monkey/mk_info.h>
+#include <monkey/mk_tls.h>
+
+#include <monkey/mk_core.h>
+#include <monkey/mk_server.h>
+#include <monkey/mk_stream.h>
+#include <monkey/mk_net.h>
+
+#ifndef MK_SCHEDULER_H
+#define MK_SCHEDULER_H
+
+#define MK_SCHED_CONN_TIMEOUT -1
+#define MK_SCHED_CONN_CLOSED -2
+
+#define MK_SCHED_SIGNAL_DEADBEEF 0xDEADBEEF
+#define MK_SCHED_SIGNAL_FREE_ALL 0xFFEE0000
+#define MK_SCHED_SIGNAL_EVENT_LOOP_BREAK 0xEEFFAACC
+
+#ifdef _WIN32
+    /* The pid field in the mk_sched_worker structure is only used on Linux
+     * and macOS and is ignored on other platforms, so this placeholder
+     * typedef is safe on Windows.
+     */
+    typedef uint64_t pid_t;
+#endif
+
+/*
+ * Scheduler balancing modes:
+ *
+ * - Fair Balancing: use a single listener socket and, upon accepting
+ *   new connections, look up the least loaded thread and assign the
+ *   socket to that specific epoll queue.
+ *
+ * - ReusePort: use the SO_REUSEPORT feature introduced in Linux
+ *   Kernel 3.9, which allows threads to share a bound address on a
+ *   listener socket. We let the Kernel decide how to balance.
+ */
+#define MK_SCHEDULER_FAIR_BALANCING 0
+#define MK_SCHEDULER_REUSEPORT 1
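+
+/*
+ * Illustrative sketch (not part of this header): when MK_SCHEDULER_REUSEPORT
+ * is selected, each worker is expected to bind its own listener socket with
+ * the SO_REUSEPORT option so the Kernel distributes incoming connections,
+ * roughly:
+ *
+ *     int on = 1;
+ *     setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on));
+ *     bind(fd, addr, addrlen);
+ *     listen(fd, backlog);
+ *
+ * With MK_SCHEDULER_FAIR_BALANCING a single listener accepts connections and
+ * each new socket is handed to the least loaded worker's event loop instead.
+ */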
+
+/*
+ * Thread-scope structure/variable that holds the Scheduler context for the
+ * worker (or thread) in question.
+ */
+struct mk_sched_worker
+{
+ /* The event loop on this scheduler thread */
+ struct mk_event_loop *loop;
+
+ unsigned long long accepted_connections;
+ unsigned long long closed_connections;
+ unsigned long long over_capacity;
+
+    /*
+     * The timeout queue represents client connections that
+     * have not initiated their requests or whose request status
+     * is incomplete. This linear list allows the scheduler
+     * to perform a fast check upon every timeout.
+     */
+ struct mk_list timeout_queue;
+
+ short int idx;
+ unsigned char initialized;
+
+ pthread_t tid;
+
+ pid_t pid;
+
+ /* store the memory page size (_SC_PAGESIZE) */
+ unsigned int mem_pagesize;
+
+ struct mk_http_session *request_handler;
+
+
+ struct mk_list event_free_queue;
+
+    /*
+     * These channels are used to signal the active workers;
+     * they are only available because of the ULONG_MAX bug
+     * described in mk_scheduler.c.
+     */
+ int signal_channel_r;
+ int signal_channel_w;
+
+ /* If using REUSEPORT, this points to the list of listeners */
+ struct mk_list *listeners;
+
+ /*
+ * List head for finished requests that need to be cleared after each
+ * event loop round.
+ */
+ struct mk_list requests_done;
+
+ /* List of co-routine threads */
+ struct mk_list threads;
+ struct mk_list threads_purge;
+
+};
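+
+/*
+ * Illustrative sketch (assumed usage, not part of this header): a signal is
+ * delivered to a worker by writing a 64-bit value to its signal channel; the
+ * worker's event loop reads it from the other end and reacts, roughly:
+ *
+ *     uint64_t val = MK_SCHED_SIGNAL_FREE_ALL;
+ *     write(sched->signal_channel_w, &val, sizeof(val));
+ *
+ * and, on the worker side:
+ *
+ *     uint64_t val;
+ *     read(sched->signal_channel_r, &val, sizeof(val));
+ *
+ * mk_sched_send_signal() and mk_sched_broadcast_signal(), declared below,
+ * wrap this pattern.
+ */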
+
+
+/* Every connection in the server is represented by this structure */
+struct mk_sched_conn
+{
+ struct mk_event event; /* event loop context */
+ int status; /* connection status */
+ uint32_t properties;
+ char is_timeout_on; /* registered to timeout queue? */
+ time_t arrive_time; /* arrive time */
+ struct mk_sched_handler *protocol; /* protocol handler */
+ struct mk_server_listen *server_listen;
+ struct mk_plugin_network *net; /* I/O network layer */
+ struct mk_channel channel; /* stream channel */
+ struct mk_list timeout_head; /* link to the timeout queue */
+ void *data; /* optional ref for protocols */
+};
+
+/* Protocol capabilities */
+#define MK_SCHED_CONN_CAP(conn) conn->protocol->capabilities
+
+/* Connection properties */
+#define MK_SCHED_CONN_PROP(conn) conn->server_listen->listen->flags
+
+/*
+ * It defines a handler for the connection in question. This struct
+ * is used inside mk_sched_conn to define which protocol/handler
+ * needs to take care of every action.
+ */
+struct mk_sched_handler
+{
+ /*
+ * Protocol definition and callbacks:
+ *
+ * - name : the protocol handler name.
+ * - cb_read : callback triggered when there is data on the socket to read.
+     * - cb_close: callback triggered when the socket channel has been closed.
+     * - cb_done : callback triggered when the whole channel data has been
+     *             written. This callback is optional. Its return value
+     *             indicates to the scheduler whether the channel should be
+     *             closed or not: -1 = close, 0 = leave it open and wait
+     *             for more data.
+ */
+ const char *name;
+ int (*cb_read) (struct mk_sched_conn *, struct mk_sched_worker *,
+ struct mk_server *);
+ int (*cb_close) (struct mk_sched_conn *, struct mk_sched_worker *, int,
+ struct mk_server *);
+ int (*cb_done) (struct mk_sched_conn *, struct mk_sched_worker *,
+ struct mk_server *);
+ int (*cb_upgrade) (void *, void *, struct mk_server *);
+
+ /*
+ * This extra field is a small hack. The scheduler connection context
+ * contains information about the connection, and setting this field
+ * will let the scheduler allocate some extra memory bytes on the
+ * context memory reference:
+ *
+ * e.g:
+ *
+     * t_size = (sizeof(struct mk_sched_conn) + sched_extra_size);
+     * struct mk_sched_conn *conn = malloc(t_size);
+     *
+     * This is useful for protocols or handlers where a socket connection
+     * represents one unique instance; the use case is HTTP, e.g:
+     *
+     * HTTP : 1 connection = 1 client (one request at a time)
+     * HTTP2: 1 connection = 1 client with multiple-framed requests
+     *
+     * The purpose is to avoid protocol handlers performing extra memory
+     * allocations and connection lookups; the sched context is good enough
+     * to help with this, e.g:
+     *
+     * t_size = (sizeof(struct mk_sched_conn) + sizeof(struct mk_http_session));
+     * conn = malloc(t_size);
+ */
+ int sched_extra_size;
+ char capabilities;
+};
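+
+/*
+ * Illustrative sketch (hypothetical handler and names, not part of this
+ * header): a protocol plugs into the scheduler by filling this structure,
+ * roughly:
+ *
+ *     struct mk_sched_handler my_proto_handler = {
+ *         .name             = "myproto",
+ *         .cb_read          = my_proto_read,
+ *         .cb_close         = my_proto_close,
+ *         .cb_done          = my_proto_done,
+ *         .cb_upgrade       = NULL,
+ *         .sched_extra_size = sizeof(struct my_proto_session),
+ *         .capabilities     = MK_CAP_HTTP
+ *     };
+ *
+ * The scheduler would then allocate sizeof(struct mk_sched_conn) +
+ * sched_extra_size bytes for every connection served by this handler.
+ */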
+
+struct mk_sched_notif {
+ struct mk_event event;
+};
+
+/* Struct under thread context */
+struct mk_sched_thread_conf {
+ struct mk_server *server;
+};
+
+struct mk_sched_worker_cb {
+ void (*cb_func) (void *);
+ void *data;
+ struct mk_list _head;
+};
+
+/*
+ * All data required by the Scheduler interface is mapped inside this
+ * struct which is later linked into config->scheduler_ctx.
+ */
+struct mk_sched_ctx {
+ /* Array of sched_worker */
+ struct mk_sched_worker *workers;
+};
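+
+/*
+ * Illustrative sketch (assumed usage, not part of this header): 'workers' is
+ * a plain array with one entry per scheduler thread, so a specific worker is
+ * reachable from the context linked to the configuration, roughly:
+ *
+ *     struct mk_sched_ctx *ctx = config->scheduler_ctx;
+ *     struct mk_sched_worker *worker = &ctx->workers[idx];
+ */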
+
+
+struct mk_sched_worker *mk_sched_next_target(struct mk_server *server);
+int mk_sched_init(struct mk_server *server);
+int mk_sched_exit(struct mk_server *server);
+
+int mk_sched_launch_thread(struct mk_server *server, pthread_t *tout);
+
+void *mk_sched_launch_epoll_loop(void *thread_conf);
+struct mk_sched_worker *mk_sched_get_handler_owner(void);
+
+static inline struct rb_root *mk_sched_get_request_list()
+{
+ return MK_TLS_GET(mk_tls_sched_cs);
+}
+
+static inline struct mk_sched_worker *mk_sched_get_thread_conf()
+{
+ return MK_TLS_GET(mk_tls_sched_worker_node);
+}
+
+static inline struct mk_event_loop *mk_sched_loop()
+{
+ struct mk_sched_worker *w;
+
+ w = MK_TLS_GET(mk_tls_sched_worker_node);
+ return w->loop;
+}
+
+void mk_sched_update_thread_status(struct mk_sched_worker *sched,
+ int active, int closed);
+
+int mk_sched_drop_connection(struct mk_sched_conn *conn,
+ struct mk_sched_worker *sched,
+ struct mk_server *server);
+
+int mk_sched_check_timeouts(struct mk_sched_worker *sched,
+ struct mk_server *server);
+
+
+struct mk_sched_conn *mk_sched_add_connection(int remote_fd,
+ struct mk_server_listen *listener,
+ struct mk_sched_worker *sched,
+ struct mk_server *server);
+
+int mk_sched_remove_client(struct mk_sched_conn *conn,
+ struct mk_sched_worker *sched,
+ struct mk_server *server);
+
+struct mk_sched_conn *mk_sched_get_connection(struct mk_sched_worker *sched,
+                                              int remote_fd);
+int mk_sched_update_conn_status(struct mk_sched_worker *sched, int remote_fd,
+ int status);
+int mk_sched_sync_counters();
+void mk_sched_worker_free(struct mk_server *server);
+
+struct mk_sched_handler *mk_sched_handler_cap(char cap);
+
+/* Event handlers */
+int mk_sched_event_read(struct mk_sched_conn *conn,
+ struct mk_sched_worker *sched,
+ struct mk_server *server);
+
+int mk_sched_event_write(struct mk_sched_conn *conn,
+ struct mk_sched_worker *sched,
+ struct mk_server *server);
+
+
+int mk_sched_event_close(struct mk_sched_conn *conn,
+ struct mk_sched_worker *sched,
+ int type, struct mk_server *server);
+
+void mk_sched_event_free(struct mk_event *event);
+
+
+static inline void mk_sched_event_free_all(struct mk_sched_worker *sched)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct mk_event *event;
+
+ mk_list_foreach_safe(head, tmp, &sched->event_free_queue) {
+ event = mk_list_entry(head, struct mk_event, _head);
+ mk_list_del(&event->_head);
+ mk_mem_free(event);
+ }
+}
+
+static inline void mk_sched_conn_timeout_add(struct mk_sched_conn *conn,
+ struct mk_sched_worker *sched)
+{
+ if (conn->is_timeout_on == MK_FALSE) {
+ mk_list_add(&conn->timeout_head, &sched->timeout_queue);
+ conn->is_timeout_on = MK_TRUE;
+ }
+}
+
+static inline void mk_sched_conn_timeout_del(struct mk_sched_conn *conn)
+{
+ if (conn->is_timeout_on == MK_TRUE) {
+ mk_list_del(&conn->timeout_head);
+ conn->is_timeout_on = MK_FALSE;
+ }
+}
+
+
+#define mk_sched_conn_read(conn, buf, s) \
+ conn->net->read(conn->net->plugin, conn->event.fd, buf, s)
+#define mk_sched_conn_write(ch, buf, s) \
+ mk_net_conn_write(ch, buf, s)
+#define mk_sched_conn_writev(ch, iov) \
+ ch->io->writev(ch->io->plugin, ch->fd, iov)
+#define mk_sched_conn_sendfile(ch, f_fd, f_offs, f_count) \
+ ch->io->send_file(ch->io->plugin, ch->fd, f_fd, f_offs, f_count)
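+
+/*
+ * Illustrative sketch (hypothetical callback, not part of this header): the
+ * macros above hide the underlying network plugin, so a protocol's cb_read
+ * callback can consume and echo data roughly like:
+ *
+ *     char buf[1024];
+ *     int bytes = mk_sched_conn_read(conn, buf, sizeof(buf));
+ *     if (bytes <= 0) {
+ *         return -1;
+ *     }
+ *     mk_sched_conn_write(&conn->channel, buf, bytes);
+ */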
+
+#define mk_sched_switch_protocol(conn, cap) \
+ conn->protocol = mk_sched_handler_cap(cap)
+
+int mk_sched_worker_cb_add(struct mk_server *server,
+ void (*cb_func) (void *),
+ void *data);
+
+void mk_sched_worker_cb_free(struct mk_server *server);
+int mk_sched_send_signal(struct mk_sched_worker *worker, uint64_t val);
+int mk_sched_broadcast_signal(struct mk_server *server, uint64_t val);
+int mk_sched_workers_join(struct mk_server *server);
+int mk_sched_threads_purge(struct mk_sched_worker *sched);
+int mk_sched_threads_destroy_all(struct mk_sched_worker *sched);
+int mk_sched_threads_destroy_conn(struct mk_sched_worker *sched,
+ struct mk_sched_conn *conn);
+
+#endif