summaryrefslogtreecommitdiffstats
path: root/web/server/h2o/libh2o/deps/klib/kthread.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--web/server/h2o/libh2o/deps/klib/kthread.c143
1 files changed, 143 insertions, 0 deletions
diff --git a/web/server/h2o/libh2o/deps/klib/kthread.c b/web/server/h2o/libh2o/deps/klib/kthread.c
new file mode 100644
index 00000000..80f84cb3
--- /dev/null
+++ b/web/server/h2o/libh2o/deps/klib/kthread.c
@@ -0,0 +1,143 @@
+#include <pthread.h>
+#include <stdlib.h>
+#include <limits.h>
+
+/************
+ * kt_for() *
+ ************/
+
+struct kt_for_t;
+
+typedef struct {
+ struct kt_for_t *t;
+ long i;
+} ktf_worker_t;
+
+typedef struct kt_for_t {
+ int n_threads;
+ long n;
+ ktf_worker_t *w;
+ void (*func)(void*,long,int);
+ void *data;
+} kt_for_t;
+
+static inline long steal_work(kt_for_t *t)
+{
+ int i, min_i = -1;
+ long k, min = LONG_MAX;
+ for (i = 0; i < t->n_threads; ++i)
+ if (min > t->w[i].i) min = t->w[i].i, min_i = i;
+ k = __sync_fetch_and_add(&t->w[min_i].i, t->n_threads);
+ return k >= t->n? -1 : k;
+}
+
+static void *ktf_worker(void *data)
+{
+ ktf_worker_t *w = (ktf_worker_t*)data;
+ long i;
+ for (;;) {
+ i = __sync_fetch_and_add(&w->i, w->t->n_threads);
+ if (i >= w->t->n) break;
+ w->t->func(w->t->data, i, w - w->t->w);
+ }
+ while ((i = steal_work(w->t)) >= 0)
+ w->t->func(w->t->data, i, w - w->t->w);
+ pthread_exit(0);
+}
+
+void kt_for(int n_threads, void (*func)(void*,long,int), void *data, long n)
+{
+ int i;
+ kt_for_t t;
+ pthread_t *tid;
+ t.func = func, t.data = data, t.n_threads = n_threads, t.n = n;
+ t.w = (ktf_worker_t*)alloca(n_threads * sizeof(ktf_worker_t));
+ tid = (pthread_t*)alloca(n_threads * sizeof(pthread_t));
+ for (i = 0; i < n_threads; ++i)
+ t.w[i].t = &t, t.w[i].i = i;
+ for (i = 0; i < n_threads; ++i) pthread_create(&tid[i], 0, ktf_worker, &t.w[i]);
+ for (i = 0; i < n_threads; ++i) pthread_join(tid[i], 0);
+}
+
+/*****************
+ * kt_pipeline() *
+ *****************/
+
+struct ktp_t;
+
+typedef struct {
+ struct ktp_t *pl;
+ int step, running;
+ void *data;
+} ktp_worker_t;
+
+typedef struct ktp_t {
+ void *shared;
+ void *(*func)(void*, int, void*);
+ int n_workers, n_steps;
+ ktp_worker_t *workers;
+ pthread_mutex_t mutex;
+ pthread_cond_t cv;
+} ktp_t;
+
+static void *ktp_worker(void *data)
+{
+ ktp_worker_t *w = (ktp_worker_t*)data;
+ ktp_t *p = w->pl;
+ while (w->step < p->n_steps) {
+ // test whether we can kick off the job with this worker
+ pthread_mutex_lock(&p->mutex);
+ for (;;) {
+ int i;
+ // test whether another worker is doing the same step
+ for (i = 0; i < p->n_workers; ++i) {
+ if (w == &p->workers[i]) continue; // ignore itself
+ if (p->workers[i].running && p->workers[i].step == w->step)
+ break;
+ }
+ if (i == p->n_workers) break; // no other workers doing w->step; then this worker will
+ pthread_cond_wait(&p->cv, &p->mutex);
+ }
+ w->running = 1;
+ pthread_mutex_unlock(&p->mutex);
+
+ // working on w->step
+ w->data = p->func(p->shared, w->step, w->step? w->data : 0); // for the first step, input is NULL
+
+ // update step and let other workers know
+ pthread_mutex_lock(&p->mutex);
+ w->step = w->step == p->n_steps - 1 || w->data? (w->step + 1) % p->n_steps : p->n_steps;
+ w->running = 0;
+ pthread_cond_broadcast(&p->cv);
+ pthread_mutex_unlock(&p->mutex);
+ }
+ pthread_exit(0);
+}
+
+void kt_pipeline(int n_threads, void *(*func)(void*, int, void*), void *shared_data, int n_steps)
+{
+ ktp_t aux;
+ pthread_t *tid;
+ int i;
+
+ if (n_threads < 1) n_threads = 1;
+ aux.n_workers = n_threads;
+ aux.n_steps = n_steps;
+ aux.func = func;
+ aux.shared = shared_data;
+ pthread_mutex_init(&aux.mutex, 0);
+ pthread_cond_init(&aux.cv, 0);
+
+ aux.workers = alloca(n_threads * sizeof(ktp_worker_t));
+ for (i = 0; i < n_threads; ++i) {
+ ktp_worker_t *w = &aux.workers[i];
+ w->step = w->running = 0; w->pl = &aux; w->data = 0;
+ }
+
+ tid = alloca(n_threads * sizeof(pthread_t));
+ for (i = 0; i < n_threads; ++i) pthread_create(&tid[i], 0, ktp_worker, &aux.workers[i]);
+ for (i = 0; i < n_threads; ++i) pthread_join(tid[i], 0);
+
+ pthread_mutex_destroy(&aux.mutex);
+ pthread_cond_destroy(&aux.cv);
+}