summaryrefslogtreecommitdiffstats
path: root/src/lib-master/stats-client.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib-master/stats-client.c')
-rw-r--r--src/lib-master/stats-client.c399
1 files changed, 399 insertions, 0 deletions
diff --git a/src/lib-master/stats-client.c b/src/lib-master/stats-client.c
new file mode 100644
index 0000000..1f5037a
--- /dev/null
+++ b/src/lib-master/stats-client.c
@@ -0,0 +1,399 @@
+/* Copyright (c) 2017-2018 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "str.h"
+#include "strescape.h"
+#include "ostream.h"
+#include "time-util.h"
+#include "lib-event-private.h"
+#include "event-filter.h"
+#include "connection.h"
+#include "stats-client.h"
+
+#define STATS_CLIENT_TIMEOUT_MSECS (5*1000)
+#define STATS_CLIENT_RECONNECT_INTERVAL_MSECS (10*1000)
+
+struct stats_client {
+ struct connection conn;
+ struct event_filter *filter;
+ struct ioloop *ioloop;
+ struct timeout *to_reconnect;
+ bool handshaked;
+ bool handshake_received_at_least_once;
+ bool silent_notfound_errors;
+};
+
+static struct connection_list *stats_clients;
+
+static void stats_client_connect(struct stats_client *client);
+
+static int
+client_handshake_filter(const char *const *args, struct event_filter **filter_r,
+ const char **error_r)
+{
+ if (strcmp(args[0], "FILTER") != 0) {
+ *error_r = "Expected FILTER";
+ return -1;
+ }
+ if (args[1] == NULL || args[1][0] == '\0') {
+ *filter_r = NULL;
+ return 0;
+ }
+
+ *filter_r = event_filter_create();
+ if (!event_filter_import(*filter_r, t_str_tabunescape(args[1]), error_r)) {
+ event_filter_unref(filter_r);
+ return -1;
+ }
+ return 0;
+}
+
+static int
+stats_client_handshake(struct stats_client *client, const char *const *args)
+{
+ struct event_filter *filter;
+ const char *error;
+
+ if (client_handshake_filter(args, &filter, &error) < 0) {
+ i_error("stats: Received invalid handshake: %s (input: %s)",
+ error, t_strarray_join(args, "\t"));
+ return -1;
+ }
+ client->handshaked = TRUE;
+ client->handshake_received_at_least_once = TRUE;
+ if (client->ioloop != NULL)
+ io_loop_stop(client->ioloop);
+
+ if (filter == NULL)
+ filter = event_filter_create();
+
+ event_filter_unref(&client->filter);
+ client->filter = filter;
+ event_set_global_debug_send_filter(client->filter);
+ return 1;
+}
+
+static int
+stats_client_input_args(struct connection *conn, const char *const *args)
+{
+ struct stats_client *client = (struct stats_client *)conn;
+
+ return stats_client_handshake(client, args);
+
+}
+
+static void stats_client_reconnect(struct stats_client *client)
+{
+ timeout_remove(&client->to_reconnect);
+ stats_client_connect(client);
+}
+
+static void stats_client_destroy(struct connection *conn)
+{
+ struct stats_client *client = (struct stats_client *)conn;
+ struct event *event;
+ unsigned int reconnect_msecs = STATS_CLIENT_RECONNECT_INTERVAL_MSECS;
+
+ /* after reconnection the IDs need to be re-sent */
+ for (event = events_get_head(); event != NULL; event = event->next)
+ event->sent_to_stats_id = 0;
+
+ client->handshaked = FALSE;
+ connection_disconnect(conn);
+ if (client->ioloop != NULL) {
+ /* waiting for stats handshake to finish */
+ io_loop_stop(client->ioloop);
+ } else if (conn->connect_finished.tv_sec != 0) {
+ int msecs_since_connected =
+ timeval_diff_msecs(&ioloop_timeval,
+ &conn->connect_finished);
+ if (msecs_since_connected >= STATS_CLIENT_RECONNECT_INTERVAL_MSECS) {
+ /* reconnect immdiately */
+ reconnect_msecs = 0;
+ } else {
+ /* wait for reconnect interval since we last
+ were connected. */
+ reconnect_msecs = STATS_CLIENT_RECONNECT_INTERVAL_MSECS -
+ msecs_since_connected;
+ }
+ }
+ if (client->to_reconnect == NULL) {
+ client->to_reconnect =
+ timeout_add(reconnect_msecs,
+ stats_client_reconnect, client);
+ }
+}
+
+static const struct connection_settings stats_client_set = {
+ .service_name_in = "stats-server",
+ .service_name_out = "stats-client",
+ .major_version = 4,
+ .minor_version = 0,
+
+ .input_max_size = SIZE_MAX,
+ .output_max_size = SIZE_MAX,
+ .client = TRUE
+};
+
+static const struct connection_vfuncs stats_client_vfuncs = {
+ .destroy = stats_client_destroy,
+ .input_args = stats_client_input_args,
+};
+
+static void
+stats_event_write(struct stats_client *client,
+ struct event *event, struct event *global_event,
+ const struct failure_context *ctx, string_t *str, bool begin)
+{
+ struct event *merged_event;
+ struct event *parent_event;
+ bool update = FALSE, flush_output = FALSE;
+
+ merged_event = begin ? event_ref(event) : event_minimize(event);
+ parent_event = merged_event->parent;
+
+ if (parent_event != NULL) {
+ if (parent_event->sent_to_stats_id !=
+ parent_event->change_id) {
+ stats_event_write(client, parent_event, NULL,
+ ctx, str, TRUE);
+ }
+ i_assert(parent_event->sent_to_stats_id != 0);
+ }
+ if (begin) {
+ i_assert(event == merged_event);
+ update = (event->sent_to_stats_id != 0);
+ const char *cmd = !update ? "BEGIN" : "UPDATE";
+ str_printfa(str, "%s\t%"PRIu64"\t", cmd, event->id);
+ event->sent_to_stats_id = event->change_id;
+ /* Flush the BEGINs early on, because the stats event writing
+ may trigger more events recursively (e.g. data_stack_grow),
+ which may use the BEGIN events as parents. */
+ flush_output = !update;
+ } else {
+ str_printfa(str, "EVENT\t%"PRIu64"\t",
+ global_event == NULL ? 0 : global_event->id);
+ }
+ str_printfa(str, "%"PRIu64"\t",
+ parent_event == NULL ? 0 : parent_event->id);
+ if (!update)
+ str_printfa(str, "%u\t", ctx->type);
+ event_export(merged_event, str);
+ str_append_c(str, '\n');
+ event_unref(&merged_event);
+ if (flush_output || str_len(str) >= IO_BLOCK_SIZE) {
+ o_stream_nsend(client->conn.output, str_data(str), str_len(str));
+ str_truncate(str, 0);
+ }
+}
+
+static void
+stats_client_send_event(struct stats_client *client, struct event *event,
+ const struct failure_context *ctx)
+{
+ static int recursion = 0;
+
+ if (!client->handshaked)
+ return;
+
+ if (!event_filter_match(client->filter, event, ctx))
+ return;
+
+ /* Need to send the event for stats and/or export */
+ string_t *str = t_str_new(256);
+
+ if (++recursion == 0)
+ o_stream_cork(client->conn.output);
+ struct event *global_event = event_get_global();
+ if (global_event != NULL)
+ stats_event_write(client, global_event, NULL, ctx, str, TRUE);
+
+ stats_event_write(client, event, global_event, ctx, str, FALSE);
+ o_stream_nsend(client->conn.output, str_data(str), str_len(str));
+
+ i_assert(recursion > 0);
+ if (--recursion == 0) {
+ if (o_stream_uncork_flush(client->conn.output) < 0) {
+ e_error(client->conn.event, "write() failed: %s",
+ o_stream_get_error(client->conn.output));
+ }
+ }
+}
+
+static void
+stats_client_free_event(struct stats_client *client, struct event *event)
+{
+ if (event->sent_to_stats_id == 0)
+ return;
+ o_stream_nsend_str(client->conn.output,
+ t_strdup_printf("END\t%"PRIu64"\n", event->id));
+}
+
+static bool
+stats_event_callback(struct event *event, enum event_callback_type type,
+ struct failure_context *ctx,
+ const char *fmt ATTR_UNUSED, va_list args ATTR_UNUSED)
+{
+ if (stats_clients->connections == NULL)
+ return TRUE;
+ struct stats_client *client =
+ (struct stats_client *)stats_clients->connections;
+ if (client->conn.output == NULL || client->conn.output->closed)
+ return TRUE;
+
+ switch (type) {
+ case EVENT_CALLBACK_TYPE_CREATE:
+ break;
+ case EVENT_CALLBACK_TYPE_SEND:
+ stats_client_send_event(client, event, ctx);
+ break;
+ case EVENT_CALLBACK_TYPE_FREE:
+ stats_client_free_event(client, event);
+ break;
+ }
+ return TRUE;
+}
+
+static void
+stats_category_append(string_t *str, const struct event_category *category)
+{
+ str_append(str, "CATEGORY\t");
+ str_append_tabescaped(str, category->name);
+ if (category->parent != NULL) {
+ str_append_c(str, '\t');
+ str_append_tabescaped(str, category->parent->name);
+ }
+ str_append_c(str, '\n');
+}
+
+static void stats_category_registered(struct event_category *category)
+{
+ if (stats_clients->connections == NULL)
+ return;
+ struct stats_client *client =
+ (struct stats_client *)stats_clients->connections;
+ if (client->conn.output == NULL)
+ return;
+
+ string_t *str = t_str_new(64);
+ stats_category_append(str, category);
+ o_stream_nsend(client->conn.output, str_data(str), str_len(str));
+}
+
+static void stats_global_init(void)
+{
+ stats_clients = connection_list_init(&stats_client_set,
+ &stats_client_vfuncs);
+ event_register_callback(stats_event_callback);
+ event_category_register_callback(stats_category_registered);
+}
+
+static void stats_global_deinit(void)
+{
+ event_unregister_callback(stats_event_callback);
+ event_category_unregister_callback(stats_category_registered);
+ connection_list_deinit(&stats_clients);
+}
+
+static void stats_client_timeout(struct stats_client *client)
+{
+ e_error(client->conn.event, "Timeout waiting for handshake response");
+ io_loop_stop(client->ioloop);
+}
+
+static void stats_client_wait(struct stats_client *client)
+{
+ struct ioloop *prev_ioloop = current_ioloop;
+ struct timeout *to;
+
+ i_assert(client->to_reconnect == NULL);
+
+ client->ioloop = io_loop_create();
+ to = timeout_add(STATS_CLIENT_TIMEOUT_MSECS, stats_client_timeout, client);
+ connection_switch_ioloop(&client->conn);
+ io_loop_run(client->ioloop);
+ io_loop_set_current(prev_ioloop);
+ connection_switch_ioloop(&client->conn);
+ if (client->to_reconnect != NULL)
+ client->to_reconnect = io_loop_move_timeout(&client->to_reconnect);
+ io_loop_set_current(client->ioloop);
+ timeout_remove(&to);
+ io_loop_destroy(&client->ioloop);
+}
+
+static void stats_client_send_registered_categories(struct stats_client *client)
+{
+ struct event_category *const *categories;
+ unsigned int i, count;
+
+ string_t *str = t_str_new(64);
+ categories = event_get_registered_categories(&count);
+ for (i = 0; i < count; i++)
+ stats_category_append(str, categories[i]);
+ o_stream_nsend(client->conn.output, str_data(str), str_len(str));
+}
+
+static void stats_client_connect(struct stats_client *client)
+{
+ if (connection_client_connect(&client->conn) == 0) {
+ /* read the handshake so the global debug filter is updated */
+ stats_client_send_registered_categories(client);
+ if (!client->handshake_received_at_least_once)
+ stats_client_wait(client);
+ } else if (!client->silent_notfound_errors ||
+ (errno != ENOENT && errno != ECONNREFUSED)) {
+ i_error("net_connect_unix(%s) failed: %m", client->conn.name);
+ }
+}
+
+struct stats_client *
+stats_client_init(const char *path, bool silent_notfound_errors)
+{
+ struct stats_client *client;
+
+ if (stats_clients == NULL)
+ stats_global_init();
+
+ client = i_new(struct stats_client, 1);
+ client->silent_notfound_errors = silent_notfound_errors;
+ connection_init_client_unix(stats_clients, &client->conn, path);
+ stats_client_connect(client);
+ return client;
+}
+
+static int stats_client_deinit_callback(struct connection *conn)
+{
+ struct ostream *output = conn->output;
+ int ret = o_stream_flush(output);
+ if (ret < 0) {
+ e_error(conn->event, "write() failed: %s",
+ o_stream_get_error(output));
+ }
+ if (ret != 0)
+ io_loop_stop(current_ioloop);
+ return ret;
+}
+
+void stats_client_deinit(struct stats_client **_client)
+{
+ struct stats_client *client = *_client;
+
+ *_client = NULL;
+
+ if (client->conn.output != NULL && !client->conn.output->closed &&
+ o_stream_get_buffer_used_size(client->conn.output) > 0) {
+ o_stream_set_flush_callback(client->conn.output,
+ stats_client_deinit_callback,
+ &client->conn);
+ o_stream_uncork(client->conn.output);
+ stats_client_wait(client);
+ }
+
+ event_filter_unref(&client->filter);
+ connection_deinit(&client->conn);
+ timeout_remove(&client->to_reconnect);
+ i_free(client);
+
+ if (stats_clients->connections == NULL)
+ stats_global_deinit();
+}