summaryrefslogtreecommitdiffstats
path: root/src/lib-master/anvil-client.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib-master/anvil-client.c')
-rw-r--r--src/lib-master/anvil-client.c275
1 files changed, 275 insertions, 0 deletions
diff --git a/src/lib-master/anvil-client.c b/src/lib-master/anvil-client.c
new file mode 100644
index 0000000..0cda77f
--- /dev/null
+++ b/src/lib-master/anvil-client.c
@@ -0,0 +1,275 @@
+/* Copyright (c) 2009-2018 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "ioloop.h"
+#include "net.h"
+#include "istream.h"
+#include "ostream.h"
+#include "array.h"
+#include "aqueue.h"
+#include "anvil-client.h"
+
+struct anvil_query {
+ anvil_callback_t *callback;
+ void *context;
+};
+
+struct anvil_client {
+ char *path;
+ int fd;
+ struct istream *input;
+ struct ostream *output;
+ struct io *io;
+ struct timeout *to_query;
+
+ struct timeout *to_reconnect;
+ time_t last_reconnect;
+
+ ARRAY(struct anvil_query *) queries_arr;
+ struct aqueue *queries;
+
+ bool (*reconnect_callback)(void);
+ enum anvil_client_flags flags;
+};
+
+#define ANVIL_HANDSHAKE "VERSION\tanvil\t1\t0\n"
+#define ANVIL_INBUF_SIZE 1024
+#define ANVIL_RECONNECT_MIN_SECS 5
+#define ANVIL_QUERY_TIMEOUT_MSECS (1000*5)
+
+static void anvil_client_disconnect(struct anvil_client *client);
+
+struct anvil_client *
+anvil_client_init(const char *path, bool (*reconnect_callback)(void),
+ enum anvil_client_flags flags)
+{
+ struct anvil_client *client;
+
+ client = i_new(struct anvil_client, 1);
+ client->path = i_strdup(path);
+ client->reconnect_callback = reconnect_callback;
+ client->flags = flags;
+ client->fd = -1;
+ i_array_init(&client->queries_arr, 32);
+ client->queries = aqueue_init(&client->queries_arr.arr);
+ return client;
+}
+
+void anvil_client_deinit(struct anvil_client **_client)
+{
+ struct anvil_client *client = *_client;
+
+ *_client = NULL;
+
+ anvil_client_disconnect(client);
+ array_free(&client->queries_arr);
+ aqueue_deinit(&client->queries);
+ i_free(client->path);
+ i_assert(client->to_reconnect == NULL);
+ i_free(client);
+}
+
+static void anvil_reconnect(struct anvil_client *client)
+{
+ anvil_client_disconnect(client);
+ if (client->reconnect_callback != NULL) {
+ if (!client->reconnect_callback()) {
+ /* no reconnection */
+ return;
+ }
+ }
+
+ if (ioloop_time - client->last_reconnect < ANVIL_RECONNECT_MIN_SECS) {
+ if (client->to_reconnect == NULL) {
+ client->to_reconnect =
+ timeout_add(ANVIL_RECONNECT_MIN_SECS*1000,
+ anvil_reconnect, client);
+ }
+ } else {
+ client->last_reconnect = ioloop_time;
+ (void)anvil_client_connect(client, FALSE);
+ }
+}
+
+static void anvil_input(struct anvil_client *client)
+{
+ struct anvil_query *const *queries;
+ struct anvil_query *query;
+ const char *line;
+ unsigned int count;
+
+ queries = array_get(&client->queries_arr, &count);
+ while ((line = i_stream_read_next_line(client->input)) != NULL) {
+ if (aqueue_count(client->queries) == 0) {
+ i_error("anvil: Unexpected input: %s", line);
+ continue;
+ }
+
+ query = queries[aqueue_idx(client->queries, 0)];
+ if (query->callback != NULL) T_BEGIN {
+ query->callback(line, query->context);
+ } T_END;
+ i_free(query);
+ aqueue_delete_tail(client->queries);
+ }
+ if (client->input->stream_errno != 0) {
+ i_error("read(%s) failed: %s", client->path,
+ i_stream_get_error(client->input));
+ anvil_reconnect(client);
+ } else if (client->input->eof) {
+ i_error("read(%s) failed: EOF", client->path);
+ anvil_reconnect(client);
+ } else if (client->to_query != NULL) {
+ if (aqueue_count(client->queries) == 0)
+ timeout_remove(&client->to_query);
+ else
+ timeout_reset(client->to_query);
+ }
+}
+
+int anvil_client_connect(struct anvil_client *client, bool retry)
+{
+ int fd;
+
+ i_assert(client->fd == -1);
+
+ fd = retry ? net_connect_unix_with_retries(client->path, 5000) :
+ net_connect_unix(client->path);
+ if (fd == -1) {
+ if (errno != ENOENT ||
+ (client->flags & ANVIL_CLIENT_FLAG_HIDE_ENOENT) == 0) {
+ i_error("net_connect_unix(%s) failed: %m",
+ client->path);
+ }
+ return -1;
+ }
+
+ timeout_remove(&client->to_reconnect);
+
+ client->fd = fd;
+ client->input = i_stream_create_fd(fd, ANVIL_INBUF_SIZE);
+ client->output = o_stream_create_fd(fd, SIZE_MAX);
+ client->io = io_add(fd, IO_READ, anvil_input, client);
+ if (o_stream_send_str(client->output, ANVIL_HANDSHAKE) < 0) {
+ i_error("write(%s) failed: %s", client->path,
+ o_stream_get_error(client->output));
+ anvil_reconnect(client);
+ return -1;
+ }
+ return 0;
+}
+
+static void anvil_client_cancel_queries(struct anvil_client *client)
+{
+ struct anvil_query *const *queries, *query;
+ unsigned int count;
+
+ queries = array_get(&client->queries_arr, &count);
+ while (aqueue_count(client->queries) > 0) {
+ query = queries[aqueue_idx(client->queries, 0)];
+ if (query->callback != NULL)
+ query->callback(NULL, query->context);
+ i_free(query);
+ aqueue_delete_tail(client->queries);
+ }
+ timeout_remove(&client->to_query);
+}
+
+static void anvil_client_disconnect(struct anvil_client *client)
+{
+ anvil_client_cancel_queries(client);
+ if (client->fd != -1) {
+ io_remove(&client->io);
+ i_stream_destroy(&client->input);
+ o_stream_destroy(&client->output);
+ net_disconnect(client->fd);
+ client->fd = -1;
+ }
+ timeout_remove(&client->to_reconnect);
+}
+
+static void anvil_client_timeout(struct anvil_client *client)
+{
+ i_assert(aqueue_count(client->queries) > 0);
+
+ i_error("%s: Anvil queries timed out after %u secs - aborting queries",
+ client->path, ANVIL_QUERY_TIMEOUT_MSECS/1000);
+ /* perhaps reconnect helps */
+ anvil_reconnect(client);
+}
+
+static int anvil_client_send(struct anvil_client *client, const char *cmd)
+{
+ struct const_iovec iov[2];
+
+ if (client->fd == -1) {
+ if (anvil_client_connect(client, FALSE) < 0)
+ return -1;
+ }
+
+ iov[0].iov_base = cmd;
+ iov[0].iov_len = strlen(cmd);
+ iov[1].iov_base = "\n";
+ iov[1].iov_len = 1;
+ if (o_stream_sendv(client->output, iov, 2) < 0) {
+ i_error("write(%s) failed: %s", client->path,
+ o_stream_get_error(client->output));
+ anvil_reconnect(client);
+ return -1;
+ }
+ return 0;
+}
+
+struct anvil_query *
+anvil_client_query(struct anvil_client *client, const char *query,
+ anvil_callback_t *callback, void *context)
+{
+ struct anvil_query *anvil_query;
+
+ anvil_query = i_new(struct anvil_query, 1);
+ anvil_query->callback = callback;
+ anvil_query->context = context;
+ aqueue_append(client->queries, &anvil_query);
+ if (anvil_client_send(client, query) < 0) {
+ /* connection failure. add a delayed failure callback.
+ the caller may not expect the callback to be called
+ immediately. */
+ timeout_remove(&client->to_query);
+ client->to_query =
+ timeout_add_short(0, anvil_client_cancel_queries, client);
+ } else if (client->to_query == NULL) {
+ client->to_query = timeout_add(ANVIL_QUERY_TIMEOUT_MSECS,
+ anvil_client_timeout, client);
+ }
+ return anvil_query;
+}
+
+void anvil_client_query_abort(struct anvil_client *client,
+ struct anvil_query **_query)
+{
+ struct anvil_query *query = *_query;
+ struct anvil_query *const *queries;
+ unsigned int i, count;
+
+ *_query = NULL;
+
+ count = aqueue_count(client->queries);
+ queries = array_front(&client->queries_arr);
+ for (i = 0; i < count; i++) {
+ if (queries[aqueue_idx(client->queries, i)] == query) {
+ query->callback = NULL;
+ return;
+ }
+ }
+ i_panic("anvil query to be aborted doesn't exist");
+}
+
+void anvil_client_cmd(struct anvil_client *client, const char *cmd)
+{
+ (void)anvil_client_send(client, cmd);
+}
+
+bool anvil_client_is_connected(struct anvil_client *client)
+{
+ return client->fd != -1;
+}