summaryrefslogtreecommitdiffstats
path: root/src/libutil/upstream.h
blob: 22a020c9c0df491d66486a800fb9aa76842fbf97 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
/*
 * Copyright 2023 Vsevolod Stakhov
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef UPSTREAM_H
#define UPSTREAM_H

#include "config.h"
#include "util.h"
#include "rdns.h"
#include "ucl.h"

#ifdef __cplusplus
extern "C" {
#endif

/* Forward declaration: the event loop is only referenced by pointer in this header */
struct ev_loop;

/**
 * Rotation policies for selecting an upstream from a list.
 * Accepted by rspamd_upstreams_set_rotation and by the rspamd_upstream_get*
 * family of functions. Values are consecutive and start from zero.
 */
enum rspamd_upstream_rotation {
	RSPAMD_UPSTREAM_RANDOM = 0,
	RSPAMD_UPSTREAM_HASHED = 1, /* selection functions require `key` and `keylen` */
	RSPAMD_UPSTREAM_ROUND_ROBIN = 2,
	RSPAMD_UPSTREAM_MASTER_SLAVE = 3,
	RSPAMD_UPSTREAM_SEQUENTIAL = 4,
	RSPAMD_UPSTREAM_UNDEF = 5 /* last value */
};

/**
 * Flags for an upstream list (see rspamd_upstreams_set_flags).
 * Each flag occupies its own bit, so values may be OR-ed together.
 */
enum rspamd_upstream_flag {
	RSPAMD_UPSTREAM_FLAG_NORESOLVE = 0x1,
	RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE = 0x2,
};

/* Configuration structure declared elsewhere; only referenced by pointer in this header */
struct rspamd_config;
/* Opaque upstream structures: this header exposes them by pointer only */
struct upstream;
struct upstream_list;
struct upstream_ctx;

/**
 * Init upstreams library
 * @return upstreams library context; the reference is dropped with
 *         rspamd_upstreams_library_unref
 */
struct upstream_ctx *rspamd_upstreams_library_init(void);

/**
 * Remove reference from upstreams library
 * @param ctx upstreams library context (as returned by rspamd_upstreams_library_init)
 */
void rspamd_upstreams_library_unref(struct upstream_ctx *ctx);

/**
 * Configure attributes of upstreams library
 * @param cfg rspamd configuration
 *        NOTE(review): presumably the source of the configured attributes - confirm
 * @param ctx upstreams library context to configure
 * @param event_loop event loop passed to the library
 * @param resolver DNS resolver passed to the library
 */
void rspamd_upstreams_library_config(struct rspamd_config *cfg,
									 struct upstream_ctx *ctx, struct ev_loop *event_loop,
									 struct rdns_resolver *resolver);

/**
 * Upstream error logic
 * 1. During error time we count upstream_ok and upstream_fail
 * 2. If failcount is more than maxerrors then we mark upstream as unavailable for dead time
 * 3. After dead time we mark upstream as alive and go to the step 1
 * 4. If all upstreams are dead, we mark every upstream as alive
 */

/**
 * Add an error to an upstream
 * @param upstream upstream that has failed
 * @param addr_failure NOTE(review): presumably TRUE when the failure should also be
 *        accounted against the current address of the upstream - confirm against
 *        the implementation
 * @param reason textual reason of the failure
 */
void rspamd_upstream_fail(struct upstream *upstream, gboolean addr_failure, const gchar *reason);

/**
 * Increase upstream successes count
 * @param up upstream that has succeeded
 */
void rspamd_upstream_ok(struct upstream *up);

/**
 * Set weight for an upstream
 * @param up upstream to modify
 * @param weight new weight value
 */
void rspamd_upstream_set_weight(struct upstream *up, guint weight);

/**
 * Create new list of upstreams
 * @param ctx upstreams library context
 * @return new upstreams list; it is destroyed with rspamd_upstreams_destroy
 */
struct upstream_list *rspamd_upstreams_create(struct upstream_ctx *ctx);

/**
 * Sets specific flags for the upstream list
 * @param ups upstream list
 * @param flags value(s) of `enum rspamd_upstream_flag`; the enum values are
 *        distinct bits
 */
