summaryrefslogtreecommitdiffstats
path: root/src/knot/server/dthreads.h
blob: 0c243a1c325ebb8d60230064ada8de2727a5bc58 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
/*  Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

/*!
 * \brief Threading API.
 *
 * Dynamic threads provide:
 * - coherent and incoherent threading capabilities
 * - thread repurposing
 * - thread prioritization
 * - on-the-fly changing of threading unit size
 *
 * Coherent threading unit is when all threads execute
 * the same runnable function.
 *
 * Incoherent function is when at least one thread executes
 * a different runnable than the others.
 */

#pragma once

#include <pthread.h>

#define DEFAULT_THR_COUNT 2  /*!< Default thread count. */

/* Forward decls */
struct dthread;
struct dt_unit;

/*!
 * \brief Thread state enumeration.
 */
typedef enum {
	ThreadJoined    = 1 << 0, /*!< Thread is finished and joined. */
	ThreadJoinable  = 1 << 1, /*!< Thread is waiting to be reclaimed. */
	ThreadCancelled = 1 << 2, /*!< Thread is cancelled, finishing task. */
	ThreadDead      = 1 << 3, /*!< Thread is finished, exiting. */
	ThreadIdle      = 1 << 4, /*!< Thread is idle, waiting for purpose. */
	ThreadActive    = 1 << 5  /*!< Thread is active, working on a task. */
} dt_state_t;

/*!
 * \brief Thread runnable prototype.
 *
 * Runnable is basically a pointer to function which is called on active
 * thread runtime.
 *
 * \note When implementing a runnable, keep in mind to check thread state as
 *       it may change, and implement a cooperative cancellation point.
 *
 *       Implement this by checking dt_is_cancelled() and return
 *       as soon as possible.
 */
typedef int (*runnable_t)(struct dthread *);

/*!
 * \brief Single thread descriptor public API.
 */
typedef struct dthread {
	volatile unsigned  state; /*!< Bitfield of dt_flag flags. */
	runnable_t           run; /*!< Runnable function or 0. */
	runnable_t      destruct; /*!< Destructor function or 0. */
	void               *data; /*!< Currently active data */
	struct dt_unit     *unit; /*!< Reference to assigned unit. */
	void             *_adata; /*!< Thread-specific data. */
	pthread_t           _thr; /*!< Thread */
	pthread_attr_t     _attr; /*!< Thread attributes */
	pthread_mutex_t      _mx; /*!< Thread state change lock. */
} dthread_t;

/*!
 * \brief Thread unit descriptor API.
 *
 * Thread unit consists of 1..N threads.
 * Unit is coherent if all threads execute
 * the same runnable.
 */
typedef struct dt_unit {
	int                   size; /*!< Unit width (number of threads) */
	struct dthread   **threads; /*!< Array of threads */
	pthread_cond_t     _notify; /*!< Notify thread */
	pthread_mutex_t _notify_mx; /*!< Condition mutex */
	pthread_cond_t     _report; /*!< Report thread state */
	pthread_mutex_t _report_mx; /*!< Condition mutex */
	pthread_mutex_t        _mx; /*!< Unit lock */
} dt_unit_t;

/*!
 * \brief Create a set of coherent threads.
 *
 * Coherent means, that the threads will share a common runnable and the data.
 *
 * \param count Requested thread count.
 * \param runnable Runnable function for all threads.
 * \param destructor Destructor for all threads.
 * \param data Any data passed onto threads.
 *
 * \retval New instance if successful
 * \retval NULL on error
 */
dt_unit_t *dt_create(int count, runnable_t runnable, runnable_t destructor, void *data);

/*!
 * \brief Free unit.
 *
 * \warning Behavior is undefined if threads are still active, make sure
 *          to call dt_join() first.
 *
 * \param unit Unit to be deleted.
 */
void dt_delete(dt_unit_t **unit);

/*!
 * \brief Start all threads in selected unit.
 *
 * \param unit Unit to be started.
 *
 * \retval KNOT_EOK on success.
 * \retval KNOT_EINVAL on invalid parameters (unit is null).
 */
int dt_start(dt_unit_t *unit);

/*!
 * \brief Send given signal to thread.
 *
 * \note This is useful to interrupt some blocking I/O as well, for example
 *       with SIGALRM, which is handled by default.
 * \note Signal handler may be overridden in runnable.
 *
 * \param thread Target thread instance.
 * \param signum Signal code.
 *
 * \retval KNOT_EOK on success.
 * \retval KNOT_EINVAL on invalid parameters.
 * \retval KNOT_ERROR unspecified error.
 */
