// libcpg interface for Rust // Copyright (c) 2020 Red Hat, Inc. // // All rights reserved. // // Author: Christine Caulfield (ccaulfi@redhat.com) // #![allow(clippy::single_match)] #![allow(clippy::needless_range_loop)] #![allow(clippy::type_complexity)] // For the code generated by bindgen use crate::sys::cpg as ffi; use std::collections::HashMap; use std::ffi::{CStr, CString}; use std::fmt; use std::os::raw::{c_int, c_void}; use std::ptr::copy_nonoverlapping; use std::slice; use std::string::String; use std::sync::Mutex; // General corosync things use crate::string_from_bytes; use crate::{CsError, DispatchFlags, NodeId, Result}; const CPG_NAMELEN_MAX: usize = 128; const CPG_MEMBERS_MAX: usize = 128; /// RingId returned by totem_confchg_fn #[derive(Copy, Clone)] pub struct RingId { pub nodeid: NodeId, pub seq: u64, } /// Totem delivery guarantee options for [mcast_joined] // The C enum doesn't have numbers in the code // so don't assume we can match them #[derive(Copy, Clone)] pub enum Guarantee { TypeUnordered, TypeFifo, TypeAgreed, TypeSafe, } // Convert internal to cpg.h values. impl Guarantee { pub fn to_c(&self) -> u32 { match self { Guarantee::TypeUnordered => ffi::CPG_TYPE_UNORDERED, Guarantee::TypeFifo => ffi::CPG_TYPE_FIFO, Guarantee::TypeAgreed => ffi::CPG_TYPE_AGREED, Guarantee::TypeSafe => ffi::CPG_TYPE_SAFE, } } } /// Flow control state returned from [flow_control_state_get] #[derive(Copy, Clone)] pub enum FlowControlState { Disabled, Enabled, } /// No flags current specified for model1 so leave this at None #[derive(Copy, Clone)] pub enum Model1Flags { None, } /// Reason for cpg item callback #[derive(Copy, Clone)] pub enum Reason { Undefined = 0, Join = 1, Leave = 2, NodeDown = 3, NodeUp = 4, ProcDown = 5, } // Convert to cpg.h values impl Reason { pub fn new(r: u32) -> Reason { match r { 0 => Reason::Undefined, 1 => Reason::Join, 2 => Reason::Leave, 3 => Reason::NodeDown, 4 => Reason::NodeUp, 5 => Reason::ProcDown, _ => Reason::Undefined, } } } impl fmt::Display for Reason { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Reason::Undefined => write!(f, "Undefined"), Reason::Join => write!(f, "Join"), Reason::Leave => write!(f, "Leave"), Reason::NodeDown => write!(f, "NodeDown"), Reason::NodeUp => write!(f, "NodeUp"), Reason::ProcDown => write!(f, "ProcDown"), } } } /// A CPG address entry returned in the callbacks pub struct Address { pub nodeid: NodeId, pub pid: u32, pub reason: Reason, } impl fmt::Debug for Address { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, "[nodeid: {}, pid: {}, reason: {}]", self.nodeid, self.pid, self.reason ) } } /// Data for model1 [initialize] #[derive(Copy, Clone)] pub struct Model1Data { pub flags: Model1Flags, pub deliver_fn: Option< fn( handle: &Handle, group_name: String, nodeid: NodeId, pid: u32, msg: &[u8], msg_len: usize, ), >, pub confchg_fn: Option< fn( handle: &Handle, group_name: &str, member_list: Vec
, left_list: Vec
, joined_list: Vec
, ), >, pub totem_confchg_fn: Option)>, } /// Modeldata for [initialize], only v1 supported at the moment #[derive(Copy, Clone)] pub enum ModelData { ModelNone, ModelV1(Model1Data), } /// A handle into the cpg library. Returned from [initialize] and needed for all other calls #[derive(Copy, Clone)] pub struct Handle { cpg_handle: u64, // Corosync library handle model_data: ModelData, } // Used to convert a CPG handle into one of ours lazy_static! { static ref HANDLE_HASH: Mutex> = Mutex::new(HashMap::new()); } // Convert a Rust String into a cpg_name struct for libcpg fn string_to_cpg_name(group: &str) -> Result { if group.len() > CPG_NAMELEN_MAX - 1 { return Err(CsError::CsErrInvalidParam); } let c_name = match CString::new(group) { Ok(n) => n, Err(_) => return Err(CsError::CsErrLibrary), }; let mut c_group = ffi::cpg_name { length: group.len() as u32, value: [0; CPG_NAMELEN_MAX], }; unsafe { // NOTE param order is 'wrong-way round' from C copy_nonoverlapping(c_name.as_ptr(), c_group.value.as_mut_ptr(), group.len()); } Ok(c_group) } // Convert an array of cpg_addresses to a Vec - used in callbacks fn cpg_array_to_vec(list: *const ffi::cpg_address, list_entries: usize) -> Vec
{ let temp: &[ffi::cpg_address] = unsafe { slice::from_raw_parts(list, list_entries) }; let mut r_vec = Vec::
::new(); for i in 0..list_entries { let a: Address = Address { nodeid: NodeId::from(temp[i].nodeid), pid: temp[i].pid, reason: Reason::new(temp[i].reason), }; r_vec.push(a); } r_vec } // Called from CPG callback function - munge params back to Rust from C extern "C" fn rust_deliver_fn( handle: ffi::cpg_handle_t, group_name: *const ffi::cpg_name, nodeid: u32, pid: u32, msg: *mut ::std::os::raw::c_void, msg_len: usize, ) { if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) { // Convert group_name into a Rust str. let r_group_name = unsafe { CStr::from_ptr(&(*group_name).value[0]) .to_string_lossy() .into_owned() }; let data: &[u8] = unsafe { std::slice::from_raw_parts(msg as *const u8, msg_len) }; match h.model_data { ModelData::ModelV1(md) => { if let Some(cb) = md.deliver_fn { (cb)(h, r_group_name, NodeId::from(nodeid), pid, data, msg_len); } } _ => {} } } } // Called from CPG callback function - munge params back to Rust from C extern "C" fn rust_confchg_fn( handle: ffi::cpg_handle_t, group_name: *const ffi::cpg_name, member_list: *const ffi::cpg_address, member_list_entries: usize, left_list: *const ffi::cpg_address, left_list_entries: usize, joined_list: *const ffi::cpg_address, joined_list_entries: usize, ) { if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) { let r_group_name = unsafe { CStr::from_ptr(&(*group_name).value[0]) .to_string_lossy() .into_owned() }; let r_member_list = cpg_array_to_vec(member_list, member_list_entries); let r_left_list = cpg_array_to_vec(left_list, left_list_entries); let r_joined_list = cpg_array_to_vec(joined_list, joined_list_entries); match h.model_data { ModelData::ModelV1(md) => { if let Some(cb) = md.confchg_fn { (cb)(h, &r_group_name, r_member_list, r_left_list, r_joined_list); } } _ => {} } } } // Called from CPG callback function - munge params back to Rust from C extern "C" fn rust_totem_confchg_fn( handle: ffi::cpg_handle_t, ring_id: ffi::cpg_ring_id, member_list_entries: u32, member_list: *const u32, ) { if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) { let r_ring_id = RingId { nodeid: NodeId::from(ring_id.nodeid), seq: ring_id.seq, }; let mut r_member_list = Vec::::new(); let temp_members: &[u32] = unsafe { slice::from_raw_parts(member_list, member_list_entries as usize) }; for i in 0..member_list_entries as usize { r_member_list.push(NodeId::from(temp_members[i])); } match h.model_data { ModelData::ModelV1(md) => { if let Some(cb) = md.totem_confchg_fn { (cb)(h, r_ring_id, r_member_list); } } _ => {} } } } /// Initialize a connection to the cpg library. You must call this before doing anything /// else and use the passed back [Handle]. /// Remember to free the handle using [finalize] when finished. pub fn initialize(model_data: &ModelData, context: u64) -> Result { let mut handle: ffi::cpg_handle_t = 0; let mut m = match model_data { ModelData::ModelV1(_v1) => { ffi::cpg_model_v1_data_t { model: ffi::CPG_MODEL_V1, cpg_deliver_fn: Some(rust_deliver_fn), cpg_confchg_fn: Some(rust_confchg_fn), cpg_totem_confchg_fn: Some(rust_totem_confchg_fn), flags: 0, // No supported flags (yet) } } _ => return Err(CsError::CsErrInvalidParam), }; unsafe { let c_context: *mut c_void = &mut &context as *mut _ as *mut c_void; let c_model: *mut ffi::cpg_model_data_t = &mut m as *mut _ as *mut ffi::cpg_model_data_t; let res = ffi::cpg_model_initialize(&mut handle, m.model, c_model, c_context); if res == ffi::CS_OK { let rhandle = Handle { cpg_handle: handle, model_data: *model_data, }; HANDLE_HASH.lock().unwrap().insert(handle, rhandle); Ok(rhandle) } else { Err(CsError::from_c(res)) } } } /// Finish with a connection to corosync pub fn finalize(handle: Handle) -> Result<()> { let res = unsafe { ffi::cpg_finalize(handle.cpg_handle) }; if res == ffi::CS_OK { HANDLE_HASH.lock().unwrap().remove(&handle.cpg_handle); Ok(()) } else { Err(CsError::from_c(res)) } } // Not sure if an FD is the right thing to return here, but it will do for now. /// Returns a file descriptor to use for poll/select on the CPG handle pub fn fd_get(handle: Handle) -> Result { let c_fd: *mut c_int = &mut 0 as *mut _ as *mut c_int; let res = unsafe { ffi::cpg_fd_get(handle.cpg_handle, c_fd) }; if res == ffi::CS_OK { Ok(c_fd as i32) } else { Err(CsError::from_c(res)) } } /// Call any/all active CPG callbacks for this [Handle] see [DispatchFlags] for details pub fn dispatch(handle: Handle, flags: DispatchFlags) -> Result<()> { let res = unsafe { ffi::cpg_dispatch(handle.cpg_handle, flags as u32) }; if res == ffi::CS_OK { Ok(()) } else { Err(CsError::from_c(res)) } } /// Joins a CPG group for sending and receiving messages pub fn join(handle: Handle, group: &str) -> Result<()> { let res = unsafe { let c_group = string_to_cpg_name(group)?; ffi::cpg_join(handle.cpg_handle, &c_group) }; if res == ffi::CS_OK { Ok(()) } else { Err(CsError::from_c(res)) } } /// Leave the currently joined CPG group, another group can now be joined on /// the same [Handle] or [finalize] can be called to finish using CPG pub fn leave(handle: Handle, group: &str) -> Result<()> { let res = unsafe { let c_group = string_to_cpg_name(group)?; ffi::cpg_leave(handle.cpg_handle, &c_group) }; if res == ffi::CS_OK { Ok(()) } else { Err(CsError::from_c(res)) } } /// Get the local node ID pub fn local_get(handle: Handle) -> Result { let mut nodeid: u32 = 0; let res = unsafe { ffi::cpg_local_get(handle.cpg_handle, &mut nodeid) }; if res == ffi::CS_OK { Ok(NodeId::from(nodeid)) } else { Err(CsError::from_c(res)) } } /// Get a list of members of a CPG group as a vector of [Address] structs pub fn membership_get(handle: Handle, group: &str) -> Result> { let mut member_list_entries: i32 = 0; let member_list = [ffi::cpg_address { nodeid: 0, pid: 0, reason: 0, }; CPG_MEMBERS_MAX]; let res = unsafe { let mut c_group = string_to_cpg_name(group)?; let c_memlist = member_list.as_ptr() as *mut ffi::cpg_address; ffi::cpg_membership_get( handle.cpg_handle, &mut c_group, &mut *c_memlist, &mut member_list_entries, ) }; if res == ffi::CS_OK { Ok(cpg_array_to_vec( member_list.as_ptr(), member_list_entries as usize, )) } else { Err(CsError::from_c(res)) } } /// Get the maximum size that CPG can send in one corosync message, /// any messages sent via [mcast_joined] that are larger than this /// will be fragmented pub fn max_atomic_msgsize_get(handle: Handle) -> Result { let mut asize: u32 = 0; let res = unsafe { ffi::cpg_max_atomic_msgsize_get(handle.cpg_handle, &mut asize) }; if res == ffi::CS_OK { Ok(asize) } else { Err(CsError::from_c(res)) } } /// Get the current 'context' value for this handle. /// The context value is an arbitrary value that is always passed /// back to callbacks to help identify the source pub fn context_get(handle: Handle) -> Result { let mut c_context: *mut c_void = &mut 0u64 as *mut _ as *mut c_void; let (res, context) = unsafe { let r = ffi::cpg_context_get(handle.cpg_handle, &mut c_context); let context: u64 = c_context as u64; (r, context) }; if res == ffi::CS_OK { Ok(context) } else { Err(CsError::from_c(res)) } } /// Set the current 'context' value for this handle. /// The context value is an arbitrary value that is always passed /// back to callbacks to help identify the source. /// Normally this is set in [initialize], but this allows it to be changed pub fn context_set(handle: Handle, context: u64) -> Result<()> { let res = unsafe { let c_context = context as *mut c_void; ffi::cpg_context_set(handle.cpg_handle, c_context) }; if res == ffi::CS_OK { Ok(()) } else { Err(CsError::from_c(res)) } } /// Get the flow control state of corosync CPG pub fn flow_control_state_get(handle: Handle) -> Result { let mut fc_state: u32 = 0; let res = unsafe { ffi::cpg_flow_control_state_get(handle.cpg_handle, &mut fc_state) }; if res == ffi::CS_OK { if fc_state == 1 { Ok(true) } else { Ok(false) } } else { Err(CsError::from_c(res)) } } /// Send a message to the currently joined CPG group pub fn mcast_joined(handle: Handle, guarantee: Guarantee, msg: &[u8]) -> Result<()> { let c_iovec = ffi::iovec { iov_base: msg.as_ptr() as *mut c_void, iov_len: msg.len(), }; let res = unsafe { ffi::cpg_mcast_joined(handle.cpg_handle, guarantee.to_c(), &c_iovec, 1) }; if res == ffi::CS_OK { Ok(()) } else { Err(CsError::from_c(res)) } } /// Type of iteration for [CpgIterStart] #[derive(Copy, Clone)] pub enum CpgIterType { NameOnly = 1, OneGroup = 2, All = 3, } // Iterator based on information on this page. thank you! // https://stackoverflow.com/questions/30218886/how-to-implement-iterator-and-intoiterator-for-a-simple-struct // Object to iterate over /// An object to iterate over a list of CPG groups, create one of these and then use 'for' over it pub struct CpgIterStart { iter_handle: u64, } /// struct returned from iterating over a [CpgIterStart] pub struct CpgIter { pub group: String, pub nodeid: NodeId, pub pid: u32, } pub struct CpgIntoIter { iter_handle: u64, } impl fmt::Debug for CpgIter { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, "[group: {}, nodeid: {}, pid: {}]", self.group, self.nodeid, self.pid ) } } impl Iterator for CpgIntoIter { type Item = CpgIter; fn next(&mut self) -> Option { let mut c_iter_description = ffi::cpg_iteration_description_t { nodeid: 0, pid: 0, group: ffi::cpg_name { length: 0_u32, value: [0; CPG_NAMELEN_MAX], }, }; let res = unsafe { ffi::cpg_iteration_next(self.iter_handle, &mut c_iter_description) }; if res == ffi::CS_OK { let r_group = match string_from_bytes(c_iter_description.group.value.as_ptr(), CPG_NAMELEN_MAX) { Ok(groupname) => groupname, Err(_) => return None, }; Some(CpgIter { group: r_group, nodeid: NodeId::from(c_iter_description.nodeid), pid: c_iter_description.pid, }) } else if res == ffi::CS_ERR_NO_SECTIONS { // End of list unsafe { // Yeah, we don't check this return code. There's nowhere to report it. ffi::cpg_iteration_finalize(self.iter_handle) }; None } else { None } } } impl CpgIterStart { /// Create a new [CpgIterStart] object for iterating over a list of active CPG groups pub fn new(cpg_handle: Handle, group: &str, iter_type: CpgIterType) -> Result { let mut iter_handle: u64 = 0; let res = unsafe { let mut c_group = string_to_cpg_name(group)?; let c_itertype = iter_type as u32; // IterType 'All' requires that the group pointer is passed in as NULL let c_group_ptr = { match iter_type { CpgIterType::All => std::ptr::null_mut(), _ => &mut c_group, } }; ffi::cpg_iteration_initialize( cpg_handle.cpg_handle, c_itertype, c_group_ptr, &mut iter_handle, ) }; if res == ffi::CS_OK { Ok(CpgIterStart { iter_handle }) } else { Err(CsError::from_c(res)) } } } impl IntoIterator for CpgIterStart { type Item = CpgIter; type IntoIter = CpgIntoIter; fn into_iter(self) -> Self::IntoIter { CpgIntoIter { iter_handle: self.iter_handle, } } }