1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
|
/* Copyright (c) 2017-2018 Dovecot authors, see the included COPYING file */
#include "stats-common.h"
#include "array.h"
#include "llist.h"
#include "hash.h"
#include "str.h"
#include "strescape.h"
#include "lib-event-private.h"
#include "event-filter.h"
#include "ostream.h"
#include "connection.h"
#include "master-service.h"
#include "stats-event-category.h"
#include "stats-metrics.h"
#include "stats-settings.h"
#include "client-writer.h"
#define STATS_UPDATE_CLIENTS_DELAY_MSECS 1000
struct stats_event {
struct stats_event *prev, *next;
uint64_t id;
struct event *event;
};
struct writer_client {
struct connection conn;
struct stats_event *events;
HASH_TABLE(struct stats_event *, struct stats_event *) events_hash;
};
static struct timeout *to_update_clients;
static struct connection_list *writer_clients = NULL;
static void client_writer_send_handshake(struct writer_client *client)
{
string_t *filter = t_str_new(128);
string_t *str = t_str_new(128);
event_filter_export(stats_metrics_get_event_filter(stats_metrics), filter);
str_append(str, "FILTER\t");
str_append_tabescaped(str, str_c(filter));
str_append_c(str, '\n');
o_stream_nsend(client->conn.output, str_data(str), str_len(str));
}
static unsigned int stats_event_hash(const struct stats_event *event)
{
return (unsigned int)event->id;
}
static int stats_event_cmp(const struct stats_event *event1,
const struct stats_event *event2)
{
return event1->id == event2->id ? 0 : 1;
}
void client_writer_create(int fd)
{
struct writer_client *client;
client = i_new(struct writer_client, 1);
hash_table_create(&client->events_hash, default_pool, 0,
stats_event_hash, stats_event_cmp);
connection_init_server(writer_clients, &client->conn,
"stats", fd, fd);
client_writer_send_handshake(client);
}
static void writer_client_destroy(struct connection *conn)
{
struct writer_client *client = (struct writer_client *)conn;
struct stats_event *event, *next;
for (event = client->events; event != NULL; event = next) {
next = event->next;
event_unref(&event->event);
i_free(event);
}
hash_table_destroy(&client->events_hash);
connection_deinit(conn);
i_free(conn);
master_service_client_connection_destroyed(master_service);
}
static struct stats_event *
writer_client_find_event(struct writer_client *client, uint64_t event_id)
{
struct stats_event lookup_event = { .id = event_id };
return hash_table_lookup(client->events_hash, &lookup_event);
}
static bool
writer_client_run_event(struct writer_client *client,
uint64_t parent_event_id, const char *const *args,
struct event **event_r, const char **error_r)
{
struct event *parent_event;
unsigned int log_type;
if (parent_event_id == 0)
parent_event = NULL;
else {
struct stats_event *stats_parent_event =
writer_client_find_event(client, parent_event_id);
if (stats_parent_event == NULL) {
*error_r = "Unknown parent event ID";
return FALSE;
}
parent_event = stats_parent_event->event;
}
if (args[0] == NULL || str_to_uint(args[0], &log_type) < 0 ||
log_type >= LOG_TYPE_COUNT) {
*error_r = "Invalid log type";
return FALSE;
}
const struct failure_context ctx = {
.type = (enum log_type)log_type
};
args++;
struct event *event = event_create(parent_event);
if (!event_import_unescaped(event, args, error_r)) {
event_unref(&event);
return FALSE;
}
stats_metrics_event(stats_metrics, event, &ctx);
*event_r = event;
return TRUE;
}
static bool
writer_client_input_event(struct writer_client *client,
const char *const *args, const char **error_r)
{
struct event *event, *global_event = NULL;
uint64_t parent_event_id, global_event_id;
bool ret;
if (args[1] == NULL || str_to_uint64(args[0], &global_event_id) < 0) {
*error_r = "Invalid global event ID";
return FALSE;
}
if (args[1] == NULL || str_to_uint64(args[1], &parent_event_id) < 0) {
*error_r = "Invalid parent ID";
return FALSE;
}
if (global_event_id != 0) {
struct stats_event *stats_global_event =
writer_client_find_event(client, global_event_id);
if (stats_global_event == NULL) {
*error_r = "Unknown global event ID";
return FALSE;
}
global_event = stats_global_event->event;
event_push_global(global_event);
}
ret = writer_client_run_event(client, parent_event_id, args+2,
&event, error_r);
if (global_event != NULL)
event_pop_global(global_event);
if (!ret)
return FALSE;
event_unref(&event);
return TRUE;
}
static bool
writer_client_input_event_begin(struct writer_client *client,
const char *const *args, const char **error_r)
{
struct event *event;
struct stats_event *stats_event;
uint64_t event_id, parent_event_id;
if (args[0] == NULL || args[1] == NULL ||
str_to_uint64(args[0], &event_id) < 0 ||
str_to_uint64(args[1], &parent_event_id) < 0) {
*error_r = "Invalid event IDs";
return FALSE;
}
if (writer_client_find_event(client, event_id) != NULL) {
*error_r = "Duplicate event ID";
return FALSE;
}
if (!writer_client_run_event(client, parent_event_id, args+2, &event, error_r))
return FALSE;
stats_event = i_new(struct stats_event, 1);
stats_event->id = event_id;
stats_event->event = event;
DLLIST_PREPEND(&client->events, stats_event);
hash_table_insert(client->events_hash, stats_event, stats_event);
return TRUE;
}
static bool
writer_client_input_event_update(struct writer_client *client,
const char *const *args, const char **error_r)
{
struct stats_event *stats_event, *parent_stats_event;
struct event *parent_event;
uint64_t event_id, parent_event_id;
if (args[0] == NULL || args[1] == NULL ||
str_to_uint64(args[0], &event_id) < 0 ||
str_to_uint64(args[1], &parent_event_id) < 0) {
*error_r = "Invalid event IDs";
return FALSE;
}
stats_event = writer_client_find_event(client, event_id);
if (stats_event == NULL) {
*error_r = "Unknown event ID";
return FALSE;
}
parent_stats_event = parent_event_id == 0 ? NULL :
writer_client_find_event(client, parent_event_id);
parent_event = parent_stats_event == NULL ? NULL :
parent_stats_event->event;
if (stats_event->event->parent != parent_event) {
*error_r = "Event unexpectedly changed parent";
return FALSE;
}
return event_import_unescaped(stats_event->event, args+2, error_r);
}
static bool
writer_client_input_event_end(struct writer_client *client,
const char *const *args, const char **error_r)
{
struct stats_event *stats_event;
uint64_t event_id;
if (args[0] == NULL || str_to_uint64(args[0], &event_id) < 0) {
*error_r = "Invalid event ID";
return FALSE;
}
stats_event = writer_client_find_event(client, event_id);
if (stats_event == NULL) {
*error_r = "Unknown event ID";
return FALSE;
}
DLLIST_REMOVE(&client->events, stats_event);
hash_table_remove(client->events_hash, stats_event);
event_unref(&stats_event->event);
i_free(stats_event);
return TRUE;
}
static bool
writer_client_input_category(struct writer_client *client ATTR_UNUSED,
const char *const *args, const char **error_r)
{
struct event_category *category, *parent;
if (args[0] == NULL) {
*error_r = "Missing category name";
return FALSE;
}
if (args[1] == NULL)
parent = NULL;
else if ((parent = event_category_find_registered(args[1])) == NULL) {
*error_r = "Unknown parent category";
return FALSE;
}
category = event_category_find_registered(args[0]);
if (category == NULL) {
/* new category - create */
stats_event_category_register(args[0], parent);
} else if (category->parent != parent) {
*error_r = t_strdup_printf(
"Category parent '%s' changed to '%s'",
category->parent == NULL ? "" : category->parent->name,
parent == NULL ? "" : parent->name);
return FALSE;
} else {
/* duplicate - ignore */
return TRUE;
}
return TRUE;
}
static int
writer_client_input_args(struct connection *conn, const char *const *args)
{
struct writer_client *client = (struct writer_client *)conn;
const char *error, *cmd = args[0];
bool ret;
if (cmd == NULL) {
i_error("Client sent empty line");
return 1;
}
if (strcmp(cmd, "EVENT") == 0)
ret = writer_client_input_event(client, args+1, &error);
else if (strcmp(cmd, "BEGIN") == 0)
ret = writer_client_input_event_begin(client, args+1, &error);
else if (strcmp(cmd, "UPDATE") == 0)
ret = writer_client_input_event_update(client, args+1, &error);
else if (strcmp(cmd, "END") == 0)
ret = writer_client_input_event_end(client, args+1, &error);
else if (strcmp(cmd, "CATEGORY") == 0)
ret = writer_client_input_category(client, args+1, &error);
else {
error = "Unknown command";
ret = FALSE;
}
if (!ret) {
i_error("Client sent invalid input for %s: %s (input: %s)",
cmd, error, t_strarray_join(args, "\t"));
return -1;
}
return 1;
}
static struct connection_settings client_set = {
.service_name_in = "stats-client",
.service_name_out = "stats-server",
.major_version = 4,
.minor_version = 0,
.input_max_size = 1024*128, /* "big enough" */
.output_max_size = SIZE_MAX,
.client = FALSE,
};
static const struct connection_vfuncs client_vfuncs = {
.destroy = writer_client_destroy,
.input_args = writer_client_input_args,
};
static void
client_writer_update_connections_internal(void *context ATTR_UNUSED)
{
struct connection *conn;
for (conn = writer_clients->connections; conn != NULL; conn = conn->next) {
struct writer_client *client =
container_of(conn, struct writer_client, conn);
client_writer_send_handshake(client);
}
timeout_remove(&to_update_clients);
}
void client_writer_update_connections(void)
{
if (to_update_clients != NULL)
return;
to_update_clients = timeout_add(STATS_UPDATE_CLIENTS_DELAY_MSECS,
client_writer_update_connections_internal,
NULL);
}
void client_writers_init(void)
{
writer_clients = connection_list_init(&client_set, &client_vfuncs);
}
void client_writers_deinit(void)
{
timeout_remove(&to_update_clients);
connection_list_deinit(&writer_clients);
}
|