void rspamd_upstreams_set_flags(struct upstream_list *ups,
								enum rspamd_upstream_flag flags);

/**
 * Sets custom limits for upstreams
 * This function allocates memory from the upstreams ctx pool and should
 * not be called in cycles/constantly as this memory is likely persistent
 *
 * NOTE(review): the parameter descriptions below are derived from the parameter
 * names and the "Upstream error logic" description in this header; units of the
 * time values (presumably seconds) and the handling of special values must be
 * confirmed against the implementation.
 *
 * @param ups upstream list
 * @param revive_time time an unavailable upstream stays dead before being revived
 * @param revive_jitter jitter applied to the revive time
 * @param error_time time window in which errors are counted
 * @param dns_timeout timeout for DNS requests
 * @param max_errors number of errors after which an upstream is marked as unavailable
 * @param dns_retransmits number of DNS retransmits
 */
void rspamd_upstreams_set_limits(struct upstream_list *ups,
								 gdouble revive_time,
								 gdouble revive_jitter,
								 gdouble error_time,
								 gdouble dns_timeout,
								 guint max_errors,
								 guint dns_retransmits);

/**
 * Sets rotation policy for upstreams list
 * @param ups upstream list
 * @param rot rotation policy, one of `enum rspamd_upstream_rotation`
 */
void rspamd_upstreams_set_rotation(struct upstream_list *ups,
								   enum rspamd_upstream_rotation rot);

/**
 * Destroy list of upstreams
 * @param ups upstream list (as returned by rspamd_upstreams_create)
 */
void rspamd_upstreams_destroy(struct upstream_list *ups);

/**
 * Returns count of upstreams in a list
 * @param ups upstream list
 * @return total number of upstreams in the list
 */
gsize rspamd_upstreams_count(struct upstream_list *ups);

/**
 * Returns the number of alive upstreams in the list
 * (for the total number of upstreams see rspamd_upstreams_count)
 * NOTE(review): "alive" is taken from the function name and the error logic
 * described in this header - confirm against the implementation
 * @param ups upstream list
 * @return number of upstreams currently considered alive
 */
gsize rspamd_upstreams_alive(struct upstream_list *ups);

/**
 * Parse mode passed to rspamd_upstreams_add_upstream as `parse_type`
 */
enum rspamd_upstream_parse_type {
	RSPAMD_UPSTREAM_PARSE_DEFAULT = 0,
	RSPAMD_UPSTREAM_PARSE_NAMESERVER = 1,
};

/**
 * Add upstream from the string
 * @param ups upstream list
 * @param str string in format "name[:port[:priority]]"
 * @param def_port default port number
 * @param parse_type parse mode, one of `enum rspamd_upstream_parse_type`
 * @param data optional userdata
 * @return TRUE if upstream has been added
 */
gboolean rspamd_upstreams_add_upstream(struct upstream_list *ups, const gchar *str,
									   guint16 def_port, enum rspamd_upstream_parse_type parse_type,
									   void *data);

/**
 * Add multiple upstreams from comma, semicolon or space separated line
 * (for a variant that takes an explicit string length see
 * rspamd_upstreams_parse_line_len)
 * @param ups upstream list
 * @param str string in format "(<ups>([<sep>+]<ups>)*)+"
 * @param def_port default port number
 * @param data optional userdata
 * @return TRUE if **any** of upstreams has been added
 */
gboolean rspamd_upstreams_parse_line(struct upstream_list *ups,
									 const gchar *str, guint16 def_port, void *data);


/**
 * Variant of rspamd_upstreams_parse_line that accepts the length of the
 * input string explicitly
 * NOTE(review): presumably `str` does not have to be NUL-terminated and the
 * format/return semantics match rspamd_upstreams_parse_line - confirm against
 * the implementation
 * @param ups upstream list
 * @param str string with upstreams definitions
 * @param len length of `str`
 * @param def_port default port number
 * @param data optional userdata
 * @return gboolean status of parsing
 */
gboolean rspamd_upstreams_parse_line_len(struct upstream_list *ups,
										 const gchar *str, gsize len,
										 guint16 def_port,
										 void *data);

