summaryrefslogtreecommitdiffstats
path: root/src/oqmgr/qmgr.h
blob: 15c941b5a145ac9f719158740c2dbe8d30f1928a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
/*++
/* NAME
/*	qmgr 3h
/* SUMMARY
/*	queue manager data structures
/* SYNOPSIS
/*	#include "qmgr.h"
/* DESCRIPTION
/* .nf

 /*
  * System library.
  */
#include <sys/time.h>
#include <time.h>

 /*
  * Utility library.
  */
#include <vstream.h>
#include <scan_dir.h>

 /*
  * Global library.
  */
#include <recipient_list.h>
#include <dsn.h>

 /*
  * The queue manager is built around lots of mutually-referring structures.
  * These typedefs save some typing.
  */
typedef struct QMGR_TRANSPORT QMGR_TRANSPORT;
typedef struct QMGR_QUEUE QMGR_QUEUE;
typedef struct QMGR_ENTRY QMGR_ENTRY;
typedef struct QMGR_MESSAGE QMGR_MESSAGE;
typedef struct QMGR_TRANSPORT_LIST QMGR_TRANSPORT_LIST;
typedef struct QMGR_QUEUE_LIST QMGR_QUEUE_LIST;
typedef struct QMGR_ENTRY_LIST QMGR_ENTRY_LIST;
typedef struct QMGR_SCAN QMGR_SCAN;
typedef struct QMGR_FEEDBACK QMGR_FEEDBACK;

 /*
  * Hairy macros to update doubly-linked lists.
  */
#define QMGR_LIST_ROTATE(head, object) { \
    head.next->peers.prev = head.prev; \
    head.prev->peers.next = head.next; \
    head.next = object->peers.next; \
    if (object->peers.next) \
	head.next->peers.prev = 0; \
    head.prev = object; \
    object->peers.next = 0; \
}

#define QMGR_LIST_UNLINK(head, type, object) { \
    type   next = object->peers.next; \
    type   prev = object->peers.prev; \
    if (prev) prev->peers.next = next; \
    else head.next = next; \
    if (next) next->peers.prev = prev; \
    else head.prev = prev; \
    object->peers.next = object->peers.prev = 0; \
}

#define QMGR_LIST_APPEND(head, object) { \
    object->peers.next = head.next; \
    object->peers.prev = 0; \
    if (head.next) { \
	head.next->peers.prev = object; \
    } else { \
	head.prev = object; \
    } \
    head.next = object; \
}

#define QMGR_LIST_PREPEND(head, object) { \
    object->peers.prev = head.prev; \
    object->peers.next = 0; \
    if (head.prev) { \
	head.prev->peers.next = object; \
    } else { \
	head.next = object; \
    } \
    head.prev = object; \
}

#define QMGR_LIST_INIT(head) { \
    head.prev = 0; \
    head.next = 0; \
}

 /*
  * Transports are looked up by name (when we have resolved a message), or
  * round-robin wise (when we want to distribute resources fairly).
  */
struct QMGR_TRANSPORT_LIST {
    QMGR_TRANSPORT *next;
    QMGR_TRANSPORT *prev;
};

extern struct HTABLE *qmgr_transport_byname;	/* transport by name */
extern QMGR_TRANSPORT_LIST qmgr_transport_list;	/* transports, round robin */

 /*
  * Delivery agents provide feedback, as hints that Postfix should expend
  * more or fewer resources on a specific destination domain. The main.cf
  * file specifies how feedback affects delivery concurrency: add/subtract a
  * constant, a ratio of constants, or a constant divided by the delivery
  * concurrency; and it specifies how much feedback must accumulate between
  * concurrency updates.
  */
struct QMGR_FEEDBACK {
    int     hysteresis;			/* to pass, need to be this tall */
    double  base;			/* pre-computed from main.cf */
    int     index;			/* none, window, sqrt(window) */
};

#define QMGR_FEEDBACK_IDX_NONE		0	/* no window dependence */
#define QMGR_FEEDBACK_IDX_WIN		1	/* 1/window dependence */
#if 0
#define QMGR_FEEDBACK_IDX_SQRT_WIN	2	/* 1/sqrt(window) dependence */
#endif

#ifdef QMGR_FEEDBACK_IDX_SQRT_WIN
#include <math.h>
#endif

extern void qmgr_feedback_init(QMGR_FEEDBACK *, const char *, const char *, const char *, const char *);

#ifndef QMGR_FEEDBACK_IDX_SQRT_WIN
#define QMGR_FEEDBACK_VAL(fb, win) \
    ((fb).index == QMGR_FEEDBACK_IDX_NONE ? (fb).base : (fb).base / (win))
