extern crate libc; extern crate alsa; use std::mem; use std::thread::{Builder, JoinHandle}; use std::io::{stderr, Write}; use std::ffi::{CString, CStr}; use self::alsa::{Seq, Direction}; use self::alsa::seq::{PortInfo, PortSubscribe, Addr, QueueTempo, EventType, PortCap, PortType}; use ::{MidiMessage, Ignore}; use ::errors::*; mod helpers { use super::alsa::seq::{Seq, Addr, ClientIter, PortIter, PortInfo, PortCap, MidiEvent, PortType}; use ::errors::PortInfoError; pub fn poll(fds: &mut [super::libc::pollfd], timeout: i32) -> i32 { unsafe { super::libc::poll(fds.as_mut_ptr(), fds.len() as super::libc::nfds_t, timeout) } } #[inline] pub fn get_ports(s: &Seq, capability: PortCap, f: F) -> Vec where F: Fn(PortInfo) -> T { ClientIter::new(s).flat_map(|c| PortIter::new(s, c.get_client())) .filter(|p| p.get_type().intersects(PortType::MIDI_GENERIC | PortType::SYNTH | PortType::APPLICATION)) .filter(|p| p.get_capability().intersects(capability)) .map(f) .collect() } #[inline] pub fn get_port_count(s: &Seq, capability: PortCap) -> usize { ClientIter::new(s).flat_map(|c| PortIter::new(s, c.get_client())) .filter(|p| p.get_type().intersects(PortType::MIDI_GENERIC | PortType::SYNTH | PortType::APPLICATION)) .filter(|p| p.get_capability().intersects(capability)) .count() } #[inline] pub fn get_port_name(s: &Seq, addr: Addr) -> Result { use std::fmt::Write; let pinfo = match s.get_any_port_info(addr) { Ok(p) => p, Err(_) => return Err(PortInfoError::InvalidPort) }; let cinfo = s.get_any_client_info(pinfo.get_client()).map_err(|_| PortInfoError::CannotRetrievePortName)?; let mut output = String::new(); write!(&mut output, "{}:{} {}:{}", cinfo.get_name().map_err(|_| PortInfoError::CannotRetrievePortName)?, pinfo.get_name().map_err(|_| PortInfoError::CannotRetrievePortName)?, pinfo.get_client(), // These lines added to make sure devices are listed pinfo.get_port() // with full portnames added to ensure individual device names ).unwrap(); Ok(output) } pub struct EventDecoder { ev: MidiEvent } impl EventDecoder { pub fn new(merge_commands: bool) -> EventDecoder { let coder = MidiEvent::new(0).unwrap(); coder.enable_running_status(merge_commands); EventDecoder { ev: coder } } #[inline] pub fn get_wrapped(&mut self) -> &mut MidiEvent { &mut self.ev } } pub struct EventEncoder { ev: MidiEvent, buffer_size: u32 } unsafe impl Send for EventEncoder {} impl EventEncoder { #[inline] pub fn new(buffer_size: u32) -> EventEncoder { EventEncoder { ev: MidiEvent::new(buffer_size).unwrap(), buffer_size: buffer_size } } #[inline] pub fn get_buffer_size(&self) -> u32 { self.buffer_size } #[inline] pub fn resize_buffer(&mut self, bufsize: u32) -> Result<(), ()> { match self.ev.resize_buffer(bufsize) { Ok(_) => { self.buffer_size = bufsize; Ok(()) }, Err(_) => Err(()) } } #[inline] pub fn get_wrapped(&mut self) -> &mut MidiEvent { &mut self.ev } } } const INITIAL_CODER_BUFFER_SIZE: usize = 32; pub struct MidiInput { ignore_flags: Ignore, seq: Option, } #[derive(Clone, PartialEq)] pub struct MidiInputPort { addr: Addr } pub struct MidiInputConnection { subscription: Option, thread: Option, T)>>, vport: i32, // TODO: probably port numbers are only u8, therefore could use Option trigger_send_fd: i32, } struct HandlerData { ignore_flags: Ignore, seq: Seq, trigger_rcv_fd: i32, callback: Box, queue_id: i32, // an input queue is needed to get timestamped events } impl MidiInput { pub fn new(client_name: &str) -> Result { let seq = match Seq::open(None, None, true) { Ok(s) => s, Err(_) => { return Err(InitError); } }; let c_client_name = CString::new(client_name).map_err(|_| InitError)?; seq.set_client_name(&c_client_name).map_err(|_| InitError)?; Ok(MidiInput { ignore_flags: Ignore::None, seq: Some(seq), }) } pub fn ignore(&mut self, flags: Ignore) { self.ignore_flags = flags; } pub(crate) fn ports_internal(&self) -> Vec<::common::MidiInputPort> { helpers::get_ports(self.seq.as_ref().unwrap(), PortCap::READ | PortCap::SUBS_READ, |p| ::common::MidiInputPort { imp: MidiInputPort { addr: p.addr() } }) } pub fn port_count(&self) -> usize { helpers::get_port_count(self.seq.as_ref().unwrap(), PortCap::READ | PortCap::SUBS_READ) } pub fn port_name(&self, port: &MidiInputPort) -> Result { helpers::get_port_name(self.seq.as_ref().unwrap(), port.addr) } fn init_queue(&mut self) -> i32 { let seq = self.seq.as_mut().unwrap(); let mut queue_id = 0; // Create the input queue if !cfg!(feature = "avoid_timestamping") { queue_id = seq.alloc_named_queue(unsafe { CStr::from_bytes_with_nul_unchecked(b"midir queue\0") }).unwrap(); // Set arbitrary tempo (mm=100) and resolution (240) let qtempo = QueueTempo::empty().unwrap(); qtempo.set_tempo(600_000); qtempo.set_ppq(240); seq.set_queue_tempo(queue_id, &qtempo).unwrap(); let _ = seq.drain_output(); } queue_id } fn init_trigger(&mut self) -> Result<[i32; 2], ()> { let mut trigger_fds = [-1, -1]; if unsafe { self::libc::pipe(trigger_fds.as_mut_ptr()) } == -1 { Err(()) } else { Ok(trigger_fds) } } fn create_port(&mut self, port_name: &CStr, queue_id: i32) -> Result { let mut pinfo = PortInfo::empty().unwrap(); // these functions are private, and the values are zeroed already by `empty()` //pinfo.set_client(0); //pinfo.set_port(0); pinfo.set_capability(PortCap::WRITE | PortCap::SUBS_WRITE); pinfo.set_type(PortType::MIDI_GENERIC | PortType::APPLICATION); pinfo.set_midi_channels(16); if !cfg!(feature = "avoid_timestamping") { pinfo.set_timestamping(true); pinfo.set_timestamp_real(true); pinfo.set_timestamp_queue(queue_id); } pinfo.set_name(port_name); match self.seq.as_mut().unwrap().create_port(&mut pinfo) { Ok(_) => Ok(pinfo.get_port()), Err(_) => Err(()) } } fn start_input_queue(&mut self, queue_id: i32) { if !cfg!(feature = "avoid_timestamping") { let seq = self.seq.as_mut().unwrap(); let _ = seq.control_queue(queue_id, EventType::Start, 0, None); let _ = seq.drain_output(); } } pub fn connect( mut self, port: &MidiInputPort, port_name: &str, callback: F, data: T ) -> Result, ConnectError> where F: FnMut(u64, &[u8], &mut T) + Send + 'static { let trigger_fds = match self.init_trigger() { Ok(fds) => fds, Err(()) => { return Err(ConnectError::other("could not create communication pipe for ALSA handler", self)); } }; let queue_id = self.init_queue(); let src_pinfo = match self.seq.as_ref().unwrap().get_any_port_info(port.addr) { Ok(p) => p, Err(_) => return Err(ConnectError::new(ConnectErrorKind::InvalidPort, self)) }; let c_port_name = match CString::new(port_name) { Ok(c_port_name) => c_port_name, Err(_) => return Err(ConnectError::other("port_name must not contain null bytes", self)) }; let vport = match self.create_port(&c_port_name, queue_id) { Ok(vp) => vp, Err(_) => { return Err(ConnectError::other("could not create ALSA input port", self)); } }; // Make subscription let sub = PortSubscribe::empty().unwrap(); sub.set_sender(src_pinfo.addr()); sub.set_dest(Addr { client: self.seq.as_ref().unwrap().client_id().unwrap(), port: vport}); if self.seq.as_ref().unwrap().subscribe_port(&sub).is_err() { return Err(ConnectError::other("could not create ALSA input subscription", self)); } let subscription = sub; // Start the input queue self.start_input_queue(queue_id); // Start our MIDI input thread. let handler_data = HandlerData { ignore_flags: self.ignore_flags, seq: self.seq.take().unwrap(), trigger_rcv_fd: trigger_fds[0], callback: Box::new(callback), queue_id: queue_id }; let threadbuilder = Builder::new(); let name = format!("midir ALSA input handler (port '{}')", port_name); let threadbuilder = threadbuilder.name(name); let thread = match threadbuilder.spawn(move || { let mut d = data; let h = handle_input(handler_data, &mut d); (h, d) // return both the handler data and the user data }) { Ok(handle) => handle, Err(_) => { //unsafe { snd_seq_unsubscribe_port(self.seq.as_mut_ptr(), sub.as_ptr()) }; return Err(ConnectError::other("could not start ALSA input handler thread", self)); } }; Ok(MidiInputConnection { subscription: Some(subscription), thread: Some(thread), vport: vport, trigger_send_fd: trigger_fds[1] }) } pub fn create_virtual( mut self, port_name: &str, callback: F, data: T ) -> Result, ConnectError> where F: FnMut(u64, &[u8], &mut T) + Send + 'static { let trigger_fds = match self.init_trigger() { Ok(fds) => fds, Err(()) => { return Err(ConnectError::other("could not create communication pipe for ALSA handler", self)); } }; let queue_id = self.init_queue(); let c_port_name = match CString::new(port_name) { Ok(c_port_name) => c_port_name, Err(_) => return Err(ConnectError::other("port_name must not contain null bytes", self)) }; let vport = match self.create_port(&c_port_name, queue_id) { Ok(vp) => vp, Err(_) => { return Err(ConnectError::other("could not create ALSA input port", self)); } }; // Start the input queue self.start_input_queue(queue_id); // Start our MIDI input thread. let handler_data = HandlerData { ignore_flags: self.ignore_flags, seq: self.seq.take().unwrap(), trigger_rcv_fd: trigger_fds[0], callback: Box::new(callback), queue_id: queue_id }; let threadbuilder = Builder::new(); let thread = match threadbuilder.spawn(move || { let mut d = data; let h = handle_input(handler_data, &mut d); (h, d) // return both the handler data and the user data }) { Ok(handle) => handle, Err(_) => { //unsafe { snd_seq_unsubscribe_port(self.seq.as_mut_ptr(), sub.as_ptr()) }; return Err(ConnectError::other("could not start ALSA input handler thread", self)); } }; Ok(MidiInputConnection { subscription: None, thread: Some(thread), vport: vport, trigger_send_fd: trigger_fds[1] }) } } impl MidiInputConnection { pub fn close(mut self) -> (MidiInput, T) { let (handler_data, user_data) = self.close_internal(); (MidiInput { ignore_flags: handler_data.ignore_flags, seq: Some(handler_data.seq), }, user_data) } /// This must only be called if the handler thread has not yet been shut down fn close_internal(&mut self) -> (HandlerData, T) { // Request the thread to stop. let _res = unsafe { self::libc::write(self.trigger_send_fd, &false as *const bool as *const _, mem::size_of::() as self::libc::size_t) }; let thread = self.thread.take().unwrap(); // Join the thread to get the handler_data back let (handler_data, user_data) = match thread.join() { Ok(data) => data, // TODO: handle this more gracefully? Err(e) => { if let Some(e) = e.downcast_ref::<&'static str>() { panic!("Error when joining ALSA thread: {}", e); } else { panic!("Unknown error when joining ALSA thread: {:?}", e); } } }; // TODO: find out why snd_seq_unsubscribe_port takes a long time if there was not yet any input message if let Some(ref subscription) = self.subscription { let _ = handler_data.seq.unsubscribe_port(subscription.get_sender(), subscription.get_dest()); } // Close the trigger fds (TODO: make sure that these are closed even in the presence of panic in thread) unsafe { self::libc::close(handler_data.trigger_rcv_fd); self::libc::close(self.trigger_send_fd); } // Stop and free the input queue if !cfg!(feature = "avoid_timestamping") { let _ = handler_data.seq.control_queue(handler_data.queue_id, EventType::Stop, 0, None); let _ = handler_data.seq.drain_output(); let _ = handler_data.seq.free_queue(handler_data.queue_id); } // Delete the port let _ = handler_data.seq.delete_port(self.vport); (handler_data, user_data) } } impl Drop for MidiInputConnection { fn drop(&mut self) { // Use `self.thread` as a flag whether the connection has already been dropped if self.thread.is_some() { self.close_internal(); } } } pub struct MidiOutput { seq: Option, // TODO: if `Seq` is marked as non-zero, this should just be pointer-sized } #[derive(Clone, PartialEq)] pub struct MidiOutputPort { addr: Addr } pub struct MidiOutputConnection { seq: Option, vport: i32, coder: helpers::EventEncoder, subscription: Option } impl MidiOutput { pub fn new(client_name: &str) -> Result { let seq = match Seq::open(None, Some(Direction::Playback), true) { Ok(s) => s, Err(_) => { return Err(InitError); } }; let c_client_name = CString::new(client_name).map_err(|_| InitError)?; seq.set_client_name(&c_client_name).map_err(|_| InitError)?; Ok(MidiOutput { seq: Some(seq), }) } pub(crate) fn ports_internal(&self) -> Vec<::common::MidiOutputPort> { helpers::get_ports(self.seq.as_ref().unwrap(), PortCap::WRITE | PortCap::SUBS_WRITE, |p| ::common::MidiOutputPort { imp: MidiOutputPort { addr: p.addr() } }) } pub fn port_count(&self) -> usize { helpers::get_port_count(self.seq.as_ref().unwrap(), PortCap::WRITE | PortCap::SUBS_WRITE) } pub fn port_name(&self, port: &MidiOutputPort) -> Result { helpers::get_port_name(self.seq.as_ref().unwrap(), port.addr) } pub fn connect(mut self, port: &MidiOutputPort, port_name: &str) -> Result> { let pinfo = match self.seq.as_ref().unwrap().get_any_port_info(port.addr) { Ok(p) => p, Err(_) => return Err(ConnectError::new(ConnectErrorKind::InvalidPort, self)) }; let c_port_name = match CString::new(port_name) { Ok(c_port_name) => c_port_name, Err(_) => return Err(ConnectError::other("port_name must not contain null bytes", self)) }; let vport = match self.seq.as_ref().unwrap().create_simple_port(&c_port_name, PortCap::READ | PortCap::SUBS_READ, PortType::MIDI_GENERIC | PortType::APPLICATION) { Ok(vport) => vport, Err(_) => return Err(ConnectError::other("could not create ALSA output port", self)) }; // Make subscription let sub = PortSubscribe::empty().unwrap(); sub.set_sender(Addr { client: self.seq.as_ref().unwrap().client_id().unwrap(), port: vport }); sub.set_dest(pinfo.addr()); sub.set_time_update(true); sub.set_time_real(true); if self.seq.as_ref().unwrap().subscribe_port(&sub).is_err() { return Err(ConnectError::other("could not create ALSA output subscription", self)); } Ok(MidiOutputConnection { seq: self.seq.take(), vport: vport, coder: helpers::EventEncoder::new(INITIAL_CODER_BUFFER_SIZE as u32), subscription: Some(sub) }) } pub fn create_virtual( mut self, port_name: &str ) -> Result> { let c_port_name = match CString::new(port_name) { Ok(c_port_name) => c_port_name, Err(_) => return Err(ConnectError::other("port_name must not contain null bytes", self)) }; let vport = match self.seq.as_ref().unwrap().create_simple_port(&c_port_name, PortCap::READ | PortCap::SUBS_READ, PortType::MIDI_GENERIC | PortType::APPLICATION) { Ok(vport) => vport, Err(_) => return Err(ConnectError::other("could not create ALSA output port", self)) }; Ok(MidiOutputConnection { seq: self.seq.take(), vport: vport, coder: helpers::EventEncoder::new(INITIAL_CODER_BUFFER_SIZE as u32), subscription: None }) } } impl MidiOutputConnection { pub fn close(mut self) -> MidiOutput { self.close_internal(); MidiOutput { seq: self.seq.take(), } } pub fn send(&mut self, message: &[u8]) -> Result<(), SendError> { let nbytes = message.len(); assert!(nbytes <= u32::max_value() as usize); if nbytes > self.coder.get_buffer_size() as usize { if self.coder.resize_buffer(nbytes as u32).is_err() { return Err(SendError::Other("could not resize ALSA encoding buffer")); } } let mut ev = match self.coder.get_wrapped().encode(message) { Ok((_, Some(ev))) => ev, _ => return Err(SendError::InvalidData("ALSA encoder reported invalid data")) }; ev.set_source(self.vport); ev.set_subs(); ev.set_direct(); // Send the event. if self.seq.as_ref().unwrap().event_output(&mut ev).is_err() { return Err(SendError::Other("could not send encoded ALSA message")); } let _ = self.seq.as_mut().unwrap().drain_output(); Ok(()) } fn close_internal(&mut self) { let seq = self.seq.as_mut().unwrap(); if let Some(ref subscription) = self.subscription { let _ = seq.unsubscribe_port(subscription.get_sender(), subscription.get_dest()); } let _ = seq.delete_port(self.vport); } } impl Drop for MidiOutputConnection { fn drop(&mut self) { if self.seq.is_some() { self.close_internal(); } } } fn handle_input(mut data: HandlerData, user_data: &mut T) -> HandlerData { use self::alsa::PollDescriptors; use self::alsa::seq::Connect; let mut continue_sysex: bool = false; // ALSA documentation says: // The required buffer size for a sequencer event it as most 12 bytes, except for System Exclusive events (which we handle separately) let mut buffer = [0; 12]; let mut coder = helpers::EventDecoder::new(false); let mut poll_fds: Box<[self::libc::pollfd]>; { let poll_desc_info = (&data.seq, Some(Direction::Capture)); let poll_fd_count = poll_desc_info.count() + 1; let mut vec = Vec::with_capacity(poll_fd_count); unsafe { vec.set_len(poll_fd_count); poll_fds = vec.into_boxed_slice(); } poll_desc_info.fill(&mut poll_fds[1..]).unwrap(); } poll_fds[0].fd = data.trigger_rcv_fd; poll_fds[0].events = self::libc::POLLIN; let mut message = MidiMessage::new(); { // open scope where we can borrow data.seq let mut seq_input = data.seq.input(); let mut do_input = true; while do_input { if let Ok(0) = seq_input.event_input_pending(true) { // No data pending if helpers::poll(&mut poll_fds, -1) >= 0 { // Read from our "channel" whether we should stop the thread if poll_fds[0].revents & self::libc::POLLIN != 0 { let _res = unsafe { self::libc::read(poll_fds[0].fd, mem::transmute(&mut do_input), mem::size_of::() as self::libc::size_t) }; } } continue; } // This is a bit weird, but we now have to decode an ALSA MIDI // event (back) into MIDI bytes. We'll ignore non-MIDI types. // The ALSA sequencer has a maximum buffer size for MIDI sysex // events of 256 bytes. If a device sends sysex messages larger // than this, they are segmented into 256 byte chunks. So, // we'll watch for this and concatenate sysex chunks into a // single sysex message if necessary. // // TODO: Figure out if this is still true (seems to not be the case) // If not (i.e., each event represents a complete message), we can // call the user callback with the byte buffer directly, without the // copying to `message.bytes` first. if !continue_sysex { message.bytes.clear() } let ignore_flags = data.ignore_flags; // If here, there should be data. let mut ev = match seq_input.event_input() { Ok(ev) => ev, Err(ref e) if e.errno() == alsa::nix::errno::Errno::ENOSPC => { let _ = writeln!(stderr(), "\nError in handle_input: ALSA MIDI input buffer overrun!\n"); continue; }, Err(ref e) if e.errno() == alsa::nix::errno::Errno::EAGAIN => { let _ = writeln!(stderr(), "\nError in handle_input: no input event from ALSA MIDI input buffer!\n"); continue; }, Err(ref e) => { let _ = writeln!(stderr(), "\nError in handle_input: unknown ALSA MIDI input error ({})!\n", e); //perror("System reports"); continue; } }; let do_decode = match ev.get_type() { EventType::PortSubscribed => { if cfg!(debug) { println!("Notice from handle_input: ALSA port connection made!") }; false }, EventType::PortUnsubscribed => { if cfg!(debug) { let _ = writeln!(stderr(), "Notice from handle_input: ALSA port connection has closed!"); let connect = ev.get_data::().unwrap(); let _ = writeln!(stderr(), "sender = {}:{}, dest = {}:{}", connect.sender.client, connect.sender.port, connect.dest.client, connect.dest.port ); } false }, EventType::Qframe => { // MIDI time code !ignore_flags.contains(Ignore::Time) }, EventType::Tick => { // 0xF9 ... MIDI timing tick !ignore_flags.contains(Ignore::Time) }, EventType::Clock => { // 0xF8 ... MIDI timing (clock) tick !ignore_flags.contains(Ignore::Time) }, EventType::Sensing => { // Active sensing !ignore_flags.contains(Ignore::ActiveSense) }, EventType::Sysex => { if !ignore_flags.contains(Ignore::Sysex) { // Directly copy the data from the external buffer to our message message.bytes.extend_from_slice(ev.get_ext().unwrap()); continue_sysex = *message.bytes.last().unwrap() != 0xF7; } false // don't ever decode sysex messages (it would unnecessarily copy the message content to another buffer) }, _ => true }; // NOTE: SysEx messages have already been "decoded" at this point! if do_decode { if let Ok(nbytes) = coder.get_wrapped().decode(&mut buffer, &mut ev) { if nbytes > 0 { message.bytes.extend_from_slice(&buffer[0..nbytes]); } } } if message.bytes.len() == 0 || continue_sysex { continue; } // Calculate the time stamp: // Use the ALSA sequencer event time data. // (thanks to Pedro Lopez-Cabanillas!). let alsa_time = ev.get_time().unwrap(); let secs = alsa_time.as_secs(); let nsecs = alsa_time.subsec_nanos(); message.timestamp = ( secs as u64 * 1_000_000 ) + ( nsecs as u64 / 1_000 ); (data.callback)(message.timestamp, &message.bytes, user_data); } } // close scope where data.seq is borrowed data // return data back to thread owner }