//! Bindings to IOCP, I/O Completion Ports

use std::cmp;
use std::fmt;
use std::io;
use std::mem;
use std::os::windows::io::*;
use std::time::Duration;

use crate::handle::Handle;
use crate::Overlapped;
use winapi::shared::basetsd::*;
use winapi::shared::ntdef::*;
use winapi::um::handleapi::*;
use winapi::um::ioapiset::*;
use winapi::um::minwinbase::*;

/// A handle to an Windows I/O Completion Port.
#[derive(Debug)]
pub struct CompletionPort {
    handle: Handle,
}

/// A status message received from an I/O completion port.
///
/// These statuses can be created via the `new` or `empty` constructors and then
/// provided to a completion port, or they are read out of a completion port.
/// The fields of each status are read through its accessor methods.
#[derive(Clone, Copy)]
pub struct CompletionStatus(OVERLAPPED_ENTRY);

impl fmt::Debug for CompletionStatus {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "CompletionStatus(OVERLAPPED_ENTRY)")
    }
}

unsafe impl Send for CompletionStatus {}
unsafe impl Sync for CompletionStatus {}

impl CompletionPort {
    /// Creates a new I/O completion port with the specified concurrency value.
    ///
    /// The number of threads given corresponds to the level of concurrency
    /// allowed for threads associated with this port. Consult the Windows
    /// documentation for more information about this value.
    pub fn new(threads: u32) -> io::Result<CompletionPort> {
        let ret = unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0 as *mut _, 0, threads) };
        if ret.is_null() {
            Err(io::Error::last_os_error())
        } else {
            Ok(CompletionPort {
                handle: Handle::new(ret),
            })
        }
    }

    /// Associates a new `HANDLE` to this I/O completion port.
    ///
    /// This function will associate the given handle to this port with the
    /// given `token` to be returned in status messages whenever it receives a
    /// notification.
    ///
    /// Any object which is convertible to a `HANDLE` via the `AsRawHandle`
    /// trait can be provided to this function, such as `std::fs::File` and
    /// friends.
    pub fn add_handle<T: AsRawHandle + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> {
        self._add(token, t.as_raw_handle())
    }

    /// Associates a new `SOCKET` to this I/O completion port.
    ///
    /// This function will associate the given socket to this port with the
    /// given `token` to be returned in status messages whenever it receives a
    /// notification.
    ///
    /// Any object which is convertible to a `SOCKET` via the `AsRawSocket`
    /// trait can be provided to this function, such as `std::net::TcpStream`
    /// and friends.
    pub fn add_socket<T: AsRawSocket + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> {
        self._add(token, t.as_raw_socket() as HANDLE)
    }

    fn _add(&self, token: usize, handle: HANDLE) -> io::Result<()> {
        assert_eq!(mem::size_of_val(&token), mem::size_of::<ULONG_PTR>());
        let ret =
            unsafe { CreateIoCompletionPort(handle, self.handle.raw(), token as ULONG_PTR, 0) };
        if ret.is_null() {
            Err(io::Error::last_os_error())
        } else {
            debug_assert_eq!(ret, self.handle.raw());
            Ok(())
        }
    }

    /// Dequeue a completion status from this I/O completion port.
    ///
    /// This function will associate the calling thread with this completion
    /// port and then wait for a status message to become available. The precise
    /// semantics on when this function returns depends on the concurrency value
    /// specified when the port was created.
    ///
    /// A timeout can optionally be specified to this function. If `None` is
    /// provided this function will not time out, and otherwise it will time out
    /// after the specified duration has passed.
    ///
    /// On success this will return the status message which was dequeued from
    /// this completion port.
    pub fn get(&self, timeout: Option<Duration>) -> io::Result<CompletionStatus> {
        let mut bytes = 0;
        let mut token = 0;
        let mut overlapped = 0 as *mut _;
        let timeout = crate::dur2ms(timeout);
        let ret = unsafe {
            GetQueuedCompletionStatus(
                self.handle.raw(),
                &mut bytes,
                &mut token,
                &mut overlapped,
                timeout,
            )
        };
        crate::cvt(ret).map(|_| {
            CompletionStatus(OVERLAPPED_ENTRY {
                dwNumberOfBytesTransferred: bytes,
                lpCompletionKey: token,
                lpOverlapped: overlapped,
                Internal: 0,
            })
        })
    }

    /// Dequeues a number of completion statuses from this I/O completion port.
    ///
    /// This function is the same as `get` except that it may return more than
    /// one status. A buffer of "zero" statuses is provided (the contents are
    /// not read) and then on success this function will return a sub-slice of
    /// statuses which represent those which were dequeued from this port. This
    /// function does not wait to fill up the entire list of statuses provided.
    ///
    /// Like with `get`, a timeout may be specified for this operation.
    pub fn get_many<'a>(
        &self,
        list: &'a mut [CompletionStatus],
        timeout: Option<Duration>,
    ) -> io::Result<&'a mut [CompletionStatus]> {
        debug_assert_eq!(
            mem::size_of::<CompletionStatus>(),
            mem::size_of::<OVERLAPPED_ENTRY>()
        );
        let mut removed = 0;
        let timeout = crate::dur2ms(timeout);
        let len = cmp::min(list.len(), <ULONG>::max_value() as usize) as ULONG;
        let ret = unsafe {
            GetQueuedCompletionStatusEx(
                self.handle.raw(),
                list.as_ptr() as *mut _,
                len,
                &mut removed,
                timeout,
                FALSE as i32,
            )
        };
        match crate::cvt(ret) {
            Ok(_) => Ok(&mut list[..removed as usize]),
            Err(e) => Err(e),
        }
    }

    /// Posts a new completion status onto this I/O completion port.
    ///
    /// This function will post the given status, with custom parameters, to the
    /// port. Threads blocked in `get` or `get_many` will eventually receive
    /// this status.
    pub fn post(&self, status: CompletionStatus) -> io::Result<()> {
        let ret = unsafe {
            PostQueuedCompletionStatus(
                self.handle.raw(),
                status.0.dwNumberOfBytesTransferred,
                status.0.lpCompletionKey,
                status.0.lpOverlapped,
            )
        };
        crate::cvt(ret).map(|_| ())
    }
}