#else
#define QMGR_FEEDBACK_VAL(fb, win) \
    ((fb).index == QMGR_FEEDBACK_IDX_NONE ? (fb).base : \
    (fb).index == QMGR_FEEDBACK_IDX_WIN ? (fb).base / (win) : \
    (fb).base / sqrt(win))
#endif

 /*
  * Each transport (local, smtp-out, bounce) can have one queue per next hop
  * name. Queues are looked up by next hop name (when we have resolved a
  * message destination), or round-robin wise (when we want to deliver
  * messages fairly).
  */
struct QMGR_QUEUE_LIST {
    QMGR_QUEUE *next;
    QMGR_QUEUE *prev;
};

struct QMGR_TRANSPORT {
    int     flags;			/* blocked, etc. */
    int     pending;			/* incomplete DA connections */
    char   *name;			/* transport name */
    int     dest_concurrency_limit;	/* concurrency per domain */
    int     init_dest_concurrency;	/* init. per-domain concurrency */
    int     recipient_limit;		/* recipients per transaction */
    struct HTABLE *queue_byname;	/* queues indexed by domain */
    QMGR_QUEUE_LIST queue_list;		/* queues, round robin order */
    QMGR_TRANSPORT_LIST peers;		/* linkage */
    DSN    *dsn;			/* why unavailable */
    QMGR_FEEDBACK pos_feedback;		/* positive feedback control */
    QMGR_FEEDBACK neg_feedback;		/* negative feedback control */
    int     fail_cohort_limit;		/* flow shutdown control */
    int     xport_rate_delay;		/* suspend per delivery */
    int     rate_delay;			/* suspend per delivery */
};

#define QMGR_TRANSPORT_STAT_DEAD	(1<<1)
#define QMGR_TRANSPORT_STAT_RATE_LOCK	(1<<2)

typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *);
extern QMGR_TRANSPORT *qmgr_transport_select(void);
extern void qmgr_transport_alloc(QMGR_TRANSPORT *, QMGR_TRANSPORT_ALLOC_NOTIFY);
extern void qmgr_transport_throttle(QMGR_TRANSPORT *, DSN *);
extern void qmgr_transport_unthrottle(QMGR_TRANSPORT *);
extern QMGR_TRANSPORT *qmgr_transport_create(const char *);
extern QMGR_TRANSPORT *qmgr_transport_find(const char *);

#define QMGR_TRANSPORT_THROTTLED(t)	((t)->flags & QMGR_TRANSPORT_STAT_DEAD)

 /*
  * Each next hop (e.g., a domain name) has its own queue of pending message
  * transactions. The "todo" queue contains messages that are to be delivered
  * to this next hop. When a message is elected for transmission, it is moved
  * from the "todo" queue to the "busy" queue. Messages are taken from the
  * "todo" queue in sequence. An initial destination delivery concurrency > 1
  * ensures that one problematic message will not block all other traffic to
  * that next hop.
  */
struct QMGR_ENTRY_LIST {
    QMGR_ENTRY *next;
    QMGR_ENTRY *prev;
};

struct QMGR_QUEUE {
    int     dflags;			/* delivery request options */
    time_t  last_done;			/* last delivery completion */
    char   *name;			/* domain name or address */
    char   *nexthop;			/* domain name */
    int     todo_refcount;		/* queue entries (todo list) */
    int     busy_refcount;		/* queue entries (busy list) */
    int     window;			/* slow open algorithm */
    double  success;			/* accumulated positive feedback */
    double  failure;			/* accumulated negative feedback */
    double  fail_cohorts;		/* pseudo-cohort failure count */
    QMGR_TRANSPORT *transport;		/* transport linkage */
    QMGR_ENTRY_LIST todo;		/* todo queue entries */
    QMGR_ENTRY_LIST busy;		/* messages on the wire */
    QMGR_QUEUE_LIST peers;		/* neighbor queues */
    DSN    *dsn;			/* why unavailable */
    time_t  clog_time_to_warn;		/* time of next warning */
};

#define	QMGR_QUEUE_TODO	1		/* waiting for service */
#define QMGR_QUEUE_BUSY	2		/* recipients on the wire */

extern int qmgr_queue_count;

