summaryrefslogtreecommitdiffstats
path: root/doc/tevent_queue.dox
blob: c1d629c7488b8e2ba5bddcb2af980f9fc382b3d8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
/**
@page tevent_queue Chapter 5: Tevent queue
@section queue Tevent queue

There is a possibility that the dispatcher and its handlers may not be able to
handle all the incoming events as quickly as they arrive. One way to deal with
this situation is to buffer the received events by introducing an event queue
into the events stream, between the events generator and the dispatcher. Events
are added to the queue as they arrive, and the dispatcher pops them off the
beginning of the queue as fast as possible. In tevent library it is
similar, but the queue is not automatically set for any event. The queue has to
be created on purpose, and events which should follow the order of the FIFO
queue have to be explicitly pinpointed. Creating such a queue is crucial in
situations when sequential processing is absolutely essential for the
successful
completion of a task, e.g. for a large quantity of data that are about to be
written from a buffer into a socket. The tevent library has its own queue
structure that is ready to use after it has been initialized and started up
once.

@subsection cr_queue Creation of Queues

The first and most important step is the creation of the tevent queue
(represented by struct tevent_queue), which will then be in running mode.

@code
struct tevent_queue* tevent_queue_create (TALLOC_CTX *mem_ctx, const char *name)
@endcode

When the program returns from this function, the allocated memory, set
destructor and labeled queue as running has been done and the structure is
ready to be filled with entries. Stopping and starting queues on the run. If
you need to stop a queue from processing its entries, and then turn it on
again, a couple of functions which serve this purpose are:

- bool tevent_queue_stop()
- bool tevent_queue_start()

These functions actually only provide for the simple setting of a variable,
which indicates that the queue has been stopped/started. Returned value
indicates result.

@subsection add_queue  Adding Requests to a Queue

Tevent in fact offers 3 possible ways of inserting a request into a queue.
There are no vast differences between them, but still there might be situations
where one of them is more suitable and desired than another. 

@code
bool tevent_queue_add(struct tevent_queue *queue,
                      struct tevent_context *ev,
                      struct tevent_req *req,
                      tevent_queue_trigger_fn_t trigger,
                      void *private_data)
@endcode

This call is the simplest of all three. It offers only boolean verification of
whether the operation of adding the request into a queue was successful or not.
No additional deletion of an item from the queue is possible, i.e. it is only
possible to deallocate the whole tevent request, which would cause triggering
of destructor handling and also dropping the request from the queue.

<strong>Extended Options</strong>

Both of the following functions have a feature in common - they return tevent
queue entry structure representing the item in a queue. There is no further
possible handling with this structure except the use of the structure’s pointer
for its deallocation (which leads also its removal from the queue). The
difference lies in the possibility that with the following functions it is
possible to remove the tevent request from a queue without its deallocation.
The previous function can only deallocate the tevent request as it was from
memory, and thereby logically cause its removal from the queue as well. There
is no other utilization of this structure via API at this stage of tevent
library. The possibility of easier debugging while developing with tevent could
be considered to be an advantage of this returned pointer.

@code
struct tevent_queue_entry *tevent_queue_add_entry(struct tevent_queue *queue,
                                                  struct tevent_context *ev,
                                                  struct tevent_req *req,
                                                  tevent_queue_trigger_fn_t trigger,
                                                  void *private_data)
@endcode

The feature that allows for the optimized addition of entries to a queue is
that a check for an empty queue with no items is first of all carried out. If
it is found that the queue is empty, then the request for inserting the entry
into a queue will be omitted and directly triggered.

@code
struct tevent_queue_entry *tevent_queue_add_optimize_empty(struct tevent_queue *queue,
                                                            struct tevent_context *ev,
                                                            struct tevent_req *req,
                                                            tevent_queue_trigger_fn_t trigger,
                                                            void *private_data)
@endcode

When calling any of the functions serving for inserting an item into a queue,
it is possible to leave out the fourth argument (trigger) and instead of a
function pass a NULL pointer. This usage sets so-called blocking entries.
These entries, since they do not have any trigger operation to be activated,
just sit in their position until they are labeled as a done by another
function. Their purpose is to block other items in the queue from being
triggered.

@subsection example_q Example of tevent queue

@code
#include <stdio.h>
#include <unistd.h>
#include <tevent.h>

struct foo_state {
    int local_var;
    int x;
};

struct juststruct {
    TALLOC_CTX * ctx;
    struct tevent_context *ev;
    int y;
};

int created = 0;

static void timer_handler(struct tevent_context *ev, struct tevent_timer *te,
                           struct timeval current_time, void *private_data)
{
    // time event which after all sets request as done. Following item from
    // the queue  may be invoked.
    struct tevent_req *req = private_data;
    struct foo_state *stateX = tevent_req_data(req, struct foo_state);

    // processing some stuff

    printf("time_handler\n");

    tevent_req_done(req);
    talloc_free(req);

    printf("Request #%d set as done.\n", stateX->x);
}

static void trigger(struct tevent_req *req, void *private_data)
{
    struct juststruct *priv = tevent_req_callback_data (req, struct juststruct);
    struct foo_state *in = tevent_req_data(req, struct foo_state);
    struct timeval schedule;
    struct tevent_timer *tim;
    schedule = tevent_timeval_current_ofs(1, 0);
    printf("Processing request #%d\n", in->x);

    if (in->x % 3 == 0) {   // just example; third request does not contain
                            // any further operation and will be finished right
                            // away.
        tim = NULL;
    } else {
        tim = tevent_add_timer(priv->ev, req, schedule, timer_handler, req);
    }

    if (tim == NULL) {
            tevent_req_done(req);
            talloc_free(req);
            printf("Request #%d set as done.\n", in->x);
    }
}

struct tevent_req *foo_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
                            const char *name, int num)
{
    struct tevent_req *req;
    struct foo_state *state;
    struct foo_state *in;
    struct tevent_timer *tim;

    printf("foo_send\n");
    req = tevent_req_create(mem_ctx, &state, struct foo_state);
    if (req == NULL) { // check for appropriate allocation
        tevent_req_error(req, 1);
        return NULL;
    }

    // exemplary filling of variables
    state->local_var = 1;
    state->x = num;

    return req;
}

static void foo_done(struct tevent_req *req) {

    enum tevent_req_state state;
    uint64_t err;

    if (tevent_req_is_error(req, &state, &err)) {
        printf("ERROR WAS SET %d\n", state);
        return;
    } else {
        // processing some stuff
        printf("Callback is done...\n");
    }
}

int main (int argc, char **argv)
{
    TALLOC_CTX *mem_ctx;
    struct tevent_req* req[6];
    struct tevent_req* tmp;
    struct tevent_context *ev;
    struct tevent_queue *fronta = NULL;
    struct juststruct *data;
    int ret;
    int i = 0;

    const char * const names[] = {
        "first", "second", "third", "fourth", "fifth"
    };

    printf("INIT\n");

    mem_ctx = talloc_new(NULL); //parent
    talloc_parent(mem_ctx);
    ev = tevent_context_init(mem_ctx);
    if (ev == NULL) {
        fprintf(stderr, "MEMORY ERROR\n");
        return EXIT_FAILURE;
    }

    // setting up queue
    fronta = tevent_queue_create(mem_ctx, "test_queue");
    tevent_queue_stop(fronta);
    tevent_queue_start(fronta);
    if (tevent_queue_running(fronta)) {
        printf ("Queue is running (length: %d)\n", tevent_queue_length(fronta));
    } else {
        printf ("Queue is not running\n");
    }

    data = talloc(ev, struct juststruct);
    data->ctx = mem_ctx;
    data->ev = ev;


    // create 4 requests
    for (i = 1; i < 5; i++) {
        req[i] = foo_send(mem_ctx, ev, names[i], i);
        tmp = req[i];
        if (req[i] == NULL) {
            fprintf(stderr, "Request error! %d \n", ret);
            break;
        }
        tevent_req_set_callback(req[i], foo_done, data);
        created++;
    }

    // add item to a queue
    tevent_queue_add(fronta, ev, req[1], trigger, data);
    tevent_queue_add(fronta, ev, req[2], trigger, data);
    tevent_queue_add(fronta, ev, req[3], trigger, data);
    tevent_queue_add(fronta, ev, req[4], trigger, data);

    printf("Queue length: %d\n", tevent_queue_length(fronta));
    while(tevent_queue_length(fronta) > 0) {
        tevent_loop_once(ev);
        printf("Queue: %d items left\n", tevent_queue_length(fronta));
    }

    talloc_free(mem_ctx);
    printf("FINISH\n");

    return EXIT_SUCCESS;
}
@endcode

*/