/*
 * include/haproxy/task.h
 * Functions for task management.
 *
 * Copyright (C) 2000-2020 Willy Tarreau - w@1wt.eu
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation, version 2.1
 * exclusively.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 */

#ifndef _HAPROXY_TASK_H
#define _HAPROXY_TASK_H


#include <sys/time.h>

#include <import/eb32tree.h>

#include <haproxy/activity.h>
#include <haproxy/api.h>
#include <haproxy/clock.h>
#include <haproxy/fd.h>
#include <haproxy/global.h>
#include <haproxy/intops.h>
#include <haproxy/list.h>
#include <haproxy/pool.h>
#include <haproxy/task-t.h>
#include <haproxy/thread.h>
#include <haproxy/ticks.h>


/* Principle of the wait queue.
 *
 * We want to be able to tell whether an expiration date is before or after the
 * current time <now>. We KNOW that expiration dates are never too far apart,
 * because they are measured in ticks (milliseconds). We also know that almost
 * all dates will be in the future, and that a very small part of them will be
 * in the past; these are the ones which have expired since the last time we
 * checked them. Using ticks, we know if a date is in the future or in the
 * past, but we cannot use that to store sorted information because that
 * reference changes all the time.
 *
 * We'll use the fact that the time wraps to sort timers. Timers above <now>
 * are in the future, timers below <now> are in the past. Here, "above" and
 * "below" are to be considered modulo 2^31.
 *
 * Timers are stored sorted in an ebtree. We use the new ability for ebtrees to
 * lookup values starting from X to only expire tasks between <now> - 2^31 and
 * <now>. If the end of the tree is reached while walking over it, we simply
 * loop back to the beginning. That way, we have no problem keeping sorted
 * wrapping timers in a tree, between (now - 24 days) and (now + 24 days). The
 * keys in the tree always reflect their real position, none can be infinite.
 * This reduces the number of checks to be performed.
 *
 * Another nice optimisation is to allow a timer to stay at an old place in the
 * queue as long as it's not further than the real expiration date. That way,
 * we use the tree as a place holder for a lower bound of the real expiration
 * date. Since we have a very low chance of hitting a timeout anyway, we can
 * bounce the nodes to their right place when we scan the tree if we encounter
 * a misplaced node once in a while. This even allows us not to remove the
 * infinite timers from the wait queue.
 *
 * So, to summarize, we have :
 *   - node->key always defines current position in the wait queue
 *   - timer is the real expiration date (possibly infinite)
 *   - node->key is always before or equal to timer
 *
 * The run queue works similarly to the wait queue except that the current date
 * is replaced by an insertion counter which can also wrap without any problem.
 */

/* The farthest we can look back in a timer tree */
#define TIMER_LOOK_BACK       (1U << 31)
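
/* As an illustration of the wrapping comparison (a conceptual sketch only, the
 * real helpers live in ticks.h), "t1 is strictly before t2" for such wrapping
 * ticks boils down to a signed comparison of their difference, essentially
 * what tick_is_lt() does :
 *
 *   (int)(t1 - t2) < 0    // true when t1 is up to 2^31 - 1 ms before t2
 *
 * and the wait queue is only walked from (<now> - TIMER_LOOK_BACK) onwards so
 * that late (already expired) entries are still visited.
 */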

/* tasklets are recognized by the TASK_F_TASKLET flag in their state */
#define TASK_IS_TASKLET(t) ((t)->state & TASK_F_TASKLET)

/* a few exported variables */
extern struct pool_head *pool_head_task;
extern struct pool_head *pool_head_tasklet;
extern struct pool_head *pool_head_notification;

__decl_thread(extern HA_RWLOCK_T wq_lock THREAD_ALIGNED(64));

void __tasklet_wakeup_on(struct tasklet *tl, int thr);
struct list *__tasklet_wakeup_after(struct list *head, struct tasklet *tl);
void task_kill(struct task *t);
void tasklet_kill(struct tasklet *t);
void __task_wakeup(struct task *t);
void __task_queue(struct task *task, struct eb_root *wq);

unsigned int run_tasks_from_lists(unsigned int budgets[]);

/*
 * This does 3 things :
 *   - wake up all expired tasks
 *   - call all runnable tasks
 *   - return the date of next event in <next> or eternity.
 */

void process_runnable_tasks(void);

/*
 * Extracts all expired timers from the timer queue and wakes up all
 * associated tasks.
 */
void wake_expired_tasks(void);

/* Checks the next timer for the current thread by looking into its own timer
 * list and the global one. It may return TICK_ETERNITY if no timer is present.
 * Note that the next timer might very well be slightly in the past.
 */
int next_timer_expiry(void);

/*
 * Deletes every task before running the master polling loop
 */
void mworker_cleantasks(void);

/* returns the number of running tasks+tasklets on the whole process. Note
 * that this *is* racy since a task may move from the global to a local
 * queue for example and be counted twice. This is only for statistics
 * reporting.
 */
static inline int total_run_queues()
{
	int thr, ret = 0;

	for (thr = 0; thr < global.nbthread; thr++)
		ret += _HA_ATOMIC_LOAD(&ha_thread_ctx[thr].rq_total);
	return ret;
}

/* returns the number of allocated tasks across all threads. Note that this
 * *is* racy since some threads might be updating their counts while we're
 * looking, but this is only for statistics reporting.
 */
static inline int total_allocated_tasks()
{
	int thr, ret;

	for (thr = ret = 0; thr < global.nbthread; thr++)
		ret += _HA_ATOMIC_LOAD(&ha_thread_ctx[thr].nb_tasks);
	return ret;
}

/* returns the number of running niced tasks+tasklets on the whole process.
 * Note that this *is* racy since a task may move from the global to a local
 * queue for example and be counted twice. This is only for statistics
 * reporting.
 */
static inline int total_niced_running_tasks()
{
	int tgrp, ret = 0;

	for (tgrp = 0; tgrp < global.nbtgroups; tgrp++)
		ret += _HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].niced_tasks);
	return ret;
}

/* return non-zero if the task is in the run queue, otherwise zero */
static inline int task_in_rq(struct task *t)
{
	/* Check if leaf_p is NULL, in case the task is not in the run queue,
	 * and if it's not 0x1, which would mean it's in the tasklet list.
	 */
	return t->rq.node.leaf_p != NULL;
}

/* return non-zero if the task is in the wait queue, otherwise zero */
static inline int task_in_wq(struct task *t)
{
	return t->wq.node.leaf_p != NULL;
}

/* returns true if the current thread has some work to do */
static inline int thread_has_tasks(void)
{
	return ((int)!eb_is_empty(&th_ctx->rqueue) |
	        (int)!eb_is_empty(&th_ctx->rqueue_shared) |
	        (int)!!th_ctx->tl_class_mask |
		(int)!MT_LIST_ISEMPTY(&th_ctx->shared_tasklet_list));
}

/* Puts the task <t> in the run queue with reason flags <f>, and returns <t>.
 * The task is placed in the local run queue if it is only runnable by the
 * current thread, in the global run queue otherwise. With DEBUG_TASK, the
 * <file>:<line> from the call place are stored into the task for tracing
 * purposes.
 */
#define task_wakeup(t, f) \
	_task_wakeup(t, f, MK_CALLER(WAKEUP_TYPE_TASK_WAKEUP, 0, 0))

static inline void _task_wakeup(struct task *t, unsigned int f, const struct ha_caller *caller)
{
	unsigned int state;

	state = _HA_ATOMIC_OR_FETCH(&t->state, f);
	while (!(state & (TASK_RUNNING | TASK_QUEUED))) {
		if (_HA_ATOMIC_CAS(&t->state, &state, state | TASK_QUEUED)) {
			if (likely(caller)) {
				caller = HA_ATOMIC_XCHG(&t->caller, caller);
				BUG_ON((ulong)caller & 1);
#ifdef DEBUG_TASK
				HA_ATOMIC_STORE(&t->debug.prev_caller, caller);
#endif
			}
			__task_wakeup(t);
			break;
		}
	}
}
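
/* Illustrative use of task_wakeup() (a minimal sketch; <t> is an already
 * allocated task and the wakeup reason is just an example) :
 *
 *   task_wakeup(t, TASK_WOKEN_MSG);  // t->process() will see TASK_WOKEN_MSG in <state>
 */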

/* Atomically drop the TASK_RUNNING bit while ensuring that any wakeup that
 * happened since the flag was set will result in the task being queued (if
 * it wasn't already). This is used to safely drop the flag from within the
 * scheduler. The flag <f> is combined with existing flags before the test so
 * that it's possible to unconditionally wakeup the task and drop the RUNNING
 * flag if needed.
 */
static inline void task_drop_running(struct task *t, unsigned int f)
{
	unsigned int state, new_state;

	state = _HA_ATOMIC_LOAD(&t->state);

	while (1) {
		new_state = state | f;
		if (new_state & TASK_WOKEN_ANY)
			new_state |= TASK_QUEUED;

		if (_HA_ATOMIC_CAS(&t->state, &state, new_state & ~TASK_RUNNING))
			break;
		__ha_cpu_relax();
	}

	if ((new_state & ~state) & TASK_QUEUED)
		__task_wakeup(t);
}

/*
 * Unlink the task from the wait queue. A pointer to the task itself is
 * returned. The task *must* already
 * be in the wait queue before calling this function. If unsure, use the safer
 * task_unlink_wq() function.
 */
static inline struct task *__task_unlink_wq(struct task *t)
{
	eb32_delete(&t->wq);
	return t;
}

/* remove a task from its wait queue. It may either be the local wait queue if
 * the task is bound to a single thread or the global queue. If the task uses a
 * shared wait queue, the global wait queue lock is used.
 */
static inline struct task *task_unlink_wq(struct task *t)
{
	unsigned long locked;

	if (likely(task_in_wq(t))) {
		locked = t->tid < 0;
		BUG_ON(t->tid >= 0 && t->tid != tid && !(global.mode & MODE_STOPPING));
		if (locked)
			HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
		__task_unlink_wq(t);
		if (locked)
			HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock);
	}
	return t;
}

/* Place <task> into the wait queue, where it may already be. If the expiration
 * timer is infinite, do nothing and rely on wake_expired_tasks() to clean up.
 * If the task uses a shared wait queue, it is queued into the global wait
 * queue, protected by the global wq_lock; otherwise it necessarily belongs to
 * the current thread's wait queue and is queued without locking.
 */
#define task_queue(t) \
	_task_queue(t, MK_CALLER(WAKEUP_TYPE_TASK_QUEUE, 0, 0))

static inline void _task_queue(struct task *task, const struct ha_caller *caller)
{
	/* If we already have a place in the wait queue no later than the
	 * timeout we're trying to set, we'll stay there, because it is very
	 * unlikely that we will reach the timeout anyway. If the timeout
	 * has been disabled, it's useless to leave the queue as well. We'll
	 * rely on wake_expired_tasks() to catch the node and move it to the
	 * proper place should it ever happen. Finally we only add the task
	 * to the queue if it was not there or if it was further than what
	 * we want.
	 */
	if (!tick_isset(task->expire))
		return;

#ifdef USE_THREAD
	if (task->tid < 0) {
		HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
			if (likely(caller)) {
				caller = HA_ATOMIC_XCHG(&task->caller, caller);
				BUG_ON((ulong)caller & 1);
#ifdef DEBUG_TASK
				HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
#endif
			}
			__task_queue(task, &tg_ctx->timers);
		}
		HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock);
	} else
#endif
	{
		BUG_ON(task->tid != tid);
		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
			if (likely(caller)) {
				caller = HA_ATOMIC_XCHG(&task->caller, caller);
				BUG_ON((ulong)caller & 1);
#ifdef DEBUG_TASK
				HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
#endif
			}
			__task_queue(task, &th_ctx->timers);
		}
	}
}
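
/* Illustrative use of task_queue() (a minimal sketch; <t> is an existing task
 * and the one-second timeout is hypothetical) :
 *
 *   t->expire = tick_add(now_ms, 1000);  // fire in at most one second
 *   task_queue(t);                       // (re)position <t> in the wait queue
 */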

/* Change the thread affinity of a task to <thr>, which may either be a valid
 * thread number from 0 to nbthread-1, or a negative value to allow the task
 * to run on any thread.
 *
 * This may only be done from within the running task itself or during its
 * initialization. It will unqueue and requeue the task from the wait queue
 * if it was in it. This is safe against a concurrent task_queue() call because
 * task_queue() itself will unlink again if needed after taking into account
 * the new thread ID.
 */
static inline void task_set_thread(struct task *t, int thr)
{
#ifndef USE_THREAD
	/* no shared queue without threads */
	thr = 0;
#endif
	if (unlikely(task_in_wq(t))) {
		task_unlink_wq(t);
		t->tid = thr;
		task_queue(t);
	}
	else {
		t->tid = thr;
	}
}

/* schedules tasklet <tl> to run onto thread <thr> or the current thread if
 * <thr> is negative. Note that it is illegal to wake up a foreign tasklet if
 * its tid is negative and it is illegal to self-assign a tasklet that was
 * at least once scheduled on a specific thread. With DEBUG_TASK, the
 * <file>:<line> from the call place are stored into the tasklet for tracing
 * purposes.
 */
#define tasklet_wakeup_on(tl, thr) \
	_tasklet_wakeup_on(tl, thr, MK_CALLER(WAKEUP_TYPE_TASKLET_WAKEUP, 0, 0))

static inline void _tasklet_wakeup_on(struct tasklet *tl, int thr, const struct ha_caller *caller)
{
	unsigned int state = tl->state;

	do {
		/* do nothing if someone else already added it */
		if (state & TASK_IN_LIST)
			return;
	} while (!_HA_ATOMIC_CAS(&tl->state, &state, state | TASK_IN_LIST));

	/* at this point we're the first ones to add this task to the list */
	if (likely(caller)) {
		caller = HA_ATOMIC_XCHG(&tl->caller, caller);
		BUG_ON((ulong)caller & 1);
#ifdef DEBUG_TASK
		HA_ATOMIC_STORE(&tl->debug.prev_caller, caller);
#endif
	}

	if (_HA_ATOMIC_LOAD(&th_ctx->flags) & TH_FL_TASK_PROFILING)
		tl->wake_date = now_mono_time();
	__tasklet_wakeup_on(tl, thr);
}

/* schedules tasklet <tl> to run onto the thread designated by tl->tid, which
 * is either its owner thread if >= 0 or the current thread if < 0. When
 * DEBUG_TASK is set, the <file>:<line> from the call place are stored into the
 * task for tracing purposes.
 */
#define tasklet_wakeup(tl) \
	_tasklet_wakeup_on(tl, (tl)->tid, MK_CALLER(WAKEUP_TYPE_TASKLET_WAKEUP, 0, 0))

/* instantly wakes up task <t> on its owner thread even if it's not the current
 * one, bypassing the run queue. The purpose is to be able to avoid contention
 * in the global run queue for massively remote tasks (e.g. queue) when there's
 * no value in passing the task again through the priority ordering since it has
 * already been subject to it once (e.g. before entering process_stream). The
 * task goes directly into the shared mt_list as a tasklet and will run as
 * TL_URGENT. Great care is taken to be certain it's not queued nor running
 * already.
 */
#define task_instant_wakeup(t, f) \
	_task_instant_wakeup(t, f, MK_CALLER(WAKEUP_TYPE_TASK_INSTANT_WAKEUP, 0, 0))

static inline void _task_instant_wakeup(struct task *t, unsigned int f, const struct ha_caller *caller)
{
	int thr = t->tid;
	unsigned int state;

	if (thr < 0)
		thr = tid;

	/* first, let's update the task's state with the wakeup condition */
	state = _HA_ATOMIC_OR_FETCH(&t->state, f);

	/* next we need to make sure the task was not/will not be added to the
	 * run queue because the tasklet list's mt_list uses the same storage
	 * as the task's run_queue.
	 */
	do {
		/* do nothing if someone else already added it */
		if (state & (TASK_QUEUED|TASK_RUNNING))
			return;
	} while (!_HA_ATOMIC_CAS(&t->state, &state, state | TASK_QUEUED));

	BUG_ON_HOT(task_in_rq(t));

	/* at this point we're the first ones to add this task to the list */
	if (likely(caller)) {
		caller = HA_ATOMIC_XCHG(&t->caller, caller);
		BUG_ON((ulong)caller & 1);
#ifdef DEBUG_TASK
		HA_ATOMIC_STORE(&t->debug.prev_caller, caller);
#endif
	}

	if (_HA_ATOMIC_LOAD(&th_ctx->flags) & TH_FL_TASK_PROFILING)
		t->wake_date = now_mono_time();
	__tasklet_wakeup_on((struct tasklet *)t, thr);
}

/* schedules tasklet <tl> to run immediately after the current one is done.
 * <tl> will be queued after entry <head>, or at the head of the tasklet list
 * if <head> is NULL. Returns the new head to be used to queue future tasks.
 * This is used to insert multiple entries at the head of the tasklet list,
 * typically to transfer processing from a tasklet to another one or a set of
 * other ones. With DEBUG_TASK, the <file>:<line> from the call place are
 * stored into the tasklet for tracing purposes.
 */
#define tasklet_wakeup_after(head, tl) \
	_tasklet_wakeup_after(head, tl, MK_CALLER(WAKEUP_TYPE_TASKLET_WAKEUP_AFTER, 0, 0))

static inline struct list *_tasklet_wakeup_after(struct list *head, struct tasklet *tl,
                                                 const struct ha_caller *caller)
{
	unsigned int state = tl->state;

	do {
		/* do nothing if someone else already added it */
		if (state & TASK_IN_LIST)
			return head;
	} while (!_HA_ATOMIC_CAS(&tl->state, &state, state | TASK_IN_LIST));

	/* at this point we're the first one to add this task to the list */
	if (likely(caller)) {
		caller = HA_ATOMIC_XCHG(&tl->caller, caller);
		BUG_ON((ulong)caller & 1);
#ifdef DEBUG_TASK
		HA_ATOMIC_STORE(&tl->debug.prev_caller, caller);
#endif
	}

	if (th_ctx->flags & TH_FL_TASK_PROFILING)
		tl->wake_date = now_mono_time();
	return __tasklet_wakeup_after(head, tl);
}

/* This macro shows the current function name and the last known caller of the
 * task (or tasklet) wakeup.
 */
#ifdef DEBUG_TASK
#define DEBUG_TASK_PRINT_CALLER(t) do {						\
	const struct ha_caller *__caller = (t)->caller;			\
	printf("%s woken up from %s(%s:%d)\n", __FUNCTION__,		\
	       __caller ? __caller->func : NULL, \
	       __caller ? __caller->file : NULL, \
	       __caller ? __caller->line : 0); \
} while (0)
#else
#define DEBUG_TASK_PRINT_CALLER(t) do { } while (0)
#endif
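
/* Illustrative use from inside a task handler (the handler name is
 * hypothetical; the message is only printed when built with DEBUG_TASK) :
 *
 *   struct task *my_handler(struct task *t, void *ctx, unsigned int state)
 *   {
 *           DEBUG_TASK_PRINT_CALLER(t);  // e.g. "my_handler woken up from <func>(<file>:<line>)"
 *           ...
 *   }
 */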


/* Try to remove a tasklet from the list. This call is inherently racy and may
 * only be performed on the thread that was supposed to dequeue this tasklet.
 * This way it is safe to call MT_LIST_DELETE without first removing the
 * TASK_IN_LIST bit, which must absolutely be removed afterwards in case
 * another thread would want to wake this tasklet up in parallel.
 */
static inline void tasklet_remove_from_tasklet_list(struct tasklet *t)
{
	if (MT_LIST_DELETE(list_to_mt_list(&t->list))) {
		_HA_ATOMIC_AND(&t->state, ~TASK_IN_LIST);
		_HA_ATOMIC_DEC(&ha_thread_ctx[t->tid >= 0 ? t->tid : tid].rq_total);
	}
}

/*
 * Initialize a new task. The bare minimum is performed (queue pointers and
 * state). The task is returned. This function should not be used outside of
 * the task_new_*() functions. If the thread ID is < 0, the task may run on
 * any thread.
 */
static inline struct task *task_init(struct task *t, int tid)
{
	t->wq.node.leaf_p = NULL;
	t->rq.node.leaf_p = NULL;
	t->state = TASK_SLEEPING;
#ifndef USE_THREAD
	/* no shared wq without threads */
	tid = 0;
#endif
	t->tid = tid;
	t->nice = 0;
	t->calls = 0;
	t->wake_date = 0;
	t->expire = TICK_ETERNITY;
	t->caller = NULL;
	return t;
}

/* Initialize a new tasklet. It's identified as a tasklet by its
 * TASK_F_TASKLET flag. It is expected to run on the calling thread by
 * default; it's up to the caller to change ->tid if it wants a specific
 * thread to own it.
 */
static inline void tasklet_init(struct tasklet *t)
{
	t->calls = 0;
	t->state = TASK_F_TASKLET;
	t->process = NULL;
	t->tid = -1;
	t->wake_date = 0;
	t->caller = NULL;
	LIST_INIT(&t->list);
}

/* Allocate and initialize a new tasklet, local to the thread by default. The
 * caller may assign its tid if it wants to own the tasklet.
 */
static inline struct tasklet *tasklet_new(void)
{
	struct tasklet *t = pool_alloc(pool_head_tasklet);

	if (t) {
		tasklet_init(t);
	}
	return t;
}
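
/* Illustrative use of tasklet_new() (a minimal sketch; the handler and context
 * names are hypothetical) :
 *
 *   struct task *my_tasklet_handler(struct task *t, void *ctx, unsigned int state);
 *
 *   struct tasklet *tl = tasklet_new();
 *   if (tl) {
 *           tl->process = my_tasklet_handler;
 *           tl->context = my_ctx;
 *           tasklet_wakeup(tl);   // will run on the current thread
 *   }
 */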

/*
 * Allocate and initialize a new task, to run on global thread <thr>, or any
 * thread if negative. The task count is incremented. The new task is returned,
 * or NULL in case of lack of memory. It's up to the caller to pass a valid
 * thread number (in tid space, 0 to nbthread-1, or <0 for any). Tasks created
 * this way must be freed using task_destroy().
 */
static inline struct task *task_new_on(int thr)
{
	struct task *t = pool_alloc(pool_head_task);
	if (t) {
		th_ctx->nb_tasks++;
		task_init(t, thr);
	}
	return t;
}

/* Allocate and initialize a new task, to run on the calling thread. The new
 * task is returned, or NULL in case of lack of memory. The task count is
 * incremented.
 */
static inline struct task *task_new_here()
{
	return task_new_on(tid);
}

/* Allocate and initialize a new task, to run on any thread. The new task is
 * returned, or NULL in case of lack of memory. The task count is incremented.
 */
static inline struct task *task_new_anywhere()
{
	return task_new_on(-1);
}
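
/* Illustrative use of the task_new_*() functions (a minimal sketch; the
 * handler, context and timeout are hypothetical) :
 *
 *   struct task *my_handler(struct task *t, void *ctx, unsigned int state);
 *
 *   struct task *t = task_new_anywhere();
 *   if (t) {
 *           t->process = my_handler;
 *           t->context = my_ctx;
 *           t->expire  = tick_add(now_ms, 1000);
 *           task_queue(t);                    // arm the timeout
 *           task_wakeup(t, TASK_WOKEN_INIT);  // run it a first time
 *   }
 */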

/*
 * Free a task. Its context must have been freed since it will be lost. The
 * task count is decremented. If it is the current task, the pointer is reset.
 */
static inline void __task_free(struct task *t)
{
	if (t == th_ctx->current) {
		th_ctx->current = NULL;
		__ha_barrier_store();
	}
	BUG_ON(task_in_wq(t) || task_in_rq(t));

	BUG_ON((ulong)t->caller & 1);
#ifdef DEBUG_TASK
	HA_ATOMIC_STORE(&t->debug.prev_caller, HA_ATOMIC_LOAD(&t->caller));
#endif
	HA_ATOMIC_STORE(&t->caller, (void*)1); // make sure to crash if used after free

	pool_free(pool_head_task, t);
	th_ctx->nb_tasks--;
	if (unlikely(stopping))
		pool_flush(pool_head_task);
}

/* Destroys a task : it is unlinked from the wait queue, and freed if it is the
 * current task or is not queued; otherwise it is marked so that the scheduler
 * frees it. It does nothing if <t> is NULL.
 */
static inline void task_destroy(struct task *t)
{
	if (!t)
		return;

	task_unlink_wq(t);
	/* We don't have to explicitly remove from the run queue.
	 * If we are in the runqueue, the test below will set t->process
	 * to NULL, and the task will be free'd when it'll be its turn
	 * to run.
	 */

	/* There's no need to protect t->state with a lock, as the task
	 * has to run on the current thread.
	 */
	if (t == th_ctx->current || !(t->state & (TASK_QUEUED | TASK_RUNNING)))
		__task_free(t);
	else
		t->process = NULL;
}

/* Should only be called by the thread responsible for the tasklet */
static inline void tasklet_free(struct tasklet *tl)
{
	if (!tl)
		return;

	if (MT_LIST_DELETE(list_to_mt_list(&tl->list)))
		_HA_ATOMIC_DEC(&ha_thread_ctx[tl->tid >= 0 ? tl->tid : tid].rq_total);

	BUG_ON((ulong)tl->caller & 1);
#ifdef DEBUG_TASK
	HA_ATOMIC_STORE(&tl->debug.prev_caller, HA_ATOMIC_LOAD(&tl->caller));
#endif
	HA_ATOMIC_STORE(&tl->caller, (void*)1); // make sure to crash if used after free
	pool_free(pool_head_tasklet, tl);
	if (unlikely(stopping))
		pool_flush(pool_head_tasklet);
}

static inline void tasklet_set_tid(struct tasklet *tl, int tid)
{
	tl->tid = tid;
}

/* Ensure <task> will be woken up no later than <when>. If the task is already
 * in the run queue (but not running), nothing is done. It may be used that way
 * with a delay :  task_schedule(task, tick_add(now_ms, delay));
 * It MUST NOT be used with a timer in the past, and even less with
 * TICK_ETERNITY (which would block all timers). Note that passing it now_ms
 * directly without using tick_add() will definitely make this happen once
 * every 49.7 days.
 */
#define task_schedule(t, w) \
	_task_schedule(t, w, MK_CALLER(WAKEUP_TYPE_TASK_SCHEDULE, 0, 0))

static inline void _task_schedule(struct task *task, int when, const struct ha_caller *caller)
{
	/* TODO: mthread, check if there is no risk with this test */
	if (task_in_rq(task))
		return;

#ifdef USE_THREAD
	if (task->tid < 0) {
		/* FIXME: is it really needed to lock the WQ during the check ? */
		HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
		if (task_in_wq(task))
			when = tick_first(when, task->expire);

		task->expire = when;
		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
			if (likely(caller)) {
				caller = HA_ATOMIC_XCHG(&task->caller, caller);
				BUG_ON((ulong)caller & 1);
#ifdef DEBUG_TASK
				HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
#endif
			}
			__task_queue(task, &tg_ctx->timers);
		}
		HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock);
	} else
#endif
	{
		BUG_ON(task->tid != tid);
		if (task_in_wq(task))
			when = tick_first(when, task->expire);

		task->expire = when;
		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
			if (likely(caller)) {
				caller = HA_ATOMIC_XCHG(&task->caller, caller);
				BUG_ON((ulong)caller & 1);
#ifdef DEBUG_TASK
				HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
#endif
			}
			__task_queue(task, &th_ctx->timers);
		}
	}
}
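
/* Illustrative use of task_schedule() (a minimal sketch; the 5s delay is
 * hypothetical) :
 *
 *   task_schedule(t, tick_add(now_ms, 5000));  // make sure <t> runs within 5s
 */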

/* returns the string corresponding to a task wakeup type as found in the task
 * caller locations.
 */
static inline const char *task_wakeup_type_str(uint t)
{
	switch (t) {
	case WAKEUP_TYPE_TASK_WAKEUP          : return "task_wakeup";
	case WAKEUP_TYPE_TASK_INSTANT_WAKEUP  : return "task_instant_wakeup";
	case WAKEUP_TYPE_TASKLET_WAKEUP       : return "tasklet_wakeup";
	case WAKEUP_TYPE_TASKLET_WAKEUP_AFTER : return "tasklet_wakeup_after";
	case WAKEUP_TYPE_TASK_QUEUE           : return "task_queue";
	case WAKEUP_TYPE_TASK_SCHEDULE        : return "task_schedule";
	case WAKEUP_TYPE_APPCTX_WAKEUP        : return "appctx_wakeup";
	default                               : return "?";
	}
}

/* This function registers a new signal. <purge> is the list of signals owned
 * by the current Lua execution context, and <event> is a list head attached to
 * another task that must wake the task <wakeup> if an event occurs. This is
 * useful with external events like TCP I/O or sleep functions. This function
 * allocates memory for the signal.
 */
static inline struct notification *notification_new(struct list *purge, struct list *event, struct task *wakeup)
{
	struct notification *com = pool_alloc(pool_head_notification);
	if (!com)
		return NULL;
	LIST_APPEND(purge, &com->purge_me);
	LIST_APPEND(event, &com->wake_me);
	HA_SPIN_INIT(&com->lock);
	com->task = wakeup;
	return com;
}

/* This function purges all the pending signals when the Lua execution is
 * finished. This prevents a coprocess from trying to wake a deleted task.
 * This function releases the memory associated with the signal. The purge
 * list is not locked because it is owned by only one process; before browsing
 * this list, the caller must ensure to be the only one browsing it.
 */
static inline void notification_purge(struct list *purge)
{
	struct notification *com, *back;

	/* Delete all pending communication signals. */
	list_for_each_entry_safe(com, back, purge, purge_me) {
		HA_SPIN_LOCK(NOTIF_LOCK, &com->lock);
		LIST_DELETE(&com->purge_me);
		if (!com->task) {
			HA_SPIN_UNLOCK(NOTIF_LOCK, &com->lock);
			pool_free(pool_head_notification, com);
			continue;
		}
		com->task = NULL;
		HA_SPIN_UNLOCK(NOTIF_LOCK, &com->lock);
	}
}

/* In some cases, the disconnected notifications must be cleared. This function
 * just releases memory blocks. The purge list is not locked because it is
 * owned by only one process; before browsing this list, the caller must
 * ensure to be the only one browsing it. The notification itself is not
 * locked because when com->task is NULL, the notification is no longer used.
 */
static inline void notification_gc(struct list *purge)
{
	struct notification *com, *back;

	/* Delete all pending communication signals. */
	list_for_each_entry_safe (com, back, purge, purge_me) {
		if (com->task)
			continue;
		LIST_DELETE(&com->purge_me);
		pool_free(pool_head_notification, com);
	}
}

/* This function sends signals. It wakes up all the tasks attached to a list
 * head, removes the signals, and frees the used memory. The wake list is not
 * locked because it is owned by only one process; before browsing this list,
 * the caller must ensure to be the only one browsing it.
 */
static inline void notification_wake(struct list *wake)
{
	struct notification *com, *back;

	/* Wake task and delete all pending communication signals. */
	list_for_each_entry_safe(com, back, wake, wake_me) {
		HA_SPIN_LOCK(NOTIF_LOCK, &com->lock);
		LIST_DELETE(&com->wake_me);
		if (!com->task) {
			HA_SPIN_UNLOCK(NOTIF_LOCK, &com->lock);
			pool_free(pool_head_notification, com);
			continue;
		}
		task_wakeup(com->task, TASK_WOKEN_MSG);
		com->task = NULL;
		HA_SPIN_UNLOCK(NOTIF_LOCK, &com->lock);
	}
}
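
/* Illustrative lifecycle of a notification (a minimal sketch; <lua_task> and
 * the two list heads are hypothetical, and normally owned by the Lua context
 * and by the watched task respectively) :
 *
 *   struct list purge = LIST_HEAD_INIT(purge);
 *   struct list event = LIST_HEAD_INIT(event);
 *
 *   notification_new(&purge, &event, lua_task);  // register the signal
 *   ...
 *   notification_wake(&event);                   // wakes lua_task with TASK_WOKEN_MSG
 *   notification_purge(&purge);                  // when the Lua context terminates
 */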

/* This function returns true if some notifications are pending */
static inline int notification_registered(struct list *wake)
{
	return !LIST_ISEMPTY(wake);
}

#endif /* _HAPROXY_TASK_H */

/*
 * Local variables:
 *  c-indent-level: 8
 *  c-basic-offset: 8
 * End:
 */