diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 17:47:29 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 17:47:29 +0000 |
commit | 4f5791ebd03eaec1c7da0865a383175b05102712 (patch) | |
tree | 8ce7b00f7a76baa386372422adebbe64510812d4 /ctdb/client | |
parent | Initial commit. (diff) | |
download | samba-4f5791ebd03eaec1c7da0865a383175b05102712.tar.xz samba-4f5791ebd03eaec1c7da0865a383175b05102712.zip |
Adding upstream version 2:4.17.12+dfsg.upstream/2%4.17.12+dfsgupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ctdb/client')
-rw-r--r-- | ctdb/client/client.h | 1416 | ||||
-rw-r--r-- | ctdb/client/client_call.c | 184 | ||||
-rw-r--r-- | ctdb/client/client_connect.c | 532 | ||||
-rw-r--r-- | ctdb/client/client_control.c | 439 | ||||
-rw-r--r-- | ctdb/client/client_control_sync.c | 2676 | ||||
-rw-r--r-- | ctdb/client/client_db.c | 2791 | ||||
-rw-r--r-- | ctdb/client/client_event.c | 444 | ||||
-rw-r--r-- | ctdb/client/client_message.c | 607 | ||||
-rw-r--r-- | ctdb/client/client_message_sync.c | 197 | ||||
-rw-r--r-- | ctdb/client/client_private.h | 99 | ||||
-rw-r--r-- | ctdb/client/client_sync.h | 526 | ||||
-rw-r--r-- | ctdb/client/client_tunnel.c | 693 | ||||
-rw-r--r-- | ctdb/client/client_util.c | 137 |
13 files changed, 10741 insertions, 0 deletions
diff --git a/ctdb/client/client.h b/ctdb/client/client.h new file mode 100644 index 0000000..5f17403 --- /dev/null +++ b/ctdb/client/client.h @@ -0,0 +1,1416 @@ +/* + CTDB client code + + Copyright (C) Amitay Isaacs 2015 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __CTDB_CLIENT_H__ +#define __CTDB_CLIENT_H__ + +#include <talloc.h> +#include <tevent.h> + +#include "protocol/protocol.h" +#include "common/srvid.h" + +/** + * @file client.h + * + * @brief Client api to talk to ctdb daemon + * + * This API allows one to connect to ctdb daemon, perform various database + * operations, send controls to ctdb daemon and send messages to other ctdb + * clients. + */ + +/** + * @brief The abstract context that holds client connection to ctdb daemon + */ +struct ctdb_client_context; + +/** + * @brief The abstract context that holds a tunnel endpoint + */ +struct ctdb_tunnel_context; + +/** + * @brief The abstract context that represents a clustered database + */ +struct ctdb_db_context; + +/** + * @brief The abstract context that represents a record from a distributed + * database + */ +struct ctdb_record_handle; + +/** + * @brief The abstract context that represents a transaction on a replicated + * database + */ +struct ctdb_transaction_handle; + +/** + * @brief Client callback function + * + * This function can be registered to be invoked in case of ctdb daemon going + * away. + */ +typedef void (*ctdb_client_callback_func_t)(void *private_data); + +/** + * @brief Tunnel callback function + * + * This function is registered when a tunnel endpoint is set up. When the + * tunnel endpoint receives a message, this function is invoked. + */ +typedef void (*ctdb_tunnel_callback_func_t)(struct ctdb_tunnel_context *tctx, + uint32_t srcnode, uint32_t reqid, + uint8_t *buf, size_t buflen, + void *private_data); + +/** + * @brief Async computation start to initialize a connection to ctdb daemon + * + * This returns a ctdb client context. Freeing this context will free the + * connection to ctdb daemon and any memory associated with it. + * + * If the connection to ctdb daemon is lost, the client will terminate + * automatically as the library will call exit(). If the client code + * wants to perform cleanup or wants to re-establish a new connection, + * the client should register a disconnect callback function. + * + * @see ctdb_client_set_disconnect_callback + * + * When a disconnect callback function is registered, client library will + * not call exit(). It is the responsibility of the client code to take + * appropriate action. + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] sockpath Path to ctdb daemon unix domain socket + * @return new tevent request, NULL on failure + */ +struct tevent_req *ctdb_client_init_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + const char *sockpath); + +/** + * @brief Async computation end to initialize a connection to ctdb daemon + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @param[in] mem_ctx Talloc memory context + * @param[out] result The new ctdb client context + * @return true on success, false on failure + */ +bool ctdb_client_init_recv(struct tevent_req *req, int *perr, + TALLOC_CTX *mem_ctx, + struct ctdb_client_context **result); + +/** + * @brief Sync wrapper to initialize ctdb connection + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] sockpath Path to ctdb daemon unix domain socket + * @param[out] result The new ctdb client context + * @return 0 on succcess, errno on failure + */ +int ctdb_client_init(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + const char *sockpath, + struct ctdb_client_context **result); + +/** + * @brief Register a callback in case of client disconnection + * + * This allows client code to know if the connection to ctdb daemon is lost. + * This is useful if the client wants to re-establish a new connection to ctdb + * daemon. + * + * @param[in] client Client connection context + * @param[in] func Callback function + * @param[in] private_data private data for callback function + */ +void ctdb_client_set_disconnect_callback(struct ctdb_client_context *client, + ctdb_client_callback_func_t func, + void *private_data); + +/** + * @brief Get the node number of the current node + * + * @param[in] client Client connection context + * return node number on success, CTDB_UNKNOWN_PNN on error + */ +uint32_t ctdb_client_pnn(struct ctdb_client_context *client); + +/** + * @brief Client event loop waiting for a flag + * + * This can used to wait for asynchronous computations to complete. + * When this function is called, it will run tevent event loop and wait + * till the done flag is set to true. This function will block and will + * not return as long as the done flag is false. + * + * @param[in] ev Tevent context + * @param[in] done Boolean flag to indicate when to stop waiting + */ +void ctdb_client_wait(struct tevent_context *ev, bool *done); + +/** + * @brief Client event loop waiting for function to return true with timeout + * + * This can be used to wait for asynchronous computations to complete. + * When this function is called, it will run tevent event loop and wait + * till the done function returns true or if the timeout occurs. + * + * This function will return when either + * - done function returns true, or + * - timeout has occurred. + * + * @param[in] ev Tevent context + * @param[in] done_func Function flag to indicate when to stop waiting + * @param[in] private_data Passed to done function + * @param[in] timeout How long to wait + * @return 0 on success, ETIMEDOUT on timeout, and errno on failure + */ +int ctdb_client_wait_func_timeout(struct tevent_context *ev, + bool (*done_func)(void *private_data), + void *private_data, + struct timeval timeout); + +/** + * @brief Client event loop waiting for a flag with timeout + * + * This can be used to wait for asynchronous computations to complete. + * When this function is called, it will run tevent event loop and wait + * till the done flag is set to true or if the timeout occurs. + * + * This function will return when either + * - done flag is set to true, or + * - timeout has occurred. + * + * @param[in] ev Tevent context + * @param[in] done Boolean flag to indicate when to stop waiting + * @param[in] timeout How long to wait + * @return 0 on success, ETIMEDOUT on timeout, and errno on failure + */ +int ctdb_client_wait_timeout(struct tevent_context *ev, bool *done, + struct timeval timeout); + +/** + * @brief Async computation start to wait till recovery is completed + * + * CTDB daemon does not perform many operations while in recovery (especially + * database operations). This computation allows one to wait till ctdb daemon has + * finished recovery. + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @return new tevent request, or NULL on failure + */ +struct tevent_req *ctdb_recovery_wait_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client); + +/** + * @brief Async computation end to wait till recovery is completed + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @return true on success, false on failure + */ +bool ctdb_recovery_wait_recv(struct tevent_req *req, int *perr); + +/** + * @brief Sync wrapper for ctdb_recovery_wait computation + * + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @return true on success, false on failure + */ +bool ctdb_recovery_wait(struct tevent_context *ev, + struct ctdb_client_context *client); + +/** + * @brief Async computation start to migrate a database record + * + * This sends a request to ctdb daemon to migrate a database record to + * the local node. CTDB daemon will locate the data master for the record + * and will migrate record (and the data master) to the current node. + * + * @see ctdb_fetch_lock_send + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] request CTDB request data + * @return a new tevent req, or NULL on failure + */ +struct tevent_req *ctdb_client_call_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_req_call *request); + +/** + * @brief Async computation end to migrate a database record + * + * @param[in] req Tevent request + * @param[in] mem_ctx Talloc memory context + * @param[out] reply CTDB reply data + * @param[out] perr errno in case of failure + * @return true on success, false on failure + */ +bool ctdb_client_call_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx, + struct ctdb_reply_call **reply, int *perr); + + +/** + * @brief Async computation start to send a message to remote client(s) + * + * This sends a message to ctdb clients on a remote node. All the + * messages are associated with a specific SRVID. All the clients on the + * remote node listening to that SRVID, will get the message. + * + * Clients can register and deregister for messages for a SRVID using + * ctdb_client_set_message_handler() and ctdb_client_remove_message_handler(). + * + * @see ctdb_client_set_message_handler_send, + * ctdb_client_remove_message_handler_send + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] destnode Remote node id + * @param[in] message Message to send + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_client_message_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t destnode, + struct ctdb_req_message *message); + +/** + * @brief Async computation end to send a message to remote client(s) + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @return true on success, false on failure + */ +bool ctdb_client_message_recv(struct tevent_req *req, int *perr); + +/** + * @brief Sync wrapper to send a message to client(s) on remote node + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] destnode Node id + * @param[in] message Message to send + */ +int ctdb_client_message(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t destnode, struct ctdb_req_message *message); + +/** + * @brief Async computation start to send a message to multiple nodes + * + * This sends a message to ctdb clients on multiple remote nodes. All the + * messages are associated with a specific SRVID. All the clients on remote + * nodes listening to that SRVID, will get the message. + * + * Clients can register and deregister for messages for a SRVID using + * ctdb_client_set_message_handler() and ctdb_client_remove_message_handler(). + * + * @see ctdb_client_set_message_handler_send, + * ctdb_client_remove_message_handler_send + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] pnn_list List of node ids + * @param[in] count Number of node ids + * @param[in] message Message to send + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_client_message_multi_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t *pnn_list, int count, + struct ctdb_req_message *message); + +/** + * @brief Async computation end to send a message to multiple nodes + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @param[in] mem_ctx Talloc memory context + * @param[out] perr_list The status from each node id + * @return true on success, false on failure + * + * If perr_list is not NULL, then the status (0 on success, errno on failure) + * of sending message to each of the node in the specified node list. The + * perr_list is an array of the same size as of pnn_list. + */ +bool ctdb_client_message_multi_recv(struct tevent_req *req, int *perr, + TALLOC_CTX *mem_ctx, int **perr_list); + +/** + * @brief Sync wrapper to send a message to multiple nodes + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] pnn_list List of node ids + * @param[in] count Number of node ids + * @param[in] message Message to send + * @param[out] perr_list The status from each node id + * @return 0 on success, errno on failure + */ +int ctdb_client_message_multi(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t *pnn_list, int count, + struct ctdb_req_message *message, + int **perr_list); + +/** + * @brief Async computation start to receive messages for a SRVID + * + * This computation informs ctdb that the client is interested in all messages + * for a specific SRVID. + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] srvid SRVID + * @param[in] handler Callback function to call when a message is received + * @param[in] private_data Private data for callback + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_client_set_message_handler_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint64_t srvid, + srvid_handler_fn handler, + void *private_data); + +/** + * @brief Async computation end to receive messages for a SRVID + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @return true on success, false on failure + */ +bool ctdb_client_set_message_handler_recv(struct tevent_req *req, int *perr); + +/** + * Sync wrapper to receive messages for a SRVID + * + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] srvid SRVID + * @param[in] handler Callback function to call when a message is received + * @param[in] private_data Private data for callback + * @return 0 on success, errno on failure + */ +int ctdb_client_set_message_handler(struct tevent_context *ev, + struct ctdb_client_context *client, + uint64_t srvid, srvid_handler_fn handler, + void *private_data); + +/** + * @brief Async computation start to stop receiving messages for a SRVID + * + * This computation informs ctdb that the client is no longer interested in + * messages for a specific SRVID. + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] srvid SRVID + * @param[in] private_data Private data used to register callback + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_client_remove_message_handler_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint64_t srvid, + void *private_data); + +/** + * @brief Async computation end to stop receiving messages for a SRVID + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @return true on success, false on failure + */ +bool ctdb_client_remove_message_handler_recv(struct tevent_req *req, + int *perr); + +/** + * Sync wrapper to stop receiving messages for a SRVID + * + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] srvid SRVID + * @param[in] private_data Private data used to register callback + * @return 0 on success, errno on failure + */ +int ctdb_client_remove_message_handler(struct tevent_context *ev, + struct ctdb_client_context *client, + uint64_t srvid, void *private_data); + +/** + * @brief Async computation start to send a control to ctdb daemon + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] destnode Node id + * @param[in] timeout How long to wait + * @param[in] request Control request + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_client_control_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t destnode, + struct timeval timeout, + struct ctdb_req_control *request); + +/** + * @brief Async computation end to send a control to ctdb daemon + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @param[in] mem_ctx Talloc memory context + * @param[out] preply Control reply + * @return true on success, false on failure + */ +bool ctdb_client_control_recv(struct tevent_req *req, int *perr, + TALLOC_CTX *mem_ctx, + struct ctdb_reply_control **preply); + +/** + * @brief Sync wrapper to send a control to ctdb daemon + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] destnode Node id + * @param[in] timeout How long to wait + * @param[in] request Control request + * @param[out] preply Control reply + * @return 0 on success, errno on failure + */ +int ctdb_client_control(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t destnode, + struct timeval timeout, + struct ctdb_req_control *request, + struct ctdb_reply_control **preply); + +/** + * @brief Async computation start to send a control to multiple nodes + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] pnn_list List of node ids + * @param[in] count Number of node ids + * @param[in] timeout How long to wait + * @param[in] request Control request + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_client_control_multi_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t *pnn_list, int count, + struct timeval timeout, + struct ctdb_req_control *request); + +/** + * @brief Async computation end to send a control to multiple nodes + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @param[in] mem_ctx Talloc memory context + * @param[out] perr_list Status from each node + * @param[out] preply Control reply from each node + * @return true on success, false on failure + */ +bool ctdb_client_control_multi_recv(struct tevent_req *req, int *perr, + TALLOC_CTX *mem_ctx, int **perr_list, + struct ctdb_reply_control ***preply); + +/** + * @brief Sync wrapper to send a control to multiple nodes + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] pnn_list List of node ids + * @param[in] count Number of node ids + * @param[in] timeout How long to wait + * @param[in] request Control request + * @param[out] perr_list Status from each node + * @param[out] preply Control reply from each node + * @return 0 on success, errno on failure + */ +int ctdb_client_control_multi(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t *pnn_list, int count, + struct timeval timeout, + struct ctdb_req_control *request, + int **perr_list, + struct ctdb_reply_control ***preply); + +/** + * @brief Check err_list for errors + * + * This is a convenience function to parse the err_list returned from + * functions that send requests to multiple nodes. + * + * If status from any of the node is non-zero, then return first non-zero + * status. + * + * If status from all the nodes is 0, then return 0. + * + * @param[in] pnn_list List of node ids + * @param[in] count Number of node ids + * @param[in] err_list Status from each node + * @param[out] pnn Node id in case of failure + * @return 0 if no failures, status from first failure + */ +int ctdb_client_control_multi_error(uint32_t *pnn_list, int count, + int *err_list, uint32_t *pnn); + +/** + * @brief Async computation start to setup a tunnel endpoint + * + * This computation sets up a tunnel endpoint corresponding to a tunnel_id. + * A tunnel is a ctdb transport to deliver new protocol between endpoints. + * + * For two endpoints to communicate using new protocol, + * 1. Set up tunnel endpoints + * 2. Send requests + * 3. Send replies + * 4. Destroy tunnel endpoints + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] tunnel_id Unique tunnel id + * @param[in] callback Callback function to call when a message is received + * @param[in] private_data Private data for callback + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_tunnel_setup_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint64_t tunnel_id, + ctdb_tunnel_callback_func_t callback, + void *private_data); + +/** + * @brief Async computation end to setup a tunnel + * + * @param[in] req Tevent request + * @param[in] perr errno in case of failure + * @param[out] result A new tunnel context + * @return true on success, false on failure + * + * Tunnel context should never be freed by user. + */ +bool ctdb_tunnel_setup_recv(struct tevent_req *req, int *perr, + struct ctdb_tunnel_context **result); + +/** + * @brief Sync wrapper for ctdb_tunnel_setup computation + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] tunnel_id Unique tunnel id + * @param[in] callback Callback function to call when a message is received + * @param[in] private_data Private data for callback + * @param[out] result A new tunnel context + * @return 0 on success, errno on failure + */ +int ctdb_tunnel_setup(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, uint64_t tunnel_id, + ctdb_tunnel_callback_func_t callback, void *private_data, + struct ctdb_tunnel_context **result); + +/** + * @brief Async computation start to destroy a tunnel endpoint + * + * This computation destroys the tunnel endpoint. + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] tctx Tunnel context + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_tunnel_destroy_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_tunnel_context *tctx); + +/** + * @brief Async computation end to destroy a tunnel endpoint + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @return true on success, false on failure + */ +bool ctdb_tunnel_destroy_recv(struct tevent_req *req, int *perr); + +/** + * @brief Sync wrapper for ctdb_tunnel_destroy computation + * + * @param[in] ev Tevent context + * @param[in] tctx Tunnel context + * @return 0 on success, errno on failure + */ +int ctdb_tunnel_destroy(struct tevent_context *ev, + struct ctdb_tunnel_context *tctx); + +/** + * @brief Async computation start to send a request via a tunnel + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] tctx Tunnel context + * @param[in] destnode PNN of destination + * @param[in] timeout How long to wait + * @param[in] buf Message to send + * @param[in] buflen Size of the message to send + * @param[in] wait_for_reply Whether to wait for reply + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_tunnel_request_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_tunnel_context *tctx, + uint32_t destnode, + struct timeval timeout, + uint8_t *buf, size_t buflen, + bool wait_for_reply); + +/** + * @brief Async computation end to send a request via a tunnel + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @param[in] mem_ctx Talloc context + * @param[out] buf Reply data if expected + * @param[out] buflen Size of reply data if expected + * @return true on success, false on failure + */ +bool ctdb_tunnel_request_recv(struct tevent_req *req, int *perr, + TALLOC_CTX *mem_ctx, uint8_t **buf, + size_t *buflen); + +/** + * @brief Sync wrapper for ctdb_tunnel_request computation + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] tctx Tunnel context + * @param[in] destnode PNN of destination + * @param[in] timeout How long to wait + * @param[in] buf Message to send + * @param[in] buflen Size of the message to send + * @param[in] wait_for_reply Whether to wait for reply + * @return 0 on success, errno on failure + */ +int ctdb_tunnel_request(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_tunnel_context *tctx, uint32_t destnode, + struct timeval timeout, uint8_t *buf, size_t buflen, + bool wait_for_reply); + +/** + * @brief Async computation start to send a reply via a tunnel + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] tctx Tunnel context + * @param[in] destnode PNN of destination + * @param[in] reqid Request id + * @param[in] timeout How long to wait + * @param[in] buf Reply data + * @param[in] buflen Size of reply data + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_tunnel_reply_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_tunnel_context *tctx, + uint32_t destnode, uint32_t reqid, + struct timeval timeout, + uint8_t *buf, size_t buflen); + +/** + * @brief Async computation end to send a reply via a tunnel + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @return true on success, false on failure + */ +bool ctdb_tunnel_reply_recv(struct tevent_req *req, int *perr); + +/** + * @brief Sync wrapper for ctdb_tunnel_reply computation + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] tctx Tunnel context + * @param[in] destnode PNN of destination + * @param[in] reqid Request id + * @param[in] timeout How long to wait + * @param[in] buf Reply data + * @param[in] buflen Size of reply data + * @return 0 on success, errno on failure + */ +int ctdb_tunnel_reply(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_tunnel_context *tctx, uint32_t destnode, + uint32_t reqid, struct timeval timeout, + uint8_t *buf, size_t buflen); + +/** + * @brief Async computation start to attach a database + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in[ client Client connection context + * @param[in] timeout How long to wait + * @param[in] db_name Name of the database + * @param[in] db_flags Database flags + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct timeval timeout, + const char *db_name, uint8_t db_flags); + +/** + * @brief Async computation end to attach a database + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @param[out] result New database context + * @return true on success, false on failure + */ +bool ctdb_attach_recv(struct tevent_req *req, int *perr, + struct ctdb_db_context **result); + +/** + * @brief Sync wrapper to attach a database + * + * @param[in] ev Tevent context + * @param[in[ client Client connection context + * @param[in] timeout How long to wait + * @param[in] db_name Name of the database + * @param[in] db_flags Database flags + * @param[out] result New database context + * @return 0 on success, errno on failure + */ +int ctdb_attach(struct tevent_context *ev, + struct ctdb_client_context *client, + struct timeval timeout, + const char *db_name, uint8_t db_flags, + struct ctdb_db_context **result); + +/** + * @brief Async computation start to detach a database + * + * Only volatile databases can be detached at runtime. + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in[ client Client connection context + * @param[in] timeout How long to wait + * @param[in] db_id Database id + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_detach_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct timeval timeout, uint32_t db_id); + +/** + * @brief Async computation end to detach a database + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @return true on success, false on failure + */ +bool ctdb_detach_recv(struct tevent_req *req, int *perr); + +/** + * @brief Sync wrapper to detach a database + * + * Only volatile databases can be detached at runtime. + * + * @param[in] ev Tevent context + * @param[in[ client Client connection context + * @param[in] timeout How long to wait + * @param[in] db_id Database id + * @return 0 on success, errno on failure + */ +int ctdb_detach(struct tevent_context *ev, + struct ctdb_client_context *client, + struct timeval timeout, uint32_t db_id); + + +/** + * @brief Get database id from database context + * + * @param[in] db Database context + * @return database id + */ +uint32_t ctdb_db_id(struct ctdb_db_context *db); + +/** + * @brief Traverse a database locally on the node + * + * This function traverses a database locally on the node and for each record + * calls the parser function. If the parser function returns 1, the traverse + * will terminate. If parser function returns 0, the traverse will continue + * till all records in database are parsed. + * + * This is useful for replicated databases, since each node has exactly the + * same records. + * + * @param[in] db Database context + * @param[in] readonly Is the traversal for reading or updating + * @param[in] extract_header Whether to extract ltdb header from record data + * @param[in] parser Record parsing function + * @param[in] private_data Private data for parser function + * @return 0 on success, non-zero return value from parser function + */ +int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly, + bool extract_header, + ctdb_rec_parser_func_t parser, void *private_data); + +/** + * @brief Async computation start to a cluster-wide database traverse + * + * This function traverses a database on all the nodes and for each record + * calls the parser function. If the parser function returns 1, the traverse + * will terminate. If parser function returns 0, the traverse will continue + * till all records all on nodes are parsed. + * + * This is useful for distributed databases as the records are distributed + * among the cluster nodes. + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] db Database context + * @param[in] destnode Node id + * @param[in] timeout How long to wait + * @param[in] parser Record parser function + * @param[in] private_data Private data for parser + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + uint32_t destnode, + struct timeval timeout, + ctdb_rec_parser_func_t parser, + void *private_data); + +/** + * @brief Async computation end to a cluster-wide database traverse + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @return true on success, false on failure + */ +bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr); + +/** + * @brief Sync wrapper for a cluster-wide database traverse + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] db Database context + * @param[in] destnode Node id + * @param[in] timeout How long to wait + * @param[in] parser Record parser function + * @param[in] private_data Private data for parser + * @return 0 on success, errno on failure or non-zero status from parser + */ +int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + uint32_t destnode, struct timeval timeout, + ctdb_rec_parser_func_t parser, void *private_data); + +/** + * @brief Fetch a record from a local database + * + * This function is primarily for internal use. + * Clients should use ctdb_fetch_lock() instead. + * + * @param[in] db Database context + * @param[in] key Record key + * @param[out] header Record header + * @param[in] mem_ctx Talloc memory context + * @param[out] data Record data + */ +int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key, + struct ctdb_ltdb_header *header, + TALLOC_CTX *mem_ctx, TDB_DATA *data); + +/** + * @brief Async computation start to fetch a locked record + * + * This function is used to fetch a record from a distributed database. + * + * If the record is already available on the local node, then lock the + * record and return the record handle. + * + * If the record is not available on the local node, send a CTDB request to + * migrate the record. Once the record is migrated to the local node, lock + * the record and return the record handle. + * + * At the end of the computation, a record handle is returned which holds + * the record lock. When the record handle is freed, the record is unlocked. + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client context + * @param[in] db Database context + * @param[in] key Record key + * @param[in] readonly Whether to request readonly copy of the record + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + TDB_DATA key, bool readonly); + +/** + * @brief Async computation end to fetch a locked record + * + * @param[in] req Tevent request + * @param[out] header Record header + * @param[in] mem_ctx Talloc memory context + * @param[out] data Record data + * @param[out] perr errno in case of failure + * @return a new record handle, NULL on failure + */ +struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req, + struct ctdb_ltdb_header *header, + TALLOC_CTX *mem_ctx, + TDB_DATA *data, int *perr); + +/** + * @brief Sync wrapper to fetch a locked record + * + * @see ctdb_fetch_lock_send + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client context + * @param[in] db Database context + * @param[in] key Record key + * @param[in] readonly Whether to request readonly copy of the record + * @param[out] header Record header + * @param[out] data Record data + * return 0 on success, errno on failure + */ +int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, TDB_DATA key, bool readonly, + struct ctdb_record_handle **out, + struct ctdb_ltdb_header *header, TDB_DATA *data); + +/** + * @brief Update a locked record + * + * This function is used to update a record in a distributed database. + * + * This function should NOT be used to store null data, instead use + * ctdb_delete_record(). + * + * @param[in] h Record handle + * @param[in] data New record data + * @return 0 on success, errno on failure + */ +int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data); + +/** + * @brief Async computation start to delete a locked record + * + * This function is used to delete a record in a distributed database + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] h Record handle + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_record_handle *h); + +/** + * @brief Async computation end to delete a locked record + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @return true on success, false on failure + */ +bool ctdb_delete_record_recv(struct tevent_req *req, int *perr); + +/** + * @brief Sync wrapper to delete a locked record + * + * @see ctdb_delete_record_send + * + * @param[in] h Record handle + * @return 0 on success, errno on failure + */ +int ctdb_delete_record(struct ctdb_record_handle *h); + +/** + * @brief Async computation start to get a global database lock + * + * Functions related to global locks are primarily used internally for + * implementing transaction api. + * + * Clients should use transaction api directly. + * @see ctdb_transaction_start_send + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client context + * @param[in] db Database context for g_lock.tdb + * @param[in] keyname Record key + * @param[in] sid Server id + * @param[in] readonly Lock type + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + const char *keyname, + struct ctdb_server_id *sid, + bool readonly); + +/** + * @brief Async computation end to get a global database lock + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @return true on success, false on failure + */ +bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr); + +/** + * @brief Async computation start to release a global database lock + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] db Database context + * @param[in] keyname Record key + * @param[in] sid Server id + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + const char *keyname, + struct ctdb_server_id sid); + +/** + * @brief Async computation end to release a global database lock + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @return true on success, false on failure + */ +bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr); + +/** + * @brief Async computation start to start a transaction + * + * This function is used to start a transaction on a replicated database. + * + * To perform any updates on a replicated database + * - start transaction + * - fetch record (ctdb_transaction_fetch_record) + * - store record (ctdb_transaction_store_record) + * - delete record (ctdb_transaction_delete_record) + * - commit transaction (ctdb_transaction_commit_send), or + * - cancel transaction (ctdb_transaction_cancel_send) + * + * Starting a transaction will return a transaction handle. This is used + * for updating records under a transaction. This handle is automatically + * freed once the transacion is committed or cancelled. + * + * Clients should NOT free the transaction handle. + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] timeout How long to wait + * @param[in] db Database context + * @param[in] readonly Is transaction readonly + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct timeval timeout, + struct ctdb_db_context *db, + bool readonly); + +/** + * @brief Async computation end to start a transaction + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @return a new transaction handle on success, NULL on failure + */ +struct ctdb_transaction_handle *ctdb_transaction_start_recv( + struct tevent_req *req, + int *perr); + +/** + * @brief Sync wrapper to start a transaction + * + * @see ctdb_transaction_start_send + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] timeout How long to wait + * @param[in] db Database context + * @param[in] readonly Is transaction readonly + * @param[out] result a new transaction handle + * @return 0 on success, errno on failure + */ +int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + struct timeval timeout, + struct ctdb_db_context *db, bool readonly, + struct ctdb_transaction_handle **result); + +/** + * @brief Fetch a record under a transaction + * + * @see ctdb_transaction_start_send + * + * @param[in] h Transaction handle + * @param[in] key Record key + * @param[in] mem_ctx Talloc memory context + * @param[out] data Record data + * @return 0 on success, errno on failure + */ +int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h, + TDB_DATA key, + TALLOC_CTX *mem_ctx, TDB_DATA *data); + +/** + * @brief Store a record under a transaction + * + * @see ctdb_transaction_start_send + * + * @param[in] h Transaction handle + * @param[in] key Record key + * @param[in] data New record data + * @return 0 on success, errno on failure + */ +int ctdb_transaction_store_record(struct ctdb_transaction_handle *h, + TDB_DATA key, TDB_DATA data); + +/** + * @brief Delete a record under a transaction + * + * @see ctdb_transaction_start_send + * + * @param[in] h Transaction handle + * @param[in] key Record key + * @return 0 on success, errno on failure + */ +int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h, + TDB_DATA key); + +/** + * @brief Async computation start to commit a transaction + * + * @see ctdb_transaction_start_send + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] timeout How long to wait + * @param[in] h Transaction handle + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_transaction_commit_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct timeval timeout, + struct ctdb_transaction_handle *h); + +/** + * @brief Async computation end to commit a transaction + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @return true on success, false on failure + */ +bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr); + +/** + * @brief Sync wrapper to commit a transaction + * + * @see ctdb_transaction_commit_send + * + * @param[in] h Transaction handle + * @return 0 on success, errno on failure + */ +int ctdb_transaction_commit(struct ctdb_transaction_handle *h); + +/** + * @brief Async computation start to cancel a transaction + * + * @see ctdb_transaction_start_send + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] timeout How long to wait + * @param[in] h Transaction handle + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_transaction_cancel_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct timeval timeout, + struct ctdb_transaction_handle *h); + +/** + * @brief Async computation end to cancel a transaction + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @return true on success, false on failure + */ +bool ctdb_transaction_cancel_recv(struct tevent_req *req, int *perr); + +/** + * @brief Sync wrapper to cancel a transaction + * + * @see ctdb_transaction_cancel_send + * + * @param[in] h Transaction handle + * @return 0 on success, errno on failure + */ +int ctdb_transaction_cancel(struct ctdb_transaction_handle *h); + +/** + * @brief Utility function to extract a list of node ids from nodemap + * + * @param[in] nodemap Node map + * @param[in] flags_mask Flags to match on + * @param[in] exclude_pnn Node id to exclude from the list + * @param[in] mem_ctx Talloc memory context + * @param[out] pnn_list List of node ids + * @return number of node ids on success, -1 on failure + */ +int list_of_nodes(struct ctdb_node_map *nodemap, + uint32_t flags_mask, uint32_t exclude_pnn, + TALLOC_CTX *mem_ctx, uint32_t **pnn_list); + +/** + * @brief Utility function to extract a list of node ids for active nodes + * + * @param[in] nodemap Node map + * @param[in] exclude_pnn Node id to exclude from the list + * @param[in] mem_ctx Talloc memory context + * @param[out] pnn_list List of node ids + * @return number of node ids on success, -1 on failure + */ +int list_of_active_nodes(struct ctdb_node_map *nodemap, uint32_t exclude_pnn, + TALLOC_CTX *mem_ctx, uint32_t **pnn_list); + +/** + * @brief Utility function to extract a list of node ids for connected nodes + * + * @param[in] nodemap Node map + * @param[in] exclude_pnn Node id to exclude from the list + * @param[in] mem_ctx Talloc memory context + * @param[out] pnn_list List of node ids + * @return number of node ids on success, -1 on failure + */ +int list_of_connected_nodes(struct ctdb_node_map *nodemap, + uint32_t exclude_pnn, + TALLOC_CTX *mem_ctx, uint32_t **pnn_list); + +/** + * @brief Construct a new server id + * + * @param[in] client Client connection context + * @param[in] task_id Task id + * @return a new server id + */ +struct ctdb_server_id ctdb_client_get_server_id( + struct ctdb_client_context *client, + uint32_t task_id); + +/** + * @brief Check if two server ids are the same + * + * @param[in] sid1 Server id 1 + * @param[in] sid2 Server id 2 + * @return true if the server ids are same, false otherwise + */ +bool ctdb_server_id_equal(struct ctdb_server_id *sid1, + struct ctdb_server_id *sid2); + +/** + * @brief Check if the process with server id exists + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] sid Server id + * @param[out] exists Boolean flag to indicate if the process exists + * @return 0 on success, errno on failure + */ +int ctdb_server_id_exists(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_server_id *sid, bool *exists); + +#endif /* __CTDB_CLIENT_H__ */ diff --git a/ctdb/client/client_call.c b/ctdb/client/client_call.c new file mode 100644 index 0000000..088ba67 --- /dev/null +++ b/ctdb/client/client_call.c @@ -0,0 +1,184 @@ +/* + CTDB client code + + Copyright (C) Amitay Isaacs 2015 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/network.h" +#include "system/filesys.h" + +#include <talloc.h> +#include <tevent.h> +#include <tdb.h> + +#include "lib/util/tevent_unix.h" + +#include "common/reqid.h" +#include "common/srvid.h" +#include "common/comm.h" + +#include "protocol/protocol.h" +#include "protocol/protocol_api.h" + +#include "client/client_private.h" +#include "client/client.h" + + +/* + * Handle REQ_CALL and REPLY_CALL + */ + +struct ctdb_client_call_state { + struct ctdb_client_context *client; + uint32_t reqid; + struct ctdb_reply_call *reply; + struct tevent_req *req; +}; + +static int ctdb_client_call_state_destructor( + struct ctdb_client_call_state *state); +static void ctdb_client_call_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_client_call_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_req_call *request) +{ + struct ctdb_req_header h; + struct tevent_req *req, *subreq; + struct ctdb_client_call_state *state; + uint32_t reqid; + uint8_t *buf; + size_t datalen, buflen; + int ret; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_client_call_state); + if (req == NULL) { + return NULL; + } + + reqid = reqid_new(client->idr, state); + if (reqid == REQID_INVALID) { + talloc_free(req); + return NULL; + } + + state->client = client; + state->reqid = reqid; + state->req = req; + state->reply = talloc_zero(state, struct ctdb_reply_call); + if (tevent_req_nomem(state->reply, req)) { + return tevent_req_post(req, ev); + } + + talloc_set_destructor(state, ctdb_client_call_state_destructor); + + ctdb_req_header_fill(&h, 0, CTDB_REQ_CALL, CTDB_CURRENT_NODE, + client->pnn, reqid); + + datalen = ctdb_req_call_len(&h, request); + ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + ret = ctdb_req_call_push(&h, request, buf, &buflen); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + subreq = comm_write_send(state, ev, client->comm, buf, buflen); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_client_call_done, req); + + return req; +} + +static int ctdb_client_call_state_destructor( + struct ctdb_client_call_state *state) +{ + reqid_remove(state->client->idr, state->reqid); + return 0; +} + +static void ctdb_client_call_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + bool status; + int ret; + + status = comm_write_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + /* wait for the reply */ +} + +void ctdb_client_reply_call(struct ctdb_client_context *client, + uint8_t *buf, size_t buflen, uint32_t reqid) +{ + struct ctdb_req_header h; + struct ctdb_client_call_state *state; + int ret; + + state = reqid_find(client->idr, reqid, struct ctdb_client_call_state); + if (state == NULL) { + return; + } + + if (reqid != state->reqid) { + return; + } + + ret = ctdb_reply_call_pull(buf, buflen, &h, state, state->reply); + if (ret != 0) { + tevent_req_error(state->req, ret); + return; + } + + tevent_req_done(state->req); +} + +bool ctdb_client_call_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx, + struct ctdb_reply_call **reply, int *perr) +{ + struct ctdb_client_call_state *state = tevent_req_data( + req, struct ctdb_client_call_state); + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + if (reply != NULL) { + *reply = talloc_steal(mem_ctx, state->reply); + } + + return true; +} diff --git a/ctdb/client/client_connect.c b/ctdb/client/client_connect.c new file mode 100644 index 0000000..a942871 --- /dev/null +++ b/ctdb/client/client_connect.c @@ -0,0 +1,532 @@ +/* + CTDB client code + + Copyright (C) Amitay Isaacs 2015 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/network.h" +#include "system/filesys.h" + +#include <talloc.h> +#include <tevent.h> +#include <tdb.h> + +#include "common/reqid.h" +#include "common/srvid.h" +#include "common/comm.h" +#include "common/logging.h" + +#include "lib/util/tevent_unix.h" +#include "lib/util/debug.h" + +#include "protocol/protocol.h" +#include "protocol/protocol_api.h" + +#include "client/client_private.h" +#include "client/client.h" +#include "client/client_sync.h" + +static void client_read_handler(uint8_t *buf, size_t buflen, + void *private_data); +static void client_dead_handler(void *private_data); + +struct ctdb_client_init_state { + struct ctdb_client_context *client; +}; + +static int ctdb_client_context_destructor(struct ctdb_client_context *client); +static void ctdb_client_init_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_client_init_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + const char *sockpath) +{ + struct tevent_req *req, *subreq; + struct ctdb_client_init_state *state; + struct ctdb_client_context *client; + struct ctdb_req_control request; + struct sockaddr_un addr; + size_t len; + int ret; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_client_init_state); + if (req == NULL) { + return NULL; + } + + if (sockpath == NULL) { + D_ERR("socket path cannot be NULL\n"); + tevent_req_error(req, EINVAL); + return tevent_req_post(req, ev); + } + + client = talloc_zero(state, struct ctdb_client_context); + if (tevent_req_nomem(client, req)) { + return tevent_req_post(req, ev); + } + + ret = reqid_init(client, INT_MAX-200, &client->idr); + if (ret != 0) { + D_ERR("reqid_init() failed, ret=%d\n", ret); + talloc_free(client); + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + ret = srvid_init(client, &client->srv); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("srvid_init() failed, ret=%d\n", ret)); + talloc_free(client); + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + ret = srvid_init(client, &client->tunnels); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("srvid_init() failed, ret=%d\n", ret)); + talloc_free(client); + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path)); + if (len != strlen(sockpath)) { + D_ERR("socket path too long, len=%zu\n", strlen(sockpath)); + talloc_free(client); + tevent_req_error(req, ENAMETOOLONG); + return tevent_req_post(req, ev); + } + + client->fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (client->fd == -1) { + ret = errno; + D_ERR("socket() failed, errno=%d\n", ret); + talloc_free(client); + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + ret = connect(client->fd, (struct sockaddr *)&addr, sizeof(addr)); + if (ret == -1) { + ret = errno; + DEBUG(DEBUG_ERR, ("connect() failed, errno=%d\n", ret)); + close(client->fd); + talloc_free(client); + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + ret = comm_setup(client, ev, client->fd, client_read_handler, client, + client_dead_handler, client, &client->comm); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("comm_setup() failed, ret=%d\n", ret)); + close(client->fd); + talloc_free(client); + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + client->pnn = CTDB_UNKNOWN_PNN; + + talloc_set_destructor(client, ctdb_client_context_destructor); + + state->client = client; + + ctdb_req_control_get_pnn(&request); + subreq = ctdb_client_control_send(state, ev, client, + CTDB_CURRENT_NODE, + tevent_timeval_zero(), + &request); + if (tevent_req_nomem(subreq, req)) { + TALLOC_FREE(state->client); + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_client_init_done, req); + + return req; +} + +static int ctdb_client_context_destructor(struct ctdb_client_context *client) +{ + if (client->fd != -1) { + close(client->fd); + client->fd = -1; + } + return 0; +} + +static void ctdb_client_init_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_client_init_state *state = tevent_req_data( + req, struct ctdb_client_init_state); + struct ctdb_reply_control *reply; + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_get_pnn(reply, &state->client->pnn); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +bool ctdb_client_init_recv(struct tevent_req *req, int *perr, + TALLOC_CTX *mem_ctx, + struct ctdb_client_context **result) +{ + struct ctdb_client_init_state *state = tevent_req_data( + req, struct ctdb_client_init_state); + int ret; + + if (tevent_req_is_unix_error(req, &ret)) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + + *result = talloc_steal(mem_ctx, state->client); + return true; +} + + +int ctdb_client_init(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + const char *sockpath, struct ctdb_client_context **out) +{ + struct tevent_req *req; + int ret; + bool status; + + req = ctdb_client_init_send(mem_ctx, ev, sockpath); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_client_init_recv(req, &ret, mem_ctx, out); + TALLOC_FREE(req); + if (! status) { + return ret; + } + + return 0; +} + +static void client_read_handler(uint8_t *buf, size_t buflen, + void *private_data) +{ + struct ctdb_client_context *client = talloc_get_type_abort( + private_data, struct ctdb_client_context); + struct ctdb_req_header hdr; + size_t np; + int ret; + + ret = ctdb_req_header_pull(buf, buflen, &hdr, &np); + if (ret != 0) { + DEBUG(DEBUG_WARNING, ("invalid header, ret=%d\n", ret)); + return; + } + + if (buflen != hdr.length) { + DEBUG(DEBUG_WARNING, ("packet size mismatch %zu != %d\n", + buflen, hdr.length)); + return; + } + + ret = ctdb_req_header_verify(&hdr, 0); + if (ret != 0) { + DEBUG(DEBUG_WARNING, ("invalid header, ret=%d\n", ret)); + return; + } + + switch (hdr.operation) { + case CTDB_REPLY_CALL: + ctdb_client_reply_call(client, buf, buflen, hdr.reqid); + break; + + case CTDB_REQ_MESSAGE: + ctdb_client_req_message(client, buf, buflen, hdr.reqid); + break; + + case CTDB_REPLY_CONTROL: + ctdb_client_reply_control(client, buf, buflen, hdr.reqid); + break; + + case CTDB_REQ_TUNNEL: + ctdb_client_req_tunnel(client, buf, buflen, hdr.reqid); + break; + + default: + break; + } +} + +static void client_dead_handler(void *private_data) +{ + struct ctdb_client_context *client = talloc_get_type_abort( + private_data, struct ctdb_client_context); + ctdb_client_callback_func_t callback = client->callback; + void *callback_data = client->private_data; + + if (callback != NULL) { + callback(callback_data); + return; + } + + DEBUG(DEBUG_NOTICE, ("connection to daemon closed, exiting\n")); + exit(1); +} + +void ctdb_client_set_disconnect_callback(struct ctdb_client_context *client, + ctdb_client_callback_func_t callback, + void *private_data) +{ + client->callback = callback; + client->private_data = private_data; +} + +uint32_t ctdb_client_pnn(struct ctdb_client_context *client) +{ + return client->pnn; +} + +void ctdb_client_wait(struct tevent_context *ev, bool *done) +{ + while (! (*done)) { + tevent_loop_once(ev); + } +} + +static void ctdb_client_wait_timeout_handler(struct tevent_context *ev, + struct tevent_timer *te, + struct timeval t, + void *private_data) +{ + bool *timed_out = (bool *)private_data; + + *timed_out = true; +} + +int ctdb_client_wait_func_timeout(struct tevent_context *ev, + bool (*done_func)(void *private_data), + void *private_data, + struct timeval timeout) +{ + TALLOC_CTX *mem_ctx; + struct tevent_timer *timer; + bool timed_out = false; + + mem_ctx = talloc_new(ev); + if (mem_ctx == NULL) { + return ENOMEM; + } + + timer = tevent_add_timer(ev, mem_ctx, timeout, + ctdb_client_wait_timeout_handler, + &timed_out); + if (timer == NULL) { + talloc_free(mem_ctx); + return ENOMEM; + } + + while (! (done_func(private_data)) && ! timed_out) { + tevent_loop_once(ev); + } + + talloc_free(mem_ctx); + + if (timed_out) { + return ETIMEDOUT; + } + + return 0; +} + +static bool client_wait_done(void *private_data) +{ + bool *done = (bool *)private_data; + + return *done; +} + +int ctdb_client_wait_timeout(struct tevent_context *ev, + bool *done, + struct timeval timeout) + +{ + int ret; + + ret = ctdb_client_wait_func_timeout(ev, + client_wait_done, + done, + timeout); + + return ret; +} + +struct ctdb_recovery_wait_state { + struct tevent_context *ev; + struct ctdb_client_context *client; +}; + +static void ctdb_recovery_wait_recmode(struct tevent_req *subreq); +static void ctdb_recovery_wait_retry(struct tevent_req *subreq); + +struct tevent_req *ctdb_recovery_wait_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client) +{ + struct tevent_req *req, *subreq; + struct ctdb_recovery_wait_state *state; + struct ctdb_req_control request; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_recovery_wait_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + + ctdb_req_control_get_recmode(&request); + subreq = ctdb_client_control_send(state, ev, client, client->pnn, + tevent_timeval_zero(), &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_recovery_wait_recmode, req); + + return req; +} + +static void ctdb_recovery_wait_recmode(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_recovery_wait_state *state = tevent_req_data( + req, struct ctdb_recovery_wait_state); + struct ctdb_reply_control *reply; + int recmode; + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_get_recmode(reply, &recmode); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + if (recmode == CTDB_RECOVERY_NORMAL) { + tevent_req_done(req); + return; + } + + subreq = tevent_wakeup_send(state, state->ev, + tevent_timeval_current_ofs(1, 0)); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_recovery_wait_retry, req); +} + +static void ctdb_recovery_wait_retry(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_recovery_wait_state *state = tevent_req_data( + req, struct ctdb_recovery_wait_state); + struct ctdb_req_control request; + bool status; + + status = tevent_wakeup_recv(subreq); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ENOMEM); + return; + } + + ctdb_req_control_get_recmode(&request); + subreq = ctdb_client_control_send(state, state->ev, state->client, + state->client->pnn, + tevent_timeval_zero(), &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_recovery_wait_recmode, req); +} + +bool ctdb_recovery_wait_recv(struct tevent_req *req, int *perr) +{ + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + return true; +} + +bool ctdb_recovery_wait(struct tevent_context *ev, + struct ctdb_client_context *client) +{ + TALLOC_CTX *mem_ctx; + struct tevent_req *req; + bool status; + + mem_ctx = talloc_new(client); + if (mem_ctx == NULL) { + return false; + } + + req = ctdb_recovery_wait_send(mem_ctx, ev, client); + if (req == NULL) { + return false; + } + + tevent_req_poll(req, ev); + + status = ctdb_recovery_wait_recv(req, NULL); + + talloc_free(mem_ctx); + return status; +} diff --git a/ctdb/client/client_control.c b/ctdb/client/client_control.c new file mode 100644 index 0000000..ab0aac8 --- /dev/null +++ b/ctdb/client/client_control.c @@ -0,0 +1,439 @@ +/* + CTDB client code + + Copyright (C) Amitay Isaacs 2015 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/network.h" +#include "system/filesys.h" + +#include <talloc.h> +#include <tevent.h> +#include <tdb.h> + +#include "lib/util/tevent_unix.h" + +#include "common/reqid.h" +#include "common/srvid.h" +#include "common/comm.h" + +#include "protocol/protocol.h" +#include "protocol/protocol_api.h" + +#include "client/client_private.h" +#include "client/client.h" + + +/* + * Handle REQ_CONTROL and REPLY_CONTROL + */ + +struct ctdb_client_control_state { + struct ctdb_client_context *client; + uint32_t opcode; + uint32_t flags; + uint32_t reqid; + struct ctdb_reply_control *reply; + struct tevent_req *req; +}; + +static int ctdb_client_control_state_destructor( + struct ctdb_client_control_state *state); +static void ctdb_client_control_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_client_control_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t destnode, + struct timeval timeout, + struct ctdb_req_control *request) +{ + struct ctdb_req_header h; + struct tevent_req *req, *subreq; + struct ctdb_client_control_state *state; + uint32_t reqid; + uint8_t *buf; + size_t datalen, buflen; + int ret; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_client_control_state); + if (req == NULL) { + return NULL; + } + + reqid = reqid_new(client->idr, state); + if (reqid == REQID_INVALID) { + talloc_free(req); + return NULL; + } + + state->client = client; + state->flags = request->flags; + state->opcode = request->opcode; + state->reqid = reqid; + state->req = req; + state->reply = talloc_zero(state, struct ctdb_reply_control); + if (tevent_req_nomem(state->reply, req)) { + return tevent_req_post(req, ev); + } + state->reply->rdata.opcode = request->rdata.opcode; + + talloc_set_destructor(state, ctdb_client_control_state_destructor); + + ctdb_req_header_fill(&h, 0, CTDB_REQ_CONTROL, destnode, + client->pnn, reqid); + + datalen = ctdb_req_control_len(&h, request); + ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + ret = ctdb_req_control_push(&h, request, buf, &buflen); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + if (!tevent_timeval_is_zero(&timeout)) { + if (!tevent_req_set_endtime(req, ev, timeout)) { + return tevent_req_post(req, ev); + } + } + + subreq = comm_write_send(state, ev, client->comm, buf, buflen); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_client_control_done, req); + + return req; +} + +static int ctdb_client_control_state_destructor( + struct ctdb_client_control_state *state) +{ + reqid_remove(state->client->idr, state->reqid); + return 0; +} + +static void ctdb_client_control_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_client_control_state *state = tevent_req_data( + req, struct ctdb_client_control_state); + bool status; + int ret; + + status = comm_write_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + /* Daemon will not reply, so we set status to 0 */ + if (state->flags & CTDB_CTRL_FLAG_NOREPLY) { + state->reply->status = 0; + tevent_req_done(req); + } + + /* wait for the reply or timeout */ +} + +void ctdb_client_reply_control(struct ctdb_client_context *client, + uint8_t *buf, size_t buflen, uint32_t reqid) +{ + struct ctdb_req_header h; + struct ctdb_client_control_state *state; + int ret; + + state = reqid_find(client->idr, reqid, + struct ctdb_client_control_state); + if (state == NULL) { + return; + } + + if (reqid != state->reqid) { + return; + } + + ret = ctdb_reply_control_pull(buf, buflen, state->opcode, &h, + state->reply, state->reply); + if (ret != 0) { + tevent_req_error(state->req, ret); + return; + } + + tevent_req_done(state->req); +} + +bool ctdb_client_control_recv(struct tevent_req *req, int *perr, + TALLOC_CTX *mem_ctx, + struct ctdb_reply_control **reply) +{ + struct ctdb_client_control_state *state = tevent_req_data( + req, struct ctdb_client_control_state); + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + if (reply != NULL) { + *reply = talloc_steal(mem_ctx, state->reply); + } + + return true; +} + +/* + * Handle multiple nodes - there cannot be any return data + */ + +struct ctdb_client_control_multi_state { + uint32_t *pnn_list; + int count; + int done; + int err; + int *err_list; + struct ctdb_reply_control **reply; +}; + +struct control_index_state { + struct tevent_req *req; + int index; +}; + +static void ctdb_client_control_multi_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_client_control_multi_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t *pnn_list, int count, + struct timeval timeout, + struct ctdb_req_control *request) +{ + struct tevent_req *req, *subreq; + struct ctdb_client_control_multi_state *state; + int i; + + if (pnn_list == NULL || count == 0) { + return NULL; + } + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_client_control_multi_state); + if (req == NULL) { + return NULL; + } + + state->pnn_list = pnn_list; + state->count = count; + state->done = 0; + state->err = 0; + state->err_list = talloc_zero_array(state, int, count); + if (tevent_req_nomem(state->err_list, req)) { + return tevent_req_post(req, ev); + } + state->reply = talloc_zero_array(state, struct ctdb_reply_control *, + count); + if (tevent_req_nomem(state->reply, req)) { + return tevent_req_post(req, ev); + } + + for (i=0; i<count; i++) { + struct control_index_state *substate; + + subreq = ctdb_client_control_send(state, ev, client, + pnn_list[i], timeout, + request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + + substate = talloc(subreq, struct control_index_state); + if (tevent_req_nomem(substate, req)) { + return tevent_req_post(req, ev); + } + + substate->req = req; + substate->index = i; + + tevent_req_set_callback(subreq, ctdb_client_control_multi_done, + substate); + } + + return req; +} + +static void ctdb_client_control_multi_done(struct tevent_req *subreq) +{ + struct control_index_state *substate = tevent_req_callback_data( + subreq, struct control_index_state); + struct tevent_req *req = substate->req; + int idx = substate->index; + struct ctdb_client_control_multi_state *state = tevent_req_data( + req, struct ctdb_client_control_multi_state); + bool status; + int ret; + + status = ctdb_client_control_recv(subreq, &ret, state->reply, + &state->reply[idx]); + TALLOC_FREE(subreq); + if (! status) { + if (state->err == 0) { + state->err = ret; + state->err_list[idx] = state->err; + } + } else { + if (state->reply[idx]->status != 0) { + if (state->err == 0) { + state->err = state->reply[idx]->status; + state->err_list[idx] = state->err; + } + } + } + + state->done += 1; + + if (state->done == state->count) { + tevent_req_done(req); + } +} + +bool ctdb_client_control_multi_recv(struct tevent_req *req, int *perr, + TALLOC_CTX *mem_ctx, int **perr_list, + struct ctdb_reply_control ***preply) +{ + struct ctdb_client_control_multi_state *state = tevent_req_data( + req, struct ctdb_client_control_multi_state); + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + if (perr_list != NULL) { + *perr_list = talloc_steal(mem_ctx, state->err_list); + } + return false; + } + + if (perr != NULL) { + *perr = state->err; + } + + if (perr_list != NULL) { + *perr_list = talloc_steal(mem_ctx, state->err_list); + } + + if (preply != NULL) { + *preply = talloc_steal(mem_ctx, state->reply); + } + + if (state->err != 0) { + return false; + } + + return true; +} + +int ctdb_client_control_multi_error(uint32_t *pnn_list, int count, + int *err_list, uint32_t *pnn) +{ + int ret = 0, i; + + for (i=0; i<count; i++) { + if (err_list[i] != 0) { + ret = err_list[i]; + *pnn = pnn_list[i]; + } + } + + return ret; +} + +/* + * Sync version of control send/recv + */ + +int ctdb_client_control(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t destnode, + struct timeval timeout, + struct ctdb_req_control *request, + struct ctdb_reply_control **reply) +{ + struct tevent_req *req; + int ret; + bool status; + + req = ctdb_client_control_send(mem_ctx, ev, client, destnode, timeout, + request); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_client_control_recv(req, &ret, mem_ctx, reply); + if (! status) { + return ret; + } + + return 0; +} + +int ctdb_client_control_multi(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t *pnn_list, int count, + struct timeval timeout, + struct ctdb_req_control *request, + int **perr_list, + struct ctdb_reply_control ***preply) +{ + struct tevent_req *req; + bool status; + int ret; + + req = ctdb_client_control_multi_send(mem_ctx, ev, client, + pnn_list, count, + timeout, request); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_client_control_multi_recv(req, &ret, mem_ctx, perr_list, + preply); + if (! status) { + return ret; + } + + return 0; +} diff --git a/ctdb/client/client_control_sync.c b/ctdb/client/client_control_sync.c new file mode 100644 index 0000000..c786fc7 --- /dev/null +++ b/ctdb/client/client_control_sync.c @@ -0,0 +1,2676 @@ +/* + CTDB client code + + Copyright (C) Amitay Isaacs 2015 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/network.h" +#include "system/filesys.h" + +#include <talloc.h> +#include <tevent.h> +#include <tdb.h> + +#include "common/logging.h" + +#include "lib/util/debug.h" + +#include "protocol/protocol.h" +#include "protocol/protocol_api.h" +#include "client/client_private.h" +#include "client/client.h" +#include "client/client_sync.h" + +int ctdb_ctrl_process_exists(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + pid_t pid, int *status) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_process_exists(&request, pid); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control PROCESS_EXISTS failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_process_exists(reply, status); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control PROCESS_EXISTS failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_statistics(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_statistics **stats) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_statistics(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control STATISTICS failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_statistics(reply, mem_ctx, stats); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control STATISTICS failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_ping(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + int *num_clients) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_ping(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control PING failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_ping(reply, num_clients); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control PING failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_getdbpath(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id, + const char **db_path) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_getdbpath(&request, db_id); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GETDBPATH failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_getdbpath(reply, mem_ctx, db_path); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GETDBPATH failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_getvnnmap(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_vnn_map **vnnmap) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_getvnnmap(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GETVNNMAP failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_getvnnmap(reply, mem_ctx, vnnmap); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GETVNNMAP failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_getdebug(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + int *loglevel) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_debug(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_DEBUG failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_debug(reply, loglevel); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_DEBUG failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_setdebug(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + int loglevel) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_set_debug(&request, loglevel); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_DEBUG failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_set_debug(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_DEBUG failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_dbmap(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_dbid_map **dbmap) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_dbmap(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_DBMAP failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_dbmap(reply, mem_ctx, dbmap); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_DBMAP failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_recmode(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + int *recmode) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_recmode(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_RECMODE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_recmode(reply, recmode); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_RECMODE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_set_recmode(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + int recmode) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_set_recmode(&request, recmode); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_RECMODE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_set_recmode(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_RECMODE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_statistics_reset(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_statistics_reset(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control STATISTICS_RESET failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_statistics_reset(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control STATISTICS_RESET failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_db_attach(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + const char *db_name, uint32_t *db_id) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_db_attach(&request, db_name); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_ATTACH failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_db_attach(reply, db_id); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_ATTACH failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_traverse_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_traverse_start *traverse) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_traverse_start(&request, traverse); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TRAVERSE_START failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_traverse_start(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TRAVERSE_START failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_register_srvid(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint64_t srvid) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_register_srvid(&request, srvid); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control REGISTER_SRVID failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_register_srvid(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control REGISTER_SRVID failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_deregister_srvid(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint64_t srvid) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_deregister_srvid(&request, srvid); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DEREGISTER_SRVID failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_deregister_srvid(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DEREGISTER_SRVID failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_dbname(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id, const char **db_name) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_dbname(&request, db_id); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_DBNAME failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_dbname(reply, mem_ctx, db_name); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_DBNAME failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_enable_seqnum(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_enable_seqnum(&request, db_id); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control ENABLE_SEQNUM failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_enable_seqnum(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control ENABLE_SEQNUM failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_update_seqnum(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_update_seqnum(&request, db_id); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control UPDATE_SEQNUM failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_update_seqnum(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control UPDATE_SEQNUM failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_dump_memory(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + const char **mem_str) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_dump_memory(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DUMP_MEMORY failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_dump_memory(reply, mem_ctx, mem_str); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DUMP_MEMORY failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_pid(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + pid_t *pid) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_pid(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_PID failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_pid(reply, pid); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_PID failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_freeze(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + int priority) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_freeze(&request, priority); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control FREEZE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_freeze(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control FREEZE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_pnn(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t *pnn) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_pnn(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_PNN failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_pnn(reply, pnn); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_PNN failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_shutdown(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_shutdown(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SHUTDOWN failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_shutdown(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SHUTDOWN failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_tcp_add(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_connection *conn) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_tcp_add(&request, conn); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TCP_ADD failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_tcp_add(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TCP_ADD failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_tcp_remove(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_connection *conn) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_tcp_remove(&request, conn); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TCP_REMOVE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_tcp_remove(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TCP_REMOVE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_set_tunable(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_tunable *tunable) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_set_tunable(&request, tunable); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_TUNABLE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_set_tunable(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_TUNABLE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_tunable(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + const char *var, uint32_t *value) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_tunable(&request, var); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_TUNABLE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_tunable(reply, value); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_TUNABLE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_list_tunables(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_var_list **var_list) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_list_tunables(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control LIST_TUNABLES failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_list_tunables(reply, mem_ctx, var_list); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control LIST_TUNABLES failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_modify_flags(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t pnn, uint32_t old_flags, + uint32_t new_flags) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + struct ctdb_node_flag_change flag_change; + int ret; + + flag_change.pnn = pnn; + flag_change.old_flags = old_flags; + flag_change.new_flags = new_flags; + + ctdb_req_control_modify_flags(&request, &flag_change); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control MODIFY_FLAGS failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_modify_flags(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control MODIFY_FLAGS failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_all_tunables(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_tunable_list **tun_list) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_all_tunables(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_ALL_TUNABLES failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_all_tunables(reply, mem_ctx, tun_list); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_ALL_TUNABLES failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_tcp_tickle_list(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + ctdb_sock_addr *addr, + struct ctdb_tickle_list **tickles) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_tcp_tickle_list(&request, addr); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_TCP_TICKLE_LIST failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_tcp_tickle_list(reply, mem_ctx, tickles); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_TCP_TICKLE_LIST failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_set_tcp_tickle_list(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_tickle_list *tickles) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_set_tcp_tickle_list(&request, tickles); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_TCP_TICKLE_LIST failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_set_tcp_tickle_list(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_TCP_TICKLE_LIST failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_db_attach_persistent(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + const char *db_name, uint32_t *db_id) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_db_attach_persistent(&request, db_name); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_ATTACH_PERSISTENT failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_db_attach_persistent(reply, db_id); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_ATTACH_PERSISTENT failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_send_gratuitous_arp(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_addr_info *addr_info) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_send_gratuitous_arp(&request, addr_info); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SEND_GRATUITOUS_ARP failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_send_gratuitous_arp(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SEND_GRATUITOUS_ARP failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_wipe_database(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id, uint32_t tid) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + struct ctdb_transdb transdb; + int ret; + + transdb.db_id = db_id; + transdb.tid = tid; + + ctdb_req_control_wipe_database(&request, &transdb); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control WIPE_DATABASE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_wipe_database(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control WIPE_DATABASE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_uptime(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_uptime **uptime) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_uptime(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control UPTIME failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_uptime(reply, mem_ctx, uptime); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control UPTIME failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_start_recovery(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_start_recovery(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control START_RECOVERY failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_start_recovery(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control START_RECOVERY failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_end_recovery(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_end_recovery(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control END_RECOVERY failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_end_recovery(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control END_RECOVERY failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_reload_nodes_file(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_reload_nodes_file(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control RELOAD_NODES_FILE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_reload_nodes_file(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control RELOAD_NODES_FILE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_add_public_ip(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_addr_info *addr_info) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_add_public_ip(&request, addr_info); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control ADD_PUBLIC_IP failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_add_public_ip(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control ADD_PUBLIC_IP failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_del_public_ip(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_addr_info *addr_info) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_del_public_ip(&request, addr_info); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DEL_PUBLIC_IP failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_del_public_ip(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DEL_PUBLIC_IP failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_capabilities(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t *caps) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_capabilities(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_CAPABILITIES failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_capabilities(reply, caps); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_CAPABILITIES failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_release_ip(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_public_ip *pubip) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_release_ip(&request, pubip); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control RELEASE_IP failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_release_ip(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control RELEASE_IP failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_takeover_ip(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_public_ip *pubip) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_takeover_ip(&request, pubip); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TAKEOVER_IP failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_takeover_ip(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TAKEOVER_IP failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_public_ips(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + bool available_only, + struct ctdb_public_ip_list **pubip_list) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_public_ips(&request, available_only); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_PUBLIC_IPS failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_public_ips(reply, mem_ctx, pubip_list); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_PUBLIC_IPS failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_nodemap(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_node_map **nodemap) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_nodemap(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_NODEMAP failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_nodemap(reply, mem_ctx, nodemap); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_NODEMAP failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_traverse_kill(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_traverse_start *traverse) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_traverse_kill(&request, traverse); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TRAVERSE_KILL failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_traverse_kill(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TRAVERSE_KILL failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_reclock_file(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + const char **reclock_file) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_reclock_file(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_RECLOCK_FILE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_reclock_file(reply, mem_ctx, reclock_file); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_RECLOCK_FILE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_stop_node(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_stop_node(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control STOP_NODE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_stop_node(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control STOP_NODE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_continue_node(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_continue_node(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control CONTINUE_NODE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_continue_node(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control CONTINUE_NODE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_set_lmasterrole(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t lmaster_role) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_set_lmasterrole(&request, lmaster_role); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_LMASTERROLE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_set_lmasterrole(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_LMASTERROLE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_set_recmasterrole(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t recmaster_role) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_set_recmasterrole(&request, recmaster_role); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_RECMASTERROLE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_set_recmasterrole(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_RECMASTERROLE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_set_ban_state(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_ban_state *ban_state) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_set_ban_state(&request, ban_state); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_BAN_STATE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_set_ban_state(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_BAN_STATE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_ban_state(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_ban_state **ban_state) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_ban_state(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_BAN_STATE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_ban_state(reply, mem_ctx, ban_state); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_BAN_STATE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_register_notify(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_notify_data *notify) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_register_notify(&request, notify); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control REGISTER_NOTIFY failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_register_notify(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control REGISTER_NOTIFY failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_deregister_notify(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint64_t srvid) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_deregister_notify(&request, srvid); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DEREGISTER_NOTIFY failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_deregister_notify(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DEREGISTER_NOTIFY failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_trans3_commit(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_rec_buffer *recbuf) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_trans3_commit(&request, recbuf); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TRANS3_COMMIT failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_trans3_commit(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TRANS3_COMMIT failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_db_seqnum(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id, uint64_t *seqnum) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_db_seqnum(&request, db_id); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_DB_SEQNUM failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_db_seqnum(reply, seqnum); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_DB_SEQNUM failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_db_set_healthy(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_db_set_healthy(&request, db_id); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_SET_HEALTHY failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_db_set_healthy(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_SET_HEALTHY failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_db_get_health(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id, const char **reason) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_db_get_health(&request, db_id); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_GET_HEALTH failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_db_get_health(reply, mem_ctx, reason); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_GET_HEALTH failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_public_ip_info(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + ctdb_sock_addr *addr, + struct ctdb_public_ip_info **ipinfo) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_public_ip_info(&request, addr); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_PUBLIC_IP_INFO failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_public_ip_info(reply, mem_ctx, ipinfo); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_PUBLIC_IP_INFO failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_ifaces(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_iface_list **iface_list) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_ifaces(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_IFACES failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_ifaces(reply, mem_ctx, iface_list); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_IFACES failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_set_iface_link_state(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_iface *iface) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_set_iface_link_state(&request, iface); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_IFACE_LINK_STATE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_set_iface_link_state(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_IFACE_LINK_STATE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_tcp_add_delayed_update(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_connection *conn) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_tcp_add_delayed_update(&request, conn); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TCP_ADD_DELAYED_UPDATE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_tcp_add_delayed_update(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TCP_ADD_DELAYED_UPDATE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_stat_history(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_statistics_list **stats_list) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_stat_history(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_STAT_HISTORY failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_stat_history(reply, mem_ctx, stats_list); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_STAT_HISTORY failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_schedule_for_deletion(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_key_data *key) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_schedule_for_deletion(&request, key); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SCHEDULE_FOR_DELETION failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_schedule_for_deletion(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SCHEDULE_FOR_DELETION failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_set_db_readonly(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_set_db_readonly(&request, db_id); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_DB_READONY failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_set_db_readonly(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_DB_READONY failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_traverse_start_ext(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_traverse_start_ext *traverse) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_traverse_start_ext(&request, traverse); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TRAVERSE_START_EXT failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_traverse_start_ext(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TRAVERSE_START_EXT failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_db_statistics(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id, + struct ctdb_db_statistics **dbstats) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_db_statistics(&request, db_id); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_DB_STATISTICS failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_db_statistics(reply, mem_ctx, dbstats); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_DB_STATISTICS failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_set_db_sticky(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_set_db_sticky(&request, db_id); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_DB_STICKY failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_set_db_sticky(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control SET_DB_STICKY failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_reload_public_ips(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_reload_public_ips(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control RELOAD_PUBLIC_IPS failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_reload_public_ips(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control RELOAD_PUBLIC_IPS failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_ipreallocated(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_ipreallocated(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control IPREALLOCATED failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_ipreallocated(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control IPREALLOCATED failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_runstate(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + enum ctdb_runstate *runstate) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_runstate(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_RUNSTATE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_runstate(reply, runstate); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_RUNSTATE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_db_detach(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_db_detach(&request, db_id); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_DETACH failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_db_detach(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_DETACH failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_get_nodes_file(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_node_map **nodemap) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_get_nodes_file(&request); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_NODES_FILE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_get_nodes_file(reply, mem_ctx, nodemap); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control GET_NODES_FILE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_db_freeze(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, uint32_t db_id) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_db_freeze(&request, db_id); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_FREEZE failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_db_freeze(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_FREEZE failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_db_thaw(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, uint32_t db_id) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_db_thaw(&request, db_id); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_THAW failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_db_thaw(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_THAW failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_db_transaction_start(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_transdb *transdb) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_db_transaction_start(&request, transdb); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_TRANSACTION_START failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_db_transaction_start(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_TRANSACTION_START failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_db_transaction_commit(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_transdb *transdb) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_db_transaction_commit(&request, transdb); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_TRANSACTION_COMMIT failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_db_transaction_commit(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_TRANSACTION_COMMIT failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_db_transaction_cancel(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_db_transaction_cancel(&request, db_id); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_TRANSACTION_CANCEL failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_db_transaction_cancel(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_TRANSACTION_CANCEL failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_db_pull(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_pulldb_ext *pulldb, uint32_t *num_records) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_db_pull(&request, pulldb); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_PULL failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_db_pull(reply, num_records); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("Control DB_PULL failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_db_push_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_pulldb_ext *pulldb) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_db_push_start(&request, pulldb); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_PUSH_START failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_db_push_start(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_PUSH_START failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_db_push_confirm(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id, uint32_t *num_records) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_db_push_confirm(&request, db_id); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_PUSH_CONFIRM failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_db_push_confirm(reply, num_records); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_PUSH_CONFIRM failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_db_open_flags(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id, int *tdb_flags) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_db_open_flags(&request, db_id); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_OPEN_FLAGS failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_db_open_flags(reply, tdb_flags); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_OPEN_FLAGS failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_db_attach_replicated(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + const char *db_name, uint32_t *db_id) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_db_attach_replicated(&request, db_name); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_ATTACH_REPLICATED failed to node %u," + " ret=%d\n", destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_db_attach_replicated(reply, db_id); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control DB_ATTACH_REPLICATED failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_check_pid_srvid(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_pid_srvid *pid_srvid, int *status) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_check_pid_srvid(&request, pid_srvid); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control CHECK_PID_SRVID failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_check_pid_srvid(reply, status); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control CHECK_PID_SRVID failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_tunnel_register(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint64_t tunnel_id) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_tunnel_register(&request, tunnel_id); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TUNNEL_REGISTER failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_tunnel_register(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TUNNEL_REGISTER failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_tunnel_deregister(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint64_t tunnel_id) +{ + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + int ret; + + ctdb_req_control_tunnel_deregister(&request, tunnel_id); + ret = ctdb_client_control(mem_ctx, ev, client, destnode, timeout, + &request, &reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TUNNEL_DEREGISTER failed to node %u, ret=%d\n", + destnode, ret)); + return ret; + } + + ret = ctdb_reply_control_tunnel_deregister(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Control TUNNEL_DEREGISTER failed, ret=%d\n", ret)); + return ret; + } + + return 0; +} + +int ctdb_ctrl_disable_node(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, + struct timeval timeout) +{ + struct ctdb_req_control request = { + .opcode = 0, + }; + struct ctdb_reply_control *reply = NULL; + int ret; + + ctdb_req_control_disable_node(&request); + ret = ctdb_client_control(mem_ctx, + ev, + client, + destnode, + timeout, + &request, + &reply); + if (ret != 0) { + D_ERR("Control DISABLE_NODE failed to node %u, ret=%d\n", + destnode, + ret); + return ret; + } + + ret = ctdb_reply_control_disable_node(reply); + if (ret != 0) { + D_ERR("Control DISABLE_NODE failed, ret=%d\n", ret); + return ret; + } + + return 0; +} + +int ctdb_ctrl_enable_node(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, + struct timeval timeout) +{ + struct ctdb_req_control request = { + .opcode = 0, + }; + struct ctdb_reply_control *reply = NULL; + int ret; + + ctdb_req_control_enable_node(&request); + ret = ctdb_client_control(mem_ctx, + ev, + client, + destnode, + timeout, + &request, + &reply); + if (ret != 0) { + D_ERR("Control ENABLE_NODE failed to node %u, ret=%d\n", + destnode, + ret); + return ret; + } + + ret = ctdb_reply_control_enable_node(reply); + if (ret != 0) { + D_ERR("Control ENABLE_NODE failed, ret=%d\n", ret); + return ret; + } + + return 0; +} diff --git a/ctdb/client/client_db.c b/ctdb/client/client_db.c new file mode 100644 index 0000000..0b06d6e --- /dev/null +++ b/ctdb/client/client_db.c @@ -0,0 +1,2791 @@ +/* + CTDB client code + + Copyright (C) Amitay Isaacs 2015 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/network.h" +#include "system/filesys.h" + +#include <talloc.h> +#include <tevent.h> +#include <tdb.h> + +#include "common/logging.h" + +#include "lib/tdb_wrap/tdb_wrap.h" +#include "lib/util/tevent_unix.h" +#include "lib/util/dlinklist.h" +#include "lib/util/debug.h" + +#include "protocol/protocol.h" +#include "protocol/protocol_api.h" +#include "client/client_private.h" +#include "client/client.h" + +struct tdb_context *client_db_tdb(struct ctdb_db_context *db) +{ + return db->ltdb->tdb; +} + +static struct ctdb_db_context *client_db_handle( + struct ctdb_client_context *client, + const char *db_name) +{ + struct ctdb_db_context *db; + + for (db = client->db; db != NULL; db = db->next) { + if (strcmp(db_name, db->db_name) == 0) { + return db; + } + } + + return NULL; +} + +static bool ctdb_db_persistent(struct ctdb_db_context *db) +{ + if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) { + return true; + } + return false; +} + +static bool ctdb_db_replicated(struct ctdb_db_context *db) +{ + if (db->db_flags & CTDB_DB_FLAGS_REPLICATED) { + return true; + } + return false; +} + +static bool ctdb_db_volatile(struct ctdb_db_context *db) +{ + if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT || + db->db_flags & CTDB_DB_FLAGS_REPLICATED) { + return false; + } + return true; +} + +struct ctdb_set_db_flags_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct timeval timeout; + uint32_t db_id; + uint8_t db_flags; + bool readonly_done, sticky_done; + uint32_t *pnn_list; + int count; +}; + +static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq); +static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq); +static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq); + +static struct tevent_req *ctdb_set_db_flags_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t destnode, struct timeval timeout, + uint32_t db_id, uint8_t db_flags) +{ + struct tevent_req *req, *subreq; + struct ctdb_set_db_flags_state *state; + struct ctdb_req_control request; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_set_db_flags_state); + if (req == NULL) { + return NULL; + } + + if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) { + tevent_req_done(req); + return tevent_req_post(req, ev); + } + + state->ev = ev; + state->client = client; + state->timeout = timeout; + state->db_id = db_id; + state->db_flags = db_flags; + + ctdb_req_control_get_nodemap(&request); + subreq = ctdb_client_control_send(state, ev, client, destnode, timeout, + &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req); + + return req; +} + +static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_set_db_flags_state *state = tevent_req_data( + req, struct ctdb_set_db_flags_state); + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + struct ctdb_node_map *nodemap; + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, + ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n", + state->db_id, ret)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap); + talloc_free(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n", + state->db_id, ret)); + tevent_req_error(req, ret); + return; + } + + state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN, + state, &state->pnn_list); + talloc_free(nodemap); + if (state->count <= 0) { + DEBUG(DEBUG_ERR, + ("set_db_flags: 0x%08x no connected nodes, count=%d\n", + state->db_id, state->count)); + tevent_req_error(req, ENOMEM); + return; + } + + if (state->db_flags & CTDB_DB_FLAGS_READONLY) { + ctdb_req_control_set_db_readonly(&request, state->db_id); + subreq = ctdb_client_control_multi_send( + state, state->ev, state->client, + state->pnn_list, state->count, + state->timeout, &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, + ctdb_set_db_flags_readonly_done, req); + } else { + state->readonly_done = true; + } + + if (state->db_flags & CTDB_DB_FLAGS_STICKY) { + ctdb_req_control_set_db_sticky(&request, state->db_id); + subreq = ctdb_client_control_multi_send( + state, state->ev, state->client, + state->pnn_list, state->count, + state->timeout, &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done, + req); + } else { + state->sticky_done = true; + } +} + +static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_set_db_flags_state *state = tevent_req_data( + req, struct ctdb_set_db_flags_state); + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL, + NULL); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, + ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n", + state->db_id, ret)); + tevent_req_error(req, ret); + return; + } + + state->readonly_done = true; + + if (state->readonly_done && state->sticky_done) { + tevent_req_done(req); + } +} + +static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_set_db_flags_state *state = tevent_req_data( + req, struct ctdb_set_db_flags_state); + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL, + NULL); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, + ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n", + state->db_id, ret)); + tevent_req_error(req, ret); + return; + } + + state->sticky_done = true; + + if (state->readonly_done && state->sticky_done) { + tevent_req_done(req); + } +} + +static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr) +{ + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + return true; +} + +struct ctdb_attach_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct timeval timeout; + uint32_t destnode; + uint8_t db_flags; + struct ctdb_db_context *db; +}; + +static void ctdb_attach_dbid_done(struct tevent_req *subreq); +static void ctdb_attach_dbpath_done(struct tevent_req *subreq); +static void ctdb_attach_health_done(struct tevent_req *subreq); +static void ctdb_attach_flags_done(struct tevent_req *subreq); +static void ctdb_attach_open_flags_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct timeval timeout, + const char *db_name, uint8_t db_flags) +{ + struct tevent_req *req, *subreq; + struct ctdb_attach_state *state; + struct ctdb_req_control request; + + req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state); + if (req == NULL) { + return NULL; + } + + state->db = client_db_handle(client, db_name); + if (state->db != NULL) { + tevent_req_done(req); + return tevent_req_post(req, ev); + } + + state->ev = ev; + state->client = client; + state->timeout = timeout; + state->destnode = ctdb_client_pnn(client); + state->db_flags = db_flags; + + state->db = talloc_zero(client, struct ctdb_db_context); + if (tevent_req_nomem(state->db, req)) { + return tevent_req_post(req, ev); + } + + state->db->db_name = talloc_strdup(state->db, db_name); + if (tevent_req_nomem(state->db, req)) { + return tevent_req_post(req, ev); + } + + state->db->db_flags = db_flags; + + if (ctdb_db_persistent(state->db)) { + ctdb_req_control_db_attach_persistent(&request, + state->db->db_name); + } else if (ctdb_db_replicated(state->db)) { + ctdb_req_control_db_attach_replicated(&request, + state->db->db_name); + } else { + ctdb_req_control_db_attach(&request, state->db->db_name); + } + + subreq = ctdb_client_control_send(state, state->ev, state->client, + state->destnode, state->timeout, + &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req); + + return req; +} + +static void ctdb_attach_dbid_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_attach_state *state = tevent_req_data( + req, struct ctdb_attach_state); + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + bool status; + int ret; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("attach: %s %s failed, ret=%d\n", + state->db->db_name, + (ctdb_db_persistent(state->db) + ? "DB_ATTACH_PERSISTENT" + : (ctdb_db_replicated(state->db) + ? "DB_ATTACH_REPLICATED" + : "DB_ATTACH")), + ret)); + tevent_req_error(req, ret); + return; + } + + if (ctdb_db_persistent(state->db)) { + ret = ctdb_reply_control_db_attach_persistent( + reply, &state->db->db_id); + } else if (ctdb_db_replicated(state->db)) { + ret = ctdb_reply_control_db_attach_replicated( + reply, &state->db->db_id); + } else { + ret = ctdb_reply_control_db_attach(reply, &state->db->db_id); + } + talloc_free(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("attach: %s failed to get db_id, ret=%d\n", + state->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + ctdb_req_control_getdbpath(&request, state->db->db_id); + subreq = ctdb_client_control_send(state, state->ev, state->client, + state->destnode, state->timeout, + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req); +} + +static void ctdb_attach_dbpath_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_attach_state *state = tevent_req_data( + req, struct ctdb_attach_state); + struct ctdb_reply_control *reply; + struct ctdb_req_control request; + bool status; + int ret; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH failed, ret=%d\n", + state->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_getdbpath(reply, state->db, + &state->db->db_path); + talloc_free(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH parse failed, ret=%d\n", + state->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + ctdb_req_control_db_get_health(&request, state->db->db_id); + subreq = ctdb_client_control_send(state, state->ev, state->client, + state->destnode, state->timeout, + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_attach_health_done, req); +} + +static void ctdb_attach_health_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_attach_state *state = tevent_req_data( + req, struct ctdb_attach_state); + struct ctdb_reply_control *reply; + const char *reason; + bool status; + int ret; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("attach: %s DB_GET_HEALTH failed, ret=%d\n", + state->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_db_get_health(reply, state, &reason); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n", + state->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + if (reason != NULL) { + /* Database unhealthy, avoid attach */ + DEBUG(DEBUG_ERR, ("attach: %s database unhealthy (%s)\n", + state->db->db_name, reason)); + tevent_req_error(req, EIO); + return; + } + + subreq = ctdb_set_db_flags_send(state, state->ev, state->client, + state->destnode, state->timeout, + state->db->db_id, state->db_flags); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_attach_flags_done, req); +} + +static void ctdb_attach_flags_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_attach_state *state = tevent_req_data( + req, struct ctdb_attach_state); + struct ctdb_req_control request; + bool status; + int ret; + + status = ctdb_set_db_flags_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("attach: %s set db flags 0x%08x failed\n", + state->db->db_name, state->db_flags)); + tevent_req_error(req, ret); + return; + } + + ctdb_req_control_db_open_flags(&request, state->db->db_id); + subreq = ctdb_client_control_send(state, state->ev, state->client, + state->destnode, state->timeout, + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_attach_open_flags_done, req); +} + +static void ctdb_attach_open_flags_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_attach_state *state = tevent_req_data( + req, struct ctdb_attach_state); + struct ctdb_reply_control *reply; + bool status; + int ret, tdb_flags; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS failed, ret=%d\n", + state->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_db_open_flags(reply, &tdb_flags); + talloc_free(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS parse failed," + " ret=%d\n", state->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0, + tdb_flags, O_RDWR, 0); + if (tevent_req_nomem(state->db->ltdb, req)) { + DEBUG(DEBUG_ERR, ("attach: %s tdb_wrap_open failed\n", + state->db->db_name)); + return; + } + DLIST_ADD(state->client->db, state->db); + + tevent_req_done(req); +} + +bool ctdb_attach_recv(struct tevent_req *req, int *perr, + struct ctdb_db_context **out) +{ + struct ctdb_attach_state *state = tevent_req_data( + req, struct ctdb_attach_state); + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + if (out != NULL) { + *out = state->db; + } + return true; +} + +int ctdb_attach(struct tevent_context *ev, + struct ctdb_client_context *client, + struct timeval timeout, + const char *db_name, uint8_t db_flags, + struct ctdb_db_context **out) +{ + TALLOC_CTX *mem_ctx; + struct tevent_req *req; + bool status; + int ret; + + mem_ctx = talloc_new(client); + if (mem_ctx == NULL) { + return ENOMEM; + } + + req = ctdb_attach_send(mem_ctx, ev, client, timeout, + db_name, db_flags); + if (req == NULL) { + talloc_free(mem_ctx); + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_attach_recv(req, &ret, out); + if (! status) { + talloc_free(mem_ctx); + return ret; + } + + /* + ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func); + ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func); + ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func); + */ + + talloc_free(mem_ctx); + return 0; +} + +struct ctdb_detach_state { + struct ctdb_client_context *client; + struct tevent_context *ev; + struct timeval timeout; + uint32_t db_id; + const char *db_name; +}; + +static void ctdb_detach_dbname_done(struct tevent_req *subreq); +static void ctdb_detach_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_detach_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct timeval timeout, uint32_t db_id) +{ + struct tevent_req *req, *subreq; + struct ctdb_detach_state *state; + struct ctdb_req_control request; + + req = tevent_req_create(mem_ctx, &state, struct ctdb_detach_state); + if (req == NULL) { + return NULL; + } + + state->client = client; + state->ev = ev; + state->timeout = timeout; + state->db_id = db_id; + + ctdb_req_control_get_dbname(&request, db_id); + subreq = ctdb_client_control_send(state, ev, client, + ctdb_client_pnn(client), timeout, + &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_detach_dbname_done, req); + + return req; +} + +static void ctdb_detach_dbname_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_detach_state *state = tevent_req_data( + req, struct ctdb_detach_state); + struct ctdb_reply_control *reply; + struct ctdb_req_control request; + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n", + state->db_id, ret)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n", + state->db_id, ret)); + tevent_req_error(req, ret); + return; + } + + ctdb_req_control_db_detach(&request, state->db_id); + subreq = ctdb_client_control_send(state, state->ev, state->client, + ctdb_client_pnn(state->client), + state->timeout, &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_detach_done, req); + +} + +static void ctdb_detach_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_detach_state *state = tevent_req_data( + req, struct ctdb_detach_state); + struct ctdb_reply_control *reply; + struct ctdb_db_context *db; + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n", + state->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_db_detach(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n", + state->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + db = client_db_handle(state->client, state->db_name); + if (db != NULL) { + DLIST_REMOVE(state->client->db, db); + TALLOC_FREE(db); + } + + tevent_req_done(req); +} + +bool ctdb_detach_recv(struct tevent_req *req, int *perr) +{ + int ret; + + if (tevent_req_is_unix_error(req, &ret)) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + + return true; +} + +int ctdb_detach(struct tevent_context *ev, + struct ctdb_client_context *client, + struct timeval timeout, uint32_t db_id) +{ + TALLOC_CTX *mem_ctx; + struct tevent_req *req; + int ret; + bool status; + + mem_ctx = talloc_new(client); + if (mem_ctx == NULL) { + return ENOMEM; + } + + req = ctdb_detach_send(mem_ctx, ev, client, timeout, db_id); + if (req == NULL) { + talloc_free(mem_ctx); + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_detach_recv(req, &ret); + if (! status) { + talloc_free(mem_ctx); + return ret; + } + + talloc_free(mem_ctx); + return 0; +} + +uint32_t ctdb_db_id(struct ctdb_db_context *db) +{ + return db->db_id; +} + +struct ctdb_db_traverse_local_state { + ctdb_rec_parser_func_t parser; + void *private_data; + bool extract_header; + int error; +}; + +static int ctdb_db_traverse_local_handler(struct tdb_context *tdb, + TDB_DATA key, TDB_DATA data, + void *private_data) +{ + struct ctdb_db_traverse_local_state *state = + (struct ctdb_db_traverse_local_state *)private_data; + int ret; + + if (state->extract_header) { + struct ctdb_ltdb_header header; + + ret = ctdb_ltdb_header_extract(&data, &header); + if (ret != 0) { + state->error = ret; + return 1; + } + + ret = state->parser(0, &header, key, data, state->private_data); + } else { + ret = state->parser(0, NULL, key, data, state->private_data); + } + + if (ret != 0) { + state->error = ret; + return 1; + } + + return 0; +} + +int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly, + bool extract_header, + ctdb_rec_parser_func_t parser, void *private_data) +{ + struct ctdb_db_traverse_local_state state; + int ret; + + state.parser = parser; + state.private_data = private_data; + state.extract_header = extract_header; + state.error = 0; + + if (readonly) { + ret = tdb_traverse_read(client_db_tdb(db), + ctdb_db_traverse_local_handler, + &state); + } else { + ret = tdb_traverse(client_db_tdb(db), + ctdb_db_traverse_local_handler, &state); + } + + if (ret == -1) { + return EIO; + } + + return state.error; +} + +struct ctdb_db_traverse_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct ctdb_db_context *db; + uint32_t destnode; + uint64_t srvid; + struct timeval timeout; + ctdb_rec_parser_func_t parser; + void *private_data; + int result; +}; + +static void ctdb_db_traverse_handler_set(struct tevent_req *subreq); +static void ctdb_db_traverse_started(struct tevent_req *subreq); +static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data, + void *private_data); +static void ctdb_db_traverse_remove_handler(struct tevent_req *req); +static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq); + +struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + uint32_t destnode, + struct timeval timeout, + ctdb_rec_parser_func_t parser, + void *private_data) +{ + struct tevent_req *req, *subreq; + struct ctdb_db_traverse_state *state; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_db_traverse_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + state->db = db; + state->destnode = destnode; + state->srvid = CTDB_SRVID_CLIENT_RANGE | getpid(); + state->timeout = timeout; + state->parser = parser; + state->private_data = private_data; + + subreq = ctdb_client_set_message_handler_send(state, ev, client, + state->srvid, + ctdb_db_traverse_handler, + req); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_db_traverse_handler_set, req); + + return req; +} + +static void ctdb_db_traverse_handler_set(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_db_traverse_state *state = tevent_req_data( + req, struct ctdb_db_traverse_state); + struct ctdb_traverse_start_ext traverse; + struct ctdb_req_control request; + int ret = 0; + bool status; + + status = ctdb_client_set_message_handler_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + traverse = (struct ctdb_traverse_start_ext) { + .db_id = ctdb_db_id(state->db), + .reqid = 0, + .srvid = state->srvid, + .withemptyrecords = false, + }; + + ctdb_req_control_traverse_start_ext(&request, &traverse); + subreq = ctdb_client_control_send(state, state->ev, state->client, + state->destnode, state->timeout, + &request); + if (subreq == NULL) { + state->result = ENOMEM; + ctdb_db_traverse_remove_handler(req); + return; + } + tevent_req_set_callback(subreq, ctdb_db_traverse_started, req); +} + +static void ctdb_db_traverse_started(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_db_traverse_state *state = tevent_req_data( + req, struct ctdb_db_traverse_state); + struct ctdb_reply_control *reply; + int ret = 0; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("traverse: control failed, ret=%d\n", ret)); + state->result = ret; + ctdb_db_traverse_remove_handler(req); + return; + } + + ret = ctdb_reply_control_traverse_start_ext(reply); + talloc_free(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("traverse: control reply failed, ret=%d\n", + ret)); + state->result = ret; + ctdb_db_traverse_remove_handler(req); + return; + } +} + +static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data, + void *private_data) +{ + struct tevent_req *req = talloc_get_type_abort( + private_data, struct tevent_req); + struct ctdb_db_traverse_state *state = tevent_req_data( + req, struct ctdb_db_traverse_state); + struct ctdb_rec_data *rec; + struct ctdb_ltdb_header header; + size_t np; + int ret; + + ret = ctdb_rec_data_pull(data.dptr, data.dsize, state, &rec, &np); + if (ret != 0) { + return; + } + + if (rec->key.dsize == 0 && rec->data.dsize == 0) { + talloc_free(rec); + ctdb_db_traverse_remove_handler(req); + return; + } + + ret = ctdb_ltdb_header_extract(&rec->data, &header); + if (ret != 0) { + talloc_free(rec); + return; + } + + if (rec->data.dsize == 0) { + talloc_free(rec); + return; + } + + ret = state->parser(rec->reqid, &header, rec->key, rec->data, + state->private_data); + talloc_free(rec); + if (ret != 0) { + state->result = ret; + ctdb_db_traverse_remove_handler(req); + } +} + +static void ctdb_db_traverse_remove_handler(struct tevent_req *req) +{ + struct ctdb_db_traverse_state *state = tevent_req_data( + req, struct ctdb_db_traverse_state); + struct tevent_req *subreq; + + subreq = ctdb_client_remove_message_handler_send(state, state->ev, + state->client, + state->srvid, req); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_db_traverse_handler_removed, req); +} + +static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_db_traverse_state *state = tevent_req_data( + req, struct ctdb_db_traverse_state); + int ret; + bool status; + + status = ctdb_client_remove_message_handler_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + if (state->result != 0) { + tevent_req_error(req, state->result); + return; + } + + tevent_req_done(req); +} + +bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr) +{ + int ret; + + if (tevent_req_is_unix_error(req, &ret)) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + + return true; +} + +int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + uint32_t destnode, struct timeval timeout, + ctdb_rec_parser_func_t parser, void *private_data) +{ + struct tevent_req *req; + int ret = 0; + bool status; + + req = ctdb_db_traverse_send(mem_ctx, ev, client, db, destnode, + timeout, parser, private_data); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_db_traverse_recv(req, &ret); + if (! status) { + return ret; + } + + return 0; +} + +int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key, + struct ctdb_ltdb_header *header, + TALLOC_CTX *mem_ctx, TDB_DATA *data) +{ + TDB_DATA rec; + size_t np; + int ret; + + rec = tdb_fetch(client_db_tdb(db), key); + if (rec.dsize < sizeof(struct ctdb_ltdb_header)) { + /* No record present */ + if (rec.dptr != NULL) { + free(rec.dptr); + } + + if (tdb_error(client_db_tdb(db)) != TDB_ERR_NOEXIST) { + return EIO; + } + + *header = (struct ctdb_ltdb_header) { + .dmaster = CTDB_UNKNOWN_PNN, + }; + + if (data != NULL) { + *data = tdb_null; + } + return 0; + } + + ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np); + if (ret != 0) { + return ret; + } + + ret = 0; + if (data != NULL) { + data->dsize = rec.dsize - np; + data->dptr = talloc_memdup(mem_ctx, rec.dptr + np, + data->dsize); + if (data->dptr == NULL) { + ret = ENOMEM; + } + } + + free(rec.dptr); + return ret; +} + +/* + * Fetch a record from volatile database + * + * Steps: + * 1. Get a lock on the hash chain + * 2. If the record does not exist, migrate the record + * 3. If readonly=true and delegations do not exist, migrate the record. + * 4. If readonly=false and delegations exist, migrate the record. + * 5. If the local node is not dmaster, migrate the record. + * 6. Return record + */ + +struct ctdb_fetch_lock_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct ctdb_record_handle *h; + bool readonly; + uint32_t pnn; +}; + +static int ctdb_fetch_lock_check(struct tevent_req *req); +static void ctdb_fetch_lock_migrate(struct tevent_req *req); +static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + TDB_DATA key, bool readonly) +{ + struct ctdb_fetch_lock_state *state; + struct tevent_req *req; + int ret; + + req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + + state->h = talloc_zero(db, struct ctdb_record_handle); + if (tevent_req_nomem(state->h, req)) { + return tevent_req_post(req, ev); + } + state->h->ev = ev; + state->h->client = client; + state->h->db = db; + state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize); + if (tevent_req_nomem(state->h->key.dptr, req)) { + return tevent_req_post(req, ev); + } + state->h->key.dsize = key.dsize; + state->h->readonly = false; + + state->readonly = readonly; + state->pnn = ctdb_client_pnn(client); + + /* Check that database is not persistent */ + if (! ctdb_db_volatile(db)) { + DEBUG(DEBUG_ERR, ("fetch_lock: %s database not volatile\n", + db->db_name)); + tevent_req_error(req, EINVAL); + return tevent_req_post(req, ev); + } + + ret = ctdb_fetch_lock_check(req); + if (ret == 0) { + tevent_req_done(req); + return tevent_req_post(req, ev); + } + if (ret != EAGAIN) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + return req; +} + +static int ctdb_fetch_lock_check(struct tevent_req *req) +{ + struct ctdb_fetch_lock_state *state = tevent_req_data( + req, struct ctdb_fetch_lock_state); + struct ctdb_record_handle *h = state->h; + struct ctdb_ltdb_header header; + TDB_DATA data = tdb_null; + size_t np; + int ret, err = 0; + bool do_migrate = false; + + ret = tdb_chainlock(client_db_tdb(h->db), h->key); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("fetch_lock: %s tdb_chainlock failed, %s\n", + h->db->db_name, tdb_errorstr(client_db_tdb(h->db)))); + err = EIO; + goto failed; + } + + data = tdb_fetch(client_db_tdb(h->db), h->key); + if (data.dptr == NULL) { + if (tdb_error(client_db_tdb(h->db)) == TDB_ERR_NOEXIST) { + goto migrate; + } else { + err = EIO; + goto failed; + } + } + + /* Got the record */ + ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header, &np); + if (ret != 0) { + err = ret; + goto failed; + } + + if (! state->readonly) { + /* Read/write access */ + if (header.dmaster == state->pnn && + header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) { + goto migrate; + } + + if (header.dmaster != state->pnn) { + goto migrate; + } + } else { + /* Readonly access */ + if (header.dmaster != state->pnn && + ! (header.flags & (CTDB_REC_RO_HAVE_READONLY | + CTDB_REC_RO_HAVE_DELEGATIONS))) { + goto migrate; + } + } + + /* We are the dmaster or readonly delegation */ + h->header = header; + h->data = data; + if (header.flags & (CTDB_REC_RO_HAVE_READONLY | + CTDB_REC_RO_HAVE_DELEGATIONS)) { + h->readonly = true; + } + return 0; + +migrate: + do_migrate = true; + err = EAGAIN; + +failed: + if (data.dptr != NULL) { + free(data.dptr); + } + ret = tdb_chainunlock(client_db_tdb(h->db), h->key); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("fetch_lock: %s tdb_chainunlock failed, %s\n", + h->db->db_name, tdb_errorstr(client_db_tdb(h->db)))); + return EIO; + } + + if (do_migrate) { + ctdb_fetch_lock_migrate(req); + } + return err; +} + +static void ctdb_fetch_lock_migrate(struct tevent_req *req) +{ + struct ctdb_fetch_lock_state *state = tevent_req_data( + req, struct ctdb_fetch_lock_state); + struct ctdb_req_call request; + struct tevent_req *subreq; + + ZERO_STRUCT(request); + request.flags = CTDB_IMMEDIATE_MIGRATION; + if (state->readonly) { + request.flags |= CTDB_WANT_READONLY; + } + request.db_id = state->h->db->db_id; + request.callid = CTDB_NULL_FUNC; + request.key = state->h->key; + request.calldata = tdb_null; + + subreq = ctdb_client_call_send(state, state->ev, state->client, + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + + tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req); +} + +static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_fetch_lock_state *state = tevent_req_data( + req, struct ctdb_fetch_lock_state); + struct ctdb_reply_call *reply; + int ret; + bool status; + + status = ctdb_client_call_recv(subreq, state, &reply, &ret); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("fetch_lock: %s CALL failed, ret=%d\n", + state->h->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + if (reply->status != 0) { + tevent_req_error(req, EIO); + return; + } + talloc_free(reply); + + ret = ctdb_fetch_lock_check(req); + if (ret != 0) { + if (ret != EAGAIN) { + tevent_req_error(req, ret); + } + return; + } + + tevent_req_done(req); +} + +static int ctdb_record_handle_destructor(struct ctdb_record_handle *h) +{ + int ret; + + ret = tdb_chainunlock(client_db_tdb(h->db), h->key); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("fetch_lock: %s tdb_chainunlock failed, %s\n", + h->db->db_name, tdb_errorstr(client_db_tdb(h->db)))); + } + free(h->data.dptr); + return 0; +} + +struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req, + struct ctdb_ltdb_header *header, + TALLOC_CTX *mem_ctx, + TDB_DATA *data, int *perr) +{ + struct ctdb_fetch_lock_state *state = tevent_req_data( + req, struct ctdb_fetch_lock_state); + struct ctdb_record_handle *h = state->h; + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + TALLOC_FREE(state->h); + *perr = err; + } + return NULL; + } + + if (header != NULL) { + *header = h->header; + } + if (data != NULL) { + size_t offset; + + offset = ctdb_ltdb_header_len(&h->header); + + data->dsize = h->data.dsize - offset; + if (data->dsize == 0) { + data->dptr = NULL; + } else { + data->dptr = talloc_memdup(mem_ctx, + h->data.dptr + offset, + data->dsize); + if (data->dptr == NULL) { + TALLOC_FREE(state->h); + if (perr != NULL) { + *perr = ENOMEM; + } + return NULL; + } + } + } + + talloc_set_destructor(h, ctdb_record_handle_destructor); + return h; +} + +int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, TDB_DATA key, bool readonly, + struct ctdb_record_handle **out, + struct ctdb_ltdb_header *header, TDB_DATA *data) +{ + struct tevent_req *req; + struct ctdb_record_handle *h; + int ret = 0; + + req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret); + if (h == NULL) { + return ret; + } + + *out = h; + return 0; +} + +int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data) +{ + uint8_t header[sizeof(struct ctdb_ltdb_header)]; + TDB_DATA rec[2]; + size_t np; + int ret; + + /* Cannot modify the record if it was obtained as a readonly copy */ + if (h->readonly) { + return EINVAL; + } + + /* Check if the new data is same */ + if (h->data.dsize == data.dsize && + memcmp(h->data.dptr, data.dptr, data.dsize) == 0) { + /* No need to do anything */ + return 0; + } + + ctdb_ltdb_header_push(&h->header, header, &np); + + rec[0].dsize = np; + rec[0].dptr = header; + + rec[1].dsize = data.dsize; + rec[1].dptr = data.dptr; + + ret = tdb_storev(client_db_tdb(h->db), h->key, rec, 2, TDB_REPLACE); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("store_record: %s tdb_storev failed, %s\n", + h->db->db_name, tdb_errorstr(client_db_tdb(h->db)))); + return EIO; + } + + return 0; +} + +struct ctdb_delete_record_state { + struct ctdb_record_handle *h; +}; + +static void ctdb_delete_record_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_record_handle *h) +{ + struct tevent_req *req, *subreq; + struct ctdb_delete_record_state *state; + struct ctdb_key_data key; + struct ctdb_req_control request; + uint8_t header[sizeof(struct ctdb_ltdb_header)]; + TDB_DATA rec; + size_t np; + int ret; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_delete_record_state); + if (req == NULL) { + return NULL; + } + + state->h = h; + + /* Cannot delete the record if it was obtained as a readonly copy */ + if (h->readonly) { + DEBUG(DEBUG_ERR, ("fetch_lock delete: %s readonly record\n", + h->db->db_name)); + tevent_req_error(req, EINVAL); + return tevent_req_post(req, ev); + } + + ctdb_ltdb_header_push(&h->header, header, &np); + + rec.dsize = np; + rec.dptr = header; + + ret = tdb_store(client_db_tdb(h->db), h->key, rec, TDB_REPLACE); + if (ret != 0) { + D_ERR("fetch_lock delete: %s tdb_store failed, %s\n", + h->db->db_name, + tdb_errorstr(client_db_tdb(h->db))); + tevent_req_error(req, EIO); + return tevent_req_post(req, ev); + } + + key.db_id = h->db->db_id; + key.header = h->header; + key.key = h->key; + + ctdb_req_control_schedule_for_deletion(&request, &key); + subreq = ctdb_client_control_send(state, ev, h->client, + ctdb_client_pnn(h->client), + tevent_timeval_zero(), + &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_delete_record_done, req); + + return req; +} + +static void ctdb_delete_record_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_delete_record_state *state = tevent_req_data( + req, struct ctdb_delete_record_state); + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, NULL, NULL); + TALLOC_FREE(subreq); + if (! status) { + D_ERR("delete_record: %s SCHEDULE_FOR_DELETION failed, ret=%d\n", + state->h->db->db_name, + ret); + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +bool ctdb_delete_record_recv(struct tevent_req *req, int *perr) +{ + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + return true; +} + + +int ctdb_delete_record(struct ctdb_record_handle *h) +{ + struct tevent_context *ev = h->ev; + TALLOC_CTX *mem_ctx; + struct tevent_req *req; + int ret; + bool status; + + mem_ctx = talloc_new(NULL); + if (mem_ctx == NULL) { + return ENOMEM; + } + + req = ctdb_delete_record_send(mem_ctx, ev, h); + if (req == NULL) { + talloc_free(mem_ctx); + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_delete_record_recv(req, &ret); + talloc_free(mem_ctx); + if (! status) { + return ret; + } + + return 0; +} + +/* + * Global lock functions + */ + +struct ctdb_g_lock_lock_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct ctdb_db_context *db; + TDB_DATA key; + struct ctdb_server_id my_sid; + enum ctdb_g_lock_type lock_type; + struct ctdb_record_handle *h; + /* state for verification of active locks */ + struct ctdb_g_lock_list *lock_list; + unsigned int current; +}; + +static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq); +static void ctdb_g_lock_lock_process_locks(struct tevent_req *req); +static void ctdb_g_lock_lock_checked(struct tevent_req *subreq); +static int ctdb_g_lock_lock_update(struct tevent_req *req); +static void ctdb_g_lock_lock_retry(struct tevent_req *subreq); + +static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1, + enum ctdb_g_lock_type l2) +{ + if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) { + return false; + } + return true; +} + +struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + const char *keyname, + struct ctdb_server_id *sid, + bool readonly) +{ + struct tevent_req *req, *subreq; + struct ctdb_g_lock_lock_state *state; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_g_lock_lock_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + state->db = db; + state->key.dptr = discard_const(keyname); + state->key.dsize = strlen(keyname) + 1; + state->my_sid = *sid; + state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE); + + subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key, + false); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req); + + return req; +} + +static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_g_lock_lock_state *state = tevent_req_data( + req, struct ctdb_g_lock_lock_state); + TDB_DATA data; + size_t np; + int ret = 0; + + state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret); + TALLOC_FREE(subreq); + if (state->h == NULL) { + DEBUG(DEBUG_ERR, ("g_lock_lock: %s fetch lock failed\n", + (char *)state->key.dptr)); + tevent_req_error(req, ret); + return; + } + + if (state->lock_list != NULL) { + TALLOC_FREE(state->lock_list); + state->current = 0; + } + + ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state, + &state->lock_list, &np); + talloc_free(data.dptr); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("g_lock_lock: %s invalid lock data\n", + (char *)state->key.dptr)); + tevent_req_error(req, ret); + return; + } + + ctdb_g_lock_lock_process_locks(req); +} + +static void ctdb_g_lock_lock_process_locks(struct tevent_req *req) +{ + struct ctdb_g_lock_lock_state *state = tevent_req_data( + req, struct ctdb_g_lock_lock_state); + struct tevent_req *subreq; + struct ctdb_g_lock *lock; + bool check_server = false; + int ret; + + while (state->current < state->lock_list->num) { + lock = &state->lock_list->lock[state->current]; + + /* We should not ask for the same lock more than once */ + if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) { + DEBUG(DEBUG_ERR, ("g_lock_lock: %s deadlock\n", + (char *)state->key.dptr)); + tevent_req_error(req, EDEADLK); + return; + } + + if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) { + check_server = true; + break; + } + + state->current += 1; + } + + if (check_server) { + struct ctdb_req_control request; + + ctdb_req_control_process_exists(&request, lock->sid.pid); + subreq = ctdb_client_control_send(state, state->ev, + state->client, + lock->sid.vnn, + tevent_timeval_zero(), + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req); + return; + } + + /* There is no conflict, add ourself to the lock_list */ + state->lock_list->lock = talloc_realloc(state->lock_list, + state->lock_list->lock, + struct ctdb_g_lock, + state->lock_list->num + 1); + if (state->lock_list->lock == NULL) { + tevent_req_error(req, ENOMEM); + return; + } + + lock = &state->lock_list->lock[state->lock_list->num]; + lock->type = state->lock_type; + lock->sid = state->my_sid; + state->lock_list->num += 1; + + ret = ctdb_g_lock_lock_update(req); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + TALLOC_FREE(state->h); + tevent_req_done(req); +} + +static void ctdb_g_lock_lock_checked(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_g_lock_lock_state *state = tevent_req_data( + req, struct ctdb_g_lock_lock_state); + struct ctdb_reply_control *reply; + int ret, value; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, + ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n", + (char *)state->key.dptr, ret)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_process_exists(reply, &value); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + talloc_free(reply); + + if (value == 0) { + /* server process exists, need to retry */ + TALLOC_FREE(state->h); + subreq = tevent_wakeup_send(state, state->ev, + tevent_timeval_current_ofs(0,1000)); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req); + return; + } + + /* server process does not exist, remove conflicting entry */ + state->lock_list->lock[state->current] = + state->lock_list->lock[state->lock_list->num-1]; + state->lock_list->num -= 1; + + ret = ctdb_g_lock_lock_update(req); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + ctdb_g_lock_lock_process_locks(req); +} + +static int ctdb_g_lock_lock_update(struct tevent_req *req) +{ + struct ctdb_g_lock_lock_state *state = tevent_req_data( + req, struct ctdb_g_lock_lock_state); + TDB_DATA data; + size_t np; + int ret; + + data.dsize = ctdb_g_lock_list_len(state->lock_list); + data.dptr = talloc_size(state, data.dsize); + if (data.dptr == NULL) { + return ENOMEM; + } + + ctdb_g_lock_list_push(state->lock_list, data.dptr, &np); + ret = ctdb_store_record(state->h, data); + talloc_free(data.dptr); + return ret; +} + +static void ctdb_g_lock_lock_retry(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_g_lock_lock_state *state = tevent_req_data( + req, struct ctdb_g_lock_lock_state); + bool success; + + success = tevent_wakeup_recv(subreq); + TALLOC_FREE(subreq); + if (! success) { + tevent_req_error(req, ENOMEM); + return; + } + + subreq = ctdb_fetch_lock_send(state, state->ev, state->client, + state->db, state->key, false); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req); +} + +bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr) +{ + struct ctdb_g_lock_lock_state *state = tevent_req_data( + req, struct ctdb_g_lock_lock_state); + int err; + + TALLOC_FREE(state->h); + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + return true; +} + +struct ctdb_g_lock_unlock_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct ctdb_db_context *db; + TDB_DATA key; + struct ctdb_server_id my_sid; + struct ctdb_record_handle *h; + struct ctdb_g_lock_list *lock_list; +}; + +static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq); +static int ctdb_g_lock_unlock_update(struct tevent_req *req); +static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq); + +struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + const char *keyname, + struct ctdb_server_id sid) +{ + struct tevent_req *req, *subreq; + struct ctdb_g_lock_unlock_state *state; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_g_lock_unlock_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + state->db = db; + state->key.dptr = discard_const(keyname); + state->key.dsize = strlen(keyname) + 1; + state->my_sid = sid; + + subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key, + false); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req); + + return req; +} + +static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_g_lock_unlock_state *state = tevent_req_data( + req, struct ctdb_g_lock_unlock_state); + TDB_DATA data; + size_t np; + int ret = 0; + + state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret); + TALLOC_FREE(subreq); + if (state->h == NULL) { + DEBUG(DEBUG_ERR, ("g_lock_unlock: %s fetch lock failed\n", + (char *)state->key.dptr)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state, + &state->lock_list, &np); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("g_lock_unlock: %s invalid lock data\n", + (char *)state->key.dptr)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_g_lock_unlock_update(req); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + if (state->lock_list->num == 0) { + subreq = ctdb_delete_record_send(state, state->ev, state->h); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted, + req); + return; + } + + TALLOC_FREE(state->h); + tevent_req_done(req); +} + +static int ctdb_g_lock_unlock_update(struct tevent_req *req) +{ + struct ctdb_g_lock_unlock_state *state = tevent_req_data( + req, struct ctdb_g_lock_unlock_state); + struct ctdb_g_lock *lock; + unsigned int i; + int ret; + + for (i=0; i<state->lock_list->num; i++) { + lock = &state->lock_list->lock[i]; + + if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) { + break; + } + } + + if (i < state->lock_list->num) { + state->lock_list->lock[i] = + state->lock_list->lock[state->lock_list->num-1]; + state->lock_list->num -= 1; + } + + if (state->lock_list->num != 0) { + TDB_DATA data; + size_t np; + + data.dsize = ctdb_g_lock_list_len(state->lock_list); + data.dptr = talloc_size(state, data.dsize); + if (data.dptr == NULL) { + return ENOMEM; + } + + ctdb_g_lock_list_push(state->lock_list, data.dptr, &np); + ret = ctdb_store_record(state->h, data); + talloc_free(data.dptr); + if (ret != 0) { + return ret; + } + } + + return 0; +} + +static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_g_lock_unlock_state *state = tevent_req_data( + req, struct ctdb_g_lock_unlock_state); + int ret; + bool status; + + status = ctdb_delete_record_recv(subreq, &ret); + if (! status) { + DEBUG(DEBUG_ERR, + ("g_lock_unlock %s delete record failed, ret=%d\n", + (char *)state->key.dptr, ret)); + tevent_req_error(req, ret); + return; + } + + TALLOC_FREE(state->h); + tevent_req_done(req); +} + +bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr) +{ + struct ctdb_g_lock_unlock_state *state = tevent_req_data( + req, struct ctdb_g_lock_unlock_state); + int err; + + TALLOC_FREE(state->h); + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + return true; +} + +/* + * Persistent database functions + */ +struct ctdb_transaction_start_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct timeval timeout; + struct ctdb_transaction_handle *h; + uint32_t destnode; +}; + +static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq); +static void ctdb_transaction_g_lock_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct timeval timeout, + struct ctdb_db_context *db, + bool readonly) +{ + struct ctdb_transaction_start_state *state; + struct tevent_req *req, *subreq; + struct ctdb_transaction_handle *h; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_transaction_start_state); + if (req == NULL) { + return NULL; + } + + if (ctdb_db_volatile(db)) { + tevent_req_error(req, EINVAL); + return tevent_req_post(req, ev); + } + + state->ev = ev; + state->client = client; + state->destnode = ctdb_client_pnn(client); + + h = talloc_zero(db, struct ctdb_transaction_handle); + if (tevent_req_nomem(h, req)) { + return tevent_req_post(req, ev); + } + + h->ev = ev; + h->client = client; + h->db = db; + h->readonly = readonly; + h->updated = false; + + /* SRVID is unique for databases, so client can have transactions + * active for multiple databases */ + h->sid = ctdb_client_get_server_id(client, db->db_id); + + h->recbuf = ctdb_rec_buffer_init(h, db->db_id); + if (tevent_req_nomem(h->recbuf, req)) { + return tevent_req_post(req, ev); + } + + h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id); + if (tevent_req_nomem(h->lock_name, req)) { + return tevent_req_post(req, ev); + } + + state->h = h; + + subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req); + + return req; +} + +static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_transaction_start_state *state = tevent_req_data( + req, struct ctdb_transaction_start_state); + bool status; + int ret; + + status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, + ("transaction_start: %s attach g_lock.tdb failed\n", + state->h->db->db_name)); + tevent_req_error(req, ret); + return; + } + + subreq = ctdb_g_lock_lock_send(state, state->ev, state->client, + state->h->db_g_lock, + state->h->lock_name, + &state->h->sid, state->h->readonly); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req); +} + +static void ctdb_transaction_g_lock_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_transaction_start_state *state = tevent_req_data( + req, struct ctdb_transaction_start_state); + int ret; + bool status; + + status = ctdb_g_lock_lock_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, + ("transaction_start: %s g_lock lock failed, ret=%d\n", + state->h->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +struct ctdb_transaction_handle *ctdb_transaction_start_recv( + struct tevent_req *req, + int *perr) +{ + struct ctdb_transaction_start_state *state = tevent_req_data( + req, struct ctdb_transaction_start_state); + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return NULL; + } + + return state->h; +} + +int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + struct timeval timeout, + struct ctdb_db_context *db, bool readonly, + struct ctdb_transaction_handle **out) +{ + struct tevent_req *req; + struct ctdb_transaction_handle *h; + int ret = 0; + + req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db, + readonly); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + h = ctdb_transaction_start_recv(req, &ret); + if (h == NULL) { + return ret; + } + + *out = h; + return 0; +} + +struct ctdb_transaction_record_fetch_state { + TDB_DATA key, data; + struct ctdb_ltdb_header header; + bool found; +}; + +static int ctdb_transaction_record_fetch_traverse( + uint32_t reqid, + struct ctdb_ltdb_header *nullheader, + TDB_DATA key, TDB_DATA data, + void *private_data) +{ + struct ctdb_transaction_record_fetch_state *state = + (struct ctdb_transaction_record_fetch_state *)private_data; + + if (state->key.dsize == key.dsize && + memcmp(state->key.dptr, key.dptr, key.dsize) == 0) { + int ret; + + ret = ctdb_ltdb_header_extract(&data, &state->header); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("record_fetch: Failed to extract header, " + "ret=%d\n", ret)); + return 1; + } + + state->data = data; + state->found = true; + } + + return 0; +} + +static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h, + TDB_DATA key, + struct ctdb_ltdb_header *header, + TDB_DATA *data) +{ + struct ctdb_transaction_record_fetch_state state; + int ret; + + state.key = key; + state.found = false; + + ret = ctdb_rec_buffer_traverse(h->recbuf, + ctdb_transaction_record_fetch_traverse, + &state); + if (ret != 0) { + return ret; + } + + if (state.found) { + if (header != NULL) { + *header = state.header; + } + if (data != NULL) { + *data = state.data; + } + return 0; + } + + return ENOENT; +} + +int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h, + TDB_DATA key, + TALLOC_CTX *mem_ctx, TDB_DATA *data) +{ + TDB_DATA tmp_data; + struct ctdb_ltdb_header header; + int ret; + + ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data); + if (ret == 0) { + data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr, + tmp_data.dsize); + if (data->dptr == NULL) { + return ENOMEM; + } + data->dsize = tmp_data.dsize; + return 0; + } + + ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data); + if (ret != 0) { + return ret; + } + + ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data); + if (ret != 0) { + return ret; + } + + return 0; +} + +int ctdb_transaction_store_record(struct ctdb_transaction_handle *h, + TDB_DATA key, TDB_DATA data) +{ + TALLOC_CTX *tmp_ctx; + struct ctdb_ltdb_header header; + TDB_DATA old_data; + int ret; + + if (h->readonly) { + return EINVAL; + } + + tmp_ctx = talloc_new(h); + if (tmp_ctx == NULL) { + return ENOMEM; + } + + ret = ctdb_transaction_record_fetch(h, key, &header, &old_data); + if (ret != 0) { + ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data); + if (ret != 0) { + return ret; + } + } + + if (old_data.dsize == data.dsize && + memcmp(old_data.dptr, data.dptr, data.dsize) == 0) { + talloc_free(tmp_ctx); + return 0; + } + + header.dmaster = ctdb_client_pnn(h->client); + header.rsn += 1; + + ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data); + talloc_free(tmp_ctx); + if (ret != 0) { + return ret; + } + h->updated = true; + + return 0; +} + +int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h, + TDB_DATA key) +{ + return ctdb_transaction_store_record(h, key, tdb_null); +} + +static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle *h, + uint64_t *seqnum) +{ + const char *keyname = CTDB_DB_SEQNUM_KEY; + TDB_DATA key, data; + struct ctdb_ltdb_header header; + int ret; + + key.dptr = discard_const(keyname); + key.dsize = strlen(keyname) + 1; + + ret = ctdb_ltdb_fetch(h->db, key, &header, h, &data); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("transaction_commit: %s seqnum fetch failed, ret=%d\n", + h->db->db_name, ret)); + return ret; + } + + if (data.dsize == 0) { + /* initial data */ + *seqnum = 0; + return 0; + } + + if (data.dsize != sizeof(uint64_t)) { + talloc_free(data.dptr); + return EINVAL; + } + + *seqnum = *(uint64_t *)data.dptr; + + talloc_free(data.dptr); + return 0; +} + +static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h, + uint64_t seqnum) +{ + const char *keyname = CTDB_DB_SEQNUM_KEY; + TDB_DATA key, data; + + key.dptr = discard_const(keyname); + key.dsize = strlen(keyname) + 1; + + data.dptr = (uint8_t *)&seqnum; + data.dsize = sizeof(seqnum); + + return ctdb_transaction_store_record(h, key, data); +} + +struct ctdb_transaction_commit_state { + struct tevent_context *ev; + struct timeval timeout; + struct ctdb_transaction_handle *h; + uint64_t seqnum; +}; + +static void ctdb_transaction_commit_done(struct tevent_req *subreq); +static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_transaction_commit_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct timeval timeout, + struct ctdb_transaction_handle *h) +{ + struct tevent_req *req, *subreq; + struct ctdb_transaction_commit_state *state; + struct ctdb_req_control request; + int ret; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_transaction_commit_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->timeout = timeout; + state->h = h; + + ret = ctdb_transaction_fetch_db_seqnum(h, &state->seqnum); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + ctdb_req_control_trans3_commit(&request, h->recbuf); + subreq = ctdb_client_control_send(state, ev, h->client, + ctdb_client_pnn(h->client), + timeout, &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req); + + return req; +} + +static void ctdb_transaction_commit_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_transaction_commit_state *state = tevent_req_data( + req, struct ctdb_transaction_commit_state); + struct ctdb_transaction_handle *h = state->h; + struct ctdb_reply_control *reply; + uint64_t seqnum; + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, + ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n", + h->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_trans3_commit(reply); + talloc_free(reply); + + if (ret != 0) { + /* Control failed due to recovery */ + + ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + if (seqnum == state->seqnum) { + struct ctdb_req_control request; + + /* try again */ + ctdb_req_control_trans3_commit(&request, + state->h->recbuf); + subreq = ctdb_client_control_send( + state, state->ev, state->h->client, + ctdb_client_pnn(state->h->client), + state->timeout, &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, + ctdb_transaction_commit_done, + req); + return; + } + + if (seqnum != state->seqnum + 1) { + DEBUG(DEBUG_ERR, + ("transaction_commit: %s seqnum mismatch " + "0x%"PRIx64" != 0x%"PRIx64" + 1\n", + state->h->db->db_name, seqnum, state->seqnum)); + tevent_req_error(req, EIO); + return; + } + } + + /* trans3_commit successful */ + subreq = ctdb_g_lock_unlock_send(state, state->ev, h->client, + h->db_g_lock, h->lock_name, h->sid); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_transaction_commit_g_lock_done, + req); +} + +static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_transaction_commit_state *state = tevent_req_data( + req, struct ctdb_transaction_commit_state); + int ret; + bool status; + + status = ctdb_g_lock_unlock_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, + ("transaction_commit: %s g_lock unlock failed, ret=%d\n", + state->h->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + talloc_free(state->h); + tevent_req_done(req); +} + +bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr) +{ + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + return true; +} + +int ctdb_transaction_commit(struct ctdb_transaction_handle *h) +{ + struct tevent_context *ev = h->ev; + TALLOC_CTX *mem_ctx; + struct tevent_req *req; + int ret; + bool status; + + if (h->readonly || ! h->updated) { + return ctdb_transaction_cancel(h); + } + + mem_ctx = talloc_new(NULL); + if (mem_ctx == NULL) { + return ENOMEM; + } + + req = ctdb_transaction_commit_send(mem_ctx, ev, + tevent_timeval_zero(), h); + if (req == NULL) { + talloc_free(mem_ctx); + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_transaction_commit_recv(req, &ret); + if (! status) { + talloc_free(mem_ctx); + return ret; + } + + talloc_free(mem_ctx); + return 0; +} + +struct ctdb_transaction_cancel_state { + struct tevent_context *ev; + struct ctdb_transaction_handle *h; + struct timeval timeout; +}; + +static void ctdb_transaction_cancel_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_transaction_cancel_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct timeval timeout, + struct ctdb_transaction_handle *h) +{ + struct tevent_req *req, *subreq; + struct ctdb_transaction_cancel_state *state; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_transaction_cancel_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->h = h; + state->timeout = timeout; + + subreq = ctdb_g_lock_unlock_send(state, state->ev, state->h->client, + state->h->db_g_lock, + state->h->lock_name, state->h->sid); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_transaction_cancel_done, + req); + + return req; +} + +static void ctdb_transaction_cancel_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_transaction_cancel_state *state = tevent_req_data( + req, struct ctdb_transaction_cancel_state); + int ret; + bool status; + + status = ctdb_g_lock_unlock_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, + ("transaction_cancel: %s g_lock unlock failed, ret=%d\n", + state->h->db->db_name, ret)); + talloc_free(state->h); + tevent_req_error(req, ret); + return; + } + + talloc_free(state->h); + tevent_req_done(req); +} + +bool ctdb_transaction_cancel_recv(struct tevent_req *req, int *perr) +{ + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + return true; +} + +int ctdb_transaction_cancel(struct ctdb_transaction_handle *h) +{ + struct tevent_context *ev = h->ev; + struct tevent_req *req; + TALLOC_CTX *mem_ctx; + int ret; + bool status; + + mem_ctx = talloc_new(NULL); + if (mem_ctx == NULL) { + talloc_free(h); + return ENOMEM; + } + + req = ctdb_transaction_cancel_send(mem_ctx, ev, + tevent_timeval_zero(), h); + if (req == NULL) { + talloc_free(mem_ctx); + talloc_free(h); + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_transaction_cancel_recv(req, &ret); + if (! status) { + talloc_free(mem_ctx); + return ret; + } + + talloc_free(mem_ctx); + return 0; +} + +/* + * TODO: + * + * In future Samba should register SERVER_ID. + * Make that structure same as struct srvid {}. + */ diff --git a/ctdb/client/client_event.c b/ctdb/client/client_event.c new file mode 100644 index 0000000..7111fe7 --- /dev/null +++ b/ctdb/client/client_event.c @@ -0,0 +1,444 @@ +/* + Eventd client api + + Copyright (C) Amitay Isaacs 2016 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/network.h" + +#include <talloc.h> +#include <tevent.h> + +#include "lib/util/debug.h" +#include "lib/util/tevent_unix.h" + +#include "common/logging.h" +#include "common/sock_client.h" + +#include "protocol/protocol_api.h" + +#include "client/client_event.h" + +struct ctdb_event_context { + struct sock_client_context *sockc; +}; + +static int ctdb_event_msg_request_push(void *request_data, uint32_t reqid, + TALLOC_CTX *mem_ctx, + uint8_t **buf, size_t *buflen, + void *private_data) +{ + struct ctdb_event_request *request = + (struct ctdb_event_request *)request_data; + int ret; + + sock_packet_header_set_reqid(&request->header, reqid); + + *buflen = ctdb_event_request_len(request); + *buf = talloc_size(mem_ctx, *buflen); + if (*buf == NULL) { + return ENOMEM; + } + + ret = ctdb_event_request_push(request, *buf, buflen); + if (ret != 0) { + return ret; + } + + return 0; +} + +static int ctdb_event_msg_reply_pull(uint8_t *buf, size_t buflen, + TALLOC_CTX *mem_ctx, void **reply_data, + void *private_data) +{ + struct ctdb_event_reply *reply; + int ret; + + reply = talloc_zero(mem_ctx, struct ctdb_event_reply); + if (reply == NULL) { + return ENOMEM; + } + + ret = ctdb_event_reply_pull(buf, buflen, reply, reply); + if (ret != 0) { + talloc_free(reply); + return ret; + } + + *reply_data = reply; + return 0; +} + +static int ctdb_event_msg_reply_reqid(uint8_t *buf, size_t buflen, + uint32_t *reqid, void *private_data) +{ + struct sock_packet_header header; + size_t np; + int ret; + + ret = sock_packet_header_pull(buf, buflen, &header, &np); + if (ret != 0) { + return ret; + } + + *reqid = header.reqid; + return 0; +} + +struct sock_client_proto_funcs event_proto_funcs = { + .request_push = ctdb_event_msg_request_push, + .reply_pull = ctdb_event_msg_reply_pull, + .reply_reqid = ctdb_event_msg_reply_reqid, +}; + + +int ctdb_event_init(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + const char *sockpath, struct ctdb_event_context **out) +{ + struct ctdb_event_context *eclient; + int ret; + + eclient = talloc_zero(mem_ctx, struct ctdb_event_context); + if (eclient == NULL) { + DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n")); + return ENOMEM; + } + + ret = sock_client_setup(eclient, ev, sockpath, + &event_proto_funcs, eclient, + &eclient->sockc); + if (ret != 0) { + talloc_free(eclient); + return ret; + } + + *out = eclient; + return 0; +} + +void ctdb_event_set_disconnect_callback(struct ctdb_event_context *eclient, + ctdb_client_callback_func_t callback, + void *private_data) +{ + sock_client_set_disconnect_callback(eclient->sockc, + callback, private_data); +} + +/* + * Handle eventd_request and eventd_reply + */ + +struct tevent_req *ctdb_event_msg_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_event_context *eclient, + struct ctdb_event_request *request) +{ + struct tevent_req *req; + + req = sock_client_msg_send(mem_ctx, ev, eclient->sockc, + tevent_timeval_zero(), request); + return req; +} + +bool ctdb_event_msg_recv(struct tevent_req *req, int *perr, + TALLOC_CTX *mem_ctx, + struct ctdb_event_reply **reply) +{ + void *reply_data; + bool status; + + status = sock_client_msg_recv(req, perr, mem_ctx, &reply_data); + + if (status && reply != NULL) { + *reply = talloc_get_type_abort( + reply_data, struct ctdb_event_reply); + } + + return status; +} + +/* + * Run an event + */ + +struct tevent_req *ctdb_event_run_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_event_context *eclient, + enum ctdb_event event, + uint32_t timeout, const char *arg_str) +{ + struct ctdb_event_request request; + struct ctdb_event_request_run rdata; + + rdata.event = event; + rdata.timeout = timeout; + rdata.arg_str = arg_str; + + request.rdata.command = CTDB_EVENT_COMMAND_RUN; + request.rdata.data.run = &rdata; + + return ctdb_event_msg_send(mem_ctx, ev, eclient, &request); +} + +bool ctdb_event_run_recv(struct tevent_req *req, int *perr, int *result) +{ + struct ctdb_event_reply *reply; + int ret; + bool status; + + status = ctdb_event_msg_recv(req, &ret, req, &reply); + if (! status) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + + if (reply->rdata.command != CTDB_EVENT_COMMAND_RUN) { + if (perr != NULL) { + *perr = EPROTO; + } + talloc_free(reply); + return false; + } + + if (result != NULL) { + *result = reply->rdata.result; + } + + talloc_free(reply); + return true; +} + +/* + * Get event status + */ + +struct tevent_req *ctdb_event_status_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_event_context *eclient, + enum ctdb_event event, + enum ctdb_event_status_state state) +{ + struct ctdb_event_request request; + struct ctdb_event_request_status rdata; + + rdata.event = event; + rdata.state = state; + + request.rdata.command = CTDB_EVENT_COMMAND_STATUS; + request.rdata.data.status = &rdata; + + return ctdb_event_msg_send(mem_ctx, ev, eclient, &request); +} + +bool ctdb_event_status_recv(struct tevent_req *req, int *perr, + int32_t *result, int *event_status, + TALLOC_CTX *mem_ctx, + struct ctdb_script_list **script_list) +{ + struct ctdb_event_reply *reply; + int ret; + bool status; + + status = ctdb_event_msg_recv(req, &ret, req, &reply); + if (! status) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + + if (reply->rdata.command != CTDB_EVENT_COMMAND_STATUS) { + if (perr != NULL) { + *perr = EPROTO; + } + talloc_free(reply); + return false; + } + + if (result != NULL) { + *result = reply->rdata.result; + } + if (event_status != NULL) { + *event_status = reply->rdata.data.status->status; + } + if (script_list != NULL) { + *script_list = talloc_steal(mem_ctx, + reply->rdata.data.status->script_list); + } + + talloc_free(reply); + return true; +} + +/* + * Get script list + */ + +struct tevent_req *ctdb_event_script_list_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_event_context *eclient) +{ + struct ctdb_event_request request; + + request.rdata.command = CTDB_EVENT_COMMAND_SCRIPT_LIST; + + return ctdb_event_msg_send(mem_ctx, ev, eclient, &request); +} + +bool ctdb_event_script_list_recv(struct tevent_req *req, int *perr, + int32_t *result, TALLOC_CTX *mem_ctx, + struct ctdb_script_list **script_list) +{ + struct ctdb_event_reply *reply; + int ret; + bool status; + + status = ctdb_event_msg_recv(req, &ret, req, &reply); + if (! status) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + + if (reply->rdata.command != CTDB_EVENT_COMMAND_SCRIPT_LIST) { + if (perr != NULL) { + *perr = EPROTO; + } + talloc_free(reply); + return false; + } + + if (result != NULL) { + *result = reply->rdata.result; + } + if (script_list != NULL) { + *script_list = talloc_steal(mem_ctx, + reply->rdata.data.script_list->script_list); + } + + talloc_free(reply); + return true; +} + +/* + * Enable a script + */ + +struct tevent_req *ctdb_event_script_enable_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_event_context *eclient, + const char *script_name) +{ + struct ctdb_event_request request; + struct ctdb_event_request_script_enable rdata; + + rdata.script_name = script_name; + + request.rdata.command = CTDB_EVENT_COMMAND_SCRIPT_ENABLE; + request.rdata.data.script_enable = &rdata; + + return ctdb_event_msg_send(mem_ctx, ev, eclient, &request); +} + +bool ctdb_event_script_enable_recv(struct tevent_req *req, int *perr, + int *result) +{ + struct ctdb_event_reply *reply; + int ret; + bool status; + + status = ctdb_event_msg_recv(req, &ret, req, &reply); + if (! status) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + + if (reply->rdata.command != CTDB_EVENT_COMMAND_SCRIPT_ENABLE) { + if (perr != NULL) { + *perr = EPROTO; + } + talloc_free(reply); + return false; + } + + if (result != NULL) { + *result = reply->rdata.result; + } + + talloc_free(reply); + return true; +} + +/* + * Disable a script + */ + +struct tevent_req *ctdb_event_script_disable_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_event_context *eclient, + const char *script_name) +{ + struct ctdb_event_request request; + struct ctdb_event_request_script_disable rdata; + + rdata.script_name = script_name; + + request.rdata.command = CTDB_EVENT_COMMAND_SCRIPT_DISABLE; + request.rdata.data.script_disable = &rdata; + + return ctdb_event_msg_send(mem_ctx, ev, eclient, &request); +} + +bool ctdb_event_script_disable_recv(struct tevent_req *req, int *perr, + int *result) +{ + struct ctdb_event_reply *reply; + int ret; + bool status; + + status = ctdb_event_msg_recv(req, &ret, req, &reply); + if (! status) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + + if (reply->rdata.command != CTDB_EVENT_COMMAND_SCRIPT_DISABLE) { + if (perr != NULL) { + *perr = EPROTO; + } + talloc_free(reply); + return false; + } + + if (result != NULL) { + *result = reply->rdata.result; + } + + talloc_free(reply); + return true; +} diff --git a/ctdb/client/client_message.c b/ctdb/client/client_message.c new file mode 100644 index 0000000..c2e975a --- /dev/null +++ b/ctdb/client/client_message.c @@ -0,0 +1,607 @@ +/* + CTDB client code + + Copyright (C) Amitay Isaacs 2015 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/network.h" +#include "system/filesys.h" + +#include <talloc.h> +#include <tevent.h> +#include <tdb.h> + +#include "lib/util/tevent_unix.h" + +#include "common/reqid.h" +#include "common/srvid.h" +#include "common/comm.h" + +#include "protocol/protocol.h" +#include "protocol/protocol_api.h" + +#include "client/client_private.h" +#include "client/client.h" + + +/* + * Handle REQ_MESSAGE + */ + +struct ctdb_client_message_state { + struct ctdb_client_context *client; + uint32_t reqid; +}; + +static int ctdb_client_message_state_destructor( + struct ctdb_client_message_state *state); +static void ctdb_client_message_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_client_message_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t destnode, + struct ctdb_req_message *message) +{ + struct tevent_req *req, *subreq; + struct ctdb_client_message_state *state; + struct ctdb_req_header h; + uint32_t reqid; + uint8_t *buf; + size_t datalen, buflen; + int ret; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_client_message_state); + if (req == NULL) { + return NULL; + } + + reqid = reqid_new(client->idr, state); + if (reqid == REQID_INVALID) { + talloc_free(req); + return NULL; + } + + state->client = client; + state->reqid = reqid; + + talloc_set_destructor(state, ctdb_client_message_state_destructor); + + ctdb_req_header_fill(&h, 0, CTDB_REQ_MESSAGE, destnode, + client->pnn, reqid); + + datalen = ctdb_req_message_len(&h, message); + ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + ret = ctdb_req_message_push(&h, message, buf, &buflen); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + subreq = comm_write_send(state, ev, client->comm, buf, buflen); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_client_message_done, req); + + return req; +} + +static int ctdb_client_message_state_destructor( + struct ctdb_client_message_state *state) +{ + reqid_remove(state->client->idr, state->reqid); + return 0; +} + +static void ctdb_client_message_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + int ret; + bool status; + + status = comm_write_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +bool ctdb_client_message_recv(struct tevent_req *req, int *perr) +{ + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + return true; +} + +void ctdb_client_req_message(struct ctdb_client_context *client, + uint8_t *buf, size_t buflen, uint32_t reqid) +{ + struct ctdb_req_header h; + struct ctdb_req_message_data message; + TALLOC_CTX *tmp_ctx = talloc_new(client); + int ret; + + ret = ctdb_req_message_data_pull(buf, buflen, &h, tmp_ctx, &message); + if (ret != 0) { + return; + } + + srvid_dispatch(client->srv, message.srvid, CTDB_SRVID_ALL, + message.data); + talloc_free(tmp_ctx); +} + +/* + * Handle multiple nodes + */ + +struct ctdb_client_message_multi_state { + uint32_t *pnn_list; + int count; + int done; + int err; + int *err_list; +}; + +struct message_index_state { + struct tevent_req *req; + int index; +}; + +static void ctdb_client_message_multi_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_client_message_multi_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t *pnn_list, int count, + struct ctdb_req_message *message) +{ + struct tevent_req *req, *subreq; + struct ctdb_client_message_multi_state *state; + int i; + + if (pnn_list == NULL || count == 0) { + return NULL; + } + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_client_message_multi_state); + if (req == NULL) { + return NULL; + } + + state->pnn_list = pnn_list; + state->count = count; + state->done = 0; + state->err = 0; + state->err_list = talloc_zero_array(state, int, count); + if (tevent_req_nomem(state->err_list, req)) { + return tevent_req_post(req, ev); + } + + for (i=0; i<count; i++) { + struct message_index_state *substate; + + subreq = ctdb_client_message_send(state, ev, client, + pnn_list[i], message); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + + substate = talloc(subreq, struct message_index_state); + if (tevent_req_nomem(substate, req)) { + return tevent_req_post(req, ev); + } + + substate->req = req; + substate->index = i; + + tevent_req_set_callback(subreq, ctdb_client_message_multi_done, + substate); + } + + return req; +} + +static void ctdb_client_message_multi_done(struct tevent_req *subreq) +{ + struct message_index_state *substate = tevent_req_callback_data( + subreq, struct message_index_state); + struct tevent_req *req = substate->req; + int idx = substate->index; + struct ctdb_client_message_multi_state *state = tevent_req_data( + req, struct ctdb_client_message_multi_state); + bool status; + int ret; + + status = ctdb_client_message_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + if (state->err == 0) { + state->err = ret; + state->err_list[idx] = state->err; + } + } + + state->done += 1; + + if (state->done == state->count) { + tevent_req_done(req); + } +} + +bool ctdb_client_message_multi_recv(struct tevent_req *req, int *perr, + TALLOC_CTX *mem_ctx, int **perr_list) +{ + struct ctdb_client_message_multi_state *state = tevent_req_data( + req, struct ctdb_client_message_multi_state); + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + if (perr_list != NULL) { + *perr_list = talloc_steal(mem_ctx, state->err_list); + } + return false; + } + + if (perr != NULL) { + *perr = state->err; + } + + if (perr_list != NULL) { + *perr_list = talloc_steal(mem_ctx, state->err_list); + } + + if (state->err != 0) { + return false; + } + + return true; +} + +/* + * sync version of message send + */ + +int ctdb_client_message(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t destnode, struct ctdb_req_message *message) +{ + TALLOC_CTX *tmp_ctx; + struct tevent_req *req; + int ret; + bool status; + + tmp_ctx = talloc_new(client); + if (tmp_ctx == NULL) { + return ENOMEM; + } + + req = ctdb_client_message_send(tmp_ctx, ev, client, destnode, message); + if (req == NULL) { + talloc_free(tmp_ctx); + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_client_message_recv(req, &ret); + if (! status) { + talloc_free(tmp_ctx); + return ret; + } + + talloc_free(tmp_ctx); + return 0; +} + +int ctdb_client_message_multi(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t *pnn_list, int count, + struct ctdb_req_message *message, + int **perr_list) +{ + struct tevent_req *req; + bool status; + int ret; + + req = ctdb_client_message_multi_send(mem_ctx, ev, client, + pnn_list, count, + message); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_client_message_multi_recv(req, &ret, mem_ctx, perr_list); + if (! status) { + return ret; + } + + return 0; +} + +struct ctdb_client_set_message_handler_state { + struct ctdb_client_context *client; + uint64_t srvid; + srvid_handler_fn handler; + void *private_data; +}; + +static void ctdb_client_set_message_handler_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_client_set_message_handler_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint64_t srvid, + srvid_handler_fn handler, + void *private_data) +{ + struct tevent_req *req, *subreq; + struct ctdb_client_set_message_handler_state *state; + struct ctdb_req_control request; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_client_set_message_handler_state); + if (req == NULL) { + return NULL; + } + + state->client = client; + state->srvid = srvid; + state->handler = handler; + state->private_data = private_data; + + ctdb_req_control_register_srvid(&request, srvid); + subreq = ctdb_client_control_send(state, ev, client, client->pnn, + tevent_timeval_zero(), &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_client_set_message_handler_done, + req); + + return req; +} + +static void ctdb_client_set_message_handler_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_client_set_message_handler_state *state = tevent_req_data( + req, struct ctdb_client_set_message_handler_state); + struct ctdb_reply_control *reply; + bool status; + int ret; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_register_srvid(reply); + talloc_free(reply); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + ret = srvid_register(state->client->srv, state->client, state->srvid, + state->handler, state->private_data); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +bool ctdb_client_set_message_handler_recv(struct tevent_req *req, int *perr) +{ + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + return true; +} + +struct ctdb_client_remove_message_handler_state { + struct ctdb_client_context *client; + uint64_t srvid; + void *private_data; +}; + +static void ctdb_client_remove_message_handler_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_client_remove_message_handler_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint64_t srvid, + void *private_data) +{ + struct tevent_req *req, *subreq; + struct ctdb_client_remove_message_handler_state *state; + struct ctdb_req_control request; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_client_remove_message_handler_state); + if (req == NULL) { + return NULL; + } + + state->client = client; + state->srvid = srvid; + state->private_data = private_data; + + ctdb_req_control_deregister_srvid(&request, srvid); + subreq = ctdb_client_control_send(state, ev, client, client->pnn, + tevent_timeval_zero(), &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, + ctdb_client_remove_message_handler_done, req); + + return req; +} + +static void ctdb_client_remove_message_handler_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_client_remove_message_handler_state *state = tevent_req_data( + req, struct ctdb_client_remove_message_handler_state); + struct ctdb_reply_control *reply; + bool status; + int ret; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_deregister_srvid(reply); + talloc_free(reply); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + ret = srvid_deregister(state->client->srv, state->srvid, + state->private_data); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +bool ctdb_client_remove_message_handler_recv(struct tevent_req *req, int *perr) +{ + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + return true; +} + +int ctdb_client_set_message_handler(struct tevent_context *ev, + struct ctdb_client_context *client, + uint64_t srvid, srvid_handler_fn handler, + void *private_data) +{ + TALLOC_CTX *mem_ctx; + struct tevent_req *req; + int ret; + bool status; + + mem_ctx = talloc_new(client); + if (mem_ctx == NULL) { + return ENOMEM; + } + + req = ctdb_client_set_message_handler_send(mem_ctx, ev, client, + srvid, handler, + private_data); + if (req == NULL) { + talloc_free(mem_ctx); + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_client_set_message_handler_recv(req, &ret); + if (! status) { + talloc_free(mem_ctx); + return ret; + } + + talloc_free(mem_ctx); + return 0; +} + +int ctdb_client_remove_message_handler(struct tevent_context *ev, + struct ctdb_client_context *client, + uint64_t srvid, void *private_data) +{ + TALLOC_CTX *mem_ctx; + struct tevent_req *req; + int ret; + bool status; + + mem_ctx = talloc_new(client); + if (mem_ctx == NULL) { + return ENOMEM; + } + + req = ctdb_client_remove_message_handler_send(mem_ctx, ev, client, + srvid, private_data); + if (req == NULL) { + talloc_free(mem_ctx); + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_client_remove_message_handler_recv(req, &ret); + if (! status) { + talloc_free(mem_ctx); + return ret; + } + + talloc_free(mem_ctx); + return 0; +} diff --git a/ctdb/client/client_message_sync.c b/ctdb/client/client_message_sync.c new file mode 100644 index 0000000..f4d2441 --- /dev/null +++ b/ctdb/client/client_message_sync.c @@ -0,0 +1,197 @@ +/* + CTDB client code + + Copyright (C) Amitay Isaacs 2015 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/network.h" +#include "system/filesys.h" + +#include <talloc.h> +#include <tevent.h> +#include <tdb.h> + +#include "common/logging.h" + +#include "lib/util/debug.h" + +#include "protocol/protocol.h" +#include "protocol/protocol_api.h" +#include "client/client_private.h" +#include "client/client.h" +#include "client/client_sync.h" + +int ctdb_message_recd_update_ip(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct ctdb_public_ip *pubip) +{ + struct ctdb_req_message message; + int ret; + + message.srvid = CTDB_SRVID_RECD_UPDATE_IP; + message.data.pubip = pubip; + + ret = ctdb_client_message(mem_ctx, ev, client, destnode, &message); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Message RECD_UPDATE_IP failed to node %u\n", + destnode)); + } + + return ret; +} + +int ctdb_message_mem_dump(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct ctdb_srvid_message *msg) +{ + struct ctdb_req_message message; + int ret; + + message.srvid = CTDB_SRVID_MEM_DUMP; + message.data.msg = msg; + + ret = ctdb_client_message(mem_ctx, ev, client, destnode, &message); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Message MEM_DUMP failed to node %u\n", destnode)); + } + + return ret; +} + +int ctdb_message_reload_nodes(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode) +{ + struct ctdb_req_message message; + int ret; + + message.srvid = CTDB_SRVID_RELOAD_NODES; + + ret = ctdb_client_message(mem_ctx, ev, client, destnode, &message); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Message RELOAD_NODES failed to node %u\n", destnode)); + } + + return ret; +} + +int ctdb_message_takeover_run(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct ctdb_srvid_message *msg) +{ + struct ctdb_req_message message; + int ret; + + message.srvid = CTDB_SRVID_TAKEOVER_RUN; + message.data.msg = msg; + + ret = ctdb_client_message(mem_ctx, ev, client, destnode, &message); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Message TAKEOVER_RUN failed to node %u\n", destnode)); + } + + return ret; +} + +int ctdb_message_rebalance_node(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, uint32_t pnn) +{ + struct ctdb_req_message message; + int ret; + + message.srvid = CTDB_SRVID_REBALANCE_NODE; + message.data.pnn = pnn; + + ret = ctdb_client_message(mem_ctx, ev, client, destnode, &message); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Message REBALANCE_NODE failed to node %u\n", + destnode)); + } + + return ret; +} + +int ctdb_message_disable_takeover_runs(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, + struct ctdb_disable_message *disable) +{ + struct ctdb_req_message message; + int ret; + + message.srvid = CTDB_SRVID_DISABLE_TAKEOVER_RUNS; + message.data.disable = disable; + + ret = ctdb_client_message(mem_ctx, ev, client, destnode, &message); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Message DISABLE_TAKEOVER_RUNS failed to node %u\n", + destnode)); + } + + return ret; +} + +int ctdb_message_disable_recoveries(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, + struct ctdb_disable_message *disable) +{ + struct ctdb_req_message message; + int ret; + + message.srvid = CTDB_SRVID_DISABLE_RECOVERIES; + message.data.disable = disable; + + ret = ctdb_client_message(mem_ctx, ev, client, destnode, &message); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Message DISABLE_RECOVERIES failed to node %u\n", + destnode)); + } + + return ret; +} + +int ctdb_message_disable_ip_check(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, uint32_t timeout) +{ + struct ctdb_req_message message; + int ret; + + message.srvid = CTDB_SRVID_DISABLE_IP_CHECK; + message.data.timeout = timeout; + + ret = ctdb_client_message(mem_ctx, ev, client, destnode, &message); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Message DISABLE_IP_CHECK failed to node %u\n", + destnode)); + } + + return ret; +} diff --git a/ctdb/client/client_private.h b/ctdb/client/client_private.h new file mode 100644 index 0000000..0bb2ad5 --- /dev/null +++ b/ctdb/client/client_private.h @@ -0,0 +1,99 @@ +/* + CTDB client code + + Copyright (C) Amitay Isaacs 2015 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __CTDB_CLIENT_PRIVATE_H__ +#define __CTDB_CLIENT_PRIVATE_H__ + +#include "protocol/protocol.h" +#include "client/client.h" + +struct ctdb_db_context { + struct ctdb_db_context *prev, *next; + uint32_t db_id; + uint8_t db_flags; + const char *db_name; + const char *db_path; + struct tdb_wrap *ltdb; +}; + +struct ctdb_client_context { + struct reqid_context *idr; + struct srvid_context *srv; + struct srvid_context *tunnels; + struct comm_context *comm; + ctdb_client_callback_func_t callback; + void *private_data; + int fd; + uint32_t pnn; + struct ctdb_db_context *db; +}; + +struct ctdb_record_handle { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct ctdb_db_context *db; + struct ctdb_ltdb_header header; + TDB_DATA key; + TDB_DATA data; /* This is returned from tdb_fetch() */ + bool readonly; +}; + +struct ctdb_transaction_handle { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct ctdb_db_context *db, *db_g_lock; + struct ctdb_rec_buffer *recbuf; + struct ctdb_server_id sid; + const char *lock_name; + bool readonly; + bool updated; +}; + +struct ctdb_tunnel_context { + struct ctdb_client_context *client; + uint64_t tunnel_id; + ctdb_tunnel_callback_func_t callback; + void *private_data; +}; + +/* From client_call.c */ + +void ctdb_client_reply_call(struct ctdb_client_context *client, + uint8_t *buf, size_t buflen, uint32_t reqid); + +/* From client_db.c */ + +struct tdb_context *client_db_tdb(struct ctdb_db_context *db); + +/* From client_message.c */ + +void ctdb_client_req_message(struct ctdb_client_context *client, + uint8_t *buf, size_t buflen, uint32_t reqid); + +/* From client_control.c */ + +void ctdb_client_reply_control(struct ctdb_client_context *client, + uint8_t *buf, size_t buflen, uint32_t reqid); + +/* From client_tunnel.c */ + +void ctdb_client_req_tunnel(struct ctdb_client_context *client, + uint8_t *buf, size_t buflen, uint32_t reqid); + +#endif /* __CTDB_CLIENT_PRIVATE_H__ */ diff --git a/ctdb/client/client_sync.h b/ctdb/client/client_sync.h new file mode 100644 index 0000000..5b0ff42 --- /dev/null +++ b/ctdb/client/client_sync.h @@ -0,0 +1,526 @@ +/* + CTDB client code - sync api + + Copyright (C) Amitay Isaacs 2017 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __CTDB_CLIENT_SYNC_H__ +#define __CTDB_CLIENT_SYNC_H__ + +#include <talloc.h> +#include <tevent.h> + +/* from client/client_control_sync.c */ + +int ctdb_ctrl_process_exists(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + pid_t pid, int *status); + +int ctdb_ctrl_statistics(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_statistics **stats); + +int ctdb_ctrl_ping(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + int *num_clients); + +int ctdb_ctrl_getdbpath(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id, const char **db_path); + +int ctdb_ctrl_getvnnmap(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_vnn_map **vnnmap); + +int ctdb_ctrl_getdebug(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + int *loglevel); + +int ctdb_ctrl_setdebug(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + int loglevel); + +int ctdb_ctrl_get_dbmap(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_dbid_map **dbmap); + +int ctdb_ctrl_get_recmode(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + int *recmode); + +int ctdb_ctrl_set_recmode(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + int recmode); + +int ctdb_ctrl_statistics_reset(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout); + +int ctdb_ctrl_db_attach(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + const char *db_name, uint32_t *db_id); + +int ctdb_ctrl_traverse_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_traverse_start *traverse); + +int ctdb_ctrl_register_srvid(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint64_t srvid); + +int ctdb_ctrl_deregister_srvid(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint64_t srvid); + +int ctdb_ctrl_get_dbname(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id, const char **db_name); + +int ctdb_ctrl_enable_seqnum(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id); + +int ctdb_ctrl_update_seqnum(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id); + +int ctdb_ctrl_dump_memory(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + const char **mem_str); + +int ctdb_ctrl_get_pid(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + pid_t *pid); + +int ctdb_ctrl_freeze(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + int priority); + +int ctdb_ctrl_get_pnn(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t *pnn); + +int ctdb_ctrl_shutdown(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout); + +int ctdb_ctrl_tcp_add(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_connection *conn); + +int ctdb_ctrl_tcp_remove(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_connection *conn); + +int ctdb_ctrl_set_tunable(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_tunable *tunable); + +int ctdb_ctrl_get_tunable(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + const char *var, uint32_t *value); + +int ctdb_ctrl_list_tunables(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_var_list **var_list); + +int ctdb_ctrl_modify_flags(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t pnn, uint32_t old_flags, + uint32_t new_flags); + +int ctdb_ctrl_get_all_tunables(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_tunable_list **tun_list); + +int ctdb_ctrl_get_tcp_tickle_list(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + ctdb_sock_addr *addr, + struct ctdb_tickle_list **tickles); + +int ctdb_ctrl_set_tcp_tickle_list(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_tickle_list *tickles); + +int ctdb_ctrl_db_attach_persistent(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + const char *db_name, uint32_t *db_id); + +int ctdb_ctrl_send_gratuitous_arp(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_addr_info *addr_info); + +int ctdb_ctrl_wipe_database(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id, uint32_t tid); + +int ctdb_ctrl_uptime(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_uptime **uptime); + +int ctdb_ctrl_start_recovery(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout); + +int ctdb_ctrl_end_recovery(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout); + +int ctdb_ctrl_reload_nodes_file(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout); + +int ctdb_ctrl_add_public_ip(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_addr_info *addr_info); + +int ctdb_ctrl_del_public_ip(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_addr_info *addr_info); + +int ctdb_ctrl_get_capabilities(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t *caps); + +int ctdb_ctrl_release_ip(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_public_ip *pubip); + +int ctdb_ctrl_takeover_ip(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_public_ip *pubip); + +int ctdb_ctrl_get_public_ips(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + bool available_only, + struct ctdb_public_ip_list **pubip_list); + +int ctdb_ctrl_get_nodemap(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_node_map **nodemap); + +int ctdb_ctrl_traverse_kill(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_traverse_start *traverse); + +int ctdb_ctrl_get_reclock_file(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + const char **reclock_file); + +int ctdb_ctrl_stop_node(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout); + +int ctdb_ctrl_continue_node(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout); + +int ctdb_ctrl_set_lmasterrole(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t lmaster_role); + +int ctdb_ctrl_set_recmasterrole(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t recmaster_role); + +int ctdb_ctrl_set_ban_state(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_ban_state *ban_state); + +int ctdb_ctrl_get_ban_state(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_ban_state **ban_state); + +int ctdb_ctrl_register_notify(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_notify_data *notify); + +int ctdb_ctrl_deregister_notify(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint64_t srvid); + +int ctdb_ctrl_trans3_commit(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_rec_buffer *recbuf); + +int ctdb_ctrl_get_db_seqnum(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id, uint64_t *seqnum); + +int ctdb_ctrl_db_set_healthy(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id); + +int ctdb_ctrl_db_get_health(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id, const char **reason); + +int ctdb_ctrl_get_public_ip_info(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + ctdb_sock_addr *addr, + struct ctdb_public_ip_info **ipinfo); + +int ctdb_ctrl_get_ifaces(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_iface_list **iface_list); + +int ctdb_ctrl_set_iface_link_state(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_iface *iface); + +int ctdb_ctrl_tcp_add_delayed_update(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_connection *conn); + +int ctdb_ctrl_get_stat_history(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_statistics_list **stats_list); + +int ctdb_ctrl_schedule_for_deletion(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_key_data *key); + +int ctdb_ctrl_set_db_readonly(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id); + +int ctdb_ctrl_traverse_start_ext(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_traverse_start_ext *traverse); + +int ctdb_ctrl_get_db_statistics(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id, + struct ctdb_db_statistics **dbstats); + +int ctdb_ctrl_set_db_sticky(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id); + +int ctdb_ctrl_reload_public_ips(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout); + +int ctdb_ctrl_ipreallocated(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout); + +int ctdb_ctrl_get_runstate(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + enum ctdb_runstate *runstate); + +int ctdb_ctrl_db_detach(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id); + +int ctdb_ctrl_get_nodes_file(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_node_map **nodemap); + +int ctdb_ctrl_db_freeze(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, uint32_t db_id); + +int ctdb_ctrl_db_thaw(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, uint32_t db_id); + +int ctdb_ctrl_db_transaction_start(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_transdb *transdb); + +int ctdb_ctrl_db_transaction_commit(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_transdb *transdb); + +int ctdb_ctrl_db_transaction_cancel(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id); + +int ctdb_ctrl_db_pull(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_pulldb_ext *pulldb, uint32_t *num_records); + +int ctdb_ctrl_db_push_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_pulldb_ext *pulldb); + +int ctdb_ctrl_db_push_confirm(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id, uint32_t *num_records); + +int ctdb_ctrl_db_open_flags(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint32_t db_id, int *tdb_flags); + +int ctdb_ctrl_db_attach_replicated(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + const char *db_name, uint32_t *db_id); + +int ctdb_ctrl_check_pid_srvid(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + struct ctdb_pid_srvid *pid_srvid, int *status); + +int ctdb_ctrl_tunnel_register(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint64_t tunnel_id); + +int ctdb_ctrl_tunnel_deregister(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct timeval timeout, + uint64_t tunnel_id); + +int ctdb_ctrl_disable_node(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, + struct timeval timeout); + +int ctdb_ctrl_enable_node(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, + struct timeval timeout); + +/* from client/client_message_sync.c */ + +int ctdb_message_recd_update_ip(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct ctdb_public_ip *pubip); + +int ctdb_message_mem_dump(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct ctdb_srvid_message *msg); + +int ctdb_message_reload_nodes(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode); + +int ctdb_message_takeover_run(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, struct ctdb_srvid_message *msg); + +int ctdb_message_rebalance_node(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, uint32_t pnn); + +int ctdb_message_disable_takeover_runs(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, + struct ctdb_disable_message *disable); + +int ctdb_message_disable_recoveries(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, + struct ctdb_disable_message *disable); + +int ctdb_message_disable_ip_check(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + int destnode, uint32_t timeout); + +#endif /* __CTDB_CLIENT_SYNC_H__ */ diff --git a/ctdb/client/client_tunnel.c b/ctdb/client/client_tunnel.c new file mode 100644 index 0000000..13c35fb --- /dev/null +++ b/ctdb/client/client_tunnel.c @@ -0,0 +1,693 @@ +/* + CTDB client code + + Copyright (C) Amitay Isaacs 2016 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/network.h" + +#include <talloc.h> +#include <tevent.h> +#include <tdb.h> + +#include "lib/util/tevent_unix.h" + +#include "common/reqid.h" +#include "common/srvid.h" +#include "common/comm.h" + +#include "protocol/protocol.h" +#include "protocol/protocol_api.h" + +#include "client/client_private.h" +#include "client/client.h" + + +struct ctdb_tunnel_data { + struct ctdb_req_header hdr; + struct ctdb_req_tunnel *tunnel; + uint32_t reqid; +}; + +/* + * Tunnel setup and destroy + */ + +struct ctdb_tunnel_setup_state { + struct ctdb_client_context *client; + struct ctdb_tunnel_context *tctx; + uint64_t tunnel_id; +}; + +static void ctdb_tunnel_setup_register_done(struct tevent_req *subreq); +static void ctdb_tunnel_handler(uint64_t tunnel_id, TDB_DATA data, + void *private_data); + +struct tevent_req *ctdb_tunnel_setup_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint64_t tunnel_id, + ctdb_tunnel_callback_func_t callback, + void *private_data) +{ + struct tevent_req *req, *subreq; + struct ctdb_tunnel_setup_state *state; + struct ctdb_tunnel_context *tctx; + struct ctdb_req_control request; + int ret; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_tunnel_setup_state); + if (req == NULL) { + return NULL; + } + + tctx = talloc_zero(client, struct ctdb_tunnel_context); + if (tevent_req_nomem(tctx, req)) { + return tevent_req_post(req, ev); + } + + tctx->client = client; + tctx->tunnel_id = tunnel_id; + tctx->callback = callback; + tctx->private_data = private_data; + + state->client = client; + state->tunnel_id = tunnel_id; + state->tctx = tctx; + + ret = srvid_exists(client->tunnels, tunnel_id, NULL); + if (ret == 0) { + tevent_req_error(req, EEXIST); + return tevent_req_post(req, ev); + } + + ctdb_req_control_tunnel_register(&request, tunnel_id); + subreq = ctdb_client_control_send(state, ev, client, + ctdb_client_pnn(client), + tevent_timeval_zero(), + &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_tunnel_setup_register_done, req); + + return req; +} + +static void ctdb_tunnel_setup_register_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_tunnel_setup_state *state = tevent_req_data( + req, struct ctdb_tunnel_setup_state); + struct ctdb_reply_control *reply; + bool status; + int ret; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_tunnel_register(reply); + talloc_free(reply); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + ret = srvid_register(state->client->tunnels, state->client, + state->tunnel_id, + ctdb_tunnel_handler, state->tctx); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +static void ctdb_tunnel_handler(uint64_t tunnel_id, TDB_DATA data, + void *private_data) +{ + struct ctdb_tunnel_context *tctx = talloc_get_type_abort( + private_data, struct ctdb_tunnel_context); + struct ctdb_tunnel_data *tunnel_data; + + if (tctx->tunnel_id != tunnel_id) { + return; + } + + if (data.dsize != sizeof(struct ctdb_tunnel_data)) { + return; + } + + tunnel_data = (struct ctdb_tunnel_data *)data.dptr; + + tctx->callback(tctx, tunnel_data->hdr.srcnode, tunnel_data->reqid, + tunnel_data->tunnel->data.dptr, + tunnel_data->tunnel->data.dsize, tctx->private_data); +} + +bool ctdb_tunnel_setup_recv(struct tevent_req *req, int *perr, + struct ctdb_tunnel_context **result) +{ + struct ctdb_tunnel_setup_state *state = tevent_req_data( + req, struct ctdb_tunnel_setup_state); + int ret; + + if (tevent_req_is_unix_error(req, &ret)) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + + *result = state->tctx; + return true; +} + +int ctdb_tunnel_setup(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, uint64_t tunnel_id, + ctdb_tunnel_callback_func_t callback, void *private_data, + struct ctdb_tunnel_context **result) +{ + struct tevent_req *req; + int ret; + bool status; + + req = ctdb_tunnel_setup_send(mem_ctx, ev, client, tunnel_id, + callback, private_data); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_tunnel_setup_recv(req, &ret, result); + talloc_free(req); + if (! status) { + return ret; + } + + return 0; +} + +struct ctdb_tunnel_destroy_state { + struct ctdb_tunnel_context *tctx; +}; + +static void ctdb_tunnel_destroy_deregister_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_tunnel_destroy_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_tunnel_context *tctx) +{ + struct tevent_req *req, *subreq; + struct ctdb_tunnel_destroy_state *state; + struct ctdb_req_control request; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_tunnel_destroy_state); + if (req == NULL) { + return NULL; + } + + state->tctx = tctx; + + ctdb_req_control_tunnel_deregister(&request, tctx->tunnel_id); + subreq = ctdb_client_control_send(state, ev, tctx->client, + ctdb_client_pnn(tctx->client), + tevent_timeval_zero(), + &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_tunnel_destroy_deregister_done, + req); + + return req; +} + +static void ctdb_tunnel_destroy_deregister_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_tunnel_destroy_state *state = tevent_req_data( + req, struct ctdb_tunnel_destroy_state); + struct ctdb_client_context *client = state->tctx->client; + struct ctdb_reply_control *reply; + bool status; + int ret; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_tunnel_deregister(reply); + talloc_free(reply); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + ret = srvid_deregister(client->tunnels, state->tctx->tunnel_id, + state->tctx); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +bool ctdb_tunnel_destroy_recv(struct tevent_req *req, int *perr) +{ + int ret; + + if (tevent_req_is_unix_error(req, &ret)) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + return true; +} + + +int ctdb_tunnel_destroy(struct tevent_context *ev, + struct ctdb_tunnel_context *tctx) +{ + struct tevent_req *req; + int ret; + bool status; + + req = ctdb_tunnel_destroy_send(ev, ev, tctx); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_tunnel_destroy_recv(req, &ret); + talloc_free(req); + if (! status) { + return ret; + } + + return 0; +} + +/* + * Callback when REQ_TUNNEL packet is received + */ + +static void ctdb_tunnel_request_reply(struct tevent_req *req, + struct ctdb_tunnel_data *tunnel_data); + +void ctdb_client_req_tunnel(struct ctdb_client_context *client, + uint8_t *buf, size_t buflen, uint32_t reqid) +{ + TALLOC_CTX *tmp_ctx = talloc_new(client); + struct ctdb_req_header h; + struct ctdb_req_tunnel *tunnel; + struct tevent_req *req; + struct ctdb_tunnel_data tunnel_data; + int ret; + + tunnel = talloc_zero(tmp_ctx, struct ctdb_req_tunnel); + if (tunnel == NULL) { + goto fail; + } + + ret = ctdb_req_tunnel_pull(buf, buflen, &h, tmp_ctx, tunnel); + if (ret != 0) { + goto fail; + } + + tunnel_data = (struct ctdb_tunnel_data) { + .hdr = h, + .tunnel = tunnel, + .reqid = reqid, + }; + + if (tunnel->flags & CTDB_TUNNEL_FLAG_REPLY) { + req = reqid_find(client->idr, reqid, struct tevent_req); + if (req == NULL) { + goto fail; + } + + ctdb_tunnel_request_reply(req, &tunnel_data); + + } else if (tunnel->flags & CTDB_TUNNEL_FLAG_REQUEST) { + + TDB_DATA data = { + .dsize = sizeof(struct ctdb_tunnel_data), + .dptr = (uint8_t *)&tunnel_data, + }; + + srvid_dispatch(client->tunnels, tunnel->tunnel_id, 0, data); + } + +fail: + TALLOC_FREE(tmp_ctx); +} + + +/* + * Send messages using tunnel + */ + +struct ctdb_tunnel_request_state { + struct ctdb_tunnel_context *tctx; + bool wait_for_reply; + uint32_t reqid; + struct ctdb_req_tunnel *tunnel; +}; + +static int ctdb_tunnel_request_state_destructor( + struct ctdb_tunnel_request_state *state); +static void ctdb_tunnel_request_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_tunnel_request_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_tunnel_context *tctx, + uint32_t destnode, + struct timeval timeout, + uint8_t *buf, size_t buflen, + bool wait_for_reply) +{ + struct tevent_req *req, *subreq; + struct ctdb_tunnel_request_state *state; + struct ctdb_req_tunnel tunnel; + struct ctdb_req_header h; + uint8_t *pkt; + size_t datalen, pkt_len; + int ret; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_tunnel_request_state); + if (req == NULL) { + return NULL; + } + + state->tctx = tctx; + state->wait_for_reply = wait_for_reply; + state->reqid = reqid_new(tctx->client->idr, req); + if (state->reqid == REQID_INVALID) { + talloc_free(req); + return NULL; + } + + talloc_set_destructor(state, ctdb_tunnel_request_state_destructor); + + tunnel = (struct ctdb_req_tunnel) { + .tunnel_id = state->tctx->tunnel_id, + .flags = CTDB_TUNNEL_FLAG_REQUEST, + .data = (TDB_DATA) { + .dptr = buf, + .dsize = buflen, + }, + }; + + if (destnode == CTDB_BROADCAST_ALL || + destnode == CTDB_BROADCAST_ACTIVE || + destnode == CTDB_BROADCAST_CONNECTED) { + state->wait_for_reply = false; + } + if (! state->wait_for_reply) { + tunnel.flags |= CTDB_TUNNEL_FLAG_NOREPLY; + } + + ctdb_req_header_fill(&h, 0, CTDB_REQ_TUNNEL, destnode, + ctdb_client_pnn(state->tctx->client), + state->reqid); + + datalen = ctdb_req_tunnel_len(&h, &tunnel); + ret = ctdb_allocate_pkt(state, datalen, &pkt, &pkt_len); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + ret = ctdb_req_tunnel_push(&h, &tunnel, pkt, &pkt_len); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + if (!tevent_timeval_is_zero(&timeout)) { + if (!tevent_req_set_endtime(req, ev, timeout)) { + return tevent_req_post(req, ev); + } + } + + subreq = comm_write_send(state, ev, tctx->client->comm, + pkt, pkt_len); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_tunnel_request_done, req); + + return req; +} + +static int ctdb_tunnel_request_state_destructor( + struct ctdb_tunnel_request_state *state) +{ + reqid_remove(state->tctx->client->idr, state->reqid); + return 0; +} + +static void ctdb_tunnel_request_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_tunnel_request_state *state = tevent_req_data( + req, struct ctdb_tunnel_request_state); + int ret; + bool status; + + status = comm_write_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + if (! state->wait_for_reply) { + tevent_req_done(req); + } + + /* Wait for the reply or timeout */ +} + +static void ctdb_tunnel_request_reply(struct tevent_req *req, + struct ctdb_tunnel_data *tunnel_data) +{ + struct ctdb_tunnel_request_state *state = tevent_req_data( + req, struct ctdb_tunnel_request_state); + + if (tunnel_data->reqid != state->reqid) { + return; + } + + state->tunnel = talloc_steal(state, tunnel_data->tunnel); + tevent_req_done(req); +} + +bool ctdb_tunnel_request_recv(struct tevent_req *req, int *perr, + TALLOC_CTX *mem_ctx, uint8_t **buf, + size_t *buflen) +{ + struct ctdb_tunnel_request_state *state = tevent_req_data( + req, struct ctdb_tunnel_request_state); + int ret; + + if (tevent_req_is_unix_error(req, &ret)) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + + if (state->wait_for_reply) { + if (buf != NULL) { + *buf = talloc_steal(mem_ctx, state->tunnel->data.dptr); + } + if (buflen != NULL) { + *buflen = state->tunnel->data.dsize; + } + } + + return true; +} + +int ctdb_tunnel_request(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_tunnel_context *tctx, uint32_t destnode, + struct timeval timeout, uint8_t *buf, size_t buflen, + bool wait_for_reply) +{ + struct tevent_req *req; + int ret; + bool status; + + req = ctdb_tunnel_request_send(mem_ctx, ev, tctx, destnode, + timeout, buf, buflen, wait_for_reply); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_tunnel_request_recv(req, &ret, NULL, NULL, NULL); + talloc_free(req); + if (! status) { + return ret; + } + + return 0; +} + +struct ctdb_tunnel_reply_state { +}; + +static void ctdb_tunnel_reply_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_tunnel_reply_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_tunnel_context *tctx, + uint32_t destnode, uint32_t reqid, + struct timeval timeout, + uint8_t *buf, size_t buflen) +{ + struct tevent_req *req, *subreq; + struct ctdb_tunnel_reply_state *state; + struct ctdb_req_tunnel tunnel; + struct ctdb_req_header h; + uint8_t *pkt; + size_t datalen, pkt_len; + int ret; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_tunnel_reply_state); + if (req == NULL) { + return NULL; + } + + tunnel = (struct ctdb_req_tunnel) { + .tunnel_id = tctx->tunnel_id, + .flags = CTDB_TUNNEL_FLAG_REPLY, + .data = (TDB_DATA) { + .dptr = buf, + .dsize = buflen, + }, + }; + + ctdb_req_header_fill(&h, 0, CTDB_REQ_TUNNEL, destnode, + ctdb_client_pnn(tctx->client), reqid); + + datalen = ctdb_req_tunnel_len(&h, &tunnel); + ret = ctdb_allocate_pkt(state, datalen, &pkt, &pkt_len); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + ret = ctdb_req_tunnel_push(&h, &tunnel, pkt, &pkt_len); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + if (!tevent_timeval_is_zero(&timeout)) { + if (!tevent_req_set_endtime(req, ev, timeout)) { + return tevent_req_post(req, ev); + } + } + + subreq = comm_write_send(state, ev, tctx->client->comm, pkt, pkt_len); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_tunnel_reply_done, req); + + return req; +} + +static void ctdb_tunnel_reply_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + int ret; + bool status; + + status = comm_write_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +bool ctdb_tunnel_reply_recv(struct tevent_req *req, int *perr) +{ + int ret; + + if (tevent_req_is_unix_error(req, &ret)) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + + return true; +} + +int ctdb_tunnel_reply(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_tunnel_context *tctx, uint32_t destnode, + uint32_t reqid, struct timeval timeout, + uint8_t *buf, size_t buflen) +{ + struct tevent_req *req; + int ret; + bool status; + + req = ctdb_tunnel_reply_send(mem_ctx, ev, tctx, destnode, reqid, + timeout, buf, buflen); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_tunnel_reply_recv(req, &ret); + talloc_free(req); + if (! status) { + return ret; + } + + return 0; +} diff --git a/ctdb/client/client_util.c b/ctdb/client/client_util.c new file mode 100644 index 0000000..35323ff --- /dev/null +++ b/ctdb/client/client_util.c @@ -0,0 +1,137 @@ +/* + CTDB client code + + Copyright (C) Amitay Isaacs 2015 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/network.h" +#include "system/filesys.h" + +#include <talloc.h> +#include <tevent.h> +#include <tdb.h> + +#include "common/logging.h" + +#include "lib/util/debug.h" + +#include "protocol/protocol.h" +#include "protocol/protocol_api.h" +#include "client/client_private.h" +#include "client/client.h" +#include "client/client_sync.h" + +int list_of_nodes(struct ctdb_node_map *nodemap, + uint32_t flags_mask, uint32_t exclude_pnn, + TALLOC_CTX *mem_ctx, uint32_t **pnn_list) +{ + int num_nodes = 0; + uint32_t *list; + unsigned int i; + + /* Allocate the list of same number of nodes */ + list = talloc_array(mem_ctx, uint32_t, nodemap->num); + if (list == NULL) { + return -1; + } + + for (i=0; i<nodemap->num; i++) { + if (nodemap->node[i].flags & flags_mask) { + continue; + } + if (nodemap->node[i].pnn == exclude_pnn) { + continue; + } + list[num_nodes] = nodemap->node[i].pnn; + num_nodes++; + } + + *pnn_list = list; + return num_nodes; +} + +int list_of_active_nodes(struct ctdb_node_map *nodemap, uint32_t exclude_pnn, + TALLOC_CTX *mem_ctx, uint32_t **pnn_list) +{ + return list_of_nodes(nodemap, NODE_FLAGS_INACTIVE, exclude_pnn, + mem_ctx, pnn_list); +} + +int list_of_connected_nodes(struct ctdb_node_map *nodemap, + uint32_t exclude_pnn, + TALLOC_CTX *mem_ctx, uint32_t **pnn_list) +{ + return list_of_nodes(nodemap, NODE_FLAGS_DISCONNECTED, exclude_pnn, + mem_ctx, pnn_list); +} + +struct ctdb_server_id ctdb_client_get_server_id( + struct ctdb_client_context *client, + uint32_t task_id) +{ + struct ctdb_server_id sid; + + sid.pid = getpid(); + sid.task_id = task_id; + sid.vnn = ctdb_client_pnn(client); + sid.unique_id = task_id; + sid.unique_id = (sid.unique_id << 32) | sid.pid; + + return sid; +} + +bool ctdb_server_id_equal(struct ctdb_server_id *sid1, + struct ctdb_server_id *sid2) +{ + if (sid1->pid != sid2->pid) { + return false; + } + if (sid1->task_id != sid2->task_id) { + return false; + } + if (sid1->vnn != sid2->vnn) { + return false; + } + if (sid1->unique_id != sid2->unique_id) { + return false; + } + + return true; +} + +int ctdb_server_id_exists(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_server_id *sid, bool *exists) +{ + int result; + int ret; + + ret = ctdb_ctrl_process_exists(mem_ctx, ev, client, sid->vnn, + tevent_timeval_zero(), + sid->pid, &result); + if (ret != 0) { + return ret; + } + + if (result == 1) { + *exists = true; + } else { + *exists = false; + } + + return 0; +} |