1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
|
use crate::{BusType, Error, Message, to_c_str, Watch};
use std::{ptr, str};
use std::ffi::CStr;
use std::os::raw::{c_void};
#[derive(Debug)]
pub struct ConnHandle(*mut ffi::DBusConnection);
unsafe impl Send for ConnHandle {}
unsafe impl Sync for ConnHandle {}
impl Drop for ConnHandle {
fn drop(&mut self) {
unsafe {
ffi::dbus_connection_close(self.0);
ffi::dbus_connection_unref(self.0);
}
}
}
/// Experimental rewrite of Connection [unstable / experimental]
///
/// Slightly lower level, with better support for async operations.
/// Also, this struct is Send + Sync.
///
/// Blocking operations should be clearly marked as such, although if you
/// try to access the connection from several threads at the same time,
/// blocking might occur due to an internal mutex inside the dbus library.
///
/// This version avoids dbus_connection_dispatch, and thus avoids
/// callbacks from that function. Instead the same functionality needs to be
/// implemented by these bindings somehow - this is not done yet.
#[derive(Debug)]
pub struct TxRx {
handle: ConnHandle,
}
impl TxRx {
#[inline(always)]
pub (crate) fn conn(&self) -> *mut ffi::DBusConnection {
self.handle.0
}
fn conn_from_ptr(ptr: *mut ffi::DBusConnection) -> Result<TxRx, Error> {
let handle = ConnHandle(ptr);
/* No, we don't want our app to suddenly quit if dbus goes down */
unsafe { ffi::dbus_connection_set_exit_on_disconnect(ptr, 0) };
let c = TxRx { handle };
Ok(c)
}
/// Creates a new D-Bus connection.
///
/// Blocking: until the connection is up and running.
pub fn get_private(bus: BusType) -> Result<TxRx, Error> {
let mut e = Error::empty();
let conn = unsafe { ffi::dbus_bus_get_private(bus, e.get_mut()) };
if conn == ptr::null_mut() {
return Err(e)
}
Self::conn_from_ptr(conn)
}
/// Creates a new D-Bus connection to a remote address.
///
/// Note: for all common cases (System / Session bus) you probably want "get_private" instead.
///
/// Blocking: until the connection is established.
pub fn open_private(address: &str) -> Result<TxRx, Error> {
let mut e = Error::empty();
let conn = unsafe { ffi::dbus_connection_open_private(to_c_str(address).as_ptr(), e.get_mut()) };
if conn == ptr::null_mut() {
return Err(e)
}
Self::conn_from_ptr(conn)
}
/// Registers a new D-Bus connection with the bus.
///
/// Note: `get_private` does this automatically, useful with `open_private`
///
/// Blocking: until a "Hello" response is received from the server.
pub fn register(&mut self) -> Result<(), Error> {
// This function needs to take &mut self, because it changes unique_name and unique_name takes a &self
let mut e = Error::empty();
if unsafe { ffi::dbus_bus_register(self.conn(), e.get_mut()) == 0 } {
Err(e)
} else {
Ok(())
}
}
/// Gets whether the connection is currently open.
pub fn is_connected(&self) -> bool {
unsafe { ffi::dbus_connection_get_is_connected(self.conn()) != 0 }
}
/// Get the connection's unique name.
///
/// It's usually something like ":1.54"
pub fn unique_name(&self) -> Option<&str> {
let c = unsafe { ffi::dbus_bus_get_unique_name(self.conn()) };
if c == ptr::null_mut() { return None; }
let s = unsafe { CStr::from_ptr(c) };
str::from_utf8(s.to_bytes()).ok()
}
/// Puts a message into libdbus out queue. Use "flush" or "read_write" to make sure it is sent over the wire.
///
/// Returns a serial number than can be used to match against a reply.
pub fn send(&self, msg: Message) -> Result<u32, ()> {
let mut serial = 0u32;
let r = unsafe { ffi::dbus_connection_send(self.conn(), msg.ptr(), &mut serial) };
if r == 0 { return Err(()); }
Ok(serial)
}
/// Flush the queue of outgoing messages.
///
/// Blocking: until the outgoing queue is empty.
pub fn flush(&self) { unsafe { ffi::dbus_connection_flush(self.conn()) } }
/// Read and write to the connection.
///
/// Incoming messages are put in the internal queue, outgoing messages are written.
///
/// Blocking: If there are no messages, for up to timeout_ms milliseconds, or forever if timeout_ms is None.
/// For non-blocking behaviour, set timeout_ms to Some(0).
pub fn read_write(&self, timeout_ms: Option<i32>) -> Result<(), ()> {
let t = timeout_ms.unwrap_or(-1);
if unsafe { ffi::dbus_connection_read_write(self.conn(), t) == 0 } {
Err(())
} else {
Ok(())
}
}
/// Removes a message from the incoming queue, or returns None if the queue is empty.
///
/// Use "read_write" first, so that messages are put into the incoming queue.
/// For unhandled messages, please call MessageDispatcher::default_dispatch to return
/// default replies for method calls.
pub fn pop_message(&self) -> Option<Message> {
let mptr = unsafe { ffi::dbus_connection_pop_message(self.conn()) };
if mptr == ptr::null_mut() {
None
} else {
Some(Message::from_ptr(mptr, false))
}
}
/// Get an up-to-date list of file descriptors to watch.
///
/// Might be changed into something that allows for callbacks when the watch list is changed.
pub fn watch_fds(&mut self) -> Result<Vec<Watch>, ()> {
extern "C" fn add_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) -> u32 {
unsafe {
let wlist: &mut Vec<Watch> = &mut *(data as *mut _);
wlist.push(Watch::from_raw(watch));
}
1
}
let mut r = vec!();
if unsafe { ffi::dbus_connection_set_watch_functions(self.conn(),
Some(add_watch_cb), None, None, &mut r as *mut _ as *mut _, None) } == 0 { return Err(()) }
assert!(unsafe { ffi::dbus_connection_set_watch_functions(self.conn(),
None, None, None, ptr::null_mut(), None) } != 0);
Ok(r)
}
}
#[test]
fn test_txrx_send_sync() {
fn is_send<T: Send>(_: &T) {}
fn is_sync<T: Sync>(_: &T) {}
let c = TxRx::get_private(BusType::Session).unwrap();
is_send(&c);
is_sync(&c);
}
#[test]
fn txrx_simple_test() {
let mut c = TxRx::get_private(BusType::Session).unwrap();
assert!(c.is_connected());
let fds = c.watch_fds().unwrap();
println!("{:?}", fds);
assert!(fds.len() > 0);
let m = Message::new_method_call("org.freedesktop.DBus", "/", "org.freedesktop.DBus", "ListNames").unwrap();
let reply = c.send(m).unwrap();
let my_name = c.unique_name().unwrap();
loop {
while let Some(mut msg) = c.pop_message() {
println!("{:?}", msg);
if msg.get_reply_serial() == Some(reply) {
let r = msg.as_result().unwrap();
let z: ::arg::Array<&str, _> = r.get1().unwrap();
for n in z {
println!("{}", n);
if n == my_name { return; } // Hooray, we found ourselves!
}
assert!(false);
} else if let Some(r) = crate::MessageDispatcher::<()>::default_dispatch(&msg) {
c.send(r).unwrap();
}
}
c.read_write(Some(100)).unwrap();
}
}
|