/**
 * Parse upstreams list from the UCL object
 * @param ups upstream list to fill
 * @param in UCL object to parse (not modified through this pointer)
 * @param def_port default port number
 * @param data optional userdata
 * @return gboolean status of parsing
 *         NOTE(review): whether TRUE means "any" or "all" upstreams were added
 *         is not visible here - confirm against the implementation
 */
gboolean rspamd_upstreams_from_ucl(struct upstream_list *ups,
								   const ucl_object_t *in, guint16 def_port, void *data);


/**
 * Callback type accepted by rspamd_upstreams_foreach
 * @param up upstream being visited
 * @param idx NOTE(review): presumably the index of `up` within the list - confirm
 * @param ud opaque user data passed to rspamd_upstreams_foreach
 */
typedef void (*rspamd_upstream_traverse_func)(struct upstream *up, guint idx,
											  void *ud);

/**
 * Traverse upstreams list calling the function specified
 * @param ups upstream list
 * @param cb callback to call for upstreams in the list
 * @param ud opaque user data passed to `cb`
 */
void rspamd_upstreams_foreach(struct upstream_list *ups,
							  rspamd_upstream_traverse_func cb, void *ud);

/**
 * Events a watcher can be registered for (see rspamd_upstreams_add_watch_callback).
 * Every event is a single bit; RSPAMD_UPSTREAM_WATCH_ALL is the mask of all of them.
 */
enum rspamd_upstreams_watch_event {
	RSPAMD_UPSTREAM_WATCH_SUCCESS = 0x1u,
	RSPAMD_UPSTREAM_WATCH_FAILURE = 0x2u,
	RSPAMD_UPSTREAM_WATCH_OFFLINE = 0x4u,
	RSPAMD_UPSTREAM_WATCH_ONLINE = 0x8u,
	RSPAMD_UPSTREAM_WATCH_ALL = RSPAMD_UPSTREAM_WATCH_SUCCESS |
								RSPAMD_UPSTREAM_WATCH_FAILURE |
								RSPAMD_UPSTREAM_WATCH_OFFLINE |
								RSPAMD_UPSTREAM_WATCH_ONLINE,
};

/**
 * Callback type accepted by rspamd_upstreams_add_watch_callback
 * @param up upstream the event relates to
 * @param event event being reported
 * @param cur_errors NOTE(review): presumably the current errors count of `up` - confirm
 * @param ud opaque user data registered along with the callback
 */
typedef void (*rspamd_upstream_watch_func)(struct upstream *up,
										   enum rspamd_upstreams_watch_event event,
										   guint cur_errors,
										   void *ud);

/**
 * Adds new watcher to the upstreams list
 * @param ups upstream list
 * @param events event(s) of `enum rspamd_upstreams_watch_event` to watch; the enum
 *        values are distinct bits and RSPAMD_UPSTREAM_WATCH_ALL covers all of them
 * @param func callback to call
 * @param free_func NOTE(review): presumably a destructor called for `ud` when the
 *        watcher is removed - confirm against the implementation
 * @param ud opaque user data passed to `func`
 */
void rspamd_upstreams_add_watch_callback(struct upstream_list *ups,
										 enum rspamd_upstreams_watch_event events,
										 rspamd_upstream_watch_func func,
										 GFreeFunc free_func,
										 gpointer ud);

/**
 * Returns the next IP address of the upstream (internal rotation)
 * @param up upstream (not const: see "internal rotation" above)
 * @return address of the upstream
 *         NOTE(review): ownership of the returned address is not visible here;
 *         presumably it stays with the upstream - confirm
 */
rspamd_inet_addr_t *rspamd_upstream_addr_next(struct upstream *up);

/**
 * Returns the current IP address of the upstream
 * @param up upstream (taken as const)
 * @return address of the upstream
 *         NOTE(review): ownership of the returned address is not visible here;
 *         presumably it stays with the upstream - confirm
 */
rspamd_inet_addr_t *rspamd_upstream_addr_cur(const struct upstream *up);

