diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/mio/src/deprecated | |
parent | Initial commit. (diff) | |
download | firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/mio/src/deprecated')
-rw-r--r-- | third_party/rust/mio/src/deprecated/event_loop.rs | 346 | ||||
-rw-r--r-- | third_party/rust/mio/src/deprecated/handler.rs | 37 | ||||
-rw-r--r-- | third_party/rust/mio/src/deprecated/io.rs | 28 | ||||
-rw-r--r-- | third_party/rust/mio/src/deprecated/mod.rs | 36 | ||||
-rw-r--r-- | third_party/rust/mio/src/deprecated/notify.rs | 63 | ||||
-rw-r--r-- | third_party/rust/mio/src/deprecated/unix.rs | 420 |
6 files changed, 930 insertions, 0 deletions
diff --git a/third_party/rust/mio/src/deprecated/event_loop.rs b/third_party/rust/mio/src/deprecated/event_loop.rs new file mode 100644 index 0000000000..a4c4580b3a --- /dev/null +++ b/third_party/rust/mio/src/deprecated/event_loop.rs @@ -0,0 +1,346 @@ +use {channel, Poll, Events, Token}; +use event::Evented; +use deprecated::{Handler, NotifyError}; +use event_imp::{Event, Ready, PollOpt}; +use timer::{self, Timer, Timeout}; +use std::{io, fmt, usize}; +use std::default::Default; +use std::time::Duration; + +#[derive(Debug, Default, Clone)] +pub struct EventLoopBuilder { + config: Config, +} + +/// `EventLoop` configuration details +#[derive(Clone, Debug)] +struct Config { + // == Notifications == + notify_capacity: usize, + messages_per_tick: usize, + + // == Timer == + timer_tick: Duration, + timer_wheel_size: usize, + timer_capacity: usize, +} + +impl Default for Config { + fn default() -> Config { + // Default EventLoop configuration values + Config { + notify_capacity: 4_096, + messages_per_tick: 256, + timer_tick: Duration::from_millis(100), + timer_wheel_size: 1_024, + timer_capacity: 65_536, + } + } +} + +impl EventLoopBuilder { + /// Construct a new `EventLoopBuilder` with the default configuration + /// values. + pub fn new() -> EventLoopBuilder { + EventLoopBuilder::default() + } + + /// Sets the maximum number of messages that can be buffered on the event + /// loop's notification channel before a send will fail. + /// + /// The default value for this is 4096. + pub fn notify_capacity(&mut self, capacity: usize) -> &mut Self { + self.config.notify_capacity = capacity; + self + } + + /// Sets the maximum number of messages that can be processed on any tick of + /// the event loop. + /// + /// The default value for this is 256. + pub fn messages_per_tick(&mut self, messages: usize) -> &mut Self { + self.config.messages_per_tick = messages; + self + } + + pub fn timer_tick(&mut self, val: Duration) -> &mut Self { + self.config.timer_tick = val; + self + } + + pub fn timer_wheel_size(&mut self, size: usize) -> &mut Self { + self.config.timer_wheel_size = size; + self + } + + pub fn timer_capacity(&mut self, cap: usize) -> &mut Self { + self.config.timer_capacity = cap; + self + } + + /// Constructs a new `EventLoop` using the configured values. The + /// `EventLoop` will not be running. + pub fn build<H: Handler>(self) -> io::Result<EventLoop<H>> { + EventLoop::configured(self.config) + } +} + +/// Single threaded IO event loop. +pub struct EventLoop<H: Handler> { + run: bool, + poll: Poll, + events: Events, + timer: Timer<H::Timeout>, + notify_tx: channel::SyncSender<H::Message>, + notify_rx: channel::Receiver<H::Message>, + config: Config, +} + +// Token used to represent notifications +const NOTIFY: Token = Token(usize::MAX - 1); +const TIMER: Token = Token(usize::MAX - 2); + +impl<H: Handler> EventLoop<H> { + + /// Constructs a new `EventLoop` using the default configuration values. + /// The `EventLoop` will not be running. + pub fn new() -> io::Result<EventLoop<H>> { + EventLoop::configured(Config::default()) + } + + fn configured(config: Config) -> io::Result<EventLoop<H>> { + // Create the IO poller + let poll = Poll::new()?; + + let timer = timer::Builder::default() + .tick_duration(config.timer_tick) + .num_slots(config.timer_wheel_size) + .capacity(config.timer_capacity) + .build(); + + // Create cross thread notification queue + let (tx, rx) = channel::sync_channel(config.notify_capacity); + + // Register the notification wakeup FD with the IO poller + poll.register(&rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())?; + poll.register(&timer, TIMER, Ready::readable(), PollOpt::edge())?; + + Ok(EventLoop { + run: true, + poll, + timer, + notify_tx: tx, + notify_rx: rx, + config, + events: Events::with_capacity(1024), + }) + } + + /// Returns a sender that allows sending messages to the event loop in a + /// thread-safe way, waking up the event loop if needed. + /// + /// # Implementation Details + /// + /// Each [EventLoop](#) contains a lock-free queue with a pre-allocated + /// buffer size. The size can be changed by modifying + /// [EventLoopConfig.notify_capacity](struct.EventLoopConfig.html#method.notify_capacity). + /// When a message is sent to the EventLoop, it is first pushed on to the + /// queue. Then, if the EventLoop is currently running, an atomic flag is + /// set to indicate that the next loop iteration should be started without + /// waiting. + /// + /// If the loop is blocked waiting for IO events, then it is woken up. The + /// strategy for waking up the event loop is platform dependent. For + /// example, on a modern Linux OS, eventfd is used. On older OSes, a pipe + /// is used. + /// + /// The strategy of setting an atomic flag if the event loop is not already + /// sleeping allows avoiding an expensive wakeup operation if at all possible. + pub fn channel(&self) -> Sender<H::Message> { + Sender::new(self.notify_tx.clone()) + } + + /// Schedules a timeout after the requested time interval. When the + /// duration has been reached, + /// [Handler::timeout](trait.Handler.html#method.timeout) will be invoked + /// passing in the supplied token. + /// + /// Returns a handle to the timeout that can be used to cancel the timeout + /// using [#clear_timeout](#method.clear_timeout). + pub fn timeout(&mut self, token: H::Timeout, delay: Duration) -> timer::Result<Timeout> { + self.timer.set_timeout(delay, token) + } + + /// If the supplied timeout has not been triggered, cancel it such that it + /// will not be triggered in the future. + pub fn clear_timeout(&mut self, timeout: &Timeout) -> bool { + self.timer.cancel_timeout(&timeout).is_some() + } + + /// Tells the event loop to exit after it is done handling all events in the + /// current iteration. + pub fn shutdown(&mut self) { + self.run = false; + } + + /// Indicates whether the event loop is currently running. If it's not it has either + /// stopped or is scheduled to stop on the next tick. + pub fn is_running(&self) -> bool { + self.run + } + + /// Registers an IO handle with the event loop. + pub fn register<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> + where E: Evented + { + self.poll.register(io, token, interest, opt) + } + + /// Re-Registers an IO handle with the event loop. + pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> + where E: Evented + { + self.poll.reregister(io, token, interest, opt) + } + + /// Keep spinning the event loop indefinitely, and notify the handler whenever + /// any of the registered handles are ready. + pub fn run(&mut self, handler: &mut H) -> io::Result<()> { + self.run = true; + + while self.run { + // Execute ticks as long as the event loop is running + self.run_once(handler, None)?; + } + + Ok(()) + } + + /// Deregisters an IO handle with the event loop. + /// + /// Both kqueue and epoll will automatically clear any pending events when closing a + /// file descriptor (socket). In that case, this method does not need to be called + /// prior to dropping a connection from the slab. + /// + /// Warning: kqueue effectively builds in deregister when using edge-triggered mode with + /// oneshot. Calling `deregister()` on the socket will cause a TcpStream error. + pub fn deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented { + self.poll.deregister(io) + } + + /// Spin the event loop once, with a given timeout (forever if `None`), + /// and notify the handler if any of the registered handles become ready + /// during that time. + pub fn run_once(&mut self, handler: &mut H, timeout: Option<Duration>) -> io::Result<()> { + trace!("event loop tick"); + + // Check the registered IO handles for any new events. Each poll + // is for one second, so a shutdown request can last as long as + // one second before it takes effect. + let events = match self.io_poll(timeout) { + Ok(e) => e, + Err(err) => { + if err.kind() == io::ErrorKind::Interrupted { + handler.interrupted(self); + 0 + } else { + return Err(err); + } + } + }; + + self.io_process(handler, events); + handler.tick(self); + Ok(()) + } + + #[inline] + fn io_poll(&mut self, timeout: Option<Duration>) -> io::Result<usize> { + self.poll.poll(&mut self.events, timeout) + } + + // Process IO events that have been previously polled + fn io_process(&mut self, handler: &mut H, cnt: usize) { + let mut i = 0; + + trace!("io_process(..); cnt={}; len={}", cnt, self.events.len()); + + // Iterate over the notifications. Each event provides the token + // it was registered with (which usually represents, at least, the + // handle that the event is about) as well as information about + // what kind of event occurred (readable, writable, signal, etc.) + while i < cnt { + let evt = self.events.get(i).unwrap(); + + trace!("event={:?}; idx={:?}", evt, i); + + match evt.token() { + NOTIFY => self.notify(handler), + TIMER => self.timer_process(handler), + _ => self.io_event(handler, evt) + } + + i += 1; + } + } + + fn io_event(&mut self, handler: &mut H, evt: Event) { + handler.ready(self, evt.token(), evt.readiness()); + } + + fn notify(&mut self, handler: &mut H) { + for _ in 0..self.config.messages_per_tick { + match self.notify_rx.try_recv() { + Ok(msg) => handler.notify(self, msg), + _ => break, + } + } + + // Re-register + let _ = self.poll.reregister(&self.notify_rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot()); + } + + fn timer_process(&mut self, handler: &mut H) { + while let Some(t) = self.timer.poll() { + handler.timeout(self, t); + } + } +} + +impl<H: Handler> fmt::Debug for EventLoop<H> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("EventLoop") + .field("run", &self.run) + .field("poll", &self.poll) + .field("config", &self.config) + .finish() + } +} + +/// Sends messages to the EventLoop from other threads. +pub struct Sender<M> { + tx: channel::SyncSender<M> +} + +impl<M> fmt::Debug for Sender<M> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "Sender<?> {{ ... }}") + } +} + +impl<M> Clone for Sender <M> { + fn clone(&self) -> Sender<M> { + Sender { tx: self.tx.clone() } + } +} + +impl<M> Sender<M> { + fn new(tx: channel::SyncSender<M>) -> Sender<M> { + Sender { tx } + } + + pub fn send(&self, msg: M) -> Result<(), NotifyError<M>> { + self.tx.try_send(msg)?; + Ok(()) + } +} diff --git a/third_party/rust/mio/src/deprecated/handler.rs b/third_party/rust/mio/src/deprecated/handler.rs new file mode 100644 index 0000000000..db1bc314a7 --- /dev/null +++ b/third_party/rust/mio/src/deprecated/handler.rs @@ -0,0 +1,37 @@ +use {Ready, Token}; +use deprecated::{EventLoop}; + +#[allow(unused_variables)] +pub trait Handler: Sized { + type Timeout; + type Message; + + /// Invoked when the socket represented by `token` is ready to be operated + /// on. `events` indicates the specific operations that are + /// ready to be performed. + /// + /// For example, when a TCP socket is ready to be read from, `events` will + /// have `readable` set. When the socket is ready to be written to, + /// `events` will have `writable` set. + /// + /// This function will only be invoked a single time per socket per event + /// loop tick. + fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: Ready) { + } + + /// Invoked when a message has been received via the event loop's channel. + fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) { + } + + /// Invoked when a timeout has completed. + fn timeout(&mut self, event_loop: &mut EventLoop<Self>, timeout: Self::Timeout) { + } + + /// Invoked when `EventLoop` has been interrupted by a signal interrupt. + fn interrupted(&mut self, event_loop: &mut EventLoop<Self>) { + } + + /// Invoked at the end of an event loop tick. + fn tick(&mut self, event_loop: &mut EventLoop<Self>) { + } +} diff --git a/third_party/rust/mio/src/deprecated/io.rs b/third_party/rust/mio/src/deprecated/io.rs new file mode 100644 index 0000000000..16ff27993b --- /dev/null +++ b/third_party/rust/mio/src/deprecated/io.rs @@ -0,0 +1,28 @@ +use ::io::MapNonBlock; +use std::io::{self, Read, Write}; + +pub trait TryRead { + fn try_read(&mut self, buf: &mut [u8]) -> io::Result<Option<usize>>; +} + +pub trait TryWrite { + fn try_write(&mut self, buf: &[u8]) -> io::Result<Option<usize>>; +} + +impl<T: Read> TryRead for T { + fn try_read(&mut self, dst: &mut [u8]) -> io::Result<Option<usize>> { + self.read(dst).map_non_block() + } +} + +impl<T: Write> TryWrite for T { + fn try_write(&mut self, src: &[u8]) -> io::Result<Option<usize>> { + self.write(src).map_non_block() + } +} + +pub trait TryAccept { + type Output; + + fn accept(&self) -> io::Result<Option<Self::Output>>; +} diff --git a/third_party/rust/mio/src/deprecated/mod.rs b/third_party/rust/mio/src/deprecated/mod.rs new file mode 100644 index 0000000000..124a2eee3d --- /dev/null +++ b/third_party/rust/mio/src/deprecated/mod.rs @@ -0,0 +1,36 @@ +#![allow(deprecated)] + +mod event_loop; +mod io; +mod handler; +mod notify; + +#[cfg(all(unix, not(target_os = "fuchsia")))] +pub mod unix; + +pub use self::event_loop::{ + EventLoop, + EventLoopBuilder, + Sender, +}; +pub use self::io::{ + TryAccept, + TryRead, + TryWrite, +}; +pub use self::handler::{ + Handler, +}; +pub use self::notify::{ + NotifyError, +}; +#[cfg(all(unix, not(target_os = "fuchsia")))] +pub use self::unix::{ + pipe, + PipeReader, + PipeWriter, + UnixListener, + UnixSocket, + UnixStream, + Shutdown, +}; diff --git a/third_party/rust/mio/src/deprecated/notify.rs b/third_party/rust/mio/src/deprecated/notify.rs new file mode 100644 index 0000000000..c8432d6b0e --- /dev/null +++ b/third_party/rust/mio/src/deprecated/notify.rs @@ -0,0 +1,63 @@ +use {channel}; +use std::{fmt, io, error, any}; + +pub enum NotifyError<T> { + Io(io::Error), + Full(T), + Closed(Option<T>), +} + +impl<M> fmt::Debug for NotifyError<M> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match *self { + NotifyError::Io(ref e) => { + write!(fmt, "NotifyError::Io({:?})", e) + } + NotifyError::Full(..) => { + write!(fmt, "NotifyError::Full(..)") + } + NotifyError::Closed(..) => { + write!(fmt, "NotifyError::Closed(..)") + } + } + } +} + +impl<M> fmt::Display for NotifyError<M> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match *self { + NotifyError::Io(ref e) => { + write!(fmt, "IO error: {}", e) + } + NotifyError::Full(..) => write!(fmt, "Full"), + NotifyError::Closed(..) => write!(fmt, "Closed") + } + } +} + +impl<M: any::Any> error::Error for NotifyError<M> { + fn description(&self) -> &str { + match *self { + NotifyError::Io(ref err) => err.description(), + NotifyError::Closed(..) => "The receiving end has hung up", + NotifyError::Full(..) => "Queue is full" + } + } + + fn cause(&self) -> Option<&error::Error> { + match *self { + NotifyError::Io(ref err) => Some(err), + _ => None + } + } +} + +impl<M> From<channel::TrySendError<M>> for NotifyError<M> { + fn from(src: channel::TrySendError<M>) -> NotifyError<M> { + match src { + channel::TrySendError::Io(e) => NotifyError::Io(e), + channel::TrySendError::Full(v) => NotifyError::Full(v), + channel::TrySendError::Disconnected(v) => NotifyError::Closed(Some(v)), + } + } +} diff --git a/third_party/rust/mio/src/deprecated/unix.rs b/third_party/rust/mio/src/deprecated/unix.rs new file mode 100644 index 0000000000..97c6a60ba4 --- /dev/null +++ b/third_party/rust/mio/src/deprecated/unix.rs @@ -0,0 +1,420 @@ +use {io, sys, Ready, Poll, PollOpt, Token}; +use event::Evented; +use deprecated::TryAccept; +use io::MapNonBlock; +use std::io::{Read, Write}; +use std::path::Path; +pub use std::net::Shutdown; +use std::process; + +pub use sys::Io; + +#[derive(Debug)] +pub struct UnixSocket { + sys: sys::UnixSocket, +} + +impl UnixSocket { + /// Returns a new, unbound, non-blocking Unix domain socket + pub fn stream() -> io::Result<UnixSocket> { + sys::UnixSocket::stream() + .map(From::from) + } + + /// Connect the socket to the specified address + pub fn connect<P: AsRef<Path> + ?Sized>(self, addr: &P) -> io::Result<(UnixStream, bool)> { + let complete = match self.sys.connect(addr) { + Ok(()) => true, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => false, + Err(e) => return Err(e), + }; + Ok((From::from(self.sys), complete)) + } + + /// Bind the socket to the specified address + pub fn bind<P: AsRef<Path> + ?Sized>(&self, addr: &P) -> io::Result<()> { + self.sys.bind(addr) + } + + /// Listen for incoming requests + pub fn listen(self, backlog: usize) -> io::Result<UnixListener> { + self.sys.listen(backlog)?; + Ok(From::from(self.sys)) + } + + pub fn try_clone(&self) -> io::Result<UnixSocket> { + self.sys.try_clone() + .map(From::from) + } +} + +impl Evented for UnixSocket { + fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + self.sys.register(poll, token, interest, opts) + } + + fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + self.sys.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + self.sys.deregister(poll) + } +} + +impl From<sys::UnixSocket> for UnixSocket { + fn from(sys: sys::UnixSocket) -> UnixSocket { + UnixSocket { sys } + } +} + +/* + * + * ===== UnixStream ===== + * + */ + +#[derive(Debug)] +pub struct UnixStream { + sys: sys::UnixSocket, +} + +impl UnixStream { + pub fn connect<P: AsRef<Path> + ?Sized>(path: &P) -> io::Result<UnixStream> { + UnixSocket::stream() + .and_then(|sock| sock.connect(path)) + .map(|(sock, _)| sock) + } + + pub fn try_clone(&self) -> io::Result<UnixStream> { + self.sys.try_clone() + .map(From::from) + } + + pub fn shutdown(&self, how: Shutdown) -> io::Result<usize> { + self.sys.shutdown(how).map(|_| 0) + } + + pub fn read_recv_fd(&mut self, buf: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> { + self.sys.read_recv_fd(buf) + } + + pub fn try_read_recv_fd(&mut self, buf: &mut [u8]) -> io::Result<Option<(usize, Option<RawFd>)>> { + self.read_recv_fd(buf).map_non_block() + } + + pub fn write_send_fd(&mut self, buf: &[u8], fd: RawFd) -> io::Result<usize> { + self.sys.write_send_fd(buf, fd) + } + + pub fn try_write_send_fd(&mut self, buf: &[u8], fd: RawFd) -> io::Result<Option<usize>> { + self.write_send_fd(buf, fd).map_non_block() + } +} + +impl Read for UnixStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.sys.read(buf) + } +} + +impl Write for UnixStream { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.sys.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.sys.flush() + } +} + +impl Evented for UnixStream { + fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + self.sys.register(poll, token, interest, opts) + } + + fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + self.sys.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + self.sys.deregister(poll) + } +} + +impl From<sys::UnixSocket> for UnixStream { + fn from(sys: sys::UnixSocket) -> UnixStream { + UnixStream { sys } + } +} + +/* + * + * ===== UnixListener ===== + * + */ + +#[derive(Debug)] +pub struct UnixListener { + sys: sys::UnixSocket, +} + +impl UnixListener { + pub fn bind<P: AsRef<Path> + ?Sized>(addr: &P) -> io::Result<UnixListener> { + UnixSocket::stream().and_then(|sock| { + sock.bind(addr)?; + sock.listen(256) + }) + } + + pub fn accept(&self) -> io::Result<UnixStream> { + self.sys.accept().map(From::from) + } + + pub fn try_clone(&self) -> io::Result<UnixListener> { + self.sys.try_clone().map(From::from) + } +} + +impl Evented for UnixListener { + fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + self.sys.register(poll, token, interest, opts) + } + + fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + self.sys.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + self.sys.deregister(poll) + } +} + +impl TryAccept for UnixListener { + type Output = UnixStream; + + fn accept(&self) -> io::Result<Option<UnixStream>> { + UnixListener::accept(self).map_non_block() + } +} + +impl From<sys::UnixSocket> for UnixListener { + fn from(sys: sys::UnixSocket) -> UnixListener { + UnixListener { sys } + } +} + +/* + * + * ===== Pipe ===== + * + */ + +pub fn pipe() -> io::Result<(PipeReader, PipeWriter)> { + let (rd, wr) = sys::pipe()?; + Ok((From::from(rd), From::from(wr))) +} + +#[derive(Debug)] +pub struct PipeReader { + io: Io, +} + +impl PipeReader { + pub fn from_stdout(stdout: process::ChildStdout) -> io::Result<Self> { + if let Err(e) = sys::set_nonblock(stdout.as_raw_fd()) { + return Err(e); + } + Ok(PipeReader::from(unsafe { Io::from_raw_fd(stdout.into_raw_fd()) })) + } + pub fn from_stderr(stderr: process::ChildStderr) -> io::Result<Self> { + if let Err(e) = sys::set_nonblock(stderr.as_raw_fd()) { + return Err(e); + } + Ok(PipeReader::from(unsafe { Io::from_raw_fd(stderr.into_raw_fd()) })) + } +} + +impl Read for PipeReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.io.read(buf) + } +} + +impl<'a> Read for &'a PipeReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + (&self.io).read(buf) + } +} + +impl Evented for PipeReader { + fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + self.io.register(poll, token, interest, opts) + } + + fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + self.io.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + self.io.deregister(poll) + } +} + +impl From<Io> for PipeReader { + fn from(io: Io) -> PipeReader { + PipeReader { io } + } +} + +#[derive(Debug)] +pub struct PipeWriter { + io: Io, +} + +impl PipeWriter { + pub fn from_stdin(stdin: process::ChildStdin) -> io::Result<Self> { + if let Err(e) = sys::set_nonblock(stdin.as_raw_fd()) { + return Err(e); + } + Ok(PipeWriter::from(unsafe { Io::from_raw_fd(stdin.into_raw_fd()) })) + } +} + +impl Write for PipeWriter { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.io.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.io.flush() + } +} + +impl<'a> Write for &'a PipeWriter { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + (&self.io).write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + (&self.io).flush() + } +} + +impl Evented for PipeWriter { + fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + self.io.register(poll, token, interest, opts) + } + + fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + self.io.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + self.io.deregister(poll) + } +} + +impl From<Io> for PipeWriter { + fn from(io: Io) -> PipeWriter { + PipeWriter { io } + } +} + +/* + * + * ===== Conversions ===== + * + */ + +use std::os::unix::io::{RawFd, IntoRawFd, AsRawFd, FromRawFd}; + +impl IntoRawFd for UnixSocket { + fn into_raw_fd(self) -> RawFd { + self.sys.into_raw_fd() + } +} + +impl AsRawFd for UnixSocket { + fn as_raw_fd(&self) -> RawFd { + self.sys.as_raw_fd() + } +} + +impl FromRawFd for UnixSocket { + unsafe fn from_raw_fd(fd: RawFd) -> UnixSocket { + UnixSocket { sys: FromRawFd::from_raw_fd(fd) } + } +} + +impl IntoRawFd for UnixStream { + fn into_raw_fd(self) -> RawFd { + self.sys.into_raw_fd() + } +} + +impl AsRawFd for UnixStream { + fn as_raw_fd(&self) -> RawFd { + self.sys.as_raw_fd() + } +} + +impl FromRawFd for UnixStream { + unsafe fn from_raw_fd(fd: RawFd) -> UnixStream { + UnixStream { sys: FromRawFd::from_raw_fd(fd) } + } +} + +impl IntoRawFd for UnixListener { + fn into_raw_fd(self) -> RawFd { + self.sys.into_raw_fd() + } +} + +impl AsRawFd for UnixListener { + fn as_raw_fd(&self) -> RawFd { + self.sys.as_raw_fd() + } +} + +impl FromRawFd for UnixListener { + unsafe fn from_raw_fd(fd: RawFd) -> UnixListener { + UnixListener { sys: FromRawFd::from_raw_fd(fd) } + } +} + +impl IntoRawFd for PipeReader { + fn into_raw_fd(self) -> RawFd { + self.io.into_raw_fd() + } +} + +impl AsRawFd for PipeReader { + fn as_raw_fd(&self) -> RawFd { + self.io.as_raw_fd() + } +} + +impl FromRawFd for PipeReader { + unsafe fn from_raw_fd(fd: RawFd) -> PipeReader { + PipeReader { io: FromRawFd::from_raw_fd(fd) } + } +} + +impl IntoRawFd for PipeWriter { + fn into_raw_fd(self) -> RawFd { + self.io.into_raw_fd() + } +} + +impl AsRawFd for PipeWriter { + fn as_raw_fd(&self) -> RawFd { + self.io.as_raw_fd() + } +} + +impl FromRawFd for PipeWriter { + unsafe fn from_raw_fd(fd: RawFd) -> PipeWriter { + PipeWriter { io: FromRawFd::from_raw_fd(fd) } + } +} |