diff options
Diffstat (limited to 'src/fluent-bit/lib/monkey/mk_core/mk_thread.c')
-rw-r--r-- | src/fluent-bit/lib/monkey/mk_core/mk_thread.c | 347 |
1 files changed, 347 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/monkey/mk_core/mk_thread.c b/src/fluent-bit/lib/monkey/mk_core/mk_thread.c new file mode 100644 index 000000000..9b5589f36 --- /dev/null +++ b/src/fluent-bit/lib/monkey/mk_core/mk_thread.c @@ -0,0 +1,347 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Monkey HTTP Server (Duda I/O) + * ----------------------------- + * Copyright 2017 Eduardo Silva <eduardo@monkey.io> + * Copyright 2014, Zeying Xie <swpdtz at gmail dot com> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <assert.h> +#include <string.h> + +#if defined (__APPLE__) +#include <sys/ucontext.h> +#else +#include <ucontext.h> +#endif + +#include <limits.h> + +#include <mk_core/mk_pthread.h> +#include <mk_core/mk_memory.h> +#include <mk_core/mk_thread.h> + +/* + * @OBJ_NAME: dthread + * @OBJ_MENU: Dthread + * @OBJ_DESC: The dthread object provides a set of methods to handle user space cooperative thread, namely dthread(duda thread). + * A dthread can be suspended when it encounters something that will block(in other + * words, something will be available in the future), while another dthread that + * is ready to run is awakened. Back and forth, all dthreads within the same pthread + * work collaboratively. This means dthread is non-preemptive and requires the user + * to explicitly give up control when necessary. + * Dthreads communicate with each other by using channel, a channel is like a pipe, + * one dthread feeds data to the channel while another cosumes from it. + * + */ + +#ifdef USE_VALGRIND +#include <valgrind/valgrind.h> +#endif + +#define MK_THREAD_STACK_SIZE (3 * (PTHREAD_STACK_MIN) / 2) +#define DEFAULT_MK_THREAD_NUM 16 + +struct mk_thread { + mk_thread_func func; + void *data; + ucontext_t context; + struct mk_thread_scheduler *sch; + int status; + int parent_id; +#ifdef USE_VALGRIND + unsigned int valgrind_stack_id; +#endif + struct mk_list chan_list; + char stack[MK_THREAD_STACK_SIZE]; +} mk_thread_t; + +struct mk_thread_scheduler { + ucontext_t main; + int n_dthread; + int cap; + int running_id; + struct mk_thread **dt; +}; + +static void _mk_thread_release(struct mk_thread *dt); + +static void _mk_thread_entry_point(struct mk_thread_scheduler *sch) +{ + int id; + struct mk_thread *dt; + struct mk_list *head; + struct mk_thread_channel *chan; + + assert(sch); + id = sch->running_id; + dt = sch->dt[id]; + dt->func(dt->data); + dt->status = MK_THREAD_DEAD; + + mk_list_foreach(head, &dt->chan_list) { + chan = mk_list_entry(head, struct mk_thread_channel, _head); + chan->receiver = -1; + } + sch->n_dthread--; + sch->running_id = dt->parent_id; +} + +struct mk_thread_scheduler *mk_thread_open() +{ + struct mk_thread_scheduler *sch; + + sch = mk_mem_alloc(sizeof(*sch)); + if (!sch) { + return NULL; + } + + sch->n_dthread = 0; + sch->cap = DEFAULT_MK_THREAD_NUM; + sch->running_id = -1; + sch->dt = mk_mem_alloc_z(sizeof(struct mk_thread *) * sch->cap); + if (!sch->dt) { + mk_mem_free(sch); + return NULL; + } + + return sch; +} + +void mk_thread_close(struct mk_thread_scheduler *sch) +{ + struct mk_thread *dt; + + int i; + for (i = 0; i < sch->cap; ++i) { + dt = sch->dt[i]; + if (dt) { + _mk_thread_release(dt); + } + } + mk_mem_free(sch->dt); + sch->dt = NULL; + mk_mem_free(sch); +} + +/* + * @METHOD_NAME: create + * @METHOD_DESC: create a new dthread. + * @METHOD_PROTO: int create(mk_thread_func func, void *data) + * @METHOD_PARAM: func the function to be executed when the newly created dthread + * is started. + * @METHOD_PARAM: data user specific data that will be passed to func. + * @METHOD_RETURN: the dthread id associated with the new dthread. + */ +int mk_thread_create(mk_thread_func func, void *data) +{ + int i; + int id; + void *p; + struct mk_thread_scheduler *sch; + struct mk_thread *dt; + + sch = pthread_getspecific(mk_thread_scheduler); + if (!sch) { + sch = mk_thread_open(); + assert(sch); + pthread_setspecific(mk_thread_scheduler, (void *) sch); + } + + if (sch->n_dthread >= sch->cap) { + id = sch->cap; + + p = mk_mem_realloc(sch->dt, sch->cap * 2 * sizeof(struct mk_thread *)); + if (!p) { + return -1; + } + sch->dt = p; + memset(sch->dt + sch->cap, 0, sizeof(struct mk_thread *) * sch->cap); + sch->cap *= 2; + } + else { + for (i = 0; i < sch->cap; ++i) { + id = (i + sch->cap) % sch->cap; + if (sch->dt[id] == NULL || sch->dt[id]->status == MK_THREAD_DEAD) { + break; + } + } + } + + /* may use dthread pooling instead of release and realloc */ + if (sch->dt[id] && sch->dt[id]->status == MK_THREAD_DEAD) { + _mk_thread_release(sch->dt[id]); + sch->dt[id] = NULL; + } + + dt = mk_mem_alloc(sizeof(*dt)); + if (!dt) { + return -1; + } + + dt->func = func; + dt->data = data; + dt->sch = sch; + dt->status = MK_THREAD_READY; + dt->parent_id = -1; +#ifdef USE_VALGRIND + dt->valgrind_stack_id = VALGRIND_STACK_REGISTER(dt->stack, dt->stack + MK_THREAD_STACK_SIZE); +#endif + mk_list_init(&dt->chan_list); + sch->dt[id] = dt; + sch->n_dthread++; + return id; +} + +static void _mk_thread_release(struct mk_thread *dt) +{ + assert(dt); +#ifdef USE_VALGRIND + VALGRIND_STACK_DEREGISTER(dt->valgrind_stack_id); +#endif + mk_mem_free(dt); +} + +/* + * @METHOD_NAME: status + * @METHOD_DESC: get the status of a given dthread. + * @METHOD_PROTO: int status(int id) + * @METHOD_PARAM: id the dthread id of the target dthread. + * @METHOD_RETURN: it returns one of the following status: MK_THREAD_DEAD, MK_THREAD_READY, + * MK_THREAD_RUNNING, MK_THREAD_SUSPEND. + */ +int mk_thread_status(int id) +{ + struct mk_thread_scheduler *sch; + + sch = pthread_getspecific(mk_thread_scheduler); + assert(sch); + assert(id >= 0 && id < sch->cap); + if (!sch->dt[id]) return MK_THREAD_DEAD; + return sch->dt[id]->status; +} + +/* + * @METHOD_NAME: yield + * @METHOD_DESC: require the currently running dthread explicitly to give up control + * back to the dthread scheduler. + * @METHOD_PROTO: void yield() + * @METHOD_RETURN: this method do not return any value. + */ +void mk_thread_yield() +{ + int id; + struct mk_thread *dt; + struct mk_thread_scheduler *sch; + + sch = pthread_getspecific(mk_thread_scheduler); + assert(sch); + + id = sch->running_id; + assert(id >= 0); + + dt = sch->dt[id]; + dt->status = MK_THREAD_SUSPEND; + sch->running_id = -1; + swapcontext(&dt->context, &sch->main); +} + +/* + * @METHOD_NAME: resume + * @METHOD_DESC: resume a given dthread and suspend the currently running dthread. + * @METHOD_PROTO: void resume(int id) + * @METHOD_PARAM: id the dthread id of the target dthread. + * @METHOD_RETURN: this method do not return any value. + */ +void mk_thread_resume(int id) +{ + struct mk_thread *dt; + struct mk_thread *running_dt; + struct mk_thread_scheduler *sch; + + sch = pthread_getspecific(mk_thread_scheduler); + assert(sch); + assert(id >= 0 && id < sch->cap); + + running_dt = NULL; + if (sch->running_id != -1) { + running_dt = sch->dt[sch->running_id]; + } + + dt = sch->dt[id]; + if (!dt) return; + switch (dt->status) { + case MK_THREAD_READY: + getcontext(&dt->context); + dt->context.uc_stack.ss_sp = dt->stack; + dt->context.uc_stack.ss_size = MK_THREAD_STACK_SIZE; + if (running_dt) { + dt->context.uc_link = &running_dt->context; + dt->parent_id = sch->running_id; + running_dt->status = MK_THREAD_SUSPEND; + } else { + dt->context.uc_link = &sch->main; + } + sch->running_id = id; + dt->status = MK_THREAD_RUNNING; + makecontext(&dt->context, (void (*)(void))_mk_thread_entry_point, 1, sch); + if (running_dt) { + swapcontext(&running_dt->context, &dt->context); + } else { + swapcontext(&sch->main, &dt->context); + } + break; + case MK_THREAD_SUSPEND: + sch->running_id = id; + dt->status = MK_THREAD_RUNNING; + if (running_dt) { + running_dt->status = MK_THREAD_SUSPEND; + swapcontext(&running_dt->context, &dt->context); + } else { + swapcontext(&sch->main, &dt->context); + } + break; + default: + assert(0); + } +} + +/* + * @METHOD_NAME: running + * @METHOD_DESC: get the id of the currently running dthread. + * @METHOD_PROTO: int running() + * @METHOD_RETURN: the dthread id associated with the currently running dthread. + */ +int mk_thread_running() +{ + struct mk_thread_scheduler *sch; + + sch = pthread_getspecific(mk_thread_scheduler); + assert(sch); + return sch->running_id; +} + +void mk_thread_add_channel(int id, struct mk_thread_channel *chan) +{ + struct mk_thread_scheduler *sch; + struct mk_thread *dt; + + assert(chan); + sch = pthread_getspecific(mk_thread_scheduler); + assert(sch); + assert(id >= 0 && id < sch->cap); + dt = sch->dt[id]; + mk_list_add(&chan->_head, &dt->chan_list); +} |