/**
 * Add custom address for an upstream (ownership of addr is transferred to upstream)
 * @param up upstream to add the address to
 * @param addr address to add; the caller gives up its ownership
 * @return gboolean status
 *         NOTE(review): presumably TRUE when the address has been added - confirm
 */
gboolean rspamd_upstream_add_addr(struct upstream *up,
								  rspamd_inet_addr_t *addr);

/**
 * Returns the symbolic name of the upstream
 * @param up upstream
 * @return name of the upstream as a read-only string
 */
const gchar *rspamd_upstream_name(struct upstream *up);

/**
 * Returns the port of the current address for the upstream
 * @param up upstream
 * @return port number
 */
gint rspamd_upstream_port(struct upstream *up);

/**
 * Sets opaque user data associated with this upstream
 * (the data can be read back with rspamd_upstream_get_data)
 * @param up upstream
 * @param data new opaque user data
 * @return old data
 */
gpointer rspamd_upstream_set_data(struct upstream *up, gpointer data);

/**
 * Gets opaque user data associated with this upstream
 * (see rspamd_upstream_set_data)
 * @param up upstream
 * @return opaque user data pointer
 */
gpointer rspamd_upstream_get_data(struct upstream *up);

/**
 * Get new upstream from the list
 * @param ups upstream list
 * @param default_type type of rotation algorithm, for `RSPAMD_UPSTREAM_HASHED` it is required to specify `key` and `keylen` as arguments
 *        NOTE(review): the name suggests this type is a default that applies only
 *        when the list has no rotation policy of its own - confirm
 * @param key key used for `RSPAMD_UPSTREAM_HASHED` rotation
 * @param keylen length of `key`
 * @return selected upstream
 *         NOTE(review): presumably NULL when nothing can be selected - confirm
 */
struct upstream *rspamd_upstream_get(struct upstream_list *ups,
									 enum rspamd_upstream_rotation default_type,
									 const guchar *key, gsize keylen);

/**
 * Get new upstream from the list using the rotation type specified
 * @param ups upstream list
 * @param forced_type type of rotation algorithm, for `RSPAMD_UPSTREAM_HASHED` it is required to specify `key` and `keylen` as arguments
 *        NOTE(review): the name suggests this type overrides the rotation policy
 *        of the list (compare `default_type` of rspamd_upstream_get) - confirm
 * @param key key used for `RSPAMD_UPSTREAM_HASHED` rotation
 * @param keylen length of `key`
 * @return selected upstream
 *         NOTE(review): presumably NULL when nothing can be selected - confirm
 */
struct upstream *rspamd_upstream_get_forced(struct upstream_list *ups,
											enum rspamd_upstream_rotation forced_type,
											const guchar *key, gsize keylen);

/**
 * Get new upstream from the list excepting the upstream specified
 * @param ups upstream list
 * @param except upstream that must not be selected
 * @param default_type type of rotation algorithm, for `RSPAMD_UPSTREAM_HASHED` it is required to specify `key` and `keylen` as arguments
 * @param key key used for `RSPAMD_UPSTREAM_HASHED` rotation
 * @param keylen length of `key`
 * @return selected upstream
 *         NOTE(review): presumably NULL when nothing can be selected - confirm
 */
struct upstream *rspamd_upstream_get_except(struct upstream_list *ups,
											struct upstream *except,
											enum rspamd_upstream_rotation default_type,
											const guchar *key, gsize keylen);

/**
 * Re-resolve addresses for all upstreams registered
 * @param ctx upstreams library context
 */
void rspamd_upstream_reresolve(struct upstream_ctx *ctx);

/**
 * Share ownership on upstream (the counterpart is rspamd_upstream_unref)
 * @param up upstream
 * @return upstream pointer
 *         NOTE(review): presumably the same pointer as `up` - confirm
 */
struct upstream *rspamd_upstream_ref(struct upstream *up);
/**
 * Unshare ownership on upstream (the counterpart is rspamd_upstream_ref)
 * @param up upstream
 */
void rspamd_upstream_unref(struct upstream *up);

#ifdef __cplusplus
}
#endif

#endif /* UPSTREAM_H */