diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-03 17:01:24 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-03 17:01:24 +0000 |
commit | 6dd3dfb79125cd02d02efbce435a6c82e5af92ef (patch) | |
tree | 45084fc83278586f6bbafcb935f92d53f71a6b03 /bindings/rust/src | |
parent | Initial commit. (diff) | |
download | corosync-6dd3dfb79125cd02d02efbce435a6c82e5af92ef.tar.xz corosync-6dd3dfb79125cd02d02efbce435a6c82e5af92ef.zip |
Adding upstream version 3.1.8.upstream/3.1.8upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'bindings/rust/src')
-rw-r--r-- | bindings/rust/src/cfg.rs | 348 | ||||
-rw-r--r-- | bindings/rust/src/cmap.rs | 894 | ||||
-rw-r--r-- | bindings/rust/src/cpg.rs | 628 | ||||
-rw-r--r-- | bindings/rust/src/lib.rs | 296 | ||||
-rw-r--r-- | bindings/rust/src/quorum.rs | 298 | ||||
-rw-r--r-- | bindings/rust/src/sys/mod.rs | 7 | ||||
-rw-r--r-- | bindings/rust/src/votequorum.rs | 501 |
7 files changed, 2972 insertions, 0 deletions
diff --git a/bindings/rust/src/cfg.rs b/bindings/rust/src/cfg.rs new file mode 100644 index 0000000..b4eecac --- /dev/null +++ b/bindings/rust/src/cfg.rs @@ -0,0 +1,348 @@ +// libcfg interface for Rust +// Copyright (c) 2021 Red Hat, Inc. +// +// All rights reserved. +// +// Author: Christine Caulfield (ccaulfi@redhat.com) +// + +// For the code generated by bindgen +use crate::sys::cfg as ffi; + +use std::collections::HashMap; +use std::ffi::CString; +use std::os::raw::{c_int, c_void}; +use std::sync::Mutex; + +use crate::string_from_bytes; +use crate::{CsError, DispatchFlags, NodeId, Result}; + +// Used to convert a CFG handle into one of ours +lazy_static! { + static ref HANDLE_HASH: Mutex<HashMap<u64, Handle>> = Mutex::new(HashMap::new()); +} + +/// Callback from [track_start]. Will be called if another process +/// requests to shut down corosync. [reply_to_shutdown] should be called +/// with a [ShutdownReply] of either Yes or No. +#[derive(Copy, Clone)] +pub struct Callbacks { + pub corosync_cfg_shutdown_callback_fn: Option<fn(handle: &Handle, flags: u32)>, +} + +/// A handle into the cfg library. returned from [initialize] and needed for all other calls +#[derive(Copy, Clone)] +pub struct Handle { + cfg_handle: u64, + callbacks: Callbacks, +} + +/// Flags for [try_shutdown] +pub enum ShutdownFlags { + /// Request shutdown (other daemons will be consulted) + Request, + /// Tells other daemons but ignore their opinions + Regardless, + /// Go down straight away (but still tell other nodes) + Immediate, +} + +/// Responses for [reply_to_shutdown] +pub enum ShutdownReply { + Yes = 1, + No = 0, +} + +/// Trackflags for [track_start]. None currently supported +pub enum TrackFlags { + None, +} + +/// Version of the [NodeStatus] structure returned from [node_status_get] +#[derive(Debug, Copy, Clone)] +pub enum NodeStatusVersion { + V1, +} + +/// Status of a link inside [NodeStatus] struct +#[derive(Debug)] +pub struct LinkStatus { + pub enabled: bool, + pub connected: bool, + pub dynconnected: bool, + pub mtu: u32, + pub src_ipaddr: String, + pub dst_ipaddr: String, +} + +/// Structure returned from [node_status_get], shows all the details of a node +/// that is known to corosync, including all configured links +#[derive(Debug)] +pub struct NodeStatus { + pub version: NodeStatusVersion, + pub nodeid: NodeId, + pub reachable: bool, + pub remote: bool, + pub external: bool, + pub onwire_min: u8, + pub onwire_max: u8, + pub onwire_ver: u8, + pub link_status: Vec<LinkStatus>, +} + +extern "C" fn rust_shutdown_notification_fn(handle: ffi::corosync_cfg_handle_t, flags: u32) { + if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) { + if let Some(cb) = h.callbacks.corosync_cfg_shutdown_callback_fn { + (cb)(h, flags); + } + } +} + +/// Initialize a connection to the cfg library. You must call this before doing anything +/// else and use the passed back [Handle]. +/// Remember to free the handle using [finalize] when finished. +pub fn initialize(callbacks: &Callbacks) -> Result<Handle> { + let mut handle: ffi::corosync_cfg_handle_t = 0; + + let c_callbacks = ffi::corosync_cfg_callbacks_t { + corosync_cfg_shutdown_callback: Some(rust_shutdown_notification_fn), + }; + + unsafe { + let res = ffi::corosync_cfg_initialize(&mut handle, &c_callbacks); + if res == ffi::CS_OK { + let rhandle = Handle { + cfg_handle: handle, + callbacks: *callbacks, + }; + HANDLE_HASH.lock().unwrap().insert(handle, rhandle); + Ok(rhandle) + } else { + Err(CsError::from_c(res)) + } + } +} + +/// Finish with a connection to corosync, after calling this the [Handle] is invalid +pub fn finalize(handle: Handle) -> Result<()> { + let res = unsafe { ffi::corosync_cfg_finalize(handle.cfg_handle) }; + if res == ffi::CS_OK { + HANDLE_HASH.lock().unwrap().remove(&handle.cfg_handle); + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +// not sure if an fd is the right thing to return here, but it will do for now. +/// Returns a file descriptor to use for poll/select on the CFG handle +pub fn fd_get(handle: Handle) -> Result<i32> { + let c_fd: *mut c_int = &mut 0 as *mut _ as *mut c_int; + let res = unsafe { ffi::corosync_cfg_fd_get(handle.cfg_handle, c_fd) }; + if res == ffi::CS_OK { + Ok(c_fd as i32) + } else { + Err(CsError::from_c(res)) + } +} + +/// Get the local [NodeId] +pub fn local_get(handle: Handle) -> Result<NodeId> { + let mut nodeid: u32 = 0; + let res = unsafe { ffi::corosync_cfg_local_get(handle.cfg_handle, &mut nodeid) }; + if res == ffi::CS_OK { + Ok(NodeId::from(nodeid)) + } else { + Err(CsError::from_c(res)) + } +} + +/// Reload the cluster configuration on all nodes +pub fn reload_cnfig(handle: Handle) -> Result<()> { + let res = unsafe { ffi::corosync_cfg_reload_config(handle.cfg_handle) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Re-open the cluster log files, on this node only +pub fn reopen_log_files(handle: Handle) -> Result<()> { + let res = unsafe { ffi::corosync_cfg_reopen_log_files(handle.cfg_handle) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Tell another cluster node to shutdown. reason is a string that +/// will be written to the system log files. +pub fn kill_node(handle: Handle, nodeid: NodeId, reason: &str) -> Result<()> { + let c_string = { + match CString::new(reason) { + Ok(cs) => cs, + Err(_) => return Err(CsError::CsErrInvalidParam), + } + }; + + let res = unsafe { + ffi::corosync_cfg_kill_node(handle.cfg_handle, u32::from(nodeid), c_string.as_ptr()) + }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Ask this cluster node to shutdown. If [ShutdownFlags] is set to Request then +///it may be refused by other applications +/// that have registered for shutdown callbacks. +pub fn try_shutdown(handle: Handle, flags: ShutdownFlags) -> Result<()> { + let c_flags = match flags { + ShutdownFlags::Request => 0, + ShutdownFlags::Regardless => 1, + ShutdownFlags::Immediate => 2, + }; + let res = unsafe { ffi::corosync_cfg_try_shutdown(handle.cfg_handle, c_flags) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Reply to a shutdown request with Yes or No [ShutdownReply] +pub fn reply_to_shutdown(handle: Handle, flags: ShutdownReply) -> Result<()> { + let c_flags = match flags { + ShutdownReply::No => 0, + ShutdownReply::Yes => 1, + }; + let res = unsafe { ffi::corosync_cfg_replyto_shutdown(handle.cfg_handle, c_flags) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Call any/all active CFG callbacks for this [Handle] see [DispatchFlags] for details +pub fn dispatch(handle: Handle, flags: DispatchFlags) -> Result<()> { + let res = unsafe { ffi::corosync_cfg_dispatch(handle.cfg_handle, flags as u32) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +// Quick & dirty u8 to boolean +fn u8_to_bool(val: u8) -> bool { + val != 0 +} + +const CFG_MAX_LINKS: usize = 8; +const CFG_MAX_HOST_LEN: usize = 256; +fn unpack_nodestatus(c_nodestatus: ffi::corosync_cfg_node_status_v1) -> Result<NodeStatus> { + let mut ns = NodeStatus { + version: NodeStatusVersion::V1, + nodeid: NodeId::from(c_nodestatus.nodeid), + reachable: u8_to_bool(c_nodestatus.reachable), + remote: u8_to_bool(c_nodestatus.remote), + external: u8_to_bool(c_nodestatus.external), + onwire_min: c_nodestatus.onwire_min, + onwire_max: c_nodestatus.onwire_max, + onwire_ver: c_nodestatus.onwire_min, + link_status: Vec::<LinkStatus>::new(), + }; + for i in 0..CFG_MAX_LINKS { + let ls = LinkStatus { + enabled: u8_to_bool(c_nodestatus.link_status[i].enabled), + connected: u8_to_bool(c_nodestatus.link_status[i].connected), + dynconnected: u8_to_bool(c_nodestatus.link_status[i].dynconnected), + mtu: c_nodestatus.link_status[i].mtu, + src_ipaddr: string_from_bytes( + &c_nodestatus.link_status[i].src_ipaddr[0], + CFG_MAX_HOST_LEN, + )?, + dst_ipaddr: string_from_bytes( + &c_nodestatus.link_status[i].dst_ipaddr[0], + CFG_MAX_HOST_LEN, + )?, + }; + ns.link_status.push(ls); + } + + Ok(ns) +} + +// Constructor for link status to make c_ndostatus initialization tidier. +fn new_ls() -> ffi::corosync_knet_link_status_v1 { + ffi::corosync_knet_link_status_v1 { + enabled: 0, + connected: 0, + dynconnected: 0, + mtu: 0, + src_ipaddr: [0; 256], + dst_ipaddr: [0; 256], + } +} + +/// Get the extended status of a node in the cluster (including active links) from its [NodeId]. +/// Returns a filled in [NodeStatus] struct +pub fn node_status_get( + handle: Handle, + nodeid: NodeId, + _version: NodeStatusVersion, +) -> Result<NodeStatus> { + // Currently only supports V1 struct + unsafe { + // We need to initialize this even though it's all going to be overwritten. + let mut c_nodestatus = ffi::corosync_cfg_node_status_v1 { + version: 1, + nodeid: 0, + reachable: 0, + remote: 0, + external: 0, + onwire_min: 0, + onwire_max: 0, + onwire_ver: 0, + link_status: [new_ls(); 8], + }; + + let res = ffi::corosync_cfg_node_status_get( + handle.cfg_handle, + u32::from(nodeid), + 1, + &mut c_nodestatus as *mut _ as *mut c_void, + ); + + if res == ffi::CS_OK { + unpack_nodestatus(c_nodestatus) + } else { + Err(CsError::from_c(res)) + } + } +} + +/// Start tracking for shutdown notifications +pub fn track_start(handle: Handle, _flags: TrackFlags) -> Result<()> { + let res = unsafe { ffi::corosync_cfg_trackstart(handle.cfg_handle, 0) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Stop tracking for shutdown notifications +pub fn track_stop(handle: Handle) -> Result<()> { + let res = unsafe { ffi::corosync_cfg_trackstop(handle.cfg_handle) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} diff --git a/bindings/rust/src/cmap.rs b/bindings/rust/src/cmap.rs new file mode 100644 index 0000000..454fbee --- /dev/null +++ b/bindings/rust/src/cmap.rs @@ -0,0 +1,894 @@ +// libcmap interface for Rust +// Copyright (c) 2021 Red Hat, Inc. +// +// All rights reserved. +// +// Author: Christine Caulfield (ccaulfi@redhat.com) +// + +#![allow(clippy::type_complexity)] + +// For the code generated by bindgen +use crate::sys::cmap as ffi; + +use num_enum::TryFromPrimitive; +use std::any::type_name; +use std::collections::HashMap; +use std::convert::TryFrom; +use std::ffi::CString; +use std::fmt; +use std::os::raw::{c_char, c_int, c_void}; +use std::ptr::copy_nonoverlapping; +use std::sync::Mutex; + +use crate::string_from_bytes; +use crate::{CsError, DispatchFlags, Result}; + +// Maps: +/// "Maps" available to [initialize] +pub enum Map { + Icmap, + Stats, +} + +bitflags! { +/// Tracker types for cmap, both passed into [track_add] +/// and returned from its callback. + pub struct TrackType: i32 + { + const DELETE = 1; + const MODIFY = 2; + const ADD = 4; + const PREFIX = 8; + } +} + +impl fmt::Display for TrackType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + if self.contains(TrackType::DELETE) { + write!(f, "DELETE ")? + } + if self.contains(TrackType::MODIFY) { + write!(f, "MODIFY ")? + } + if self.contains(TrackType::ADD) { + write!(f, "ADD ")? + } + if self.contains(TrackType::PREFIX) { + write!(f, "PREFIX ") + } else { + Ok(()) + } + } +} + +#[derive(Copy, Clone)] +/// A handle returned from [initialize], needs to be passed to all other cmap API calls +pub struct Handle { + cmap_handle: u64, +} + +#[derive(Copy, Clone)] +/// A handle for a specific CMAP tracker. returned from [track_add]. +/// There may be multiple TrackHandles per [Handle] +pub struct TrackHandle { + track_handle: u64, + notify_callback: NotifyCallback, +} + +// Used to convert CMAP handles into one of ours, for callbacks +lazy_static! { + static ref TRACKHANDLE_HASH: Mutex<HashMap<u64, TrackHandle>> = Mutex::new(HashMap::new()); + static ref HANDLE_HASH: Mutex<HashMap<u64, Handle>> = Mutex::new(HashMap::new()); +} + +/// Initialize a connection to the cmap subsystem. +/// map specifies which cmap "map" to use. +/// Returns a [Handle] into the cmap library +pub fn initialize(map: Map) -> Result<Handle> { + let mut handle: ffi::cmap_handle_t = 0; + let c_map = match map { + Map::Icmap => ffi::CMAP_MAP_ICMAP, + Map::Stats => ffi::CMAP_MAP_STATS, + }; + + unsafe { + let res = ffi::cmap_initialize_map(&mut handle, c_map); + if res == ffi::CS_OK { + let rhandle = Handle { + cmap_handle: handle, + }; + HANDLE_HASH.lock().unwrap().insert(handle, rhandle); + Ok(rhandle) + } else { + Err(CsError::from_c(res)) + } + } +} + +/// Finish with a connection to corosync. +/// Takes a [Handle] as returned from [initialize] +pub fn finalize(handle: Handle) -> Result<()> { + let res = unsafe { ffi::cmap_finalize(handle.cmap_handle) }; + if res == ffi::CS_OK { + HANDLE_HASH.lock().unwrap().remove(&handle.cmap_handle); + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Return a file descriptor to use for poll/select on the CMAP handle. +/// Takes a [Handle] as returned from [initialize], +/// returns a C file descriptor as i32 +pub fn fd_get(handle: Handle) -> Result<i32> { + let c_fd: *mut c_int = &mut 0 as *mut _ as *mut c_int; + let res = unsafe { ffi::cmap_fd_get(handle.cmap_handle, c_fd) }; + if res == ffi::CS_OK { + Ok(c_fd as i32) + } else { + Err(CsError::from_c(res)) + } +} + +/// Dispatch any/all active CMAP callbacks. +/// Takes a [Handle] as returned from [initialize], +/// flags [DispatchFlags] tells it how many items to dispatch before returning +pub fn dispatch(handle: Handle, flags: DispatchFlags) -> Result<()> { + let res = unsafe { ffi::cmap_dispatch(handle.cmap_handle, flags as u32) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Get the current 'context' value for this handle +/// The context value is an arbitrary value that is always passed +/// back to callbacks to help identify the source +pub fn context_get(handle: Handle) -> Result<u64> { + let (res, context) = unsafe { + let mut context: u64 = 0; + let c_context: *mut c_void = &mut context as *mut _ as *mut c_void; + let r = ffi::cmap_context_get(handle.cmap_handle, c_context as *mut *const c_void); + (r, context) + }; + if res == ffi::CS_OK { + Ok(context) + } else { + Err(CsError::from_c(res)) + } +} + +/// Set the current 'context' value for this handle +/// The context value is an arbitrary value that is always passed +/// back to callbacks to help identify the source. +/// Normally this is set in [initialize], but this allows it to be changed +pub fn context_set(handle: Handle, context: u64) -> Result<()> { + let res = unsafe { + let c_context = context as *mut c_void; + ffi::cmap_context_set(handle.cmap_handle, c_context) + }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// The type of data returned from [get] or in a +/// tracker callback or iterator, part of the [Data] struct +#[derive(Clone, Copy, Debug, Eq, PartialEq, TryFromPrimitive)] +#[repr(u32)] +pub enum DataType { + Int8 = ffi::CMAP_VALUETYPE_INT8, + UInt8 = ffi::CMAP_VALUETYPE_UINT8, + Int16 = ffi::CMAP_VALUETYPE_INT16, + UInt16 = ffi::CMAP_VALUETYPE_UINT16, + Int32 = ffi::CMAP_VALUETYPE_INT32, + UInt32 = ffi::CMAP_VALUETYPE_UINT32, + Int64 = ffi::CMAP_VALUETYPE_INT64, + UInt64 = ffi::CMAP_VALUETYPE_UINT64, + Float = ffi::CMAP_VALUETYPE_FLOAT, + Double = ffi::CMAP_VALUETYPE_DOUBLE, + String = ffi::CMAP_VALUETYPE_STRING, + Binary = ffi::CMAP_VALUETYPE_BINARY, + Unknown = 999, +} + +fn cmap_to_enum(cmap_type: u32) -> DataType { + match DataType::try_from(cmap_type) { + Ok(e) => e, + Err(_) => DataType::Unknown, + } +} + +/// Data returned from the cmap::get() call and tracker & iterators. +/// Contains the data itself and the type of that data. +pub enum Data { + Int8(i8), + UInt8(u8), + Int16(i16), + UInt16(u16), + Int32(i32), + UInt32(u32), + Int64(i64), + UInt64(u64), + Float(f32), + Double(f64), + String(String), + Binary(Vec<u8>), + Unknown, +} + +impl fmt::Display for DataType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + DataType::Int8 => write!(f, "Int8"), + DataType::UInt8 => write!(f, "UInt8"), + DataType::Int16 => write!(f, "Int16"), + DataType::UInt16 => write!(f, "UInt16"), + DataType::Int32 => write!(f, "Int32"), + DataType::UInt32 => write!(f, "UInt32"), + DataType::Int64 => write!(f, "Int64"), + DataType::UInt64 => write!(f, "UInt64"), + DataType::Float => write!(f, "Float"), + DataType::Double => write!(f, "Double"), + DataType::String => write!(f, "String"), + DataType::Binary => write!(f, "Binary"), + DataType::Unknown => write!(f, "Unknown"), + } + } +} + +impl fmt::Display for Data { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Data::Int8(v) => write!(f, "{v} (Int8)"), + Data::UInt8(v) => write!(f, "{v} (UInt8)"), + Data::Int16(v) => write!(f, "{v} (Int16)"), + Data::UInt16(v) => write!(f, "{v} (UInt16)"), + Data::Int32(v) => write!(f, "{v} (Int32)"), + Data::UInt32(v) => write!(f, "{v} (UInt32)"), + Data::Int64(v) => write!(f, "{v} (Int64)"), + Data::UInt64(v) => write!(f, "{v} (UInt64)"), + Data::Float(v) => write!(f, "{v} (Float)"), + Data::Double(v) => write!(f, "{v} (Double)"), + Data::String(v) => write!(f, "{v} (String)"), + Data::Binary(v) => write!(f, "{v:?} (Binary)"), + Data::Unknown => write!(f, "Unknown)"), + } + } +} + +const CMAP_KEYNAME_MAXLENGTH: usize = 255; +fn string_to_cstring_validated(key: &str, maxlen: usize) -> Result<CString> { + if maxlen > 0 && key.chars().count() >= maxlen { + return Err(CsError::CsErrInvalidParam); + } + + match CString::new(key) { + Ok(n) => Ok(n), + Err(_) => Err(CsError::CsErrLibrary), + } +} + +fn set_value( + handle: Handle, + key_name: &str, + datatype: DataType, + value: *mut c_void, + length: usize, +) -> Result<()> { + let csname = string_to_cstring_validated(key_name, CMAP_KEYNAME_MAXLENGTH)?; + let res = unsafe { + ffi::cmap_set( + handle.cmap_handle, + csname.as_ptr(), + value, + length, + datatype as u32, + ) + }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +// Returns type and size +fn generic_to_cmap<T>(_value: T) -> (DataType, usize) { + match type_name::<T>() { + "u8" => (DataType::UInt8, 1), + "i8" => (DataType::Int8, 1), + "u16" => (DataType::UInt16, 2), + "i16" => (DataType::Int16, 2), + "u32" => (DataType::UInt32, 4), + "i32" => (DataType::Int32, 4), + "u64" => (DataType::UInt64, 4), + "f32" => (DataType::Float, 4), + "f64" => (DataType::Double, 8), + "&str" => (DataType::String, 0), + // Binary not currently supported here + _ => (DataType::Unknown, 0), + } +} + +fn is_numeric_type(dtype: DataType) -> bool { + matches!( + dtype, + DataType::UInt8 + | DataType::Int8 + | DataType::UInt16 + | DataType::Int16 + | DataType::UInt32 + | DataType::Int32 + | DataType::UInt64 + | DataType::Int64 + | DataType::Float + | DataType::Double + ) +} + +/// Function to set a generic numeric value +/// This doesn't work for strings or binaries +pub fn set_number<T: Copy>(handle: Handle, key_name: &str, value: T) -> Result<()> { + let (c_type, c_size) = generic_to_cmap(value); + + if is_numeric_type(c_type) { + let mut tmp = value; + let c_value: *mut c_void = &mut tmp as *mut _ as *mut c_void; + set_value(handle, key_name, c_type, c_value as *mut c_void, c_size) + } else { + Err(CsError::CsErrNotSupported) + } +} + +pub fn set_u8(handle: Handle, key_name: &str, value: u8) -> Result<()> { + let mut tmp = value; + let c_value: *mut c_void = &mut tmp as *mut _ as *mut c_void; + set_value(handle, key_name, DataType::UInt8, c_value as *mut c_void, 1) +} + +/// Sets an i8 value into cmap +pub fn set_i8(handle: Handle, key_name: &str, value: i8) -> Result<()> { + let mut tmp = value; + let c_value: *mut c_void = &mut tmp as *mut _ as *mut c_void; + set_value(handle, key_name, DataType::Int8, c_value as *mut c_void, 1) +} + +/// Sets a u16 value into cmap +pub fn set_u16(handle: Handle, key_name: &str, value: u16) -> Result<()> { + let mut tmp = value; + let c_value: *mut c_void = &mut tmp as *mut _ as *mut c_void; + set_value( + handle, + key_name, + DataType::UInt16, + c_value as *mut c_void, + 2, + ) +} + +/// Sets an i16 value into cmap +pub fn set_i16(handle: Handle, key_name: &str, value: i16) -> Result<()> { + let mut tmp = value; + let c_value: *mut c_void = &mut tmp as *mut _ as *mut c_void; + set_value(handle, key_name, DataType::Int16, c_value as *mut c_void, 2) +} + +/// Sets a u32 value into cmap +pub fn set_u32(handle: Handle, key_name: &str, value: u32) -> Result<()> { + let mut tmp = value; + let c_value: *mut c_void = &mut tmp as *mut _ as *mut c_void; + set_value(handle, key_name, DataType::UInt32, c_value, 4) +} + +/// Sets an i32 value into cmap +pub fn set_i132(handle: Handle, key_name: &str, value: i32) -> Result<()> { + let mut tmp = value; + let c_value: *mut c_void = &mut tmp as *mut _ as *mut c_void; + set_value(handle, key_name, DataType::Int32, c_value as *mut c_void, 4) +} + +/// Sets a u64 value into cmap +pub fn set_u64(handle: Handle, key_name: &str, value: u64) -> Result<()> { + let mut tmp = value; + let c_value: *mut c_void = &mut tmp as *mut _ as *mut c_void; + set_value( + handle, + key_name, + DataType::UInt64, + c_value as *mut c_void, + 8, + ) +} + +/// Sets an i64 value into cmap +pub fn set_i164(handle: Handle, key_name: &str, value: i64) -> Result<()> { + let mut tmp = value; + let c_value: *mut c_void = &mut tmp as *mut _ as *mut c_void; + set_value(handle, key_name, DataType::Int64, c_value as *mut c_void, 8) +} + +/// Sets a string value into cmap +pub fn set_string(handle: Handle, key_name: &str, value: &str) -> Result<()> { + let v_string = string_to_cstring_validated(value, 0)?; + set_value( + handle, + key_name, + DataType::String, + v_string.as_ptr() as *mut c_void, + value.chars().count(), + ) +} + +/// Sets a binary value into cmap +pub fn set_binary(handle: Handle, key_name: &str, value: &[u8]) -> Result<()> { + set_value( + handle, + key_name, + DataType::Binary, + value.as_ptr() as *mut c_void, + value.len(), + ) +} + +/// Sets a [Data] type into cmap +pub fn set(handle: Handle, key_name: &str, data: &Data) -> Result<()> { + let (datatype, datalen, c_value) = match data { + Data::Int8(v) => { + let mut tmp = *v; + let cv: *mut c_void = &mut tmp as *mut _ as *mut c_void; + (DataType::Int8, 1, cv) + } + Data::UInt8(v) => { + let mut tmp = *v; + let cv: *mut c_void = &mut tmp as *mut _ as *mut c_void; + (DataType::UInt8, 1, cv) + } + Data::Int16(v) => { + let mut tmp = *v; + let cv: *mut c_void = &mut tmp as *mut _ as *mut c_void; + (DataType::Int16, 2, cv) + } + Data::UInt16(v) => { + let mut tmp = *v; + let cv: *mut c_void = &mut tmp as *mut _ as *mut c_void; + (DataType::UInt8, 2, cv) + } + Data::Int32(v) => { + let mut tmp = *v; + let cv: *mut c_void = &mut tmp as *mut _ as *mut c_void; + (DataType::Int32, 4, cv) + } + Data::UInt32(v) => { + let mut tmp = *v; + let cv: *mut c_void = &mut tmp as *mut _ as *mut c_void; + (DataType::UInt32, 4, cv) + } + Data::Int64(v) => { + let mut tmp = *v; + let cv: *mut c_void = &mut tmp as *mut _ as *mut c_void; + (DataType::Int64, 8, cv) + } + Data::UInt64(v) => { + let mut tmp = *v; + let cv: *mut c_void = &mut tmp as *mut _ as *mut c_void; + (DataType::UInt64, 8, cv) + } + Data::Float(v) => { + let mut tmp = *v; + let cv: *mut c_void = &mut tmp as *mut _ as *mut c_void; + (DataType::Float, 4, cv) + } + Data::Double(v) => { + let mut tmp = *v; + let cv: *mut c_void = &mut tmp as *mut _ as *mut c_void; + (DataType::Double, 8, cv) + } + Data::String(v) => { + let cv = string_to_cstring_validated(v, 0)?; + // Can't let cv go out of scope + return set_value( + handle, + key_name, + DataType::String, + cv.as_ptr() as *mut c_void, + v.chars().count(), + ); + } + Data::Binary(v) => { + // Vec doesn't return quite the right types. + return set_value( + handle, + key_name, + DataType::Binary, + v.as_ptr() as *mut c_void, + v.len(), + ); + } + Data::Unknown => return Err(CsError::CsErrInvalidParam), + }; + + set_value(handle, key_name, datatype, c_value, datalen) +} + +// Local function to parse out values from the C mess +// Assumes the c_value is complete. So cmap::get() will need to check the size +// and re-get before calling us with a resized buffer +fn c_to_data(value_size: usize, c_key_type: u32, c_value: *const u8) -> Result<Data> { + unsafe { + match cmap_to_enum(c_key_type) { + DataType::UInt8 => { + let mut ints = [0u8; 1]; + copy_nonoverlapping(c_value as *mut u8, ints.as_mut_ptr(), value_size); + Ok(Data::UInt8(ints[0])) + } + DataType::Int8 => { + let mut ints = [0i8; 1]; + copy_nonoverlapping(c_value as *mut u8, ints.as_mut_ptr() as *mut u8, value_size); + Ok(Data::Int8(ints[0])) + } + DataType::UInt16 => { + let mut ints = [0u16; 1]; + copy_nonoverlapping(c_value as *mut u8, ints.as_mut_ptr() as *mut u8, value_size); + Ok(Data::UInt16(ints[0])) + } + DataType::Int16 => { + let mut ints = [0i16; 1]; + copy_nonoverlapping(c_value as *mut u8, ints.as_mut_ptr() as *mut u8, value_size); + Ok(Data::Int16(ints[0])) + } + DataType::UInt32 => { + let mut ints = [0u32; 1]; + copy_nonoverlapping(c_value as *mut u8, ints.as_mut_ptr() as *mut u8, value_size); + Ok(Data::UInt32(ints[0])) + } + DataType::Int32 => { + let mut ints = [0i32; 1]; + copy_nonoverlapping(c_value as *mut u8, ints.as_mut_ptr() as *mut u8, value_size); + Ok(Data::Int32(ints[0])) + } + DataType::UInt64 => { + let mut ints = [0u64; 1]; + copy_nonoverlapping(c_value as *mut u8, ints.as_mut_ptr() as *mut u8, value_size); + Ok(Data::UInt64(ints[0])) + } + DataType::Int64 => { + let mut ints = [0i64; 1]; + copy_nonoverlapping(c_value as *mut u8, ints.as_mut_ptr() as *mut u8, value_size); + Ok(Data::Int64(ints[0])) + } + DataType::Float => { + let mut ints = [0f32; 1]; + copy_nonoverlapping(c_value as *mut u8, ints.as_mut_ptr() as *mut u8, value_size); + Ok(Data::Float(ints[0])) + } + DataType::Double => { + let mut ints = [0f64; 1]; + copy_nonoverlapping(c_value as *mut u8, ints.as_mut_ptr() as *mut u8, value_size); + Ok(Data::Double(ints[0])) + } + DataType::String => { + let mut ints = vec![0u8; value_size]; + copy_nonoverlapping(c_value as *mut u8, ints.as_mut_ptr(), value_size); + // -1 here so CString doesn't see the NUL + let cs = match CString::new(&ints[0..value_size - 1_usize]) { + Ok(c1) => c1, + Err(_) => return Err(CsError::CsErrLibrary), + }; + match cs.into_string() { + Ok(s) => Ok(Data::String(s)), + Err(_) => Err(CsError::CsErrLibrary), + } + } + DataType::Binary => { + let mut ints = vec![0u8; value_size]; + copy_nonoverlapping(c_value as *mut u8, ints.as_mut_ptr(), value_size); + Ok(Data::Binary(ints)) + } + DataType::Unknown => Ok(Data::Unknown), + } + } +} + +const INITIAL_SIZE: usize = 256; + +/// Get a value from cmap, returned as a [Data] struct, so could be anything +pub fn get(handle: Handle, key_name: &str) -> Result<Data> { + let csname = string_to_cstring_validated(key_name, CMAP_KEYNAME_MAXLENGTH)?; + let mut value_size: usize = 16; + let mut c_key_type: u32 = 0; + + // First guess at a size for Strings and Binaries. Expand if needed + let mut c_value = vec![0u8; INITIAL_SIZE]; + + unsafe { + let res = ffi::cmap_get( + handle.cmap_handle, + csname.as_ptr(), + c_value.as_mut_ptr() as *mut c_void, + &mut value_size, + &mut c_key_type, + ); + if res == ffi::CS_OK { + if value_size > INITIAL_SIZE { + // Need to try again with a bigger buffer + c_value.resize(value_size, 0u8); + let res2 = ffi::cmap_get( + handle.cmap_handle, + csname.as_ptr(), + c_value.as_mut_ptr() as *mut c_void, + &mut value_size, + &mut c_key_type, + ); + if res2 != ffi::CS_OK { + return Err(CsError::from_c(res2)); + } + } + + // Convert to Rust type and return as a Data enum + c_to_data(value_size, c_key_type, c_value.as_ptr()) + } else { + Err(CsError::from_c(res)) + } + } +} + +/// increment the value in a cmap key (must be a numeric type) +pub fn inc(handle: Handle, key_name: &str) -> Result<()> { + let csname = string_to_cstring_validated(key_name, CMAP_KEYNAME_MAXLENGTH)?; + let res = unsafe { ffi::cmap_inc(handle.cmap_handle, csname.as_ptr()) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// decrement the value in a cmap key (must be a numeric type) +pub fn dec(handle: Handle, key_name: &str) -> Result<()> { + let csname = string_to_cstring_validated(key_name, CMAP_KEYNAME_MAXLENGTH)?; + let res = unsafe { ffi::cmap_dec(handle.cmap_handle, csname.as_ptr()) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +// Callback for CMAP notify events from corosync, convert params to Rust and pass on. +extern "C" fn rust_notify_fn( + cmap_handle: ffi::cmap_handle_t, + cmap_track_handle: ffi::cmap_track_handle_t, + event: i32, + key_name: *const ::std::os::raw::c_char, + new_value: ffi::cmap_notify_value, + old_value: ffi::cmap_notify_value, + user_data: *mut ::std::os::raw::c_void, +) { + // If cmap_handle doesn't match then throw away the callback. + if let Some(r_cmap_handle) = HANDLE_HASH.lock().unwrap().get(&cmap_handle) { + if let Some(h) = TRACKHANDLE_HASH.lock().unwrap().get(&cmap_track_handle) { + let r_keyname = match string_from_bytes(key_name, CMAP_KEYNAME_MAXLENGTH) { + Ok(s) => s, + Err(_) => return, + }; + + let r_old = match c_to_data(old_value.len, old_value.type_, old_value.data as *const u8) + { + Ok(v) => v, + Err(_) => return, + }; + let r_new = match c_to_data(new_value.len, new_value.type_, new_value.data as *const u8) + { + Ok(v) => v, + Err(_) => return, + }; + + if let Some(cb) = h.notify_callback.notify_fn { + (cb)( + r_cmap_handle, + h, + TrackType { bits: event }, + &r_keyname, + &r_old, + &r_new, + user_data as u64, + ); + } + } + } +} + +/// Callback function called every time a tracker reports a change in a tracked value +#[derive(Copy, Clone)] +pub struct NotifyCallback { + pub notify_fn: Option< + fn( + handle: &Handle, + track_handle: &TrackHandle, + event: TrackType, + key_name: &str, + new_value: &Data, + old_value: &Data, + user_data: u64, + ), + >, +} + +/// Track changes in cmap values, multiple [TrackHandle]s per [Handle] are allowed +pub fn track_add( + handle: Handle, + key_name: &str, + track_type: TrackType, + notify_callback: &NotifyCallback, + user_data: u64, +) -> Result<TrackHandle> { + let c_name = string_to_cstring_validated(key_name, CMAP_KEYNAME_MAXLENGTH)?; + let mut c_trackhandle = 0u64; + let res = unsafe { + ffi::cmap_track_add( + handle.cmap_handle, + c_name.as_ptr(), + track_type.bits, + Some(rust_notify_fn), + user_data as *mut c_void, + &mut c_trackhandle, + ) + }; + if res == ffi::CS_OK { + let rhandle = TrackHandle { + track_handle: c_trackhandle, + notify_callback: *notify_callback, + }; + TRACKHANDLE_HASH + .lock() + .unwrap() + .insert(c_trackhandle, rhandle); + Ok(rhandle) + } else { + Err(CsError::from_c(res)) + } +} + +/// Remove a tracker frm this [Handle] +pub fn track_delete(handle: Handle, track_handle: TrackHandle) -> Result<()> { + let res = unsafe { ffi::cmap_track_delete(handle.cmap_handle, track_handle.track_handle) }; + if res == ffi::CS_OK { + TRACKHANDLE_HASH + .lock() + .unwrap() + .remove(&track_handle.track_handle); + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Create one of these to start iterating over cmap values. +pub struct CmapIterStart { + iter_handle: u64, + cmap_handle: u64, +} + +pub struct CmapIntoIter { + cmap_handle: u64, + iter_handle: u64, +} + +/// Value returned from the iterator. contains the key name and the [Data] +pub struct CmapIter { + key_name: String, + data: Data, +} + +impl CmapIter { + pub fn key_name(&self) -> &str { + &self.key_name + } + pub fn data(&self) -> &Data { + &self.data + } +} + +impl fmt::Debug for CmapIter { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}: {}", self.key_name, self.data) + } +} + +impl Iterator for CmapIntoIter { + type Item = CmapIter; + + fn next(&mut self) -> Option<CmapIter> { + let mut c_key_name = [0u8; CMAP_KEYNAME_MAXLENGTH + 1]; + let mut c_value_len = 0usize; + let mut c_value_type = 0u32; + let res = unsafe { + ffi::cmap_iter_next( + self.cmap_handle, + self.iter_handle, + c_key_name.as_mut_ptr() as *mut c_char, + &mut c_value_len, + &mut c_value_type, + ) + }; + if res == ffi::CS_OK { + // Return the Data for this iteration + let mut c_value = vec![0u8; c_value_len]; + let res = unsafe { + ffi::cmap_get( + self.cmap_handle, + c_key_name.as_ptr() as *mut c_char, + c_value.as_mut_ptr() as *mut c_void, + &mut c_value_len, + &mut c_value_type, + ) + }; + if res == ffi::CS_OK { + match c_to_data(c_value_len, c_value_type, c_value.as_ptr()) { + Ok(d) => { + let r_keyname = match string_from_bytes( + c_key_name.as_ptr() as *mut c_char, + CMAP_KEYNAME_MAXLENGTH, + ) { + Ok(s) => s, + Err(_) => return None, + }; + Some(CmapIter { + key_name: r_keyname, + data: d, + }) + } + Err(_) => None, + } + } else { + // cmap_get returned error + None + } + } else if res == ffi::CS_ERR_NO_SECTIONS { + // End of list + unsafe { + // Yeah, we don't check this return code. There's nowhere to report it. + ffi::cmap_iter_finalize(self.cmap_handle, self.iter_handle) + }; + None + } else { + None + } + } +} + +impl CmapIterStart { + /// Create a new [CmapIterStart] object for iterating over a list of cmap keys + pub fn new(cmap_handle: Handle, prefix: &str) -> Result<CmapIterStart> { + let mut iter_handle: u64 = 0; + let res = unsafe { + let c_prefix = string_to_cstring_validated(prefix, CMAP_KEYNAME_MAXLENGTH)?; + ffi::cmap_iter_init(cmap_handle.cmap_handle, c_prefix.as_ptr(), &mut iter_handle) + }; + if res == ffi::CS_OK { + Ok(CmapIterStart { + cmap_handle: cmap_handle.cmap_handle, + iter_handle, + }) + } else { + Err(CsError::from_c(res)) + } + } +} + +impl IntoIterator for CmapIterStart { + type Item = CmapIter; + type IntoIter = CmapIntoIter; + + fn into_iter(self) -> Self::IntoIter { + CmapIntoIter { + iter_handle: self.iter_handle, + cmap_handle: self.cmap_handle, + } + } +} diff --git a/bindings/rust/src/cpg.rs b/bindings/rust/src/cpg.rs new file mode 100644 index 0000000..1246497 --- /dev/null +++ b/bindings/rust/src/cpg.rs @@ -0,0 +1,628 @@ +// libcpg interface for Rust +// Copyright (c) 2020 Red Hat, Inc. +// +// All rights reserved. +// +// Author: Christine Caulfield (ccaulfi@redhat.com) +// + +#![allow(clippy::single_match)] +#![allow(clippy::needless_range_loop)] +#![allow(clippy::type_complexity)] + +// For the code generated by bindgen +use crate::sys::cpg as ffi; + +use std::collections::HashMap; +use std::ffi::{CStr, CString}; +use std::fmt; +use std::os::raw::{c_int, c_void}; +use std::ptr::copy_nonoverlapping; +use std::slice; +use std::string::String; +use std::sync::Mutex; + +// General corosync things +use crate::string_from_bytes; +use crate::{CsError, DispatchFlags, NodeId, Result}; + +const CPG_NAMELEN_MAX: usize = 128; +const CPG_MEMBERS_MAX: usize = 128; + +/// RingId returned by totem_confchg_fn +#[derive(Copy, Clone)] +pub struct RingId { + pub nodeid: NodeId, + pub seq: u64, +} + +/// Totem delivery guarantee options for [mcast_joined] +// The C enum doesn't have numbers in the code +// so don't assume we can match them +#[derive(Copy, Clone)] +pub enum Guarantee { + TypeUnordered, + TypeFifo, + TypeAgreed, + TypeSafe, +} + +// Convert internal to cpg.h values. +impl Guarantee { + pub fn to_c(&self) -> u32 { + match self { + Guarantee::TypeUnordered => ffi::CPG_TYPE_UNORDERED, + Guarantee::TypeFifo => ffi::CPG_TYPE_FIFO, + Guarantee::TypeAgreed => ffi::CPG_TYPE_AGREED, + Guarantee::TypeSafe => ffi::CPG_TYPE_SAFE, + } + } +} + +/// Flow control state returned from [flow_control_state_get] +#[derive(Copy, Clone)] +pub enum FlowControlState { + Disabled, + Enabled, +} + +/// No flags current specified for model1 so leave this at None +#[derive(Copy, Clone)] +pub enum Model1Flags { + None, +} + +/// Reason for cpg item callback +#[derive(Copy, Clone)] +pub enum Reason { + Undefined = 0, + Join = 1, + Leave = 2, + NodeDown = 3, + NodeUp = 4, + ProcDown = 5, +} + +// Convert to cpg.h values +impl Reason { + pub fn new(r: u32) -> Reason { + match r { + 0 => Reason::Undefined, + 1 => Reason::Join, + 2 => Reason::Leave, + 3 => Reason::NodeDown, + 4 => Reason::NodeUp, + 5 => Reason::ProcDown, + _ => Reason::Undefined, + } + } +} +impl fmt::Display for Reason { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Reason::Undefined => write!(f, "Undefined"), + Reason::Join => write!(f, "Join"), + Reason::Leave => write!(f, "Leave"), + Reason::NodeDown => write!(f, "NodeDown"), + Reason::NodeUp => write!(f, "NodeUp"), + Reason::ProcDown => write!(f, "ProcDown"), + } + } +} + +/// A CPG address entry returned in the callbacks +pub struct Address { + pub nodeid: NodeId, + pub pid: u32, + pub reason: Reason, +} +impl fmt::Debug for Address { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "[nodeid: {}, pid: {}, reason: {}]", + self.nodeid, self.pid, self.reason + ) + } +} + +/// Data for model1 [initialize] +#[derive(Copy, Clone)] +pub struct Model1Data { + pub flags: Model1Flags, + pub deliver_fn: Option< + fn( + handle: &Handle, + group_name: String, + nodeid: NodeId, + pid: u32, + msg: &[u8], + msg_len: usize, + ), + >, + pub confchg_fn: Option< + fn( + handle: &Handle, + group_name: &str, + member_list: Vec<Address>, + left_list: Vec<Address>, + joined_list: Vec<Address>, + ), + >, + pub totem_confchg_fn: Option<fn(handle: &Handle, ring_id: RingId, member_list: Vec<NodeId>)>, +} + +/// Modeldata for [initialize], only v1 supported at the moment +#[derive(Copy, Clone)] +pub enum ModelData { + ModelNone, + ModelV1(Model1Data), +} + +/// A handle into the cpg library. Returned from [initialize] and needed for all other calls +#[derive(Copy, Clone)] +pub struct Handle { + cpg_handle: u64, // Corosync library handle + model_data: ModelData, +} + +// Used to convert a CPG handle into one of ours +lazy_static! { + static ref HANDLE_HASH: Mutex<HashMap<u64, Handle>> = Mutex::new(HashMap::new()); +} + +// Convert a Rust String into a cpg_name struct for libcpg +fn string_to_cpg_name(group: &str) -> Result<ffi::cpg_name> { + if group.len() > CPG_NAMELEN_MAX - 1 { + return Err(CsError::CsErrInvalidParam); + } + + let c_name = match CString::new(group) { + Ok(n) => n, + Err(_) => return Err(CsError::CsErrLibrary), + }; + let mut c_group = ffi::cpg_name { + length: group.len() as u32, + value: [0; CPG_NAMELEN_MAX], + }; + + unsafe { + // NOTE param order is 'wrong-way round' from C + copy_nonoverlapping(c_name.as_ptr(), c_group.value.as_mut_ptr(), group.len()); + } + + Ok(c_group) +} + +// Convert an array of cpg_addresses to a Vec<cpg::Address> - used in callbacks +fn cpg_array_to_vec(list: *const ffi::cpg_address, list_entries: usize) -> Vec<Address> { + let temp: &[ffi::cpg_address] = unsafe { slice::from_raw_parts(list, list_entries) }; + let mut r_vec = Vec::<Address>::new(); + + for i in 0..list_entries { + let a: Address = Address { + nodeid: NodeId::from(temp[i].nodeid), + pid: temp[i].pid, + reason: Reason::new(temp[i].reason), + }; + r_vec.push(a); + } + r_vec +} + +// Called from CPG callback function - munge params back to Rust from C +extern "C" fn rust_deliver_fn( + handle: ffi::cpg_handle_t, + group_name: *const ffi::cpg_name, + nodeid: u32, + pid: u32, + msg: *mut ::std::os::raw::c_void, + msg_len: usize, +) { + if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) { + // Convert group_name into a Rust str. + let r_group_name = unsafe { + CStr::from_ptr(&(*group_name).value[0]) + .to_string_lossy() + .into_owned() + }; + + let data: &[u8] = unsafe { std::slice::from_raw_parts(msg as *const u8, msg_len) }; + + match h.model_data { + ModelData::ModelV1(md) => { + if let Some(cb) = md.deliver_fn { + (cb)(h, r_group_name, NodeId::from(nodeid), pid, data, msg_len); + } + } + _ => {} + } + } +} + +// Called from CPG callback function - munge params back to Rust from C +extern "C" fn rust_confchg_fn( + handle: ffi::cpg_handle_t, + group_name: *const ffi::cpg_name, + member_list: *const ffi::cpg_address, + member_list_entries: usize, + left_list: *const ffi::cpg_address, + left_list_entries: usize, + joined_list: *const ffi::cpg_address, + joined_list_entries: usize, +) { + if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) { + let r_group_name = unsafe { + CStr::from_ptr(&(*group_name).value[0]) + .to_string_lossy() + .into_owned() + }; + let r_member_list = cpg_array_to_vec(member_list, member_list_entries); + let r_left_list = cpg_array_to_vec(left_list, left_list_entries); + let r_joined_list = cpg_array_to_vec(joined_list, joined_list_entries); + + match h.model_data { + ModelData::ModelV1(md) => { + if let Some(cb) = md.confchg_fn { + (cb)(h, &r_group_name, r_member_list, r_left_list, r_joined_list); + } + } + _ => {} + } + } +} + +// Called from CPG callback function - munge params back to Rust from C +extern "C" fn rust_totem_confchg_fn( + handle: ffi::cpg_handle_t, + ring_id: ffi::cpg_ring_id, + member_list_entries: u32, + member_list: *const u32, +) { + if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) { + let r_ring_id = RingId { + nodeid: NodeId::from(ring_id.nodeid), + seq: ring_id.seq, + }; + let mut r_member_list = Vec::<NodeId>::new(); + let temp_members: &[u32] = + unsafe { slice::from_raw_parts(member_list, member_list_entries as usize) }; + for i in 0..member_list_entries as usize { + r_member_list.push(NodeId::from(temp_members[i])); + } + + match h.model_data { + ModelData::ModelV1(md) => { + if let Some(cb) = md.totem_confchg_fn { + (cb)(h, r_ring_id, r_member_list); + } + } + _ => {} + } + } +} + +/// Initialize a connection to the cpg library. You must call this before doing anything +/// else and use the passed back [Handle]. +/// Remember to free the handle using [finalize] when finished. +pub fn initialize(model_data: &ModelData, context: u64) -> Result<Handle> { + let mut handle: ffi::cpg_handle_t = 0; + let mut m = match model_data { + ModelData::ModelV1(_v1) => { + ffi::cpg_model_v1_data_t { + model: ffi::CPG_MODEL_V1, + cpg_deliver_fn: Some(rust_deliver_fn), + cpg_confchg_fn: Some(rust_confchg_fn), + cpg_totem_confchg_fn: Some(rust_totem_confchg_fn), + flags: 0, // No supported flags (yet) + } + } + _ => return Err(CsError::CsErrInvalidParam), + }; + + unsafe { + let c_context: *mut c_void = &mut &context as *mut _ as *mut c_void; + let c_model: *mut ffi::cpg_model_data_t = &mut m as *mut _ as *mut ffi::cpg_model_data_t; + let res = ffi::cpg_model_initialize(&mut handle, m.model, c_model, c_context); + + if res == ffi::CS_OK { + let rhandle = Handle { + cpg_handle: handle, + model_data: *model_data, + }; + HANDLE_HASH.lock().unwrap().insert(handle, rhandle); + Ok(rhandle) + } else { + Err(CsError::from_c(res)) + } + } +} + +/// Finish with a connection to corosync +pub fn finalize(handle: Handle) -> Result<()> { + let res = unsafe { ffi::cpg_finalize(handle.cpg_handle) }; + if res == ffi::CS_OK { + HANDLE_HASH.lock().unwrap().remove(&handle.cpg_handle); + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +// Not sure if an FD is the right thing to return here, but it will do for now. +/// Returns a file descriptor to use for poll/select on the CPG handle +pub fn fd_get(handle: Handle) -> Result<i32> { + let c_fd: *mut c_int = &mut 0 as *mut _ as *mut c_int; + let res = unsafe { ffi::cpg_fd_get(handle.cpg_handle, c_fd) }; + if res == ffi::CS_OK { + Ok(c_fd as i32) + } else { + Err(CsError::from_c(res)) + } +} + +/// Call any/all active CPG callbacks for this [Handle] see [DispatchFlags] for details +pub fn dispatch(handle: Handle, flags: DispatchFlags) -> Result<()> { + let res = unsafe { ffi::cpg_dispatch(handle.cpg_handle, flags as u32) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Joins a CPG group for sending and receiving messages +pub fn join(handle: Handle, group: &str) -> Result<()> { + let res = unsafe { + let c_group = string_to_cpg_name(group)?; + ffi::cpg_join(handle.cpg_handle, &c_group) + }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Leave the currently joined CPG group, another group can now be joined on +/// the same [Handle] or [finalize] can be called to finish using CPG +pub fn leave(handle: Handle, group: &str) -> Result<()> { + let res = unsafe { + let c_group = string_to_cpg_name(group)?; + ffi::cpg_leave(handle.cpg_handle, &c_group) + }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Get the local node ID +pub fn local_get(handle: Handle) -> Result<NodeId> { + let mut nodeid: u32 = 0; + let res = unsafe { ffi::cpg_local_get(handle.cpg_handle, &mut nodeid) }; + if res == ffi::CS_OK { + Ok(NodeId::from(nodeid)) + } else { + Err(CsError::from_c(res)) + } +} + +/// Get a list of members of a CPG group as a vector of [Address] structs +pub fn membership_get(handle: Handle, group: &str) -> Result<Vec<Address>> { + let mut member_list_entries: i32 = 0; + let member_list = [ffi::cpg_address { + nodeid: 0, + pid: 0, + reason: 0, + }; CPG_MEMBERS_MAX]; + let res = unsafe { + let mut c_group = string_to_cpg_name(group)?; + let c_memlist = member_list.as_ptr() as *mut ffi::cpg_address; + ffi::cpg_membership_get( + handle.cpg_handle, + &mut c_group, + &mut *c_memlist, + &mut member_list_entries, + ) + }; + if res == ffi::CS_OK { + Ok(cpg_array_to_vec( + member_list.as_ptr(), + member_list_entries as usize, + )) + } else { + Err(CsError::from_c(res)) + } +} + +/// Get the maximum size that CPG can send in one corosync message, +/// any messages sent via [mcast_joined] that are larger than this +/// will be fragmented +pub fn max_atomic_msgsize_get(handle: Handle) -> Result<u32> { + let mut asize: u32 = 0; + let res = unsafe { ffi::cpg_max_atomic_msgsize_get(handle.cpg_handle, &mut asize) }; + if res == ffi::CS_OK { + Ok(asize) + } else { + Err(CsError::from_c(res)) + } +} + +/// Get the current 'context' value for this handle. +/// The context value is an arbitrary value that is always passed +/// back to callbacks to help identify the source +pub fn context_get(handle: Handle) -> Result<u64> { + let mut c_context: *mut c_void = &mut 0u64 as *mut _ as *mut c_void; + let (res, context) = unsafe { + let r = ffi::cpg_context_get(handle.cpg_handle, &mut c_context); + let context: u64 = c_context as u64; + (r, context) + }; + if res == ffi::CS_OK { + Ok(context) + } else { + Err(CsError::from_c(res)) + } +} + +/// Set the current 'context' value for this handle. +/// The context value is an arbitrary value that is always passed +/// back to callbacks to help identify the source. +/// Normally this is set in [initialize], but this allows it to be changed +pub fn context_set(handle: Handle, context: u64) -> Result<()> { + let res = unsafe { + let c_context = context as *mut c_void; + ffi::cpg_context_set(handle.cpg_handle, c_context) + }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Get the flow control state of corosync CPG +pub fn flow_control_state_get(handle: Handle) -> Result<bool> { + let mut fc_state: u32 = 0; + let res = unsafe { ffi::cpg_flow_control_state_get(handle.cpg_handle, &mut fc_state) }; + if res == ffi::CS_OK { + if fc_state == 1 { + Ok(true) + } else { + Ok(false) + } + } else { + Err(CsError::from_c(res)) + } +} + +/// Send a message to the currently joined CPG group +pub fn mcast_joined(handle: Handle, guarantee: Guarantee, msg: &[u8]) -> Result<()> { + let c_iovec = ffi::iovec { + iov_base: msg.as_ptr() as *mut c_void, + iov_len: msg.len(), + }; + let res = unsafe { ffi::cpg_mcast_joined(handle.cpg_handle, guarantee.to_c(), &c_iovec, 1) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Type of iteration for [CpgIterStart] +#[derive(Copy, Clone)] +pub enum CpgIterType { + NameOnly = 1, + OneGroup = 2, + All = 3, +} + +// Iterator based on information on this page. thank you! +// https://stackoverflow.com/questions/30218886/how-to-implement-iterator-and-intoiterator-for-a-simple-struct +// Object to iterate over +/// An object to iterate over a list of CPG groups, create one of these and then use 'for' over it +pub struct CpgIterStart { + iter_handle: u64, +} + +/// struct returned from iterating over a [CpgIterStart] +pub struct CpgIter { + pub group: String, + pub nodeid: NodeId, + pub pid: u32, +} + +pub struct CpgIntoIter { + iter_handle: u64, +} + +impl fmt::Debug for CpgIter { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "[group: {}, nodeid: {}, pid: {}]", + self.group, self.nodeid, self.pid + ) + } +} + +impl Iterator for CpgIntoIter { + type Item = CpgIter; + + fn next(&mut self) -> Option<CpgIter> { + let mut c_iter_description = ffi::cpg_iteration_description_t { + nodeid: 0, + pid: 0, + group: ffi::cpg_name { + length: 0_u32, + value: [0; CPG_NAMELEN_MAX], + }, + }; + let res = unsafe { ffi::cpg_iteration_next(self.iter_handle, &mut c_iter_description) }; + + if res == ffi::CS_OK { + let r_group = + match string_from_bytes(c_iter_description.group.value.as_ptr(), CPG_NAMELEN_MAX) { + Ok(groupname) => groupname, + Err(_) => return None, + }; + Some(CpgIter { + group: r_group, + nodeid: NodeId::from(c_iter_description.nodeid), + pid: c_iter_description.pid, + }) + } else if res == ffi::CS_ERR_NO_SECTIONS { + // End of list + unsafe { + // Yeah, we don't check this return code. There's nowhere to report it. + ffi::cpg_iteration_finalize(self.iter_handle) + }; + None + } else { + None + } + } +} + +impl CpgIterStart { + /// Create a new [CpgIterStart] object for iterating over a list of active CPG groups + pub fn new(cpg_handle: Handle, group: &str, iter_type: CpgIterType) -> Result<CpgIterStart> { + let mut iter_handle: u64 = 0; + let res = unsafe { + let mut c_group = string_to_cpg_name(group)?; + let c_itertype = iter_type as u32; + // IterType 'All' requires that the group pointer is passed in as NULL + let c_group_ptr = { + match iter_type { + CpgIterType::All => std::ptr::null_mut(), + _ => &mut c_group, + } + }; + ffi::cpg_iteration_initialize( + cpg_handle.cpg_handle, + c_itertype, + c_group_ptr, + &mut iter_handle, + ) + }; + if res == ffi::CS_OK { + Ok(CpgIterStart { iter_handle }) + } else { + Err(CsError::from_c(res)) + } + } +} + +impl IntoIterator for CpgIterStart { + type Item = CpgIter; + type IntoIter = CpgIntoIter; + + fn into_iter(self) -> Self::IntoIter { + CpgIntoIter { + iter_handle: self.iter_handle, + } + } +} diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs new file mode 100644 index 0000000..dbf34fc --- /dev/null +++ b/bindings/rust/src/lib.rs @@ -0,0 +1,296 @@ +//! This crate provides access to the corosync libraries cpg, cfg, cmap, quorum & votequorum +//! from Rust. They are a fairly thin layer around the actual API calls but with Rust data types +//! and iterators. +//! +//! Corosync is a low-level provider of cluster services for high-availability clusters, +//! for more information about corosync see <https://corosync.github.io/corosync/> +//! +//! No more information about corosync itself will be provided here, it is expected that if +//! you feel you need access to the Corosync API calls, you know what they do :) +//! +//! # Example +//! ``` +//! extern crate rust_corosync as corosync; +//! use corosync::cmap; +//! +//! fn main() +//! { +//! // Open connection to corosync libcmap +//! let handle = +//! match cmap::initialize(cmap::Map::Icmap) { +//! Ok(h) => { +//! println!("cmap initialized."); +//! h +//! } +//! Err(e) => { +//! println!("Error in CMAP (Icmap) init: {}", e); +//! return; +//! } +//! }; +//! +//! // Set a numeric value (this is a generic fn) +//! match cmap::set_number(handle, "test.test_uint32", 456) +//! { +//! Ok(_) => {} +//! Err(e) => { +//! println!("Error in CMAP set_u32: {}", e); +//! return; +//! } +//! }; +//! +//! // Get a value - this will be a Data struct +//! match cmap::get(handle, "test.test_uint32") +//! { +//! Ok(v) => { +//! println!("GOT value {}", v); +//! } +//! Err(e) => { +//! println!("Error in CMAP get: {}", e); +//! return; +//! } +//! }; +//! +//! // Use an iterator +//! match cmap::CmapIterStart::new(handle, "totem.") { +//! Ok(cmap_iter) => { +//! for i in cmap_iter { +//! println!("ITER: {:?}", i); +//! } +//! println!(""); +//! } +//! Err(e) => { +//! println!("Error in CMAP iter start: {}", e); +//! } +//! } +//! +//! // Close this connection +//! match cmap::finalize(handle) +//! { +//! Ok(_) => {} +//! Err(e) => { +//! println!("Error in CMAP get: {}", e); +//! return; +//! } +//! }; +//! } + +#[macro_use] +extern crate lazy_static; +#[macro_use] +extern crate bitflags; + +/// cfg is the internal configuration and information library for corosync, it is +/// mainly used by internal tools but may also contain API calls useful to some applications +/// that need detailed information about or control of the operation of corosync and the cluster. +pub mod cfg; +/// cmap is the internal 'database' of corosync - though it is NOT replicated. Mostly it contains +/// a copy of the corosync.conf file and information about the running state of the daemon. +/// The cmap API provides two 'maps'. Icmap, which is as above, and Stats, which contains very detailed +/// statistics on the running system, this includes network and IPC calls. +pub mod cmap; +/// cpg is the Control Process Groups subsystem of corosync and is usually used for sending +/// messages around the cluster. All processes using CPG belong to a named group (whose members +/// they can query) and all messages are sent with delivery guarantees. +pub mod cpg; +/// Quorum provides basic information about the quorate state of the cluster with callbacks +/// when nodelists change. +pub mod quorum; +///votequorum is the main quorum provider for corosync, using this API, users can query the state +/// of nodes in the cluster, request callbacks when the nodelists change, and set up a quorum device. +pub mod votequorum; + +mod sys; + +use num_enum::TryFromPrimitive; +use std::convert::TryFrom; +use std::error::Error; +use std::ffi::CString; +use std::fmt; +use std::ptr::copy_nonoverlapping; + +// This needs to be kept up-to-date! +/// Error codes returned from the corosync libraries +#[derive(Debug, Eq, PartialEq, Copy, Clone, TryFromPrimitive)] +#[repr(u32)] +pub enum CsError { + CsOk = 1, + CsErrLibrary = 2, + CsErrVersion = 3, + CsErrInit = 4, + CsErrTimeout = 5, + CsErrTryAgain = 6, + CsErrInvalidParam = 7, + CsErrNoMemory = 8, + CsErrBadHandle = 9, + CsErrBusy = 10, + CsErrAccess = 11, + CsErrNotExist = 12, + CsErrNameTooLong = 13, + CsErrExist = 14, + CsErrNoSpace = 15, + CsErrInterrupt = 16, + CsErrNameNotFound = 17, + CsErrNoResources = 18, + CsErrNotSupported = 19, + CsErrBadOperation = 20, + CsErrFailedOperation = 21, + CsErrMessageError = 22, + CsErrQueueFull = 23, + CsErrQueueNotAvailable = 24, + CsErrBadFlags = 25, + CsErrTooBig = 26, + CsErrNoSection = 27, + CsErrContextNotFound = 28, + CsErrTooManyGroups = 30, + CsErrSecurity = 100, + #[num_enum(default)] + CsErrRustCompat = 998, // Set if we get a unknown return from corosync + CsErrRustString = 999, // Set if we get a string conversion error +} + +/// Result type returned from most corosync library calls. +/// Contains a [CsError] and possibly other data as required +pub type Result<T> = ::std::result::Result<T, CsError>; + +impl fmt::Display for CsError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + CsError::CsOk => write!(f, "OK"), + CsError::CsErrLibrary => write!(f, "ErrLibrary"), + CsError::CsErrVersion => write!(f, "ErrVersion"), + CsError::CsErrInit => write!(f, "ErrInit"), + CsError::CsErrTimeout => write!(f, "ErrTimeout"), + CsError::CsErrTryAgain => write!(f, "ErrTryAgain"), + CsError::CsErrInvalidParam => write!(f, "ErrInvalidParam"), + CsError::CsErrNoMemory => write!(f, "ErrNoMemory"), + CsError::CsErrBadHandle => write!(f, "ErrbadHandle"), + CsError::CsErrBusy => write!(f, "ErrBusy"), + CsError::CsErrAccess => write!(f, "ErrAccess"), + CsError::CsErrNotExist => write!(f, "ErrNotExist"), + CsError::CsErrNameTooLong => write!(f, "ErrNameTooLong"), + CsError::CsErrExist => write!(f, "ErrExist"), + CsError::CsErrNoSpace => write!(f, "ErrNoSpace"), + CsError::CsErrInterrupt => write!(f, "ErrInterrupt"), + CsError::CsErrNameNotFound => write!(f, "ErrNameNotFound"), + CsError::CsErrNoResources => write!(f, "ErrNoResources"), + CsError::CsErrNotSupported => write!(f, "ErrNotSupported"), + CsError::CsErrBadOperation => write!(f, "ErrBadOperation"), + CsError::CsErrFailedOperation => write!(f, "ErrFailedOperation"), + CsError::CsErrMessageError => write!(f, "ErrMEssageError"), + CsError::CsErrQueueFull => write!(f, "ErrQueueFull"), + CsError::CsErrQueueNotAvailable => write!(f, "ErrQueueNotAvailable"), + CsError::CsErrBadFlags => write!(f, "ErrBadFlags"), + CsError::CsErrTooBig => write!(f, "ErrTooBig"), + CsError::CsErrNoSection => write!(f, "ErrNoSection"), + CsError::CsErrContextNotFound => write!(f, "ErrContextNotFound"), + CsError::CsErrTooManyGroups => write!(f, "ErrTooManyGroups"), + CsError::CsErrSecurity => write!(f, "ErrSecurity"), + CsError::CsErrRustCompat => write!(f, "ErrRustCompat"), + CsError::CsErrRustString => write!(f, "ErrRustString"), + } + } +} + +impl Error for CsError {} + +// This is dependant on the num_enum crate, converts a C cs_error_t into the Rust enum +// There seems to be some debate as to whether this should be part of the language: +// https://internals.rust-lang.org/t/pre-rfc-enum-from-integer/6348/25 +impl CsError { + fn from_c(cserr: u32) -> CsError { + match CsError::try_from(cserr) { + Ok(e) => e, + Err(_) => CsError::CsErrRustCompat, + } + } +} + +/// Flags to use with dispatch functions, eg [cpg::dispatch] +/// One will dispatch a single callback (blocking) and return. +/// All will loop trying to dispatch all possible callbacks. +/// Blocking is like All but will block between callbacks. +/// OneNonBlocking will dispatch a single callback only if one is available, +/// otherwise it will return even if no callback is available. +#[derive(Copy, Clone)] +// The numbers match the C enum, of course. +pub enum DispatchFlags { + One = 1, + All = 2, + Blocking = 3, + OneNonblocking = 4, +} + +/// Flags to use with (most) tracking API calls +#[derive(Copy, Clone)] +// Same here +pub enum TrackFlags { + Current = 1, + Changes = 2, + ChangesOnly = 4, +} + +/// A corosync nodeid +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub struct NodeId { + id: u32, +} + +impl fmt::Display for NodeId { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.id) + } +} + +// Conversion from a NodeId to and from u32 +impl From<u32> for NodeId { + fn from(id: u32) -> NodeId { + NodeId { id } + } +} + +impl From<NodeId> for u32 { + fn from(nodeid: NodeId) -> u32 { + nodeid.id + } +} + +// General internal routine to copy bytes from a C array into a Rust String +fn string_from_bytes(bytes: *const ::std::os::raw::c_char, max_length: usize) -> Result<String> { + let mut newbytes = vec![0u8; max_length]; + + // Get length of the string in old-fashioned style + let mut length: usize = 0; + let mut count = 0; + let mut tmpbytes = bytes; + while count < max_length || length == 0 { + if unsafe { *tmpbytes } == 0 && length == 0 { + length = count; + break; + } + count += 1; + tmpbytes = unsafe { tmpbytes.offset(1) } + } + + // Cope with an empty string + if length == 0 { + return Ok(String::new()); + } + + unsafe { + // We need to fully copy it, not shallow copy it. + // Messy casting on both parts of the copy here to get it to work on both signed + // and unsigned char machines + copy_nonoverlapping(bytes as *mut i8, newbytes.as_mut_ptr() as *mut i8, length); + } + + let cs = match CString::new(&newbytes[0..length]) { + Ok(c1) => c1, + Err(_) => return Err(CsError::CsErrRustString), + }; + + // This is just to convert the error type + match cs.into_string() { + Ok(s) => Ok(s), + Err(_) => Err(CsError::CsErrRustString), + } +} diff --git a/bindings/rust/src/quorum.rs b/bindings/rust/src/quorum.rs new file mode 100644 index 0000000..25c2fe6 --- /dev/null +++ b/bindings/rust/src/quorum.rs @@ -0,0 +1,298 @@ +// libquorum interface for Rust +// Copyright (c) 2021 Red Hat, Inc. +// +// All rights reserved. +// +// Author: Christine Caulfield (ccaulfi@redhat.com) +// + +#![allow(clippy::type_complexity)] +#![allow(clippy::needless_range_loop)] +#![allow(clippy::single_match)] + +// For the code generated by bindgen +use crate::sys::quorum as ffi; + +use crate::{CsError, DispatchFlags, NodeId, Result, TrackFlags}; +use std::collections::HashMap; +use std::os::raw::{c_int, c_void}; +use std::slice; +use std::sync::Mutex; + +/// Data for model1 [initialize] +#[derive(Copy, Clone)] +pub enum ModelData { + ModelNone, + ModelV1(Model1Data), +} + +/// Value returned from [initialize]. Indicates whether quorum is currently active on this cluster. +pub enum QuorumType { + Free, + Set, +} + +/// Flags for [initialize], none currently supported +#[derive(Copy, Clone)] +pub enum Model1Flags { + None, +} + +/// RingId returned in quorum_notification_fn +pub struct RingId { + pub nodeid: NodeId, + pub seq: u64, +} + +// Used to convert a QUORUM handle into one of ours +lazy_static! { + static ref HANDLE_HASH: Mutex<HashMap<u64, Handle>> = Mutex::new(HashMap::new()); +} + +fn list_to_vec(list_entries: u32, list: *const u32) -> Vec<NodeId> { + let mut r_member_list = Vec::<NodeId>::new(); + let temp_members: &[u32] = unsafe { slice::from_raw_parts(list, list_entries as usize) }; + for i in 0..list_entries as usize { + r_member_list.push(NodeId::from(temp_members[i])); + } + r_member_list +} + +// Called from quorum callback function - munge params back to Rust from C +extern "C" fn rust_quorum_notification_fn( + handle: ffi::quorum_handle_t, + quorate: u32, + ring_id: ffi::quorum_ring_id, + member_list_entries: u32, + member_list: *const u32, +) { + if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) { + let r_ring_id = RingId { + nodeid: NodeId::from(ring_id.nodeid), + seq: ring_id.seq, + }; + let r_member_list = list_to_vec(member_list_entries, member_list); + let r_quorate = match quorate { + 0 => false, + 1 => true, + _ => false, + }; + match &h.model_data { + ModelData::ModelV1(md) => { + if let Some(cb) = md.quorum_notification_fn { + (cb)(h, r_quorate, r_ring_id, r_member_list); + } + } + _ => {} + } + } +} + +extern "C" fn rust_nodelist_notification_fn( + handle: ffi::quorum_handle_t, + ring_id: ffi::quorum_ring_id, + member_list_entries: u32, + member_list: *const u32, + joined_list_entries: u32, + joined_list: *const u32, + left_list_entries: u32, + left_list: *const u32, +) { + if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) { + let r_ring_id = RingId { + nodeid: NodeId::from(ring_id.nodeid), + seq: ring_id.seq, + }; + + let r_member_list = list_to_vec(member_list_entries, member_list); + let r_joined_list = list_to_vec(joined_list_entries, joined_list); + let r_left_list = list_to_vec(left_list_entries, left_list); + + match &h.model_data { + ModelData::ModelV1(md) => { + if let Some(cb) = md.nodelist_notification_fn { + (cb)(h, r_ring_id, r_member_list, r_joined_list, r_left_list); + } + } + _ => {} + } + } +} + +#[derive(Copy, Clone)] +/// Data for model1 [initialize] +pub struct Model1Data { + pub flags: Model1Flags, + pub quorum_notification_fn: + Option<fn(hande: &Handle, quorate: bool, ring_id: RingId, member_list: Vec<NodeId>)>, + pub nodelist_notification_fn: Option< + fn( + hande: &Handle, + ring_id: RingId, + member_list: Vec<NodeId>, + joined_list: Vec<NodeId>, + left_list: Vec<NodeId>, + ), + >, +} + +/// A handle into the quorum library. Returned from [initialize] and needed for all other calls +#[derive(Copy, Clone)] +pub struct Handle { + quorum_handle: u64, + model_data: ModelData, +} + +/// Initialize a connection to the quorum library. You must call this before doing anything +/// else and use the passed back [Handle]. +/// Remember to free the handle using [finalize] when finished. +pub fn initialize(model_data: &ModelData, context: u64) -> Result<(Handle, QuorumType)> { + let mut handle: ffi::quorum_handle_t = 0; + let mut quorum_type: u32 = 0; + + let mut m = match model_data { + ModelData::ModelV1(_v1) => ffi::quorum_model_v1_data_t { + model: ffi::QUORUM_MODEL_V1, + quorum_notify_fn: Some(rust_quorum_notification_fn), + nodelist_notify_fn: Some(rust_nodelist_notification_fn), + }, + // Only V1 supported. No point in doing legacy stuff in a new binding + _ => return Err(CsError::CsErrInvalidParam), + }; + + handle = unsafe { + let c_context: *mut c_void = &mut &context as *mut _ as *mut c_void; + let c_model: *mut ffi::quorum_model_data_t = + &mut m as *mut _ as *mut ffi::quorum_model_data_t; + let res = ffi::quorum_model_initialize( + &mut handle, + m.model, + c_model, + &mut quorum_type, + c_context, + ); + + if res == ffi::CS_OK { + handle + } else { + return Err(CsError::from_c(res)); + } + }; + + let quorum_type = match quorum_type { + 0 => QuorumType::Free, + 1 => QuorumType::Set, + _ => QuorumType::Set, + }; + let rhandle = Handle { + quorum_handle: handle, + model_data: *model_data, + }; + HANDLE_HASH.lock().unwrap().insert(handle, rhandle); + Ok((rhandle, quorum_type)) +} + +/// Finish with a connection to corosync +pub fn finalize(handle: Handle) -> Result<()> { + let res = unsafe { ffi::quorum_finalize(handle.quorum_handle) }; + if res == ffi::CS_OK { + HANDLE_HASH.lock().unwrap().remove(&handle.quorum_handle); + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +// Not sure if an FD is the right thing to return here, but it will do for now. +/// Return a file descriptor to use for poll/select on the QUORUM handle +pub fn fd_get(handle: Handle) -> Result<i32> { + let c_fd: *mut c_int = &mut 0 as *mut _ as *mut c_int; + let res = unsafe { ffi::quorum_fd_get(handle.quorum_handle, c_fd) }; + if res == ffi::CS_OK { + Ok(c_fd as i32) + } else { + Err(CsError::from_c(res)) + } +} + +/// Display any/all active QUORUM callbacks for this [Handle], see [DispatchFlags] for details +pub fn dispatch(handle: Handle, flags: DispatchFlags) -> Result<()> { + let res = unsafe { ffi::quorum_dispatch(handle.quorum_handle, flags as u32) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Return the quorate status of the cluster +pub fn getquorate(handle: Handle) -> Result<bool> { + let c_quorate: *mut c_int = &mut 0 as *mut _ as *mut c_int; + let (res, r_quorate) = unsafe { + let res = ffi::quorum_getquorate(handle.quorum_handle, c_quorate); + let r_quorate: i32 = *c_quorate; + (res, r_quorate) + }; + if res == ffi::CS_OK { + match r_quorate { + 0 => Ok(false), + 1 => Ok(true), + _ => Err(CsError::CsErrLibrary), + } + } else { + Err(CsError::from_c(res)) + } +} + +/// Track node and quorum changes +pub fn trackstart(handle: Handle, flags: TrackFlags) -> Result<()> { + let res = unsafe { ffi::quorum_trackstart(handle.quorum_handle, flags as u32) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Stop tracking node and quorum changes +pub fn trackstop(handle: Handle) -> Result<()> { + let res = unsafe { ffi::quorum_trackstop(handle.quorum_handle) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Get the current 'context' value for this handle. +/// The context value is an arbitrary value that is always passed +/// back to callbacks to help identify the source +pub fn context_get(handle: Handle) -> Result<u64> { + let (res, context) = unsafe { + let mut context: u64 = 0; + let c_context: *mut c_void = &mut context as *mut _ as *mut c_void; + let r = ffi::quorum_context_get(handle.quorum_handle, c_context as *mut *const c_void); + (r, context) + }; + if res == ffi::CS_OK { + Ok(context) + } else { + Err(CsError::from_c(res)) + } +} + +/// Set the current 'context' value for this handle. +/// The context value is an arbitrary value that is always passed +/// back to callbacks to help identify the source. +/// Normally this is set in [initialize], but this allows it to be changed +pub fn context_set(handle: Handle, context: u64) -> Result<()> { + let res = unsafe { + let c_context = context as *mut c_void; + ffi::quorum_context_set(handle.quorum_handle, c_context) + }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} diff --git a/bindings/rust/src/sys/mod.rs b/bindings/rust/src/sys/mod.rs new file mode 100644 index 0000000..03dfec2 --- /dev/null +++ b/bindings/rust/src/sys/mod.rs @@ -0,0 +1,7 @@ +#![allow(non_camel_case_types, non_snake_case, dead_code, improper_ctypes)] + +pub mod cfg; +pub mod cmap; +pub mod cpg; +pub mod quorum; +pub mod votequorum; diff --git a/bindings/rust/src/votequorum.rs b/bindings/rust/src/votequorum.rs new file mode 100644 index 0000000..4718b58 --- /dev/null +++ b/bindings/rust/src/votequorum.rs @@ -0,0 +1,501 @@ +// libvotequorum interface for Rust +// Copyright (c) 2021 Red Hat, Inc. +// +// All rights reserved. +// +// Author: Christine Caulfield (ccaulfi@redhat.com) +// + +#![allow(clippy::type_complexity)] +#![allow(clippy::needless_range_loop)] +#![allow(clippy::single_match)] + +// For the code generated by bindgen +use crate::sys::votequorum as ffi; + +use std::collections::HashMap; +use std::ffi::CString; +use std::fmt; +use std::os::raw::{c_int, c_void}; +use std::slice; +use std::sync::Mutex; + +use crate::string_from_bytes; +use crate::{CsError, DispatchFlags, NodeId, Result, TrackFlags}; + +/// RingId returned by votequorum_notification_fn +pub struct RingId { + pub nodeid: NodeId, + pub seq: u64, +} + +// Used to convert a VOTEQUORUM handle into one of ours +lazy_static! { + static ref HANDLE_HASH: Mutex<HashMap<u64, Handle>> = Mutex::new(HashMap::new()); +} + +/// Current state of a node in the cluster, part of the [NodeInfo] and [Node] structs +pub enum NodeState { + Member, + Dead, + Leaving, + Unknown, +} +impl NodeState { + pub fn new(state: u32) -> NodeState { + match state { + 1 => NodeState::Member, + 2 => NodeState::Dead, + 3 => NodeState::Leaving, + _ => NodeState::Unknown, + } + } +} +impl fmt::Debug for NodeState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + NodeState::Member => write!(f, "Member"), + NodeState::Dead => write!(f, "Dead"), + NodeState::Leaving => write!(f, "Leaving"), + _ => write!(f, "Unknown"), + } + } +} + +/// Basic information about a node in the cluster. Contains [NodeId], and [NodeState] +pub struct Node { + nodeid: NodeId, + state: NodeState, +} +impl fmt::Debug for Node { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "nodeid: {}, state: {:?}", self.nodeid, self.state) + } +} + +bitflags! { +/// Flags in the [NodeInfo] struct + pub struct NodeInfoFlags: u32 + { + const VOTEQUORUM_INFO_TWONODE = 1; + const VOTEQUORUM_INFO_QUORATE = 2; + const VOTEQUORUM_INFO_WAIT_FOR_ALL = 4; + const VOTEQUORUM_INFO_LAST_MAN_STANDING = 8; + const VOTEQUORUM_INFO_AUTO_TIE_BREAKER = 16; + const VOTEQUORUM_INFO_ALLOW_DOWNSCALE = 32; + const VOTEQUORUM_INFO_QDEVICE_REGISTERED = 64; + const VOTEQUORUM_INFO_QDEVICE_ALIVE = 128; + const VOTEQUORUM_INFO_QDEVICE_CAST_VOTE = 256; + const VOTEQUORUM_INFO_QDEVICE_MASTER_WINS = 512; + } +} + +/// Detailed information about a node in the cluster, returned from [get_info] +pub struct NodeInfo { + pub node_id: NodeId, + pub node_state: NodeState, + pub node_votes: u32, + pub node_expected_votes: u32, + pub highest_expected: u32, + pub quorum: u32, + pub flags: NodeInfoFlags, + pub qdevice_votes: u32, + pub qdevice_name: String, +} + +// Turn a C nodeID list into a vec of NodeIds +fn list_to_vec(list_entries: u32, list: *const u32) -> Vec<NodeId> { + let mut r_member_list = Vec::<NodeId>::new(); + let temp_members: &[u32] = unsafe { slice::from_raw_parts(list, list_entries as usize) }; + for i in 0..list_entries as usize { + r_member_list.push(NodeId::from(temp_members[i])); + } + r_member_list +} + +// Called from votequorum callback function - munge params back to Rust from C +extern "C" fn rust_expectedvotes_notification_fn( + handle: ffi::votequorum_handle_t, + context: u64, + expected_votes: u32, +) { + if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) { + if let Some(cb) = h.callbacks.expectedvotes_notification_fn { + (cb)(h, context, expected_votes); + } + } +} + +// Called from votequorum callback function - munge params back to Rust from C +extern "C" fn rust_quorum_notification_fn( + handle: ffi::votequorum_handle_t, + context: u64, + quorate: u32, + node_list_entries: u32, + node_list: *mut ffi::votequorum_node_t, +) { + if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) { + let r_quorate = match quorate { + 0 => false, + 1 => true, + _ => false, + }; + let mut r_node_list = Vec::<Node>::new(); + let temp_members: &[ffi::votequorum_node_t] = + unsafe { slice::from_raw_parts(node_list, node_list_entries as usize) }; + for i in 0..node_list_entries as usize { + r_node_list.push(Node { + nodeid: NodeId::from(temp_members[i].nodeid), + state: NodeState::new(temp_members[i].state), + }); + } + if let Some(cb) = h.callbacks.quorum_notification_fn { + (cb)(h, context, r_quorate, r_node_list); + } + } +} + +// Called from votequorum callback function - munge params back to Rust from C +extern "C" fn rust_nodelist_notification_fn( + handle: ffi::votequorum_handle_t, + context: u64, + ring_id: ffi::votequorum_ring_id_t, + node_list_entries: u32, + node_list: *mut u32, +) { + if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) { + let r_ring_id = RingId { + nodeid: NodeId::from(ring_id.nodeid), + seq: ring_id.seq, + }; + + let r_node_list = list_to_vec(node_list_entries, node_list); + + if let Some(cb) = h.callbacks.nodelist_notification_fn { + (cb)(h, context, r_ring_id, r_node_list); + } + } +} + +/// Callbacks that can be called from votequorum, pass these in to [initialize] +#[derive(Copy, Clone)] +pub struct Callbacks { + pub quorum_notification_fn: + Option<fn(hande: &Handle, context: u64, quorate: bool, node_list: Vec<Node>)>, + pub nodelist_notification_fn: + Option<fn(hande: &Handle, context: u64, ring_id: RingId, node_list: Vec<NodeId>)>, + pub expectedvotes_notification_fn: + Option<fn(handle: &Handle, context: u64, expected_votes: u32)>, +} + +/// A handle into the votequorum library. Returned from [initialize] and needed for all other calls +#[derive(Copy, Clone)] +pub struct Handle { + votequorum_handle: u64, + callbacks: Callbacks, +} + +/// Initialize a connection to the votequorum library. You must call this before doing anything +/// else and use the passed back [Handle]. +/// Remember to free the handle using [finalize] when finished. +pub fn initialize(callbacks: &Callbacks) -> Result<Handle> { + let mut handle: ffi::votequorum_handle_t = 0; + + let mut c_callbacks = ffi::votequorum_callbacks_t { + votequorum_quorum_notify_fn: Some(rust_quorum_notification_fn), + votequorum_nodelist_notify_fn: Some(rust_nodelist_notification_fn), + votequorum_expectedvotes_notify_fn: Some(rust_expectedvotes_notification_fn), + }; + + unsafe { + let res = ffi::votequorum_initialize(&mut handle, &mut c_callbacks); + if res == ffi::CS_OK { + let rhandle = Handle { + votequorum_handle: handle, + callbacks: *callbacks, + }; + HANDLE_HASH.lock().unwrap().insert(handle, rhandle); + Ok(rhandle) + } else { + Err(CsError::from_c(res)) + } + } +} + +/// Finish with a connection to corosync +pub fn finalize(handle: Handle) -> Result<()> { + let res = unsafe { ffi::votequorum_finalize(handle.votequorum_handle) }; + if res == ffi::CS_OK { + HANDLE_HASH + .lock() + .unwrap() + .remove(&handle.votequorum_handle); + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +// Not sure if an FD is the right thing to return here, but it will do for now. +/// Return a file descriptor to use for poll/select on the VOTEQUORUM handle +pub fn fd_get(handle: Handle) -> Result<i32> { + let c_fd: *mut c_int = &mut 0 as *mut _ as *mut c_int; + let res = unsafe { ffi::votequorum_fd_get(handle.votequorum_handle, c_fd) }; + if res == ffi::CS_OK { + Ok(c_fd as i32) + } else { + Err(CsError::from_c(res)) + } +} + +const VOTEQUORUM_QDEVICE_MAX_NAME_LEN: usize = 255; + +/// Returns detailed information about a node in a [NodeInfo] structure +pub fn get_info(handle: Handle, nodeid: NodeId) -> Result<NodeInfo> { + let mut c_info = ffi::votequorum_info { + node_id: 0, + node_state: 0, + node_votes: 0, + node_expected_votes: 0, + highest_expected: 0, + total_votes: 0, + quorum: 0, + flags: 0, + qdevice_votes: 0, + qdevice_name: [0; 255usize], + }; + let res = unsafe { + ffi::votequorum_getinfo(handle.votequorum_handle, u32::from(nodeid), &mut c_info) + }; + + if res == ffi::CS_OK { + let info = NodeInfo { + node_id: NodeId::from(c_info.node_id), + node_state: NodeState::new(c_info.node_state), + node_votes: c_info.node_votes, + node_expected_votes: c_info.node_expected_votes, + highest_expected: c_info.highest_expected, + quorum: c_info.quorum, + flags: NodeInfoFlags { bits: c_info.flags }, + qdevice_votes: c_info.qdevice_votes, + qdevice_name: match string_from_bytes( + c_info.qdevice_name.as_ptr(), + VOTEQUORUM_QDEVICE_MAX_NAME_LEN, + ) { + Ok(s) => s, + Err(_) => String::new(), + }, + }; + Ok(info) + } else { + Err(CsError::from_c(res)) + } +} + +/// Call any/all active votequorum callbacks for this [Handle]. see [DispatchFlags] for details +pub fn dispatch(handle: Handle, flags: DispatchFlags) -> Result<()> { + let res = unsafe { ffi::votequorum_dispatch(handle.votequorum_handle, flags as u32) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Track node and votequorum changes +pub fn trackstart(handle: Handle, context: u64, flags: TrackFlags) -> Result<()> { + let res = + unsafe { ffi::votequorum_trackstart(handle.votequorum_handle, context, flags as u32) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Stop tracking node and votequorum changes +pub fn trackstop(handle: Handle) -> Result<()> { + let res = unsafe { ffi::votequorum_trackstop(handle.votequorum_handle) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Get the current 'context' value for this handle. +/// The context value is an arbitrary value that is always passed +/// back to callbacks to help identify the source +pub fn context_get(handle: Handle) -> Result<u64> { + let (res, context) = unsafe { + let mut c_context: *mut c_void = &mut 0u64 as *mut _ as *mut c_void; + let r = ffi::votequorum_context_get(handle.votequorum_handle, &mut c_context); + let context: u64 = c_context as u64; + (r, context) + }; + if res == ffi::CS_OK { + Ok(context) + } else { + Err(CsError::from_c(res)) + } +} + +/// Set the current 'context' value for this handle. +/// The context value is an arbitrary value that is always passed +/// back to callbacks to help identify the source. +/// Normally this is set in [trackstart], but this allows it to be changed +pub fn context_set(handle: Handle, context: u64) -> Result<()> { + let res = unsafe { + let c_context = context as *mut c_void; + ffi::votequorum_context_set(handle.votequorum_handle, c_context) + }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Set the current expected_votes for the cluster, this value must +/// be valid and not result in an inquorate cluster. +pub fn set_expected(handle: Handle, expected_votes: u32) -> Result<()> { + let res = unsafe { ffi::votequorum_setexpected(handle.votequorum_handle, expected_votes) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Set the current votes for a node +pub fn set_votes(handle: Handle, nodeid: NodeId, votes: u32) -> Result<()> { + let res = + unsafe { ffi::votequorum_setvotes(handle.votequorum_handle, u32::from(nodeid), votes) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Register a quorum device +pub fn qdevice_register(handle: Handle, name: &str) -> Result<()> { + let c_string = { + match CString::new(name) { + Ok(cs) => cs, + Err(_) => return Err(CsError::CsErrInvalidParam), + } + }; + + let res = + unsafe { ffi::votequorum_qdevice_register(handle.votequorum_handle, c_string.as_ptr()) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Unregister a quorum device +pub fn qdevice_unregister(handle: Handle, name: &str) -> Result<()> { + let c_string = { + match CString::new(name) { + Ok(cs) => cs, + Err(_) => return Err(CsError::CsErrInvalidParam), + } + }; + + let res = + unsafe { ffi::votequorum_qdevice_unregister(handle.votequorum_handle, c_string.as_ptr()) }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Update the name of a quorum device +pub fn qdevice_update(handle: Handle, oldname: &str, newname: &str) -> Result<()> { + let on_string = { + match CString::new(oldname) { + Ok(cs) => cs, + Err(_) => return Err(CsError::CsErrInvalidParam), + } + }; + let nn_string = { + match CString::new(newname) { + Ok(cs) => cs, + Err(_) => return Err(CsError::CsErrInvalidParam), + } + }; + + let res = unsafe { + ffi::votequorum_qdevice_update( + handle.votequorum_handle, + on_string.as_ptr(), + nn_string.as_ptr(), + ) + }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Poll a quorum device +/// This must be done more often than the qdevice timeout (default 10s) while the device is active +/// and the [RingId] must match the current value returned from the callbacks for it to be accepted. +pub fn qdevice_poll(handle: Handle, name: &str, cast_vote: bool, ring_id: &RingId) -> Result<()> { + let c_string = { + match CString::new(name) { + Ok(cs) => cs, + Err(_) => return Err(CsError::CsErrInvalidParam), + } + }; + + let c_cast_vote: u32 = u32::from(cast_vote); + let c_ring_id = ffi::votequorum_ring_id_t { + nodeid: u32::from(ring_id.nodeid), + seq: ring_id.seq, + }; + + let res = unsafe { + ffi::votequorum_qdevice_poll( + handle.votequorum_handle, + c_string.as_ptr(), + c_cast_vote, + c_ring_id, + ) + }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} + +/// Allow qdevice to tell votequorum if master_wins can be enabled or not +pub fn qdevice_master_wins(handle: Handle, name: &str, master_wins: bool) -> Result<()> { + let c_string = { + match CString::new(name) { + Ok(cs) => cs, + Err(_) => return Err(CsError::CsErrInvalidParam), + } + }; + + let c_master_wins: u32 = u32::from(master_wins); + + let res = unsafe { + ffi::votequorum_qdevice_master_wins( + handle.votequorum_handle, + c_string.as_ptr(), + c_master_wins, + ) + }; + if res == ffi::CS_OK { + Ok(()) + } else { + Err(CsError::from_c(res)) + } +} |