int dt_signalize(dthread_t *thread, int signum);

/*!
 * \brief Wait for all thread in unit to finish.
 *
 * \param unit Unit to be joined.
 *
 * \retval KNOT_EOK on success.
 * \retval KNOT_EINVAL on invalid parameters.
 */
int dt_join(dt_unit_t *unit);

/*!
 * \brief Stop all threads in unit.
 *
 * Thread is interrupted at the nearest runnable cancellation point.
 *
 * \param unit Unit to be stopped.
 *
 * \retval KNOT_EOK on success.
 * \retval KNOT_EINVAL on invalid parameters.
 */
int dt_stop(dt_unit_t *unit);

/*!
 * \brief Set thread affinity to masked CPU's.
 *
 * \param thread Target thread instance.
 * \param cpu_id Array of CPU IDs to set affinity to.
 * \param cpu_count Number of CPUs in the array, set to 0 for no CPU.
 *
 * \retval KNOT_EOK on success.
 * \retval KNOT_EINVAL on invalid parameters.
 */
int dt_setaffinity(dthread_t *thread, unsigned* cpu_id, size_t cpu_count);

/*!
 * \brief Wake up thread from idle state.
 *
 * Thread is awoken from idle state and reenters runnable.
 * This function only affects idle threads.
 *
 * \note Unit needs to be started with dt_start() first, as the function
 *       doesn't affect dead threads.
 *
 * \param thread Target thread instance.
 *
 * \retval KNOT_EOK on success.
 * \retval KNOT_EINVAL on invalid parameters.
 * \retval KNOT_ENOTSUP operation not supported.
 */
int dt_activate(dthread_t *thread);

/*!
 * \brief Put thread to idle state, cancels current runnable function.
 *
 * Thread is flagged with Cancel flag and returns from runnable at the nearest
 * cancellation point, which requires complying runnable function.
 *
 * \note Thread isn't disposed, but put to idle state until it's requested
 *       again or collected by dt_compact().
 *
 * \param thread Target thread instance.
 *
 * \retval KNOT_EOK on success.
 * \retval KNOT_EINVAL on invalid parameters.
 */
int dt_cancel(dthread_t *thread);

/*!
 * \brief Collect and dispose idle threads.
 *
 * \param unit Target unit instance.
 *
 * \retval KNOT_EOK on success.
 * \retval KNOT_EINVAL on invalid parameters.
 */
int dt_compact(dt_unit_t *unit);

/*!
 * \brief Return number of online processors.
 *
 * \retval Number of online CPU's if success.
 * \retval <0 on failure.
 */
int dt_online_cpus(void);

/*!
 * \brief Return optimal number of threads for instance.
 *
 * It is estimated as NUM_CPUs + CONSTANT.
 * Fallback is DEFAULT_THR_COUNT  (\see common.h).
 *
 * \return Number of threads.
 */
int dt_optimal_size(void);

/*!
 * \brief Return true if thread is cancelled.
 *
 * Synchronously check for ThreadCancelled flag.
 *
 * \param thread Target thread instance.
 *
 * \retval 1 if cancelled.
 * \retval 0 if not cancelled.
 */
int dt_is_cancelled(dthread_t *thread);

/*!
 * \brief Return thread index in threading unit.
 *
 * \note Returns 0 when thread doesn't have a unit.
 *
 * \param thread Target thread instance.
 *
 * \return Thread index.
 */
unsigned dt_get_id(dthread_t *thread);

/*!
 * \brief Lock unit to prevent parallel operations which could alter unit
 *        at the same time.
 *
 * \param unit Target unit instance.
 *
 * \retval KNOT_EOK on success.
 * \retval KNOT_EINVAL on invalid parameters.
 * \retval KNOT_EAGAIN lack of resources to lock unit, try again.
 * \retval KNOT_ERROR unspecified error.
 */
int dt_unit_lock(dt_unit_t *unit);

/*!
 * \brief Unlock unit.
 *
 * \see dt_unit_lock()
 *
 * \param unit Target unit instance.
 *
 * \retval KNOT_EOK on success.
 * \retval KNOT_EINVAL on invalid parameters.
 * \retval KNOT_EAGAIN lack of resources to unlock unit, try again.
 * \retval KNOT_ERROR unspecified error.
 */
int dt_unit_unlock(dt_unit_t *unit);