diff options
Diffstat (limited to 'third_party/rust/lmdb-rkv/src')
-rw-r--r-- | third_party/rust/lmdb-rkv/src/cursor.rs | 673 | ||||
-rw-r--r-- | third_party/rust/lmdb-rkv/src/database.rs | 56 | ||||
-rw-r--r-- | third_party/rust/lmdb-rkv/src/environment.rs | 680 | ||||
-rw-r--r-- | third_party/rust/lmdb-rkv/src/error.rs | 156 | ||||
-rw-r--r-- | third_party/rust/lmdb-rkv/src/flags.rs | 173 | ||||
-rw-r--r-- | third_party/rust/lmdb-rkv/src/lib.rs | 106 | ||||
-rw-r--r-- | third_party/rust/lmdb-rkv/src/transaction.rs | 773 |
7 files changed, 2617 insertions, 0 deletions
diff --git a/third_party/rust/lmdb-rkv/src/cursor.rs b/third_party/rust/lmdb-rkv/src/cursor.rs new file mode 100644 index 0000000000..ff4e29587d --- /dev/null +++ b/third_party/rust/lmdb-rkv/src/cursor.rs @@ -0,0 +1,673 @@ +use std::marker::PhantomData; +use std::{ + fmt, + mem, + ptr, + result, + slice, +}; + +use libc::{ + c_uint, + c_void, + size_t, + EINVAL, +}; + +use database::Database; +use error::{ + lmdb_result, + Error, + Result, +}; +use ffi; +use flags::WriteFlags; +use transaction::Transaction; + +/// An LMDB cursor. +pub trait Cursor<'txn> { + /// Returns a raw pointer to the underlying LMDB cursor. + /// + /// The caller **must** ensure that the pointer is not used after the + /// lifetime of the cursor. + fn cursor(&self) -> *mut ffi::MDB_cursor; + + /// Retrieves a key/data pair from the cursor. Depending on the cursor op, + /// the current key may be returned. + fn get(&self, key: Option<&[u8]>, data: Option<&[u8]>, op: c_uint) -> Result<(Option<&'txn [u8]>, &'txn [u8])> { + unsafe { + let mut key_val = slice_to_val(key); + let mut data_val = slice_to_val(data); + let key_ptr = key_val.mv_data; + lmdb_result(ffi::mdb_cursor_get(self.cursor(), &mut key_val, &mut data_val, op))?; + let key_out = if key_ptr != key_val.mv_data { + Some(val_to_slice(key_val)) + } else { + None + }; + let data_out = val_to_slice(data_val); + Ok((key_out, data_out)) + } + } + + /// Iterate over database items. The iterator will begin with item next + /// after the cursor, and continue until the end of the database. For new + /// cursors, the iterator will begin with the first item in the database. + /// + /// For databases with duplicate data items (`DatabaseFlags::DUP_SORT`), the + /// duplicate data items of each key will be returned before moving on to + /// the next key. + fn iter(&mut self) -> Iter<'txn> { + Iter::new(self.cursor(), ffi::MDB_NEXT, ffi::MDB_NEXT) + } + + /// Iterate over database items starting from the beginning of the database. + /// + /// For databases with duplicate data items (`DatabaseFlags::DUP_SORT`), the + /// duplicate data items of each key will be returned before moving on to + /// the next key. + fn iter_start(&mut self) -> Iter<'txn> { + Iter::new(self.cursor(), ffi::MDB_FIRST, ffi::MDB_NEXT) + } + + /// Iterate over database items starting from the given key. + /// + /// For databases with duplicate data items (`DatabaseFlags::DUP_SORT`), the + /// duplicate data items of each key will be returned before moving on to + /// the next key. + fn iter_from<K>(&mut self, key: K) -> Iter<'txn> + where + K: AsRef<[u8]>, + { + match self.get(Some(key.as_ref()), None, ffi::MDB_SET_RANGE) { + Ok(_) | Err(Error::NotFound) => (), + Err(error) => return Iter::Err(error), + }; + Iter::new(self.cursor(), ffi::MDB_GET_CURRENT, ffi::MDB_NEXT) + } + + /// Iterate over duplicate database items. The iterator will begin with the + /// item next after the cursor, and continue until the end of the database. + /// Each item will be returned as an iterator of its duplicates. + fn iter_dup(&mut self) -> IterDup<'txn> { + IterDup::new(self.cursor(), ffi::MDB_NEXT) + } + + /// Iterate over duplicate database items starting from the beginning of the + /// database. Each item will be returned as an iterator of its duplicates. + fn iter_dup_start(&mut self) -> IterDup<'txn> { + IterDup::new(self.cursor(), ffi::MDB_FIRST) + } + + /// Iterate over duplicate items in the database starting from the given + /// key. Each item will be returned as an iterator of its duplicates. + fn iter_dup_from<K>(&mut self, key: K) -> IterDup<'txn> + where + K: AsRef<[u8]>, + { + match self.get(Some(key.as_ref()), None, ffi::MDB_SET_RANGE) { + Ok(_) | Err(Error::NotFound) => (), + Err(error) => return IterDup::Err(error), + }; + IterDup::new(self.cursor(), ffi::MDB_GET_CURRENT) + } + + /// Iterate over the duplicates of the item in the database with the given key. + fn iter_dup_of<K>(&mut self, key: K) -> Iter<'txn> + where + K: AsRef<[u8]>, + { + match self.get(Some(key.as_ref()), None, ffi::MDB_SET) { + Ok(_) | Err(Error::NotFound) => (), + Err(error) => return Iter::Err(error), + }; + Iter::new(self.cursor(), ffi::MDB_GET_CURRENT, ffi::MDB_NEXT_DUP) + } +} + +/// A read-only cursor for navigating the items within a database. +pub struct RoCursor<'txn> { + cursor: *mut ffi::MDB_cursor, + _marker: PhantomData<fn() -> &'txn ()>, +} + +impl<'txn> Cursor<'txn> for RoCursor<'txn> { + fn cursor(&self) -> *mut ffi::MDB_cursor { + self.cursor + } +} + +impl<'txn> fmt::Debug for RoCursor<'txn> { + fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> { + f.debug_struct("RoCursor").finish() + } +} + +impl<'txn> Drop for RoCursor<'txn> { + fn drop(&mut self) { + unsafe { ffi::mdb_cursor_close(self.cursor) } + } +} + +impl<'txn> RoCursor<'txn> { + /// Creates a new read-only cursor in the given database and transaction. + /// Prefer using `Transaction::open_cursor`. + pub(crate) fn new<T>(txn: &'txn T, db: Database) -> Result<RoCursor<'txn>> + where + T: Transaction, + { + let mut cursor: *mut ffi::MDB_cursor = ptr::null_mut(); + unsafe { + lmdb_result(ffi::mdb_cursor_open(txn.txn(), db.dbi(), &mut cursor))?; + } + Ok(RoCursor { + cursor, + _marker: PhantomData, + }) + } +} + +/// A read-write cursor for navigating items within a database. +pub struct RwCursor<'txn> { + cursor: *mut ffi::MDB_cursor, + _marker: PhantomData<fn() -> &'txn ()>, +} + +impl<'txn> Cursor<'txn> for RwCursor<'txn> { + fn cursor(&self) -> *mut ffi::MDB_cursor { + self.cursor + } +} + +impl<'txn> fmt::Debug for RwCursor<'txn> { + fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> { + f.debug_struct("RwCursor").finish() + } +} + +impl<'txn> Drop for RwCursor<'txn> { + fn drop(&mut self) { + unsafe { ffi::mdb_cursor_close(self.cursor) } + } +} + +impl<'txn> RwCursor<'txn> { + /// Creates a new read-only cursor in the given database and transaction. + /// Prefer using `RwTransaction::open_rw_cursor`. + pub(crate) fn new<T>(txn: &'txn T, db: Database) -> Result<RwCursor<'txn>> + where + T: Transaction, + { + let mut cursor: *mut ffi::MDB_cursor = ptr::null_mut(); + unsafe { + lmdb_result(ffi::mdb_cursor_open(txn.txn(), db.dbi(), &mut cursor))?; + } + Ok(RwCursor { + cursor, + _marker: PhantomData, + }) + } + + /// Puts a key/data pair into the database. The cursor will be positioned at + /// the new data item, or on failure usually near it. + pub fn put<K, D>(&mut self, key: &K, data: &D, flags: WriteFlags) -> Result<()> + where + K: AsRef<[u8]>, + D: AsRef<[u8]>, + { + let key = key.as_ref(); + let data = data.as_ref(); + let mut key_val: ffi::MDB_val = ffi::MDB_val { + mv_size: key.len() as size_t, + mv_data: key.as_ptr() as *mut c_void, + }; + let mut data_val: ffi::MDB_val = ffi::MDB_val { + mv_size: data.len() as size_t, + mv_data: data.as_ptr() as *mut c_void, + }; + unsafe { lmdb_result(ffi::mdb_cursor_put(self.cursor(), &mut key_val, &mut data_val, flags.bits())) } + } + + /// Deletes the current key/data pair. + /// + /// ### Flags + /// + /// `WriteFlags::NO_DUP_DATA` may be used to delete all data items for the + /// current key, if the database was opened with `DatabaseFlags::DUP_SORT`. + pub fn del(&mut self, flags: WriteFlags) -> Result<()> { + unsafe { lmdb_result(ffi::mdb_cursor_del(self.cursor(), flags.bits())) } + } +} + +unsafe fn slice_to_val(slice: Option<&[u8]>) -> ffi::MDB_val { + match slice { + Some(slice) => ffi::MDB_val { + mv_size: slice.len() as size_t, + mv_data: slice.as_ptr() as *mut c_void, + }, + None => ffi::MDB_val { + mv_size: 0, + mv_data: ptr::null_mut(), + }, + } +} + +unsafe fn val_to_slice<'a>(val: ffi::MDB_val) -> &'a [u8] { + slice::from_raw_parts(val.mv_data as *const u8, val.mv_size as usize) +} + +/// An iterator over the key/value pairs in an LMDB database. +pub enum Iter<'txn> { + /// An iterator that returns an error on every call to Iter.next(). + /// Cursor.iter*() creates an Iter of this type when LMDB returns an error + /// on retrieval of a cursor. Using this variant instead of returning + /// an error makes Cursor.iter()* methods infallible, so consumers only + /// need to check the result of Iter.next(). + Err(Error), + + /// An iterator that returns an Item on calls to Iter.next(). + /// The Item is a Result<(&'txn [u8], &'txn [u8])>, so this variant + /// might still return an error, if retrieval of the key/value pair + /// fails for some reason. + Ok { + /// The LMDB cursor with which to iterate. + cursor: *mut ffi::MDB_cursor, + + /// The first operation to perform when the consumer calls Iter.next(). + op: c_uint, + + /// The next and subsequent operations to perform. + next_op: c_uint, + + /// A marker to ensure the iterator doesn't outlive the transaction. + _marker: PhantomData<fn(&'txn ())>, + }, +} + +impl<'txn> Iter<'txn> { + /// Creates a new iterator backed by the given cursor. + fn new<'t>(cursor: *mut ffi::MDB_cursor, op: c_uint, next_op: c_uint) -> Iter<'t> { + Iter::Ok { + cursor, + op, + next_op, + _marker: PhantomData, + } + } +} + +impl<'txn> fmt::Debug for Iter<'txn> { + fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> { + f.debug_struct("Iter").finish() + } +} + +impl<'txn> Iterator for Iter<'txn> { + type Item = Result<(&'txn [u8], &'txn [u8])>; + + fn next(&mut self) -> Option<Result<(&'txn [u8], &'txn [u8])>> { + match self { + &mut Iter::Ok { + cursor, + ref mut op, + next_op, + _marker, + } => { + let mut key = ffi::MDB_val { + mv_size: 0, + mv_data: ptr::null_mut(), + }; + let mut data = ffi::MDB_val { + mv_size: 0, + mv_data: ptr::null_mut(), + }; + let op = mem::replace(op, next_op); + unsafe { + match ffi::mdb_cursor_get(cursor, &mut key, &mut data, op) { + ffi::MDB_SUCCESS => Some(Ok((val_to_slice(key), val_to_slice(data)))), + // EINVAL can occur when the cursor was previously seeked to a non-existent value, + // e.g. iter_from with a key greater than all values in the database. + ffi::MDB_NOTFOUND | EINVAL => None, + error => Some(Err(Error::from_err_code(error))), + } + } + }, + &mut Iter::Err(err) => Some(Err(err)), + } + } +} + +/// An iterator over the keys and duplicate values in an LMDB database. +/// +/// The yielded items of the iterator are themselves iterators over the duplicate values for a +/// specific key. +pub enum IterDup<'txn> { + /// An iterator that returns an error on every call to Iter.next(). + /// Cursor.iter*() creates an Iter of this type when LMDB returns an error + /// on retrieval of a cursor. Using this variant instead of returning + /// an error makes Cursor.iter()* methods infallible, so consumers only + /// need to check the result of Iter.next(). + Err(Error), + + /// An iterator that returns an Item on calls to Iter.next(). + /// The Item is a Result<(&'txn [u8], &'txn [u8])>, so this variant + /// might still return an error, if retrieval of the key/value pair + /// fails for some reason. + Ok { + /// The LMDB cursor with which to iterate. + cursor: *mut ffi::MDB_cursor, + + /// The first operation to perform when the consumer calls Iter.next(). + op: c_uint, + + /// A marker to ensure the iterator doesn't outlive the transaction. + _marker: PhantomData<fn(&'txn ())>, + }, +} + +impl<'txn> IterDup<'txn> { + /// Creates a new iterator backed by the given cursor. + fn new<'t>(cursor: *mut ffi::MDB_cursor, op: c_uint) -> IterDup<'t> { + IterDup::Ok { + cursor, + op, + _marker: PhantomData, + } + } +} + +impl<'txn> fmt::Debug for IterDup<'txn> { + fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> { + f.debug_struct("IterDup").finish() + } +} + +impl<'txn> Iterator for IterDup<'txn> { + type Item = Iter<'txn>; + + fn next(&mut self) -> Option<Iter<'txn>> { + match self { + &mut IterDup::Ok { + cursor, + ref mut op, + _marker, + } => { + let mut key = ffi::MDB_val { + mv_size: 0, + mv_data: ptr::null_mut(), + }; + let mut data = ffi::MDB_val { + mv_size: 0, + mv_data: ptr::null_mut(), + }; + let op = mem::replace(op, ffi::MDB_NEXT_NODUP); + let err_code = unsafe { ffi::mdb_cursor_get(cursor, &mut key, &mut data, op) }; + + if err_code == ffi::MDB_SUCCESS { + Some(Iter::new(cursor, ffi::MDB_GET_CURRENT, ffi::MDB_NEXT_DUP)) + } else { + None + } + }, + &mut IterDup::Err(err) => Some(Iter::Err(err)), + } + } +} + +#[cfg(test)] +mod test { + use tempdir::TempDir; + + use super::*; + use environment::*; + use ffi::*; + use flags::*; + + #[test] + fn test_get() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + let db = env.open_db(None).unwrap(); + + let mut txn = env.begin_rw_txn().unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + txn.put(db, b"key3", b"val3", WriteFlags::empty()).unwrap(); + + let cursor = txn.open_ro_cursor(db).unwrap(); + assert_eq!((Some(&b"key1"[..]), &b"val1"[..]), cursor.get(None, None, MDB_FIRST).unwrap()); + assert_eq!((Some(&b"key1"[..]), &b"val1"[..]), cursor.get(None, None, MDB_GET_CURRENT).unwrap()); + assert_eq!((Some(&b"key2"[..]), &b"val2"[..]), cursor.get(None, None, MDB_NEXT).unwrap()); + assert_eq!((Some(&b"key1"[..]), &b"val1"[..]), cursor.get(None, None, MDB_PREV).unwrap()); + assert_eq!((Some(&b"key3"[..]), &b"val3"[..]), cursor.get(None, None, MDB_LAST).unwrap()); + assert_eq!((None, &b"val2"[..]), cursor.get(Some(b"key2"), None, MDB_SET).unwrap()); + assert_eq!((Some(&b"key3"[..]), &b"val3"[..]), cursor.get(Some(&b"key3"[..]), None, MDB_SET_KEY).unwrap()); + assert_eq!((Some(&b"key3"[..]), &b"val3"[..]), cursor.get(Some(&b"key2\0"[..]), None, MDB_SET_RANGE).unwrap()); + } + + #[test] + fn test_get_dup() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + let db = env.create_db(None, DatabaseFlags::DUP_SORT).unwrap(); + + let mut txn = env.begin_rw_txn().unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val2", WriteFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val3", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val3", WriteFlags::empty()).unwrap(); + + let cursor = txn.open_ro_cursor(db).unwrap(); + assert_eq!((Some(&b"key1"[..]), &b"val1"[..]), cursor.get(None, None, MDB_FIRST).unwrap()); + assert_eq!((None, &b"val1"[..]), cursor.get(None, None, MDB_FIRST_DUP).unwrap()); + assert_eq!((Some(&b"key1"[..]), &b"val1"[..]), cursor.get(None, None, MDB_GET_CURRENT).unwrap()); + assert_eq!((Some(&b"key2"[..]), &b"val1"[..]), cursor.get(None, None, MDB_NEXT_NODUP).unwrap()); + assert_eq!((Some(&b"key2"[..]), &b"val2"[..]), cursor.get(None, None, MDB_NEXT_DUP).unwrap()); + assert_eq!((Some(&b"key2"[..]), &b"val3"[..]), cursor.get(None, None, MDB_NEXT_DUP).unwrap()); + assert!(cursor.get(None, None, MDB_NEXT_DUP).is_err()); + assert_eq!((Some(&b"key2"[..]), &b"val2"[..]), cursor.get(None, None, MDB_PREV_DUP).unwrap()); + assert_eq!((None, &b"val3"[..]), cursor.get(None, None, MDB_LAST_DUP).unwrap()); + assert_eq!((Some(&b"key1"[..]), &b"val3"[..]), cursor.get(None, None, MDB_PREV_NODUP).unwrap()); + assert_eq!((None, &b"val1"[..]), cursor.get(Some(&b"key1"[..]), None, MDB_SET).unwrap()); + assert_eq!((Some(&b"key2"[..]), &b"val1"[..]), cursor.get(Some(&b"key2"[..]), None, MDB_SET_KEY).unwrap()); + assert_eq!((Some(&b"key2"[..]), &b"val1"[..]), cursor.get(Some(&b"key1\0"[..]), None, MDB_SET_RANGE).unwrap()); + assert_eq!((None, &b"val3"[..]), cursor.get(Some(&b"key1"[..]), Some(&b"val3"[..]), MDB_GET_BOTH).unwrap()); + assert_eq!( + (None, &b"val1"[..]), + cursor.get(Some(&b"key2"[..]), Some(&b"val"[..]), MDB_GET_BOTH_RANGE).unwrap() + ); + } + + #[test] + fn test_get_dupfixed() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + let db = env.create_db(None, DatabaseFlags::DUP_SORT | DatabaseFlags::DUP_FIXED).unwrap(); + + let mut txn = env.begin_rw_txn().unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val2", WriteFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val3", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val4", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val5", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val6", WriteFlags::empty()).unwrap(); + + let cursor = txn.open_ro_cursor(db).unwrap(); + assert_eq!((Some(&b"key1"[..]), &b"val1"[..]), cursor.get(None, None, MDB_FIRST).unwrap()); + assert_eq!((None, &b"val1val2val3"[..]), cursor.get(None, None, MDB_GET_MULTIPLE).unwrap()); + assert!(cursor.get(None, None, MDB_NEXT_MULTIPLE).is_err()); + } + + #[test] + fn test_iter() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + let db = env.open_db(None).unwrap(); + + let items: Vec<(&[u8], &[u8])> = + vec![(b"key1", b"val1"), (b"key2", b"val2"), (b"key3", b"val3"), (b"key5", b"val5")]; + + { + let mut txn = env.begin_rw_txn().unwrap(); + for &(ref key, ref data) in &items { + txn.put(db, key, data, WriteFlags::empty()).unwrap(); + } + txn.commit().unwrap(); + } + + let txn = env.begin_ro_txn().unwrap(); + let mut cursor = txn.open_ro_cursor(db).unwrap(); + + // Because Result implements FromIterator, we can collect the iterator + // of items of type Result<_, E> into a Result<Vec<_, E>> by specifying + // the collection type via the turbofish syntax. + assert_eq!(items, cursor.iter().collect::<Result<Vec<_>>>().unwrap()); + + // Alternately, we can collect it into an appropriately typed variable. + let retr: Result<Vec<_>> = cursor.iter_start().collect(); + assert_eq!(items, retr.unwrap()); + + cursor.get(Some(b"key2"), None, MDB_SET).unwrap(); + assert_eq!( + items.clone().into_iter().skip(2).collect::<Vec<_>>(), + cursor.iter().collect::<Result<Vec<_>>>().unwrap() + ); + + assert_eq!(items, cursor.iter_start().collect::<Result<Vec<_>>>().unwrap()); + + assert_eq!( + items.clone().into_iter().skip(1).collect::<Vec<_>>(), + cursor.iter_from(b"key2").collect::<Result<Vec<_>>>().unwrap() + ); + + assert_eq!( + items.clone().into_iter().skip(3).collect::<Vec<_>>(), + cursor.iter_from(b"key4").collect::<Result<Vec<_>>>().unwrap() + ); + + assert_eq!( + vec!().into_iter().collect::<Vec<(&[u8], &[u8])>>(), + cursor.iter_from(b"key6").collect::<Result<Vec<_>>>().unwrap() + ); + } + + #[test] + fn test_iter_empty_database() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + let db = env.open_db(None).unwrap(); + let txn = env.begin_ro_txn().unwrap(); + let mut cursor = txn.open_ro_cursor(db).unwrap(); + + assert_eq!(0, cursor.iter().count()); + assert_eq!(0, cursor.iter_start().count()); + assert_eq!(0, cursor.iter_from(b"foo").count()); + } + + #[test] + fn test_iter_empty_dup_database() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + let db = env.create_db(None, DatabaseFlags::DUP_SORT).unwrap(); + let txn = env.begin_ro_txn().unwrap(); + let mut cursor = txn.open_ro_cursor(db).unwrap(); + + assert_eq!(0, cursor.iter().count()); + assert_eq!(0, cursor.iter_start().count()); + assert_eq!(0, cursor.iter_from(b"foo").count()); + assert_eq!(0, cursor.iter_dup().count()); + assert_eq!(0, cursor.iter_dup_start().count()); + assert_eq!(0, cursor.iter_dup_from(b"foo").count()); + assert_eq!(0, cursor.iter_dup_of(b"foo").count()); + } + + #[test] + fn test_iter_dup() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + let db = env.create_db(None, DatabaseFlags::DUP_SORT).unwrap(); + + let items: Vec<(&[u8], &[u8])> = vec![ + (b"a", b"1"), + (b"a", b"2"), + (b"a", b"3"), + (b"b", b"1"), + (b"b", b"2"), + (b"b", b"3"), + (b"c", b"1"), + (b"c", b"2"), + (b"c", b"3"), + (b"e", b"1"), + (b"e", b"2"), + (b"e", b"3"), + ]; + + { + let mut txn = env.begin_rw_txn().unwrap(); + for &(ref key, ref data) in &items { + txn.put(db, key, data, WriteFlags::empty()).unwrap(); + } + txn.commit().unwrap(); + } + + let txn = env.begin_ro_txn().unwrap(); + let mut cursor = txn.open_ro_cursor(db).unwrap(); + assert_eq!(items, cursor.iter_dup().flatten().collect::<Result<Vec<_>>>().unwrap()); + + cursor.get(Some(b"b"), None, MDB_SET).unwrap(); + assert_eq!( + items.clone().into_iter().skip(4).collect::<Vec<(&[u8], &[u8])>>(), + cursor.iter_dup().flatten().collect::<Result<Vec<_>>>().unwrap() + ); + + assert_eq!(items, cursor.iter_dup_start().flatten().collect::<Result<Vec<(&[u8], &[u8])>>>().unwrap()); + + assert_eq!( + items.clone().into_iter().skip(3).collect::<Vec<(&[u8], &[u8])>>(), + cursor.iter_dup_from(b"b").flatten().collect::<Result<Vec<_>>>().unwrap() + ); + + assert_eq!( + items.clone().into_iter().skip(3).collect::<Vec<(&[u8], &[u8])>>(), + cursor.iter_dup_from(b"ab").flatten().collect::<Result<Vec<_>>>().unwrap() + ); + + assert_eq!( + items.clone().into_iter().skip(9).collect::<Vec<(&[u8], &[u8])>>(), + cursor.iter_dup_from(b"d").flatten().collect::<Result<Vec<_>>>().unwrap() + ); + + assert_eq!( + vec!().into_iter().collect::<Vec<(&[u8], &[u8])>>(), + cursor.iter_dup_from(b"f").flatten().collect::<Result<Vec<_>>>().unwrap() + ); + + assert_eq!( + items.clone().into_iter().skip(3).take(3).collect::<Vec<(&[u8], &[u8])>>(), + cursor.iter_dup_of(b"b").collect::<Result<Vec<_>>>().unwrap() + ); + + assert_eq!(0, cursor.iter_dup_of(b"foo").count()); + } + + #[test] + fn test_put_del() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + let db = env.open_db(None).unwrap(); + + let mut txn = env.begin_rw_txn().unwrap(); + let mut cursor = txn.open_rw_cursor(db).unwrap(); + + cursor.put(b"key1", b"val1", WriteFlags::empty()).unwrap(); + cursor.put(b"key2", b"val2", WriteFlags::empty()).unwrap(); + cursor.put(b"key3", b"val3", WriteFlags::empty()).unwrap(); + + assert_eq!((Some(&b"key3"[..]), &b"val3"[..]), cursor.get(None, None, MDB_GET_CURRENT).unwrap()); + + cursor.del(WriteFlags::empty()).unwrap(); + assert_eq!((Some(&b"key2"[..]), &b"val2"[..]), cursor.get(None, None, MDB_LAST).unwrap()); + } +} diff --git a/third_party/rust/lmdb-rkv/src/database.rs b/third_party/rust/lmdb-rkv/src/database.rs new file mode 100644 index 0000000000..848af89380 --- /dev/null +++ b/third_party/rust/lmdb-rkv/src/database.rs @@ -0,0 +1,56 @@ +use libc::c_uint; +use std::ffi::CString; +use std::ptr; + +use ffi; + +use error::{ + lmdb_result, + Result, +}; + +/// A handle to an individual database in an environment. +/// +/// A database handle denotes the name and parameters of a database in an environment. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct Database { + dbi: ffi::MDB_dbi, +} + +impl Database { + /// Opens a new database handle in the given transaction. + /// + /// Prefer using `Environment::open_db`, `Environment::create_db`, `TransactionExt::open_db`, + /// or `RwTransaction::create_db`. + pub(crate) unsafe fn new(txn: *mut ffi::MDB_txn, name: Option<&str>, flags: c_uint) -> Result<Database> { + let c_name = name.map(|n| CString::new(n).unwrap()); + let name_ptr = if let Some(ref c_name) = c_name { + c_name.as_ptr() + } else { + ptr::null() + }; + let mut dbi: ffi::MDB_dbi = 0; + lmdb_result(ffi::mdb_dbi_open(txn, name_ptr, flags, &mut dbi))?; + Ok(Database { + dbi, + }) + } + + pub(crate) fn freelist_db() -> Database { + Database { + dbi: 0, + } + } + + /// Returns the underlying LMDB database handle. + /// + /// The caller **must** ensure that the handle is not used after the lifetime of the + /// environment, or after the database has been closed. + #[allow(clippy::trivially_copy_pass_by_ref)] + pub fn dbi(&self) -> ffi::MDB_dbi { + self.dbi + } +} + +unsafe impl Sync for Database {} +unsafe impl Send for Database {} diff --git a/third_party/rust/lmdb-rkv/src/environment.rs b/third_party/rust/lmdb-rkv/src/environment.rs new file mode 100644 index 0000000000..87048ce00c --- /dev/null +++ b/third_party/rust/lmdb-rkv/src/environment.rs @@ -0,0 +1,680 @@ +use libc::{ + c_uint, + size_t, +}; +use std::ffi::CString; +#[cfg(windows)] +use std::ffi::OsStr; +#[cfg(unix)] +use std::os::unix::ffi::OsStrExt; +use std::path::Path; +use std::sync::Mutex; +use std::{ + fmt, + mem, + ptr, + result, +}; + +use ffi; + +use byteorder::{ + ByteOrder, + NativeEndian, +}; + +use cursor::Cursor; +use database::Database; +use error::{ + lmdb_result, + Error, + Result, +}; +use flags::{ + DatabaseFlags, + EnvironmentFlags, +}; +use transaction::{ + RoTransaction, + RwTransaction, + Transaction, +}; + +#[cfg(windows)] +/// Adding a 'missing' trait from windows OsStrExt +trait OsStrExtLmdb { + fn as_bytes(&self) -> &[u8]; +} +#[cfg(windows)] +impl OsStrExtLmdb for OsStr { + fn as_bytes(&self) -> &[u8] { + &self.to_str().unwrap().as_bytes() + } +} + +/// An LMDB environment. +/// +/// An environment supports multiple databases, all residing in the same shared-memory map. +pub struct Environment { + env: *mut ffi::MDB_env, + dbi_open_mutex: Mutex<()>, +} + +impl Environment { + /// Creates a new builder for specifying options for opening an LMDB environment. + #[allow(clippy::new_ret_no_self)] + pub fn new() -> EnvironmentBuilder { + EnvironmentBuilder { + flags: EnvironmentFlags::empty(), + max_readers: None, + max_dbs: None, + map_size: None, + } + } + + /// Returns a raw pointer to the underlying LMDB environment. + /// + /// The caller **must** ensure that the pointer is not dereferenced after the lifetime of the + /// environment. + pub fn env(&self) -> *mut ffi::MDB_env { + self.env + } + + /// Opens a handle to an LMDB database. + /// + /// If `name` is `None`, then the returned handle will be for the default database. + /// + /// If `name` is not `None`, then the returned handle will be for a named database. In this + /// case the environment must be configured to allow named databases through + /// `EnvironmentBuilder::set_max_dbs`. + /// + /// The returned database handle may be shared among any transaction in the environment. + /// + /// This function will fail with `Error::BadRslot` if called by a thread which has an ongoing + /// transaction. + /// + /// The database name may not contain the null character. + pub fn open_db<'env>(&'env self, name: Option<&str>) -> Result<Database> { + let mutex = self.dbi_open_mutex.lock(); + let txn = self.begin_ro_txn()?; + let db = unsafe { txn.open_db(name)? }; + txn.commit()?; + drop(mutex); + Ok(db) + } + + /// Opens a handle to an LMDB database, creating the database if necessary. + /// + /// If the database is already created, the given option flags will be added to it. + /// + /// If `name` is `None`, then the returned handle will be for the default database. + /// + /// If `name` is not `None`, then the returned handle will be for a named database. In this + /// case the environment must be configured to allow named databases through + /// `EnvironmentBuilder::set_max_dbs`. + /// + /// The returned database handle may be shared among any transaction in the environment. + /// + /// This function will fail with `Error::BadRslot` if called by a thread with an open + /// transaction. + pub fn create_db<'env>(&'env self, name: Option<&str>, flags: DatabaseFlags) -> Result<Database> { + let mutex = self.dbi_open_mutex.lock(); + let txn = self.begin_rw_txn()?; + let db = unsafe { txn.create_db(name, flags)? }; + txn.commit()?; + drop(mutex); + Ok(db) + } + + /// Retrieves the set of flags which the database is opened with. + /// + /// The database must belong to to this environment. + pub fn get_db_flags(&self, db: Database) -> Result<DatabaseFlags> { + let txn = self.begin_ro_txn()?; + let mut flags: c_uint = 0; + unsafe { + lmdb_result(ffi::mdb_dbi_flags(txn.txn(), db.dbi(), &mut flags))?; + } + Ok(DatabaseFlags::from_bits(flags).unwrap()) + } + + /// Create a read-only transaction for use with the environment. + pub fn begin_ro_txn<'env>(&'env self) -> Result<RoTransaction<'env>> { + RoTransaction::new(self) + } + + /// Create a read-write transaction for use with the environment. This method will block while + /// there are any other read-write transactions open on the environment. + pub fn begin_rw_txn<'env>(&'env self) -> Result<RwTransaction<'env>> { + RwTransaction::new(self) + } + + /// Flush data buffers to disk. + /// + /// Data is always written to disk when `Transaction::commit` is called, but the operating + /// system may keep it buffered. LMDB always flushes the OS buffers upon commit as well, unless + /// the environment was opened with `MDB_NOSYNC` or in part `MDB_NOMETASYNC`. + pub fn sync(&self, force: bool) -> Result<()> { + unsafe { + lmdb_result(ffi::mdb_env_sync( + self.env(), + if force { + 1 + } else { + 0 + }, + )) + } + } + + /// Closes the database handle. Normally unnecessary. + /// + /// Closing a database handle is not necessary, but lets `Transaction::open_database` reuse the + /// handle value. Usually it's better to set a bigger `EnvironmentBuilder::set_max_dbs`, unless + /// that value would be large. + /// + /// ## Safety + /// + /// This call is not mutex protected. Databases should only be closed by a single thread, and + /// only if no other threads are going to reference the database handle or one of its cursors + /// any further. Do not close a handle if an existing transaction has modified its database. + /// Doing so can cause misbehavior from database corruption to errors like + /// `Error::BadValSize` (since the DB name is gone). + pub unsafe fn close_db(&mut self, db: Database) { + ffi::mdb_dbi_close(self.env, db.dbi()); + } + + /// Retrieves statistics about this environment. + pub fn stat(&self) -> Result<Stat> { + unsafe { + let mut stat = Stat::new(); + lmdb_try!(ffi::mdb_env_stat(self.env(), stat.mdb_stat())); + Ok(stat) + } + } + + /// Retrieves info about this environment. + pub fn info(&self) -> Result<Info> { + unsafe { + let mut info = Info(mem::zeroed()); + lmdb_try!(ffi::mdb_env_info(self.env(), &mut info.0)); + Ok(info) + } + } + + /// Retrieves the total number of pages on the freelist. + /// + /// Along with `Environment::info()`, this can be used to calculate the exact number + /// of used pages as well as free pages in this environment. + /// + /// ```ignore + /// let env = Environment::new().open("/tmp/test").unwrap(); + /// let info = env.info().unwrap(); + /// let stat = env.stat().unwrap(); + /// let freelist = env.freelist().unwrap(); + /// let last_pgno = info.last_pgno() + 1; // pgno is 0 based. + /// let total_pgs = info.map_size() / stat.page_size() as usize; + /// let pgs_in_use = last_pgno - freelist; + /// let pgs_free = total_pgs - pgs_in_use; + /// ``` + /// + /// Note: + /// + /// * LMDB stores all the freelists in the designated database 0 in each environment, + /// and the freelist count is stored at the beginning of the value as `libc::size_t` + /// in the native byte order. + /// + /// * It will create a read transaction to traverse the freelist database. + pub fn freelist(&self) -> Result<size_t> { + let mut freelist: size_t = 0; + let db = Database::freelist_db(); + let txn = self.begin_ro_txn()?; + let mut cursor = txn.open_ro_cursor(db)?; + + for result in cursor.iter() { + let (_key, value) = result?; + if value.len() < mem::size_of::<size_t>() { + return Err(Error::Corrupted); + } + + let s = &value[..mem::size_of::<size_t>()]; + if cfg!(target_pointer_width = "64") { + freelist += NativeEndian::read_u64(s) as size_t; + } else { + freelist += NativeEndian::read_u32(s) as size_t; + } + } + + Ok(freelist) + } + + /// Sets the size of the memory map to use for the environment. + /// + /// This could be used to resize the map when the environment is already open. + /// + /// Note: + /// + /// * No active transactions allowed when performing resizing in this process. + /// + /// * The size should be a multiple of the OS page size. Any attempt to set + /// a size smaller than the space already consumed by the environment will + /// be silently changed to the current size of the used space. + /// + /// * In the multi-process case, once a process resizes the map, other + /// processes need to either re-open the environment, or call set_map_size + /// with size 0 to update the environment. Otherwise, new transaction creation + /// will fail with `Error::MapResized`. + pub fn set_map_size(&self, size: size_t) -> Result<()> { + unsafe { lmdb_result(ffi::mdb_env_set_mapsize(self.env(), size)) } + } +} + +/// Environment statistics. +/// +/// Contains information about the size and layout of an LMDB environment or database. +pub struct Stat(ffi::MDB_stat); + +impl Stat { + /// Create a new Stat with zero'd inner struct `ffi::MDB_stat`. + pub(crate) fn new() -> Stat { + unsafe { Stat(mem::zeroed()) } + } + + /// Returns a mut pointer to `ffi::MDB_stat`. + pub(crate) fn mdb_stat(&mut self) -> *mut ffi::MDB_stat { + &mut self.0 + } +} + +impl Stat { + /// Size of a database page. This is the same for all databases in the environment. + #[inline] + pub fn page_size(&self) -> u32 { + self.0.ms_psize + } + + /// Depth (height) of the B-tree. + #[inline] + pub fn depth(&self) -> u32 { + self.0.ms_depth + } + + /// Number of internal (non-leaf) pages. + #[inline] + pub fn branch_pages(&self) -> usize { + self.0.ms_branch_pages + } + + /// Number of leaf pages. + #[inline] + pub fn leaf_pages(&self) -> usize { + self.0.ms_leaf_pages + } + + /// Number of overflow pages. + #[inline] + pub fn overflow_pages(&self) -> usize { + self.0.ms_overflow_pages + } + + /// Number of data items. + #[inline] + pub fn entries(&self) -> usize { + self.0.ms_entries + } +} + +/// Environment information. +/// +/// Contains environment information about the map size, readers, last txn id etc. +pub struct Info(ffi::MDB_envinfo); + +impl Info { + /// Size of memory map. + #[inline] + pub fn map_size(&self) -> usize { + self.0.me_mapsize + } + + /// Last used page number + #[inline] + pub fn last_pgno(&self) -> usize { + self.0.me_last_pgno + } + + /// Last transaction ID + #[inline] + pub fn last_txnid(&self) -> usize { + self.0.me_last_txnid + } + + /// Max reader slots in the environment + #[inline] + pub fn max_readers(&self) -> u32 { + self.0.me_maxreaders + } + + /// Max reader slots used in the environment + #[inline] + pub fn num_readers(&self) -> u32 { + self.0.me_numreaders + } +} + +unsafe impl Send for Environment {} +unsafe impl Sync for Environment {} + +impl fmt::Debug for Environment { + fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> { + f.debug_struct("Environment").finish() + } +} + +impl Drop for Environment { + fn drop(&mut self) { + unsafe { ffi::mdb_env_close(self.env) } + } +} + +/////////////////////////////////////////////////////////////////////////////////////////////////// +//// Environment Builder +/////////////////////////////////////////////////////////////////////////////////////////////////// + +/// Options for opening or creating an environment. +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub struct EnvironmentBuilder { + flags: EnvironmentFlags, + max_readers: Option<c_uint>, + max_dbs: Option<c_uint>, + map_size: Option<size_t>, +} + +impl EnvironmentBuilder { + /// Open an environment. + /// + /// On UNIX, the database files will be opened with 644 permissions. + /// + /// The path may not contain the null character, Windows UNC (Uniform Naming Convention) + /// paths are not supported either. + pub fn open(&self, path: &Path) -> Result<Environment> { + self.open_with_permissions(path, 0o644) + } + + /// Open an environment with the provided UNIX permissions. + /// + /// On Windows, the permissions will be ignored. + /// + /// The path may not contain the null character, Windows UNC (Uniform Naming Convention) + /// paths are not supported either. + pub fn open_with_permissions(&self, path: &Path, mode: ffi::mdb_mode_t) -> Result<Environment> { + let mut env: *mut ffi::MDB_env = ptr::null_mut(); + unsafe { + lmdb_try!(ffi::mdb_env_create(&mut env)); + if let Some(max_readers) = self.max_readers { + lmdb_try_with_cleanup!(ffi::mdb_env_set_maxreaders(env, max_readers), ffi::mdb_env_close(env)) + } + if let Some(max_dbs) = self.max_dbs { + lmdb_try_with_cleanup!(ffi::mdb_env_set_maxdbs(env, max_dbs), ffi::mdb_env_close(env)) + } + if let Some(map_size) = self.map_size { + lmdb_try_with_cleanup!(ffi::mdb_env_set_mapsize(env, map_size), ffi::mdb_env_close(env)) + } + let path = match CString::new(path.as_os_str().as_bytes()) { + Ok(path) => path, + Err(..) => return Err(::Error::Invalid), + }; + lmdb_try_with_cleanup!( + ffi::mdb_env_open(env, path.as_ptr(), self.flags.bits(), mode), + ffi::mdb_env_close(env) + ); + } + Ok(Environment { + env, + dbi_open_mutex: Mutex::new(()), + }) + } + + /// Sets the provided options in the environment. + pub fn set_flags(&mut self, flags: EnvironmentFlags) -> &mut EnvironmentBuilder { + self.flags = flags; + self + } + + /// Sets the maximum number of threads or reader slots for the environment. + /// + /// This defines the number of slots in the lock table that is used to track readers in the + /// the environment. The default is 126. Starting a read-only transaction normally ties a lock + /// table slot to the current thread until the environment closes or the thread exits. If + /// `MDB_NOTLS` is in use, `Environment::open_txn` instead ties the slot to the `Transaction` + /// object until it or the `Environment` object is destroyed. + pub fn set_max_readers(&mut self, max_readers: c_uint) -> &mut EnvironmentBuilder { + self.max_readers = Some(max_readers); + self + } + + /// Sets the maximum number of named databases for the environment. + /// + /// This function is only needed if multiple databases will be used in the + /// environment. Simpler applications that use the environment as a single + /// unnamed database can ignore this option. + /// + /// Currently a moderate number of slots are cheap but a huge number gets + /// expensive: 7-120 words per transaction, and every `Transaction::open_db` + /// does a linear search of the opened slots. + pub fn set_max_dbs(&mut self, max_dbs: c_uint) -> &mut EnvironmentBuilder { + self.max_dbs = Some(max_dbs); + self + } + + /// Sets the size of the memory map to use for the environment. + /// + /// The size should be a multiple of the OS page size. The default is + /// 1048576 bytes. The size of the memory map is also the maximum size + /// of the database. The value should be chosen as large as possible, + /// to accommodate future growth of the database. It may be increased at + /// later times. + /// + /// Any attempt to set a size smaller than the space already consumed + /// by the environment will be silently changed to the current size of the used space. + pub fn set_map_size(&mut self, map_size: size_t) -> &mut EnvironmentBuilder { + self.map_size = Some(map_size); + self + } +} + +#[cfg(test)] +mod test { + + extern crate byteorder; + + use self::byteorder::{ + ByteOrder, + LittleEndian, + }; + use tempdir::TempDir; + + use flags::*; + + use super::*; + + #[test] + fn test_open() { + let dir = TempDir::new("test").unwrap(); + + // opening non-existent env with read-only should fail + assert!(Environment::new().set_flags(EnvironmentFlags::READ_ONLY).open(dir.path()).is_err()); + + // opening non-existent env should succeed + assert!(Environment::new().open(dir.path()).is_ok()); + + // opening env with read-only should succeed + assert!(Environment::new().set_flags(EnvironmentFlags::READ_ONLY).open(dir.path()).is_ok()); + } + + #[test] + fn test_begin_txn() { + let dir = TempDir::new("test").unwrap(); + + { + // writable environment + let env = Environment::new().open(dir.path()).unwrap(); + + assert!(env.begin_rw_txn().is_ok()); + assert!(env.begin_ro_txn().is_ok()); + } + + { + // read-only environment + let env = Environment::new().set_flags(EnvironmentFlags::READ_ONLY).open(dir.path()).unwrap(); + + assert!(env.begin_rw_txn().is_err()); + assert!(env.begin_ro_txn().is_ok()); + } + } + + #[test] + fn test_open_db() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().set_max_dbs(1).open(dir.path()).unwrap(); + + assert!(env.open_db(None).is_ok()); + assert!(env.open_db(Some("testdb")).is_err()); + } + + #[test] + fn test_create_db() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().set_max_dbs(11).open(dir.path()).unwrap(); + assert!(env.open_db(Some("testdb")).is_err()); + assert!(env.create_db(Some("testdb"), DatabaseFlags::empty()).is_ok()); + assert!(env.open_db(Some("testdb")).is_ok()) + } + + #[test] + fn test_close_database() { + let dir = TempDir::new("test").unwrap(); + let mut env = Environment::new().set_max_dbs(10).open(dir.path()).unwrap(); + + let db = env.create_db(Some("db"), DatabaseFlags::empty()).unwrap(); + unsafe { + env.close_db(db); + } + assert!(env.open_db(Some("db")).is_ok()); + } + + #[test] + fn test_sync() { + let dir = TempDir::new("test").unwrap(); + { + let env = Environment::new().open(dir.path()).unwrap(); + assert!(env.sync(true).is_ok()); + } + { + let env = Environment::new().set_flags(EnvironmentFlags::READ_ONLY).open(dir.path()).unwrap(); + assert!(env.sync(true).is_err()); + } + } + + #[test] + fn test_stat() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + + // Stats should be empty initially. + let stat = env.stat().unwrap(); + assert_eq!(stat.page_size(), 4096); + assert_eq!(stat.depth(), 0); + assert_eq!(stat.branch_pages(), 0); + assert_eq!(stat.leaf_pages(), 0); + assert_eq!(stat.overflow_pages(), 0); + assert_eq!(stat.entries(), 0); + + let db = env.open_db(None).unwrap(); + + // Write a few small values. + for i in 0..64 { + let mut value = [0u8; 8]; + LittleEndian::write_u64(&mut value, i); + let mut tx = env.begin_rw_txn().expect("begin_rw_txn"); + tx.put(db, &value, &value, WriteFlags::default()).expect("tx.put"); + tx.commit().expect("tx.commit") + } + + // Stats should now reflect inserted values. + let stat = env.stat().unwrap(); + assert_eq!(stat.page_size(), 4096); + assert_eq!(stat.depth(), 1); + assert_eq!(stat.branch_pages(), 0); + assert_eq!(stat.leaf_pages(), 1); + assert_eq!(stat.overflow_pages(), 0); + assert_eq!(stat.entries(), 64); + } + + #[test] + fn test_info() { + let map_size = 1024 * 1024; + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().set_map_size(map_size).open(dir.path()).unwrap(); + + let info = env.info().unwrap(); + assert_eq!(info.map_size(), map_size); + assert_eq!(info.last_pgno(), 1); + assert_eq!(info.last_txnid(), 0); + // The default max readers is 126. + assert_eq!(info.max_readers(), 126); + assert_eq!(info.num_readers(), 0); + } + + #[test] + fn test_freelist() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + + let db = env.open_db(None).unwrap(); + let mut freelist = env.freelist().unwrap(); + assert_eq!(freelist, 0); + + // Write a few small values. + for i in 0..64 { + let mut value = [0u8; 8]; + LittleEndian::write_u64(&mut value, i); + let mut tx = env.begin_rw_txn().expect("begin_rw_txn"); + tx.put(db, &value, &value, WriteFlags::default()).expect("tx.put"); + tx.commit().expect("tx.commit") + } + let mut tx = env.begin_rw_txn().expect("begin_rw_txn"); + tx.clear_db(db).expect("clear"); + tx.commit().expect("tx.commit"); + + // Freelist should not be empty after clear_db. + freelist = env.freelist().unwrap(); + assert!(freelist > 0); + } + + #[test] + fn test_set_map_size() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + + let mut info = env.info().unwrap(); + let default_size = info.map_size(); + + // Resizing to 0 merely reloads the map size + env.set_map_size(0).unwrap(); + info = env.info().unwrap(); + assert_eq!(info.map_size(), default_size); + + env.set_map_size(2 * default_size).unwrap(); + info = env.info().unwrap(); + assert_eq!(info.map_size(), 2 * default_size); + + env.set_map_size(4 * default_size).unwrap(); + info = env.info().unwrap(); + assert_eq!(info.map_size(), 4 * default_size); + + // Decreasing is also fine if the space hasn't been consumed. + env.set_map_size(2 * default_size).unwrap(); + info = env.info().unwrap(); + assert_eq!(info.map_size(), 2 * default_size); + } +} diff --git a/third_party/rust/lmdb-rkv/src/error.rs b/third_party/rust/lmdb-rkv/src/error.rs new file mode 100644 index 0000000000..c13c970980 --- /dev/null +++ b/third_party/rust/lmdb-rkv/src/error.rs @@ -0,0 +1,156 @@ +use libc::c_int; +use std::error::Error as StdError; +use std::ffi::CStr; +use std::os::raw::c_char; +use std::{ + fmt, + result, + str, +}; + +use ffi; + +/// An LMDB error kind. +#[derive(Debug, Eq, PartialEq, Copy, Clone)] +pub enum Error { + /// key/data pair already exists. + KeyExist, + /// key/data pair not found (EOF). + NotFound, + /// Requested page not found - this usually indicates corruption. + PageNotFound, + /// Located page was wrong type. + Corrupted, + /// Update of meta page failed or environment had fatal error. + Panic, + /// Environment version mismatch. + VersionMismatch, + /// File is not a valid LMDB file. + Invalid, + /// Environment mapsize reached. + MapFull, + /// Environment maxdbs reached. + DbsFull, + /// Environment maxreaders reached. + ReadersFull, + /// Too many TLS keys in use - Windows only. + TlsFull, + /// Txn has too many dirty pages. + TxnFull, + /// Cursor stack too deep - internal error. + CursorFull, + /// Page has not enough space - internal error. + PageFull, + /// Database contents grew beyond environment mapsize. + MapResized, + /// MDB_Incompatible: Operation and DB incompatible, or DB flags changed. + Incompatible, + /// Invalid reuse of reader locktable slot. + BadRslot, + /// Transaction cannot recover - it must be aborted. + BadTxn, + /// Unsupported size of key/DB name/data, or wrong DUP_FIXED size. + BadValSize, + /// The specified DBI was changed unexpectedly. + BadDbi, + /// Other error. + Other(c_int), +} + +impl Error { + /// Converts a raw error code to an `Error`. + pub fn from_err_code(err_code: c_int) -> Error { + match err_code { + ffi::MDB_KEYEXIST => Error::KeyExist, + ffi::MDB_NOTFOUND => Error::NotFound, + ffi::MDB_PAGE_NOTFOUND => Error::PageNotFound, + ffi::MDB_CORRUPTED => Error::Corrupted, + ffi::MDB_PANIC => Error::Panic, + ffi::MDB_VERSION_MISMATCH => Error::VersionMismatch, + ffi::MDB_INVALID => Error::Invalid, + ffi::MDB_MAP_FULL => Error::MapFull, + ffi::MDB_DBS_FULL => Error::DbsFull, + ffi::MDB_READERS_FULL => Error::ReadersFull, + ffi::MDB_TLS_FULL => Error::TlsFull, + ffi::MDB_TXN_FULL => Error::TxnFull, + ffi::MDB_CURSOR_FULL => Error::CursorFull, + ffi::MDB_PAGE_FULL => Error::PageFull, + ffi::MDB_MAP_RESIZED => Error::MapResized, + ffi::MDB_INCOMPATIBLE => Error::Incompatible, + ffi::MDB_BAD_RSLOT => Error::BadRslot, + ffi::MDB_BAD_TXN => Error::BadTxn, + ffi::MDB_BAD_VALSIZE => Error::BadValSize, + ffi::MDB_BAD_DBI => Error::BadDbi, + other => Error::Other(other), + } + } + + /// Converts an `Error` to the raw error code. + #[allow(clippy::trivially_copy_pass_by_ref)] + pub fn to_err_code(&self) -> c_int { + match *self { + Error::KeyExist => ffi::MDB_KEYEXIST, + Error::NotFound => ffi::MDB_NOTFOUND, + Error::PageNotFound => ffi::MDB_PAGE_NOTFOUND, + Error::Corrupted => ffi::MDB_CORRUPTED, + Error::Panic => ffi::MDB_PANIC, + Error::VersionMismatch => ffi::MDB_VERSION_MISMATCH, + Error::Invalid => ffi::MDB_INVALID, + Error::MapFull => ffi::MDB_MAP_FULL, + Error::DbsFull => ffi::MDB_DBS_FULL, + Error::ReadersFull => ffi::MDB_READERS_FULL, + Error::TlsFull => ffi::MDB_TLS_FULL, + Error::TxnFull => ffi::MDB_TXN_FULL, + Error::CursorFull => ffi::MDB_CURSOR_FULL, + Error::PageFull => ffi::MDB_PAGE_FULL, + Error::MapResized => ffi::MDB_MAP_RESIZED, + Error::Incompatible => ffi::MDB_INCOMPATIBLE, + Error::BadRslot => ffi::MDB_BAD_RSLOT, + Error::BadTxn => ffi::MDB_BAD_TXN, + Error::BadValSize => ffi::MDB_BAD_VALSIZE, + Error::BadDbi => ffi::MDB_BAD_DBI, + Error::Other(err_code) => err_code, + } + } +} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.description()) + } +} + +impl StdError for Error { + fn description(&self) -> &str { + unsafe { + // This is safe since the error messages returned from mdb_strerror are static. + let err: *const c_char = ffi::mdb_strerror(self.to_err_code()) as *const c_char; + str::from_utf8_unchecked(CStr::from_ptr(err).to_bytes()) + } + } +} + +/// An LMDB result. +pub type Result<T> = result::Result<T, Error>; + +pub fn lmdb_result(err_code: c_int) -> Result<()> { + if err_code == ffi::MDB_SUCCESS { + Ok(()) + } else { + Err(Error::from_err_code(err_code)) + } +} + +#[cfg(test)] +mod test { + + use std::error::Error as StdError; + + use super::*; + + #[test] + fn test_description() { + assert_eq!("Permission denied", Error::from_err_code(13).description()); + assert_eq!("MDB_NOTFOUND: No matching key/data pair found", Error::NotFound.description()); + } +} diff --git a/third_party/rust/lmdb-rkv/src/flags.rs b/third_party/rust/lmdb-rkv/src/flags.rs new file mode 100644 index 0000000000..c467405071 --- /dev/null +++ b/third_party/rust/lmdb-rkv/src/flags.rs @@ -0,0 +1,173 @@ +use libc::c_uint; + +use ffi::*; + +bitflags! { + #[doc="Environment options."] + #[derive(Default)] + pub struct EnvironmentFlags: c_uint { + + #[doc="Use a fixed address for the mmap region. This flag must be specified"] + #[doc="when creating the environment, and is stored persistently in the environment."] + #[doc="If successful, the memory map will always reside at the same virtual address"] + #[doc="and pointers used to reference data items in the database will be constant"] + #[doc="across multiple invocations. This option may not always work, depending on"] + #[doc="how the operating system has allocated memory to shared libraries and other uses."] + #[doc="The feature is highly experimental."] + const FIXED_MAP = MDB_FIXEDMAP; + + #[doc="By default, LMDB creates its environment in a directory whose pathname is given in"] + #[doc="`path`, and creates its data and lock files under that directory. With this option,"] + #[doc="`path` is used as-is for the database main data file. The database lock file is the"] + #[doc="`path` with `-lock` appended."] + const NO_SUB_DIR = MDB_NOSUBDIR; + + #[doc="Use a writeable memory map unless `READ_ONLY` is set. This is faster and uses"] + #[doc="fewer mallocs, but loses protection from application bugs like wild pointer writes"] + #[doc="and other bad updates into the database. Incompatible with nested transactions."] + #[doc="Processes with and without `WRITE_MAP` on the same environment do not cooperate"] + #[doc="well."] + const WRITE_MAP = MDB_WRITEMAP; + + #[doc="Open the environment in read-only mode. No write operations will be allowed."] + #[doc="When opening an environment, LMDB will still modify the lock file - except on"] + #[doc="read-only filesystems, where LMDB does not use locks."] + const READ_ONLY = MDB_RDONLY; + + #[doc="Flush system buffers to disk only once per transaction, omit the metadata flush."] + #[doc="Defer that until the system flushes files to disk, or next non-`READ_ONLY` commit"] + #[doc="or `Environment::sync`. This optimization maintains database integrity, but a"] + #[doc="system crash may undo the last committed transaction. I.e. it preserves the ACI"] + #[doc="(atomicity, consistency, isolation) but not D (durability) database property."] + #[doc="\n\nThis flag may be changed at any time using `Environment::set_flags`."] + const NO_META_SYNC = MDB_NOMETASYNC; + + #[doc="Don't flush system buffers to disk when committing a transaction. This optimization"] + #[doc="means a system crash can corrupt the database or lose the last transactions if"] + #[doc="buffers are not yet flushed to disk. The risk is governed by how often the system"] + #[doc="flushes dirty buffers to disk and how often `Environment::sync` is called. However,"] + #[doc="if the filesystem preserves write order and the `WRITE_MAP` flag is not used,"] + #[doc="transactions exhibit ACI (atomicity, consistency, isolation) properties and only"] + #[doc="lose D (durability). I.e. database integrity is maintained, but a system"] + #[doc="crash may undo the final transactions. Note that (`NO_SYNC | WRITE_MAP`) leaves the"] + #[doc="system with no hint for when to write transactions to disk, unless"] + #[doc="`Environment::sync` is called. (`MAP_ASYNC | WRITE_MAP`) may be preferable."] + #[doc="\n\nThis flag may be changed at any time using `Environment::set_flags`."] + const NO_SYNC = MDB_NOSYNC; + + #[doc="When using `WRITE_MAP`, use asynchronous flushes to disk. As with `NO_SYNC`, a"] + #[doc="system crash can then corrupt the database or lose the last transactions. Calling"] + #[doc="`Environment::sync` ensures on-disk database integrity until next commit."] + #[doc="\n\nThis flag may be changed at any time using `Environment::set_flags`."] + const MAP_ASYNC = MDB_MAPASYNC; + + #[doc="Don't use thread-local storage. Tie reader locktable slots to transaction objects"] + #[doc="instead of to threads. I.e. `RoTransaction::reset` keeps the slot reserved for the"] + #[doc="transaction object. A thread may use parallel read-only transactions. A read-only"] + #[doc="transaction may span threads if the user synchronizes its use. Applications that"] + #[doc="multiplex many the user synchronizes its use. Applications that multiplex many user"] + #[doc="threads over individual OS threads need this option. Such an application must also"] + #[doc="serialize the write transactions in an OS thread, since LMDB's write locking is"] + #[doc="unaware of the user threads."] + const NO_TLS = MDB_NOTLS; + + #[doc="Do not do any locking. If concurrent access is anticipated, the caller must manage"] + #[doc="all concurrency themself. For proper operation the caller must enforce"] + #[doc="single-writer semantics, and must ensure that no readers are using old"] + #[doc="transactions while a writer is active. The simplest approach is to use an exclusive"] + #[doc="lock so that no readers may be active at all when a writer begins."] + const NO_LOCK = MDB_NOLOCK; + + #[doc="Turn off readahead. Most operating systems perform readahead on read requests by"] + #[doc="default. This option turns it off if the OS supports it. Turning it off may help"] + #[doc="random read performance when the DB is larger than RAM and system RAM is full."] + #[doc="The option is not implemented on Windows."] + const NO_READAHEAD = MDB_NORDAHEAD; + + #[doc="Do not initialize malloc'd memory before writing to unused spaces in the data file."] + #[doc="By default, memory for pages written to the data file is obtained using malloc."] + #[doc="While these pages may be reused in subsequent transactions, freshly malloc'd pages"] + #[doc="will be initialized to zeroes before use. This avoids persisting leftover data from"] + #[doc="other code (that used the heap and subsequently freed the memory) into the data"] + #[doc="file. Note that many other system libraries may allocate and free memory from the"] + #[doc="heap for arbitrary uses. E.g., stdio may use the heap for file I/O buffers. This"] + #[doc="initialization step has a modest performance cost so some applications may want to"] + #[doc="disable it using this flag. This option can be a problem for applications which"] + #[doc="handle sensitive data like passwords, and it makes memory checkers like Valgrind"] + #[doc="noisy. This flag is not needed with `WRITE_MAP`, which writes directly to the mmap"] + #[doc="instead of using malloc for pages. The initialization is also skipped if writing"] + #[doc="with reserve; the caller is expected to overwrite all of the memory that was"] + #[doc="reserved in that case."] + #[doc="\n\nThis flag may be changed at any time using `Environment::set_flags`."] + const NO_MEM_INIT = MDB_NOMEMINIT; + } +} + +bitflags! { + #[doc="Database options."] + #[derive(Default)] + pub struct DatabaseFlags: c_uint { + + #[doc="Keys are strings to be compared in reverse order, from the end of the strings"] + #[doc="to the beginning. By default, Keys are treated as strings and compared from"] + #[doc="beginning to end."] + const REVERSE_KEY = MDB_REVERSEKEY; + + #[doc="Duplicate keys may be used in the database. (Or, from another perspective,"] + #[doc="keys may have multiple data items, stored in sorted order.) By default"] + #[doc="keys must be unique and may have only a single data item."] + const DUP_SORT = MDB_DUPSORT; + + #[doc="Keys are binary integers in native byte order. Setting this option requires all"] + #[doc="keys to be the same size, typically 32 or 64 bits."] + const INTEGER_KEY = MDB_INTEGERKEY; + + #[doc="This flag may only be used in combination with `DUP_SORT`. This option tells"] + #[doc="the library that the data items for this database are all the same size, which"] + #[doc="allows further optimizations in storage and retrieval. When all data items are"] + #[doc="the same size, the `GET_MULTIPLE` and `NEXT_MULTIPLE` cursor operations may be"] + #[doc="used to retrieve multiple items at once."] + const DUP_FIXED = MDB_DUPFIXED; + + #[doc="This option specifies that duplicate data items are also integers, and"] + #[doc="should be sorted as such."] + const INTEGER_DUP = MDB_INTEGERDUP; + + #[doc="This option specifies that duplicate data items should be compared as strings"] + #[doc="in reverse order."] + const REVERSE_DUP = MDB_REVERSEDUP; + } +} + +bitflags! { + #[doc="Write options."] + #[derive(Default)] + pub struct WriteFlags: c_uint { + + #[doc="Insert the new item only if the key does not already appear in the database."] + #[doc="The function will return `LmdbError::KeyExist` if the key already appears in the"] + #[doc="database, even if the database supports duplicates (`DUP_SORT`)."] + const NO_OVERWRITE = MDB_NOOVERWRITE; + + #[doc="Insert the new item only if it does not already appear in the database."] + #[doc="This flag may only be specified if the database was opened with `DUP_SORT`."] + #[doc="The function will return `LmdbError::KeyExist` if the item already appears in the"] + #[doc="database."] + const NO_DUP_DATA = MDB_NODUPDATA; + + #[doc="For `Cursor::put`. Replace the item at the current cursor position. The key"] + #[doc="parameter must match the current position. If using sorted duplicates (`DUP_SORT`)"] + #[doc="the data item must still sort into the same position. This is intended to be used"] + #[doc="when the new data is the same size as the old. Otherwise it will simply perform a"] + #[doc="delete of the old record followed by an insert."] + const CURRENT = MDB_CURRENT; + + #[doc="Append the given item to the end of the database. No key comparisons are performed."] + #[doc="This option allows fast bulk loading when keys are already known to be in the"] + #[doc="correct order. Loading unsorted keys with this flag will cause data corruption."] + const APPEND = MDB_APPEND; + + #[doc="Same as `APPEND`, but for sorted dup data."] + const APPEND_DUP = MDB_APPENDDUP; + } +} diff --git a/third_party/rust/lmdb-rkv/src/lib.rs b/third_party/rust/lmdb-rkv/src/lib.rs new file mode 100644 index 0000000000..2d42fd31c6 --- /dev/null +++ b/third_party/rust/lmdb-rkv/src/lib.rs @@ -0,0 +1,106 @@ +//! Idiomatic and safe APIs for interacting with the +//! [Lightning Memory-mapped Database (LMDB)](https://symas.com/lmdb). + +#![deny(missing_docs)] +#![doc(html_root_url = "https://docs.rs/lmdb-rkv/0.14.0")] + +extern crate byteorder; +extern crate libc; +extern crate lmdb_sys as ffi; + +#[cfg(test)] +extern crate tempdir; +#[macro_use] +extern crate bitflags; + +pub use cursor::{ + Cursor, + Iter, + IterDup, + RoCursor, + RwCursor, +}; +pub use database::Database; +pub use environment::{ + Environment, + EnvironmentBuilder, + Info, + Stat, +}; +pub use error::{ + Error, + Result, +}; +pub use flags::*; +pub use transaction::{ + InactiveTransaction, + RoTransaction, + RwTransaction, + Transaction, +}; + +macro_rules! lmdb_try { + ($expr:expr) => {{ + match $expr { + ::ffi::MDB_SUCCESS => (), + err_code => return Err(::Error::from_err_code(err_code)), + } + }}; +} + +macro_rules! lmdb_try_with_cleanup { + ($expr:expr, $cleanup:expr) => {{ + match $expr { + ::ffi::MDB_SUCCESS => (), + err_code => { + let _ = $cleanup; + return Err(::Error::from_err_code(err_code)); + }, + } + }}; +} + +mod cursor; +mod database; +mod environment; +mod error; +mod flags; +mod transaction; + +#[cfg(test)] +mod test_utils { + + use byteorder::{ + ByteOrder, + LittleEndian, + }; + use tempdir::TempDir; + + use super::*; + + /// Regression test for https://github.com/danburkert/lmdb-rs/issues/21. + /// This test reliably segfaults when run against lmbdb compiled with opt level -O3 and newer + /// GCC compilers. + #[test] + fn issue_21_regression() { + const HEIGHT_KEY: [u8; 1] = [0]; + + let dir = TempDir::new("test").unwrap(); + + let env = { + let mut builder = Environment::new(); + builder.set_max_dbs(2); + builder.set_map_size(1_000_000); + builder.open(dir.path()).expect("open lmdb env") + }; + let index = env.create_db(None, DatabaseFlags::DUP_SORT).expect("open index db"); + + for height in 0..1000 { + let mut value = [0u8; 8]; + LittleEndian::write_u64(&mut value, height); + let mut tx = env.begin_rw_txn().expect("begin_rw_txn"); + tx.put(index, &HEIGHT_KEY, &value, WriteFlags::empty()).expect("tx.put"); + tx.commit().expect("tx.commit") + } + } +} diff --git a/third_party/rust/lmdb-rkv/src/transaction.rs b/third_party/rust/lmdb-rkv/src/transaction.rs new file mode 100644 index 0000000000..a1d01f86cc --- /dev/null +++ b/third_party/rust/lmdb-rkv/src/transaction.rs @@ -0,0 +1,773 @@ +use libc::{ + c_uint, + c_void, + size_t, +}; +use std::marker::PhantomData; +use std::{ + fmt, + mem, + ptr, + result, + slice, +}; + +use ffi; + +use cursor::{ + RoCursor, + RwCursor, +}; +use database::Database; +use environment::{ + Environment, + Stat, +}; +use error::{ + lmdb_result, + Error, + Result, +}; +use flags::{ + DatabaseFlags, + EnvironmentFlags, + WriteFlags, +}; + +/// An LMDB transaction. +/// +/// All database operations require a transaction. +pub trait Transaction: Sized { + /// Returns a raw pointer to the underlying LMDB transaction. + /// + /// The caller **must** ensure that the pointer is not used after the + /// lifetime of the transaction. + fn txn(&self) -> *mut ffi::MDB_txn; + + /// Commits the transaction. + /// + /// Any pending operations will be saved. + fn commit(self) -> Result<()> { + unsafe { + let result = lmdb_result(ffi::mdb_txn_commit(self.txn())); + mem::forget(self); + result + } + } + + /// Aborts the transaction. + /// + /// Any pending operations will not be saved. + fn abort(self) { + // Abort should be performed in transaction destructors. + } + + /// Opens a database in the transaction. + /// + /// If `name` is `None`, then the default database will be opened, otherwise + /// a named database will be opened. The database handle will be private to + /// the transaction until the transaction is successfully committed. If the + /// transaction is aborted the returned database handle should no longer be + /// used. + /// + /// Prefer using `Environment::open_db`. + /// + /// ## Safety + /// + /// This function (as well as `Environment::open_db`, + /// `Environment::create_db`, and `Database::create`) **must not** be called + /// from multiple concurrent transactions in the same environment. A + /// transaction which uses this function must finish (either commit or + /// abort) before any other transaction may use this function. + unsafe fn open_db(&self, name: Option<&str>) -> Result<Database> { + Database::new(self.txn(), name, 0) + } + + /// Gets an item from a database. + /// + /// This function retrieves the data associated with the given key in the + /// database. If the database supports duplicate keys + /// (`DatabaseFlags::DUP_SORT`) then the first data item for the key will be + /// returned. Retrieval of other items requires the use of + /// `Transaction::cursor_get`. If the item is not in the database, then + /// `Error::NotFound` will be returned. + fn get<'txn, K>(&'txn self, database: Database, key: &K) -> Result<&'txn [u8]> + where + K: AsRef<[u8]>, + { + let key = key.as_ref(); + let mut key_val: ffi::MDB_val = ffi::MDB_val { + mv_size: key.len() as size_t, + mv_data: key.as_ptr() as *mut c_void, + }; + let mut data_val: ffi::MDB_val = ffi::MDB_val { + mv_size: 0, + mv_data: ptr::null_mut(), + }; + unsafe { + match ffi::mdb_get(self.txn(), database.dbi(), &mut key_val, &mut data_val) { + ffi::MDB_SUCCESS => Ok(slice::from_raw_parts(data_val.mv_data as *const u8, data_val.mv_size as usize)), + err_code => Err(Error::from_err_code(err_code)), + } + } + } + + /// Open a new read-only cursor on the given database. + fn open_ro_cursor<'txn>(&'txn self, db: Database) -> Result<RoCursor<'txn>> { + RoCursor::new(self, db) + } + + /// Gets the option flags for the given database in the transaction. + fn db_flags(&self, db: Database) -> Result<DatabaseFlags> { + let mut flags: c_uint = 0; + unsafe { + lmdb_result(ffi::mdb_dbi_flags(self.txn(), db.dbi(), &mut flags))?; + } + Ok(DatabaseFlags::from_bits_truncate(flags)) + } + + /// Retrieves database statistics. + fn stat(&self, db: Database) -> Result<Stat> { + unsafe { + let mut stat = Stat::new(); + lmdb_try!(ffi::mdb_stat(self.txn(), db.dbi(), stat.mdb_stat())); + Ok(stat) + } + } +} + +/// An LMDB read-only transaction. +pub struct RoTransaction<'env> { + txn: *mut ffi::MDB_txn, + _marker: PhantomData<&'env ()>, +} + +impl<'env> fmt::Debug for RoTransaction<'env> { + fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> { + f.debug_struct("RoTransaction").finish() + } +} + +impl<'env> Drop for RoTransaction<'env> { + fn drop(&mut self) { + unsafe { ffi::mdb_txn_abort(self.txn) } + } +} + +impl<'env> RoTransaction<'env> { + /// Creates a new read-only transaction in the given environment. Prefer + /// using `Environment::begin_ro_txn`. + pub(crate) fn new(env: &'env Environment) -> Result<RoTransaction<'env>> { + let mut txn: *mut ffi::MDB_txn = ptr::null_mut(); + unsafe { + lmdb_result(ffi::mdb_txn_begin(env.env(), ptr::null_mut(), ffi::MDB_RDONLY, &mut txn))?; + Ok(RoTransaction { + txn, + _marker: PhantomData, + }) + } + } + + /// Resets the read-only transaction. + /// + /// Abort the transaction like `Transaction::abort`, but keep the + /// transaction handle. `InactiveTransaction::renew` may reuse the handle. + /// This saves allocation overhead if the process will start a new read-only + /// transaction soon, and also locking overhead if + /// `EnvironmentFlags::NO_TLS` is in use. The reader table lock is released, + /// but the table slot stays tied to its thread or transaction. Reader locks + /// generally don't interfere with writers, but they keep old versions of + /// database pages allocated. Thus they prevent the old pages from being + /// reused when writers commit new data, and so under heavy load the + /// database size may grow much more rapidly than otherwise. + pub fn reset(self) -> InactiveTransaction<'env> { + let txn = self.txn; + unsafe { + mem::forget(self); + ffi::mdb_txn_reset(txn) + }; + InactiveTransaction { + txn, + _marker: PhantomData, + } + } +} + +impl<'env> Transaction for RoTransaction<'env> { + fn txn(&self) -> *mut ffi::MDB_txn { + self.txn + } +} + +/// An inactive read-only transaction. +pub struct InactiveTransaction<'env> { + txn: *mut ffi::MDB_txn, + _marker: PhantomData<&'env ()>, +} + +impl<'env> fmt::Debug for InactiveTransaction<'env> { + fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> { + f.debug_struct("InactiveTransaction").finish() + } +} + +impl<'env> Drop for InactiveTransaction<'env> { + fn drop(&mut self) { + unsafe { ffi::mdb_txn_abort(self.txn) } + } +} + +impl<'env> InactiveTransaction<'env> { + /// Renews the inactive transaction, returning an active read-only + /// transaction. + /// + /// This acquires a new reader lock for a transaction handle that had been + /// released by `RoTransaction::reset`. + pub fn renew(self) -> Result<RoTransaction<'env>> { + let txn = self.txn; + unsafe { + mem::forget(self); + lmdb_result(ffi::mdb_txn_renew(txn))? + }; + Ok(RoTransaction { + txn, + _marker: PhantomData, + }) + } +} + +/// An LMDB read-write transaction. +pub struct RwTransaction<'env> { + txn: *mut ffi::MDB_txn, + _marker: PhantomData<&'env ()>, +} + +impl<'env> fmt::Debug for RwTransaction<'env> { + fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> { + f.debug_struct("RwTransaction").finish() + } +} + +impl<'env> Drop for RwTransaction<'env> { + fn drop(&mut self) { + unsafe { ffi::mdb_txn_abort(self.txn) } + } +} + +impl<'env> RwTransaction<'env> { + /// Creates a new read-write transaction in the given environment. Prefer + /// using `Environment::begin_ro_txn`. + pub(crate) fn new(env: &'env Environment) -> Result<RwTransaction<'env>> { + let mut txn: *mut ffi::MDB_txn = ptr::null_mut(); + unsafe { + lmdb_result(ffi::mdb_txn_begin(env.env(), ptr::null_mut(), EnvironmentFlags::empty().bits(), &mut txn))?; + Ok(RwTransaction { + txn, + _marker: PhantomData, + }) + } + } + + /// Opens a database in the provided transaction, creating it if necessary. + /// + /// If `name` is `None`, then the default database will be opened, otherwise + /// a named database will be opened. The database handle will be private to + /// the transaction until the transaction is successfully committed. If the + /// transaction is aborted the returned database handle should no longer be + /// used. + /// + /// Prefer using `Environment::create_db`. + /// + /// ## Safety + /// + /// This function (as well as `Environment::open_db`, + /// `Environment::create_db`, and `Database::open`) **must not** be called + /// from multiple concurrent transactions in the same environment. A + /// transaction which uses this function must finish (either commit or + /// abort) before any other transaction may use this function. + pub unsafe fn create_db(&self, name: Option<&str>, flags: DatabaseFlags) -> Result<Database> { + Database::new(self.txn(), name, flags.bits() | ffi::MDB_CREATE) + } + + /// Opens a new read-write cursor on the given database and transaction. + pub fn open_rw_cursor<'txn>(&'txn mut self, db: Database) -> Result<RwCursor<'txn>> { + RwCursor::new(self, db) + } + + /// Stores an item into a database. + /// + /// This function stores key/data pairs in the database. The default + /// behavior is to enter the new key/data pair, replacing any previously + /// existing key if duplicates are disallowed, or adding a duplicate data + /// item if duplicates are allowed (`DatabaseFlags::DUP_SORT`). + pub fn put<K, D>(&mut self, database: Database, key: &K, data: &D, flags: WriteFlags) -> Result<()> + where + K: AsRef<[u8]>, + D: AsRef<[u8]>, + { + let key = key.as_ref(); + let data = data.as_ref(); + let mut key_val: ffi::MDB_val = ffi::MDB_val { + mv_size: key.len() as size_t, + mv_data: key.as_ptr() as *mut c_void, + }; + let mut data_val: ffi::MDB_val = ffi::MDB_val { + mv_size: data.len() as size_t, + mv_data: data.as_ptr() as *mut c_void, + }; + unsafe { lmdb_result(ffi::mdb_put(self.txn(), database.dbi(), &mut key_val, &mut data_val, flags.bits())) } + } + + /// Returns a buffer which can be used to write a value into the item at the + /// given key and with the given length. The buffer must be completely + /// filled by the caller. + pub fn reserve<'txn, K>( + &'txn mut self, + database: Database, + key: &K, + len: size_t, + flags: WriteFlags, + ) -> Result<&'txn mut [u8]> + where + K: AsRef<[u8]>, + { + let key = key.as_ref(); + let mut key_val: ffi::MDB_val = ffi::MDB_val { + mv_size: key.len() as size_t, + mv_data: key.as_ptr() as *mut c_void, + }; + let mut data_val: ffi::MDB_val = ffi::MDB_val { + mv_size: len, + mv_data: ptr::null_mut::<c_void>(), + }; + unsafe { + lmdb_result(ffi::mdb_put( + self.txn(), + database.dbi(), + &mut key_val, + &mut data_val, + flags.bits() | ffi::MDB_RESERVE, + ))?; + Ok(slice::from_raw_parts_mut(data_val.mv_data as *mut u8, data_val.mv_size as usize)) + } + } + + /// Deletes an item from a database. + /// + /// This function removes key/data pairs from the database. If the database + /// does not support sorted duplicate data items (`DatabaseFlags::DUP_SORT`) + /// the data parameter is ignored. If the database supports sorted + /// duplicates and the data parameter is `None`, all of the duplicate data + /// items for the key will be deleted. Otherwise, if the data parameter is + /// `Some` only the matching data item will be deleted. This function will + /// return `Error::NotFound` if the specified key/data pair is not in the + /// database. + pub fn del<K>(&mut self, database: Database, key: &K, data: Option<&[u8]>) -> Result<()> + where + K: AsRef<[u8]>, + { + let key = key.as_ref(); + let mut key_val: ffi::MDB_val = ffi::MDB_val { + mv_size: key.len() as size_t, + mv_data: key.as_ptr() as *mut c_void, + }; + let data_val: Option<ffi::MDB_val> = data.map(|data| ffi::MDB_val { + mv_size: data.len() as size_t, + mv_data: data.as_ptr() as *mut c_void, + }); + + if let Some(mut d) = data_val { + unsafe { lmdb_result(ffi::mdb_del(self.txn(), database.dbi(), &mut key_val, &mut d)) } + } else { + unsafe { lmdb_result(ffi::mdb_del(self.txn(), database.dbi(), &mut key_val, ptr::null_mut())) } + } + } + + /// Empties the given database. All items will be removed. + pub fn clear_db(&mut self, db: Database) -> Result<()> { + unsafe { lmdb_result(ffi::mdb_drop(self.txn(), db.dbi(), 0)) } + } + + /// Drops the database from the environment. + /// + /// ## Safety + /// + /// This method is unsafe in the same ways as `Environment::close_db`, and + /// should be used accordingly. + pub unsafe fn drop_db(&mut self, db: Database) -> Result<()> { + lmdb_result(ffi::mdb_drop(self.txn, db.dbi(), 1)) + } + + /// Begins a new nested transaction inside of this transaction. + pub fn begin_nested_txn<'txn>(&'txn mut self) -> Result<RwTransaction<'txn>> { + let mut nested: *mut ffi::MDB_txn = ptr::null_mut(); + unsafe { + let env: *mut ffi::MDB_env = ffi::mdb_txn_env(self.txn()); + ffi::mdb_txn_begin(env, self.txn(), 0, &mut nested); + } + Ok(RwTransaction { + txn: nested, + _marker: PhantomData, + }) + } +} + +impl<'env> Transaction for RwTransaction<'env> { + fn txn(&self) -> *mut ffi::MDB_txn { + self.txn + } +} + +#[cfg(test)] +mod test { + + use std::io::Write; + use std::sync::{ + Arc, + Barrier, + }; + use std::thread::{ + self, + JoinHandle, + }; + + use tempdir::TempDir; + + use super::*; + use cursor::Cursor; + use error::*; + use flags::*; + + #[test] + fn test_put_get_del() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + let db = env.open_db(None).unwrap(); + + let mut txn = env.begin_rw_txn().unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + txn.put(db, b"key3", b"val3", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + let mut txn = env.begin_rw_txn().unwrap(); + assert_eq!(b"val1", txn.get(db, b"key1").unwrap()); + assert_eq!(b"val2", txn.get(db, b"key2").unwrap()); + assert_eq!(b"val3", txn.get(db, b"key3").unwrap()); + assert_eq!(txn.get(db, b"key"), Err(Error::NotFound)); + + txn.del(db, b"key1", None).unwrap(); + assert_eq!(txn.get(db, b"key1"), Err(Error::NotFound)); + } + + #[test] + fn test_put_get_del_multi() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + let db = env.create_db(None, DatabaseFlags::DUP_SORT).unwrap(); + + let mut txn = env.begin_rw_txn().unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val2", WriteFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val3", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val3", WriteFlags::empty()).unwrap(); + txn.put(db, b"key3", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key3", b"val2", WriteFlags::empty()).unwrap(); + txn.put(db, b"key3", b"val3", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + let txn = env.begin_rw_txn().unwrap(); + { + let mut cur = txn.open_ro_cursor(db).unwrap(); + let iter = cur.iter_dup_of(b"key1"); + let vals = iter.map(|x| x.unwrap()).map(|(_, x)| x).collect::<Vec<_>>(); + assert_eq!(vals, vec![b"val1", b"val2", b"val3"]); + } + txn.commit().unwrap(); + + let mut txn = env.begin_rw_txn().unwrap(); + txn.del(db, b"key1", Some(b"val2")).unwrap(); + txn.del(db, b"key2", None).unwrap(); + txn.commit().unwrap(); + + let txn = env.begin_rw_txn().unwrap(); + { + let mut cur = txn.open_ro_cursor(db).unwrap(); + let iter = cur.iter_dup_of(b"key1"); + let vals = iter.map(|x| x.unwrap()).map(|(_, x)| x).collect::<Vec<_>>(); + assert_eq!(vals, vec![b"val1", b"val3"]); + + let iter = cur.iter_dup_of(b"key2"); + assert_eq!(0, iter.count()); + } + txn.commit().unwrap(); + } + + #[test] + fn test_reserve() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + let db = env.open_db(None).unwrap(); + + let mut txn = env.begin_rw_txn().unwrap(); + { + let mut writer = txn.reserve(db, b"key1", 4, WriteFlags::empty()).unwrap(); + writer.write_all(b"val1").unwrap(); + } + txn.commit().unwrap(); + + let mut txn = env.begin_rw_txn().unwrap(); + assert_eq!(b"val1", txn.get(db, b"key1").unwrap()); + assert_eq!(txn.get(db, b"key"), Err(Error::NotFound)); + + txn.del(db, b"key1", None).unwrap(); + assert_eq!(txn.get(db, b"key1"), Err(Error::NotFound)); + } + + #[test] + fn test_inactive_txn() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + let db = env.open_db(None).unwrap(); + + { + let mut txn = env.begin_rw_txn().unwrap(); + txn.put(db, b"key", b"val", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + } + + let txn = env.begin_ro_txn().unwrap(); + let inactive = txn.reset(); + let active = inactive.renew().unwrap(); + assert!(active.get(db, b"key").is_ok()); + } + + #[test] + fn test_nested_txn() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + let db = env.open_db(None).unwrap(); + + let mut txn = env.begin_rw_txn().unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + + { + let mut nested = txn.begin_nested_txn().unwrap(); + nested.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + assert_eq!(nested.get(db, b"key1").unwrap(), b"val1"); + assert_eq!(nested.get(db, b"key2").unwrap(), b"val2"); + } + + assert_eq!(txn.get(db, b"key1").unwrap(), b"val1"); + assert_eq!(txn.get(db, b"key2"), Err(Error::NotFound)); + } + + #[test] + fn test_clear_db() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + let db = env.open_db(None).unwrap(); + + { + let mut txn = env.begin_rw_txn().unwrap(); + txn.put(db, b"key", b"val", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + } + + { + let mut txn = env.begin_rw_txn().unwrap(); + txn.clear_db(db).unwrap(); + txn.commit().unwrap(); + } + + let txn = env.begin_ro_txn().unwrap(); + assert_eq!(txn.get(db, b"key"), Err(Error::NotFound)); + } + + #[test] + fn test_drop_db() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().set_max_dbs(2).open(dir.path()).unwrap(); + let db = env.create_db(Some("test"), DatabaseFlags::empty()).unwrap(); + + { + let mut txn = env.begin_rw_txn().unwrap(); + txn.put(db, b"key", b"val", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + } + { + let mut txn = env.begin_rw_txn().unwrap(); + unsafe { + txn.drop_db(db).unwrap(); + } + txn.commit().unwrap(); + } + + assert_eq!(env.open_db(Some("test")), Err(Error::NotFound)); + } + + #[test] + fn test_concurrent_readers_single_writer() { + let dir = TempDir::new("test").unwrap(); + let env: Arc<Environment> = Arc::new(Environment::new().open(dir.path()).unwrap()); + + let n = 10usize; // Number of concurrent readers + let barrier = Arc::new(Barrier::new(n + 1)); + let mut threads: Vec<JoinHandle<bool>> = Vec::with_capacity(n); + + let key = b"key"; + let val = b"val"; + + for _ in 0..n { + let reader_env = env.clone(); + let reader_barrier = barrier.clone(); + + threads.push(thread::spawn(move || { + let db = reader_env.open_db(None).unwrap(); + { + let txn = reader_env.begin_ro_txn().unwrap(); + assert_eq!(txn.get(db, key), Err(Error::NotFound)); + txn.abort(); + } + reader_barrier.wait(); + reader_barrier.wait(); + { + let txn = reader_env.begin_ro_txn().unwrap(); + txn.get(db, key).unwrap() == val + } + })); + } + + let db = env.open_db(None).unwrap(); + let mut txn = env.begin_rw_txn().unwrap(); + barrier.wait(); + txn.put(db, key, val, WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + barrier.wait(); + + assert!(threads.into_iter().all(|b| b.join().unwrap())) + } + + #[test] + fn test_concurrent_writers() { + let dir = TempDir::new("test").unwrap(); + let env = Arc::new(Environment::new().open(dir.path()).unwrap()); + + let n = 10usize; // Number of concurrent writers + let mut threads: Vec<JoinHandle<bool>> = Vec::with_capacity(n); + + let key = "key"; + let val = "val"; + + for i in 0..n { + let writer_env = env.clone(); + + threads.push(thread::spawn(move || { + let db = writer_env.open_db(None).unwrap(); + let mut txn = writer_env.begin_rw_txn().unwrap(); + txn.put(db, &format!("{}{}", key, i), &format!("{}{}", val, i), WriteFlags::empty()).unwrap(); + txn.commit().is_ok() + })); + } + assert!(threads.into_iter().all(|b| b.join().unwrap())); + + let db = env.open_db(None).unwrap(); + let txn = env.begin_ro_txn().unwrap(); + + for i in 0..n { + assert_eq!(format!("{}{}", val, i).as_bytes(), txn.get(db, &format!("{}{}", key, i)).unwrap()); + } + } + + #[test] + fn test_stat() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + let db = env.create_db(None, DatabaseFlags::empty()).unwrap(); + + let mut txn = env.begin_rw_txn().unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + txn.put(db, b"key3", b"val3", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + { + let txn = env.begin_ro_txn().unwrap(); + let stat = txn.stat(db).unwrap(); + assert_eq!(stat.entries(), 3); + } + + let mut txn = env.begin_rw_txn().unwrap(); + txn.del(db, b"key1", None).unwrap(); + txn.del(db, b"key2", None).unwrap(); + txn.commit().unwrap(); + + { + let txn = env.begin_ro_txn().unwrap(); + let stat = txn.stat(db).unwrap(); + assert_eq!(stat.entries(), 1); + } + + let mut txn = env.begin_rw_txn().unwrap(); + txn.put(db, b"key4", b"val4", WriteFlags::empty()).unwrap(); + txn.put(db, b"key5", b"val5", WriteFlags::empty()).unwrap(); + txn.put(db, b"key6", b"val6", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + { + let txn = env.begin_ro_txn().unwrap(); + let stat = txn.stat(db).unwrap(); + assert_eq!(stat.entries(), 4); + } + } + + #[test] + fn test_stat_dupsort() { + let dir = TempDir::new("test").unwrap(); + let env = Environment::new().open(dir.path()).unwrap(); + let db = env.create_db(None, DatabaseFlags::DUP_SORT).unwrap(); + + let mut txn = env.begin_rw_txn().unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val2", WriteFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val3", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val3", WriteFlags::empty()).unwrap(); + txn.put(db, b"key3", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key3", b"val2", WriteFlags::empty()).unwrap(); + txn.put(db, b"key3", b"val3", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + { + let txn = env.begin_ro_txn().unwrap(); + let stat = txn.stat(db).unwrap(); + assert_eq!(stat.entries(), 9); + } + + let mut txn = env.begin_rw_txn().unwrap(); + txn.del(db, b"key1", Some(b"val2")).unwrap(); + txn.del(db, b"key2", None).unwrap(); + txn.commit().unwrap(); + + { + let txn = env.begin_ro_txn().unwrap(); + let stat = txn.stat(db).unwrap(); + assert_eq!(stat.entries(), 5); + } + + let mut txn = env.begin_rw_txn().unwrap(); + txn.put(db, b"key4", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key4", b"val2", WriteFlags::empty()).unwrap(); + txn.put(db, b"key4", b"val3", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + { + let txn = env.begin_ro_txn().unwrap(); + let stat = txn.stat(db).unwrap(); + assert_eq!(stat.entries(), 8); + } + } +} |