summaryrefslogtreecommitdiffstats
path: root/mgmtd/mgmt_txn.c
diff options
context:
space:
mode:
Diffstat (limited to 'mgmtd/mgmt_txn.c')
-rw-r--r--mgmtd/mgmt_txn.c368
1 files changed, 324 insertions, 44 deletions
diff --git a/mgmtd/mgmt_txn.c b/mgmtd/mgmt_txn.c
index c1cb33f..0f0cccb 100644
--- a/mgmtd/mgmt_txn.c
+++ b/mgmtd/mgmt_txn.c
@@ -29,8 +29,8 @@ enum mgmt_txn_event {
MGMTD_TXN_PROC_COMMITCFG,
MGMTD_TXN_PROC_GETCFG,
MGMTD_TXN_PROC_GETTREE,
+ MGMTD_TXN_PROC_RPC,
MGMTD_TXN_COMMITCFG_TIMEOUT,
- MGMTD_TXN_GETTREE_TIMEOUT,
};
PREDECL_LIST(mgmt_txn_reqs);
@@ -49,7 +49,6 @@ struct mgmt_set_cfg_req {
enum mgmt_commit_phase {
MGMTD_COMMIT_PHASE_PREPARE_CFG = 0,
MGMTD_COMMIT_PHASE_TXN_CREATE,
- MGMTD_COMMIT_PHASE_SEND_CFG,
MGMTD_COMMIT_PHASE_APPLY_CFG,
MGMTD_COMMIT_PHASE_TXN_DELETE,
MGMTD_COMMIT_PHASE_MAX
@@ -62,8 +61,6 @@ static inline const char *mgmt_commit_phase2str(enum mgmt_commit_phase cmt_phase
return "PREP-CFG";
case MGMTD_COMMIT_PHASE_TXN_CREATE:
return "CREATE-TXN";
- case MGMTD_COMMIT_PHASE_SEND_CFG:
- return "SEND-CFG";
case MGMTD_COMMIT_PHASE_APPLY_CFG:
return "APPLY-CFG";
case MGMTD_COMMIT_PHASE_TXN_DELETE:
@@ -95,6 +92,11 @@ DECLARE_LIST(mgmt_txn_batches, struct mgmt_txn_be_cfg_batch, list_linkage);
#define FOREACH_TXN_CFG_BATCH_IN_LIST(list, batch) \
frr_each_safe (mgmt_txn_batches, list, batch)
+struct mgmt_edit_req {
+ char xpath_created[XPATH_MAXLEN];
+ bool unlock;
+};
+
struct mgmt_commit_cfg_req {
Mgmtd__DatastoreId src_ds_id;
struct mgmt_ds_ctx *src_ds_ctx;
@@ -113,6 +115,12 @@ struct mgmt_commit_cfg_req {
enum mgmt_commit_phase be_phase[MGMTD_BE_CLIENT_ID_MAX];
/*
+ * Additional information when the commit is triggered by native edit
+ * request.
+ */
+ struct mgmt_edit_req *edit;
+
+ /*
* Set of config changes to commit. This is used only
* when changes are NOT to be determined by comparing
* candidate and running DSs. This is typically used
@@ -181,6 +189,15 @@ struct txn_req_get_tree {
struct lyd_node *client_results; /* result tree from clients */
};
+struct txn_req_rpc {
+ char *xpath; /* xpath of rpc/action to invoke */
+ uint64_t sent_clients; /* Bitmask of clients sent req to */
+ uint64_t recv_clients; /* Bitmask of clients recv reply from */
+ uint8_t result_type; /* LYD_FORMAT for results */
+ char *errstr; /* error string */
+ struct lyd_node *client_results; /* result tree from clients */
+};
+
struct mgmt_txn_req {
struct mgmt_txn_ctx *txn;
enum mgmt_txn_event req_event;
@@ -189,6 +206,7 @@ struct mgmt_txn_req {
struct mgmt_set_cfg_req *set_cfg;
struct mgmt_get_data_req *get_data;
struct txn_req_get_tree *get_tree;
+ struct txn_req_rpc *rpc;
struct mgmt_commit_cfg_req commit_cfg;
} req;
@@ -214,6 +232,7 @@ struct mgmt_txn_ctx {
struct event *proc_get_tree;
struct event *comm_cfg_timeout;
struct event *get_tree_timeout;
+ struct event *rpc_timeout;
struct event *clnup;
/* List of backend adapters involved in this transaction */
@@ -246,6 +265,10 @@ struct mgmt_txn_ctx {
*/
struct mgmt_txn_reqs_head get_tree_reqs;
/*
+ * List of pending rpc requests.
+ */
+ struct mgmt_txn_reqs_head rpc_reqs;
+ /*
* There will always be one commit-config allowed for a given
* transaction/session. No need to maintain lists for it.
*/
@@ -409,8 +432,16 @@ static struct mgmt_txn_req *mgmt_txn_req_alloc(struct mgmt_txn_ctx *txn,
" session-id: %" PRIu64,
txn_req->req_id, txn->txn_id, txn->session_id);
break;
+ case MGMTD_TXN_PROC_RPC:
+ txn_req->req.rpc = XCALLOC(MTYPE_MGMTD_TXN_RPC_REQ,
+ sizeof(struct txn_req_rpc));
+ assert(txn_req->req.rpc);
+ mgmt_txn_reqs_add_tail(&txn->rpc_reqs, txn_req);
+ __dbg("Added a new RPC req-id: %" PRIu64 " txn-id: %" PRIu64
+ " session-id: %" PRIu64,
+ txn_req->req_id, txn->txn_id, txn->session_id);
+ break;
case MGMTD_TXN_COMMITCFG_TIMEOUT:
- case MGMTD_TXN_GETTREE_TIMEOUT:
break;
}
@@ -449,6 +480,8 @@ static void mgmt_txn_req_free(struct mgmt_txn_req **txn_req)
cleanup = (ccreq->phase >= MGMTD_COMMIT_PHASE_TXN_CREATE &&
ccreq->phase < MGMTD_COMMIT_PHASE_TXN_DELETE);
+ XFREE(MTYPE_MGMTD_TXN_REQ, ccreq->edit);
+
FOREACH_MGMTD_BE_CLIENT_ID (id) {
/*
* Send TXN_DELETE to cleanup state for this
@@ -498,8 +531,16 @@ static void mgmt_txn_req_free(struct mgmt_txn_req **txn_req)
XFREE(MTYPE_MGMTD_XPATH, (*txn_req)->req.get_tree->xpath);
XFREE(MTYPE_MGMTD_TXN_GETTREE_REQ, (*txn_req)->req.get_tree);
break;
+ case MGMTD_TXN_PROC_RPC:
+ __dbg("Deleting RPC req-id: %" PRIu64 " txn-id: %" PRIu64,
+ (*txn_req)->req_id, (*txn_req)->txn->txn_id);
+ req_list = &(*txn_req)->txn->rpc_reqs;
+ lyd_free_all((*txn_req)->req.rpc->client_results);
+ XFREE(MTYPE_MGMTD_ERR, (*txn_req)->req.rpc->errstr);
+ XFREE(MTYPE_MGMTD_XPATH, (*txn_req)->req.rpc->xpath);
+ XFREE(MTYPE_MGMTD_TXN_RPC_REQ, (*txn_req)->req.rpc);
+ break;
case MGMTD_TXN_COMMITCFG_TIMEOUT:
- case MGMTD_TXN_GETTREE_TIMEOUT:
break;
}
@@ -610,7 +651,8 @@ static void mgmt_txn_process_set_cfg(struct event *thread)
->dst_ds_id,
txn_req->req.set_cfg
->dst_ds_ctx,
- false, false, true);
+ false, false, true,
+ NULL);
if (mm->perf_stats_en)
gettimeofday(&cmt_stats->last_start, NULL);
@@ -661,7 +703,8 @@ static int mgmt_txn_send_commit_cfg_reply(struct mgmt_txn_ctx *txn,
* b/c right now that is special cased.. that special casing should be
* removed; however...
*/
- if (!txn->commit_cfg_req->req.commit_cfg.implicit && txn->session_id &&
+ if (!txn->commit_cfg_req->req.commit_cfg.edit &&
+ !txn->commit_cfg_req->req.commit_cfg.implicit && txn->session_id &&
!txn->commit_cfg_req->req.commit_cfg.rollback &&
mgmt_fe_send_commit_cfg_reply(txn->session_id, txn->txn_id,
txn->commit_cfg_req->req.commit_cfg
@@ -677,7 +720,8 @@ static int mgmt_txn_send_commit_cfg_reply(struct mgmt_txn_ctx *txn,
txn->txn_id, txn->session_id);
}
- if (txn->commit_cfg_req->req.commit_cfg.implicit && txn->session_id &&
+ if (!txn->commit_cfg_req->req.commit_cfg.edit &&
+ txn->commit_cfg_req->req.commit_cfg.implicit && txn->session_id &&
!txn->commit_cfg_req->req.commit_cfg.rollback &&
mgmt_fe_send_set_cfg_reply(txn->session_id, txn->txn_id,
txn->commit_cfg_req->req.commit_cfg
@@ -691,6 +735,21 @@ static int mgmt_txn_send_commit_cfg_reply(struct mgmt_txn_ctx *txn,
txn->txn_id, txn->session_id);
}
+ if (txn->commit_cfg_req->req.commit_cfg.edit &&
+ mgmt_fe_adapter_send_edit_reply(txn->session_id, txn->txn_id,
+ txn->commit_cfg_req->req_id,
+ txn->commit_cfg_req->req.commit_cfg
+ .edit->unlock,
+ true,
+ txn->commit_cfg_req->req.commit_cfg
+ .edit->xpath_created,
+ success ? 0 : -1,
+ error_if_any) != 0) {
+ __log_err("Failed to send EDIT-REPLY txn-id: %" PRIu64
+ " session-id: %" PRIu64,
+ txn->txn_id, txn->session_id);
+ }
+
if (success) {
/* Stop the commit-timeout timer */
/* XXX why only on success? */
@@ -855,7 +914,9 @@ static int mgmt_txn_create_config_batches(struct mgmt_txn_req *txn_req,
__dbg("XPATH: %s, Value: '%s'", xpath, value ? value : "NIL");
- clients = mgmt_be_interested_clients(xpath, true);
+ clients =
+ mgmt_be_interested_clients(xpath,
+ MGMT_BE_XPATH_SUBSCR_TYPE_CFG);
chg_clients = 0;
@@ -919,8 +980,8 @@ static int mgmt_txn_create_config_batches(struct mgmt_txn_req *txn_req,
}
if (!chg_clients)
- __log_err("No connected daemon is interested in XPATH %s",
- xpath);
+ __dbg("Daemons interested in XPATH are not currently connected: %s",
+ xpath);
cmtcfg_req->clients |= chg_clients;
@@ -931,7 +992,7 @@ static int mgmt_txn_create_config_batches(struct mgmt_txn_req *txn_req,
if (!num_chgs) {
(void)mgmt_txn_send_commit_cfg_reply(txn_req->txn,
MGMTD_NO_CFG_CHANGES,
- "No changes found to commit!");
+ "No connected daemons interested in changes");
return -1;
}
@@ -1183,13 +1244,10 @@ static int mgmt_txn_send_be_cfg_data(struct mgmt_txn_ctx *txn,
cmtcfg_req->cmt_stats->last_num_cfgdata_reqs++;
}
- cmtcfg_req->be_phase[adapter->id] = MGMTD_COMMIT_PHASE_SEND_CFG;
-
/*
- * This could be the last Backend Client to send CFGDATA_CREATE_REQ to.
- * Try moving the commit to next phase.
+ * We don't advance the phase here, instead that is driven by the
+ * cfg_reply.
*/
- mgmt_try_move_commit_to_next_phase(txn, cmtcfg_req);
return 0;
}
@@ -1284,6 +1342,33 @@ static int txn_get_tree_data_done(struct mgmt_txn_ctx *txn,
return ret;
}
+static int txn_rpc_done(struct mgmt_txn_ctx *txn, struct mgmt_txn_req *txn_req)
+{
+ struct txn_req_rpc *rpc = txn_req->req.rpc;
+ uint64_t req_id = txn_req->req_id;
+
+ /* cancel timer and send reply onward */
+ EVENT_OFF(txn->rpc_timeout);
+
+ if (rpc->errstr)
+ mgmt_fe_adapter_txn_error(txn->txn_id, req_id, false, -1,
+ rpc->errstr);
+ else if (mgmt_fe_adapter_send_rpc_reply(txn->session_id, txn->txn_id,
+ req_id, rpc->result_type,
+ rpc->client_results)) {
+ __log_err("Error sending the results of RPC for txn-id %" PRIu64
+ " req_id %" PRIu64 " to requested type %u",
+ txn->txn_id, req_id, rpc->result_type);
+
+ (void)mgmt_fe_adapter_txn_error(txn->txn_id, req_id, false, -1,
+ "Error converting results of RPC");
+ }
+
+ /* we're done with the request */
+ mgmt_txn_req_free(&txn_req);
+
+ return 0;
+}
static void txn_get_tree_timeout(struct event *thread)
{
@@ -1311,6 +1396,31 @@ static void txn_get_tree_timeout(struct event *thread)
txn_get_tree_data_done(txn, txn_req);
}
+static void txn_rpc_timeout(struct event *thread)
+{
+ struct mgmt_txn_ctx *txn;
+ struct mgmt_txn_req *txn_req;
+
+ txn_req = (struct mgmt_txn_req *)EVENT_ARG(thread);
+ txn = txn_req->txn;
+
+ assert(txn);
+ assert(txn->type == MGMTD_TXN_TYPE_RPC);
+
+ __log_err("Backend timeout txn-id: %" PRIu64 " ending rpc", txn->txn_id);
+
+ /*
+ * Send a get-tree data reply.
+ *
+ * NOTE: The transaction cleanup will be triggered from Front-end
+ * adapter.
+ */
+
+ txn_req->req.rpc->errstr =
+ XSTRDUP(MTYPE_MGMTD_ERR, "Operation on the backend timed-out");
+ txn_rpc_done(txn, txn_req);
+}
+
/*
* Send CFG_APPLY_REQs to all the backend client.
*
@@ -1390,24 +1500,6 @@ static void mgmt_txn_process_commit_cfg(struct event *thread)
*/
mgmt_txn_send_be_txn_create(txn);
break;
- case MGMTD_COMMIT_PHASE_SEND_CFG:
- if (mm->perf_stats_en)
- gettimeofday(&cmtcfg_req->cmt_stats->send_cfg_start,
- NULL);
- /*
- * All CFGDATA_CREATE_REQ should have been sent to
- * Backend by now.
- */
-#ifndef MGMTD_LOCAL_VALIDATIONS_ENABLED
- __dbg("txn-id: %" PRIu64 " session-id: %" PRIu64
- " trigger sending CFG_VALIDATE_REQ to all backend clients",
- txn->txn_id, txn->session_id);
-#else /* ifndef MGMTD_LOCAL_VALIDATIONS_ENABLED */
- __dbg("txn-id: %" PRIu64 " session-id: %" PRIu64
- " trigger sending CFG_APPLY_REQ to all backend clients",
- txn->txn_id, txn->session_id);
-#endif /* ifndef MGMTD_LOCAL_VALIDATIONS_ENABLED */
- break;
case MGMTD_COMMIT_PHASE_APPLY_CFG:
if (mm->perf_stats_en)
gettimeofday(&cmtcfg_req->cmt_stats->apply_cfg_start,
@@ -1512,7 +1604,7 @@ static void mgmt_txn_send_getcfg_reply_data(struct mgmt_txn_req *txn_req,
case MGMTD_TXN_PROC_SETCFG:
case MGMTD_TXN_PROC_COMMITCFG:
case MGMTD_TXN_PROC_GETTREE:
- case MGMTD_TXN_GETTREE_TIMEOUT:
+ case MGMTD_TXN_PROC_RPC:
case MGMTD_TXN_COMMITCFG_TIMEOUT:
__log_err("Invalid Txn-Req-Event %u", txn_req->req_event);
break;
@@ -1718,6 +1810,7 @@ static struct mgmt_txn_ctx *mgmt_txn_create_new(uint64_t session_id,
mgmt_txn_reqs_init(&txn->set_cfg_reqs);
mgmt_txn_reqs_init(&txn->get_cfg_reqs);
mgmt_txn_reqs_init(&txn->get_tree_reqs);
+ mgmt_txn_reqs_init(&txn->rpc_reqs);
txn->commit_cfg_req = NULL;
txn->refcount = 0;
if (!mgmt_txn_mm->next_txn_id)
@@ -1886,12 +1979,8 @@ static void mgmt_txn_register_event(struct mgmt_txn_ctx *txn,
MGMTD_TXN_CFG_COMMIT_MAX_DELAY_SEC,
&txn->comm_cfg_timeout);
break;
- case MGMTD_TXN_GETTREE_TIMEOUT:
- event_add_timer(mgmt_txn_tm, txn_get_tree_timeout, txn,
- MGMTD_TXN_GET_TREE_MAX_DELAY_SEC,
- &txn->get_tree_timeout);
- break;
case MGMTD_TXN_PROC_GETTREE:
+ case MGMTD_TXN_PROC_RPC:
assert(!"code bug do not register this event");
break;
}
@@ -2044,7 +2133,7 @@ int mgmt_txn_send_commit_config_req(uint64_t txn_id, uint64_t req_id,
Mgmtd__DatastoreId dst_ds_id,
struct mgmt_ds_ctx *dst_ds_ctx,
bool validate_only, bool abort,
- bool implicit)
+ bool implicit, struct mgmt_edit_req *edit)
{
struct mgmt_txn_ctx *txn;
struct mgmt_txn_req *txn_req;
@@ -2068,6 +2157,7 @@ int mgmt_txn_send_commit_config_req(uint64_t txn_id, uint64_t req_id,
txn_req->req.commit_cfg.validate_only = validate_only;
txn_req->req.commit_cfg.abort = abort;
txn_req->req.commit_cfg.implicit = implicit;
+ txn_req->req.commit_cfg.edit = edit;
txn_req->req.commit_cfg.cmt_stats =
mgmt_fe_get_session_commit_stats(txn->session_id);
@@ -2451,6 +2541,110 @@ state:
return 0;
}
+int mgmt_txn_send_edit(uint64_t txn_id, uint64_t req_id,
+ Mgmtd__DatastoreId ds_id, struct mgmt_ds_ctx *ds_ctx,
+ Mgmtd__DatastoreId commit_ds_id,
+ struct mgmt_ds_ctx *commit_ds_ctx, bool unlock,
+ bool commit, LYD_FORMAT request_type, uint8_t flags,
+ uint8_t operation, const char *xpath, const char *data)
+{
+ struct mgmt_txn_ctx *txn;
+ struct mgmt_edit_req *edit;
+ struct nb_config *nb_config;
+ char errstr[BUFSIZ];
+ int ret;
+
+ txn = mgmt_txn_id2ctx(txn_id);
+ if (!txn)
+ return -1;
+
+ edit = XCALLOC(MTYPE_MGMTD_TXN_REQ, sizeof(struct mgmt_edit_req));
+
+ nb_config = mgmt_ds_get_nb_config(ds_ctx);
+ assert(nb_config);
+
+ ret = nb_candidate_edit_tree(nb_config, operation, request_type, xpath,
+ data, edit->xpath_created, errstr,
+ sizeof(errstr));
+ if (ret)
+ goto reply;
+
+ if (commit) {
+ edit->unlock = unlock;
+
+ mgmt_txn_send_commit_config_req(txn_id, req_id, ds_id, ds_ctx,
+ commit_ds_id, commit_ds_ctx,
+ false, false, true, edit);
+ return 0;
+ }
+reply:
+ mgmt_fe_adapter_send_edit_reply(txn->session_id, txn->txn_id, req_id,
+ unlock, commit, edit->xpath_created,
+ ret ? -1 : 0, errstr);
+
+ XFREE(MTYPE_MGMTD_TXN_REQ, edit);
+
+ return 0;
+}
+
+int mgmt_txn_send_rpc(uint64_t txn_id, uint64_t req_id, uint64_t clients,
+ LYD_FORMAT result_type, const char *xpath,
+ const char *data, size_t data_len)
+{
+ struct mgmt_txn_ctx *txn;
+ struct mgmt_txn_req *txn_req;
+ struct mgmt_msg_rpc *msg;
+ struct txn_req_rpc *rpc;
+ uint64_t id;
+ int ret;
+
+ txn = mgmt_txn_id2ctx(txn_id);
+ if (!txn)
+ return -1;
+
+ txn_req = mgmt_txn_req_alloc(txn, req_id, MGMTD_TXN_PROC_RPC);
+ rpc = txn_req->req.rpc;
+ rpc->xpath = XSTRDUP(MTYPE_MGMTD_XPATH, xpath);
+ rpc->result_type = result_type;
+
+ msg = mgmt_msg_native_alloc_msg(struct mgmt_msg_rpc, 0,
+ MTYPE_MSG_NATIVE_RPC);
+ msg->refer_id = txn_id;
+ msg->req_id = req_id;
+ msg->code = MGMT_MSG_CODE_RPC;
+ msg->request_type = result_type;
+
+ mgmt_msg_native_xpath_encode(msg, xpath);
+ if (data)
+ mgmt_msg_native_append(msg, data, data_len);
+
+ assert(clients);
+ FOREACH_BE_CLIENT_BITS (id, clients) {
+ ret = mgmt_be_send_native(id, msg);
+ if (ret) {
+ __log_err("Could not send rpc message to backend client %s",
+ mgmt_be_client_id2name(id));
+ continue;
+ }
+
+ __dbg("Sent rpc req to backend client %s",
+ mgmt_be_client_id2name(id));
+
+ /* record that we sent the request to the client */
+ rpc->sent_clients |= (1u << id);
+ }
+
+ mgmt_msg_native_free_msg(msg);
+
+ if (!rpc->sent_clients)
+ return txn_rpc_done(txn, txn_req);
+
+ event_add_timer(mgmt_txn_tm, txn_rpc_timeout, txn_req,
+ MGMTD_TXN_RPC_MAX_DELAY_SEC, &txn->rpc_timeout);
+
+ return 0;
+}
+
/*
* Error reply from the backend client.
*/
@@ -2461,6 +2655,7 @@ int mgmt_txn_notify_error(struct mgmt_be_client_adapter *adapter,
enum mgmt_be_client_id id = adapter->id;
struct mgmt_txn_ctx *txn = mgmt_txn_id2ctx(txn_id);
struct txn_req_get_tree *get_tree;
+ struct txn_req_rpc *rpc;
struct mgmt_txn_req *txn_req;
if (!txn) {
@@ -2473,6 +2668,10 @@ int mgmt_txn_notify_error(struct mgmt_be_client_adapter *adapter,
FOREACH_TXN_REQ_IN_LIST (&txn->get_tree_reqs, txn_req)
if (txn_req->req_id == req_id)
break;
+ if (!txn_req)
+ FOREACH_TXN_REQ_IN_LIST (&txn->rpc_reqs, txn_req)
+ if (txn_req->req_id == req_id)
+ break;
if (!txn_req) {
__log_err("Error reply from %s for txn-id %" PRIu64
" cannot find req_id %" PRIu64,
@@ -2493,13 +2692,23 @@ int mgmt_txn_notify_error(struct mgmt_be_client_adapter *adapter,
if (get_tree->recv_clients != get_tree->sent_clients)
return 0;
return txn_get_tree_data_done(txn, txn_req);
+ case MGMTD_TXN_PROC_RPC:
+ rpc = txn_req->req.rpc;
+ rpc->recv_clients |= (1u << id);
+ if (errstr) {
+ XFREE(MTYPE_MGMTD_ERR, rpc->errstr);
+ rpc->errstr = XSTRDUP(MTYPE_MGMTD_ERR, errstr);
+ }
+ /* check if done yet */
+ if (rpc->recv_clients != rpc->sent_clients)
+ return 0;
+ return txn_rpc_done(txn, txn_req);
/* non-native message events */
case MGMTD_TXN_PROC_SETCFG:
case MGMTD_TXN_PROC_COMMITCFG:
case MGMTD_TXN_PROC_GETCFG:
case MGMTD_TXN_COMMITCFG_TIMEOUT:
- case MGMTD_TXN_GETTREE_TIMEOUT:
default:
assert(!"non-native req event in native erorr path");
return -1;
@@ -2581,6 +2790,77 @@ int mgmt_txn_notify_tree_data_reply(struct mgmt_be_client_adapter *adapter,
return txn_get_tree_data_done(txn, txn_req);
}
+int mgmt_txn_notify_rpc_reply(struct mgmt_be_client_adapter *adapter,
+ struct mgmt_msg_rpc_reply *reply_msg,
+ size_t msg_len)
+{
+ uint64_t txn_id = reply_msg->refer_id;
+ uint64_t req_id = reply_msg->req_id;
+ enum mgmt_be_client_id id = adapter->id;
+ struct mgmt_txn_ctx *txn = mgmt_txn_id2ctx(txn_id);
+ struct mgmt_txn_req *txn_req;
+ struct txn_req_rpc *rpc;
+ struct lyd_node *tree;
+ size_t data_len = msg_len - sizeof(*reply_msg);
+ LY_ERR err = LY_SUCCESS;
+
+ if (!txn) {
+ __log_err("RPC reply from %s for a missing txn-id %" PRIu64,
+ adapter->name, txn_id);
+ return -1;
+ }
+
+ /* Find the request. */
+ FOREACH_TXN_REQ_IN_LIST (&txn->rpc_reqs, txn_req)
+ if (txn_req->req_id == req_id)
+ break;
+ if (!txn_req) {
+ __log_err("RPC reply from %s for txn-id %" PRIu64
+ " missing req_id %" PRIu64,
+ adapter->name, txn_id, req_id);
+ return -1;
+ }
+
+ rpc = txn_req->req.rpc;
+
+ tree = NULL;
+ if (data_len)
+ err = yang_parse_rpc(rpc->xpath, reply_msg->result_type,
+ reply_msg->data, true, &tree);
+ if (err) {
+ __log_err("RPC reply from %s for txn-id %" PRIu64
+ " req_id %" PRIu64 " error parsing result of type %u: %s",
+ adapter->name, txn_id, req_id, reply_msg->result_type,
+ ly_strerrcode(err));
+ }
+ if (!err && tree) {
+ if (!rpc->client_results)
+ rpc->client_results = tree;
+ else
+ err = lyd_merge_siblings(&rpc->client_results, tree,
+ LYD_MERGE_DESTRUCT);
+ if (err) {
+ __log_err("RPC reply from %s for txn-id %" PRIu64
+ " req_id %" PRIu64 " error merging result: %s",
+ adapter->name, txn_id, req_id,
+ ly_strerrcode(err));
+ }
+ }
+ if (err) {
+ XFREE(MTYPE_MGMTD_ERR, rpc->errstr);
+ rpc->errstr = XSTRDUP(MTYPE_MGMTD_ERR,
+ "Cannot parse result from the backend");
+ }
+
+ rpc->recv_clients |= (1u << id);
+
+ /* check if done yet */
+ if (rpc->recv_clients != rpc->sent_clients)
+ return 0;
+
+ return txn_rpc_done(txn, txn_req);
+}
+
void mgmt_txn_status_write(struct vty *vty)
{
struct mgmt_txn_ctx *txn;