summaryrefslogtreecommitdiffstats
path: root/third_party/rust/mio/src/deprecated
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
commit2aa4a82499d4becd2284cdb482213d541b8804dd (patch)
treeb80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/mio/src/deprecated
parentInitial commit. (diff)
downloadfirefox-upstream.tar.xz
firefox-upstream.zip
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/mio/src/deprecated')
-rw-r--r--third_party/rust/mio/src/deprecated/event_loop.rs346
-rw-r--r--third_party/rust/mio/src/deprecated/handler.rs37
-rw-r--r--third_party/rust/mio/src/deprecated/io.rs28
-rw-r--r--third_party/rust/mio/src/deprecated/mod.rs36
-rw-r--r--third_party/rust/mio/src/deprecated/notify.rs63
-rw-r--r--third_party/rust/mio/src/deprecated/unix.rs420
6 files changed, 930 insertions, 0 deletions
diff --git a/third_party/rust/mio/src/deprecated/event_loop.rs b/third_party/rust/mio/src/deprecated/event_loop.rs
new file mode 100644
index 0000000000..a4c4580b3a
--- /dev/null
+++ b/third_party/rust/mio/src/deprecated/event_loop.rs
@@ -0,0 +1,346 @@
+use {channel, Poll, Events, Token};
+use event::Evented;
+use deprecated::{Handler, NotifyError};
+use event_imp::{Event, Ready, PollOpt};
+use timer::{self, Timer, Timeout};
+use std::{io, fmt, usize};
+use std::default::Default;
+use std::time::Duration;
+
+#[derive(Debug, Default, Clone)]
+pub struct EventLoopBuilder {
+ config: Config,
+}
+
+/// `EventLoop` configuration details
+#[derive(Clone, Debug)]
+struct Config {
+ // == Notifications ==
+ notify_capacity: usize,
+ messages_per_tick: usize,
+
+ // == Timer ==
+ timer_tick: Duration,
+ timer_wheel_size: usize,
+ timer_capacity: usize,
+}
+
+impl Default for Config {
+ fn default() -> Config {
+ // Default EventLoop configuration values
+ Config {
+ notify_capacity: 4_096,
+ messages_per_tick: 256,
+ timer_tick: Duration::from_millis(100),
+ timer_wheel_size: 1_024,
+ timer_capacity: 65_536,
+ }
+ }
+}
+
+impl EventLoopBuilder {
+ /// Construct a new `EventLoopBuilder` with the default configuration
+ /// values.
+ pub fn new() -> EventLoopBuilder {
+ EventLoopBuilder::default()
+ }
+
+ /// Sets the maximum number of messages that can be buffered on the event
+ /// loop's notification channel before a send will fail.
+ ///
+ /// The default value for this is 4096.
+ pub fn notify_capacity(&mut self, capacity: usize) -> &mut Self {
+ self.config.notify_capacity = capacity;
+ self
+ }
+
+ /// Sets the maximum number of messages that can be processed on any tick of
+ /// the event loop.
+ ///
+ /// The default value for this is 256.
+ pub fn messages_per_tick(&mut self, messages: usize) -> &mut Self {
+ self.config.messages_per_tick = messages;
+ self
+ }
+
+ pub fn timer_tick(&mut self, val: Duration) -> &mut Self {
+ self.config.timer_tick = val;
+ self
+ }
+
+ pub fn timer_wheel_size(&mut self, size: usize) -> &mut Self {
+ self.config.timer_wheel_size = size;
+ self
+ }
+
+ pub fn timer_capacity(&mut self, cap: usize) -> &mut Self {
+ self.config.timer_capacity = cap;
+ self
+ }
+
+ /// Constructs a new `EventLoop` using the configured values. The
+ /// `EventLoop` will not be running.
+ pub fn build<H: Handler>(self) -> io::Result<EventLoop<H>> {
+ EventLoop::configured(self.config)
+ }
+}
+
+/// Single threaded IO event loop.
+pub struct EventLoop<H: Handler> {
+ run: bool,
+ poll: Poll,
+ events: Events,
+ timer: Timer<H::Timeout>,
+ notify_tx: channel::SyncSender<H::Message>,
+ notify_rx: channel::Receiver<H::Message>,
+ config: Config,
+}
+
+// Token used to represent notifications
+const NOTIFY: Token = Token(usize::MAX - 1);
+const TIMER: Token = Token(usize::MAX - 2);
+
+impl<H: Handler> EventLoop<H> {
+
+ /// Constructs a new `EventLoop` using the default configuration values.
+ /// The `EventLoop` will not be running.
+ pub fn new() -> io::Result<EventLoop<H>> {
+ EventLoop::configured(Config::default())
+ }
+
+ fn configured(config: Config) -> io::Result<EventLoop<H>> {
+ // Create the IO poller
+ let poll = Poll::new()?;
+
+ let timer = timer::Builder::default()
+ .tick_duration(config.timer_tick)
+ .num_slots(config.timer_wheel_size)
+ .capacity(config.timer_capacity)
+ .build();
+
+ // Create cross thread notification queue
+ let (tx, rx) = channel::sync_channel(config.notify_capacity);
+
+ // Register the notification wakeup FD with the IO poller
+ poll.register(&rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())?;
+ poll.register(&timer, TIMER, Ready::readable(), PollOpt::edge())?;
+
+ Ok(EventLoop {
+ run: true,
+ poll,
+ timer,
+ notify_tx: tx,
+ notify_rx: rx,
+ config,
+ events: Events::with_capacity(1024),
+ })
+ }
+
+ /// Returns a sender that allows sending messages to the event loop in a
+ /// thread-safe way, waking up the event loop if needed.
+ ///
+ /// # Implementation Details
+ ///
+ /// Each [EventLoop](#) contains a lock-free queue with a pre-allocated
+ /// buffer size. The size can be changed by modifying
+ /// [EventLoopConfig.notify_capacity](struct.EventLoopConfig.html#method.notify_capacity).
+ /// When a message is sent to the EventLoop, it is first pushed on to the
+ /// queue. Then, if the EventLoop is currently running, an atomic flag is
+ /// set to indicate that the next loop iteration should be started without
+ /// waiting.
+ ///
+ /// If the loop is blocked waiting for IO events, then it is woken up. The
+ /// strategy for waking up the event loop is platform dependent. For
+ /// example, on a modern Linux OS, eventfd is used. On older OSes, a pipe
+ /// is used.
+ ///
+ /// The strategy of setting an atomic flag if the event loop is not already
+ /// sleeping allows avoiding an expensive wakeup operation if at all possible.
+ pub fn channel(&self) -> Sender<H::Message> {
+ Sender::new(self.notify_tx.clone())
+ }
+
+ /// Schedules a timeout after the requested time interval. When the
+ /// duration has been reached,
+ /// [Handler::timeout](trait.Handler.html#method.timeout) will be invoked
+ /// passing in the supplied token.
+ ///
+ /// Returns a handle to the timeout that can be used to cancel the timeout
+ /// using [#clear_timeout](#method.clear_timeout).
+ pub fn timeout(&mut self, token: H::Timeout, delay: Duration) -> timer::Result<Timeout> {
+ self.timer.set_timeout(delay, token)
+ }
+
+ /// If the supplied timeout has not been triggered, cancel it such that it
+ /// will not be triggered in the future.
+ pub fn clear_timeout(&mut self, timeout: &Timeout) -> bool {
+ self.timer.cancel_timeout(&timeout).is_some()
+ }
+
+ /// Tells the event loop to exit after it is done handling all events in the
+ /// current iteration.
+ pub fn shutdown(&mut self) {
+ self.run = false;
+ }
+
+ /// Indicates whether the event loop is currently running. If it's not it has either
+ /// stopped or is scheduled to stop on the next tick.
+ pub fn is_running(&self) -> bool {
+ self.run
+ }
+
+ /// Registers an IO handle with the event loop.
+ pub fn register<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()>
+ where E: Evented
+ {
+ self.poll.register(io, token, interest, opt)
+ }
+
+ /// Re-Registers an IO handle with the event loop.
+ pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()>
+ where E: Evented
+ {
+ self.poll.reregister(io, token, interest, opt)
+ }
+
+ /// Keep spinning the event loop indefinitely, and notify the handler whenever
+ /// any of the registered handles are ready.
+ pub fn run(&mut self, handler: &mut H) -> io::Result<()> {
+ self.run = true;
+
+ while self.run {
+ // Execute ticks as long as the event loop is running
+ self.run_once(handler, None)?;
+ }
+
+ Ok(())
+ }
+
+ /// Deregisters an IO handle with the event loop.
+ ///
+ /// Both kqueue and epoll will automatically clear any pending events when closing a
+ /// file descriptor (socket). In that case, this method does not need to be called
+ /// prior to dropping a connection from the slab.
+ ///
+ /// Warning: kqueue effectively builds in deregister when using edge-triggered mode with
+ /// oneshot. Calling `deregister()` on the socket will cause a TcpStream error.
+ pub fn deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented {
+ self.poll.deregister(io)
+ }
+
+ /// Spin the event loop once, with a given timeout (forever if `None`),
+ /// and notify the handler if any of the registered handles become ready
+ /// during that time.
+ pub fn run_once(&mut self, handler: &mut H, timeout: Option<Duration>) -> io::Result<()> {
+ trace!("event loop tick");
+
+ // Check the registered IO handles for any new events. Each poll
+ // is for one second, so a shutdown request can last as long as
+ // one second before it takes effect.
+ let events = match self.io_poll(timeout) {
+ Ok(e) => e,
+ Err(err) => {
+ if err.kind() == io::ErrorKind::Interrupted {
+ handler.interrupted(self);
+ 0
+ } else {
+ return Err(err);
+ }
+ }
+ };
+
+ self.io_process(handler, events);
+ handler.tick(self);
+ Ok(())
+ }
+
+ #[inline]
+ fn io_poll(&mut self, timeout: Option<Duration>) -> io::Result<usize> {
+ self.poll.poll(&mut self.events, timeout)
+ }
+
+ // Process IO events that have been previously polled
+ fn io_process(&mut self, handler: &mut H, cnt: usize) {
+ let mut i = 0;
+
+ trace!("io_process(..); cnt={}; len={}", cnt, self.events.len());
+
+ // Iterate over the notifications. Each event provides the token
+ // it was registered with (which usually represents, at least, the
+ // handle that the event is about) as well as information about
+ // what kind of event occurred (readable, writable, signal, etc.)
+ while i < cnt {
+ let evt = self.events.get(i).unwrap();
+
+ trace!("event={:?}; idx={:?}", evt, i);
+
+ match evt.token() {
+ NOTIFY => self.notify(handler),
+ TIMER => self.timer_process(handler),
+ _ => self.io_event(handler, evt)
+ }
+
+ i += 1;
+ }
+ }
+
+ fn io_event(&mut self, handler: &mut H, evt: Event) {
+ handler.ready(self, evt.token(), evt.readiness());
+ }
+
+ fn notify(&mut self, handler: &mut H) {
+ for _ in 0..self.config.messages_per_tick {
+ match self.notify_rx.try_recv() {
+ Ok(msg) => handler.notify(self, msg),
+ _ => break,
+ }
+ }
+
+ // Re-register
+ let _ = self.poll.reregister(&self.notify_rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot());
+ }
+
+ fn timer_process(&mut self, handler: &mut H) {
+ while let Some(t) = self.timer.poll() {
+ handler.timeout(self, t);
+ }
+ }
+}
+
+impl<H: Handler> fmt::Debug for EventLoop<H> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("EventLoop")
+ .field("run", &self.run)
+ .field("poll", &self.poll)
+ .field("config", &self.config)
+ .finish()
+ }
+}
+
+/// Sends messages to the EventLoop from other threads.
+pub struct Sender<M> {
+ tx: channel::SyncSender<M>
+}
+
+impl<M> fmt::Debug for Sender<M> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "Sender<?> {{ ... }}")
+ }
+}
+
+impl<M> Clone for Sender <M> {
+ fn clone(&self) -> Sender<M> {
+ Sender { tx: self.tx.clone() }
+ }
+}
+
+impl<M> Sender<M> {
+ fn new(tx: channel::SyncSender<M>) -> Sender<M> {
+ Sender { tx }
+ }
+
+ pub fn send(&self, msg: M) -> Result<(), NotifyError<M>> {
+ self.tx.try_send(msg)?;
+ Ok(())
+ }
+}
diff --git a/third_party/rust/mio/src/deprecated/handler.rs b/third_party/rust/mio/src/deprecated/handler.rs
new file mode 100644
index 0000000000..db1bc314a7
--- /dev/null
+++ b/third_party/rust/mio/src/deprecated/handler.rs
@@ -0,0 +1,37 @@
+use {Ready, Token};
+use deprecated::{EventLoop};
+
+#[allow(unused_variables)]
+pub trait Handler: Sized {
+ type Timeout;
+ type Message;
+
+ /// Invoked when the socket represented by `token` is ready to be operated
+ /// on. `events` indicates the specific operations that are
+ /// ready to be performed.
+ ///
+ /// For example, when a TCP socket is ready to be read from, `events` will
+ /// have `readable` set. When the socket is ready to be written to,
+ /// `events` will have `writable` set.
+ ///
+ /// This function will only be invoked a single time per socket per event
+ /// loop tick.
+ fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: Ready) {
+ }
+
+ /// Invoked when a message has been received via the event loop's channel.
+ fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) {
+ }
+
+ /// Invoked when a timeout has completed.
+ fn timeout(&mut self, event_loop: &mut EventLoop<Self>, timeout: Self::Timeout) {
+ }
+
+ /// Invoked when `EventLoop` has been interrupted by a signal interrupt.
+ fn interrupted(&mut self, event_loop: &mut EventLoop<Self>) {
+ }
+
+ /// Invoked at the end of an event loop tick.
+ fn tick(&mut self, event_loop: &mut EventLoop<Self>) {
+ }
+}
diff --git a/third_party/rust/mio/src/deprecated/io.rs b/third_party/rust/mio/src/deprecated/io.rs
new file mode 100644
index 0000000000..16ff27993b
--- /dev/null
+++ b/third_party/rust/mio/src/deprecated/io.rs
@@ -0,0 +1,28 @@
+use ::io::MapNonBlock;
+use std::io::{self, Read, Write};
+
+pub trait TryRead {
+ fn try_read(&mut self, buf: &mut [u8]) -> io::Result<Option<usize>>;
+}
+
+pub trait TryWrite {
+ fn try_write(&mut self, buf: &[u8]) -> io::Result<Option<usize>>;
+}
+
+impl<T: Read> TryRead for T {
+ fn try_read(&mut self, dst: &mut [u8]) -> io::Result<Option<usize>> {
+ self.read(dst).map_non_block()
+ }
+}
+
+impl<T: Write> TryWrite for T {
+ fn try_write(&mut self, src: &[u8]) -> io::Result<Option<usize>> {
+ self.write(src).map_non_block()
+ }
+}
+
+pub trait TryAccept {
+ type Output;
+
+ fn accept(&self) -> io::Result<Option<Self::Output>>;
+}
diff --git a/third_party/rust/mio/src/deprecated/mod.rs b/third_party/rust/mio/src/deprecated/mod.rs
new file mode 100644
index 0000000000..124a2eee3d
--- /dev/null
+++ b/third_party/rust/mio/src/deprecated/mod.rs
@@ -0,0 +1,36 @@
+#![allow(deprecated)]
+
+mod event_loop;
+mod io;
+mod handler;
+mod notify;
+
+#[cfg(all(unix, not(target_os = "fuchsia")))]
+pub mod unix;
+
+pub use self::event_loop::{
+ EventLoop,
+ EventLoopBuilder,
+ Sender,
+};
+pub use self::io::{
+ TryAccept,
+ TryRead,
+ TryWrite,
+};
+pub use self::handler::{
+ Handler,
+};
+pub use self::notify::{
+ NotifyError,
+};
+#[cfg(all(unix, not(target_os = "fuchsia")))]
+pub use self::unix::{
+ pipe,
+ PipeReader,
+ PipeWriter,
+ UnixListener,
+ UnixSocket,
+ UnixStream,
+ Shutdown,
+};
diff --git a/third_party/rust/mio/src/deprecated/notify.rs b/third_party/rust/mio/src/deprecated/notify.rs
new file mode 100644
index 0000000000..c8432d6b0e
--- /dev/null
+++ b/third_party/rust/mio/src/deprecated/notify.rs
@@ -0,0 +1,63 @@
+use {channel};
+use std::{fmt, io, error, any};
+
+pub enum NotifyError<T> {
+ Io(io::Error),
+ Full(T),
+ Closed(Option<T>),
+}
+
+impl<M> fmt::Debug for NotifyError<M> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ match *self {
+ NotifyError::Io(ref e) => {
+ write!(fmt, "NotifyError::Io({:?})", e)
+ }
+ NotifyError::Full(..) => {
+ write!(fmt, "NotifyError::Full(..)")
+ }
+ NotifyError::Closed(..) => {
+ write!(fmt, "NotifyError::Closed(..)")
+ }
+ }
+ }
+}
+
+impl<M> fmt::Display for NotifyError<M> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ match *self {
+ NotifyError::Io(ref e) => {
+ write!(fmt, "IO error: {}", e)
+ }
+ NotifyError::Full(..) => write!(fmt, "Full"),
+ NotifyError::Closed(..) => write!(fmt, "Closed")
+ }
+ }
+}
+
+impl<M: any::Any> error::Error for NotifyError<M> {
+ fn description(&self) -> &str {
+ match *self {
+ NotifyError::Io(ref err) => err.description(),
+ NotifyError::Closed(..) => "The receiving end has hung up",
+ NotifyError::Full(..) => "Queue is full"
+ }
+ }
+
+ fn cause(&self) -> Option<&error::Error> {
+ match *self {
+ NotifyError::Io(ref err) => Some(err),
+ _ => None
+ }
+ }
+}
+
+impl<M> From<channel::TrySendError<M>> for NotifyError<M> {
+ fn from(src: channel::TrySendError<M>) -> NotifyError<M> {
+ match src {
+ channel::TrySendError::Io(e) => NotifyError::Io(e),
+ channel::TrySendError::Full(v) => NotifyError::Full(v),
+ channel::TrySendError::Disconnected(v) => NotifyError::Closed(Some(v)),
+ }
+ }
+}
diff --git a/third_party/rust/mio/src/deprecated/unix.rs b/third_party/rust/mio/src/deprecated/unix.rs
new file mode 100644
index 0000000000..97c6a60ba4
--- /dev/null
+++ b/third_party/rust/mio/src/deprecated/unix.rs
@@ -0,0 +1,420 @@
+use {io, sys, Ready, Poll, PollOpt, Token};
+use event::Evented;
+use deprecated::TryAccept;
+use io::MapNonBlock;
+use std::io::{Read, Write};
+use std::path::Path;
+pub use std::net::Shutdown;
+use std::process;
+
+pub use sys::Io;
+
+#[derive(Debug)]
+pub struct UnixSocket {
+ sys: sys::UnixSocket,
+}
+
+impl UnixSocket {
+ /// Returns a new, unbound, non-blocking Unix domain socket
+ pub fn stream() -> io::Result<UnixSocket> {
+ sys::UnixSocket::stream()
+ .map(From::from)
+ }
+
+ /// Connect the socket to the specified address
+ pub fn connect<P: AsRef<Path> + ?Sized>(self, addr: &P) -> io::Result<(UnixStream, bool)> {
+ let complete = match self.sys.connect(addr) {
+ Ok(()) => true,
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => false,
+ Err(e) => return Err(e),
+ };
+ Ok((From::from(self.sys), complete))
+ }
+
+ /// Bind the socket to the specified address
+ pub fn bind<P: AsRef<Path> + ?Sized>(&self, addr: &P) -> io::Result<()> {
+ self.sys.bind(addr)
+ }
+
+ /// Listen for incoming requests
+ pub fn listen(self, backlog: usize) -> io::Result<UnixListener> {
+ self.sys.listen(backlog)?;
+ Ok(From::from(self.sys))
+ }
+
+ pub fn try_clone(&self) -> io::Result<UnixSocket> {
+ self.sys.try_clone()
+ .map(From::from)
+ }
+}
+
+impl Evented for UnixSocket {
+ fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ self.sys.register(poll, token, interest, opts)
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ self.sys.reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.sys.deregister(poll)
+ }
+}
+
+impl From<sys::UnixSocket> for UnixSocket {
+ fn from(sys: sys::UnixSocket) -> UnixSocket {
+ UnixSocket { sys }
+ }
+}
+
+/*
+ *
+ * ===== UnixStream =====
+ *
+ */
+
+#[derive(Debug)]
+pub struct UnixStream {
+ sys: sys::UnixSocket,
+}
+
+impl UnixStream {
+ pub fn connect<P: AsRef<Path> + ?Sized>(path: &P) -> io::Result<UnixStream> {
+ UnixSocket::stream()
+ .and_then(|sock| sock.connect(path))
+ .map(|(sock, _)| sock)
+ }
+
+ pub fn try_clone(&self) -> io::Result<UnixStream> {
+ self.sys.try_clone()
+ .map(From::from)
+ }
+
+ pub fn shutdown(&self, how: Shutdown) -> io::Result<usize> {
+ self.sys.shutdown(how).map(|_| 0)
+ }
+
+ pub fn read_recv_fd(&mut self, buf: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
+ self.sys.read_recv_fd(buf)
+ }
+
+ pub fn try_read_recv_fd(&mut self, buf: &mut [u8]) -> io::Result<Option<(usize, Option<RawFd>)>> {
+ self.read_recv_fd(buf).map_non_block()
+ }
+
+ pub fn write_send_fd(&mut self, buf: &[u8], fd: RawFd) -> io::Result<usize> {
+ self.sys.write_send_fd(buf, fd)
+ }
+
+ pub fn try_write_send_fd(&mut self, buf: &[u8], fd: RawFd) -> io::Result<Option<usize>> {
+ self.write_send_fd(buf, fd).map_non_block()
+ }
+}
+
+impl Read for UnixStream {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.sys.read(buf)
+ }
+}
+
+impl Write for UnixStream {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.sys.write(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.sys.flush()
+ }
+}
+
+impl Evented for UnixStream {
+ fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ self.sys.register(poll, token, interest, opts)
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ self.sys.reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.sys.deregister(poll)
+ }
+}
+
+impl From<sys::UnixSocket> for UnixStream {
+ fn from(sys: sys::UnixSocket) -> UnixStream {
+ UnixStream { sys }
+ }
+}
+
+/*
+ *
+ * ===== UnixListener =====
+ *
+ */
+
+#[derive(Debug)]
+pub struct UnixListener {
+ sys: sys::UnixSocket,
+}
+
+impl UnixListener {
+ pub fn bind<P: AsRef<Path> + ?Sized>(addr: &P) -> io::Result<UnixListener> {
+ UnixSocket::stream().and_then(|sock| {
+ sock.bind(addr)?;
+ sock.listen(256)
+ })
+ }
+
+ pub fn accept(&self) -> io::Result<UnixStream> {
+ self.sys.accept().map(From::from)
+ }
+
+ pub fn try_clone(&self) -> io::Result<UnixListener> {
+ self.sys.try_clone().map(From::from)
+ }
+}
+
+impl Evented for UnixListener {
+ fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ self.sys.register(poll, token, interest, opts)
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ self.sys.reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.sys.deregister(poll)
+ }
+}
+
+impl TryAccept for UnixListener {
+ type Output = UnixStream;
+
+ fn accept(&self) -> io::Result<Option<UnixStream>> {
+ UnixListener::accept(self).map_non_block()
+ }
+}
+
+impl From<sys::UnixSocket> for UnixListener {
+ fn from(sys: sys::UnixSocket) -> UnixListener {
+ UnixListener { sys }
+ }
+}
+
+/*
+ *
+ * ===== Pipe =====
+ *
+ */
+
+pub fn pipe() -> io::Result<(PipeReader, PipeWriter)> {
+ let (rd, wr) = sys::pipe()?;
+ Ok((From::from(rd), From::from(wr)))
+}
+
+#[derive(Debug)]
+pub struct PipeReader {
+ io: Io,
+}
+
+impl PipeReader {
+ pub fn from_stdout(stdout: process::ChildStdout) -> io::Result<Self> {
+ if let Err(e) = sys::set_nonblock(stdout.as_raw_fd()) {
+ return Err(e);
+ }
+ Ok(PipeReader::from(unsafe { Io::from_raw_fd(stdout.into_raw_fd()) }))
+ }
+ pub fn from_stderr(stderr: process::ChildStderr) -> io::Result<Self> {
+ if let Err(e) = sys::set_nonblock(stderr.as_raw_fd()) {
+ return Err(e);
+ }
+ Ok(PipeReader::from(unsafe { Io::from_raw_fd(stderr.into_raw_fd()) }))
+ }
+}
+
+impl Read for PipeReader {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io.read(buf)
+ }
+}
+
+impl<'a> Read for &'a PipeReader {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ (&self.io).read(buf)
+ }
+}
+
+impl Evented for PipeReader {
+ fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ self.io.register(poll, token, interest, opts)
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ self.io.reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.io.deregister(poll)
+ }
+}
+
+impl From<Io> for PipeReader {
+ fn from(io: Io) -> PipeReader {
+ PipeReader { io }
+ }
+}
+
+#[derive(Debug)]
+pub struct PipeWriter {
+ io: Io,
+}
+
+impl PipeWriter {
+ pub fn from_stdin(stdin: process::ChildStdin) -> io::Result<Self> {
+ if let Err(e) = sys::set_nonblock(stdin.as_raw_fd()) {
+ return Err(e);
+ }
+ Ok(PipeWriter::from(unsafe { Io::from_raw_fd(stdin.into_raw_fd()) }))
+ }
+}
+
+impl Write for PipeWriter {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.io.write(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.io.flush()
+ }
+}
+
+impl<'a> Write for &'a PipeWriter {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ (&self.io).write(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ (&self.io).flush()
+ }
+}
+
+impl Evented for PipeWriter {
+ fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ self.io.register(poll, token, interest, opts)
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ self.io.reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.io.deregister(poll)
+ }
+}
+
+impl From<Io> for PipeWriter {
+ fn from(io: Io) -> PipeWriter {
+ PipeWriter { io }
+ }
+}
+
+/*
+ *
+ * ===== Conversions =====
+ *
+ */
+
+use std::os::unix::io::{RawFd, IntoRawFd, AsRawFd, FromRawFd};
+
+impl IntoRawFd for UnixSocket {
+ fn into_raw_fd(self) -> RawFd {
+ self.sys.into_raw_fd()
+ }
+}
+
+impl AsRawFd for UnixSocket {
+ fn as_raw_fd(&self) -> RawFd {
+ self.sys.as_raw_fd()
+ }
+}
+
+impl FromRawFd for UnixSocket {
+ unsafe fn from_raw_fd(fd: RawFd) -> UnixSocket {
+ UnixSocket { sys: FromRawFd::from_raw_fd(fd) }
+ }
+}
+
+impl IntoRawFd for UnixStream {
+ fn into_raw_fd(self) -> RawFd {
+ self.sys.into_raw_fd()
+ }
+}
+
+impl AsRawFd for UnixStream {
+ fn as_raw_fd(&self) -> RawFd {
+ self.sys.as_raw_fd()
+ }
+}
+
+impl FromRawFd for UnixStream {
+ unsafe fn from_raw_fd(fd: RawFd) -> UnixStream {
+ UnixStream { sys: FromRawFd::from_raw_fd(fd) }
+ }
+}
+
+impl IntoRawFd for UnixListener {
+ fn into_raw_fd(self) -> RawFd {
+ self.sys.into_raw_fd()
+ }
+}
+
+impl AsRawFd for UnixListener {
+ fn as_raw_fd(&self) -> RawFd {
+ self.sys.as_raw_fd()
+ }
+}
+
+impl FromRawFd for UnixListener {
+ unsafe fn from_raw_fd(fd: RawFd) -> UnixListener {
+ UnixListener { sys: FromRawFd::from_raw_fd(fd) }
+ }
+}
+
+impl IntoRawFd for PipeReader {
+ fn into_raw_fd(self) -> RawFd {
+ self.io.into_raw_fd()
+ }
+}
+
+impl AsRawFd for PipeReader {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.as_raw_fd()
+ }
+}
+
+impl FromRawFd for PipeReader {
+ unsafe fn from_raw_fd(fd: RawFd) -> PipeReader {
+ PipeReader { io: FromRawFd::from_raw_fd(fd) }
+ }
+}
+
+impl IntoRawFd for PipeWriter {
+ fn into_raw_fd(self) -> RawFd {
+ self.io.into_raw_fd()
+ }
+}
+
+impl AsRawFd for PipeWriter {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.as_raw_fd()
+ }
+}
+
+impl FromRawFd for PipeWriter {
+ unsafe fn from_raw_fd(fd: RawFd) -> PipeWriter {
+ PipeWriter { io: FromRawFd::from_raw_fd(fd) }
+ }
+}