extern QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *, const char *, const char *);
extern QMGR_QUEUE *qmgr_queue_select(QMGR_TRANSPORT *);
extern void qmgr_queue_done(QMGR_QUEUE *);
extern void qmgr_queue_throttle(QMGR_QUEUE *, DSN *);
extern void qmgr_queue_unthrottle(QMGR_QUEUE *);
extern QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *, const char *);
extern void qmgr_queue_suspend(QMGR_QUEUE *, int);

 /*
  * Exclusive queue states. Originally there were only two: "throttled" and
  * "not throttled". It was natural to encode these in the queue window size.
  * After 10 years it's not practical to rip out all the working code and
  * change representations, so we just clean up the names a little.
  * 
  * Note: only the "ready" state can reach every state (including itself);
  * non-ready states can reach only the "ready" state. Other transitions are
  * forbidden, because they would result in dangling event handlers.
  */
#define QMGR_QUEUE_STAT_THROTTLED	0	/* back-off timer */
#define QMGR_QUEUE_STAT_SUSPENDED	-1	/* voluntary delay timer */
#define QMGR_QUEUE_STAT_SAVED		-2	/* delayed cleanup timer */
#define QMGR_QUEUE_STAT_BAD		-3	/* can't happen */

#define QMGR_QUEUE_READY(q)	((q)->window > 0)
#define QMGR_QUEUE_THROTTLED(q)	((q)->window == QMGR_QUEUE_STAT_THROTTLED)
#define QMGR_QUEUE_SUSPENDED(q)	((q)->window == QMGR_QUEUE_STAT_SUSPENDED)
#define QMGR_QUEUE_SAVED(q)	((q)->window == QMGR_QUEUE_STAT_SAVED)
#define QMGR_QUEUE_BAD(q)	((q)->window <= QMGR_QUEUE_STAT_BAD)

#define QMGR_QUEUE_STATUS(q) ( \
	    QMGR_QUEUE_READY(q) ? "ready" : \
	    QMGR_QUEUE_THROTTLED(q) ? "throttled" : \
	    QMGR_QUEUE_SUSPENDED(q) ? "suspended" : \
	    QMGR_QUEUE_SAVED(q) ? "saved" : \
	    "invalid queue status" \
	)

 /*
  * Structure of one next-hop queue entry. In order to save some copying
  * effort we allow multiple recipients per transaction.
  */
struct QMGR_ENTRY {
    VSTREAM *stream;			/* delivery process */
    QMGR_MESSAGE *message;		/* message info */
    RECIPIENT_LIST rcpt_list;		/* as many as it takes */
    QMGR_QUEUE *queue;			/* parent linkage */
    QMGR_ENTRY_LIST peers;		/* neighbor entries */
};

extern QMGR_ENTRY *qmgr_entry_select(QMGR_QUEUE *);
extern void qmgr_entry_unselect(QMGR_QUEUE *, QMGR_ENTRY *);
extern void qmgr_entry_move_todo(QMGR_QUEUE *, QMGR_ENTRY *);
extern void qmgr_entry_done(QMGR_ENTRY *, int);
extern QMGR_ENTRY *qmgr_entry_create(QMGR_QUEUE *, QMGR_MESSAGE *);

 /*
  * All common in-core information about a message is kept here. When all
  * recipients have been tried the message file is linked to the "deferred"
  * queue (some hosts not reachable), to the "bounce" queue (some recipients
  * were rejected), and is then removed from the "active" queue.
  */
struct QMGR_MESSAGE {
    int     flags;			/* delivery problems */
    int     qflags;			/* queuing flags */
    int     tflags;			/* tracing flags */
    long    tflags_offset;		/* offset for killing */
    int     rflags;			/* queue file read flags */
    VSTREAM *fp;			/* open queue file or null */
    int     refcount;			/* queue entries */
    int     single_rcpt;		/* send one rcpt at a time */
    struct timeval arrival_time;	/* start of receive transaction */
    time_t  create_time;		/* queue file create time */
    struct timeval active_time;		/* time of entry into active queue */
    long    warn_offset;		/* warning bounce flag offset */
    time_t  warn_time;			/* time next warning to be sent */
    long    data_offset;		/* data seek offset */
    char   *queue_name;			/* queue name */
    char   *queue_id;			/* queue file */
    char   *encoding;			/* content encoding */
    char   *sender;			/* complete address */
    char   *dsn_envid;			/* DSN envelope ID */
    int     dsn_ret;			/* DSN headers/full */
    int     smtputf8;			/* requires unicode */
    char   *verp_delims;		/* VERP delimiters */
    char   *filter_xport;		/* filtering transport */
    char   *inspect_xport;		/* inspecting transport */
    char   *redirect_addr;		/* info@spammer.tld */
    long    data_size;			/* data segment size */
    long    cont_length;		/* message content length */
    long    rcpt_offset;		/* more recipients here */
    char   *client_name;		/* client hostname */
    char   *client_addr;		/* client address */
    char   *client_port;		/* client port */
    char   *client_proto;		/* client protocol */
    char   *client_helo;		/* helo parameter */
    char   *sasl_method;		/* SASL method */
    char   *sasl_username;		/* SASL user name */
    char   *sasl_sender;		/* SASL sender */
    char   *log_ident;			/* up-stream queue ID */
    char   *rewrite_context;		/* address qualification */
    RECIPIENT_LIST rcpt_list;		/* complete addresses */
};

 /*
  * Flags 0-15 are reserved for qmgr_user.h.
  */
