diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-09 13:16:35 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-09 13:16:35 +0000 |
commit | e2bbf175a2184bd76f6c54ccf8456babeb1a46fc (patch) | |
tree | f0b76550d6e6f500ada964a3a4ee933a45e5a6f1 /mgmtd/mgmt_txn.c | |
parent | Initial commit. (diff) | |
download | frr-e2bbf175a2184bd76f6c54ccf8456babeb1a46fc.tar.xz frr-e2bbf175a2184bd76f6c54ccf8456babeb1a46fc.zip |
Adding upstream version 9.1.upstream/9.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'mgmtd/mgmt_txn.c')
-rw-r--r-- | mgmtd/mgmt_txn.c | 2644 |
1 files changed, 2644 insertions, 0 deletions
diff --git a/mgmtd/mgmt_txn.c b/mgmtd/mgmt_txn.c new file mode 100644 index 0000000..452f9c8 --- /dev/null +++ b/mgmtd/mgmt_txn.c @@ -0,0 +1,2644 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * MGMTD Transactions + * + * Copyright (C) 2021 Vmware, Inc. + * Pushpasis Sarkar <spushpasis@vmware.com> + */ + +#include <zebra.h> +#include "hash.h" +#include "jhash.h" +#include "libfrr.h" +#include "mgmtd/mgmt.h" +#include "mgmtd/mgmt_memory.h" +#include "mgmtd/mgmt_txn.h" + +#define MGMTD_TXN_DBG(fmt, ...) \ + DEBUGD(&mgmt_debug_txn, "TXN: %s: " fmt, __func__, ##__VA_ARGS__) +#define MGMTD_TXN_ERR(fmt, ...) \ + zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__) + +#define MGMTD_TXN_LOCK(txn) mgmt_txn_lock(txn, __FILE__, __LINE__) +#define MGMTD_TXN_UNLOCK(txn) mgmt_txn_unlock(txn, __FILE__, __LINE__) + +enum mgmt_txn_event { + MGMTD_TXN_PROC_SETCFG = 1, + MGMTD_TXN_PROC_COMMITCFG, + MGMTD_TXN_PROC_GETCFG, + MGMTD_TXN_PROC_GETDATA, + MGMTD_TXN_COMMITCFG_TIMEOUT, + MGMTD_TXN_CLEANUP +}; + +PREDECL_LIST(mgmt_txn_reqs); + +struct mgmt_set_cfg_req { + Mgmtd__DatastoreId ds_id; + struct mgmt_ds_ctx *ds_ctx; + struct nb_cfg_change cfg_changes[MGMTD_MAX_CFG_CHANGES_IN_BATCH]; + uint16_t num_cfg_changes; + bool implicit_commit; + Mgmtd__DatastoreId dst_ds_id; + struct mgmt_ds_ctx *dst_ds_ctx; + struct mgmt_setcfg_stats *setcfg_stats; +}; + +enum mgmt_commit_phase { + MGMTD_COMMIT_PHASE_PREPARE_CFG = 0, + MGMTD_COMMIT_PHASE_TXN_CREATE, + MGMTD_COMMIT_PHASE_SEND_CFG, + MGMTD_COMMIT_PHASE_APPLY_CFG, + MGMTD_COMMIT_PHASE_TXN_DELETE, + MGMTD_COMMIT_PHASE_MAX +}; + +static inline const char *mgmt_commit_phase2str(enum mgmt_commit_phase cmt_phase) +{ + switch (cmt_phase) { + case MGMTD_COMMIT_PHASE_PREPARE_CFG: + return "PREP-CFG"; + case MGMTD_COMMIT_PHASE_TXN_CREATE: + return "CREATE-TXN"; + case MGMTD_COMMIT_PHASE_SEND_CFG: + return "SEND-CFG"; + case MGMTD_COMMIT_PHASE_APPLY_CFG: + return "APPLY-CFG"; + case MGMTD_COMMIT_PHASE_TXN_DELETE: + return "DELETE-TXN"; + case MGMTD_COMMIT_PHASE_MAX: + return "Invalid/Unknown"; + } + + return "Invalid/Unknown"; +} + +PREDECL_LIST(mgmt_txn_batches); + +struct mgmt_txn_be_cfg_batch { + struct mgmt_txn_ctx *txn; + uint64_t batch_id; + enum mgmt_be_client_id be_id; + struct mgmt_be_client_adapter *be_adapter; + uint xp_subscr[MGMTD_MAX_CFG_CHANGES_IN_BATCH]; + Mgmtd__YangCfgDataReq cfg_data[MGMTD_MAX_CFG_CHANGES_IN_BATCH]; + Mgmtd__YangCfgDataReq *cfg_datap[MGMTD_MAX_CFG_CHANGES_IN_BATCH]; + Mgmtd__YangData data[MGMTD_MAX_CFG_CHANGES_IN_BATCH]; + Mgmtd__YangDataValue value[MGMTD_MAX_CFG_CHANGES_IN_BATCH]; + size_t num_cfg_data; + int buf_space_left; + enum mgmt_commit_phase comm_phase; + struct mgmt_txn_batches_item list_linkage; +}; + +DECLARE_LIST(mgmt_txn_batches, struct mgmt_txn_be_cfg_batch, list_linkage); + +#define FOREACH_TXN_CFG_BATCH_IN_LIST(list, batch) \ + frr_each_safe (mgmt_txn_batches, list, batch) + +struct mgmt_commit_cfg_req { + Mgmtd__DatastoreId src_ds_id; + struct mgmt_ds_ctx *src_ds_ctx; + Mgmtd__DatastoreId dst_ds_id; + struct mgmt_ds_ctx *dst_ds_ctx; + uint32_t nb_txn_id; + uint8_t validate_only : 1; + uint8_t abort : 1; + uint8_t implicit : 1; + uint8_t rollback : 1; + + /* Track commit phases */ + enum mgmt_commit_phase curr_phase; + enum mgmt_commit_phase next_phase; + + /* + * Set of config changes to commit. This is used only + * when changes are NOT to be determined by comparing + * candidate and running DSs. This is typically used + * for downloading all relevant configs for a new backend + * client that has recently come up and connected with + * MGMTD. + */ + struct nb_config_cbs *cfg_chgs; + + /* + * Details on all the Backend Clients associated with + * this commit. + */ + struct mgmt_be_client_subscr_info subscr_info; + + /* + * List of backend batches for this commit to be validated + * and applied at the backend. + * + * FIXME: Need to re-think this design for the case set of + * validators for a given YANG data item is different from + * the set of notifiers for the same. We may need to have + * separate list of batches for VALIDATE and APPLY. + */ + struct mgmt_txn_batches_head curr_batches[MGMTD_BE_CLIENT_ID_MAX]; + struct mgmt_txn_batches_head next_batches[MGMTD_BE_CLIENT_ID_MAX]; + /* + * The last batch added for any backend client. This is always on + * 'curr_batches' + */ + struct mgmt_txn_be_cfg_batch *last_be_cfg_batch[MGMTD_BE_CLIENT_ID_MAX]; + struct hash *batches; + uint64_t next_batch_id; + + struct mgmt_commit_stats *cmt_stats; +}; + +struct mgmt_get_data_reply { + /* Buffer space for preparing data reply */ + int num_reply; + int last_batch; + Mgmtd__YangDataReply data_reply; + Mgmtd__YangData reply_data[MGMTD_MAX_NUM_DATA_REPLY_IN_BATCH]; + Mgmtd__YangData *reply_datap[MGMTD_MAX_NUM_DATA_REPLY_IN_BATCH]; + Mgmtd__YangDataValue reply_value[MGMTD_MAX_NUM_DATA_REPLY_IN_BATCH]; + char *reply_xpathp[MGMTD_MAX_NUM_DATA_REPLY_IN_BATCH]; +}; + +struct mgmt_get_data_req { + Mgmtd__DatastoreId ds_id; + struct nb_config *cfg_root; + char *xpaths[MGMTD_MAX_NUM_DATA_REQ_IN_BATCH]; + int num_xpaths; + + /* + * Buffer space for preparing reply. + * NOTE: Should only be malloc-ed on demand to reduce + * memory footprint. Freed up via mgmt_trx_req_free() + */ + struct mgmt_get_data_reply *reply; + + int total_reply; +}; + +struct mgmt_txn_req { + struct mgmt_txn_ctx *txn; + enum mgmt_txn_event req_event; + uint64_t req_id; + union { + struct mgmt_set_cfg_req *set_cfg; + struct mgmt_get_data_req *get_data; + struct mgmt_commit_cfg_req commit_cfg; + } req; + + bool pending_be_proc; + struct mgmt_txn_reqs_item list_linkage; +}; + +DECLARE_LIST(mgmt_txn_reqs, struct mgmt_txn_req, list_linkage); + +#define FOREACH_TXN_REQ_IN_LIST(list, req) \ + frr_each_safe (mgmt_txn_reqs, list, req) + +struct mgmt_txn_ctx { + uint64_t session_id; /* One transaction per client session */ + uint64_t txn_id; + enum mgmt_txn_type type; + + /* struct mgmt_master *mm; */ + + struct event *proc_set_cfg; + struct event *proc_comm_cfg; + struct event *proc_get_cfg; + struct event *proc_get_data; + struct event *comm_cfg_timeout; + struct event *clnup; + + /* List of backend adapters involved in this transaction */ + struct mgmt_txn_badapters_head be_adapters; + + int refcount; + + struct mgmt_txns_item list_linkage; + + /* + * List of pending set-config requests for a given + * transaction/session. Just one list for requests + * not processed at all. There's no backend interaction + * involved. + */ + struct mgmt_txn_reqs_head set_cfg_reqs; + /* + * List of pending get-config requests for a given + * transaction/session. Just one list for requests + * not processed at all. There's no backend interaction + * involved. + */ + struct mgmt_txn_reqs_head get_cfg_reqs; + /* + * List of pending get-data requests for a given + * transaction/session Two lists, one for requests + * not processed at all, and one for requests that + * has been sent to backend for processing. + */ + struct mgmt_txn_reqs_head get_data_reqs; + struct mgmt_txn_reqs_head pending_get_datas; + /* + * There will always be one commit-config allowed for a given + * transaction/session. No need to maintain lists for it. + */ + struct mgmt_txn_req *commit_cfg_req; +}; + +DECLARE_LIST(mgmt_txns, struct mgmt_txn_ctx, list_linkage); + +#define FOREACH_TXN_IN_LIST(mm, txn) \ + frr_each_safe (mgmt_txns, &(mm)->txn_list, (txn)) + +static int mgmt_txn_send_commit_cfg_reply(struct mgmt_txn_ctx *txn, + enum mgmt_result result, + const char *error_if_any); + +static inline const char *mgmt_txn_commit_phase_str(struct mgmt_txn_ctx *txn, + bool curr) +{ + if (!txn->commit_cfg_req) + return "None"; + + return (mgmt_commit_phase2str( + curr ? txn->commit_cfg_req->req.commit_cfg.curr_phase + : txn->commit_cfg_req->req.commit_cfg.next_phase)); +} + +static void mgmt_txn_lock(struct mgmt_txn_ctx *txn, const char *file, int line); +static void mgmt_txn_unlock(struct mgmt_txn_ctx **txn, const char *file, + int line); +static int mgmt_txn_send_be_txn_delete(struct mgmt_txn_ctx *txn, + struct mgmt_be_client_adapter *adapter); + +static struct event_loop *mgmt_txn_tm; +static struct mgmt_master *mgmt_txn_mm; + +static void mgmt_txn_register_event(struct mgmt_txn_ctx *txn, + enum mgmt_txn_event event); + +static int +mgmt_move_be_commit_to_next_phase(struct mgmt_txn_ctx *txn, + struct mgmt_be_client_adapter *adapter); + +static struct mgmt_txn_be_cfg_batch * +mgmt_txn_cfg_batch_alloc(struct mgmt_txn_ctx *txn, enum mgmt_be_client_id id, + struct mgmt_be_client_adapter *be_adapter) +{ + struct mgmt_txn_be_cfg_batch *batch; + + batch = XCALLOC(MTYPE_MGMTD_TXN_CFG_BATCH, + sizeof(struct mgmt_txn_be_cfg_batch)); + assert(batch); + batch->be_id = id; + + batch->txn = txn; + MGMTD_TXN_LOCK(txn); + assert(txn->commit_cfg_req); + mgmt_txn_batches_add_tail(&txn->commit_cfg_req->req.commit_cfg + .curr_batches[id], + batch); + batch->be_adapter = be_adapter; + batch->buf_space_left = MGMTD_BE_CFGDATA_MAX_MSG_LEN; + if (be_adapter) + mgmt_be_adapter_lock(be_adapter); + + txn->commit_cfg_req->req.commit_cfg.last_be_cfg_batch[id] = batch; + if (!txn->commit_cfg_req->req.commit_cfg.next_batch_id) + txn->commit_cfg_req->req.commit_cfg.next_batch_id++; + batch->batch_id = txn->commit_cfg_req->req.commit_cfg.next_batch_id++; + hash_get(txn->commit_cfg_req->req.commit_cfg.batches, batch, + hash_alloc_intern); + + return batch; +} + +static void mgmt_txn_cfg_batch_free(struct mgmt_txn_be_cfg_batch **batch) +{ + size_t indx; + struct mgmt_commit_cfg_req *cmtcfg_req; + + MGMTD_TXN_DBG(" freeing batch-id: %" PRIu64 " txn-id %" PRIu64, + (*batch)->batch_id, (*batch)->txn->txn_id); + + assert((*batch)->txn && (*batch)->txn->type == MGMTD_TXN_TYPE_CONFIG); + + cmtcfg_req = &(*batch)->txn->commit_cfg_req->req.commit_cfg; + hash_release(cmtcfg_req->batches, *batch); + mgmt_txn_batches_del(&cmtcfg_req->curr_batches[(*batch)->be_id], *batch); + mgmt_txn_batches_del(&cmtcfg_req->next_batches[(*batch)->be_id], *batch); + + if ((*batch)->be_adapter) + mgmt_be_adapter_unlock(&(*batch)->be_adapter); + + for (indx = 0; indx < (*batch)->num_cfg_data; indx++) { + if ((*batch)->data[indx].xpath) { + free((*batch)->data[indx].xpath); + (*batch)->data[indx].xpath = NULL; + } + } + + MGMTD_TXN_UNLOCK(&(*batch)->txn); + + XFREE(MTYPE_MGMTD_TXN_CFG_BATCH, *batch); + *batch = NULL; +} + +static unsigned int mgmt_txn_cfgbatch_hash_key(const void *data) +{ + const struct mgmt_txn_be_cfg_batch *batch = data; + + return jhash2((uint32_t *)&batch->batch_id, + sizeof(batch->batch_id) / sizeof(uint32_t), 0); +} + +static bool mgmt_txn_cfgbatch_hash_cmp(const void *d1, const void *d2) +{ + const struct mgmt_txn_be_cfg_batch *batch1 = d1; + const struct mgmt_txn_be_cfg_batch *batch2 = d2; + + return (batch1->batch_id == batch2->batch_id); +} + +static void mgmt_txn_cfgbatch_hash_free(void *data) +{ + struct mgmt_txn_be_cfg_batch *batch = data; + + mgmt_txn_cfg_batch_free(&batch); +} + +static inline struct mgmt_txn_be_cfg_batch * +mgmt_txn_cfgbatch_id2ctx(struct mgmt_txn_ctx *txn, uint64_t batch_id) +{ + struct mgmt_txn_be_cfg_batch key = { 0 }; + struct mgmt_txn_be_cfg_batch *batch; + + if (!txn->commit_cfg_req) + return NULL; + + key.batch_id = batch_id; + batch = hash_lookup(txn->commit_cfg_req->req.commit_cfg.batches, &key); + + return batch; +} + +static void mgmt_txn_cleanup_be_cfg_batches(struct mgmt_txn_ctx *txn, + enum mgmt_be_client_id id) +{ + struct mgmt_txn_be_cfg_batch *batch; + struct mgmt_txn_batches_head *list; + + list = &txn->commit_cfg_req->req.commit_cfg.curr_batches[id]; + FOREACH_TXN_CFG_BATCH_IN_LIST (list, batch) + mgmt_txn_cfg_batch_free(&batch); + + mgmt_txn_batches_fini(list); + + list = &txn->commit_cfg_req->req.commit_cfg.next_batches[id]; + FOREACH_TXN_CFG_BATCH_IN_LIST (list, batch) + mgmt_txn_cfg_batch_free(&batch); + + mgmt_txn_batches_fini(list); + + txn->commit_cfg_req->req.commit_cfg.last_be_cfg_batch[id] = NULL; +} + +static struct mgmt_txn_req *mgmt_txn_req_alloc(struct mgmt_txn_ctx *txn, + uint64_t req_id, + enum mgmt_txn_event req_event) +{ + struct mgmt_txn_req *txn_req; + enum mgmt_be_client_id id; + + txn_req = XCALLOC(MTYPE_MGMTD_TXN_REQ, sizeof(struct mgmt_txn_req)); + assert(txn_req); + txn_req->txn = txn; + txn_req->req_id = req_id; + txn_req->req_event = req_event; + txn_req->pending_be_proc = false; + + switch (txn_req->req_event) { + case MGMTD_TXN_PROC_SETCFG: + txn_req->req.set_cfg = XCALLOC(MTYPE_MGMTD_TXN_SETCFG_REQ, + sizeof(struct mgmt_set_cfg_req)); + assert(txn_req->req.set_cfg); + mgmt_txn_reqs_add_tail(&txn->set_cfg_reqs, txn_req); + MGMTD_TXN_DBG("Added a new SETCFG req-id: %" PRIu64 + " txn-id: %" PRIu64 ", session-id: %" PRIu64, + txn_req->req_id, txn->txn_id, txn->session_id); + break; + case MGMTD_TXN_PROC_COMMITCFG: + txn->commit_cfg_req = txn_req; + MGMTD_TXN_DBG("Added a new COMMITCFG req-id: %" PRIu64 + " txn-id: %" PRIu64 " session-id: %" PRIu64, + txn_req->req_id, txn->txn_id, txn->session_id); + + FOREACH_MGMTD_BE_CLIENT_ID (id) { + mgmt_txn_batches_init( + &txn_req->req.commit_cfg.curr_batches[id]); + mgmt_txn_batches_init( + &txn_req->req.commit_cfg.next_batches[id]); + } + + txn_req->req.commit_cfg.batches = + hash_create(mgmt_txn_cfgbatch_hash_key, + mgmt_txn_cfgbatch_hash_cmp, + "MGMT Config Batches"); + break; + case MGMTD_TXN_PROC_GETCFG: + txn_req->req.get_data = + XCALLOC(MTYPE_MGMTD_TXN_GETDATA_REQ, + sizeof(struct mgmt_get_data_req)); + assert(txn_req->req.get_data); + mgmt_txn_reqs_add_tail(&txn->get_cfg_reqs, txn_req); + MGMTD_TXN_DBG("Added a new GETCFG req-id: %" PRIu64 + " txn-id: %" PRIu64 " session-id: %" PRIu64, + txn_req->req_id, txn->txn_id, txn->session_id); + break; + case MGMTD_TXN_PROC_GETDATA: + txn_req->req.get_data = + XCALLOC(MTYPE_MGMTD_TXN_GETDATA_REQ, + sizeof(struct mgmt_get_data_req)); + assert(txn_req->req.get_data); + mgmt_txn_reqs_add_tail(&txn->get_data_reqs, txn_req); + MGMTD_TXN_DBG("Added a new GETDATA req-id: %" PRIu64 + " txn-id: %" PRIu64 " session-id: %" PRIu64, + txn_req->req_id, txn->txn_id, txn->session_id); + break; + case MGMTD_TXN_COMMITCFG_TIMEOUT: + case MGMTD_TXN_CLEANUP: + break; + } + + MGMTD_TXN_LOCK(txn); + + return txn_req; +} + +static void mgmt_txn_req_free(struct mgmt_txn_req **txn_req) +{ + int indx; + struct mgmt_txn_reqs_head *req_list = NULL; + struct mgmt_txn_reqs_head *pending_list = NULL; + enum mgmt_be_client_id id; + struct mgmt_be_client_adapter *adapter; + struct mgmt_commit_cfg_req *ccreq; + bool cleanup; + + switch ((*txn_req)->req_event) { + case MGMTD_TXN_PROC_SETCFG: + for (indx = 0; indx < (*txn_req)->req.set_cfg->num_cfg_changes; + indx++) { + if ((*txn_req)->req.set_cfg->cfg_changes[indx].value) { + MGMTD_TXN_DBG("Freeing value for %s at %p ==> '%s'", + (*txn_req) + ->req.set_cfg + ->cfg_changes[indx] + .xpath, + (*txn_req) + ->req.set_cfg + ->cfg_changes[indx] + .value, + (*txn_req) + ->req.set_cfg + ->cfg_changes[indx] + .value); + free((void *)(*txn_req) + ->req.set_cfg->cfg_changes[indx] + .value); + } + } + req_list = &(*txn_req)->txn->set_cfg_reqs; + MGMTD_TXN_DBG("Deleting SETCFG req-id: %" PRIu64 + " txn-id: %" PRIu64, + (*txn_req)->req_id, (*txn_req)->txn->txn_id); + XFREE(MTYPE_MGMTD_TXN_SETCFG_REQ, (*txn_req)->req.set_cfg); + break; + case MGMTD_TXN_PROC_COMMITCFG: + MGMTD_TXN_DBG("Deleting COMMITCFG req-id: %" PRIu64 + " txn-id: %" PRIu64, + (*txn_req)->req_id, (*txn_req)->txn->txn_id); + + ccreq = &(*txn_req)->req.commit_cfg; + cleanup = (ccreq->curr_phase >= MGMTD_COMMIT_PHASE_TXN_CREATE && + ccreq->curr_phase < MGMTD_COMMIT_PHASE_TXN_DELETE); + + FOREACH_MGMTD_BE_CLIENT_ID (id) { + /* + * Send TXN_DELETE to cleanup state for this + * transaction on backend + */ + + /* + * Get rid of the batches first so we don't end up doing + * anything more with them + */ + mgmt_txn_cleanup_be_cfg_batches((*txn_req)->txn, id); + if (ccreq->batches) { + hash_clean(ccreq->batches, + mgmt_txn_cfgbatch_hash_free); + hash_free(ccreq->batches); + ccreq->batches = NULL; + } + + /* + * If we were in the middle of the state machine then + * send a txn delete message + */ + adapter = mgmt_be_get_adapter_by_id(id); + if (adapter && cleanup && + ccreq->subscr_info.xpath_subscr[id]) + mgmt_txn_send_be_txn_delete((*txn_req)->txn, + adapter); + } + break; + case MGMTD_TXN_PROC_GETCFG: + for (indx = 0; indx < (*txn_req)->req.get_data->num_xpaths; + indx++) { + if ((*txn_req)->req.get_data->xpaths[indx]) + free((void *)(*txn_req) + ->req.get_data->xpaths[indx]); + } + req_list = &(*txn_req)->txn->get_cfg_reqs; + MGMTD_TXN_DBG("Deleting GETCFG req-id: %" PRIu64 + " txn-id: %" PRIu64, + (*txn_req)->req_id, (*txn_req)->txn->txn_id); + if ((*txn_req)->req.get_data->reply) + XFREE(MTYPE_MGMTD_TXN_GETDATA_REPLY, + (*txn_req)->req.get_data->reply); + + if ((*txn_req)->req.get_data->cfg_root) + nb_config_free((*txn_req)->req.get_data->cfg_root); + + XFREE(MTYPE_MGMTD_TXN_GETDATA_REQ, (*txn_req)->req.get_data); + break; + case MGMTD_TXN_PROC_GETDATA: + for (indx = 0; indx < (*txn_req)->req.get_data->num_xpaths; + indx++) { + if ((*txn_req)->req.get_data->xpaths[indx]) + free((void *)(*txn_req) + ->req.get_data->xpaths[indx]); + } + pending_list = &(*txn_req)->txn->pending_get_datas; + req_list = &(*txn_req)->txn->get_data_reqs; + MGMTD_TXN_DBG("Deleting GETDATA req-id: %" PRIu64 + " txn-id: %" PRIu64, + (*txn_req)->req_id, (*txn_req)->txn->txn_id); + if ((*txn_req)->req.get_data->reply) + XFREE(MTYPE_MGMTD_TXN_GETDATA_REPLY, + (*txn_req)->req.get_data->reply); + XFREE(MTYPE_MGMTD_TXN_GETDATA_REQ, (*txn_req)->req.get_data); + break; + case MGMTD_TXN_COMMITCFG_TIMEOUT: + case MGMTD_TXN_CLEANUP: + break; + } + + if ((*txn_req)->pending_be_proc && pending_list) { + mgmt_txn_reqs_del(pending_list, *txn_req); + MGMTD_TXN_DBG("Removed req-id: %" PRIu64 + " from pending-list (left:%zu)", + (*txn_req)->req_id, + mgmt_txn_reqs_count(pending_list)); + } else if (req_list) { + mgmt_txn_reqs_del(req_list, *txn_req); + MGMTD_TXN_DBG("Removed req-id: %" PRIu64 + " from request-list (left:%zu)", + (*txn_req)->req_id, mgmt_txn_reqs_count(req_list)); + } + + (*txn_req)->pending_be_proc = false; + MGMTD_TXN_UNLOCK(&(*txn_req)->txn); + XFREE(MTYPE_MGMTD_TXN_REQ, (*txn_req)); + *txn_req = NULL; +} + +static void mgmt_txn_process_set_cfg(struct event *thread) +{ + struct mgmt_txn_ctx *txn; + struct mgmt_txn_req *txn_req; + struct mgmt_ds_ctx *ds_ctx; + struct nb_config *nb_config; + char err_buf[1024]; + bool error; + int num_processed = 0; + size_t left; + struct mgmt_commit_stats *cmt_stats; + int ret = 0; + + txn = (struct mgmt_txn_ctx *)EVENT_ARG(thread); + assert(txn); + cmt_stats = mgmt_fe_get_session_commit_stats(txn->session_id); + + MGMTD_TXN_DBG("Processing %zu SET_CONFIG requests txn-id:%" PRIu64 + " session-id: %" PRIu64, + mgmt_txn_reqs_count(&txn->set_cfg_reqs), txn->txn_id, + txn->session_id); + + FOREACH_TXN_REQ_IN_LIST (&txn->set_cfg_reqs, txn_req) { + assert(txn_req->req_event == MGMTD_TXN_PROC_SETCFG); + ds_ctx = txn_req->req.set_cfg->ds_ctx; + if (!ds_ctx) { + mgmt_fe_send_set_cfg_reply(txn->session_id, txn->txn_id, + txn_req->req.set_cfg->ds_id, + txn_req->req_id, + MGMTD_INTERNAL_ERROR, + "No such datastore!", + txn_req->req.set_cfg + ->implicit_commit); + goto mgmt_txn_process_set_cfg_done; + } + + nb_config = mgmt_ds_get_nb_config(ds_ctx); + if (!nb_config) { + mgmt_fe_send_set_cfg_reply(txn->session_id, txn->txn_id, + txn_req->req.set_cfg->ds_id, + txn_req->req_id, + MGMTD_INTERNAL_ERROR, + "Unable to retrieve DS Config Tree!", + txn_req->req.set_cfg + ->implicit_commit); + goto mgmt_txn_process_set_cfg_done; + } + + error = false; + nb_candidate_edit_config_changes(nb_config, + txn_req->req.set_cfg->cfg_changes, + (size_t)txn_req->req.set_cfg + ->num_cfg_changes, + NULL, NULL, 0, err_buf, + sizeof(err_buf), &error); + if (error) { + mgmt_fe_send_set_cfg_reply(txn->session_id, txn->txn_id, + txn_req->req.set_cfg->ds_id, + txn_req->req_id, + MGMTD_INTERNAL_ERROR, err_buf, + txn_req->req.set_cfg + ->implicit_commit); + goto mgmt_txn_process_set_cfg_done; + } + + if (txn_req->req.set_cfg->implicit_commit) { + assert(mgmt_txn_reqs_count(&txn->set_cfg_reqs) == 1); + assert(txn_req->req.set_cfg->dst_ds_ctx); + + /* We expect the user to have locked the DST DS */ + if (!mgmt_ds_is_locked(txn_req->req.set_cfg->dst_ds_ctx, + txn->session_id)) { + MGMTD_TXN_ERR("DS %u not locked for implicit commit txn-id: %" PRIu64 + " session-id: %" PRIu64 " err: %s", + txn_req->req.set_cfg->dst_ds_id, + txn->txn_id, txn->session_id, + strerror(ret)); + mgmt_txn_send_commit_cfg_reply( + txn, MGMTD_DS_LOCK_FAILED, + "running DS not locked for implicit commit"); + goto mgmt_txn_process_set_cfg_done; + } + + mgmt_txn_send_commit_config_req(txn->txn_id, + txn_req->req_id, + txn_req->req.set_cfg + ->ds_id, + txn_req->req.set_cfg + ->ds_ctx, + txn_req->req.set_cfg + ->dst_ds_id, + txn_req->req.set_cfg + ->dst_ds_ctx, + false, false, true); + + if (mm->perf_stats_en) + gettimeofday(&cmt_stats->last_start, NULL); + cmt_stats->commit_cnt++; + } else if (mgmt_fe_send_set_cfg_reply(txn->session_id, + txn->txn_id, + txn_req->req.set_cfg->ds_id, + txn_req->req_id, + MGMTD_SUCCESS, NULL, + false) != 0) { + MGMTD_TXN_ERR("Failed to send SET_CONFIG_REPLY txn-id %" PRIu64 + " session-id: %" PRIu64, + txn->txn_id, txn->session_id); + } + +mgmt_txn_process_set_cfg_done: + + /* + * Note: The following will remove it from the list as well. + */ + mgmt_txn_req_free(&txn_req); + + num_processed++; + if (num_processed == MGMTD_TXN_MAX_NUM_SETCFG_PROC) + break; + } + + left = mgmt_txn_reqs_count(&txn->set_cfg_reqs); + if (left) { + MGMTD_TXN_DBG("Processed maximum number of Set-Config requests (%d/%d/%d). Rescheduling for rest.", + num_processed, MGMTD_TXN_MAX_NUM_SETCFG_PROC, + (int)left); + mgmt_txn_register_event(txn, MGMTD_TXN_PROC_SETCFG); + } +} + +static int mgmt_txn_send_commit_cfg_reply(struct mgmt_txn_ctx *txn, + enum mgmt_result result, + const char *error_if_any) +{ + bool success, create_cmt_info_rec; + + if (!txn->commit_cfg_req) + return -1; + + success = (result == MGMTD_SUCCESS || result == MGMTD_NO_CFG_CHANGES); + + /* TODO: these replies should not be send if it's a rollback + * b/c right now that is special cased.. that special casing should be + * removed; however... + */ + if (!txn->commit_cfg_req->req.commit_cfg.implicit && txn->session_id && + !txn->commit_cfg_req->req.commit_cfg.rollback && + mgmt_fe_send_commit_cfg_reply(txn->session_id, txn->txn_id, + txn->commit_cfg_req->req.commit_cfg + .src_ds_id, + txn->commit_cfg_req->req.commit_cfg + .dst_ds_id, + txn->commit_cfg_req->req_id, + txn->commit_cfg_req->req.commit_cfg + .validate_only, + result, error_if_any) != 0) { + MGMTD_TXN_ERR("Failed to send COMMIT-CONFIG-REPLY txn-id: %" PRIu64 + " session-id: %" PRIu64, + txn->txn_id, txn->session_id); + } + + if (txn->commit_cfg_req->req.commit_cfg.implicit && txn->session_id && + !txn->commit_cfg_req->req.commit_cfg.rollback && + mgmt_fe_send_set_cfg_reply(txn->session_id, txn->txn_id, + txn->commit_cfg_req->req.commit_cfg + .src_ds_id, + txn->commit_cfg_req->req_id, + success ? MGMTD_SUCCESS + : MGMTD_INTERNAL_ERROR, + error_if_any, true) != 0) { + MGMTD_TXN_ERR("Failed to send SET-CONFIG-REPLY txn-id: %" PRIu64 + " session-id: %" PRIu64, + txn->txn_id, txn->session_id); + } + + if (success) { + /* Stop the commit-timeout timer */ + /* XXX why only on success? */ + EVENT_OFF(txn->comm_cfg_timeout); + + create_cmt_info_rec = + (result != MGMTD_NO_CFG_CHANGES && + !txn->commit_cfg_req->req.commit_cfg.rollback); + + /* + * Successful commit: Merge Src DS into Dst DS if and only if + * this was not a validate-only or abort request. + */ + if ((txn->session_id && + !txn->commit_cfg_req->req.commit_cfg.validate_only && + !txn->commit_cfg_req->req.commit_cfg.abort) || + txn->commit_cfg_req->req.commit_cfg.rollback) { + mgmt_ds_copy_dss(txn->commit_cfg_req->req.commit_cfg + .src_ds_ctx, + txn->commit_cfg_req->req.commit_cfg + .dst_ds_ctx, + create_cmt_info_rec); + } + + /* + * Restore Src DS back to Dest DS only through a commit abort + * request. + */ + if (txn->session_id && txn->commit_cfg_req->req.commit_cfg.abort) + mgmt_ds_copy_dss(txn->commit_cfg_req->req.commit_cfg + .dst_ds_ctx, + txn->commit_cfg_req->req.commit_cfg + .src_ds_ctx, + false); + } else { + /* + * The commit has failied. For implicit commit requests restore + * back the contents of the candidate DS. + */ + if (txn->commit_cfg_req->req.commit_cfg.implicit) + mgmt_ds_copy_dss(txn->commit_cfg_req->req.commit_cfg + .dst_ds_ctx, + txn->commit_cfg_req->req.commit_cfg + .src_ds_ctx, + false); + } + + if (txn->commit_cfg_req->req.commit_cfg.rollback) { + mgmt_ds_unlock(txn->commit_cfg_req->req.commit_cfg.src_ds_ctx); + mgmt_ds_unlock(txn->commit_cfg_req->req.commit_cfg.dst_ds_ctx); + /* + * Resume processing the rollback command. + * + * TODO: there's no good reason to special case rollback, the + * rollback boolean should be passed back to the FE client and it + * can do the right thing. + */ + mgmt_history_rollback_complete(success); + } + + txn->commit_cfg_req->req.commit_cfg.cmt_stats = NULL; + mgmt_txn_req_free(&txn->commit_cfg_req); + + /* + * The CONFIG Transaction should be destroyed from Frontend-adapter. + * But in case the transaction is not triggered from a front-end session + * we need to cleanup by itself. + */ + if (!txn->session_id) + mgmt_txn_register_event(txn, MGMTD_TXN_CLEANUP); + + return 0; +} + +static void +mgmt_move_txn_cfg_batch_to_next(struct mgmt_commit_cfg_req *cmtcfg_req, + struct mgmt_txn_be_cfg_batch *batch, + struct mgmt_txn_batches_head *src_list, + struct mgmt_txn_batches_head *dst_list, + bool update_commit_phase, + enum mgmt_commit_phase to_phase) +{ + mgmt_txn_batches_del(src_list, batch); + + if (update_commit_phase) { + MGMTD_TXN_DBG("Move txn-id %" PRIu64 " batch-id: %" PRIu64 + " from '%s' --> '%s'", + batch->txn->txn_id, batch->batch_id, + mgmt_commit_phase2str(batch->comm_phase), + mgmt_txn_commit_phase_str(batch->txn, false)); + batch->comm_phase = to_phase; + } + + mgmt_txn_batches_add_tail(dst_list, batch); +} + +static void mgmt_move_txn_cfg_batches(struct mgmt_txn_ctx *txn, + struct mgmt_commit_cfg_req *cmtcfg_req, + struct mgmt_txn_batches_head *src_list, + struct mgmt_txn_batches_head *dst_list, + bool update_commit_phase, + enum mgmt_commit_phase to_phase) +{ + struct mgmt_txn_be_cfg_batch *batch; + + FOREACH_TXN_CFG_BATCH_IN_LIST (src_list, batch) { + mgmt_move_txn_cfg_batch_to_next(cmtcfg_req, batch, src_list, + dst_list, update_commit_phase, + to_phase); + } +} + +static int +mgmt_try_move_commit_to_next_phase(struct mgmt_txn_ctx *txn, + struct mgmt_commit_cfg_req *cmtcfg_req) +{ + struct mgmt_txn_batches_head *curr_list, *next_list; + enum mgmt_be_client_id id; + + MGMTD_TXN_DBG("txn-id: %" PRIu64 ", Phase(current:'%s' next:'%s')", + txn->txn_id, mgmt_txn_commit_phase_str(txn, true), + mgmt_txn_commit_phase_str(txn, false)); + + /* + * Check if all clients has moved to next phase or not. + */ + FOREACH_MGMTD_BE_CLIENT_ID (id) { + if (cmtcfg_req->subscr_info.xpath_subscr[id] && + mgmt_txn_batches_count(&cmtcfg_req->curr_batches[id])) { + /* + * There's atleast once client who hasn't moved to + * next phase. + * + * TODO: Need to re-think this design for the case + * set of validators for a given YANG data item is + * different from the set of notifiers for the same. + */ + return -1; + } + } + + MGMTD_TXN_DBG("Move entire txn-id: %" PRIu64 " from '%s' to '%s'", + txn->txn_id, mgmt_txn_commit_phase_str(txn, true), + mgmt_txn_commit_phase_str(txn, false)); + + /* + * If we are here, it means all the clients has moved to next phase. + * So we can move the whole commit to next phase. + */ + cmtcfg_req->curr_phase = cmtcfg_req->next_phase; + cmtcfg_req->next_phase++; + MGMTD_TXN_DBG("Move back all config batches for txn-id: %" PRIu64 + " from next to current branch", + txn->txn_id); + FOREACH_MGMTD_BE_CLIENT_ID (id) { + curr_list = &cmtcfg_req->curr_batches[id]; + next_list = &cmtcfg_req->next_batches[id]; + mgmt_move_txn_cfg_batches(txn, cmtcfg_req, next_list, curr_list, + false, 0); + } + + mgmt_txn_register_event(txn, MGMTD_TXN_PROC_COMMITCFG); + + return 0; +} + +static int +mgmt_move_be_commit_to_next_phase(struct mgmt_txn_ctx *txn, + struct mgmt_be_client_adapter *adapter) +{ + struct mgmt_commit_cfg_req *cmtcfg_req; + struct mgmt_txn_batches_head *curr_list, *next_list; + + if (txn->type != MGMTD_TXN_TYPE_CONFIG || !txn->commit_cfg_req) + return -1; + + cmtcfg_req = &txn->commit_cfg_req->req.commit_cfg; + + MGMTD_TXN_DBG("Move txn-id: %" PRIu64 + " for '%s' Phase(current: '%s' next:'%s')", + txn->txn_id, adapter->name, + mgmt_txn_commit_phase_str(txn, true), + mgmt_txn_commit_phase_str(txn, false)); + + MGMTD_TXN_DBG("Move all config batches for '%s' from current to next list", + adapter->name); + curr_list = &cmtcfg_req->curr_batches[adapter->id]; + next_list = &cmtcfg_req->next_batches[adapter->id]; + mgmt_move_txn_cfg_batches(txn, cmtcfg_req, curr_list, next_list, true, + cmtcfg_req->next_phase); + + MGMTD_TXN_DBG("txn-id: %" PRIu64 ", Phase(current:'%s' next:'%s')", + txn->txn_id, mgmt_txn_commit_phase_str(txn, true), + mgmt_txn_commit_phase_str(txn, false)); + + /* + * Check if all clients has moved to next phase or not. + */ + mgmt_try_move_commit_to_next_phase(txn, cmtcfg_req); + + return 0; +} + +static int mgmt_txn_create_config_batches(struct mgmt_txn_req *txn_req, + struct nb_config_cbs *changes) +{ + struct nb_config_cb *cb, *nxt; + struct nb_config_change *chg; + struct mgmt_txn_be_cfg_batch *batch; + struct mgmt_be_client_subscr_info subscr_info; + char *xpath = NULL, *value = NULL; + char err_buf[1024]; + enum mgmt_be_client_id id; + struct mgmt_be_client_adapter *adapter; + struct mgmt_commit_cfg_req *cmtcfg_req; + bool found_validator; + int num_chgs = 0; + int xpath_len, value_len; + + cmtcfg_req = &txn_req->req.commit_cfg; + + RB_FOREACH_SAFE (cb, nb_config_cbs, changes, nxt) { + chg = (struct nb_config_change *)cb; + + /* + * Could have directly pointed to xpath in nb_node. + * But dont want to mess with it now. + * xpath = chg->cb.nb_node->xpath; + */ + xpath = lyd_path(chg->cb.dnode, LYD_PATH_STD, NULL, 0); + if (!xpath) { + (void)mgmt_txn_send_commit_cfg_reply( + txn_req->txn, MGMTD_INTERNAL_ERROR, + "Internal error! Could not get Xpath from Ds node!"); + return -1; + } + + value = (char *)lyd_get_value(chg->cb.dnode); + if (!value) + value = (char *)MGMTD_BE_CONTAINER_NODE_VAL; + + MGMTD_TXN_DBG("XPATH: %s, Value: '%s'", xpath, + value ? value : "NIL"); + + mgmt_be_get_subscr_info_for_xpath(xpath, &subscr_info); + + xpath_len = strlen(xpath) + 1; + value_len = strlen(value) + 1; + found_validator = false; + FOREACH_MGMTD_BE_CLIENT_ID (id) { + if (!(subscr_info.xpath_subscr[id] & + (MGMT_SUBSCR_VALIDATE_CFG | + MGMT_SUBSCR_NOTIFY_CFG))) + continue; + + adapter = mgmt_be_get_adapter_by_id(id); + if (!adapter) + continue; + + batch = cmtcfg_req->last_be_cfg_batch[id]; + if (!batch || + (batch->num_cfg_data == + MGMTD_MAX_CFG_CHANGES_IN_BATCH) || + (batch->buf_space_left < (xpath_len + value_len))) { + /* Allocate a new config batch */ + batch = mgmt_txn_cfg_batch_alloc(txn_req->txn, + id, adapter); + } + + batch->buf_space_left -= (xpath_len + value_len); + memcpy(&batch->xp_subscr[batch->num_cfg_data], + &subscr_info.xpath_subscr[id], + sizeof(batch->xp_subscr[0])); + + mgmt_yang_cfg_data_req_init( + &batch->cfg_data[batch->num_cfg_data]); + batch->cfg_datap[batch->num_cfg_data] = + &batch->cfg_data[batch->num_cfg_data]; + + if (chg->cb.operation == NB_OP_DESTROY) + batch->cfg_data[batch->num_cfg_data].req_type = + MGMTD__CFG_DATA_REQ_TYPE__DELETE_DATA; + else + batch->cfg_data[batch->num_cfg_data].req_type = + MGMTD__CFG_DATA_REQ_TYPE__SET_DATA; + + mgmt_yang_data_init(&batch->data[batch->num_cfg_data]); + batch->cfg_data[batch->num_cfg_data].data = + &batch->data[batch->num_cfg_data]; + batch->data[batch->num_cfg_data].xpath = strdup(xpath); + + mgmt_yang_data_value_init( + &batch->value[batch->num_cfg_data]); + batch->data[batch->num_cfg_data].value = + &batch->value[batch->num_cfg_data]; + batch->value[batch->num_cfg_data].value_case = + MGMTD__YANG_DATA_VALUE__VALUE_ENCODED_STR_VAL; + batch->value[batch->num_cfg_data].encoded_str_val = + value; + value = NULL; + + if (subscr_info.xpath_subscr[id] & + MGMT_SUBSCR_VALIDATE_CFG) + found_validator = true; + + cmtcfg_req->subscr_info.xpath_subscr[id] |= + subscr_info.xpath_subscr[id]; + MGMTD_TXN_DBG(" -- %s, batch-id: %" PRIu64 " item:%d", + adapter->name, batch->batch_id, + (int)batch->num_cfg_data); + + batch->num_cfg_data++; + num_chgs++; + } + + if (!found_validator) { + snprintf(err_buf, sizeof(err_buf), + "No validator module found for XPATH: '%s", + xpath); + MGMTD_TXN_ERR("***** %s", err_buf); + } + + free(xpath); + } + + cmtcfg_req->cmt_stats->last_batch_cnt = num_chgs; + if (!num_chgs) { + (void)mgmt_txn_send_commit_cfg_reply(txn_req->txn, + MGMTD_NO_CFG_CHANGES, + "No changes found to commit!"); + return -1; + } + + cmtcfg_req->next_phase = MGMTD_COMMIT_PHASE_TXN_CREATE; + return 0; +} + +static int mgmt_txn_prepare_config(struct mgmt_txn_ctx *txn) +{ + struct nb_context nb_ctx; + struct nb_config *nb_config; + struct nb_config_cbs changes; + struct nb_config_cbs *cfg_chgs = NULL; + int ret; + bool del_cfg_chgs = false; + + ret = 0; + memset(&nb_ctx, 0, sizeof(nb_ctx)); + memset(&changes, 0, sizeof(changes)); + if (txn->commit_cfg_req->req.commit_cfg.cfg_chgs) { + cfg_chgs = txn->commit_cfg_req->req.commit_cfg.cfg_chgs; + del_cfg_chgs = true; + goto mgmt_txn_prep_config_validation_done; + } + + if (txn->commit_cfg_req->req.commit_cfg.src_ds_id != MGMTD_DS_CANDIDATE) { + (void)mgmt_txn_send_commit_cfg_reply( + txn, MGMTD_INVALID_PARAM, + "Source DS cannot be any other than CANDIDATE!"); + ret = -1; + goto mgmt_txn_prepare_config_done; + } + + if (txn->commit_cfg_req->req.commit_cfg.dst_ds_id != MGMTD_DS_RUNNING) { + (void)mgmt_txn_send_commit_cfg_reply( + txn, MGMTD_INVALID_PARAM, + "Destination DS cannot be any other than RUNNING!"); + ret = -1; + goto mgmt_txn_prepare_config_done; + } + + if (!txn->commit_cfg_req->req.commit_cfg.src_ds_ctx) { + (void)mgmt_txn_send_commit_cfg_reply(txn, MGMTD_INVALID_PARAM, + "No such source datastore!"); + ret = -1; + goto mgmt_txn_prepare_config_done; + } + + if (!txn->commit_cfg_req->req.commit_cfg.dst_ds_ctx) { + (void)mgmt_txn_send_commit_cfg_reply(txn, MGMTD_INVALID_PARAM, + "No such destination datastore!"); + ret = -1; + goto mgmt_txn_prepare_config_done; + } + + if (txn->commit_cfg_req->req.commit_cfg.abort) { + /* + * This is a commit abort request. Return back success. + * That should trigger a restore of Candidate datastore to + * Running. + */ + (void)mgmt_txn_send_commit_cfg_reply(txn, MGMTD_SUCCESS, NULL); + goto mgmt_txn_prepare_config_done; + } + + nb_config = mgmt_ds_get_nb_config( + txn->commit_cfg_req->req.commit_cfg.src_ds_ctx); + if (!nb_config) { + (void)mgmt_txn_send_commit_cfg_reply( + txn, MGMTD_INTERNAL_ERROR, + "Unable to retrieve Commit DS Config Tree!"); + ret = -1; + goto mgmt_txn_prepare_config_done; + } + + /* + * Check for diffs from scratch buffer. If found empty + * get the diff from Candidate DS itself. + */ + cfg_chgs = &nb_config->cfg_chgs; + if (RB_EMPTY(nb_config_cbs, cfg_chgs)) { + /* + * This could be the case when the config is directly + * loaded onto the candidate DS from a file. Get the + * diff from a full comparison of the candidate and + * running DSs. + */ + nb_config_diff(mgmt_ds_get_nb_config( + txn->commit_cfg_req->req.commit_cfg + .dst_ds_ctx), + nb_config, &changes); + cfg_chgs = &changes; + del_cfg_chgs = true; + } + + if (RB_EMPTY(nb_config_cbs, cfg_chgs)) { + /* + * This means there's no changes to commit whatsoever + * is the source of the changes in config. + */ + (void)mgmt_txn_send_commit_cfg_reply(txn, MGMTD_NO_CFG_CHANGES, + "No changes found to be committed!"); + ret = -1; + goto mgmt_txn_prepare_config_done; + } + +#ifdef MGMTD_LOCAL_VALIDATIONS_ENABLED + if (mm->perf_stats_en) + gettimeofday(&txn->commit_cfg_req->req.commit_cfg.cmt_stats + ->validate_start, + NULL); + /* + * Validate YANG contents of the source DS and get the diff + * between source and destination DS contents. + */ + char err_buf[1024] = { 0 }; + nb_ctx.client = NB_CLIENT_MGMTD_SERVER; + nb_ctx.user = (void *)txn; + + ret = nb_candidate_validate_yang(nb_config, true, err_buf, + sizeof(err_buf) - 1); + if (ret != NB_OK) { + if (strncmp(err_buf, " ", strlen(err_buf)) == 0) + strlcpy(err_buf, "Validation failed", sizeof(err_buf)); + (void)mgmt_txn_send_commit_cfg_reply(txn, MGMTD_INVALID_PARAM, + err_buf); + ret = -1; + goto mgmt_txn_prepare_config_done; + } + /* + * Perform application level validations locally on the MGMTD + * process by calling application specific validation routines + * loaded onto MGMTD process using libraries. + */ + ret = nb_candidate_validate_code(&nb_ctx, nb_config, &changes, err_buf, + sizeof(err_buf) - 1); + if (ret != NB_OK) { + if (strncmp(err_buf, " ", strlen(err_buf)) == 0) + strlcpy(err_buf, "Validation failed", sizeof(err_buf)); + (void)mgmt_txn_send_commit_cfg_reply(txn, MGMTD_INVALID_PARAM, + err_buf); + ret = -1; + goto mgmt_txn_prepare_config_done; + } + + if (txn->commit_cfg_req->req.commit_cfg.validate_only) { + /* + * This was a validate-only COMMIT request return success. + */ + (void)mgmt_txn_send_commit_cfg_reply(txn, MGMTD_SUCCESS, NULL); + goto mgmt_txn_prepare_config_done; + } +#endif /* ifdef MGMTD_LOCAL_VALIDATIONS_ENABLED */ + +mgmt_txn_prep_config_validation_done: + + if (mm->perf_stats_en) + gettimeofday(&txn->commit_cfg_req->req.commit_cfg.cmt_stats + ->prep_cfg_start, + NULL); + + /* + * Iterate over the diffs and create ordered batches of config + * commands to be validated. + */ + ret = mgmt_txn_create_config_batches(txn->commit_cfg_req, cfg_chgs); + if (ret != 0) { + ret = -1; + goto mgmt_txn_prepare_config_done; + } + + /* Move to the Transaction Create Phase */ + txn->commit_cfg_req->req.commit_cfg.curr_phase = + MGMTD_COMMIT_PHASE_TXN_CREATE; + mgmt_txn_register_event(txn, MGMTD_TXN_PROC_COMMITCFG); + + /* + * Start the COMMIT Timeout Timer to abort Txn if things get stuck at + * backend. + */ + mgmt_txn_register_event(txn, MGMTD_TXN_COMMITCFG_TIMEOUT); +mgmt_txn_prepare_config_done: + + if (cfg_chgs && del_cfg_chgs) + nb_config_diff_del_changes(cfg_chgs); + + return ret; +} + +static int mgmt_txn_send_be_txn_create(struct mgmt_txn_ctx *txn) +{ + enum mgmt_be_client_id id; + struct mgmt_be_client_adapter *adapter; + struct mgmt_commit_cfg_req *cmtcfg_req; + struct mgmt_txn_be_cfg_batch *batch; + + assert(txn->type == MGMTD_TXN_TYPE_CONFIG && txn->commit_cfg_req); + + cmtcfg_req = &txn->commit_cfg_req->req.commit_cfg; + FOREACH_MGMTD_BE_CLIENT_ID (id) { + if (cmtcfg_req->subscr_info.xpath_subscr[id]) { + adapter = mgmt_be_get_adapter_by_id(id); + if (mgmt_be_send_txn_req(adapter, txn->txn_id, true)) { + (void)mgmt_txn_send_commit_cfg_reply( + txn, MGMTD_INTERNAL_ERROR, + "Could not send TXN_CREATE to backend adapter"); + return -1; + } + + FOREACH_TXN_CFG_BATCH_IN_LIST (&txn->commit_cfg_req->req + .commit_cfg + .curr_batches[id], + batch) + batch->comm_phase = + MGMTD_COMMIT_PHASE_TXN_CREATE; + } + } + + txn->commit_cfg_req->req.commit_cfg.next_phase = + MGMTD_COMMIT_PHASE_SEND_CFG; + + /* + * Dont move the commit to next phase yet. Wait for the TXN_REPLY to + * come back. + */ + + MGMTD_TXN_DBG("txn-id: %" PRIu64 " session-id: %" PRIu64 + " Phase(Current:'%s', Next: '%s')", + txn->txn_id, txn->session_id, + mgmt_txn_commit_phase_str(txn, true), + mgmt_txn_commit_phase_str(txn, false)); + + return 0; +} + +static int mgmt_txn_send_be_cfg_data(struct mgmt_txn_ctx *txn, + struct mgmt_be_client_adapter *adapter) +{ + struct mgmt_commit_cfg_req *cmtcfg_req; + struct mgmt_txn_be_cfg_batch *batch; + struct mgmt_be_cfgreq cfg_req = { 0 }; + size_t num_batches, indx; + + assert(txn->type == MGMTD_TXN_TYPE_CONFIG && txn->commit_cfg_req); + + cmtcfg_req = &txn->commit_cfg_req->req.commit_cfg; + assert(cmtcfg_req->subscr_info.xpath_subscr[adapter->id]); + + indx = 0; + num_batches = + mgmt_txn_batches_count(&cmtcfg_req->curr_batches[adapter->id]); + FOREACH_TXN_CFG_BATCH_IN_LIST (&cmtcfg_req->curr_batches[adapter->id], + batch) { + assert(cmtcfg_req->next_phase == MGMTD_COMMIT_PHASE_SEND_CFG); + + cfg_req.cfgdata_reqs = batch->cfg_datap; + cfg_req.num_reqs = batch->num_cfg_data; + indx++; + if (mgmt_be_send_cfgdata_req(adapter, txn->txn_id, + batch->batch_id, + cfg_req.cfgdata_reqs, + cfg_req.num_reqs, + indx == num_batches)) { + (void)mgmt_txn_send_commit_cfg_reply( + txn, MGMTD_INTERNAL_ERROR, + "Internal Error! Could not send config data to backend!"); + MGMTD_TXN_ERR("Could not send CFGDATA_CREATE txn-id: %" PRIu64 + " batch-id: %" PRIu64 " to client '%s", + txn->txn_id, batch->batch_id, + adapter->name); + return -1; + } + + cmtcfg_req->cmt_stats->last_num_cfgdata_reqs++; + mgmt_move_txn_cfg_batch_to_next( + cmtcfg_req, batch, + &cmtcfg_req->curr_batches[adapter->id], + &cmtcfg_req->next_batches[adapter->id], true, + MGMTD_COMMIT_PHASE_SEND_CFG); + } + + /* + * This could be the last Backend Client to send CFGDATA_CREATE_REQ to. + * Try moving the commit to next phase. + */ + mgmt_try_move_commit_to_next_phase(txn, cmtcfg_req); + + return 0; +} + +static int mgmt_txn_send_be_txn_delete(struct mgmt_txn_ctx *txn, + struct mgmt_be_client_adapter *adapter) +{ + struct mgmt_commit_cfg_req *cmtcfg_req = + &txn->commit_cfg_req->req.commit_cfg; + + assert(txn->type == MGMTD_TXN_TYPE_CONFIG); + assert(!mgmt_txn_batches_count(&cmtcfg_req->curr_batches[adapter->id])); + + if (!cmtcfg_req->subscr_info.xpath_subscr[adapter->id]) + return 0; + + return mgmt_be_send_txn_req(adapter, txn->txn_id, false); +} + +static void mgmt_txn_cfg_commit_timedout(struct event *thread) +{ + struct mgmt_txn_ctx *txn; + + txn = (struct mgmt_txn_ctx *)EVENT_ARG(thread); + assert(txn); + + assert(txn->type == MGMTD_TXN_TYPE_CONFIG); + + if (!txn->commit_cfg_req) + return; + + MGMTD_TXN_ERR("Backend timeout txn-id: %" PRIu64 " aborting commit", + txn->txn_id); + + /* + * Send a COMMIT_CONFIG_REPLY with failure. + * NOTE: The transaction cleanup will be triggered from Front-end + * adapter. + */ + mgmt_txn_send_commit_cfg_reply( + txn, MGMTD_INTERNAL_ERROR, + "Operation on the backend timed-out. Aborting commit!"); +} + +/* + * Send CFG_APPLY_REQs to all the backend client. + * + * NOTE: This is always dispatched when all CFGDATA_CREATE_REQs + * for all backend clients has been generated. Please see + * mgmt_txn_register_event() and mgmt_txn_process_commit_cfg() + * for details. + */ +static int mgmt_txn_send_be_cfg_apply(struct mgmt_txn_ctx *txn) +{ + enum mgmt_be_client_id id; + struct mgmt_be_client_adapter *adapter; + struct mgmt_commit_cfg_req *cmtcfg_req; + struct mgmt_txn_batches_head *batch_list; + struct mgmt_txn_be_cfg_batch *batch; + + assert(txn->type == MGMTD_TXN_TYPE_CONFIG && txn->commit_cfg_req); + + cmtcfg_req = &txn->commit_cfg_req->req.commit_cfg; + if (cmtcfg_req->validate_only) { + /* + * If this was a validate-only COMMIT request return success. + */ + (void)mgmt_txn_send_commit_cfg_reply(txn, MGMTD_SUCCESS, NULL); + return 0; + } + + FOREACH_MGMTD_BE_CLIENT_ID (id) { + if (cmtcfg_req->subscr_info.xpath_subscr[id] & + MGMT_SUBSCR_NOTIFY_CFG) { + adapter = mgmt_be_get_adapter_by_id(id); + if (!adapter) + return -1; + + batch_list = &cmtcfg_req->curr_batches[id]; + if (mgmt_be_send_cfgapply_req(adapter, txn->txn_id)) { + (void)mgmt_txn_send_commit_cfg_reply( + txn, MGMTD_INTERNAL_ERROR, + "Could not send CFG_APPLY_REQ to backend adapter"); + return -1; + } + cmtcfg_req->cmt_stats->last_num_apply_reqs++; + + UNSET_FLAG(adapter->flags, + MGMTD_BE_ADAPTER_FLAGS_CFG_SYNCED); + + FOREACH_TXN_CFG_BATCH_IN_LIST (batch_list, batch) + batch->comm_phase = MGMTD_COMMIT_PHASE_APPLY_CFG; + } + } + + txn->commit_cfg_req->req.commit_cfg.next_phase = + MGMTD_COMMIT_PHASE_TXN_DELETE; + + /* + * Dont move the commit to next phase yet. Wait for all VALIDATE_REPLIES + * to come back. + */ + + return 0; +} + +static void mgmt_txn_process_commit_cfg(struct event *thread) +{ + struct mgmt_txn_ctx *txn; + struct mgmt_commit_cfg_req *cmtcfg_req; + + txn = (struct mgmt_txn_ctx *)EVENT_ARG(thread); + assert(txn); + + MGMTD_TXN_DBG("Processing COMMIT_CONFIG for txn-id: %" PRIu64 + " session-id: %" PRIu64 " Phase(Current:'%s', Next: '%s')", + txn->txn_id, txn->session_id, + mgmt_txn_commit_phase_str(txn, true), + mgmt_txn_commit_phase_str(txn, false)); + + assert(txn->commit_cfg_req); + cmtcfg_req = &txn->commit_cfg_req->req.commit_cfg; + switch (cmtcfg_req->curr_phase) { + case MGMTD_COMMIT_PHASE_PREPARE_CFG: + mgmt_txn_prepare_config(txn); + break; + case MGMTD_COMMIT_PHASE_TXN_CREATE: + if (mm->perf_stats_en) + gettimeofday(&cmtcfg_req->cmt_stats->txn_create_start, + NULL); + /* + * Send TXN_CREATE_REQ to all Backend now. + */ + mgmt_txn_send_be_txn_create(txn); + break; + case MGMTD_COMMIT_PHASE_SEND_CFG: + if (mm->perf_stats_en) + gettimeofday(&cmtcfg_req->cmt_stats->send_cfg_start, + NULL); + /* + * All CFGDATA_CREATE_REQ should have been sent to + * Backend by now. + */ +#ifndef MGMTD_LOCAL_VALIDATIONS_ENABLED + assert(cmtcfg_req->next_phase == MGMTD_COMMIT_PHASE_APPLY_CFG); + MGMTD_TXN_DBG("txn-id: %" PRIu64 " session-id: %" PRIu64 + " trigger sending CFG_VALIDATE_REQ to all backend clients", + txn->txn_id, txn->session_id); +#else /* ifndef MGMTD_LOCAL_VALIDATIONS_ENABLED */ + assert(cmtcfg_req->next_phase == MGMTD_COMMIT_PHASE_APPLY_CFG); + MGMTD_TXN_DBG("txn-id: %" PRIu64 " session-id: %" PRIu64 + " trigger sending CFG_APPLY_REQ to all backend clients", + txn->txn_id, txn->session_id); +#endif /* ifndef MGMTD_LOCAL_VALIDATIONS_ENABLED */ + break; + case MGMTD_COMMIT_PHASE_APPLY_CFG: + if (mm->perf_stats_en) + gettimeofday(&cmtcfg_req->cmt_stats->apply_cfg_start, + NULL); + /* + * We should have received successful CFG_VALIDATE_REPLY from + * all concerned Backend Clients by now. Send out the + * CFG_APPLY_REQs now. + */ + mgmt_txn_send_be_cfg_apply(txn); + break; + case MGMTD_COMMIT_PHASE_TXN_DELETE: + if (mm->perf_stats_en) + gettimeofday(&cmtcfg_req->cmt_stats->txn_del_start, + NULL); + /* + * We would have sent TXN_DELETE_REQ to all backend by now. + * Send a successful CONFIG_COMMIT_REPLY back to front-end. + * NOTE: This should also trigger DS merge/unlock and Txn + * cleanup. Please see mgmt_fe_send_commit_cfg_reply() for + * more details. + */ + EVENT_OFF(txn->comm_cfg_timeout); + mgmt_txn_send_commit_cfg_reply(txn, MGMTD_SUCCESS, NULL); + break; + case MGMTD_COMMIT_PHASE_MAX: + break; + } + + MGMTD_TXN_DBG("txn-id:%" PRIu64 " session-id: %" PRIu64 + " phase updated to (current:'%s', next: '%s')", + txn->txn_id, txn->session_id, + mgmt_txn_commit_phase_str(txn, true), + mgmt_txn_commit_phase_str(txn, false)); +} + +static void mgmt_init_get_data_reply(struct mgmt_get_data_reply *get_reply) +{ + size_t indx; + + for (indx = 0; indx < array_size(get_reply->reply_data); indx++) + get_reply->reply_datap[indx] = &get_reply->reply_data[indx]; +} + +static void mgmt_reset_get_data_reply(struct mgmt_get_data_reply *get_reply) +{ + int indx; + + for (indx = 0; indx < get_reply->num_reply; indx++) { + if (get_reply->reply_xpathp[indx]) { + free(get_reply->reply_xpathp[indx]); + get_reply->reply_xpathp[indx] = 0; + } + if (get_reply->reply_data[indx].xpath) { + zlog_debug("%s free xpath %p", __func__, + get_reply->reply_data[indx].xpath); + free(get_reply->reply_data[indx].xpath); + get_reply->reply_data[indx].xpath = 0; + } + } + + get_reply->num_reply = 0; + memset(&get_reply->data_reply, 0, sizeof(get_reply->data_reply)); + memset(&get_reply->reply_data, 0, sizeof(get_reply->reply_data)); + memset(&get_reply->reply_datap, 0, sizeof(get_reply->reply_datap)); + + memset(&get_reply->reply_value, 0, sizeof(get_reply->reply_value)); + + mgmt_init_get_data_reply(get_reply); +} + +static void mgmt_reset_get_data_reply_buf(struct mgmt_get_data_req *get_data) +{ + if (get_data->reply) + mgmt_reset_get_data_reply(get_data->reply); +} + +static void mgmt_txn_send_getcfg_reply_data(struct mgmt_txn_req *txn_req, + struct mgmt_get_data_req *get_req) +{ + struct mgmt_get_data_reply *get_reply; + Mgmtd__YangDataReply *data_reply; + + get_reply = get_req->reply; + if (!get_reply) + return; + + data_reply = &get_reply->data_reply; + mgmt_yang_data_reply_init(data_reply); + data_reply->n_data = get_reply->num_reply; + data_reply->data = get_reply->reply_datap; + data_reply->next_indx = (!get_reply->last_batch ? get_req->total_reply + : -1); + + MGMTD_TXN_DBG("Sending %zu Get-Config/Data replies next-index:%" PRId64, + data_reply->n_data, data_reply->next_indx); + + switch (txn_req->req_event) { + case MGMTD_TXN_PROC_GETCFG: + if (mgmt_fe_send_get_reply(txn_req->txn->session_id, + txn_req->txn->txn_id, get_req->ds_id, + txn_req->req_id, MGMTD_SUCCESS, + data_reply, NULL) != 0) { + MGMTD_TXN_ERR("Failed to send GET-CONFIG-REPLY txn-id: %" PRIu64 + " session-id: %" PRIu64 + " req-id: %" PRIu64, + txn_req->txn->txn_id, + txn_req->txn->session_id, txn_req->req_id); + } + break; + case MGMTD_TXN_PROC_GETDATA: + if (mgmt_fe_send_get_reply(txn_req->txn->session_id, + txn_req->txn->txn_id, get_req->ds_id, + txn_req->req_id, MGMTD_SUCCESS, + data_reply, NULL) != 0) { + MGMTD_TXN_ERR("Failed to send GET-DATA-REPLY txn-id: %" PRIu64 + " session-id: %" PRIu64 + " req-id: %" PRIu64, + txn_req->txn->txn_id, + txn_req->txn->session_id, txn_req->req_id); + } + break; + case MGMTD_TXN_PROC_SETCFG: + case MGMTD_TXN_PROC_COMMITCFG: + case MGMTD_TXN_COMMITCFG_TIMEOUT: + case MGMTD_TXN_CLEANUP: + MGMTD_TXN_ERR("Invalid Txn-Req-Event %u", txn_req->req_event); + break; + } + + /* + * Reset reply buffer for next reply. + */ + mgmt_reset_get_data_reply_buf(get_req); +} + +static void mgmt_txn_iter_and_send_get_cfg_reply(const char *xpath, + struct lyd_node *node, + struct nb_node *nb_node, + void *ctx) +{ + struct mgmt_txn_req *txn_req; + struct mgmt_get_data_req *get_req; + struct mgmt_get_data_reply *get_reply; + Mgmtd__YangData *data; + Mgmtd__YangDataValue *data_value; + + txn_req = (struct mgmt_txn_req *)ctx; + if (!txn_req) + return; + + if (!(node->schema->nodetype & LYD_NODE_TERM)) + return; + + assert(txn_req->req_event == MGMTD_TXN_PROC_GETCFG || + txn_req->req_event == MGMTD_TXN_PROC_GETDATA); + + get_req = txn_req->req.get_data; + assert(get_req); + get_reply = get_req->reply; + data = &get_reply->reply_data[get_reply->num_reply]; + data_value = &get_reply->reply_value[get_reply->num_reply]; + + mgmt_yang_data_init(data); + data->xpath = strdup(xpath); + mgmt_yang_data_value_init(data_value); + data_value->value_case = MGMTD__YANG_DATA_VALUE__VALUE_ENCODED_STR_VAL; + data_value->encoded_str_val = (char *)lyd_get_value(node); + data->value = data_value; + + get_reply->num_reply++; + get_req->total_reply++; + MGMTD_TXN_DBG(" [%d] XPATH: '%s', Value: '%s'", get_req->total_reply, + data->xpath, data_value->encoded_str_val); + + if (get_reply->num_reply == MGMTD_MAX_NUM_DATA_REPLY_IN_BATCH) + mgmt_txn_send_getcfg_reply_data(txn_req, get_req); +} + +static int mgmt_txn_get_config(struct mgmt_txn_ctx *txn, + struct mgmt_txn_req *txn_req, + struct nb_config *root) +{ + int indx; + struct mgmt_get_data_req *get_data; + struct mgmt_get_data_reply *get_reply; + + get_data = txn_req->req.get_data; + + if (!get_data->reply) { + get_data->reply = XCALLOC(MTYPE_MGMTD_TXN_GETDATA_REPLY, + sizeof(struct mgmt_get_data_reply)); + if (!get_data->reply) { + mgmt_fe_send_get_reply( + txn->session_id, txn->txn_id, get_data->ds_id, + txn_req->req_id, MGMTD_INTERNAL_ERROR, NULL, + "Internal error: Unable to allocate reply buffers!"); + goto mgmt_txn_get_config_failed; + } + } + + /* + * Read data contents from the DS and respond back directly. + * No need to go to backend for getting data. + */ + get_reply = get_data->reply; + for (indx = 0; indx < get_data->num_xpaths; indx++) { + MGMTD_TXN_DBG("Trying to get all data under '%s'", + get_data->xpaths[indx]); + mgmt_init_get_data_reply(get_reply); + /* + * mgmt_ds_iter_data works on path prefixes, but the user may + * want to also use an xpath regexp we need to add this + * functionality. + */ + if (mgmt_ds_iter_data(get_data->ds_id, root, + get_data->xpaths[indx], + mgmt_txn_iter_and_send_get_cfg_reply, + (void *)txn_req) == -1) { + MGMTD_TXN_DBG("Invalid Xpath '%s", + get_data->xpaths[indx]); + mgmt_fe_send_get_reply(txn->session_id, txn->txn_id, + get_data->ds_id, txn_req->req_id, + MGMTD_INTERNAL_ERROR, NULL, + "Invalid xpath"); + goto mgmt_txn_get_config_failed; + } + MGMTD_TXN_DBG("Got %d remaining data-replies for xpath '%s'", + get_reply->num_reply, get_data->xpaths[indx]); + get_reply->last_batch = true; + mgmt_txn_send_getcfg_reply_data(txn_req, get_data); + } + +mgmt_txn_get_config_failed: + + /* + * Delete the txn request. It will also remove it from request + * list. + */ + mgmt_txn_req_free(&txn_req); + + return 0; +} + +static void mgmt_txn_process_get_cfg(struct event *thread) +{ + struct mgmt_txn_ctx *txn; + struct mgmt_txn_req *txn_req; + struct nb_config *cfg_root; + int num_processed = 0; + bool error; + + txn = (struct mgmt_txn_ctx *)EVENT_ARG(thread); + assert(txn); + + MGMTD_TXN_DBG("Processing %zu GET_CONFIG requests txn-id: %" PRIu64 + " session-id: %" PRIu64, + mgmt_txn_reqs_count(&txn->get_cfg_reqs), txn->txn_id, + txn->session_id); + + FOREACH_TXN_REQ_IN_LIST (&txn->get_cfg_reqs, txn_req) { + error = false; + assert(txn_req->req_event == MGMTD_TXN_PROC_GETCFG); + cfg_root = txn_req->req.get_data->cfg_root; + assert(cfg_root); + + if (mgmt_txn_get_config(txn, txn_req, cfg_root) != 0) { + MGMTD_TXN_ERR("Unable to retrieve config from DS %d txn-id: %" PRIu64 + " session-id: %" PRIu64 + " req-id: %" PRIu64, + txn_req->req.get_data->ds_id, txn->txn_id, + txn->session_id, txn_req->req_id); + error = true; + } + + if (error) { + /* + * Delete the txn request. + * Note: The following will remove it from the list + * as well. + */ + mgmt_txn_req_free(&txn_req); + } + + /* + * Else the transaction would have been already deleted or + * moved to corresponding pending list. No need to delete it. + */ + num_processed++; + if (num_processed == MGMTD_TXN_MAX_NUM_GETCFG_PROC) + break; + } + + if (mgmt_txn_reqs_count(&txn->get_cfg_reqs)) { + MGMTD_TXN_DBG("Processed maximum number of Get-Config requests (%d/%d). Rescheduling for rest.", + num_processed, MGMTD_TXN_MAX_NUM_GETCFG_PROC); + mgmt_txn_register_event(txn, MGMTD_TXN_PROC_GETCFG); + } +} + +static void mgmt_txn_process_get_data(struct event *thread) +{ + struct mgmt_txn_ctx *txn; + struct mgmt_txn_req *txn_req; + int num_processed = 0; + + txn = (struct mgmt_txn_ctx *)EVENT_ARG(thread); + assert(txn); + + MGMTD_TXN_DBG("Processing %zu GET_DATA requests txn-id: %" PRIu64 + " session-id: %" PRIu64, + mgmt_txn_reqs_count(&txn->get_data_reqs), txn->txn_id, + txn->session_id); + + FOREACH_TXN_REQ_IN_LIST (&txn->get_data_reqs, txn_req) { + assert(txn_req->req_event == MGMTD_TXN_PROC_GETDATA); + + /* + * TODO: Trigger GET procedures for Backend + * For now return back error. + */ + mgmt_fe_send_get_reply(txn->session_id, txn->txn_id, + txn_req->req.get_data->ds_id, + txn_req->req_id, MGMTD_INTERNAL_ERROR, + NULL, "GET-DATA is not supported yet!"); + /* + * Delete the txn request. + * Note: The following will remove it from the list + * as well. + */ + mgmt_txn_req_free(&txn_req); + + /* + * Else the transaction would have been already deleted or + * moved to corresponding pending list. No need to delete it. + */ + num_processed++; + if (num_processed == MGMTD_TXN_MAX_NUM_GETDATA_PROC) + break; + } + + if (mgmt_txn_reqs_count(&txn->get_data_reqs)) { + MGMTD_TXN_DBG("Processed maximum number of Get-Data requests (%d/%d). Rescheduling for rest.", + num_processed, MGMTD_TXN_MAX_NUM_GETDATA_PROC); + mgmt_txn_register_event(txn, MGMTD_TXN_PROC_GETDATA); + } +} + +static struct mgmt_txn_ctx * +mgmt_fe_find_txn_by_session_id(struct mgmt_master *cm, uint64_t session_id, + enum mgmt_txn_type type) +{ + struct mgmt_txn_ctx *txn; + + FOREACH_TXN_IN_LIST (cm, txn) { + if (txn->session_id == session_id && txn->type == type) + return txn; + } + + return NULL; +} + +static struct mgmt_txn_ctx *mgmt_txn_create_new(uint64_t session_id, + enum mgmt_txn_type type) +{ + struct mgmt_txn_ctx *txn = NULL; + + /* + * For 'CONFIG' transaction check if one is already created + * or not. + */ + if (type == MGMTD_TXN_TYPE_CONFIG && mgmt_txn_mm->cfg_txn) { + if (mgmt_config_txn_in_progress() == session_id) + txn = mgmt_txn_mm->cfg_txn; + goto mgmt_create_txn_done; + } + + txn = mgmt_fe_find_txn_by_session_id(mgmt_txn_mm, session_id, type); + if (!txn) { + txn = XCALLOC(MTYPE_MGMTD_TXN, sizeof(struct mgmt_txn_ctx)); + assert(txn); + + txn->session_id = session_id; + txn->type = type; + mgmt_txns_add_tail(&mgmt_txn_mm->txn_list, txn); + mgmt_txn_reqs_init(&txn->set_cfg_reqs); + mgmt_txn_reqs_init(&txn->get_cfg_reqs); + mgmt_txn_reqs_init(&txn->get_data_reqs); + mgmt_txn_reqs_init(&txn->pending_get_datas); + txn->commit_cfg_req = NULL; + txn->refcount = 0; + if (!mgmt_txn_mm->next_txn_id) + mgmt_txn_mm->next_txn_id++; + txn->txn_id = mgmt_txn_mm->next_txn_id++; + hash_get(mgmt_txn_mm->txn_hash, txn, hash_alloc_intern); + + MGMTD_TXN_DBG("Added new '%s' txn-id: %" PRIu64, + mgmt_txn_type2str(type), txn->txn_id); + + if (type == MGMTD_TXN_TYPE_CONFIG) + mgmt_txn_mm->cfg_txn = txn; + + MGMTD_TXN_LOCK(txn); + } + +mgmt_create_txn_done: + return txn; +} + +static void mgmt_txn_delete(struct mgmt_txn_ctx **txn) +{ + MGMTD_TXN_UNLOCK(txn); +} + +static unsigned int mgmt_txn_hash_key(const void *data) +{ + const struct mgmt_txn_ctx *txn = data; + + return jhash2((uint32_t *)&txn->txn_id, + sizeof(txn->txn_id) / sizeof(uint32_t), 0); +} + +static bool mgmt_txn_hash_cmp(const void *d1, const void *d2) +{ + const struct mgmt_txn_ctx *txn1 = d1; + const struct mgmt_txn_ctx *txn2 = d2; + + return (txn1->txn_id == txn2->txn_id); +} + +static void mgmt_txn_hash_free(void *data) +{ + struct mgmt_txn_ctx *txn = data; + + mgmt_txn_delete(&txn); +} + +static void mgmt_txn_hash_init(void) +{ + if (!mgmt_txn_mm || mgmt_txn_mm->txn_hash) + return; + + mgmt_txn_mm->txn_hash = hash_create(mgmt_txn_hash_key, mgmt_txn_hash_cmp, + "MGMT Transactions"); +} + +static void mgmt_txn_hash_destroy(void) +{ + if (!mgmt_txn_mm || !mgmt_txn_mm->txn_hash) + return; + + hash_clean(mgmt_txn_mm->txn_hash, mgmt_txn_hash_free); + hash_free(mgmt_txn_mm->txn_hash); + mgmt_txn_mm->txn_hash = NULL; +} + +static inline struct mgmt_txn_ctx *mgmt_txn_id2ctx(uint64_t txn_id) +{ + struct mgmt_txn_ctx key = { 0 }; + struct mgmt_txn_ctx *txn; + + if (!mgmt_txn_mm || !mgmt_txn_mm->txn_hash) + return NULL; + + key.txn_id = txn_id; + txn = hash_lookup(mgmt_txn_mm->txn_hash, &key); + + return txn; +} + +static void mgmt_txn_lock(struct mgmt_txn_ctx *txn, const char *file, int line) +{ + txn->refcount++; + MGMTD_TXN_DBG("%s:%d --> Lock %s txn-id: %" PRIu64 " refcnt: %d", file, + line, mgmt_txn_type2str(txn->type), txn->txn_id, + txn->refcount); +} + +static void mgmt_txn_unlock(struct mgmt_txn_ctx **txn, const char *file, + int line) +{ + assert(*txn && (*txn)->refcount); + + (*txn)->refcount--; + MGMTD_TXN_DBG("%s:%d --> Unlock %s txn-id: %" PRIu64 " refcnt: %d", + file, line, mgmt_txn_type2str((*txn)->type), + (*txn)->txn_id, (*txn)->refcount); + if (!(*txn)->refcount) { + if ((*txn)->type == MGMTD_TXN_TYPE_CONFIG) + if (mgmt_txn_mm->cfg_txn == *txn) + mgmt_txn_mm->cfg_txn = NULL; + EVENT_OFF((*txn)->proc_get_cfg); + EVENT_OFF((*txn)->proc_get_data); + EVENT_OFF((*txn)->proc_comm_cfg); + EVENT_OFF((*txn)->comm_cfg_timeout); + hash_release(mgmt_txn_mm->txn_hash, *txn); + mgmt_txns_del(&mgmt_txn_mm->txn_list, *txn); + + MGMTD_TXN_DBG("Deleted %s txn-id: %" PRIu64 + " session-id: %" PRIu64, + mgmt_txn_type2str((*txn)->type), (*txn)->txn_id, + (*txn)->session_id); + + XFREE(MTYPE_MGMTD_TXN, *txn); + } + + *txn = NULL; +} + +static void mgmt_txn_cleanup_txn(struct mgmt_txn_ctx **txn) +{ + /* TODO: Any other cleanup applicable */ + + mgmt_txn_delete(txn); +} + +static void mgmt_txn_cleanup_all_txns(void) +{ + struct mgmt_txn_ctx *txn; + + if (!mgmt_txn_mm || !mgmt_txn_mm->txn_hash) + return; + + FOREACH_TXN_IN_LIST (mgmt_txn_mm, txn) + mgmt_txn_cleanup_txn(&txn); +} + +static void mgmt_txn_cleanup(struct event *thread) +{ + struct mgmt_txn_ctx *txn; + + txn = (struct mgmt_txn_ctx *)EVENT_ARG(thread); + assert(txn); + + mgmt_txn_cleanup_txn(&txn); +} + +static void mgmt_txn_register_event(struct mgmt_txn_ctx *txn, + enum mgmt_txn_event event) +{ + struct timeval tv = { .tv_sec = 0, + .tv_usec = MGMTD_TXN_PROC_DELAY_USEC }; + + assert(mgmt_txn_mm && mgmt_txn_tm); + + switch (event) { + case MGMTD_TXN_PROC_SETCFG: + event_add_timer_tv(mgmt_txn_tm, mgmt_txn_process_set_cfg, txn, + &tv, &txn->proc_set_cfg); + break; + case MGMTD_TXN_PROC_COMMITCFG: + event_add_timer_tv(mgmt_txn_tm, mgmt_txn_process_commit_cfg, + txn, &tv, &txn->proc_comm_cfg); + break; + case MGMTD_TXN_PROC_GETCFG: + event_add_timer_tv(mgmt_txn_tm, mgmt_txn_process_get_cfg, txn, + &tv, &txn->proc_get_cfg); + break; + case MGMTD_TXN_PROC_GETDATA: + event_add_timer_tv(mgmt_txn_tm, mgmt_txn_process_get_data, txn, + &tv, &txn->proc_get_data); + break; + case MGMTD_TXN_COMMITCFG_TIMEOUT: + event_add_timer_msec(mgmt_txn_tm, mgmt_txn_cfg_commit_timedout, + txn, MGMTD_TXN_CFG_COMMIT_MAX_DELAY_MSEC, + &txn->comm_cfg_timeout); + break; + case MGMTD_TXN_CLEANUP: + tv.tv_usec = MGMTD_TXN_CLEANUP_DELAY_USEC; + event_add_timer_tv(mgmt_txn_tm, mgmt_txn_cleanup, txn, &tv, + &txn->clnup); + } +} + +int mgmt_txn_init(struct mgmt_master *mm, struct event_loop *tm) +{ + if (mgmt_txn_mm || mgmt_txn_tm) + assert(!"MGMTD TXN: Call txn_init() only once"); + + mgmt_txn_mm = mm; + mgmt_txn_tm = tm; + mgmt_txns_init(&mm->txn_list); + mgmt_txn_hash_init(); + assert(!mm->cfg_txn); + mm->cfg_txn = NULL; + + return 0; +} + +void mgmt_txn_destroy(void) +{ + mgmt_txn_cleanup_all_txns(); + mgmt_txn_hash_destroy(); +} + +uint64_t mgmt_config_txn_in_progress(void) +{ + if (mgmt_txn_mm && mgmt_txn_mm->cfg_txn) + return mgmt_txn_mm->cfg_txn->session_id; + + return MGMTD_SESSION_ID_NONE; +} + +uint64_t mgmt_create_txn(uint64_t session_id, enum mgmt_txn_type type) +{ + struct mgmt_txn_ctx *txn; + + txn = mgmt_txn_create_new(session_id, type); + return txn ? txn->txn_id : MGMTD_TXN_ID_NONE; +} + +void mgmt_destroy_txn(uint64_t *txn_id) +{ + struct mgmt_txn_ctx *txn; + + txn = mgmt_txn_id2ctx(*txn_id); + if (!txn) + return; + + mgmt_txn_delete(&txn); + *txn_id = MGMTD_TXN_ID_NONE; +} + +int mgmt_txn_send_set_config_req(uint64_t txn_id, uint64_t req_id, + Mgmtd__DatastoreId ds_id, + struct mgmt_ds_ctx *ds_ctx, + Mgmtd__YangCfgDataReq **cfg_req, + size_t num_req, bool implicit_commit, + Mgmtd__DatastoreId dst_ds_id, + struct mgmt_ds_ctx *dst_ds_ctx) +{ + struct mgmt_txn_ctx *txn; + struct mgmt_txn_req *txn_req; + size_t indx; + uint16_t *num_chgs; + struct nb_cfg_change *cfg_chg; + + txn = mgmt_txn_id2ctx(txn_id); + if (!txn) + return -1; + + if (implicit_commit && mgmt_txn_reqs_count(&txn->set_cfg_reqs)) { + MGMTD_TXN_ERR( + "For implicit commit config only one SETCFG-REQ can be allowed!"); + return -1; + } + + txn_req = mgmt_txn_req_alloc(txn, req_id, MGMTD_TXN_PROC_SETCFG); + txn_req->req.set_cfg->ds_id = ds_id; + txn_req->req.set_cfg->ds_ctx = ds_ctx; + num_chgs = &txn_req->req.set_cfg->num_cfg_changes; + for (indx = 0; indx < num_req; indx++) { + cfg_chg = &txn_req->req.set_cfg->cfg_changes[*num_chgs]; + + if (cfg_req[indx]->req_type == + MGMTD__CFG_DATA_REQ_TYPE__DELETE_DATA) + cfg_chg->operation = NB_OP_DESTROY; + else if (cfg_req[indx]->req_type == + MGMTD__CFG_DATA_REQ_TYPE__SET_DATA) + cfg_chg->operation = + mgmt_ds_find_data_node_by_xpath(ds_ctx, + cfg_req[indx] + ->data + ->xpath) + ? NB_OP_MODIFY + : NB_OP_CREATE; + else + continue; + + MGMTD_TXN_DBG("XPath: '%s', Value: '%s'", + cfg_req[indx]->data->xpath, + (cfg_req[indx]->data->value && + cfg_req[indx]->data->value->encoded_str_val + ? cfg_req[indx]->data->value->encoded_str_val + : "NULL")); + strlcpy(cfg_chg->xpath, cfg_req[indx]->data->xpath, + sizeof(cfg_chg->xpath)); + cfg_chg->value = + (cfg_req[indx]->data->value && + cfg_req[indx]->data->value->encoded_str_val + ? strdup(cfg_req[indx] + ->data->value->encoded_str_val) + : NULL); + if (cfg_chg->value) + MGMTD_TXN_DBG("Allocated value at %p ==> '%s'", + cfg_chg->value, cfg_chg->value); + + (*num_chgs)++; + } + txn_req->req.set_cfg->implicit_commit = implicit_commit; + txn_req->req.set_cfg->dst_ds_id = dst_ds_id; + txn_req->req.set_cfg->dst_ds_ctx = dst_ds_ctx; + txn_req->req.set_cfg->setcfg_stats = + mgmt_fe_get_session_setcfg_stats(txn->session_id); + mgmt_txn_register_event(txn, MGMTD_TXN_PROC_SETCFG); + + return 0; +} + +int mgmt_txn_send_commit_config_req(uint64_t txn_id, uint64_t req_id, + Mgmtd__DatastoreId src_ds_id, + struct mgmt_ds_ctx *src_ds_ctx, + Mgmtd__DatastoreId dst_ds_id, + struct mgmt_ds_ctx *dst_ds_ctx, + bool validate_only, bool abort, + bool implicit) +{ + struct mgmt_txn_ctx *txn; + struct mgmt_txn_req *txn_req; + + txn = mgmt_txn_id2ctx(txn_id); + if (!txn) + return -1; + + if (txn->commit_cfg_req) { + MGMTD_TXN_ERR("Commit already in-progress txn-id: %" PRIu64 + " session-id: %" PRIu64 ". Cannot start another", + txn->txn_id, txn->session_id); + return -1; + } + + txn_req = mgmt_txn_req_alloc(txn, req_id, MGMTD_TXN_PROC_COMMITCFG); + txn_req->req.commit_cfg.src_ds_id = src_ds_id; + txn_req->req.commit_cfg.src_ds_ctx = src_ds_ctx; + txn_req->req.commit_cfg.dst_ds_id = dst_ds_id; + txn_req->req.commit_cfg.dst_ds_ctx = dst_ds_ctx; + txn_req->req.commit_cfg.validate_only = validate_only; + txn_req->req.commit_cfg.abort = abort; + txn_req->req.commit_cfg.implicit = implicit; + txn_req->req.commit_cfg.cmt_stats = + mgmt_fe_get_session_commit_stats(txn->session_id); + + /* + * Trigger a COMMIT-CONFIG process. + */ + mgmt_txn_register_event(txn, MGMTD_TXN_PROC_COMMITCFG); + return 0; +} + +int mgmt_txn_notify_be_adapter_conn(struct mgmt_be_client_adapter *adapter, + bool connect) +{ + struct mgmt_txn_ctx *txn; + struct mgmt_txn_req *txn_req; + struct mgmt_commit_cfg_req *cmtcfg_req; + static struct mgmt_commit_stats dummy_stats; + struct nb_config_cbs *adapter_cfgs = NULL; + + memset(&dummy_stats, 0, sizeof(dummy_stats)); + if (connect) { + /* Get config for this single backend client */ + + mgmt_be_get_adapter_config(adapter, &adapter_cfgs); + if (!adapter_cfgs || RB_EMPTY(nb_config_cbs, adapter_cfgs)) { + SET_FLAG(adapter->flags, + MGMTD_BE_ADAPTER_FLAGS_CFG_SYNCED); + return 0; + } + + /* + * Create a CONFIG transaction to push the config changes + * provided to the backend client. + */ + txn = mgmt_txn_create_new(0, MGMTD_TXN_TYPE_CONFIG); + if (!txn) { + MGMTD_TXN_ERR("Failed to create CONFIG Transaction for downloading CONFIGs for client '%s'", + adapter->name); + return -1; + } + + MGMTD_TXN_DBG("Created initial txn-id: %" PRIu64 + " for BE client '%s'", + txn->txn_id, adapter->name); + /* + * Set the changeset for transaction to commit and trigger the + * commit request. + */ + txn_req = mgmt_txn_req_alloc(txn, 0, MGMTD_TXN_PROC_COMMITCFG); + txn_req->req.commit_cfg.src_ds_id = MGMTD_DS_NONE; + txn_req->req.commit_cfg.src_ds_ctx = 0; + txn_req->req.commit_cfg.dst_ds_id = MGMTD_DS_NONE; + txn_req->req.commit_cfg.dst_ds_ctx = 0; + txn_req->req.commit_cfg.validate_only = false; + txn_req->req.commit_cfg.abort = false; + txn_req->req.commit_cfg.cmt_stats = &dummy_stats; + txn_req->req.commit_cfg.cfg_chgs = adapter_cfgs; + + /* + * Trigger a COMMIT-CONFIG process. + */ + mgmt_txn_register_event(txn, MGMTD_TXN_PROC_COMMITCFG); + + } else { + /* + * Check if any transaction is currently on-going that + * involves this backend client. If so, report the transaction + * has failed. + */ + FOREACH_TXN_IN_LIST (mgmt_txn_mm, txn) { + /* TODO: update with operational state when that is + * completed */ + if (txn->type == MGMTD_TXN_TYPE_CONFIG) { + cmtcfg_req = txn->commit_cfg_req + ? &txn->commit_cfg_req->req + .commit_cfg + : NULL; + if (cmtcfg_req && + cmtcfg_req->subscr_info + .xpath_subscr[adapter->id]) { + mgmt_txn_send_commit_cfg_reply( + txn, MGMTD_INTERNAL_ERROR, + "Backend daemon disconnected while processing commit!"); + } + } + } + } + + return 0; +} + +int mgmt_txn_notify_be_txn_reply(uint64_t txn_id, bool create, bool success, + struct mgmt_be_client_adapter *adapter) +{ + struct mgmt_txn_ctx *txn; + struct mgmt_commit_cfg_req *cmtcfg_req = NULL; + + txn = mgmt_txn_id2ctx(txn_id); + if (!txn || txn->type != MGMTD_TXN_TYPE_CONFIG) + return -1; + + if (!create && !txn->commit_cfg_req) + return 0; + + assert(txn->commit_cfg_req); + cmtcfg_req = &txn->commit_cfg_req->req.commit_cfg; + if (create) { + if (success) { + /* + * Done with TXN_CREATE. Move the backend client to + * next phase. + */ + assert(cmtcfg_req->curr_phase == + MGMTD_COMMIT_PHASE_TXN_CREATE); + + /* + * Send CFGDATA_CREATE-REQs to the backend immediately. + */ + mgmt_txn_send_be_cfg_data(txn, adapter); + } else { + mgmt_txn_send_commit_cfg_reply( + txn, MGMTD_INTERNAL_ERROR, + "Internal error! Failed to initiate transaction at backend!"); + } + } else { + /* + * Done with TXN_DELETE. Move the backend client to next phase. + */ + if (false) + mgmt_move_be_commit_to_next_phase(txn, adapter); + } + + return 0; +} + +int mgmt_txn_notify_be_cfgdata_reply(uint64_t txn_id, uint64_t batch_id, + bool success, char *error_if_any, + struct mgmt_be_client_adapter *adapter) +{ + struct mgmt_txn_ctx *txn; + struct mgmt_txn_be_cfg_batch *batch; + struct mgmt_commit_cfg_req *cmtcfg_req; + + txn = mgmt_txn_id2ctx(txn_id); + if (!txn || txn->type != MGMTD_TXN_TYPE_CONFIG) + return -1; + + if (!txn->commit_cfg_req) + return -1; + cmtcfg_req = &txn->commit_cfg_req->req.commit_cfg; + + batch = mgmt_txn_cfgbatch_id2ctx(txn, batch_id); + if (!batch || batch->txn != txn) + return -1; + + if (!success) { + MGMTD_TXN_ERR("CFGDATA_CREATE_REQ sent to '%s' failed txn-id: %" PRIu64 + " batch-id %" PRIu64 " err: %s", + adapter->name, txn->txn_id, batch->batch_id, + error_if_any ? error_if_any : "None"); + mgmt_txn_send_commit_cfg_reply( + txn, MGMTD_INTERNAL_ERROR, + error_if_any + ? error_if_any + : "Internal error! Failed to download config data to backend!"); + return 0; + } + + MGMTD_TXN_DBG("CFGDATA_CREATE_REQ sent to '%s' was successful txn-id: %" PRIu64 + " batch-id %" PRIu64 " err: %s", + adapter->name, txn->txn_id, batch->batch_id, + error_if_any ? error_if_any : "None"); + mgmt_move_txn_cfg_batch_to_next(cmtcfg_req, batch, + &cmtcfg_req->curr_batches[adapter->id], + &cmtcfg_req->next_batches[adapter->id], + true, MGMTD_COMMIT_PHASE_APPLY_CFG); + + mgmt_try_move_commit_to_next_phase(txn, cmtcfg_req); + + return 0; +} + +int mgmt_txn_notify_be_cfg_apply_reply(uint64_t txn_id, bool success, + uint64_t batch_ids[], + size_t num_batch_ids, char *error_if_any, + struct mgmt_be_client_adapter *adapter) +{ + struct mgmt_txn_ctx *txn; + struct mgmt_txn_be_cfg_batch *batch; + struct mgmt_commit_cfg_req *cmtcfg_req = NULL; + size_t indx; + + txn = mgmt_txn_id2ctx(txn_id); + if (!txn || txn->type != MGMTD_TXN_TYPE_CONFIG || !txn->commit_cfg_req) + return -1; + + cmtcfg_req = &txn->commit_cfg_req->req.commit_cfg; + + if (!success) { + MGMTD_TXN_ERR("CFGDATA_APPLY_REQ sent to '%s' failed txn-id: %" PRIu64 + " batch ids %" PRIu64 " - %" PRIu64 " err: %s", + adapter->name, txn->txn_id, batch_ids[0], + batch_ids[num_batch_ids - 1], + error_if_any ? error_if_any : "None"); + mgmt_txn_send_commit_cfg_reply( + txn, MGMTD_INTERNAL_ERROR, + error_if_any + ? error_if_any + : "Internal error! Failed to apply config data on backend!"); + return 0; + } + + for (indx = 0; indx < num_batch_ids; indx++) { + batch = mgmt_txn_cfgbatch_id2ctx(txn, batch_ids[indx]); + if (batch->txn != txn) + return -1; + mgmt_move_txn_cfg_batch_to_next( + cmtcfg_req, batch, + &cmtcfg_req->curr_batches[adapter->id], + &cmtcfg_req->next_batches[adapter->id], true, + MGMTD_COMMIT_PHASE_TXN_DELETE); + } + + if (!mgmt_txn_batches_count(&cmtcfg_req->curr_batches[adapter->id])) { + /* + * All configuration for the specific backend has been applied. + * Send TXN-DELETE to wrap up the transaction for this backend. + */ + SET_FLAG(adapter->flags, MGMTD_BE_ADAPTER_FLAGS_CFG_SYNCED); + mgmt_txn_send_be_txn_delete(txn, adapter); + } + + mgmt_try_move_commit_to_next_phase(txn, cmtcfg_req); + if (mm->perf_stats_en) + gettimeofday(&cmtcfg_req->cmt_stats->apply_cfg_end, NULL); + + return 0; +} + +int mgmt_txn_send_get_req(uint64_t txn_id, uint64_t req_id, + Mgmtd__DatastoreId ds_id, struct nb_config *cfg_root, + Mgmtd__YangGetDataReq **data_req, size_t num_reqs) +{ + struct mgmt_txn_ctx *txn; + struct mgmt_txn_req *txn_req; + enum mgmt_txn_event req_event; + size_t indx; + + txn = mgmt_txn_id2ctx(txn_id); + if (!txn) + return -1; + + req_event = cfg_root ? MGMTD_TXN_PROC_GETCFG : MGMTD_TXN_PROC_GETDATA; + + txn_req = mgmt_txn_req_alloc(txn, req_id, req_event); + txn_req->req.get_data->ds_id = ds_id; + txn_req->req.get_data->cfg_root = cfg_root; + for (indx = 0; + indx < num_reqs && indx < MGMTD_MAX_NUM_DATA_REPLY_IN_BATCH; + indx++) { + MGMTD_TXN_DBG("XPath: '%s'", data_req[indx]->data->xpath); + txn_req->req.get_data->xpaths[indx] = + strdup(data_req[indx]->data->xpath); + txn_req->req.get_data->num_xpaths++; + } + + mgmt_txn_register_event(txn, req_event); + + return 0; +} + +void mgmt_txn_status_write(struct vty *vty) +{ + struct mgmt_txn_ctx *txn; + + vty_out(vty, "MGMTD Transactions\n"); + + FOREACH_TXN_IN_LIST (mgmt_txn_mm, txn) { + vty_out(vty, " Txn: \t\t\t0x%p\n", txn); + vty_out(vty, " Txn-Id: \t\t\t%" PRIu64 "\n", txn->txn_id); + vty_out(vty, " Session-Id: \t\t%" PRIu64 "\n", + txn->session_id); + vty_out(vty, " Type: \t\t\t%s\n", + mgmt_txn_type2str(txn->type)); + vty_out(vty, " Ref-Count: \t\t\t%d\n", txn->refcount); + } + vty_out(vty, " Total: %d\n", + (int)mgmt_txns_count(&mgmt_txn_mm->txn_list)); +} + +int mgmt_txn_rollback_trigger_cfg_apply(struct mgmt_ds_ctx *src_ds_ctx, + struct mgmt_ds_ctx *dst_ds_ctx) +{ + static struct nb_config_cbs changes; + static struct mgmt_commit_stats dummy_stats; + + struct nb_config_cbs *cfg_chgs = NULL; + struct mgmt_txn_ctx *txn; + struct mgmt_txn_req *txn_req; + + memset(&changes, 0, sizeof(changes)); + memset(&dummy_stats, 0, sizeof(dummy_stats)); + /* + * This could be the case when the config is directly + * loaded onto the candidate DS from a file. Get the + * diff from a full comparison of the candidate and + * running DSs. + */ + nb_config_diff(mgmt_ds_get_nb_config(dst_ds_ctx), + mgmt_ds_get_nb_config(src_ds_ctx), &changes); + cfg_chgs = &changes; + + if (RB_EMPTY(nb_config_cbs, cfg_chgs)) { + /* + * This means there's no changes to commit whatsoever + * is the source of the changes in config. + */ + return -1; + } + + /* + * Create a CONFIG transaction to push the config changes + * provided to the backend client. + */ + txn = mgmt_txn_create_new(0, MGMTD_TXN_TYPE_CONFIG); + if (!txn) { + MGMTD_TXN_ERR( + "Failed to create CONFIG Transaction for downloading CONFIGs"); + return -1; + } + + MGMTD_TXN_DBG("Created rollback txn-id: %" PRIu64, txn->txn_id); + + /* + * Set the changeset for transaction to commit and trigger the commit + * request. + */ + txn_req = mgmt_txn_req_alloc(txn, 0, MGMTD_TXN_PROC_COMMITCFG); + txn_req->req.commit_cfg.src_ds_id = MGMTD_DS_CANDIDATE; + txn_req->req.commit_cfg.src_ds_ctx = src_ds_ctx; + txn_req->req.commit_cfg.dst_ds_id = MGMTD_DS_RUNNING; + txn_req->req.commit_cfg.dst_ds_ctx = dst_ds_ctx; + txn_req->req.commit_cfg.validate_only = false; + txn_req->req.commit_cfg.abort = false; + txn_req->req.commit_cfg.rollback = true; + txn_req->req.commit_cfg.cmt_stats = &dummy_stats; + txn_req->req.commit_cfg.cfg_chgs = cfg_chgs; + + /* + * Trigger a COMMIT-CONFIG process. + */ + mgmt_txn_register_event(txn, MGMTD_TXN_PROC_COMMITCFG); + return 0; +} |