diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 09:51:24 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 09:51:24 +0000 |
commit | f7548d6d28c313cf80e6f3ef89aed16a19815df1 (patch) | |
tree | a3f6f2a3f247293bee59ecd28e8cd8ceb6ca064a /src/lib-sql/driver-cassandra.c | |
parent | Initial commit. (diff) | |
download | dovecot-upstream.tar.xz dovecot-upstream.zip |
Adding upstream version 1:2.3.19.1+dfsg1.upstream/1%2.3.19.1+dfsg1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/lib-sql/driver-cassandra.c')
-rw-r--r-- | src/lib-sql/driver-cassandra.c | 2579 |
1 files changed, 2579 insertions, 0 deletions
diff --git a/src/lib-sql/driver-cassandra.c b/src/lib-sql/driver-cassandra.c new file mode 100644 index 0000000..4e66809 --- /dev/null +++ b/src/lib-sql/driver-cassandra.c @@ -0,0 +1,2579 @@ +/* Copyright (c) 2015-2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "istream.h" +#include "array.h" +#include "hostpid.h" +#include "hex-binary.h" +#include "str.h" +#include "ioloop.h" +#include "net.h" +#include "write-full.h" +#include "time-util.h" +#include "var-expand.h" +#include "safe-memset.h" +#include "settings-parser.h" +#include "sql-api-private.h" + +#ifdef BUILD_CASSANDRA +#include <stdio.h> +#include <fcntl.h> +#include <unistd.h> +#include <cassandra.h> +#include <pthread.h> + +#define IS_CONNECTED(db) \ + ((db)->api.state != SQL_DB_STATE_DISCONNECTED && \ + (db)->api.state != SQL_DB_STATE_CONNECTING) + +#define CASSANDRA_FALLBACK_WARN_INTERVAL_SECS 60 +#define CASSANDRA_FALLBACK_FIRST_RETRY_MSECS 50 +#define CASSANDRA_FALLBACK_MAX_RETRY_MSECS (1000*60) + +#define CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS (5*1000) + +typedef void driver_cassandra_callback_t(CassFuture *future, void *context); + +enum cassandra_counter_type { + CASSANDRA_COUNTER_TYPE_QUERY_SENT, + CASSANDRA_COUNTER_TYPE_QUERY_RECV_OK, + CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS, + CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL, + CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT, + CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT, + CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE, + CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_OTHER, + CASSANDRA_COUNTER_TYPE_QUERY_SLOW, + + CASSANDRA_COUNTER_COUNT +}; +static const char *counter_names[CASSANDRA_COUNTER_COUNT] = { + "sent", + "recv_ok", + "recv_err_no_hosts", + "recv_err_queue_full", + "recv_err_client_timeout", + "recv_err_server_timeout", + "recv_err_server_unavailable", + "recv_err_other", + "slow", +}; + +enum cassandra_query_type { + CASSANDRA_QUERY_TYPE_READ, + CASSANDRA_QUERY_TYPE_READ_MORE, + CASSANDRA_QUERY_TYPE_WRITE, + CASSANDRA_QUERY_TYPE_DELETE, + + CASSANDRA_QUERY_TYPE_COUNT +}; + +static const char *cassandra_query_type_names[CASSANDRA_QUERY_TYPE_COUNT] = { + "read", "read-more", "write", "delete" +}; + +struct cassandra_callback { + unsigned int id; + struct timeout *to; + CassFuture *future; + struct cassandra_db *db; + driver_cassandra_callback_t *callback; + void *context; +}; + +struct cassandra_db { + struct sql_db api; + + char *hosts, *keyspace, *user, *password; + CassConsistency read_consistency, write_consistency, delete_consistency; + CassConsistency read_fallback_consistency, write_fallback_consistency; + CassConsistency delete_fallback_consistency; + CassLogLevel log_level; + bool debug_queries; + bool latency_aware_routing; + bool init_ssl; + unsigned int protocol_version; + unsigned int num_threads; + unsigned int connect_timeout_msecs, request_timeout_msecs; + unsigned int warn_timeout_msecs; + unsigned int heartbeat_interval_secs, idle_timeout_secs; + unsigned int execution_retry_interval_msecs, execution_retry_times; + unsigned int page_size; + in_port_t port; + + CassCluster *cluster; + CassSession *session; + CassTimestampGen *timestamp_gen; + CassSsl *ssl; + + int fd_pipe[2]; + struct io *io_pipe; + ARRAY(struct cassandra_sql_prepared_statement *) pending_prepares; + ARRAY(struct cassandra_callback *) callbacks; + ARRAY(struct cassandra_result *) results; + unsigned int callback_ids; + + char *metrics_path; + char *ssl_ca_file; + char *ssl_cert_file; + char *ssl_private_key_file; + char *ssl_private_key_password; + CassSslVerifyFlags ssl_verify_flags; + + struct timeout *to_metrics; + uint64_t counters[CASSANDRA_COUNTER_COUNT]; + + struct timeval primary_query_last_sent[CASSANDRA_QUERY_TYPE_COUNT]; + time_t last_fallback_warning[CASSANDRA_QUERY_TYPE_COUNT]; + unsigned int fallback_failures[CASSANDRA_QUERY_TYPE_COUNT]; + + /* for synchronous queries: */ + struct ioloop *ioloop, *orig_ioloop; + struct sql_result *sync_result; + + char *error; +}; + +struct cassandra_result { + struct sql_result api; + CassStatement *statement; + const CassResult *result; + CassIterator *iterator; + char *query; + char *error; + CassConsistency consistency, fallback_consistency; + enum cassandra_query_type query_type; + struct timeval page0_start_time, start_time, finish_time; + unsigned int row_count, total_row_count, page_num; + cass_int64_t timestamp; + + pool_t row_pool; + ARRAY_TYPE(const_string) fields; + ARRAY(size_t) field_sizes; + + sql_query_callback_t *callback; + void *context; + + bool is_prepared:1; + bool query_sent:1; + bool finished:1; + bool paging_continues:1; +}; + +struct cassandra_transaction_context { + struct sql_transaction_context ctx; + int refcount; + + sql_commit_callback_t *callback; + void *context; + + struct cassandra_sql_statement *stmt; + char *query; + cass_int64_t query_timestamp; + char *error; + + bool begin_succeeded:1; + bool begin_failed:1; + bool failed:1; +}; + +struct cassandra_sql_arg { + unsigned int column_idx; + + char *value_str; + const unsigned char *value_binary; + size_t value_binary_size; + int64_t value_int64; +}; + +struct cassandra_sql_statement { + struct sql_statement stmt; + + struct cassandra_sql_prepared_statement *prep; + CassStatement *cass_stmt; + + ARRAY(struct cassandra_sql_arg) pending_args; + cass_int64_t timestamp; + + struct cassandra_result *result; +}; + +struct cassandra_sql_prepared_statement { + struct sql_prepared_statement prep_stmt; + + /* NULL, until the prepare is asynchronously finished */ + const CassPrepared *prepared; + /* statements waiting for prepare to finish */ + ARRAY(struct cassandra_sql_statement *) pending_statements; + /* an error here will cause the prepare to be retried on the next + execution attempt. */ + char *error; + + bool pending; +}; + +extern const struct sql_db driver_cassandra_db; +extern const struct sql_result driver_cassandra_result; + +static struct { + CassConsistency consistency; + const char *name; +} cass_consistency_names[] = { + { CASS_CONSISTENCY_ANY, "any" }, + { CASS_CONSISTENCY_ONE, "one" }, + { CASS_CONSISTENCY_TWO, "two" }, + { CASS_CONSISTENCY_THREE, "three" }, + { CASS_CONSISTENCY_QUORUM, "quorum" }, + { CASS_CONSISTENCY_ALL, "all" }, + { CASS_CONSISTENCY_LOCAL_QUORUM, "local-quorum" }, + { CASS_CONSISTENCY_EACH_QUORUM, "each-quorum" }, + { CASS_CONSISTENCY_SERIAL, "serial" }, + { CASS_CONSISTENCY_LOCAL_SERIAL, "local-serial" }, + { CASS_CONSISTENCY_LOCAL_ONE, "local-one" } +}; + +static struct { + CassLogLevel log_level; + const char *name; +} cass_log_level_names[] = { + { CASS_LOG_CRITICAL, "critical" }, + { CASS_LOG_ERROR, "error" }, + { CASS_LOG_WARN, "warn" }, + { CASS_LOG_INFO, "info" }, + { CASS_LOG_DEBUG, "debug" }, + { CASS_LOG_TRACE, "trace" } +}; + +static struct event_category event_category_cassandra = { + .parent = &event_category_sql, + .name = "cassandra" +}; + +static pthread_t main_thread_id; +static bool main_thread_id_set; + +static void driver_cassandra_prepare_pending(struct cassandra_db *db); +static void +prepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt); +static void driver_cassandra_result_send_query(struct cassandra_result *result); +static void driver_cassandra_send_queries(struct cassandra_db *db); +static void result_finish(struct cassandra_result *result); + +static void log_one_line(const CassLogMessage *message, + enum log_type log_type, const char *log_level_str, + const char *text, size_t text_len) +{ + /* NOTE: We may not be in the main thread. We can't use the + standard Dovecot functions that may use data stack. That's why + we can't use i_log_type() in here, but have to re-implement the + internal logging protocol. Otherwise preserve Cassandra's own + logging format. */ + fprintf(stderr, "\001%c%s %u.%03u %s(%s:%d:%s): %.*s\n", + log_type+1, my_pid, + (unsigned int)(message->time_ms / 1000), + (unsigned int)(message->time_ms % 1000), + log_level_str, + message->file, message->line, message->function, + (int)text_len, text); +} + +static void +driver_cassandra_log_handler(const CassLogMessage* message, + void *data ATTR_UNUSED) +{ + enum log_type log_type = LOG_TYPE_ERROR; + const char *log_level_str = ""; + + switch (message->severity) { + case CASS_LOG_DISABLED: + case CASS_LOG_LAST_ENTRY: + i_unreached(); + case CASS_LOG_CRITICAL: + log_type = LOG_TYPE_PANIC; + break; + case CASS_LOG_ERROR: + log_type = LOG_TYPE_ERROR; + break; + case CASS_LOG_WARN: + log_type = LOG_TYPE_WARNING; + break; + case CASS_LOG_INFO: + log_type = LOG_TYPE_INFO; + break; + case CASS_LOG_TRACE: + log_level_str = "[TRACE] "; + /* fall through */ + case CASS_LOG_DEBUG: + log_type = LOG_TYPE_DEBUG; + break; + } + + /* Log message may contain LFs, so log each line separately. */ + const char *p, *line = message->message; + while ((p = strchr(line, '\n')) != NULL) { + log_one_line(message, log_type, log_level_str, line, p - line); + line = p+1; + } + log_one_line(message, log_type, log_level_str, line, strlen(line)); +} + +static void driver_cassandra_init_log(void) +{ + failure_callback_t *fatal_callback, *error_callback; + failure_callback_t *info_callback, *debug_callback; + + i_get_failure_handlers(&fatal_callback, &error_callback, + &info_callback, &debug_callback); + if (i_failure_handler_is_internal(debug_callback)) { + /* Using internal logging protocol. Use it ourself to set log + levels correctly. */ + cass_log_set_callback(driver_cassandra_log_handler, NULL); + } +} + +static int consistency_parse(const char *str, CassConsistency *consistency_r) +{ + unsigned int i; + + for (i = 0; i < N_ELEMENTS(cass_consistency_names); i++) { + if (strcmp(cass_consistency_names[i].name, str) == 0) { + *consistency_r = cass_consistency_names[i].consistency; + return 0; + } + } + return -1; +} + +static int log_level_parse(const char *str, CassLogLevel *log_level_r) +{ + unsigned int i; + + for (i = 0; i < N_ELEMENTS(cass_log_level_names); i++) { + if (strcmp(cass_log_level_names[i].name, str) == 0) { + *log_level_r = cass_log_level_names[i].log_level; + return 0; + } + } + return -1; +} + +static void driver_cassandra_set_state(struct cassandra_db *db, + enum sql_db_state state) +{ + /* switch back to original ioloop in case the caller wants to + add/remove timeouts */ + if (db->ioloop != NULL) + io_loop_set_current(db->orig_ioloop); + sql_db_set_state(&db->api, state); + if (db->ioloop != NULL) + io_loop_set_current(db->ioloop); +} + +static void driver_cassandra_close(struct cassandra_db *db, const char *error) +{ + struct cassandra_sql_prepared_statement *prep_stmt; + struct cassandra_result *const *resultp; + + io_remove(&db->io_pipe); + if (db->fd_pipe[0] != -1) { + i_close_fd(&db->fd_pipe[0]); + i_close_fd(&db->fd_pipe[1]); + } + driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED); + + array_foreach_elem(&db->pending_prepares, prep_stmt) { + prep_stmt->pending = FALSE; + prep_stmt->error = i_strdup(error); + prepare_finish_pending_statements(prep_stmt); + } + array_clear(&db->pending_prepares); + + while (array_count(&db->results) > 0) { + resultp = array_front(&db->results); + if ((*resultp)->error == NULL) + (*resultp)->error = i_strdup(error); + result_finish(*resultp); + } + + if (db->ioloop != NULL) { + /* running a sync query, stop it */ + io_loop_stop(db->ioloop); + } +} + +static void driver_cassandra_log_error(struct cassandra_db *db, + CassFuture *future, const char *str) +{ + const char *message; + size_t size; + + cass_future_error_message(future, &message, &size); + e_error(db->api.event, "%s: %.*s", str, (int)size, message); +} + +static struct cassandra_callback * +cassandra_callback_detach(struct cassandra_db *db, unsigned int id) +{ + struct cassandra_callback *cb, *const *cbp; + + /* usually there are only a few callbacks, so don't bother with using + a hash table */ + array_foreach(&db->callbacks, cbp) { + cb = *cbp; + if (cb->id == id) { + array_delete(&db->callbacks, + array_foreach_idx(&db->callbacks, cbp), 1); + return cb; + } + } + return NULL; +} + +static void cassandra_callback_run(struct cassandra_callback *cb) +{ + timeout_remove(&cb->to); + cb->callback(cb->future, cb->context); + cass_future_free(cb->future); + i_free(cb); +} + +static void driver_cassandra_future_callback(CassFuture *future ATTR_UNUSED, + void *context) +{ + struct cassandra_callback *cb = context; + + if (pthread_equal(pthread_self(), main_thread_id) != 0) { + /* called immediately from the main thread. */ + cassandra_callback_detach(cb->db, cb->id); + cb->to = timeout_add_short(0, cassandra_callback_run, cb); + return; + } + + /* this isn't the main thread - communicate with main thread by + writing the callback id to the pipe. note that we must not use + almost any dovecot functions here because most of them are using + data-stack, which isn't thread-safe. especially don't use + i_error() here. */ + if (write_full(cb->db->fd_pipe[1], &cb->id, sizeof(cb->id)) < 0) { + const char *str = t_strdup_printf( + "cassandra: write(pipe) failed: %s\n", + strerror(errno)); + (void)write_full(STDERR_FILENO, str, strlen(str)); + } +} + +static void driver_cassandra_input_id(struct cassandra_db *db, unsigned int id) +{ + struct cassandra_callback *cb; + + cb = cassandra_callback_detach(db, id); + if (cb == NULL) + i_panic("cassandra: Received unknown ID %u", id); + cassandra_callback_run(cb); +} + +static void driver_cassandra_input(struct cassandra_db *db) +{ + unsigned int ids[1024]; + ssize_t ret; + + ret = read(db->fd_pipe[0], ids, sizeof(ids)); + if (ret < 0) + e_error(db->api.event, "read(pipe) failed: %m"); + else if (ret == 0) + e_error(db->api.event, "read(pipe) failed: EOF"); + else if (ret % sizeof(ids[0]) != 0) + e_error(db->api.event, "read(pipe) returned wrong amount of data"); + else { + /* success */ + unsigned int i, count = ret / sizeof(ids[0]); + + for (i = 0; i < count && + db->api.state != SQL_DB_STATE_DISCONNECTED; i++) + driver_cassandra_input_id(db, ids[i]); + return; + } + driver_cassandra_close(db, "IPC pipe closed"); +} + +static void +driver_cassandra_set_callback(CassFuture *future, struct cassandra_db *db, + driver_cassandra_callback_t *callback, + void *context) +{ + struct cassandra_callback *cb; + + i_assert(callback != NULL); + + cb = i_new(struct cassandra_callback, 1); + cb->future = future; + cb->callback = callback; + cb->context = context; + cb->db = db; + + array_push_back(&db->callbacks, &cb); + cb->id = ++db->callback_ids; + if (cb->id == 0) + cb->id = ++db->callback_ids; + + /* NOTE: The callback may be called immediately by this same thread. + This is checked within the callback. It may also be called at any + time after this call by another thread. So we must not access "cb" + again after this call. */ + cass_future_set_callback(future, driver_cassandra_future_callback, cb); +} + +static void connect_callback(CassFuture *future, void *context) +{ + struct cassandra_db *db = context; + + if (cass_future_error_code(future) != CASS_OK) { + driver_cassandra_log_error(db, future, + "Couldn't connect to Cassandra"); + driver_cassandra_close(db, "Couldn't connect to Cassandra"); + return; + } + driver_cassandra_set_state(db, SQL_DB_STATE_IDLE); + if (db->ioloop != NULL) { + /* driver_cassandra_sync_init() waiting for connection to + finish */ + io_loop_stop(db->ioloop); + } + driver_cassandra_prepare_pending(db); + driver_cassandra_send_queries(db); +} + +static int driver_cassandra_connect(struct sql_db *_db) +{ + struct cassandra_db *db = (struct cassandra_db *)_db; + CassFuture *future; + + i_assert(db->api.state == SQL_DB_STATE_DISCONNECTED); + + if (pipe(db->fd_pipe) < 0) { + e_error(_db->event, "pipe() failed: %m"); + return -1; + } + db->io_pipe = io_add(db->fd_pipe[0], IO_READ, + driver_cassandra_input, db); + driver_cassandra_set_state(db, SQL_DB_STATE_CONNECTING); + + future = cass_session_connect_keyspace(db->session, db->cluster, + db->keyspace); + driver_cassandra_set_callback(future, db, connect_callback, db); + return 0; +} + +static void driver_cassandra_disconnect(struct sql_db *_db) +{ + struct cassandra_db *db = (struct cassandra_db *)_db; + + driver_cassandra_close(db, "Disconnected"); +} + +static const char * +driver_cassandra_escape_string(struct sql_db *db ATTR_UNUSED, + const char *string) +{ + string_t *escaped; + unsigned int i; + + if (strchr(string, '\'') == NULL) + return string; + escaped = t_str_new(strlen(string)+10); + for (i = 0; string[i] != '\0'; i++) { + if (string[i] == '\'') + str_append_c(escaped, '\''); + str_append_c(escaped, string[i]); + } + return str_c(escaped); +} + +static int driver_cassandra_parse_connect_string(struct cassandra_db *db, + const char *connect_string, + const char **error_r) +{ + const char *const *args, *key, *value, *error; + string_t *hosts = t_str_new(64); + bool read_fallback_set = FALSE, write_fallback_set = FALSE; + bool delete_fallback_set = FALSE; + + db->log_level = CASS_LOG_WARN; + db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM; + db->write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM; + db->delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM; + db->connect_timeout_msecs = SQL_CONNECT_TIMEOUT_SECS*1000; + db->request_timeout_msecs = SQL_QUERY_TIMEOUT_SECS*1000; + db->warn_timeout_msecs = CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS; + + args = t_strsplit_spaces(connect_string, " "); + for (; *args != NULL; args++) { + value = strchr(*args, '='); + if (value == NULL) { + *error_r = t_strdup_printf( + "Missing value in connect string: %s", *args); + return -1; + } + key = t_strdup_until(*args, value++); + + if (str_begins(key, "ssl_")) + db->init_ssl = TRUE; + + if (strcmp(key, "host") == 0) { + if (str_len(hosts) > 0) + str_append_c(hosts, ','); + str_append(hosts, value); + } else if (strcmp(key, "port") == 0) { + if (net_str2port(value, &db->port) < 0) { + *error_r = t_strdup_printf( + "Invalid port: %s", value); + return -1; + } + } else if (strcmp(key, "dbname") == 0 || + strcmp(key, "keyspace") == 0) { + i_free(db->keyspace); + db->keyspace = i_strdup(value); + } else if (strcmp(key, "user") == 0) { + i_free(db->user); + db->user = i_strdup(value); + } else if (strcmp(key, "password") == 0) { + i_free(db->password); + db->password = i_strdup(value); + } else if (strcmp(key, "read_consistency") == 0) { + if (consistency_parse(value, &db->read_consistency) < 0) { + *error_r = t_strdup_printf( + "Unknown read_consistency: %s", value); + return -1; + } + } else if (strcmp(key, "read_fallback_consistency") == 0) { + if (consistency_parse(value, &db->read_fallback_consistency) < 0) { + *error_r = t_strdup_printf( + "Unknown read_fallback_consistency: %s", value); + return -1; + } + read_fallback_set = TRUE; + } else if (strcmp(key, "write_consistency") == 0) { + if (consistency_parse(value, + &db->write_consistency) < 0) { + *error_r = t_strdup_printf( + "Unknown write_consistency: %s", value); + return -1; + } + } else if (strcmp(key, "write_fallback_consistency") == 0) { + if (consistency_parse(value, + &db->write_fallback_consistency) < 0) { + *error_r = t_strdup_printf( + "Unknown write_fallback_consistency: %s", + value); + return -1; + } + write_fallback_set = TRUE; + } else if (strcmp(key, "delete_consistency") == 0) { + if (consistency_parse(value, + &db->delete_consistency) < 0) { + *error_r = t_strdup_printf( + "Unknown delete_consistency: %s", value); + return -1; + } + } else if (strcmp(key, "delete_fallback_consistency") == 0) { + if (consistency_parse(value, + &db->delete_fallback_consistency) < 0) { + *error_r = t_strdup_printf( + "Unknown delete_fallback_consistency: %s", + value); + return -1; + } + delete_fallback_set = TRUE; + } else if (strcmp(key, "log_level") == 0) { + if (log_level_parse(value, &db->log_level) < 0) { + *error_r = t_strdup_printf( + "Unknown log_level: %s", value); + return -1; + } + } else if (strcmp(key, "debug_queries") == 0) { + db->debug_queries = TRUE; + } else if (strcmp(key, "latency_aware_routing") == 0) { + db->latency_aware_routing = TRUE; + } else if (strcmp(key, "version") == 0) { + if (str_to_uint(value, &db->protocol_version) < 0) { + *error_r = t_strdup_printf( + "Invalid version: %s", value); + return -1; + } + } else if (strcmp(key, "num_threads") == 0) { + if (str_to_uint(value, &db->num_threads) < 0) { + *error_r = t_strdup_printf( + "Invalid num_threads: %s", value); + return -1; + } + } else if (strcmp(key, "heartbeat_interval") == 0) { + if (settings_get_time(value, &db->heartbeat_interval_secs, + &error) < 0) { + *error_r = t_strdup_printf( + "Invalid heartbeat_interval '%s': %s", + value, error); + return -1; + } + } else if (strcmp(key, "idle_timeout") == 0) { + if (settings_get_time(value, &db->idle_timeout_secs, + &error) < 0) { + *error_r = t_strdup_printf( + "Invalid idle_timeout '%s': %s", + value, error); + return -1; + } + } else if (strcmp(key, "connect_timeout") == 0) { + if (settings_get_time_msecs(value, + &db->connect_timeout_msecs, + &error) < 0) { + *error_r = t_strdup_printf( + "Invalid connect_timeout '%s': %s", + value, error); + return -1; + } + } else if (strcmp(key, "request_timeout") == 0) { + if (settings_get_time_msecs(value, + &db->request_timeout_msecs, + &error) < 0) { + *error_r = t_strdup_printf( + "Invalid request_timeout '%s': %s", + value, error); + return -1; + } + } else if (strcmp(key, "warn_timeout") == 0) { + if (settings_get_time_msecs(value, + &db->warn_timeout_msecs, + &error) < 0) { + *error_r = t_strdup_printf( + "Invalid warn_timeout '%s': %s", + value, error); + return -1; + } + } else if (strcmp(key, "metrics") == 0) { + i_free(db->metrics_path); + db->metrics_path = i_strdup(value); + } else if (strcmp(key, "execution_retry_interval") == 0) { + if (settings_get_time_msecs(value, + &db->execution_retry_interval_msecs, + &error) < 0) { + *error_r = t_strdup_printf( + "Invalid execution_retry_interval '%s': %s", + value, error); + return -1; + } +#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY + *error_r = t_strdup_printf( + "This cassandra version does not support execution_retry_interval"); + return -1; +#endif + } else if (strcmp(key, "execution_retry_times") == 0) { + if (str_to_uint(value, &db->execution_retry_times) < 0) { + *error_r = t_strdup_printf( + "Invalid execution_retry_times %s", + value); + return -1; + } +#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY + *error_r = t_strdup_printf( + "This cassandra version does not support execution_retry_times"); + return -1; +#endif + } else if (strcmp(key, "page_size") == 0) { + if (str_to_uint(value, &db->page_size) < 0) { + *error_r = t_strdup_printf( + "Invalid page_size: %s", + value); + return -1; + } + } else if (strcmp(key, "ssl_ca") == 0) { + db->ssl_ca_file = i_strdup(value); + } else if (strcmp(key, "ssl_cert_file") == 0) { + db->ssl_cert_file = i_strdup(value); + } else if (strcmp(key, "ssl_private_key_file") == 0) { + db->ssl_private_key_file = i_strdup(value); + } else if (strcmp(key, "ssl_private_key_password") == 0) { + db->ssl_private_key_password = i_strdup(value); + } else if (strcmp(key, "ssl_verify") == 0) { + if (strcmp(value, "none") == 0) { + db->ssl_verify_flags = CASS_SSL_VERIFY_NONE; + } else if (strcmp(value, "cert") == 0) { + db->ssl_verify_flags = CASS_SSL_VERIFY_PEER_CERT; + } else if (strcmp(value, "cert-ip") == 0) { + db->ssl_verify_flags = + CASS_SSL_VERIFY_PEER_CERT | + CASS_SSL_VERIFY_PEER_IDENTITY; +#if HAVE_DECL_CASS_SSL_VERIFY_PEER_IDENTITY_DNS == 1 + } else if (strcmp(value, "cert-dns") == 0) { + db->ssl_verify_flags = + CASS_SSL_VERIFY_PEER_CERT | + CASS_SSL_VERIFY_PEER_IDENTITY_DNS; +#endif + } else { + *error_r = t_strdup_printf( + "Unsupported ssl_verify flags: '%s'", + value); + return -1; + } + } else { + *error_r = t_strdup_printf( + "Unknown connect string: %s", key); + return -1; + } + } + + if (!read_fallback_set) + db->read_fallback_consistency = db->read_consistency; + if (!write_fallback_set) + db->write_fallback_consistency = db->write_consistency; + if (!delete_fallback_set) + db->delete_fallback_consistency = db->delete_consistency; + + if (str_len(hosts) == 0) { + *error_r = t_strdup_printf("No hosts given in connect string"); + return -1; + } + if (db->keyspace == NULL) { + *error_r = t_strdup_printf("No dbname given in connect string"); + return -1; + } + + if ((db->ssl_cert_file != NULL && db->ssl_private_key_file == NULL) || + (db->ssl_cert_file == NULL && db->ssl_private_key_file != NULL)) { + *error_r = "ssl_cert_file and ssl_private_key_file need to be both set"; + return -1; + } + + db->hosts = i_strdup(str_c(hosts)); + return 0; +} + +static void +driver_cassandra_get_metrics_json(struct cassandra_db *db, string_t *dest) +{ +#define ADD_UINT64(_struct, _field) \ + str_printfa(dest, "\""#_field"\": %llu,", \ + (unsigned long long)metrics._struct._field); +#define ADD_DOUBLE(_struct, _field) \ + str_printfa(dest, "\""#_field"\": %02lf,", metrics._struct._field); + CassMetrics metrics; + + cass_session_get_metrics(db->session, &metrics); + str_append(dest, "{ \"requests\": {"); + ADD_UINT64(requests, min); + ADD_UINT64(requests, max); + ADD_UINT64(requests, mean); + ADD_UINT64(requests, stddev); + ADD_UINT64(requests, median); + ADD_UINT64(requests, percentile_75th); + ADD_UINT64(requests, percentile_95th); + ADD_UINT64(requests, percentile_98th); + ADD_UINT64(requests, percentile_99th); + ADD_UINT64(requests, percentile_999th); + ADD_DOUBLE(requests, mean_rate); + ADD_DOUBLE(requests, one_minute_rate); + ADD_DOUBLE(requests, five_minute_rate); + ADD_DOUBLE(requests, fifteen_minute_rate); + str_truncate(dest, str_len(dest)-1); + + str_append(dest, "}, \"stats\": {"); + ADD_UINT64(stats, total_connections); + ADD_UINT64(stats, available_connections); + ADD_UINT64(stats, exceeded_pending_requests_water_mark); + ADD_UINT64(stats, exceeded_write_bytes_water_mark); + str_truncate(dest, str_len(dest)-1); + + str_append(dest, "}, \"errors\": {"); + ADD_UINT64(errors, connection_timeouts); + ADD_UINT64(errors, pending_request_timeouts); + ADD_UINT64(errors, request_timeouts); + str_truncate(dest, str_len(dest)-1); + + str_append(dest, "}, \"queries\": {"); + for (unsigned int i = 0; i < CASSANDRA_COUNTER_COUNT; i++) { + str_printfa(dest, "\"%s\": %"PRIu64",", counter_names[i], + db->counters[i]); + } + str_truncate(dest, str_len(dest)-1); + str_append(dest, "}}"); +} + +static void driver_cassandra_metrics_write(struct cassandra_db *db) +{ + struct var_expand_table tab[] = { + { '\0', NULL, NULL } + }; + string_t *path = t_str_new(64); + string_t *data; + const char *error; + int fd; + + if (var_expand(path, db->metrics_path, tab, &error) <= 0) { + e_error(db->api.event, "Failed to expand metrics_path=%s: %s", + db->metrics_path, error); + return; + } + + fd = open(str_c(path), O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, 0600); + if (fd == -1) { + e_error(db->api.event, "creat(%s) failed: %m", str_c(path)); + return; + } + data = t_str_new(1024); + driver_cassandra_get_metrics_json(db, data); + if (write_full(fd, str_data(data), str_len(data)) < 0) + e_error(db->api.event, "write(%s) failed: %m", str_c(path)); + i_close_fd(&fd); +} + +static void driver_cassandra_free(struct cassandra_db **_db) +{ + struct cassandra_db *db = *_db; + *_db = NULL; + + event_unref(&db->api.event); + i_free(db->metrics_path); + i_free(db->hosts); + i_free(db->error); + i_free(db->keyspace); + i_free(db->user); + i_free(db->password); + i_free(db->ssl_ca_file); + i_free(db->ssl_cert_file); + i_free(db->ssl_private_key_file); + i_free_and_null(db->ssl_private_key_password); + array_free(&db->api.module_contexts); + if (db->ssl != NULL) + cass_ssl_free(db->ssl); + i_free(db); +} + +static int driver_cassandra_init_ssl(struct cassandra_db *db, const char **error_r) +{ + buffer_t *buf = t_buffer_create(512); + CassError c_err; + + db->ssl = cass_ssl_new(); + i_assert(db->ssl != NULL); + + if (db->ssl_ca_file != NULL) { + if (buffer_append_full_file(buf, db->ssl_ca_file, SIZE_MAX, + error_r) < 0) + return -1; + if ((c_err = cass_ssl_add_trusted_cert(db->ssl, str_c(buf))) != CASS_OK) { + *error_r = cass_error_desc(c_err); + return -1; + } + } + + if (db->ssl_private_key_file != NULL && db->ssl_cert_file != NULL) { + buffer_set_used_size(buf, 0); + if (buffer_append_full_file(buf, db->ssl_private_key_file, + SIZE_MAX, error_r) < 0) + return -1; + c_err = cass_ssl_set_private_key(db->ssl, str_c(buf), + db->ssl_private_key_password); + safe_memset(buffer_get_modifiable_data(buf, NULL), 0, buf->used); + if (c_err != CASS_OK) { + *error_r = cass_error_desc(c_err); + return -1; + } + + buffer_set_used_size(buf, 0); + if (buffer_append_full_file(buf, db->ssl_cert_file, SIZE_MAX, error_r) < 0) + return -1; + if ((c_err = cass_ssl_set_cert(db->ssl, str_c(buf))) != CASS_OK) { + *error_r = cass_error_desc(c_err); + return -1; + } + } + + cass_ssl_set_verify_flags(db->ssl, db->ssl_verify_flags); + + return 0; +} + +static int driver_cassandra_init_full_v(const struct sql_settings *set, + struct sql_db **db_r, + const char **error_r) +{ + struct cassandra_db *db; + int ret; + + db = i_new(struct cassandra_db, 1); + db->api = driver_cassandra_db; + db->fd_pipe[0] = db->fd_pipe[1] = -1; + db->api.event = event_create(set->event_parent); + event_add_category(db->api.event, &event_category_cassandra); + event_set_append_log_prefix(db->api.event, "cassandra: "); + + T_BEGIN { + ret = driver_cassandra_parse_connect_string(db, + set->connect_string, error_r); + } T_END_PASS_STR_IF(ret < 0, error_r); + + if (ret < 0) { + driver_cassandra_free(&db); + return -1; + } + + if (db->init_ssl && driver_cassandra_init_ssl(db, error_r) < 0) { + driver_cassandra_free(&db); + return -1; + } + + driver_cassandra_init_log(); + cass_log_set_level(db->log_level); + if (db->log_level >= CASS_LOG_DEBUG) + event_set_forced_debug(db->api.event, TRUE); + + if (db->protocol_version > 0 && db->protocol_version < 4) { + /* binding with column indexes requires v4 */ + db->api.v.prepared_statement_init = NULL; + db->api.v.prepared_statement_deinit = NULL; + db->api.v.statement_init_prepared = NULL; + } + + db->timestamp_gen = cass_timestamp_gen_monotonic_new(); + db->cluster = cass_cluster_new(); + +#ifdef HAVE_CASS_CLUSTER_SET_USE_HOSTNAME_RESOLUTION + if ((db->ssl_verify_flags & CASS_SSL_VERIFY_PEER_IDENTITY_DNS) != 0) { + CassError c_err; + if ((c_err = cass_cluster_set_use_hostname_resolution( + db->cluster, cass_true)) != CASS_OK) { + *error_r = cass_error_desc(c_err); + driver_cassandra_free(&db); + return -1; + } + } +#endif + cass_cluster_set_ssl(db->cluster, db->ssl); + cass_cluster_set_timestamp_gen(db->cluster, db->timestamp_gen); + cass_cluster_set_connect_timeout(db->cluster, db->connect_timeout_msecs); + cass_cluster_set_request_timeout(db->cluster, db->request_timeout_msecs); + cass_cluster_set_contact_points(db->cluster, db->hosts); + if (db->user != NULL && db->password != NULL) + cass_cluster_set_credentials(db->cluster, db->user, db->password); + if (db->port != 0) + cass_cluster_set_port(db->cluster, db->port); + if (db->protocol_version != 0) + cass_cluster_set_protocol_version(db->cluster, db->protocol_version); + if (db->num_threads != 0) + cass_cluster_set_num_threads_io(db->cluster, db->num_threads); + if (db->latency_aware_routing) + cass_cluster_set_latency_aware_routing(db->cluster, cass_true); + if (db->heartbeat_interval_secs != 0) + cass_cluster_set_connection_heartbeat_interval(db->cluster, + db->heartbeat_interval_secs); + if (db->idle_timeout_secs != 0) + cass_cluster_set_connection_idle_timeout(db->cluster, + db->idle_timeout_secs); +#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY + if (db->execution_retry_times > 0 && db->execution_retry_interval_msecs > 0) + cass_cluster_set_constant_speculative_execution_policy( + db->cluster, db->execution_retry_interval_msecs, + db->execution_retry_times); +#endif + if (db->ssl != NULL) { + e_debug(db->api.event, "Enabling TLS for cluster"); + cass_cluster_set_ssl(db->cluster, db->ssl); + } + db->session = cass_session_new(); + if (db->metrics_path != NULL) + db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, + db); + i_array_init(&db->results, 16); + i_array_init(&db->callbacks, 16); + i_array_init(&db->pending_prepares, 16); + if (!main_thread_id_set) { + main_thread_id = pthread_self(); + main_thread_id_set = TRUE; + } + + *db_r = &db->api; + return 0; +} + +static void driver_cassandra_deinit_v(struct sql_db *_db) +{ + struct cassandra_db *db = (struct cassandra_db *)_db; + + driver_cassandra_close(db, "Deinitialized"); + + i_assert(array_count(&db->callbacks) == 0); + array_free(&db->callbacks); + i_assert(array_count(&db->results) == 0); + array_free(&db->results); + i_assert(array_count(&db->pending_prepares) == 0); + array_free(&db->pending_prepares); + + cass_session_free(db->session); + cass_cluster_free(db->cluster); + cass_timestamp_gen_free(db->timestamp_gen); + timeout_remove(&db->to_metrics); + sql_connection_log_finished(_db); + driver_cassandra_free(&db); +} + +static void driver_cassandra_result_unlink(struct cassandra_db *db, + struct cassandra_result *result) +{ + struct cassandra_result *const *results; + unsigned int i, count; + + results = array_get(&db->results, &count); + for (i = 0; i < count; i++) { + if (results[i] == result) { + array_delete(&db->results, i, 1); + return; + } + } + i_unreached(); +} + +static void driver_cassandra_log_result(struct cassandra_result *result, + bool all_pages, long long reply_usecs) +{ + struct cassandra_db *db = (struct cassandra_db *)result->api.db; + struct timeval now; + unsigned int row_count; + + i_gettimeofday(&now); + + string_t *str = t_str_new(128); + str_printfa(str, "Finished %squery '%s' (", + result->is_prepared ? "prepared " : "", result->query); + if (result->timestamp != 0) + str_printfa(str, "timestamp=%"PRId64", ", result->timestamp); + if (all_pages) { + str_printfa(str, "%u pages in total, ", result->page_num); + row_count = result->total_row_count; + } else { + if (result->page_num > 0 || result->paging_continues) + str_printfa(str, "page %u, ", result->page_num); + row_count = result->row_count; + } + str_printfa(str, "%u rows, %lld+%lld us): %s", row_count, reply_usecs, + timeval_diff_usecs(&now, &result->finish_time), + result->error != NULL ? result->error : "success"); + + struct event_passthrough *e = + sql_query_finished_event(&db->api, result->api.event, + result->query, result->error == NULL, + NULL); + if (result->error != NULL) + e->add_str("error", result->error); + + struct event *event = e->event(); + if (db->debug_queries) + event_set_forced_debug(event, TRUE); + if (reply_usecs/1000 >= db->warn_timeout_msecs) { + db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SLOW]++; + e_warning(event, "%s", str_c(str)); + } else { + e_debug(event, "%s", str_c(str)); + } +} + +static void driver_cassandra_result_free(struct sql_result *_result) +{ + struct cassandra_db *db = (struct cassandra_db *)_result->db; + struct cassandra_result *result = (struct cassandra_result *)_result; + long long reply_usecs; + + i_assert(!result->api.callback); + i_assert(result->callback == NULL); + + if (_result == db->sync_result) + db->sync_result = NULL; + + reply_usecs = timeval_diff_usecs(&result->finish_time, + &result->start_time); + driver_cassandra_log_result(result, FALSE, reply_usecs); + + if (result->page_num > 0 && !result->paging_continues) { + /* Multi-page query finishes now. Log a debug/warning summary + message about it separate from the per-page messages. */ + reply_usecs = timeval_diff_usecs(&result->finish_time, + &result->page0_start_time); + driver_cassandra_log_result(result, TRUE, reply_usecs); + } + + if (result->result != NULL) + cass_result_free(result->result); + if (result->iterator != NULL) + cass_iterator_free(result->iterator); + if (result->statement != NULL) + cass_statement_free(result->statement); + pool_unref(&result->row_pool); + event_unref(&result->api.event); + i_free(result->query); + i_free(result->error); + i_free(result); +} + +static void result_finish(struct cassandra_result *result) +{ + struct cassandra_db *db = (struct cassandra_db *)result->api.db; + bool free_result = TRUE; + + result->finished = TRUE; + result->finish_time = ioloop_timeval; + driver_cassandra_result_unlink(db, result); + + i_assert((result->error != NULL) == (result->iterator == NULL)); + + result->api.callback = TRUE; + T_BEGIN { + result->callback(&result->api, result->context); + } T_END; + result->api.callback = FALSE; + + free_result = db->sync_result != &result->api; + if (db->ioloop != NULL) + io_loop_stop(db->ioloop); + + i_assert(!free_result || result->api.refcount > 0); + result->callback = NULL; + if (free_result) + sql_result_unref(&result->api); +} + +static void query_resend_with_fallback(struct cassandra_result *result) +{ + struct cassandra_db *db = (struct cassandra_db *)result->api.db; + time_t last_warning = + ioloop_time - db->last_fallback_warning[result->query_type]; + + if (last_warning >= CASSANDRA_FALLBACK_WARN_INTERVAL_SECS) { + e_warning(db->api.event, + "%s - retrying future %s queries with consistency %s (instead of %s)", + result->error, cassandra_query_type_names[result->query_type], + cass_consistency_string(result->fallback_consistency), + cass_consistency_string(result->consistency)); + db->last_fallback_warning[result->query_type] = ioloop_time; + } + i_free_and_null(result->error); + db->fallback_failures[result->query_type]++; + + result->consistency = result->fallback_consistency; + driver_cassandra_result_send_query(result); +} + +static void counters_inc_error(struct cassandra_db *db, CassError error) +{ + switch (error) { + case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE: + db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS]++; + break; + case CASS_ERROR_LIB_REQUEST_QUEUE_FULL: + db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL]++; + break; + case CASS_ERROR_LIB_REQUEST_TIMED_OUT: + db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT]++; + break; + case CASS_ERROR_SERVER_WRITE_TIMEOUT: + db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT]++; + break; + case CASS_ERROR_SERVER_UNAVAILABLE: + db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE]++; + break; + default: + db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_OTHER]++; + break; + } +} + +static bool query_error_want_fallback(CassError error) +{ + switch (error) { + case CASS_ERROR_LIB_WRITE_ERROR: + case CASS_ERROR_LIB_REQUEST_TIMED_OUT: + /* Communication problems on client side. Maybe it will work + with fallback consistency? */ + return TRUE; + case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE: + /* The client library couldn't connect to enough Cassandra + nodes. The error message text is the same as for + CASS_ERROR_SERVER_UNAVAILABLE. */ + return TRUE; + case CASS_ERROR_SERVER_SERVER_ERROR: + case CASS_ERROR_SERVER_OVERLOADED: + case CASS_ERROR_SERVER_IS_BOOTSTRAPPING: + case CASS_ERROR_SERVER_READ_TIMEOUT: + case CASS_ERROR_SERVER_READ_FAILURE: + case CASS_ERROR_SERVER_WRITE_FAILURE: + /* Servers are having trouble. Maybe with fallback consistency + we can reach non-troubled servers? */ + return TRUE; + case CASS_ERROR_SERVER_UNAVAILABLE: + /* Cassandra server knows that there aren't enough nodes + available. "All hosts in current policy attempted and were + either unavailable or failed". */ + return TRUE; + case CASS_ERROR_SERVER_WRITE_TIMEOUT: + /* Cassandra server couldn't reach all the needed nodes. + This may be because it hasn't yet detected that the servers + are down, or because the servers are just too busy. We'll + try the fallback consistency to avoid unnecessary temporary + errors. */ + return TRUE; + default: + return FALSE; + } +} + +static enum sql_result_error_type +driver_cassandra_error_is_uncertain(CassError error) +{ + switch (error) { + case CASS_ERROR_SERVER_WRITE_FAILURE: + /* This happens when some of the replicas that were contacted + * by the coordinator replied with an error. */ + case CASS_ERROR_SERVER_WRITE_TIMEOUT: + /* A Cassandra timeout during a write query. */ + case CASS_ERROR_SERVER_UNAVAILABLE: + /* The coordinator knows there are not enough replicas alive + * to perform a query with the requested consistency level. */ + case CASS_ERROR_LIB_REQUEST_TIMED_OUT: + /* A request sent from the driver has timed out. */ + case CASS_ERROR_LIB_WRITE_ERROR: + /* A write error occured. */ + return SQL_RESULT_ERROR_TYPE_WRITE_UNCERTAIN; + default: + return SQL_RESULT_ERROR_TYPE_UNKNOWN; + } +} + +static void query_callback(CassFuture *future, void *context) +{ + struct cassandra_result *result = context; + struct cassandra_db *db = (struct cassandra_db *)result->api.db; + CassError error = cass_future_error_code(future); + + if (error != CASS_OK) { + const char *errmsg; + size_t errsize; + int msecs; + + cass_future_error_message(future, &errmsg, &errsize); + i_free(result->error); + + msecs = timeval_diff_msecs(&ioloop_timeval, &result->start_time); + counters_inc_error(db, error); + /* Timeouts bring uncertainty whether the query succeeded or + not. Also _SERVER_UNAVAILABLE could have actually written + enough copies of the data for the query to succeed. */ + result->api.error_type = driver_cassandra_error_is_uncertain(error); + result->error = i_strdup_printf( + "Query '%s' failed: %.*s (in %u.%03u secs%s)", + result->query, (int)errsize, errmsg, msecs/1000, msecs%1000, + result->page_num == 0 ? + "" : + t_strdup_printf(", page %u", result->page_num)); + + if (query_error_want_fallback(error) && + result->fallback_consistency != result->consistency) { + /* retry with fallback consistency */ + query_resend_with_fallback(result); + return; + } + result_finish(result); + return; + } + db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_OK]++; + + if (result->fallback_consistency != result->consistency) { + /* non-fallback query finished successfully. if there had been + any fallbacks, reset them. */ + db->fallback_failures[result->query_type] = 0; + } + + result->result = cass_future_get_result(future); + result->iterator = cass_iterator_from_result(result->result); + result_finish(result); +} + +static void driver_cassandra_init_statement(struct cassandra_result *result) +{ + struct cassandra_db *db = (struct cassandra_db *)result->api.db; + + cass_statement_set_consistency(result->statement, result->consistency); + +#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY + cass_statement_set_is_idempotent(result->statement, cass_true); +#endif + if (db->page_size > 0) + cass_statement_set_paging_size(result->statement, db->page_size); +} + +static void driver_cassandra_result_send_query(struct cassandra_result *result) +{ + struct cassandra_db *db = (struct cassandra_db *)result->api.db; + CassFuture *future; + + i_assert(result->statement != NULL); + + db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SENT]++; + if (result->query_type != CASSANDRA_QUERY_TYPE_READ_MORE) + driver_cassandra_init_statement(result); + + future = cass_session_execute(db->session, result->statement); + driver_cassandra_set_callback(future, db, query_callback, result); +} + +static bool +driver_cassandra_want_fallback_query(struct cassandra_result *result) +{ + struct cassandra_db *db = (struct cassandra_db *)result->api.db; + unsigned int failure_count = db->fallback_failures[result->query_type]; + unsigned int i, msecs = CASSANDRA_FALLBACK_FIRST_RETRY_MSECS; + struct timeval tv; + + if (failure_count == 0) + return FALSE; + /* double the retries every time. */ + for (i = 1; i < failure_count; i++) { + msecs *= 2; + if (msecs >= CASSANDRA_FALLBACK_MAX_RETRY_MSECS) { + msecs = CASSANDRA_FALLBACK_MAX_RETRY_MSECS; + break; + } + } + /* If last primary query sent timestamp + msecs is older than current + time, we need to retry the primary query. Note that this practically + prevents multiple primary queries from being attempted + simultaneously, because the caller updates primary_query_last_sent + immediately when returning. + + The only time when multiple primary queries can be running in + parallel is when the earlier query is being slow and hasn't finished + early enough. This could even be a wanted feature, since while the + first query might have to wait for a timeout, Cassandra could have + been fixed in the meantime and the second query finishes + successfully. */ + tv = db->primary_query_last_sent[result->query_type]; + timeval_add_msecs(&tv, msecs); + return timeval_cmp(&ioloop_timeval, &tv) < 0; +} + +static int driver_cassandra_send_query(struct cassandra_result *result) +{ + struct cassandra_db *db = (struct cassandra_db *)result->api.db; + int ret; + + if (!SQL_DB_IS_READY(&db->api)) { + if ((ret = sql_connect(&db->api)) <= 0) { + if (ret < 0) + driver_cassandra_close(db, + "Couldn't connect to Cassandra"); + return ret; + } + } + + if (result->page0_start_time.tv_sec == 0) + result->page0_start_time = ioloop_timeval; + result->start_time = ioloop_timeval; + result->row_pool = pool_alloconly_create("cassandra result", 512); + switch (result->query_type) { + case CASSANDRA_QUERY_TYPE_READ: + result->consistency = db->read_consistency; + result->fallback_consistency = db->read_fallback_consistency; + break; + case CASSANDRA_QUERY_TYPE_READ_MORE: + /* consistency is already set and we don't want to fallback + at this point anymore. */ + result->fallback_consistency = result->consistency; + break; + case CASSANDRA_QUERY_TYPE_WRITE: + result->consistency = db->write_consistency; + result->fallback_consistency = db->write_fallback_consistency; + break; + case CASSANDRA_QUERY_TYPE_DELETE: + result->consistency = db->delete_consistency; + result->fallback_consistency = db->delete_fallback_consistency; + break; + case CASSANDRA_QUERY_TYPE_COUNT: + i_unreached(); + } + + if (driver_cassandra_want_fallback_query(result)) + result->consistency = result->fallback_consistency; + else + db->primary_query_last_sent[result->query_type] = ioloop_timeval; + + driver_cassandra_result_send_query(result); + result->query_sent = TRUE; + return 1; +} + +static void driver_cassandra_send_queries(struct cassandra_db *db) +{ + struct cassandra_result *const *results; + unsigned int i, count; + + results = array_get(&db->results, &count); + for (i = 0; i < count; i++) { + if (!results[i]->query_sent && results[i]->statement != NULL) { + if (driver_cassandra_send_query(results[i]) <= 0) + break; + } + } +} + +static void exec_callback(struct sql_result *_result ATTR_UNUSED, + void *context ATTR_UNUSED) +{ +} + +static struct cassandra_result * +driver_cassandra_query_init(struct cassandra_db *db, const char *query, + enum cassandra_query_type query_type, + bool is_prepared, + sql_query_callback_t *callback, void *context) +{ + struct cassandra_result *result; + + result = i_new(struct cassandra_result, 1); + result->api = driver_cassandra_result; + result->api.db = &db->api; + result->api.refcount = 1; + result->callback = callback; + result->context = context; + result->query_type = query_type; + result->query = i_strdup(query); + result->is_prepared = is_prepared; + result->api.event = event_create(db->api.event); + array_push_back(&db->results, &result); + return result; +} + +static void +driver_cassandra_query_full(struct sql_db *_db, const char *query, + enum cassandra_query_type query_type, + sql_query_callback_t *callback, void *context) +{ + struct cassandra_db *db = (struct cassandra_db *)_db; + struct cassandra_result *result; + + result = driver_cassandra_query_init(db, query, query_type, FALSE, + callback, context); + result->statement = cass_statement_new(query, 0); + (void)driver_cassandra_send_query(result); +} + +static void driver_cassandra_exec(struct sql_db *db, const char *query) +{ + driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_WRITE, + exec_callback, NULL); +} + +static void driver_cassandra_query(struct sql_db *db, const char *query, + sql_query_callback_t *callback, void *context) +{ + driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_READ, + callback, context); +} + +static void cassandra_query_s_callback(struct sql_result *result, void *context) +{ + struct cassandra_db *db = context; + + db->sync_result = result; +} + +static void driver_cassandra_sync_init(struct cassandra_db *db) +{ + if (sql_connect(&db->api) < 0) + return; + db->orig_ioloop = current_ioloop; + db->ioloop = io_loop_create(); + if (IS_CONNECTED(db)) + return; + i_assert(db->api.state == SQL_DB_STATE_CONNECTING); + + db->io_pipe = io_loop_move_io(&db->io_pipe); + /* wait for connecting to finish */ + io_loop_run(db->ioloop); +} + +static void driver_cassandra_sync_deinit(struct cassandra_db *db) +{ + if (db->orig_ioloop == NULL) + return; + if (db->io_pipe != NULL) { + io_loop_set_current(db->orig_ioloop); + db->io_pipe = io_loop_move_io(&db->io_pipe); + io_loop_set_current(db->ioloop); + } + io_loop_destroy(&db->ioloop); +} + +static struct sql_result * +driver_cassandra_sync_query(struct cassandra_db *db, const char *query, + enum cassandra_query_type query_type) +{ + struct sql_result *result; + + i_assert(db->sync_result == NULL); + + switch (db->api.state) { + case SQL_DB_STATE_CONNECTING: + case SQL_DB_STATE_BUSY: + i_unreached(); + case SQL_DB_STATE_DISCONNECTED: + sql_not_connected_result.refcount++; + return &sql_not_connected_result; + case SQL_DB_STATE_IDLE: + break; + } + + driver_cassandra_query_full(&db->api, query, query_type, + cassandra_query_s_callback, db); + if (db->sync_result == NULL) { + db->io_pipe = io_loop_move_io(&db->io_pipe); + io_loop_run(db->ioloop); + } + + result = db->sync_result; + if (result == &sql_not_connected_result) { + /* we don't end up in cassandra's free function, so sync_result + won't be set to NULL if we don't do it here. */ + db->sync_result = NULL; + } else if (result == NULL) { + result = &sql_not_connected_result; + result->refcount++; + } + return result; +} + +static struct sql_result * +driver_cassandra_query_s(struct sql_db *_db, const char *query) +{ + struct cassandra_db *db = (struct cassandra_db *)_db; + struct sql_result *result; + + driver_cassandra_sync_init(db); + result = driver_cassandra_sync_query(db, query, + CASSANDRA_QUERY_TYPE_READ); + driver_cassandra_sync_deinit(db); + return result; +} + +static int +driver_cassandra_get_value(struct cassandra_result *result, + const CassValue *value, const char **str_r, + size_t *len_r) +{ + const unsigned char *output; + void *output_dup; + size_t output_size; + CassError rc; + const char *type; + + if (cass_value_is_null(value) != 0) { + *str_r = NULL; + *len_r = 0; + return 0; + } + + switch (cass_data_type_type(cass_value_data_type(value))) { + case CASS_VALUE_TYPE_INT: { + cass_int32_t num; + + rc = cass_value_get_int32(value, &num); + if (rc == CASS_OK) { + const char *str = t_strdup_printf("%d", num); + output_size = strlen(str); + output = (const void *)str; + } + type = "int32"; + break; + } + case CASS_VALUE_TYPE_TIMESTAMP: + case CASS_VALUE_TYPE_BIGINT: { + cass_int64_t num; + + rc = cass_value_get_int64(value, &num); + if (rc == CASS_OK) { + const char *str = t_strdup_printf("%lld", (long long)num); + output_size = strlen(str); + output = (const void *)str; + } + type = "int64"; + break; + } + default: + rc = cass_value_get_bytes(value, &output, &output_size); + type = "bytes"; + break; + } + if (rc != CASS_OK) { + i_free(result->error); + result->error = i_strdup_printf("Couldn't get value as %s: %s", + type, cass_error_desc(rc)); + return -1; + } + output_dup = p_malloc(result->row_pool, output_size + 1); + memcpy(output_dup, output, output_size); + *str_r = output_dup; + *len_r = output_size; + return 0; +} + +static int driver_cassandra_result_next_page(struct cassandra_result *result) +{ + struct cassandra_db *db = (struct cassandra_db *)result->api.db; + + if (db->page_size == 0) { + /* no paging */ + return 0; + } + if (cass_result_has_more_pages(result->result) == cass_false) + return 0; + + /* callers that don't support sql_query_more() will still get a useful + error message. */ + i_free(result->error); + result->error = i_strdup( + "Paged query has more results, but not supported by the caller"); + return SQL_RESULT_NEXT_MORE; +} + +static int driver_cassandra_result_next_row(struct sql_result *_result) +{ + struct cassandra_result *result = (struct cassandra_result *)_result; + const CassRow *row; + const CassValue *value; + const char *str; + size_t size; + unsigned int i; + int ret = 1; + + if (result->iterator == NULL) + return -1; + + if (cass_iterator_next(result->iterator) == 0) + return driver_cassandra_result_next_page(result); + result->row_count++; + result->total_row_count++; + + p_clear(result->row_pool); + p_array_init(&result->fields, result->row_pool, 8); + p_array_init(&result->field_sizes, result->row_pool, 8); + + row = cass_iterator_get_row(result->iterator); + for (i = 0; (value = cass_row_get_column(row, i)) != NULL; i++) { + if (driver_cassandra_get_value(result, value, &str, &size) < 0) { + ret = -1; + break; + } + array_push_back(&result->fields, &str); + array_push_back(&result->field_sizes, &size); + } + return ret; +} + +static void +driver_cassandra_result_more(struct sql_result **_result, bool async, + sql_query_callback_t *callback, void *context) +{ + struct cassandra_db *db = (struct cassandra_db *)(*_result)->db; + struct cassandra_result *new_result; + struct cassandra_result *old_result = + (struct cassandra_result *)*_result; + + /* Initialize the next page as a new sql_result */ + new_result = driver_cassandra_query_init(db, old_result->query, + CASSANDRA_QUERY_TYPE_READ_MORE, + old_result->is_prepared, + callback, context); + + /* Preserve the statement and update its paging state */ + new_result->statement = old_result->statement; + old_result->statement = NULL; + cass_statement_set_paging_state(new_result->statement, + old_result->result); + old_result->paging_continues = TRUE; + /* The caller did support paging. Clear out the "...not supported by + the caller" error text, so it won't be in the debug log output. */ + i_free_and_null(old_result->error); + + new_result->timestamp = old_result->timestamp; + new_result->consistency = old_result->consistency; + new_result->page_num = old_result->page_num + 1; + new_result->page0_start_time = old_result->page0_start_time; + new_result->total_row_count = old_result->total_row_count; + + sql_result_unref(*_result); + *_result = NULL; + + if (async) + (void)driver_cassandra_send_query(new_result); + else { + i_assert(db->api.state == SQL_DB_STATE_IDLE); + driver_cassandra_sync_init(db); + (void)driver_cassandra_send_query(new_result); + if (new_result->result == NULL) { + db->io_pipe = io_loop_move_io(&db->io_pipe); + io_loop_run(db->ioloop); + } + driver_cassandra_sync_deinit(db); + + callback(&new_result->api, context); + } +} + +static unsigned int +driver_cassandra_result_get_fields_count(struct sql_result *_result) +{ + struct cassandra_result *result = (struct cassandra_result *)_result; + + return array_count(&result->fields); +} + +static const char * +driver_cassandra_result_get_field_name(struct sql_result *_result ATTR_UNUSED, + unsigned int idx ATTR_UNUSED) +{ + i_unreached(); +} + +static int +driver_cassandra_result_find_field(struct sql_result *_result ATTR_UNUSED, + const char *field_name ATTR_UNUSED) +{ + i_unreached(); +} + +static const char * +driver_cassandra_result_get_field_value(struct sql_result *_result, + unsigned int idx) +{ + struct cassandra_result *result = (struct cassandra_result *)_result; + + return array_idx_elem(&result->fields, idx); +} + +static const unsigned char * +driver_cassandra_result_get_field_value_binary(struct sql_result *_result ATTR_UNUSED, + unsigned int idx ATTR_UNUSED, + size_t *size_r ATTR_UNUSED) +{ + struct cassandra_result *result = (struct cassandra_result *)_result; + const char *str; + const size_t *sizep; + + str = array_idx_elem(&result->fields, idx); + sizep = array_idx(&result->field_sizes, idx); + *size_r = *sizep; + return (const void *)str; +} + +static const char * +driver_cassandra_result_find_field_value(struct sql_result *result ATTR_UNUSED, + const char *field_name ATTR_UNUSED) +{ + i_unreached(); +} + +static const char *const * +driver_cassandra_result_get_values(struct sql_result *_result) +{ + struct cassandra_result *result = (struct cassandra_result *)_result; + + return array_front(&result->fields); +} + +static const char *driver_cassandra_result_get_error(struct sql_result *_result) +{ + struct cassandra_result *result = (struct cassandra_result *)_result; + + if (result->error != NULL) + return result->error; + return "FIXME"; +} + +static struct sql_transaction_context * +driver_cassandra_transaction_begin(struct sql_db *db) +{ + struct cassandra_transaction_context *ctx; + + ctx = i_new(struct cassandra_transaction_context, 1); + ctx->ctx.db = db; + ctx->ctx.event = event_create(db->event); + ctx->refcount = 1; + return &ctx->ctx; +} + +static void +driver_cassandra_transaction_unref(struct cassandra_transaction_context **_ctx) +{ + struct cassandra_transaction_context *ctx = *_ctx; + + *_ctx = NULL; + i_assert(ctx->refcount > 0); + if (--ctx->refcount > 0) + return; + + event_unref(&ctx->ctx.event); + i_free(ctx->query); + i_free(ctx->error); + i_free(ctx); +} + +static void +transaction_set_failed(struct cassandra_transaction_context *ctx, + const char *error) +{ + if (ctx->failed) { + i_assert(ctx->error != NULL); + } else { + i_assert(ctx->error == NULL); + ctx->failed = TRUE; + ctx->error = i_strdup(error); + } +} + +static void +transaction_commit_callback(struct sql_result *result, void *context) +{ + struct cassandra_transaction_context *ctx = context; + struct sql_commit_result commit_result; + + i_zero(&commit_result); + if (sql_result_next_row(result) < 0) { + commit_result.error = sql_result_get_error(result); + commit_result.error_type = sql_result_get_error_type(result); + e_debug(sql_transaction_finished_event(&ctx->ctx)-> + add_str("error", commit_result.error)->event(), + "Transaction failed"); + } else { + e_debug(sql_transaction_finished_event(&ctx->ctx)->event(), + "Transaction committed"); + } + ctx->callback(&commit_result, ctx->context); + driver_cassandra_transaction_unref(&ctx); +} + +static void +driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx, + sql_commit_callback_t *callback, void *context) +{ + struct cassandra_transaction_context *ctx = + (struct cassandra_transaction_context *)_ctx; + struct cassandra_db *db = (struct cassandra_db *)_ctx->db; + enum cassandra_query_type query_type; + struct sql_commit_result result; + + i_zero(&result); + ctx->callback = callback; + ctx->context = context; + + if (ctx->failed || (ctx->query == NULL && ctx->stmt == NULL)) { + if (ctx->failed) + result.error = ctx->error; + + e_debug(sql_transaction_finished_event(_ctx)-> + add_str("error", "Rolled back")->event(), + "Transaction rolled back"); + callback(&result, context); + driver_cassandra_transaction_unref(&ctx); + return; + } + + /* just a single query, send it */ + const char *query = ctx->query != NULL ? + ctx->query : sql_statement_get_query(&ctx->stmt->stmt); + if (strncasecmp(query, "DELETE ", 7) == 0) + query_type = CASSANDRA_QUERY_TYPE_DELETE; + else + query_type = CASSANDRA_QUERY_TYPE_WRITE; + + if (ctx->query != NULL) { + struct cassandra_result *cass_result; + + cass_result = driver_cassandra_query_init(db, query, query_type, + FALSE, transaction_commit_callback, ctx); + cass_result->statement = cass_statement_new(query, 0); + if (ctx->query_timestamp != 0) { + cass_result->timestamp = ctx->query_timestamp; + cass_statement_set_timestamp(cass_result->statement, + ctx->query_timestamp); + } + (void)driver_cassandra_send_query(cass_result); + } else { + ctx->stmt->result = + driver_cassandra_query_init(db, query, query_type, TRUE, + transaction_commit_callback, ctx); + if (ctx->stmt->cass_stmt == NULL) { + /* wait for prepare to finish */ + } else { + ctx->stmt->result->statement = ctx->stmt->cass_stmt; + ctx->stmt->result->timestamp = ctx->stmt->timestamp; + (void)driver_cassandra_send_query(ctx->stmt->result); + pool_unref(&ctx->stmt->stmt.pool); + } + } +} + +static void +driver_cassandra_try_commit_s(struct cassandra_transaction_context *ctx) +{ + struct sql_transaction_context *_ctx = &ctx->ctx; + struct cassandra_db *db = (struct cassandra_db *)_ctx->db; + struct sql_result *result = NULL; + enum cassandra_query_type query_type; + + /* just a single query, send it */ + if (strncasecmp(ctx->query, "DELETE ", 7) == 0) + query_type = CASSANDRA_QUERY_TYPE_DELETE; + else + query_type = CASSANDRA_QUERY_TYPE_WRITE; + driver_cassandra_sync_init(db); + result = driver_cassandra_sync_query(db, ctx->query, query_type); + driver_cassandra_sync_deinit(db); + + if (sql_result_next_row(result) < 0) + transaction_set_failed(ctx, sql_result_get_error(result)); + sql_result_unref(result); +} + +static int +driver_cassandra_transaction_commit_s(struct sql_transaction_context *_ctx, + const char **error_r) +{ + struct cassandra_transaction_context *ctx = + (struct cassandra_transaction_context *)_ctx; + + if (ctx->stmt != NULL) { + /* nothing should be using this - don't bother implementing */ + i_panic("cassandra: sql_transaction_commit_s() not supported for prepared statements"); + } + + if (ctx->query != NULL && !ctx->failed) + driver_cassandra_try_commit_s(ctx); + *error_r = t_strdup(ctx->error); + + i_assert(ctx->refcount == 1); + i_assert((*error_r != NULL) == ctx->failed); + driver_cassandra_transaction_unref(&ctx); + return *error_r == NULL ? 0 : -1; +} + +static void +driver_cassandra_transaction_rollback(struct sql_transaction_context *_ctx) +{ + struct cassandra_transaction_context *ctx = + (struct cassandra_transaction_context *)_ctx; + + i_assert(ctx->refcount == 1); + driver_cassandra_transaction_unref(&ctx); +} + +static void +driver_cassandra_update(struct sql_transaction_context *_ctx, const char *query, + unsigned int *affected_rows) +{ + struct cassandra_transaction_context *ctx = + (struct cassandra_transaction_context *)_ctx; + + i_assert(affected_rows == NULL); + + if (ctx->query != NULL || ctx->stmt != NULL) { + transaction_set_failed(ctx, "Multiple changes in transaction not supported"); + return; + } + ctx->query = i_strdup(query); +} + +static const char * +driver_cassandra_escape_blob(struct sql_db *_db ATTR_UNUSED, + const unsigned char *data, size_t size) +{ + string_t *str = t_str_new(128); + + str_append(str, "0x"); + binary_to_hex_append(str, data, size); + return str_c(str); +} + +static CassError +driver_cassandra_bind_int(struct cassandra_sql_statement *stmt, + unsigned int column_idx, int64_t value) +{ + const CassDataType *data_type; + CassValueType value_type; + + i_assert(stmt->prep != NULL); + + /* statements require exactly correct value type */ + data_type = cass_prepared_parameter_data_type(stmt->prep->prepared, + column_idx); + value_type = cass_data_type_type(data_type); + + switch (value_type) { + case CASS_VALUE_TYPE_INT: + if (value < INT32_MIN || value > INT32_MAX) + return CASS_ERROR_LIB_INVALID_VALUE_TYPE; + return cass_statement_bind_int32(stmt->cass_stmt, column_idx, + value); + case CASS_VALUE_TYPE_TIMESTAMP: + case CASS_VALUE_TYPE_BIGINT: + return cass_statement_bind_int64(stmt->cass_stmt, column_idx, + value); + case CASS_VALUE_TYPE_SMALL_INT: + if (value < INT16_MIN || value > INT16_MAX) + return CASS_ERROR_LIB_INVALID_VALUE_TYPE; + return cass_statement_bind_int16(stmt->cass_stmt, column_idx, + value); + case CASS_VALUE_TYPE_TINY_INT: + if (value < INT8_MIN || value > INT8_MAX) + return CASS_ERROR_LIB_INVALID_VALUE_TYPE; + return cass_statement_bind_int8(stmt->cass_stmt, column_idx, + value); + default: + return CASS_ERROR_LIB_INVALID_VALUE_TYPE; + } +} + +static void prepare_finish_arg(struct cassandra_sql_statement *stmt, + const struct cassandra_sql_arg *arg) +{ + CassError rc; + + if (arg->value_str != NULL) { + rc = cass_statement_bind_string(stmt->cass_stmt, arg->column_idx, + arg->value_str); + } else if (arg->value_binary != NULL) { + rc = cass_statement_bind_bytes(stmt->cass_stmt, arg->column_idx, + arg->value_binary, + arg->value_binary_size); + } else { + rc = driver_cassandra_bind_int(stmt, arg->column_idx, + arg->value_int64); + } + if (rc != CASS_OK) { + e_error(stmt->stmt.db->event, + "Statement '%s': Failed to bind column %u: %s", + stmt->stmt.query_template, arg->column_idx, + cass_error_desc(rc)); + } +} + +static void prepare_finish_statement(struct cassandra_sql_statement *stmt) +{ + const struct cassandra_sql_arg *arg; + + if (stmt->prep->prepared == NULL) { + i_assert(stmt->prep->error != NULL); + + if (stmt->result != NULL) { + stmt->result->error = i_strdup(stmt->prep->error); + result_finish(stmt->result); + } + pool_unref(&stmt->stmt.pool); + return; + } + stmt->cass_stmt = cass_prepared_bind(stmt->prep->prepared); + + if (stmt->timestamp != 0) + cass_statement_set_timestamp(stmt->cass_stmt, stmt->timestamp); + + if (array_is_created(&stmt->pending_args)) { + array_foreach(&stmt->pending_args, arg) + prepare_finish_arg(stmt, arg); + } + if (stmt->result != NULL) { + stmt->result->statement = stmt->cass_stmt; + stmt->result->timestamp = stmt->timestamp; + (void)driver_cassandra_send_query(stmt->result); + pool_unref(&stmt->stmt.pool); + } +} + +static void +prepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt) +{ + struct cassandra_sql_statement *stmt; + + array_foreach_elem(&prep_stmt->pending_statements, stmt) + prepare_finish_statement(stmt); + array_clear(&prep_stmt->pending_statements); +} + +static void prepare_callback(CassFuture *future, void *context) +{ + struct cassandra_sql_prepared_statement *prep_stmt = context; + CassError error = cass_future_error_code(future); + + if (error != CASS_OK) { + const char *errmsg; + size_t errsize; + + cass_future_error_message(future, &errmsg, &errsize); + i_free(prep_stmt->error); + prep_stmt->error = i_strndup(errmsg, errsize); + } else { + prep_stmt->prepared = cass_future_get_prepared(future); + } + + prepare_finish_pending_statements(prep_stmt); +} + +static void prepare_start(struct cassandra_sql_prepared_statement *prep_stmt) +{ + struct cassandra_db *db = (struct cassandra_db *)prep_stmt->prep_stmt.db; + CassFuture *future; + + if (!SQL_DB_IS_READY(&db->api)) { + if (!prep_stmt->pending) { + prep_stmt->pending = TRUE; + array_push_back(&db->pending_prepares, &prep_stmt); + + if (sql_connect(&db->api) < 0) + i_unreached(); + } + return; + } + + /* clear the current error in case we're retrying */ + i_free_and_null(prep_stmt->error); + + future = cass_session_prepare(db->session, + prep_stmt->prep_stmt.query_template); + driver_cassandra_set_callback(future, db, prepare_callback, prep_stmt); +} + +static void driver_cassandra_prepare_pending(struct cassandra_db *db) +{ + struct cassandra_sql_prepared_statement *prep_stmt; + + i_assert(SQL_DB_IS_READY(&db->api)); + + array_foreach_elem(&db->pending_prepares, prep_stmt) { + prep_stmt->pending = FALSE; + prepare_start(prep_stmt); + } + array_clear(&db->pending_prepares); +} + +static struct sql_prepared_statement * +driver_cassandra_prepared_statement_init(struct sql_db *db, + const char *query_template) +{ + struct cassandra_sql_prepared_statement *prep_stmt = + i_new(struct cassandra_sql_prepared_statement, 1); + prep_stmt->prep_stmt.db = db; + prep_stmt->prep_stmt.refcount = 1; + prep_stmt->prep_stmt.query_template = i_strdup(query_template); + i_array_init(&prep_stmt->pending_statements, 4); + prepare_start(prep_stmt); + return &prep_stmt->prep_stmt; +} + +static void +driver_cassandra_prepared_statement_deinit(struct sql_prepared_statement *_prep_stmt) +{ + struct cassandra_sql_prepared_statement *prep_stmt = + (struct cassandra_sql_prepared_statement *)_prep_stmt; + + i_assert(array_count(&prep_stmt->pending_statements) == 0); + if (prep_stmt->prepared != NULL) + cass_prepared_free(prep_stmt->prepared); + array_free(&prep_stmt->pending_statements); + i_free(prep_stmt->error); + i_free(prep_stmt->prep_stmt.query_template); + i_free(prep_stmt); +} + +static struct sql_statement * +driver_cassandra_statement_init(struct sql_db *db ATTR_UNUSED, + const char *query_template ATTR_UNUSED) +{ + pool_t pool = pool_alloconly_create("cassandra sql statement", 1024); + struct cassandra_sql_statement *stmt = + p_new(pool, struct cassandra_sql_statement, 1); + stmt->stmt.pool = pool; + return &stmt->stmt; +} + +static struct sql_statement * +driver_cassandra_statement_init_prepared(struct sql_prepared_statement *_prep_stmt) +{ + struct cassandra_sql_prepared_statement *prep_stmt = + (struct cassandra_sql_prepared_statement *)_prep_stmt; + pool_t pool = pool_alloconly_create("cassandra prepared sql statement", 1024); + struct cassandra_sql_statement *stmt = + p_new(pool, struct cassandra_sql_statement, 1); + + stmt->stmt.pool = pool; + stmt->stmt.query_template = + p_strdup(stmt->stmt.pool, prep_stmt->prep_stmt.query_template); + stmt->prep = prep_stmt; + + if (prep_stmt->prepared != NULL) { + /* statement is already prepared. we can use it immediately. */ + stmt->cass_stmt = cass_prepared_bind(prep_stmt->prepared); + } else { + if (prep_stmt->error != NULL) + prepare_start(prep_stmt); + /* need to wait until prepare is finished */ + array_push_back(&prep_stmt->pending_statements, &stmt); + } + return &stmt->stmt; +} + +static void +driver_cassandra_statement_abort(struct sql_statement *_stmt) +{ + struct cassandra_sql_statement *stmt = + (struct cassandra_sql_statement *)_stmt; + + if (stmt->cass_stmt != NULL) + cass_statement_free(stmt->cass_stmt); +} + +static void +driver_cassandra_statement_set_timestamp(struct sql_statement *_stmt, + const struct timespec *ts) +{ + struct cassandra_sql_statement *stmt = + (struct cassandra_sql_statement *)_stmt; + cass_int64_t ts_usecs = + (cass_int64_t)ts->tv_sec * 1000000ULL + + ts->tv_nsec / 1000; + + i_assert(stmt->result == NULL); + + if (stmt->cass_stmt != NULL) + cass_statement_set_timestamp(stmt->cass_stmt, ts_usecs); + stmt->timestamp = ts_usecs; +} + +static struct cassandra_sql_arg * +driver_cassandra_add_pending_arg(struct cassandra_sql_statement *stmt, + unsigned int column_idx) +{ + struct cassandra_sql_arg *arg; + + if (!array_is_created(&stmt->pending_args)) + p_array_init(&stmt->pending_args, stmt->stmt.pool, 8); + arg = array_append_space(&stmt->pending_args); + arg->column_idx = column_idx; + return arg; +} + +static void +driver_cassandra_statement_bind_str(struct sql_statement *_stmt, + unsigned int column_idx, + const char *value) +{ + struct cassandra_sql_statement *stmt = + (struct cassandra_sql_statement *)_stmt; + if (stmt->cass_stmt != NULL) + cass_statement_bind_string(stmt->cass_stmt, column_idx, value); + else if (stmt->prep != NULL) { + struct cassandra_sql_arg *arg = + driver_cassandra_add_pending_arg(stmt, column_idx); + arg->value_str = p_strdup(_stmt->pool, value); + } +} + +static void +driver_cassandra_statement_bind_binary(struct sql_statement *_stmt, + unsigned int column_idx, + const void *value, size_t value_size) +{ + struct cassandra_sql_statement *stmt = + (struct cassandra_sql_statement *)_stmt; + + if (stmt->cass_stmt != NULL) { + cass_statement_bind_bytes(stmt->cass_stmt, column_idx, + value, value_size); + } else if (stmt->prep != NULL) { + struct cassandra_sql_arg *arg = + driver_cassandra_add_pending_arg(stmt, column_idx); + arg->value_binary = value_size == 0 ? &uchar_nul : + p_memdup(_stmt->pool, value, value_size); + arg->value_binary_size = value_size; + } +} + +static void +driver_cassandra_statement_bind_int64(struct sql_statement *_stmt, + unsigned int column_idx, int64_t value) +{ + struct cassandra_sql_statement *stmt = + (struct cassandra_sql_statement *)_stmt; + + if (stmt->cass_stmt != NULL) + driver_cassandra_bind_int(stmt, column_idx, value); + else if (stmt->prep != NULL) { + struct cassandra_sql_arg *arg = + driver_cassandra_add_pending_arg(stmt, column_idx); + arg->value_int64 = value; + } +} + +static void +driver_cassandra_statement_query(struct sql_statement *_stmt, + sql_query_callback_t *callback, void *context) +{ + struct cassandra_sql_statement *stmt = + (struct cassandra_sql_statement *)_stmt; + struct cassandra_db *db = (struct cassandra_db *)_stmt->db; + const char *query = sql_statement_get_query(_stmt); + bool is_prepared = stmt->cass_stmt != NULL || stmt->prep != NULL; + + stmt->result = driver_cassandra_query_init(db, query, + CASSANDRA_QUERY_TYPE_READ, + is_prepared, + callback, context); + if (stmt->cass_stmt != NULL) { + stmt->result->statement = stmt->cass_stmt; + stmt->result->timestamp = stmt->timestamp; + } else if (stmt->prep != NULL) { + /* wait for prepare to finish */ + return; + } else { + stmt->result->statement = cass_statement_new(query, 0); + stmt->result->timestamp = stmt->timestamp; + if (stmt->timestamp != 0) { + cass_statement_set_timestamp(stmt->result->statement, + stmt->timestamp); + } + } + (void)driver_cassandra_send_query(stmt->result); + pool_unref(&_stmt->pool); +} + +static struct sql_result * +driver_cassandra_statement_query_s(struct sql_statement *_stmt ATTR_UNUSED) +{ + i_panic("cassandra: sql_statement_query_s() not supported"); +} + +static void +driver_cassandra_update_stmt(struct sql_transaction_context *_ctx, + struct sql_statement *_stmt, + unsigned int *affected_rows) +{ + struct cassandra_transaction_context *ctx = + (struct cassandra_transaction_context *)_ctx; + struct cassandra_sql_statement *stmt = + (struct cassandra_sql_statement *)_stmt; + + i_assert(affected_rows == NULL); + + if (ctx->query != NULL || ctx->stmt != NULL) { + transaction_set_failed(ctx, + "Multiple changes in transaction not supported"); + return; + } + if (stmt->prep != NULL) + ctx->stmt = stmt; + else { + ctx->query = i_strdup(sql_statement_get_query(_stmt)); + ctx->query_timestamp = stmt->timestamp; + pool_unref(&_stmt->pool); + } +} + +static bool driver_cassandra_have_work(struct cassandra_db *db) +{ + return array_not_empty(&db->pending_prepares) || + array_not_empty(&db->callbacks) || + array_not_empty(&db->results); +} + +static void driver_cassandra_wait(struct sql_db *_db) +{ + struct cassandra_db *db = (struct cassandra_db *)_db; + + if (!driver_cassandra_have_work(db)) + return; + + struct ioloop *prev_ioloop = current_ioloop; + db->ioloop = io_loop_create(); + db->io_pipe = io_loop_move_io(&db->io_pipe); + while (driver_cassandra_have_work(db)) + io_loop_run(db->ioloop); + + io_loop_set_current(prev_ioloop); + db->io_pipe = io_loop_move_io(&db->io_pipe); + io_loop_set_current(db->ioloop); + io_loop_destroy(&db->ioloop); +} + +const struct sql_db driver_cassandra_db = { + .name = "cassandra", + .flags = SQL_DB_FLAG_PREP_STATEMENTS, + + .v = { + .init_full = driver_cassandra_init_full_v, + .deinit = driver_cassandra_deinit_v, + .connect = driver_cassandra_connect, + .disconnect = driver_cassandra_disconnect, + .escape_string = driver_cassandra_escape_string, + .exec = driver_cassandra_exec, + .query = driver_cassandra_query, + .query_s = driver_cassandra_query_s, + .wait = driver_cassandra_wait, + + .transaction_begin = driver_cassandra_transaction_begin, + .transaction_commit = driver_cassandra_transaction_commit, + .transaction_commit_s = driver_cassandra_transaction_commit_s, + .transaction_rollback = driver_cassandra_transaction_rollback, + + .update = driver_cassandra_update, + + .escape_blob = driver_cassandra_escape_blob, + + .prepared_statement_init = driver_cassandra_prepared_statement_init, + .prepared_statement_deinit = driver_cassandra_prepared_statement_deinit, + .statement_init = driver_cassandra_statement_init, + .statement_init_prepared = driver_cassandra_statement_init_prepared, + .statement_abort = driver_cassandra_statement_abort, + .statement_set_timestamp = driver_cassandra_statement_set_timestamp, + .statement_bind_str = driver_cassandra_statement_bind_str, + .statement_bind_binary = driver_cassandra_statement_bind_binary, + .statement_bind_int64 = driver_cassandra_statement_bind_int64, + .statement_query = driver_cassandra_statement_query, + .statement_query_s = driver_cassandra_statement_query_s, + .update_stmt = driver_cassandra_update_stmt, + } +}; + +const struct sql_result driver_cassandra_result = { + .v = { + driver_cassandra_result_free, + driver_cassandra_result_next_row, + driver_cassandra_result_get_fields_count, + driver_cassandra_result_get_field_name, + driver_cassandra_result_find_field, + driver_cassandra_result_get_field_value, + driver_cassandra_result_get_field_value_binary, + driver_cassandra_result_find_field_value, + driver_cassandra_result_get_values, + driver_cassandra_result_get_error, + driver_cassandra_result_more, + } +}; + +const char *driver_cassandra_version = DOVECOT_ABI_VERSION; + +void driver_cassandra_init(void); +void driver_cassandra_deinit(void); + +void driver_cassandra_init(void) +{ + sql_driver_register(&driver_cassandra_db); +} + +void driver_cassandra_deinit(void) +{ + sql_driver_unregister(&driver_cassandra_db); +} + +#endif |