#define QMGR_READ_FLAG_SEEN_ALL_NON_RCPT	(1<<16)

#define QMGR_MESSAGE_LOCKED	((QMGR_MESSAGE *) 1)

extern int qmgr_message_count;
extern int qmgr_recipient_count;
extern int qmgr_vrfy_pend_count;

extern void qmgr_message_free(QMGR_MESSAGE *);
extern void qmgr_message_update_warn(QMGR_MESSAGE *);
extern void qmgr_message_kill_record(QMGR_MESSAGE *, long);
extern QMGR_MESSAGE *qmgr_message_alloc(const char *, const char *, int, mode_t);
extern QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *);

#define QMGR_MSG_STATS(stats, message) \
    MSG_STATS_INIT2(stats, \
		    incoming_arrival, message->arrival_time, \
		    active_arrival, message->active_time)

 /*
  * qmgr_defer.c
  */
extern void qmgr_defer_transport(QMGR_TRANSPORT *, DSN *);
extern void qmgr_defer_todo(QMGR_QUEUE *, DSN *);
extern void qmgr_defer_recipient(QMGR_MESSAGE *, RECIPIENT *, DSN *);

 /*
  * qmgr_bounce.c
  */
extern void qmgr_bounce_recipient(QMGR_MESSAGE *, RECIPIENT *, DSN *);

 /*
  * qmgr_deliver.c
  */
extern int qmgr_deliver_concurrency;
extern void qmgr_deliver(QMGR_TRANSPORT *, VSTREAM *);

 /*
  * qmgr_active.c
  */
extern int qmgr_active_feed(QMGR_SCAN *, const char *);
extern void qmgr_active_drain(void);
extern void qmgr_active_done(QMGR_MESSAGE *);

 /*
  * qmgr_move.c
  */
extern void qmgr_move(const char *, const char *, time_t);

 /*
  * qmgr_enable.c
  */
extern void qmgr_enable_all(void);
extern void qmgr_enable_transport(QMGR_TRANSPORT *);
extern void qmgr_enable_queue(QMGR_QUEUE *);

 /*
  * Queue scan context.
  */
struct QMGR_SCAN {
    char   *queue;			/* queue name */
    int     flags;			/* private, this run */
    int     nflags;			/* private, next run */
    struct SCAN_DIR *handle;		/* scan */
};

 /*
  * Flags that control queue scans or destination selection. These are
  * similar to the QMGR_REQ_XXX request codes.
  */
#define QMGR_SCAN_START	(1<<0)		/* start now/restart when done */
#define QMGR_SCAN_ALL	(1<<1)		/* all queue file time stamps */
#define QMGR_FLUSH_ONCE	(1<<2)		/* unthrottle once */
#define QMGR_FLUSH_DFXP	(1<<3)		/* override defer_transports */
#define QMGR_FLUSH_EACH	(1<<4)		/* unthrottle per message */

 /*
  * qmgr_scan.c
  */
extern QMGR_SCAN *qmgr_scan_create(const char *);
extern void qmgr_scan_request(QMGR_SCAN *, int);
extern char *qmgr_scan_next(QMGR_SCAN *);

 /*
  * qmgr_error.c
  */
extern QMGR_TRANSPORT *qmgr_error_transport(const char *);
extern QMGR_QUEUE *qmgr_error_queue(const char *, DSN *);
extern char *qmgr_error_nexthop(DSN *);

/* LICENSE
/* .ad
/* .fi
/*	The Secure Mailer license must be distributed with this software.
/* AUTHOR(S)
/*	Wietse Venema
/*	IBM T.J. Watson Research
/*	P.O. Box 704
/*	Yorktown Heights, NY 10598, USA
/*
/*	Wietse Venema
/*	Google, Inc.
/*	111 8th Avenue
/*	New York, NY 10011, USA
/*--*/