1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
|
/*
* general purpose event handlers management
*
* Copyright 2022 HAProxy Technologies
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version
* 2.1 of the License, or (at your option) any later version.
*
*/
#include <string.h>
#include <haproxy/event_hdl.h>
#include <haproxy/compiler.h>
#include <haproxy/task.h>
#include <haproxy/tools.h>
#include <haproxy/errors.h>
#include <haproxy/signal.h>
#include <haproxy/xxhash.h>
#include <haproxy/cfgparse.h>
/* event types changes in event_hdl-t.h file should be reflected in the
* map below to allow string to type and type to string conversions
*/
static struct event_hdl_sub_type_map event_hdl_sub_type_map[] = {
{"NONE", EVENT_HDL_SUB_NONE},
{"SERVER", EVENT_HDL_SUB_SERVER},
{"SERVER_ADD", EVENT_HDL_SUB_SERVER_ADD},
{"SERVER_DEL", EVENT_HDL_SUB_SERVER_DEL},
{"SERVER_UP", EVENT_HDL_SUB_SERVER_UP},
{"SERVER_DOWN", EVENT_HDL_SUB_SERVER_DOWN},
{"SERVER_STATE", EVENT_HDL_SUB_SERVER_STATE},
{"SERVER_ADMIN", EVENT_HDL_SUB_SERVER_ADMIN},
{"SERVER_CHECK", EVENT_HDL_SUB_SERVER_CHECK},
{"SERVER_INETADDR", EVENT_HDL_SUB_SERVER_INETADDR},
};
/* internal types (only used in this file) */
struct event_hdl_async_task_default_ctx
{
event_hdl_async_equeue e_queue; /* event queue list */
event_hdl_cb_async func; /* event handling func */
};
/* memory pools declarations */
DECLARE_STATIC_POOL(pool_head_sub, "ehdl_sub", sizeof(struct event_hdl_sub));
DECLARE_STATIC_POOL(pool_head_sub_event, "ehdl_sub_e", sizeof(struct event_hdl_async_event));
DECLARE_STATIC_POOL(pool_head_sub_event_data, "ehdl_sub_ed", sizeof(struct event_hdl_async_event_data));
DECLARE_STATIC_POOL(pool_head_sub_taskctx, "ehdl_sub_tctx", sizeof(struct event_hdl_async_task_default_ctx));
/* global event_hdl tunables (public variable) */
struct event_hdl_tune event_hdl_tune;
/* global subscription list (implicit where NULL is used as sublist argument) */
static event_hdl_sub_list global_event_hdl_sub_list;
/* every known subscription lists are tracked in this list (including the global one) */
static struct mt_list known_event_hdl_sub_list = MT_LIST_HEAD_INIT(known_event_hdl_sub_list);
static void _event_hdl_sub_list_destroy(event_hdl_sub_list *sub_list);
static void event_hdl_deinit(struct sig_handler *sh)
{
event_hdl_sub_list *cur_list;
struct mt_list *elt1, elt2;
/* destroy all known subscription lists */
mt_list_for_each_entry_safe(cur_list, &known_event_hdl_sub_list, known, elt1, elt2) {
/* remove cur elem from list */
MT_LIST_DELETE_SAFE(elt1);
/* then destroy it */
_event_hdl_sub_list_destroy(cur_list);
}
}
static void event_hdl_init(void)
{
/* initialize global subscription list */
event_hdl_sub_list_init(&global_event_hdl_sub_list);
/* register the deinit function, will be called on soft-stop */
signal_register_fct(0, event_hdl_deinit, 0);
/* set some default values */
event_hdl_tune.max_events_at_once = EVENT_HDL_MAX_AT_ONCE;
}
/* general purpose hashing function when you want to compute
* an ID based on <scope> x <name>
* It is your responsibility to make sure <scope> is not used
* elsewhere in the code (or that you are fine with sharing
* the scope).
*/
inline uint64_t event_hdl_id(const char *scope, const char *name)
{
XXH64_state_t state;
XXH64_reset(&state, 0);
XXH64_update(&state, scope, strlen(scope));
XXH64_update(&state, name, strlen(name));
return XXH64_digest(&state);
}
/* takes a sub_type as input, returns corresponding sub_type
* printable string or "N/A" if not found.
* If not found, an error will be reported to stderr so the developers
* know that a sub_type is missing its associated string in event_hdl-t.h
*/
const char *event_hdl_sub_type_to_string(struct event_hdl_sub_type sub_type)
{
int it;
for (it = 0; it < (int)(sizeof(event_hdl_sub_type_map) / sizeof(event_hdl_sub_type_map[0])); it++) {
if (sub_type.family == event_hdl_sub_type_map[it].type.family &&
sub_type.subtype == event_hdl_sub_type_map[it].type.subtype)
return event_hdl_sub_type_map[it].name;
}
ha_alert("event_hdl-t.h: missing sub_type string representation.\n"
"Please reflect any changes in event_hdl_sub_type_map.\n");
return "N/A";
}
/* returns the internal sub_type corresponding
* to the printable representation <name>
* or EVENT_HDL_SUB_NONE if no such event exists
* (see event_hdl-t.h for the complete list of supported types)
*/
struct event_hdl_sub_type event_hdl_string_to_sub_type(const char *name)
{
int it;
for (it = 0; it < (int)(sizeof(event_hdl_sub_type_map) / sizeof(event_hdl_sub_type_map[0])); it++) {
if (!strcmp(name, event_hdl_sub_type_map[it].name))
return event_hdl_sub_type_map[it].type;
}
return EVENT_HDL_SUB_NONE;
}
/* Takes <subscriptions> sub list as input, returns a printable string
* containing every sub_types contained in <subscriptions>
* separated by '|' char.
* Returns NULL if no sub_types are found in <subscriptions>
* This functions leverages memprintf, thus it is up to the
* caller to free the returned value (if != NULL) when he no longer
* uses it.
*/
char *event_hdl_sub_type_print(struct event_hdl_sub_type subscriptions)
{
char *out = NULL;
int it;
uint8_t first = 1;
for (it = 0; it < (int)(sizeof(event_hdl_sub_type_map) / sizeof(event_hdl_sub_type_map[0])); it++) {
if (subscriptions.family == event_hdl_sub_type_map[it].type.family &&
((subscriptions.subtype & event_hdl_sub_type_map[it].type.subtype) ==
event_hdl_sub_type_map[it].type.subtype)) {
if (first) {
memprintf(&out, "%s", event_hdl_sub_type_map[it].name);
first--;
}
else
memprintf(&out, "%s%s%s", out, "|", event_hdl_sub_type_map[it].name);
}
}
return out;
}
/* event_hdl debug/reporting function */
typedef void (*event_hdl_report_hdl_state_func)(const char *fmt, ...);
static void event_hdl_report_hdl_state(event_hdl_report_hdl_state_func report_func,
const struct event_hdl *hdl, const char *what, const char *state)
{
report_func("[event_hdl]:%s (%s)'#%llu@%s': %s\n",
what,
(hdl->async) ? "ASYNC" : "SYNC",
(long long unsigned int)hdl->id,
hdl->dorigin,
state);
}
static inline void _event_hdl_async_data_drop(struct event_hdl_async_event_data *data)
{
if (HA_ATOMIC_SUB_FETCH(&data->refcount, 1) == 0) {
/* we were the last one holding a reference to event data - free required */
if (data->mfree) {
/* Some event data members are dynamically allocated and thus
* require specific cleanup using user-provided function.
* We directly pass a pointer to internal data storage but
* we only expect the cleanup function to typecast it in the
* relevant data type to give enough context to the function to
* perform the cleanup on data members, and not actually freeing
* data pointer since it is our internal buffer :)
*/
data->mfree(&data->data);
}
pool_free(pool_head_sub_event_data, data);
}
}
void event_hdl_async_free_event(struct event_hdl_async_event *e)
{
if (unlikely(event_hdl_sub_type_equal(e->type, EVENT_HDL_SUB_END))) {
/* last event for hdl, special case */
/* free subscription entry as we're the last one still using it
* (it is already removed from mt_list, no race can occur)
*/
event_hdl_drop(e->sub_mgmt.this);
HA_ATOMIC_DEC(&jobs);
}
else if (e->_data)
_event_hdl_async_data_drop(e->_data); /* data wrapper */
pool_free(pool_head_sub_event, e);
}
/* wakeup the task depending on its type:
* normal async mode internally uses tasklets but advanced async mode
* allows both tasks and tasklets.
* While tasks and tasklets may be easily casted, we need to use the proper
* API to wake them up (the waiting queues are exclusive).
*/
static void event_hdl_task_wakeup(struct tasklet *task)
{
if (TASK_IS_TASKLET(task))
tasklet_wakeup(task);
else
task_wakeup((struct task *)task, TASK_WOKEN_OTHER); /* TODO: switch to TASK_WOKEN_EVENT? */
}
/* task handler used for normal async subscription mode
* if you use advanced async subscription mode, you can use this
* as an example to implement your own task wrapper
*/
static struct task *event_hdl_async_task_default(struct task *task, void *ctx, unsigned int state)
{
struct tasklet *tl = (struct tasklet *)task;
struct event_hdl_async_task_default_ctx *task_ctx = ctx;
struct event_hdl_async_event *event;
int max_notif_at_once_it = 0;
uint8_t done = 0;
/* run through e_queue, and call func() for each event
* if we read END event, it indicates we must stop:
* no more events to come (handler is unregistered)
* so we must free task_ctx and stop task
*/
while (max_notif_at_once_it < event_hdl_tune.max_events_at_once &&
(event = event_hdl_async_equeue_pop(&task_ctx->e_queue)))
{
if (event_hdl_sub_type_equal(event->type, EVENT_HDL_SUB_END)) {
done = 1;
event_hdl_async_free_event(event);
/* break is normally not even required, EVENT_HDL_SUB_END
* is guaranteed to be last event of e_queue
* (because in normal mode one sub == one e_queue)
*/
break;
}
else {
struct event_hdl_cb cb;
cb.e_type = event->type;
cb.e_data = event->data;
cb.sub_mgmt = &event->sub_mgmt;
cb._sync = 0;
/* call user function */
task_ctx->func(&cb, event->private);
max_notif_at_once_it++;
}
event_hdl_async_free_event(event);
}
if (done) {
/* our job is done, subscription is over: no more events to come */
pool_free(pool_head_sub_taskctx, task_ctx);
tasklet_free(tl);
return NULL;
}
return task;
}
/* internal subscription mgmt functions */
static inline struct event_hdl_sub_type _event_hdl_getsub(struct event_hdl_sub *cur_sub)
{
return cur_sub->sub;
}
static inline struct event_hdl_sub_type _event_hdl_getsub_async(struct event_hdl_sub *cur_sub)
{
struct mt_list lock;
struct event_hdl_sub_type type = EVENT_HDL_SUB_NONE;
lock = MT_LIST_LOCK_ELT(&cur_sub->mt_list);
if (lock.next != &cur_sub->mt_list)
type = _event_hdl_getsub(cur_sub);
// else already removed
MT_LIST_UNLOCK_ELT(&cur_sub->mt_list, lock);
return type;
}
static inline int _event_hdl_resub(struct event_hdl_sub *cur_sub, struct event_hdl_sub_type type)
{
if (!event_hdl_sub_family_equal(cur_sub->sub, type))
return 0; /* family types differ, do nothing */
cur_sub->sub.subtype = type.subtype; /* new subtype assignment */
return 1;
}
static inline int _event_hdl_resub_async(struct event_hdl_sub *cur_sub, struct event_hdl_sub_type type)
{
int status = 0;
struct mt_list lock;
lock = MT_LIST_LOCK_ELT(&cur_sub->mt_list);
if (lock.next != &cur_sub->mt_list)
status = _event_hdl_resub(cur_sub, type);
// else already removed
MT_LIST_UNLOCK_ELT(&cur_sub->mt_list, lock);
return status;
}
static inline void _event_hdl_unsubscribe(struct event_hdl_sub *del_sub)
{
struct mt_list lock;
if (del_sub->hdl.async) {
/* ASYNC SUB MODE */
/* push EVENT_HDL_SUB_END (to notify the task that the subscription is dead) */
/* push END EVENT in busy state so we can safely wakeup
* the task before releasing it.
* Not doing that would expose us to a race where the task could've already
* consumed the END event before the wakeup, and some tasks
* kill themselves (ie: normal async mode) when they receive such event
*/
HA_ATOMIC_INC(&del_sub->hdl.async_equeue->size);
lock = MT_LIST_APPEND_LOCKED(&del_sub->hdl.async_equeue->head, &del_sub->async_end->mt_list);
/* wake up the task */
event_hdl_task_wakeup(del_sub->hdl.async_task);
/* unlock END EVENT (we're done, the task is now free to consume it) */
MT_LIST_UNLOCK_ELT(&del_sub->async_end->mt_list, lock);
/* we don't free sub here
* freeing will be performed by async task so it can safely rely
* on the pointer until it notices it
*/
} else {
/* SYNC SUB MODE */
/* we can directly free the subscription:
* no other thread can access it since we successfully
* removed it from the list
*/
event_hdl_drop(del_sub);
}
}
static inline void _event_hdl_unsubscribe_async(struct event_hdl_sub *del_sub)
{
if (!MT_LIST_DELETE(&del_sub->mt_list))
return; /* already removed (but may be pending in e_queues) */
_event_hdl_unsubscribe(del_sub);
}
/* sub_mgmt function pointers (for handlers) */
static struct event_hdl_sub_type event_hdl_getsub_sync(const struct event_hdl_sub_mgmt *mgmt)
{
if (!mgmt)
return EVENT_HDL_SUB_NONE;
if (!mgmt->this)
return EVENT_HDL_SUB_NONE; /* already removed from sync ctx */
return _event_hdl_getsub(mgmt->this);
}
static struct event_hdl_sub_type event_hdl_getsub_async(const struct event_hdl_sub_mgmt *mgmt)
{
if (!mgmt)
return EVENT_HDL_SUB_NONE;
return _event_hdl_getsub_async(mgmt->this);
}
static int event_hdl_resub_sync(const struct event_hdl_sub_mgmt *mgmt, struct event_hdl_sub_type type)
{
if (!mgmt)
return 0;
if (!mgmt->this)
return 0; /* already removed from sync ctx */
return _event_hdl_resub(mgmt->this, type);
}
static int event_hdl_resub_async(const struct event_hdl_sub_mgmt *mgmt, struct event_hdl_sub_type type)
{
if (!mgmt)
return 0;
return _event_hdl_resub_async(mgmt->this, type);
}
static void event_hdl_unsubscribe_sync(const struct event_hdl_sub_mgmt *mgmt)
{
if (!mgmt)
return;
if (!mgmt->this)
return; /* already removed from sync ctx */
/* assuming that publish sync code will notice that mgmt->this is NULL
* and will perform the list removal using MT_LIST_DELETE_SAFE and
* _event_hdl_unsubscribe()
* while still owning the lock
*/
((struct event_hdl_sub_mgmt *)mgmt)->this = NULL;
}
static void event_hdl_unsubscribe_async(const struct event_hdl_sub_mgmt *mgmt)
{
if (!mgmt)
return;
_event_hdl_unsubscribe_async(mgmt->this);
}
#define EVENT_HDL_SUB_MGMT_ASYNC(_sub) (struct event_hdl_sub_mgmt){ .this = _sub, \
.getsub = event_hdl_getsub_async, \
.resub = event_hdl_resub_async, \
.unsub = event_hdl_unsubscribe_async}
#define EVENT_HDL_SUB_MGMT_SYNC(_sub) (struct event_hdl_sub_mgmt){ .this = _sub, \
.getsub = event_hdl_getsub_sync, \
.resub = event_hdl_resub_sync, \
.unsub = event_hdl_unsubscribe_sync}
struct event_hdl_sub *event_hdl_subscribe_ptr(event_hdl_sub_list *sub_list,
struct event_hdl_sub_type e_type, struct event_hdl hdl)
{
struct event_hdl_sub *new_sub = NULL;
struct mt_list *elt1, elt2;
struct event_hdl_async_task_default_ctx *task_ctx = NULL;
struct mt_list lock;
if (!sub_list)
sub_list = &global_event_hdl_sub_list; /* fall back to global list */
/* hdl API consistency check */
/*FIXME: do we need to ensure that if private is set, private_free should be set as well? */
BUG_ON((!hdl.async && !hdl.sync_ptr) ||
(hdl.async == EVENT_HDL_ASYNC_MODE_NORMAL && !hdl.async_ptr) ||
(hdl.async == EVENT_HDL_ASYNC_MODE_ADVANCED &&
(!hdl.async_equeue || !hdl.async_task)));
new_sub = pool_alloc(pool_head_sub);
if (new_sub == NULL) {
goto memory_error;
}
/* assignments */
new_sub->sub.family = e_type.family;
new_sub->sub.subtype = e_type.subtype;
new_sub->flags = 0;
new_sub->hdl = hdl;
if (hdl.async) {
/* async END event pre-allocation */
new_sub->async_end = pool_alloc(pool_head_sub_event);
if (!new_sub->async_end) {
/* memory error */
goto memory_error;
}
if (hdl.async == EVENT_HDL_ASYNC_MODE_NORMAL) {
/* normal mode: no task provided, we must initialize it */
/* initialize task context */
task_ctx = pool_alloc(pool_head_sub_taskctx);
if (!task_ctx) {
/* memory error */
goto memory_error;
}
event_hdl_async_equeue_init(&task_ctx->e_queue);
task_ctx->func = new_sub->hdl.async_ptr;
new_sub->hdl.async_equeue = &task_ctx->e_queue;
new_sub->hdl.async_task = tasklet_new();
if (!new_sub->hdl.async_task) {
/* memory error */
goto memory_error;
}
new_sub->hdl.async_task->context = task_ctx;
new_sub->hdl.async_task->process = event_hdl_async_task_default;
}
/* initialize END event (used to notify about subscription ending)
* used by both normal and advanced mode:
* - to safely terminate the task in normal mode
* - to safely free subscription and
* keep track of active subscriptions in advanced mode
*/
new_sub->async_end->type = EVENT_HDL_SUB_END;
new_sub->async_end->sub_mgmt = EVENT_HDL_SUB_MGMT_ASYNC(new_sub);
new_sub->async_end->private = new_sub->hdl.private;
new_sub->async_end->_data = NULL;
MT_LIST_INIT(&new_sub->async_end->mt_list);
}
/* set refcount to 2:
* 1 for handler (because handler can manage the subscription itself)
* 1 for caller (will be dropped automatically if caller use the non-ptr version)
*/
new_sub->refcount = 2;
/* ready for registration */
MT_LIST_INIT(&new_sub->mt_list);
lock = MT_LIST_LOCK_ELT(&sub_list->known);
/* check if such identified hdl is not already registered */
if (hdl.id) {
struct event_hdl_sub *cur_sub;
uint8_t found = 0;
mt_list_for_each_entry_safe(cur_sub, &sub_list->head, mt_list, elt1, elt2) {
if (hdl.id == cur_sub->hdl.id) {
/* we found matching registered hdl */
found = 1;
break;
}
}
if (found) {
/* error already registered */
MT_LIST_UNLOCK_ELT(&sub_list->known, lock);
event_hdl_report_hdl_state(ha_alert, &hdl, "SUB", "could not subscribe: subscription with this id already exists");
goto cleanup;
}
}
if (lock.next == &sub_list->known) {
/* this is an expected corner case on de-init path, a subscribe attempt
* was made but the subscription list is already destroyed, we pretend
* it is a memory/IO error since it should not be long before haproxy
* enters the deinit() function anyway
*/
MT_LIST_UNLOCK_ELT(&sub_list->known, lock);
goto cleanup;
}
/* Append in list (global or user specified list).
* For now, append when sync mode, and insert when async mode
* so that async handlers are executed first
*/
if (hdl.async) {
/* Prevent the task from being aborted on soft-stop: let's wait
* until the END event is acknowledged by the task.
* (decrease is performed in event_hdl_async_free_event())
*
* If we don't do this, event_hdl API will leak and we won't give
* a chance to the event-handling task to perform cleanup
*/
HA_ATOMIC_INC(&jobs);
/* async mode, insert at the beginning of the list */
MT_LIST_INSERT(&sub_list->head, &new_sub->mt_list);
} else {
/* sync mode, append at the end of the list */
MT_LIST_APPEND(&sub_list->head, &new_sub->mt_list);
}
MT_LIST_UNLOCK_ELT(&sub_list->known, lock);
return new_sub;
cleanup:
if (new_sub) {
if (hdl.async == EVENT_HDL_ASYNC_MODE_NORMAL) {
tasklet_free(new_sub->hdl.async_task);
pool_free(pool_head_sub_taskctx, task_ctx);
}
if (hdl.async)
pool_free(pool_head_sub_event, new_sub->async_end);
pool_free(pool_head_sub, new_sub);
}
return NULL;
memory_error:
event_hdl_report_hdl_state(ha_warning, &hdl, "SUB", "could not register subscription due to memory error");
goto cleanup;
}
void event_hdl_take(struct event_hdl_sub *sub)
{
HA_ATOMIC_INC(&sub->refcount);
}
void event_hdl_drop(struct event_hdl_sub *sub)
{
if (HA_ATOMIC_SUB_FETCH(&sub->refcount, 1) != 0)
return;
/* we were the last one holding a reference to event sub - free required */
if (sub->hdl.private_free) {
/* free private data if specified upon registration */
sub->hdl.private_free(sub->hdl.private);
}
pool_free(pool_head_sub, sub);
}
int event_hdl_resubscribe(struct event_hdl_sub *cur_sub, struct event_hdl_sub_type type)
{
return _event_hdl_resub_async(cur_sub, type);
}
void _event_hdl_pause(struct event_hdl_sub *cur_sub)
{
cur_sub->flags |= EHDL_SUB_F_PAUSED;
}
void event_hdl_pause(struct event_hdl_sub *cur_sub)
{
struct mt_list lock;
lock = MT_LIST_LOCK_ELT(&cur_sub->mt_list);
if (lock.next != &cur_sub->mt_list)
_event_hdl_pause(cur_sub);
// else already removed
MT_LIST_UNLOCK_ELT(&cur_sub->mt_list, lock);
}
void _event_hdl_resume(struct event_hdl_sub *cur_sub)
{
cur_sub->flags &= ~EHDL_SUB_F_PAUSED;
}
void event_hdl_resume(struct event_hdl_sub *cur_sub)
{
struct mt_list lock;
lock = MT_LIST_LOCK_ELT(&cur_sub->mt_list);
if (lock.next != &cur_sub->mt_list)
_event_hdl_resume(cur_sub);
// else already removed
MT_LIST_UNLOCK_ELT(&cur_sub->mt_list, lock);
}
void event_hdl_unsubscribe(struct event_hdl_sub *del_sub)
{
_event_hdl_unsubscribe_async(del_sub);
/* drop refcount, assuming caller no longer use ptr */
event_hdl_drop(del_sub);
}
int event_hdl_subscribe(event_hdl_sub_list *sub_list, struct event_hdl_sub_type e_type, struct event_hdl hdl)
{
struct event_hdl_sub *sub;
sub = event_hdl_subscribe_ptr(sub_list, e_type, hdl);
if (sub) {
/* drop refcount because the user is not willing to hold a reference */
event_hdl_drop(sub);
return 1;
}
return 0;
}
/* Subscription external lookup functions
*/
int event_hdl_lookup_unsubscribe(event_hdl_sub_list *sub_list,
uint64_t lookup_id)
{
struct event_hdl_sub *del_sub = NULL;
struct mt_list *elt1, elt2;
int found = 0;
if (!sub_list)
sub_list = &global_event_hdl_sub_list; /* fall back to global list */
mt_list_for_each_entry_safe(del_sub, &sub_list->head, mt_list, elt1, elt2) {
if (lookup_id == del_sub->hdl.id) {
/* we found matching registered hdl */
MT_LIST_DELETE_SAFE(elt1);
_event_hdl_unsubscribe(del_sub);
found = 1;
break; /* id is unique, stop searching */
}
}
return found;
}
int event_hdl_lookup_resubscribe(event_hdl_sub_list *sub_list,
uint64_t lookup_id, struct event_hdl_sub_type type)
{
struct event_hdl_sub *cur_sub = NULL;
struct mt_list *elt1, elt2;
int status = 0;
if (!sub_list)
sub_list = &global_event_hdl_sub_list; /* fall back to global list */
mt_list_for_each_entry_safe(cur_sub, &sub_list->head, mt_list, elt1, elt2) {
if (lookup_id == cur_sub->hdl.id) {
/* we found matching registered hdl */
status = _event_hdl_resub(cur_sub, type);
break; /* id is unique, stop searching */
}
}
return status;
}
int event_hdl_lookup_pause(event_hdl_sub_list *sub_list,
uint64_t lookup_id)
{
struct event_hdl_sub *cur_sub = NULL;
struct mt_list *elt1, elt2;
int found = 0;
if (!sub_list)
sub_list = &global_event_hdl_sub_list; /* fall back to global list */
mt_list_for_each_entry_safe(cur_sub, &sub_list->head, mt_list, elt1, elt2) {
if (lookup_id == cur_sub->hdl.id) {
/* we found matching registered hdl */
_event_hdl_pause(cur_sub);
found = 1;
break; /* id is unique, stop searching */
}
}
return found;
}
int event_hdl_lookup_resume(event_hdl_sub_list *sub_list,
uint64_t lookup_id)
{
struct event_hdl_sub *cur_sub = NULL;
struct mt_list *elt1, elt2;
int found = 0;
if (!sub_list)
sub_list = &global_event_hdl_sub_list; /* fall back to global list */
mt_list_for_each_entry_safe(cur_sub, &sub_list->head, mt_list, elt1, elt2) {
if (lookup_id == cur_sub->hdl.id) {
/* we found matching registered hdl */
_event_hdl_resume(cur_sub);
found = 1;
break; /* id is unique, stop searching */
}
}
return found;
}
struct event_hdl_sub *event_hdl_lookup_take(event_hdl_sub_list *sub_list,
uint64_t lookup_id)
{
struct event_hdl_sub *cur_sub = NULL;
struct mt_list *elt1, elt2;
uint8_t found = 0;
if (!sub_list)
sub_list = &global_event_hdl_sub_list; /* fall back to global list */
mt_list_for_each_entry_safe(cur_sub, &sub_list->head, mt_list, elt1, elt2) {
if (lookup_id == cur_sub->hdl.id) {
/* we found matching registered hdl */
event_hdl_take(cur_sub);
found = 1;
break; /* id is unique, stop searching */
}
}
if (found)
return cur_sub;
return NULL;
}
/* event publishing functions
*/
static int _event_hdl_publish(event_hdl_sub_list *sub_list, struct event_hdl_sub_type e_type,
const struct event_hdl_cb_data *data)
{
struct event_hdl_sub *cur_sub;
struct mt_list *elt1, elt2;
struct event_hdl_async_event_data *async_data = NULL; /* reuse async data for multiple async hdls */
int error = 0;
mt_list_for_each_entry_safe(cur_sub, &sub_list->head, mt_list, elt1, elt2) {
/* notify each function that has subscribed to sub_family.type, unless paused */
if ((cur_sub->sub.family == e_type.family) &&
((cur_sub->sub.subtype & e_type.subtype) == e_type.subtype) &&
!(cur_sub->flags & EHDL_SUB_F_PAUSED)) {
/* hdl should be notified */
if (!cur_sub->hdl.async) {
/* sync mode: simply call cb pointer
* it is up to the callee to schedule a task if needed or
* take specific precautions in order to return as fast as possible
* and not use locks that are already held by the caller
*/
struct event_hdl_cb cb;
struct event_hdl_sub_mgmt sub_mgmt;
sub_mgmt = EVENT_HDL_SUB_MGMT_SYNC(cur_sub);
cb.e_type = e_type;
if (data)
cb.e_data = data->_ptr;
else
cb.e_data = NULL;
cb.sub_mgmt = &sub_mgmt;
cb._sync = 1;
/* call user function */
cur_sub->hdl.sync_ptr(&cb, cur_sub->hdl.private);
if (!sub_mgmt.this) {
/* user has performed hdl unsub
* we must remove it from the list
*/
MT_LIST_DELETE_SAFE(elt1);
/* then free it */
_event_hdl_unsubscribe(cur_sub);
}
} else {
/* async mode: here we need to prepare event data
* and push it to the event_queue of the task(s)
* responsible for consuming the events of current
* subscription.
* Once the event is pushed, we wake up the associated task.
* This feature depends on <haproxy/task> that also
* depends on <haproxy/pool>:
* If STG_PREPARE+STG_POOL is not performed prior to publishing to
* async handler, program may crash.
* Hopefully, STG_PREPARE+STG_POOL should be done early in
* HAProxy startup sequence.
*/
struct event_hdl_async_event *new_event;
new_event = pool_alloc(pool_head_sub_event);
if (!new_event) {
error = 1;
break; /* stop on error */
}
new_event->type = e_type;
new_event->private = cur_sub->hdl.private;
new_event->when = date;
new_event->sub_mgmt = EVENT_HDL_SUB_MGMT_ASYNC(cur_sub);
if (data) {
/* if this fails, please adjust EVENT_HDL_ASYNC_EVENT_DATA in
* event_hdl-t.h file or consider providing dynamic struct members
* to reduce overall struct size
*/
BUG_ON(data->_size > sizeof(async_data->data));
if (!async_data) {
/* first async hdl reached - preparing async_data cache */
async_data = pool_alloc(pool_head_sub_event_data);
if (!async_data) {
error = 1;
pool_free(pool_head_sub_event, new_event);
break; /* stop on error */
}
/* async data assignment */
memcpy(async_data->data, data->_ptr, data->_size);
async_data->mfree = data->_mfree;
/* Initialize refcount, we start at 1 to prevent async
* data from being freed by an async handler while we
* still use it. We will drop the reference when the
* publish is over.
*
* (first use, atomic operation not required)
*/
async_data->refcount = 1;
}
new_event->_data = async_data;
new_event->data = async_data->data;
/* increment refcount because multiple hdls could
* use the same async_data
*/
HA_ATOMIC_INC(&async_data->refcount);
} else
new_event->data = NULL;
/* appending new event to event hdl queue */
MT_LIST_INIT(&new_event->mt_list);
HA_ATOMIC_INC(&cur_sub->hdl.async_equeue->size);
MT_LIST_APPEND(&cur_sub->hdl.async_equeue->head, &new_event->mt_list);
/* wake up the task */
event_hdl_task_wakeup(cur_sub->hdl.async_task);
} /* end async mode */
} /* end hdl should be notified */
} /* end mt_list */
if (async_data) {
/* we finished publishing, drop the reference on async data */
_event_hdl_async_data_drop(async_data);
} else {
/* no async subscribers, we are responsible for calling the data
* member freeing function if it was provided
*/
if (data && data->_mfree)
data->_mfree(data->_ptr);
}
if (error) {
event_hdl_report_hdl_state(ha_warning, &cur_sub->hdl, "PUBLISH", "memory error");
return 0;
}
return 1;
}
/* Publish function should not be used from high calling rate or time sensitive
* places for now, because list lookup based on e_type is not optimized at
* all!
* Returns 1 in case of SUCCESS:
* Subscribed handlers were notified successfully
* Returns 0 in case of FAILURE:
* FAILURE means memory error while handling the very first async handler from
* the subscription list.
* As async handlers are executed first within the list, when such failure occurs
* you can safely assume that no events were published for the current call
*/
int event_hdl_publish(event_hdl_sub_list *sub_list,
struct event_hdl_sub_type e_type, const struct event_hdl_cb_data *data)
{
if (!e_type.family) {
/* do nothing, these types are reserved for internal use only
* (ie: unregistering) */
return 0;
}
if (sub_list) {
/* if sublist is provided, first publish event to list subscribers */
return _event_hdl_publish(sub_list, e_type, data);
} else {
/* publish to global list */
return _event_hdl_publish(&global_event_hdl_sub_list, e_type, data);
}
}
void event_hdl_sub_list_init(event_hdl_sub_list *sub_list)
{
BUG_ON(!sub_list); /* unexpected, global sublist is managed internally */
MT_LIST_INIT(&sub_list->head);
MT_LIST_APPEND(&known_event_hdl_sub_list, &sub_list->known);
}
/* internal function, assumes that sub_list ptr is always valid */
static void _event_hdl_sub_list_destroy(event_hdl_sub_list *sub_list)
{
struct event_hdl_sub *cur_sub;
struct mt_list *elt1, elt2;
mt_list_for_each_entry_safe(cur_sub, &sub_list->head, mt_list, elt1, elt2) {
/* remove cur elem from list */
MT_LIST_DELETE_SAFE(elt1);
/* then free it */
_event_hdl_unsubscribe(cur_sub);
}
}
/* when a subscription list is no longer used, call this
* to do the cleanup and make sure all related subscriptions are
* safely ended according to their types
*/
void event_hdl_sub_list_destroy(event_hdl_sub_list *sub_list)
{
BUG_ON(!sub_list); /* unexpected, global sublist is managed internally */
if (!MT_LIST_DELETE(&sub_list->known))
return; /* already destroyed */
_event_hdl_sub_list_destroy(sub_list);
}
/* config parser for global "tune.events.max-events-at-once" */
static int event_hdl_parse_max_events_at_once(char **args, int section_type, struct proxy *curpx,
const struct proxy *defpx, const char *file, int line,
char **err)
{
int arg = -1;
if (too_many_args(1, args, err, NULL))
return -1;
if (*(args[1]) != 0)
arg = atoi(args[1]);
if (arg < 1 || arg > 10000) {
memprintf(err, "'%s' expects an integer argument between 1 and 10000.", args[0]);
return -1;
}
event_hdl_tune.max_events_at_once = arg;
return 0;
}
/* config keyword parsers */
static struct cfg_kw_list cfg_kws = {ILH, {
{ CFG_GLOBAL, "tune.events.max-events-at-once", event_hdl_parse_max_events_at_once },
{ 0, NULL, NULL }
}};
INITCALL1(STG_REGISTER, cfg_register_keywords, &cfg_kws);
INITCALL0(STG_INIT, event_hdl_init);
|