diff options
Diffstat (limited to 'third_party/rust/nix/src/mqueue.rs')
-rw-r--r-- | third_party/rust/nix/src/mqueue.rs | 358 |
1 files changed, 358 insertions, 0 deletions
diff --git a/third_party/rust/nix/src/mqueue.rs b/third_party/rust/nix/src/mqueue.rs new file mode 100644 index 0000000000..fb07d2accb --- /dev/null +++ b/third_party/rust/nix/src/mqueue.rs @@ -0,0 +1,358 @@ +//! Posix Message Queue functions +//! +//! # Example +//! +// no_run because a kernel module may be required. +//! ```no_run +//! # use std::ffi::CString; +//! # use nix::mqueue::*; +//! use nix::sys::stat::Mode; +//! +//! const MSG_SIZE: mq_attr_member_t = 32; +//! let mq_name= "/a_nix_test_queue"; +//! +//! let oflag0 = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY; +//! let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH; +//! let mqd0 = mq_open(mq_name, oflag0, mode, None).unwrap(); +//! let msg_to_send = b"msg_1"; +//! mq_send(&mqd0, msg_to_send, 1).unwrap(); +//! +//! let oflag1 = MQ_OFlag::O_CREAT | MQ_OFlag::O_RDONLY; +//! let mqd1 = mq_open(mq_name, oflag1, mode, None).unwrap(); +//! let mut buf = [0u8; 32]; +//! let mut prio = 0u32; +//! let len = mq_receive(&mqd1, &mut buf, &mut prio).unwrap(); +//! assert_eq!(prio, 1); +//! assert_eq!(msg_to_send, &buf[0..len]); +//! +//! mq_close(mqd1).unwrap(); +//! mq_close(mqd0).unwrap(); +//! ``` +//! [Further reading and details on the C API](https://man7.org/linux/man-pages/man7/mq_overview.7.html) + +use crate::errno::Errno; +use crate::NixPath; +use crate::Result; + +use crate::sys::stat::Mode; +use libc::{self, c_char, mqd_t, size_t}; +use std::mem; +#[cfg(any( + target_os = "linux", + target_os = "netbsd", + target_os = "dragonfly" +))] +use std::os::unix::io::{ + AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd, +}; + +libc_bitflags! { + /// Used with [`mq_open`]. + pub struct MQ_OFlag: libc::c_int { + /// Open the message queue for receiving messages. + O_RDONLY; + /// Open the queue for sending messages. + O_WRONLY; + /// Open the queue for both receiving and sending messages + O_RDWR; + /// Create a message queue. + O_CREAT; + /// If set along with `O_CREAT`, `mq_open` will fail if the message + /// queue name exists. + O_EXCL; + /// `mq_send` and `mq_receive` should fail with `EAGAIN` rather than + /// wait for resources that are not currently available. + O_NONBLOCK; + /// Set the close-on-exec flag for the message queue descriptor. + O_CLOEXEC; + } +} + +/// A message-queue attribute, optionally used with [`mq_setattr`] and +/// [`mq_getattr`] and optionally [`mq_open`], +#[repr(C)] +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +pub struct MqAttr { + mq_attr: libc::mq_attr, +} + +/// Identifies an open POSIX Message Queue +// A safer wrapper around libc::mqd_t, which is a pointer on some platforms +// Deliberately is not Clone to prevent use-after-close scenarios +#[repr(transparent)] +#[derive(Debug)] +#[allow(missing_copy_implementations)] +pub struct MqdT(mqd_t); + +// x32 compatibility +// See https://sourceware.org/bugzilla/show_bug.cgi?id=21279 +/// Size of a message queue attribute member +#[cfg(all(target_arch = "x86_64", target_pointer_width = "32"))] +#[cfg_attr(docsrs, doc(cfg(all())))] +pub type mq_attr_member_t = i64; +/// Size of a message queue attribute member +#[cfg(not(all(target_arch = "x86_64", target_pointer_width = "32")))] +#[cfg_attr(docsrs, doc(cfg(all())))] +pub type mq_attr_member_t = libc::c_long; + +impl MqAttr { + /// Create a new message queue attribute + /// + /// # Arguments + /// + /// - `mq_flags`: Either `0` or `O_NONBLOCK`. + /// - `mq_maxmsg`: Maximum number of messages on the queue. + /// - `mq_msgsize`: Maximum message size in bytes. + /// - `mq_curmsgs`: Number of messages currently in the queue. + pub fn new( + mq_flags: mq_attr_member_t, + mq_maxmsg: mq_attr_member_t, + mq_msgsize: mq_attr_member_t, + mq_curmsgs: mq_attr_member_t, + ) -> MqAttr { + let mut attr = mem::MaybeUninit::<libc::mq_attr>::uninit(); + unsafe { + let p = attr.as_mut_ptr(); + (*p).mq_flags = mq_flags; + (*p).mq_maxmsg = mq_maxmsg; + (*p).mq_msgsize = mq_msgsize; + (*p).mq_curmsgs = mq_curmsgs; + MqAttr { + mq_attr: attr.assume_init(), + } + } + } + + /// The current flags, either `0` or `O_NONBLOCK`. + pub const fn flags(&self) -> mq_attr_member_t { + self.mq_attr.mq_flags + } + + /// The max number of messages that can be held by the queue + pub const fn maxmsg(&self) -> mq_attr_member_t { + self.mq_attr.mq_maxmsg + } + + /// The maximum size of each message (in bytes) + pub const fn msgsize(&self) -> mq_attr_member_t { + self.mq_attr.mq_msgsize + } + + /// The number of messages currently held in the queue + pub const fn curmsgs(&self) -> mq_attr_member_t { + self.mq_attr.mq_curmsgs + } +} + +/// Open a message queue +/// +/// See also [`mq_open(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_open.html) +// The mode.bits() cast is only lossless on some OSes +#[allow(clippy::cast_lossless)] +pub fn mq_open<P>( + name: &P, + oflag: MQ_OFlag, + mode: Mode, + attr: Option<&MqAttr>, +) -> Result<MqdT> +where + P: ?Sized + NixPath, +{ + let res = name.with_nix_path(|cstr| match attr { + Some(mq_attr) => unsafe { + libc::mq_open( + cstr.as_ptr(), + oflag.bits(), + mode.bits() as libc::c_int, + &mq_attr.mq_attr as *const libc::mq_attr, + ) + }, + None => unsafe { libc::mq_open(cstr.as_ptr(), oflag.bits()) }, + })?; + + Errno::result(res).map(MqdT) +} + +/// Remove a message queue +/// +/// See also [`mq_unlink(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_unlink.html) +pub fn mq_unlink<P>(name: &P) -> Result<()> +where + P: ?Sized + NixPath, +{ + let res = + name.with_nix_path(|cstr| unsafe { libc::mq_unlink(cstr.as_ptr()) })?; + Errno::result(res).map(drop) +} + +/// Close a message queue +/// +/// See also [`mq_close(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_close.html) +pub fn mq_close(mqdes: MqdT) -> Result<()> { + let res = unsafe { libc::mq_close(mqdes.0) }; + Errno::result(res).map(drop) +} + +/// Receive a message from a message queue +/// +/// See also [`mq_receive(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_receive.html) +pub fn mq_receive( + mqdes: &MqdT, + message: &mut [u8], + msg_prio: &mut u32, +) -> Result<usize> { + let len = message.len() as size_t; + let res = unsafe { + libc::mq_receive( + mqdes.0, + message.as_mut_ptr() as *mut c_char, + len, + msg_prio as *mut u32, + ) + }; + Errno::result(res).map(|r| r as usize) +} + +feature! { + #![feature = "time"] + use crate::sys::time::TimeSpec; + /// Receive a message from a message queue with a timeout + /// + /// See also ['mq_timedreceive(2)'](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_receive.html) + pub fn mq_timedreceive( + mqdes: &MqdT, + message: &mut [u8], + msg_prio: &mut u32, + abstime: &TimeSpec, + ) -> Result<usize> { + let len = message.len() as size_t; + let res = unsafe { + libc::mq_timedreceive( + mqdes.0, + message.as_mut_ptr() as *mut c_char, + len, + msg_prio as *mut u32, + abstime.as_ref(), + ) + }; + Errno::result(res).map(|r| r as usize) + } +} + +/// Send a message to a message queue +/// +/// See also [`mq_send(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_send.html) +pub fn mq_send(mqdes: &MqdT, message: &[u8], msq_prio: u32) -> Result<()> { + let res = unsafe { + libc::mq_send( + mqdes.0, + message.as_ptr() as *const c_char, + message.len(), + msq_prio, + ) + }; + Errno::result(res).map(drop) +} + +/// Get message queue attributes +/// +/// See also [`mq_getattr(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_getattr.html) +pub fn mq_getattr(mqd: &MqdT) -> Result<MqAttr> { + let mut attr = mem::MaybeUninit::<libc::mq_attr>::uninit(); + let res = unsafe { libc::mq_getattr(mqd.0, attr.as_mut_ptr()) }; + Errno::result(res).map(|_| unsafe { + MqAttr { + mq_attr: attr.assume_init(), + } + }) +} + +/// Set the attributes of the message queue. Only `O_NONBLOCK` can be set, everything else will be ignored +/// Returns the old attributes +/// It is recommend to use the `mq_set_nonblock()` and `mq_remove_nonblock()` convenience functions as they are easier to use +/// +/// [Further reading](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_setattr.html) +pub fn mq_setattr(mqd: &MqdT, newattr: &MqAttr) -> Result<MqAttr> { + let mut attr = mem::MaybeUninit::<libc::mq_attr>::uninit(); + let res = unsafe { + libc::mq_setattr( + mqd.0, + &newattr.mq_attr as *const libc::mq_attr, + attr.as_mut_ptr(), + ) + }; + Errno::result(res).map(|_| unsafe { + MqAttr { + mq_attr: attr.assume_init(), + } + }) +} + +/// Convenience function. +/// Sets the `O_NONBLOCK` attribute for a given message queue descriptor +/// Returns the old attributes +#[allow(clippy::useless_conversion)] // Not useless on all OSes +pub fn mq_set_nonblock(mqd: &MqdT) -> Result<MqAttr> { + let oldattr = mq_getattr(mqd)?; + let newattr = MqAttr::new( + mq_attr_member_t::from(MQ_OFlag::O_NONBLOCK.bits()), + oldattr.mq_attr.mq_maxmsg, + oldattr.mq_attr.mq_msgsize, + oldattr.mq_attr.mq_curmsgs, + ); + mq_setattr(mqd, &newattr) +} + +/// Convenience function. +/// Removes `O_NONBLOCK` attribute for a given message queue descriptor +/// Returns the old attributes +pub fn mq_remove_nonblock(mqd: &MqdT) -> Result<MqAttr> { + let oldattr = mq_getattr(mqd)?; + let newattr = MqAttr::new( + 0, + oldattr.mq_attr.mq_maxmsg, + oldattr.mq_attr.mq_msgsize, + oldattr.mq_attr.mq_curmsgs, + ); + mq_setattr(mqd, &newattr) +} + +#[cfg(any(target_os = "linux", target_os = "netbsd", target_os = "dragonfly"))] +impl AsFd for MqdT { + /// Borrow the underlying message queue descriptor. + fn as_fd(&self) -> BorrowedFd { + // SAFETY: [MqdT] will only contain a valid fd by construction. + unsafe { BorrowedFd::borrow_raw(self.0) } + } +} + +#[cfg(any(target_os = "linux", target_os = "netbsd", target_os = "dragonfly"))] +impl AsRawFd for MqdT { + /// Return the underlying message queue descriptor. + /// + /// Returned descriptor is a "shallow copy" of the descriptor, so it refers + /// to the same underlying kernel object as `self`. + fn as_raw_fd(&self) -> RawFd { + self.0 + } +} + +#[cfg(any(target_os = "linux", target_os = "netbsd", target_os = "dragonfly"))] +impl FromRawFd for MqdT { + /// Construct an [MqdT] from [RawFd]. + /// + /// # Safety + /// The `fd` given must be a valid and open file descriptor for a message + /// queue. + unsafe fn from_raw_fd(fd: RawFd) -> MqdT { + MqdT(fd) + } +} + +#[cfg(any(target_os = "linux", target_os = "netbsd", target_os = "dragonfly"))] +impl IntoRawFd for MqdT { + /// Consume this [MqdT] and return a [RawFd]. + fn into_raw_fd(self) -> RawFd { + self.0 + } +} |