diff options
Diffstat (limited to 'mgmtd/mgmt_fe_adapter.c')
-rw-r--r-- | mgmtd/mgmt_fe_adapter.c | 352 |
1 files changed, 337 insertions, 15 deletions
diff --git a/mgmtd/mgmt_fe_adapter.c b/mgmtd/mgmt_fe_adapter.c index ec8e773..fc1bde0 100644 --- a/mgmtd/mgmt_fe_adapter.c +++ b/mgmtd/mgmt_fe_adapter.c @@ -898,11 +898,13 @@ static int mgmt_fe_session_handle_commit_config_req_msg( /* * Create COMMITConfig request under the transaction */ - if (mgmt_txn_send_commit_config_req( - session->cfg_txn_id, commcfg_req->req_id, - commcfg_req->src_ds_id, src_ds_ctx, commcfg_req->dst_ds_id, - dst_ds_ctx, commcfg_req->validate_only, commcfg_req->abort, - false) != 0) { + if (mgmt_txn_send_commit_config_req(session->cfg_txn_id, + commcfg_req->req_id, + commcfg_req->src_ds_id, src_ds_ctx, + commcfg_req->dst_ds_id, dst_ds_ctx, + commcfg_req->validate_only, + commcfg_req->abort, false, + NULL) != 0) { fe_adapter_send_commit_cfg_reply( session, commcfg_req->src_ds_id, commcfg_req->dst_ds_id, commcfg_req->req_id, MGMTD_INTERNAL_ERROR, @@ -1099,6 +1101,74 @@ done: return ret; } +static int fe_adapter_send_rpc_reply(struct mgmt_fe_session_ctx *session, + uint64_t req_id, uint8_t result_type, + const struct lyd_node *result) +{ + struct mgmt_msg_rpc_reply *msg; + uint8_t **darrp = NULL; + int ret; + + msg = mgmt_msg_native_alloc_msg(struct mgmt_msg_rpc_reply, 0, + MTYPE_MSG_NATIVE_RPC_REPLY); + msg->refer_id = session->session_id; + msg->req_id = req_id; + msg->code = MGMT_MSG_CODE_RPC_REPLY; + msg->result_type = result_type; + + if (result) { + darrp = mgmt_msg_native_get_darrp(msg); + ret = yang_print_tree_append(darrp, result, result_type, 0); + if (ret != LY_SUCCESS) { + __log_err("Error building rpc-reply result for client %s session-id %" PRIu64 + " req-id %" PRIu64 " result type %u", + session->adapter->name, session->session_id, + req_id, result_type); + goto done; + } + } + + __dbg("Sending rpc-reply from adapter %s to session-id %" PRIu64 + " req-id %" PRIu64 " len %u", + session->adapter->name, session->session_id, req_id, + mgmt_msg_native_get_msg_len(msg)); + + ret = fe_adapter_send_native_msg(session->adapter, msg, + mgmt_msg_native_get_msg_len(msg), + false); +done: + mgmt_msg_native_free_msg(msg); + + return ret; +} + +static int fe_adapter_send_edit_reply(struct mgmt_fe_session_ctx *session, + uint64_t req_id, const char *xpath) +{ + struct mgmt_msg_edit_reply *msg; + int ret; + + msg = mgmt_msg_native_alloc_msg(struct mgmt_msg_edit_reply, 0, + MTYPE_MSG_NATIVE_EDIT_REPLY); + msg->refer_id = session->session_id; + msg->req_id = req_id; + msg->code = MGMT_MSG_CODE_EDIT_REPLY; + + mgmt_msg_native_xpath_encode(msg, xpath); + + __dbg("Sending edit-reply from adapter %s to session-id %" PRIu64 + " req-id %" PRIu64 " len %u", + session->adapter->name, session->session_id, req_id, + mgmt_msg_native_get_msg_len(msg)); + + ret = fe_adapter_send_native_msg(session->adapter, msg, + mgmt_msg_native_get_msg_len(msg), + false); + mgmt_msg_native_free_msg(msg); + + return ret; +} + /** * fe_adapter_handle_get_data() - Handle a get-tree message from a FE client. * @session: the client session. @@ -1186,7 +1256,8 @@ static void fe_adapter_handle_get_data(struct mgmt_fe_session_ctx *session, } darr_free(snodes); - clients = mgmt_be_interested_clients(msg->xpath, false); + clients = mgmt_be_interested_clients(msg->xpath, + MGMT_BE_XPATH_SUBSCR_TYPE_OPER); if (!clients && !CHECK_FLAG(msg->flags, GET_DATA_FLAG_CONFIG)) { __dbg("No backends provide xpath: %s for txn-id: %" PRIu64 " session-id: %" PRIu64, @@ -1224,6 +1295,196 @@ done: darr_free(xpath_resolved); } +static void fe_adapter_handle_edit(struct mgmt_fe_session_ctx *session, + void *__msg, size_t msg_len) +{ + struct mgmt_msg_edit *msg = __msg; + Mgmtd__DatastoreId ds_id, rds_id; + struct mgmt_ds_ctx *ds_ctx, *rds_ctx; + const char *xpath, *data; + bool lock, commit; + int ret; + + if (msg->datastore != MGMT_MSG_DATASTORE_CANDIDATE) { + fe_adapter_send_error(session, msg->req_id, false, -EINVAL, + "Unsupported datastore"); + return; + } + + xpath = mgmt_msg_native_xpath_data_decode(msg, msg_len, data); + if (!xpath) { + fe_adapter_send_error(session, msg->req_id, false, -EINVAL, + "Invalid message"); + return; + } + + ds_id = MGMTD_DS_CANDIDATE; + ds_ctx = mgmt_ds_get_ctx_by_id(mm, ds_id); + assert(ds_ctx); + + rds_id = MGMTD_DS_RUNNING; + rds_ctx = mgmt_ds_get_ctx_by_id(mm, rds_id); + assert(rds_ctx); + + lock = CHECK_FLAG(msg->flags, EDIT_FLAG_IMPLICIT_LOCK); + commit = CHECK_FLAG(msg->flags, EDIT_FLAG_IMPLICIT_COMMIT); + + if (lock) { + if (mgmt_fe_session_write_lock_ds(ds_id, ds_ctx, session)) { + fe_adapter_send_error(session, msg->req_id, false, + -EBUSY, + "Candidate DS is locked by another session"); + return; + } + + if (commit) { + if (mgmt_fe_session_write_lock_ds(rds_id, rds_ctx, + session)) { + mgmt_fe_session_unlock_ds(ds_id, ds_ctx, + session); + fe_adapter_send_error( + session, msg->req_id, false, -EBUSY, + "Running DS is locked by another session"); + return; + } + } + } else { + if (!session->ds_locked[ds_id]) { + fe_adapter_send_error(session, msg->req_id, false, + -EBUSY, + "Candidate DS is not locked"); + return; + } + + if (commit) { + if (!session->ds_locked[rds_id]) { + fe_adapter_send_error(session, msg->req_id, + false, -EBUSY, + "Running DS is not locked"); + return; + } + } + } + + session->cfg_txn_id = mgmt_create_txn(session->session_id, + MGMTD_TXN_TYPE_CONFIG); + if (session->cfg_txn_id == MGMTD_SESSION_ID_NONE) { + if (lock) { + mgmt_fe_session_unlock_ds(ds_id, ds_ctx, session); + if (commit) + mgmt_fe_session_unlock_ds(rds_id, rds_ctx, + session); + } + fe_adapter_send_error(session, msg->req_id, false, -EBUSY, + "Failed to create a configuration transaction"); + return; + } + + __dbg("Created new config txn-id: %" PRIu64 " for session-id: %" PRIu64, + session->cfg_txn_id, session->session_id); + + ret = mgmt_txn_send_edit(session->cfg_txn_id, msg->req_id, ds_id, + ds_ctx, rds_id, rds_ctx, lock, commit, + msg->request_type, msg->flags, msg->operation, + xpath, data); + if (ret) { + /* destroy the just created txn */ + mgmt_destroy_txn(&session->cfg_txn_id); + if (lock) { + mgmt_fe_session_unlock_ds(ds_id, ds_ctx, session); + if (commit) + mgmt_fe_session_unlock_ds(rds_id, rds_ctx, + session); + } + fe_adapter_send_error(session, msg->req_id, false, -EBUSY, + "Failed to create a configuration transaction"); + } +} + +/** + * fe_adapter_handle_rpc() - Handle an RPC message from an FE client. + * @session: the client session. + * @msg_raw: the message data. + * @msg_len: the length of the message data. + */ +static void fe_adapter_handle_rpc(struct mgmt_fe_session_ctx *session, + void *__msg, size_t msg_len) +{ + struct mgmt_msg_rpc *msg = __msg; + const struct lysc_node *snode; + const char *xpath, *data; + uint64_t req_id = msg->req_id; + uint64_t clients; + int ret; + + __dbg("Received RPC request from client %s for session-id %" PRIu64 + " req-id %" PRIu64, + session->adapter->name, session->session_id, msg->req_id); + + xpath = mgmt_msg_native_xpath_data_decode(msg, msg_len, data); + if (!xpath) { + fe_adapter_send_error(session, req_id, false, -EINVAL, + "Invalid message"); + return; + } + + if (session->txn_id != MGMTD_TXN_ID_NONE) { + fe_adapter_send_error(session, req_id, false, -EINPROGRESS, + "Transaction in progress txn-id: %" PRIu64 + " for session-id: %" PRIu64, + session->txn_id, session->session_id); + return; + } + + snode = lys_find_path(ly_native_ctx, NULL, xpath, 0); + if (!snode) { + fe_adapter_send_error(session, req_id, false, -ENOENT, + "No such path: %s", xpath); + return; + } + + if (snode->nodetype != LYS_RPC && snode->nodetype != LYS_ACTION) { + fe_adapter_send_error(session, req_id, false, -EINVAL, + "Not an RPC or action path: %s", xpath); + return; + } + + clients = mgmt_be_interested_clients(xpath, + MGMT_BE_XPATH_SUBSCR_TYPE_RPC); + if (!clients) { + __dbg("No backends implement xpath: %s for txn-id: %" PRIu64 + " session-id: %" PRIu64, + xpath, session->txn_id, session->session_id); + + fe_adapter_send_error(session, req_id, false, -ENOENT, + "No backends implement xpath: %s", xpath); + return; + } + + /* Start a RPC Transaction */ + session->txn_id = mgmt_create_txn(session->session_id, + MGMTD_TXN_TYPE_RPC); + if (session->txn_id == MGMTD_SESSION_ID_NONE) { + fe_adapter_send_error(session, req_id, false, -EINPROGRESS, + "Failed to create an RPC transaction"); + return; + } + + __dbg("Created new rpc txn-id: %" PRIu64 " for session-id: %" PRIu64, + session->txn_id, session->session_id); + + /* Create an RPC request under the transaction */ + ret = mgmt_txn_send_rpc(session->txn_id, req_id, clients, + msg->request_type, xpath, data, + mgmt_msg_native_data_len_decode(msg, msg_len)); + if (ret) { + /* destroy the just created txn */ + mgmt_destroy_txn(&session->txn_id); + fe_adapter_send_error(session, req_id, false, -EINPROGRESS, + "Failed to create an RPC transaction"); + } +} + /** * Handle a native encoded message from the FE client. */ @@ -1245,6 +1506,12 @@ static void fe_adapter_handle_native_msg(struct mgmt_fe_client_adapter *adapter, case MGMT_MSG_CODE_GET_DATA: fe_adapter_handle_get_data(session, msg, msg_len); break; + case MGMT_MSG_CODE_EDIT: + fe_adapter_handle_edit(session, msg, msg_len); + break; + case MGMT_MSG_CODE_RPC: + fe_adapter_handle_rpc(session, msg, msg_len); + break; default: __log_err("unknown native message session-id %" PRIu64 " req-id %" PRIu64 " code %u to FE adapter %s", @@ -1484,6 +1751,70 @@ int mgmt_fe_adapter_send_tree_data(uint64_t session_id, uint64_t txn_id, return ret; } +int mgmt_fe_adapter_send_rpc_reply(uint64_t session_id, uint64_t txn_id, + uint64_t req_id, LYD_FORMAT result_type, + const struct lyd_node *result) +{ + struct mgmt_fe_session_ctx *session; + int ret; + + session = mgmt_session_id2ctx(session_id); + if (!session || session->txn_id != txn_id) + return -1; + + ret = fe_adapter_send_rpc_reply(session, req_id, result_type, result); + + mgmt_destroy_txn(&session->txn_id); + + return ret; +} + +int mgmt_fe_adapter_send_edit_reply(uint64_t session_id, uint64_t txn_id, + uint64_t req_id, bool unlock, bool commit, + const char *xpath, int16_t error, + const char *errstr) +{ + struct mgmt_fe_session_ctx *session; + Mgmtd__DatastoreId ds_id, rds_id; + struct mgmt_ds_ctx *ds_ctx, *rds_ctx; + int ret; + + session = mgmt_session_id2ctx(session_id); + if (!session || session->cfg_txn_id != txn_id) + return -1; + + if (session->cfg_txn_id != MGMTD_TXN_ID_NONE && commit) + mgmt_fe_session_register_event(session, + MGMTD_FE_SESSION_CFG_TXN_CLNUP); + + if (unlock) { + ds_id = MGMTD_DS_CANDIDATE; + ds_ctx = mgmt_ds_get_ctx_by_id(mm, ds_id); + assert(ds_ctx); + + mgmt_fe_session_unlock_ds(ds_id, ds_ctx, session); + + if (commit) { + rds_id = MGMTD_DS_RUNNING; + rds_ctx = mgmt_ds_get_ctx_by_id(mm, rds_id); + assert(rds_ctx); + + mgmt_fe_session_unlock_ds(rds_id, rds_ctx, session); + } + } + + if (error) + ret = fe_adapter_send_error(session, req_id, false, error, "%s", + errstr); + else + ret = fe_adapter_send_edit_reply(session, req_id, xpath); + + if (session->cfg_txn_id != MGMTD_TXN_ID_NONE && !commit) + mgmt_destroy_txn(&session->cfg_txn_id); + + return ret; +} + /** * Send an error back to the FE client and cleanup any in-progress txn. */ @@ -1586,15 +1917,6 @@ mgmt_fe_adapter_cmt_stats_write(struct vty *vty, mgmt_realtime_to_string( &adapter->cmt_stats.txn_create_start, buf, sizeof(buf))); - vty_out(vty, -#ifdef MGMTD_LOCAL_VALIDATIONS_ENABLED - " Send-Config Start: \t\t%s\n", -#else - " Send-Config-Validate Start: \t%s\n", -#endif - mgmt_realtime_to_string( - &adapter->cmt_stats.send_cfg_start, buf, - sizeof(buf))); vty_out(vty, " Apply-Config Start: \t\t%s\n", mgmt_realtime_to_string( &adapter->cmt_stats.apply_cfg_start, |