impl AsRawHandle for CompletionPort {
    fn as_raw_handle(&self) -> HANDLE {
        self.handle.raw()
    }
}

impl FromRawHandle for CompletionPort {
    unsafe fn from_raw_handle(handle: HANDLE) -> CompletionPort {
        CompletionPort {
            handle: Handle::new(handle),
        }
    }
}

impl IntoRawHandle for CompletionPort {
    fn into_raw_handle(self) -> HANDLE {
        self.handle.into_raw()
    }
}

impl CompletionStatus {
    /// Creates a new completion status with the provided parameters.
    ///
    /// This function is useful when creating a status to send to a port with
    /// the `post` method. The parameters are opaquely passed through and not
    /// interpreted by the system at all.
    pub fn new(bytes: u32, token: usize, overlapped: *mut Overlapped) -> CompletionStatus {
        assert_eq!(mem::size_of_val(&token), mem::size_of::<ULONG_PTR>());
        CompletionStatus(OVERLAPPED_ENTRY {
            dwNumberOfBytesTransferred: bytes,
            lpCompletionKey: token as ULONG_PTR,
            lpOverlapped: overlapped as *mut _,
            Internal: 0,
        })
    }

    /// Creates a new borrowed completion status from the borrowed
    /// `OVERLAPPED_ENTRY` argument provided.
    ///
    /// This method will wrap the `OVERLAPPED_ENTRY` in a `CompletionStatus`,
    /// returning the wrapped structure.
    pub fn from_entry(entry: &OVERLAPPED_ENTRY) -> &CompletionStatus {
        unsafe { &*(entry as *const _ as *const _) }
    }

    /// Creates a new "zero" completion status.
    ///
    /// This function is useful when creating a stack buffer or vector of
    /// completion statuses to be passed to the `get_many` function.
    pub fn zero() -> CompletionStatus {
        CompletionStatus::new(0, 0, 0 as *mut _)
    }

    /// Returns the number of bytes that were transferred for the I/O operation
    /// associated with this completion status.
    pub fn bytes_transferred(&self) -> u32 {
        self.0.dwNumberOfBytesTransferred
    }

    /// Returns the completion key value associated with the file handle whose
    /// I/O operation has completed.
    ///
    /// A completion key is a per-handle key that is specified when it is added
    /// to an I/O completion port via `add_handle` or `add_socket`.
    pub fn token(&self) -> usize {
        self.0.lpCompletionKey as usize
    }

    /// Returns a pointer to the `Overlapped` structure that was specified when
    /// the I/O operation was started.
    pub fn overlapped(&self) -> *mut OVERLAPPED {
        self.0.lpOverlapped
    }

    /// Returns a pointer to the internal `OVERLAPPED_ENTRY` object.
    pub fn entry(&self) -> &OVERLAPPED_ENTRY {
        &self.0
    }
}

#[cfg(test)]
mod tests {
    use std::mem;
    use std::time::Duration;

    use winapi::shared::basetsd::*;
    use winapi::shared::winerror::*;

    use crate::iocp::{CompletionPort, CompletionStatus};

    #[test]
    fn is_send_sync() {
        fn is_send_sync<T: Send + Sync>() {}
        is_send_sync::<CompletionPort>();
    }

    #[test]
    fn token_right_size() {
        assert_eq!(mem::size_of::<usize>(), mem::size_of::<ULONG_PTR>());
    }

    #[test]
    fn timeout() {
        let c = CompletionPort::new(1).unwrap();
        let err = c.get(Some(Duration::from_millis(1))).unwrap_err();
        assert_eq!(err.raw_os_error(), Some(WAIT_TIMEOUT as i32));
    }

    #[test]
    fn get() {
        let c = CompletionPort::new(1).unwrap();
        c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap();
        let s = c.get(None).unwrap();
        assert_eq!(s.bytes_transferred(), 1);
        assert_eq!(s.token(), 2);
        assert_eq!(s.overlapped(), 3 as *mut _);
    }

    #[test]
    fn get_many() {
        let c = CompletionPort::new(1).unwrap();

        c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap();
        c.post(CompletionStatus::new(4, 5, 6 as *mut _)).unwrap();

        let mut s = vec![CompletionStatus::zero(); 4];
        {
            let s = c.get_many(&mut s, None).unwrap();
            assert_eq!(s.len(), 2);
            assert_eq!(s[0].bytes_transferred(), 1);
            assert_eq!(s[0].token(), 2);
            assert_eq!(s[0].overlapped(), 3 as *mut _);
            assert_eq!(s[1].bytes_transferred(), 4);
            assert_eq!(s[1].token(), 5);
            assert_eq!(s[1].overlapped(), 6 as *mut _);
        }
        assert_eq!(s[2].bytes_transferred(), 0);
        assert_eq!(s[2].token(), 0);
        assert_eq!(s[2].overlapped(), 0 as *mut _);
    }
}