summaryrefslogtreecommitdiffstats
path: root/bindings/rust/src/cpg.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-03 17:01:24 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-03 17:01:24 +0000
commit6dd3dfb79125cd02d02efbce435a6c82e5af92ef (patch)
tree45084fc83278586f6bbafcb935f92d53f71a6b03 /bindings/rust/src/cpg.rs
parentInitial commit. (diff)
downloadcorosync-upstream.tar.xz
corosync-upstream.zip
Adding upstream version 3.1.8.upstream/3.1.8upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'bindings/rust/src/cpg.rs')
-rw-r--r--bindings/rust/src/cpg.rs628
1 files changed, 628 insertions, 0 deletions
diff --git a/bindings/rust/src/cpg.rs b/bindings/rust/src/cpg.rs
new file mode 100644
index 0000000..1246497
--- /dev/null
+++ b/bindings/rust/src/cpg.rs
@@ -0,0 +1,628 @@
+// libcpg interface for Rust
+// Copyright (c) 2020 Red Hat, Inc.
+//
+// All rights reserved.
+//
+// Author: Christine Caulfield (ccaulfi@redhat.com)
+//
+
+#![allow(clippy::single_match)]
+#![allow(clippy::needless_range_loop)]
+#![allow(clippy::type_complexity)]
+
+// For the code generated by bindgen
+use crate::sys::cpg as ffi;
+
+use std::collections::HashMap;
+use std::ffi::{CStr, CString};
+use std::fmt;
+use std::os::raw::{c_int, c_void};
+use std::ptr::copy_nonoverlapping;
+use std::slice;
+use std::string::String;
+use std::sync::Mutex;
+
+// General corosync things
+use crate::string_from_bytes;
+use crate::{CsError, DispatchFlags, NodeId, Result};
+
+const CPG_NAMELEN_MAX: usize = 128;
+const CPG_MEMBERS_MAX: usize = 128;
+
+/// RingId returned by totem_confchg_fn
+#[derive(Copy, Clone)]
+pub struct RingId {
+ pub nodeid: NodeId,
+ pub seq: u64,
+}
+
+/// Totem delivery guarantee options for [mcast_joined]
+// The C enum doesn't have numbers in the code
+// so don't assume we can match them
+#[derive(Copy, Clone)]
+pub enum Guarantee {
+ TypeUnordered,
+ TypeFifo,
+ TypeAgreed,
+ TypeSafe,
+}
+
+// Convert internal to cpg.h values.
+impl Guarantee {
+ pub fn to_c(&self) -> u32 {
+ match self {
+ Guarantee::TypeUnordered => ffi::CPG_TYPE_UNORDERED,
+ Guarantee::TypeFifo => ffi::CPG_TYPE_FIFO,
+ Guarantee::TypeAgreed => ffi::CPG_TYPE_AGREED,
+ Guarantee::TypeSafe => ffi::CPG_TYPE_SAFE,
+ }
+ }
+}
+
+/// Flow control state returned from [flow_control_state_get]
+#[derive(Copy, Clone)]
+pub enum FlowControlState {
+ Disabled,
+ Enabled,
+}
+
+/// No flags current specified for model1 so leave this at None
+#[derive(Copy, Clone)]
+pub enum Model1Flags {
+ None,
+}
+
+/// Reason for cpg item callback
+#[derive(Copy, Clone)]
+pub enum Reason {
+ Undefined = 0,
+ Join = 1,
+ Leave = 2,
+ NodeDown = 3,
+ NodeUp = 4,
+ ProcDown = 5,
+}
+
+// Convert to cpg.h values
+impl Reason {
+ pub fn new(r: u32) -> Reason {
+ match r {
+ 0 => Reason::Undefined,
+ 1 => Reason::Join,
+ 2 => Reason::Leave,
+ 3 => Reason::NodeDown,
+ 4 => Reason::NodeUp,
+ 5 => Reason::ProcDown,
+ _ => Reason::Undefined,
+ }
+ }
+}
+impl fmt::Display for Reason {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match self {
+ Reason::Undefined => write!(f, "Undefined"),
+ Reason::Join => write!(f, "Join"),
+ Reason::Leave => write!(f, "Leave"),
+ Reason::NodeDown => write!(f, "NodeDown"),
+ Reason::NodeUp => write!(f, "NodeUp"),
+ Reason::ProcDown => write!(f, "ProcDown"),
+ }
+ }
+}
+
+/// A CPG address entry returned in the callbacks
+pub struct Address {
+ pub nodeid: NodeId,
+ pub pid: u32,
+ pub reason: Reason,
+}
+impl fmt::Debug for Address {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(
+ f,
+ "[nodeid: {}, pid: {}, reason: {}]",
+ self.nodeid, self.pid, self.reason
+ )
+ }
+}
+
+/// Data for model1 [initialize]
+#[derive(Copy, Clone)]
+pub struct Model1Data {
+ pub flags: Model1Flags,
+ pub deliver_fn: Option<
+ fn(
+ handle: &Handle,
+ group_name: String,
+ nodeid: NodeId,
+ pid: u32,
+ msg: &[u8],
+ msg_len: usize,
+ ),
+ >,
+ pub confchg_fn: Option<
+ fn(
+ handle: &Handle,
+ group_name: &str,
+ member_list: Vec<Address>,
+ left_list: Vec<Address>,
+ joined_list: Vec<Address>,
+ ),
+ >,
+ pub totem_confchg_fn: Option<fn(handle: &Handle, ring_id: RingId, member_list: Vec<NodeId>)>,
+}
+
+/// Modeldata for [initialize], only v1 supported at the moment
+#[derive(Copy, Clone)]
+pub enum ModelData {
+ ModelNone,
+ ModelV1(Model1Data),
+}
+
+/// A handle into the cpg library. Returned from [initialize] and needed for all other calls
+#[derive(Copy, Clone)]
+pub struct Handle {
+ cpg_handle: u64, // Corosync library handle
+ model_data: ModelData,
+}
+
+// Used to convert a CPG handle into one of ours
+lazy_static! {
+ static ref HANDLE_HASH: Mutex<HashMap<u64, Handle>> = Mutex::new(HashMap::new());
+}
+
+// Convert a Rust String into a cpg_name struct for libcpg
+fn string_to_cpg_name(group: &str) -> Result<ffi::cpg_name> {
+ if group.len() > CPG_NAMELEN_MAX - 1 {
+ return Err(CsError::CsErrInvalidParam);
+ }
+
+ let c_name = match CString::new(group) {
+ Ok(n) => n,
+ Err(_) => return Err(CsError::CsErrLibrary),
+ };
+ let mut c_group = ffi::cpg_name {
+ length: group.len() as u32,
+ value: [0; CPG_NAMELEN_MAX],
+ };
+
+ unsafe {
+ // NOTE param order is 'wrong-way round' from C
+ copy_nonoverlapping(c_name.as_ptr(), c_group.value.as_mut_ptr(), group.len());
+ }
+
+ Ok(c_group)
+}
+
+// Convert an array of cpg_addresses to a Vec<cpg::Address> - used in callbacks
+fn cpg_array_to_vec(list: *const ffi::cpg_address, list_entries: usize) -> Vec<Address> {
+ let temp: &[ffi::cpg_address] = unsafe { slice::from_raw_parts(list, list_entries) };
+ let mut r_vec = Vec::<Address>::new();
+
+ for i in 0..list_entries {
+ let a: Address = Address {
+ nodeid: NodeId::from(temp[i].nodeid),
+ pid: temp[i].pid,
+ reason: Reason::new(temp[i].reason),
+ };
+ r_vec.push(a);
+ }
+ r_vec
+}
+
+// Called from CPG callback function - munge params back to Rust from C
+extern "C" fn rust_deliver_fn(
+ handle: ffi::cpg_handle_t,
+ group_name: *const ffi::cpg_name,
+ nodeid: u32,
+ pid: u32,
+ msg: *mut ::std::os::raw::c_void,
+ msg_len: usize,
+) {
+ if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) {
+ // Convert group_name into a Rust str.
+ let r_group_name = unsafe {
+ CStr::from_ptr(&(*group_name).value[0])
+ .to_string_lossy()
+ .into_owned()
+ };
+
+ let data: &[u8] = unsafe { std::slice::from_raw_parts(msg as *const u8, msg_len) };
+
+ match h.model_data {
+ ModelData::ModelV1(md) => {
+ if let Some(cb) = md.deliver_fn {
+ (cb)(h, r_group_name, NodeId::from(nodeid), pid, data, msg_len);
+ }
+ }
+ _ => {}
+ }
+ }
+}
+
+// Called from CPG callback function - munge params back to Rust from C
+extern "C" fn rust_confchg_fn(
+ handle: ffi::cpg_handle_t,
+ group_name: *const ffi::cpg_name,
+ member_list: *const ffi::cpg_address,
+ member_list_entries: usize,
+ left_list: *const ffi::cpg_address,
+ left_list_entries: usize,
+ joined_list: *const ffi::cpg_address,
+ joined_list_entries: usize,
+) {
+ if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) {
+ let r_group_name = unsafe {
+ CStr::from_ptr(&(*group_name).value[0])
+ .to_string_lossy()
+ .into_owned()
+ };
+ let r_member_list = cpg_array_to_vec(member_list, member_list_entries);
+ let r_left_list = cpg_array_to_vec(left_list, left_list_entries);
+ let r_joined_list = cpg_array_to_vec(joined_list, joined_list_entries);
+
+ match h.model_data {
+ ModelData::ModelV1(md) => {
+ if let Some(cb) = md.confchg_fn {
+ (cb)(h, &r_group_name, r_member_list, r_left_list, r_joined_list);
+ }
+ }
+ _ => {}
+ }
+ }
+}
+
+// Called from CPG callback function - munge params back to Rust from C
+extern "C" fn rust_totem_confchg_fn(
+ handle: ffi::cpg_handle_t,
+ ring_id: ffi::cpg_ring_id,
+ member_list_entries: u32,
+ member_list: *const u32,
+) {
+ if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) {
+ let r_ring_id = RingId {
+ nodeid: NodeId::from(ring_id.nodeid),
+ seq: ring_id.seq,
+ };
+ let mut r_member_list = Vec::<NodeId>::new();
+ let temp_members: &[u32] =
+ unsafe { slice::from_raw_parts(member_list, member_list_entries as usize) };
+ for i in 0..member_list_entries as usize {
+ r_member_list.push(NodeId::from(temp_members[i]));
+ }
+
+ match h.model_data {
+ ModelData::ModelV1(md) => {
+ if let Some(cb) = md.totem_confchg_fn {
+ (cb)(h, r_ring_id, r_member_list);
+ }
+ }
+ _ => {}
+ }
+ }
+}
+
+/// Initialize a connection to the cpg library. You must call this before doing anything
+/// else and use the passed back [Handle].
+/// Remember to free the handle using [finalize] when finished.
+pub fn initialize(model_data: &ModelData, context: u64) -> Result<Handle> {
+ let mut handle: ffi::cpg_handle_t = 0;
+ let mut m = match model_data {
+ ModelData::ModelV1(_v1) => {
+ ffi::cpg_model_v1_data_t {
+ model: ffi::CPG_MODEL_V1,
+ cpg_deliver_fn: Some(rust_deliver_fn),
+ cpg_confchg_fn: Some(rust_confchg_fn),
+ cpg_totem_confchg_fn: Some(rust_totem_confchg_fn),
+ flags: 0, // No supported flags (yet)
+ }
+ }
+ _ => return Err(CsError::CsErrInvalidParam),
+ };
+
+ unsafe {
+ let c_context: *mut c_void = &mut &context as *mut _ as *mut c_void;
+ let c_model: *mut ffi::cpg_model_data_t = &mut m as *mut _ as *mut ffi::cpg_model_data_t;
+ let res = ffi::cpg_model_initialize(&mut handle, m.model, c_model, c_context);
+
+ if res == ffi::CS_OK {
+ let rhandle = Handle {
+ cpg_handle: handle,
+ model_data: *model_data,
+ };
+ HANDLE_HASH.lock().unwrap().insert(handle, rhandle);
+ Ok(rhandle)
+ } else {
+ Err(CsError::from_c(res))
+ }
+ }
+}
+
+/// Finish with a connection to corosync
+pub fn finalize(handle: Handle) -> Result<()> {
+ let res = unsafe { ffi::cpg_finalize(handle.cpg_handle) };
+ if res == ffi::CS_OK {
+ HANDLE_HASH.lock().unwrap().remove(&handle.cpg_handle);
+ Ok(())
+ } else {
+ Err(CsError::from_c(res))
+ }
+}
+
+// Not sure if an FD is the right thing to return here, but it will do for now.
+/// Returns a file descriptor to use for poll/select on the CPG handle
+pub fn fd_get(handle: Handle) -> Result<i32> {
+ let c_fd: *mut c_int = &mut 0 as *mut _ as *mut c_int;
+ let res = unsafe { ffi::cpg_fd_get(handle.cpg_handle, c_fd) };
+ if res == ffi::CS_OK {
+ Ok(c_fd as i32)
+ } else {
+ Err(CsError::from_c(res))
+ }
+}
+
+/// Call any/all active CPG callbacks for this [Handle] see [DispatchFlags] for details
+pub fn dispatch(handle: Handle, flags: DispatchFlags) -> Result<()> {
+ let res = unsafe { ffi::cpg_dispatch(handle.cpg_handle, flags as u32) };
+ if res == ffi::CS_OK {
+ Ok(())
+ } else {
+ Err(CsError::from_c(res))
+ }
+}
+
+/// Joins a CPG group for sending and receiving messages
+pub fn join(handle: Handle, group: &str) -> Result<()> {
+ let res = unsafe {
+ let c_group = string_to_cpg_name(group)?;
+ ffi::cpg_join(handle.cpg_handle, &c_group)
+ };
+ if res == ffi::CS_OK {
+ Ok(())
+ } else {
+ Err(CsError::from_c(res))
+ }
+}
+
+/// Leave the currently joined CPG group, another group can now be joined on
+/// the same [Handle] or [finalize] can be called to finish using CPG
+pub fn leave(handle: Handle, group: &str) -> Result<()> {
+ let res = unsafe {
+ let c_group = string_to_cpg_name(group)?;
+ ffi::cpg_leave(handle.cpg_handle, &c_group)
+ };
+ if res == ffi::CS_OK {
+ Ok(())
+ } else {
+ Err(CsError::from_c(res))
+ }
+}
+
+/// Get the local node ID
+pub fn local_get(handle: Handle) -> Result<NodeId> {
+ let mut nodeid: u32 = 0;
+ let res = unsafe { ffi::cpg_local_get(handle.cpg_handle, &mut nodeid) };
+ if res == ffi::CS_OK {
+ Ok(NodeId::from(nodeid))
+ } else {
+ Err(CsError::from_c(res))
+ }
+}
+
+/// Get a list of members of a CPG group as a vector of [Address] structs
+pub fn membership_get(handle: Handle, group: &str) -> Result<Vec<Address>> {
+ let mut member_list_entries: i32 = 0;
+ let member_list = [ffi::cpg_address {
+ nodeid: 0,
+ pid: 0,
+ reason: 0,
+ }; CPG_MEMBERS_MAX];
+ let res = unsafe {
+ let mut c_group = string_to_cpg_name(group)?;
+ let c_memlist = member_list.as_ptr() as *mut ffi::cpg_address;
+ ffi::cpg_membership_get(
+ handle.cpg_handle,
+ &mut c_group,
+ &mut *c_memlist,
+ &mut member_list_entries,
+ )
+ };
+ if res == ffi::CS_OK {
+ Ok(cpg_array_to_vec(
+ member_list.as_ptr(),
+ member_list_entries as usize,
+ ))
+ } else {
+ Err(CsError::from_c(res))
+ }
+}
+
+/// Get the maximum size that CPG can send in one corosync message,
+/// any messages sent via [mcast_joined] that are larger than this
+/// will be fragmented
+pub fn max_atomic_msgsize_get(handle: Handle) -> Result<u32> {
+ let mut asize: u32 = 0;
+ let res = unsafe { ffi::cpg_max_atomic_msgsize_get(handle.cpg_handle, &mut asize) };
+ if res == ffi::CS_OK {
+ Ok(asize)
+ } else {
+ Err(CsError::from_c(res))
+ }
+}
+
+/// Get the current 'context' value for this handle.
+/// The context value is an arbitrary value that is always passed
+/// back to callbacks to help identify the source
+pub fn context_get(handle: Handle) -> Result<u64> {
+ let mut c_context: *mut c_void = &mut 0u64 as *mut _ as *mut c_void;
+ let (res, context) = unsafe {
+ let r = ffi::cpg_context_get(handle.cpg_handle, &mut c_context);
+ let context: u64 = c_context as u64;
+ (r, context)
+ };
+ if res == ffi::CS_OK {
+ Ok(context)
+ } else {
+ Err(CsError::from_c(res))
+ }
+}
+
+/// Set the current 'context' value for this handle.
+/// The context value is an arbitrary value that is always passed
+/// back to callbacks to help identify the source.
+/// Normally this is set in [initialize], but this allows it to be changed
+pub fn context_set(handle: Handle, context: u64) -> Result<()> {
+ let res = unsafe {
+ let c_context = context as *mut c_void;
+ ffi::cpg_context_set(handle.cpg_handle, c_context)
+ };
+ if res == ffi::CS_OK {
+ Ok(())
+ } else {
+ Err(CsError::from_c(res))
+ }
+}
+
+/// Get the flow control state of corosync CPG
+pub fn flow_control_state_get(handle: Handle) -> Result<bool> {
+ let mut fc_state: u32 = 0;
+ let res = unsafe { ffi::cpg_flow_control_state_get(handle.cpg_handle, &mut fc_state) };
+ if res == ffi::CS_OK {
+ if fc_state == 1 {
+ Ok(true)
+ } else {
+ Ok(false)
+ }
+ } else {
+ Err(CsError::from_c(res))
+ }
+}
+
+/// Send a message to the currently joined CPG group
+pub fn mcast_joined(handle: Handle, guarantee: Guarantee, msg: &[u8]) -> Result<()> {
+ let c_iovec = ffi::iovec {
+ iov_base: msg.as_ptr() as *mut c_void,
+ iov_len: msg.len(),
+ };
+ let res = unsafe { ffi::cpg_mcast_joined(handle.cpg_handle, guarantee.to_c(), &c_iovec, 1) };
+ if res == ffi::CS_OK {
+ Ok(())
+ } else {
+ Err(CsError::from_c(res))
+ }
+}
+
+/// Type of iteration for [CpgIterStart]
+#[derive(Copy, Clone)]
+pub enum CpgIterType {
+ NameOnly = 1,
+ OneGroup = 2,
+ All = 3,
+}
+
+// Iterator based on information on this page. thank you!
+// https://stackoverflow.com/questions/30218886/how-to-implement-iterator-and-intoiterator-for-a-simple-struct
+// Object to iterate over
+/// An object to iterate over a list of CPG groups, create one of these and then use 'for' over it
+pub struct CpgIterStart {
+ iter_handle: u64,
+}
+
+/// struct returned from iterating over a [CpgIterStart]
+pub struct CpgIter {
+ pub group: String,
+ pub nodeid: NodeId,
+ pub pid: u32,
+}
+
+pub struct CpgIntoIter {
+ iter_handle: u64,
+}
+
+impl fmt::Debug for CpgIter {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(
+ f,
+ "[group: {}, nodeid: {}, pid: {}]",
+ self.group, self.nodeid, self.pid
+ )
+ }
+}
+
+impl Iterator for CpgIntoIter {
+ type Item = CpgIter;
+
+ fn next(&mut self) -> Option<CpgIter> {
+ let mut c_iter_description = ffi::cpg_iteration_description_t {
+ nodeid: 0,
+ pid: 0,
+ group: ffi::cpg_name {
+ length: 0_u32,
+ value: [0; CPG_NAMELEN_MAX],
+ },
+ };
+ let res = unsafe { ffi::cpg_iteration_next(self.iter_handle, &mut c_iter_description) };
+
+ if res == ffi::CS_OK {
+ let r_group =
+ match string_from_bytes(c_iter_description.group.value.as_ptr(), CPG_NAMELEN_MAX) {
+ Ok(groupname) => groupname,
+ Err(_) => return None,
+ };
+ Some(CpgIter {
+ group: r_group,
+ nodeid: NodeId::from(c_iter_description.nodeid),
+ pid: c_iter_description.pid,
+ })
+ } else if res == ffi::CS_ERR_NO_SECTIONS {
+ // End of list
+ unsafe {
+ // Yeah, we don't check this return code. There's nowhere to report it.
+ ffi::cpg_iteration_finalize(self.iter_handle)
+ };
+ None
+ } else {
+ None
+ }
+ }
+}
+
+impl CpgIterStart {
+ /// Create a new [CpgIterStart] object for iterating over a list of active CPG groups
+ pub fn new(cpg_handle: Handle, group: &str, iter_type: CpgIterType) -> Result<CpgIterStart> {
+ let mut iter_handle: u64 = 0;
+ let res = unsafe {
+ let mut c_group = string_to_cpg_name(group)?;
+ let c_itertype = iter_type as u32;
+ // IterType 'All' requires that the group pointer is passed in as NULL
+ let c_group_ptr = {
+ match iter_type {
+ CpgIterType::All => std::ptr::null_mut(),
+ _ => &mut c_group,
+ }
+ };
+ ffi::cpg_iteration_initialize(
+ cpg_handle.cpg_handle,
+ c_itertype,
+ c_group_ptr,
+ &mut iter_handle,
+ )
+ };
+ if res == ffi::CS_OK {
+ Ok(CpgIterStart { iter_handle })
+ } else {
+ Err(CsError::from_c(res))
+ }
+ }
+}
+
+impl IntoIterator for CpgIterStart {
+ type Item = CpgIter;
+ type IntoIter = CpgIntoIter;
+
+ fn into_iter(self) -> Self::IntoIter {
+ CpgIntoIter {
+ iter_handle: self.iter_handle,
+ }
+ }
+}