diff options
Diffstat (limited to 'third_party/rust/rkv/src/backend')
25 files changed, 3288 insertions, 0 deletions
diff --git a/third_party/rust/rkv/src/backend/common.rs b/third_party/rust/rkv/src/backend/common.rs new file mode 100644 index 0000000000..bea3839d03 --- /dev/null +++ b/third_party/rust/rkv/src/backend/common.rs @@ -0,0 +1,44 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. +#![allow(non_camel_case_types)] + +pub enum EnvironmentFlags { + FIXED_MAP, + NO_SUB_DIR, + WRITE_MAP, + READ_ONLY, + NO_META_SYNC, + NO_SYNC, + MAP_ASYNC, + NO_TLS, + NO_LOCK, + NO_READAHEAD, + NO_MEM_INIT, +} + +pub enum DatabaseFlags { + REVERSE_KEY, + #[cfg(feature = "db-dup-sort")] + DUP_SORT, + #[cfg(feature = "db-dup-sort")] + DUP_FIXED, + #[cfg(feature = "db-int-key")] + INTEGER_KEY, + INTEGER_DUP, + REVERSE_DUP, +} + +pub enum WriteFlags { + NO_OVERWRITE, + NO_DUP_DATA, + CURRENT, + APPEND, + APPEND_DUP, +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb.rs b/third_party/rust/rkv/src/backend/impl_lmdb.rs new file mode 100644 index 0000000000..5364214598 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb.rs @@ -0,0 +1,34 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +mod arch_migrator; +mod arch_migrator_error; +mod cursor; +mod database; +mod environment; +mod error; +mod flags; +mod info; +mod iter; +mod stat; +mod transaction; + +pub use arch_migrator::{ + MigrateError as ArchMigrateError, MigrateResult as ArchMigrateResult, Migrator as ArchMigrator, +}; +pub use cursor::{RoCursorImpl, RwCursorImpl}; +pub use database::DatabaseImpl; +pub use environment::{EnvironmentBuilderImpl, EnvironmentImpl}; +pub use error::ErrorImpl; +pub use flags::{DatabaseFlagsImpl, EnvironmentFlagsImpl, WriteFlagsImpl}; +pub use info::InfoImpl; +pub use iter::IterImpl; +pub use stat::StatImpl; +pub use transaction::{RoTransactionImpl, RwTransactionImpl}; diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/arch_migrator.rs b/third_party/rust/rkv/src/backend/impl_lmdb/arch_migrator.rs new file mode 100644 index 0000000000..6a91db1a3b --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/arch_migrator.rs @@ -0,0 +1,998 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. +#![allow(dead_code)] // TODO: Get rid of unused struct members +#![allow(clippy::upper_case_acronyms)] // TODO: Consider renaming things like `BRANCH` + +//! A utility for migrating data from one LMDB environment to another. Notably, this tool +//! can migrate data from an enviroment created with a different bit-depth than the +//! current rkv consumer, which enables the consumer to retrieve data from an environment +//! that can't be read directly using the rkv APIs. +//! +//! The utility supports both 32-bit and 64-bit LMDB source environments, and it +//! automatically migrates data in both the default database and any named (sub) +//! databases. It also migrates the source environment's "map size" and "max DBs" +//! configuration options to the destination environment. +//! +//! The destination environment must be at the rkv consumer's bit depth and should be +//! empty of data. It can be an empty directory, in which case the utility will create a +//! new LMDB environment within the directory. +//! +//! The tool currently has these limitations: +//! +//! 1. It doesn't support migration from environments created with +//! `EnvironmentFlags::NO_SUB_DIR`. To migrate such an environment, create a +//! temporary directory, copy the environment's data file to a file called data.mdb in +//! the temporary directory, then migrate the temporary directory as the source +//! environment. +//! 2. It doesn't support migration from databases created with DatabaseFlags::DUP_SORT` +//! (with or without `DatabaseFlags::DUP_FIXED`). +//! 3. It doesn't account for existing data in the destination environment, which means +//! that it can overwrite data (causing data loss) or fail to migrate data if the +//! destination environment contains existing data. +//! +//! ## Basic Usage +//! +//! Call `Migrator::new()` with the path to the source environment to create a `Migrator` +//! instance; then call the instance's `migrate()` method with the path to the destination +//! environment to migrate data from the source to the destination environment. For +//! example, this snippet migrates data from the tests/envs/ref_env_32 environment to a +//! new environment in a temporary directory: +//! +//! ``` +//! use rkv::migrator::LmdbArchMigrator as Migrator; +//! use std::path::Path; +//! use tempfile::tempdir; +//! let mut migrator = Migrator::new(Path::new("tests/envs/ref_env_32")).unwrap(); +//! migrator.migrate(&tempdir().unwrap().path()).unwrap(); +//! ``` +//! +//! Both `Migrator::new()` and `migrate()` return a `MigrateResult` that is either an +//! `Ok()` result or an `Err<MigrateError>`, where `MigrateError` is an enum whose +//! variants identify specific kinds of migration failures. + +use std::{ + collections::{BTreeMap, HashMap}, + convert::TryFrom, + fs::File, + io::{Cursor, Read, Seek, SeekFrom, Write}, + path::{Path, PathBuf}, + rc::Rc, + str, +}; + +use bitflags::bitflags; +use byteorder::{LittleEndian, ReadBytesExt}; +use lmdb::{DatabaseFlags, Environment, Transaction, WriteFlags}; + +pub use super::arch_migrator_error::MigrateError; + +const PAGESIZE: u16 = 4096; + +// The magic number is 0xBEEFC0DE, which is 0xDEC0EFBE in little-endian. It appears at +// offset 12 on 32-bit systems and 16 on 64-bit systems. We don't support big-endian +// migration, but presumably we could do so by detecting the order of the bytes. +const MAGIC: [u8; 4] = [0xDE, 0xC0, 0xEF, 0xBE]; + +pub type MigrateResult<T> = Result<T, MigrateError>; + +bitflags! { + #[derive(Default)] + struct PageFlags: u16 { + const BRANCH = 0x01; + const LEAF = 0x02; + const OVERFLOW = 0x04; + const META = 0x08; + const DIRTY = 0x10; + const LEAF2 = 0x20; + const SUBP = 0x40; + const LOOSE = 0x4000; + const KEEP = 0x8000; + } +} + +bitflags! { + #[derive(Default)] + struct NodeFlags: u16 { + const BIGDATA = 0x01; + const SUBDATA = 0x02; + const DUPDATA = 0x04; + } +} + +// The bit depth of the executable that created an LMDB environment. The Migrator +// determines this automatically based on the location of the magic number in data.mdb. +#[derive(Clone, Copy, PartialEq)] +enum Bits { + U32, + U64, +} + +impl Bits { + // The size of usize for the bit-depth represented by the enum variant. + fn size(self) -> usize { + match self { + Bits::U32 => 4, + Bits::U64 => 8, + } + } +} + +// The equivalent of PAGEHDRSZ in LMDB, except that this one varies by bits. +fn page_header_size(bits: Bits) -> u64 { + match bits { + Bits::U32 => 12, + Bits::U64 => 16, + } +} + +// The equivalent of P_INVALID in LMDB, except that this one varies by bits. +fn validate_page_num(page_num: u64, bits: Bits) -> MigrateResult<()> { + let invalid_page_num = match bits { + Bits::U32 => u64::from(!0u32), + Bits::U64 => !0u64, + }; + + if page_num == invalid_page_num { + return Err(MigrateError::InvalidPageNum); + } + + Ok(()) +} + +#[derive(Clone, Debug, Default)] +struct Database { + md_pad: u32, + md_flags: DatabaseFlags, + md_depth: u16, + md_branch_pages: u64, + md_leaf_pages: u64, + md_overflow_pages: u64, + md_entries: u64, + md_root: u64, +} + +impl Database { + fn new(cursor: &mut Cursor<&[u8]>, bits: Bits) -> MigrateResult<Database> { + Ok(Database { + md_pad: cursor.read_u32::<LittleEndian>()?, + md_flags: DatabaseFlags::from_bits(cursor.read_u16::<LittleEndian>()?.into()) + .ok_or(MigrateError::InvalidDatabaseBits)?, + md_depth: cursor.read_u16::<LittleEndian>()?, + md_branch_pages: cursor.read_uint::<LittleEndian>(bits.size())?, + md_leaf_pages: cursor.read_uint::<LittleEndian>(bits.size())?, + md_overflow_pages: cursor.read_uint::<LittleEndian>(bits.size())?, + md_entries: cursor.read_uint::<LittleEndian>(bits.size())?, + md_root: cursor.read_uint::<LittleEndian>(bits.size())?, + }) + } +} + +#[derive(Debug, Default)] +struct Databases { + free: Database, + main: Database, +} + +#[derive(Debug, Default)] +struct MetaData { + mm_magic: u32, + mm_version: u32, + mm_address: u64, + mm_mapsize: u64, + mm_dbs: Databases, + mm_last_pg: u64, + mm_txnid: u64, +} + +#[derive(Debug)] +enum LeafNode { + Regular { + mn_lo: u16, + mn_hi: u16, + mn_flags: NodeFlags, + mn_ksize: u16, + mv_size: u32, + key: Vec<u8>, + value: Vec<u8>, + }, + BigData { + mn_lo: u16, + mn_hi: u16, + mn_flags: NodeFlags, + mn_ksize: u16, + mv_size: u32, + key: Vec<u8>, + overflow_pgno: u64, + }, + SubData { + mn_lo: u16, + mn_hi: u16, + mn_flags: NodeFlags, + mn_ksize: u16, + mv_size: u32, + key: Vec<u8>, + value: Vec<u8>, + db: Database, + }, +} + +#[derive(Debug, Default)] +struct BranchNode { + mp_pgno: u64, + mn_ksize: u16, + mn_data: Vec<u8>, +} + +#[derive(Debug)] +enum PageHeader { + Regular { + mp_pgno: u64, + mp_flags: PageFlags, + pb_lower: u16, + pb_upper: u16, + }, + Overflow { + mp_pgno: u64, + mp_flags: PageFlags, + pb_pages: u32, + }, +} + +#[derive(Debug)] +enum Page { + META(MetaData), + LEAF(Vec<LeafNode>), + BRANCH(Vec<BranchNode>), +} + +impl Page { + fn new(buf: Vec<u8>, bits: Bits) -> MigrateResult<Page> { + let mut cursor = std::io::Cursor::new(&buf[..]); + + match Self::parse_page_header(&mut cursor, bits)? { + PageHeader::Regular { + mp_flags, pb_lower, .. + } => { + if mp_flags.contains(PageFlags::LEAF2) || mp_flags.contains(PageFlags::SUBP) { + // We don't yet support DUPFIXED and DUPSORT databases. + return Err(MigrateError::UnsupportedPageHeaderVariant); + } + + if mp_flags.contains(PageFlags::META) { + let meta_data = Self::parse_meta_data(&mut cursor, bits)?; + Ok(Page::META(meta_data)) + } else if mp_flags.contains(PageFlags::LEAF) { + let nodes = Self::parse_leaf_nodes(&mut cursor, pb_lower, bits)?; + Ok(Page::LEAF(nodes)) + } else if mp_flags.contains(PageFlags::BRANCH) { + let nodes = Self::parse_branch_nodes(&mut cursor, pb_lower, bits)?; + Ok(Page::BRANCH(nodes)) + } else { + Err(MigrateError::UnexpectedPageHeaderVariant) + } + } + PageHeader::Overflow { .. } => { + // There isn't anything to do, nor should we try to instantiate + // a page of this type, as we only access them when reading + // a value that is too large to fit into a leaf node. + Err(MigrateError::UnexpectedPageHeaderVariant) + } + } + } + + fn parse_page_header(cursor: &mut Cursor<&[u8]>, bits: Bits) -> MigrateResult<PageHeader> { + let mp_pgno = cursor.read_uint::<LittleEndian>(bits.size())?; + let _mp_pad = cursor.read_u16::<LittleEndian>()?; + let mp_flags = PageFlags::from_bits(cursor.read_u16::<LittleEndian>()?) + .ok_or(MigrateError::InvalidPageBits)?; + + if mp_flags.contains(PageFlags::OVERFLOW) { + let pb_pages = cursor.read_u32::<LittleEndian>()?; + Ok(PageHeader::Overflow { + mp_pgno, + mp_flags, + pb_pages, + }) + } else { + let pb_lower = cursor.read_u16::<LittleEndian>()?; + let pb_upper = cursor.read_u16::<LittleEndian>()?; + Ok(PageHeader::Regular { + mp_pgno, + mp_flags, + pb_lower, + pb_upper, + }) + } + } + + fn parse_meta_data(cursor: &mut Cursor<&[u8]>, bits: Bits) -> MigrateResult<MetaData> { + cursor.seek(SeekFrom::Start(page_header_size(bits)))?; + + Ok(MetaData { + mm_magic: cursor.read_u32::<LittleEndian>()?, + mm_version: cursor.read_u32::<LittleEndian>()?, + mm_address: cursor.read_uint::<LittleEndian>(bits.size())?, + mm_mapsize: cursor.read_uint::<LittleEndian>(bits.size())?, + mm_dbs: Databases { + free: Database::new(cursor, bits)?, + main: Database::new(cursor, bits)?, + }, + mm_last_pg: cursor.read_uint::<LittleEndian>(bits.size())?, + mm_txnid: cursor.read_uint::<LittleEndian>(bits.size())?, + }) + } + + fn parse_leaf_nodes( + cursor: &mut Cursor<&[u8]>, + pb_lower: u16, + bits: Bits, + ) -> MigrateResult<Vec<LeafNode>> { + cursor.set_position(page_header_size(bits)); + let num_keys = Self::num_keys(pb_lower, bits); + let mp_ptrs = Self::parse_mp_ptrs(cursor, num_keys)?; + + let mut leaf_nodes = Vec::with_capacity(num_keys as usize); + + for mp_ptr in mp_ptrs { + cursor.set_position(u64::from(mp_ptr)); + leaf_nodes.push(Self::parse_leaf_node(cursor, bits)?); + } + + Ok(leaf_nodes) + } + + fn parse_leaf_node(cursor: &mut Cursor<&[u8]>, bits: Bits) -> MigrateResult<LeafNode> { + // The order of the mn_lo and mn_hi fields is endian-dependent and would be + // reversed in an LMDB environment created on a big-endian system. + let mn_lo = cursor.read_u16::<LittleEndian>()?; + let mn_hi = cursor.read_u16::<LittleEndian>()?; + + let mn_flags = NodeFlags::from_bits(cursor.read_u16::<LittleEndian>()?) + .ok_or(MigrateError::InvalidNodeBits)?; + let mn_ksize = cursor.read_u16::<LittleEndian>()?; + + let start = usize::try_from(cursor.position())?; + let end = usize::try_from(cursor.position() + u64::from(mn_ksize))?; + let key = cursor.get_ref()[start..end].to_vec(); + cursor.set_position(end as u64); + + let mv_size = Self::leaf_node_size(mn_lo, mn_hi); + if mn_flags.contains(NodeFlags::BIGDATA) { + let overflow_pgno = cursor.read_uint::<LittleEndian>(bits.size())?; + Ok(LeafNode::BigData { + mn_lo, + mn_hi, + mn_flags, + mn_ksize, + mv_size, + key, + overflow_pgno, + }) + } else if mn_flags.contains(NodeFlags::SUBDATA) { + let start = usize::try_from(cursor.position())?; + let end = usize::try_from(cursor.position() + u64::from(mv_size))?; + let value = cursor.get_ref()[start..end].to_vec(); + let mut cursor = std::io::Cursor::new(&value[..]); + let db = Database::new(&mut cursor, bits)?; + validate_page_num(db.md_root, bits)?; + Ok(LeafNode::SubData { + mn_lo, + mn_hi, + mn_flags, + mn_ksize, + mv_size, + key, + value, + db, + }) + } else { + let start = usize::try_from(cursor.position())?; + let end = usize::try_from(cursor.position() + u64::from(mv_size))?; + let value = cursor.get_ref()[start..end].to_vec(); + Ok(LeafNode::Regular { + mn_lo, + mn_hi, + mn_flags, + mn_ksize, + mv_size, + key, + value, + }) + } + } + + fn leaf_node_size(mn_lo: u16, mn_hi: u16) -> u32 { + u32::from(mn_lo) + ((u32::from(mn_hi)) << 16) + } + + fn parse_branch_nodes( + cursor: &mut Cursor<&[u8]>, + pb_lower: u16, + bits: Bits, + ) -> MigrateResult<Vec<BranchNode>> { + let num_keys = Self::num_keys(pb_lower, bits); + let mp_ptrs = Self::parse_mp_ptrs(cursor, num_keys)?; + + let mut branch_nodes = Vec::with_capacity(num_keys as usize); + + for mp_ptr in mp_ptrs { + cursor.set_position(u64::from(mp_ptr)); + branch_nodes.push(Self::parse_branch_node(cursor, bits)?) + } + + Ok(branch_nodes) + } + + fn parse_branch_node(cursor: &mut Cursor<&[u8]>, bits: Bits) -> MigrateResult<BranchNode> { + // The order of the mn_lo and mn_hi fields is endian-dependent and would be + // reversed in an LMDB environment created on a big-endian system. + let mn_lo = cursor.read_u16::<LittleEndian>()?; + let mn_hi = cursor.read_u16::<LittleEndian>()?; + + let mn_flags = cursor.read_u16::<LittleEndian>()?; + + // Branch nodes overload the mn_lo, mn_hi, and mn_flags fields to store the page + // number, so we derive the number from those fields. + let mp_pgno = Self::branch_node_page_num(mn_lo, mn_hi, mn_flags, bits); + + let mn_ksize = cursor.read_u16::<LittleEndian>()?; + + let position = cursor.position(); + let start = usize::try_from(position)?; + let end = usize::try_from(position + u64::from(mn_ksize))?; + let mn_data = cursor.get_ref()[start..end].to_vec(); + cursor.set_position(end as u64); + + Ok(BranchNode { + mp_pgno, + mn_ksize, + mn_data, + }) + } + + fn branch_node_page_num(mn_lo: u16, mn_hi: u16, mn_flags: u16, bits: Bits) -> u64 { + let mut page_num = u64::from(u32::from(mn_lo) + (u32::from(mn_hi) << 16)); + if bits == Bits::U64 { + page_num += u64::from(mn_flags) << 32; + } + page_num + } + + fn parse_mp_ptrs(cursor: &mut Cursor<&[u8]>, num_keys: u64) -> MigrateResult<Vec<u16>> { + let mut mp_ptrs = Vec::with_capacity(num_keys as usize); + for _ in 0..num_keys { + mp_ptrs.push(cursor.read_u16::<LittleEndian>()?); + } + Ok(mp_ptrs) + } + + fn num_keys(pb_lower: u16, bits: Bits) -> u64 { + (u64::from(pb_lower) - page_header_size(bits)) >> 1 + } +} + +pub struct Migrator { + file: File, + bits: Bits, +} + +impl Migrator { + /// Create a new Migrator for the LMDB environment at the given path. This tries to + /// open the data.mdb file in the environment and determine the bit depth of the + /// executable that created it, so it can fail and return an Err if the file can't be + /// opened or the depth determined. + pub fn new(path: &Path) -> MigrateResult<Migrator> { + let mut path = PathBuf::from(path); + path.push("data.mdb"); + let mut file = File::open(&path)?; + + file.seek(SeekFrom::Start(page_header_size(Bits::U32)))?; + let mut buf = [0; 4]; + file.read_exact(&mut buf)?; + + let bits = if buf == MAGIC { + Bits::U32 + } else { + file.seek(SeekFrom::Start(page_header_size(Bits::U64)))?; + file.read_exact(&mut buf)?; + if buf == MAGIC { + Bits::U64 + } else { + return Err(MigrateError::IndeterminateBitDepth); + } + }; + + Ok(Migrator { file, bits }) + } + + /// Dump the data in one of the databases in the LMDB environment. If the `database` + /// paremeter is None, then we dump the data in the main database. If it's the name + /// of a subdatabase, then we dump the data in that subdatabase. + /// + /// Note that the output isn't identical to that of the `mdb_dump` utility, since + /// `mdb_dump` includes subdatabase key/value pairs when dumping the main database, + /// and those values are architecture-dependent, since they contain pointer-sized + /// data. + /// + /// If we wanted to support identical output, we could parameterize inclusion of + /// subdatabase pairs in get_pairs() and include them when dumping data, while + /// continuing to exclude them when migrating data. + pub fn dump<T: Write>(&mut self, database: Option<&str>, mut out: T) -> MigrateResult<()> { + let meta_data = self.get_meta_data()?; + let root_page_num = meta_data.mm_dbs.main.md_root; + let root_page = Rc::new(self.get_page(root_page_num)?); + + let pairs; + if let Some(database) = database { + let subdbs = self.get_subdbs(root_page)?; + let database = subdbs + .get(database.as_bytes()) + .ok_or_else(|| MigrateError::DatabaseNotFound(database.to_string()))?; + let root_page_num = database.md_root; + let root_page = Rc::new(self.get_page(root_page_num)?); + pairs = self.get_pairs(root_page)?; + } else { + pairs = self.get_pairs(root_page)?; + } + + out.write_all(b"VERSION=3\n")?; + out.write_all(b"format=bytevalue\n")?; + if let Some(database) = database { + writeln!(out, "database={database}")?; + } + out.write_all(b"type=btree\n")?; + writeln!(out, "mapsize={}", meta_data.mm_mapsize)?; + out.write_all(b"maxreaders=126\n")?; + out.write_all(b"db_pagesize=4096\n")?; + out.write_all(b"HEADER=END\n")?; + + for (key, value) in pairs { + out.write_all(b" ")?; + for byte in key { + write!(out, "{byte:02x}")?; + } + out.write_all(b"\n")?; + out.write_all(b" ")?; + for byte in value { + write!(out, "{byte:02x}")?; + } + out.write_all(b"\n")?; + } + + out.write_all(b"DATA=END\n")?; + + Ok(()) + } + + /// Migrate all data in all of databases in the existing LMDB environment to a new + /// environment. This includes all key/value pairs in the main database that aren't + /// metadata about subdatabases and all key/value pairs in all subdatabases. + /// + /// We also set the map size and maximum databases of the new environment to their + /// values for the existing environment. But we don't set other metadata, and we + /// don't check that the new environment is empty before migrating data. + /// + /// Thus it's possible for this to overwrite existing data or fail to migrate data if + /// the new environment isn't empty. It's the consumer's responsibility to ensure + /// that data can be safely migrated to the new environment. In general, this means + /// that environment should be empty. + pub fn migrate(&mut self, dest: &Path) -> MigrateResult<()> { + let meta_data = self.get_meta_data()?; + let root_page_num = meta_data.mm_dbs.main.md_root; + validate_page_num(root_page_num, self.bits)?; + let root_page = Rc::new(self.get_page(root_page_num)?); + let subdbs = self.get_subdbs(Rc::clone(&root_page))?; + + let env = Environment::new() + .set_map_size(meta_data.mm_mapsize as usize) + .set_max_dbs(subdbs.len() as u32) + .open(dest)?; + + // Create the databases before we open a read-write transaction, since database + // creation requires its own read-write transaction, which would hang while + // awaiting completion of an existing one. + env.create_db(None, meta_data.mm_dbs.main.md_flags)?; + for (subdb_name, subdb_info) in &subdbs { + env.create_db(Some(str::from_utf8(subdb_name)?), subdb_info.md_flags)?; + } + + // Now open the read-write transaction that we'll use to migrate all the data. + let mut txn = env.begin_rw_txn()?; + + // Migrate the main database. + let pairs = self.get_pairs(root_page)?; + let db = env.open_db(None)?; + for (key, value) in pairs { + // If we knew that the target database was empty, we could specify + // WriteFlags::APPEND to speed up the migration. + txn.put(db, &key, &value, WriteFlags::empty())?; + } + + // Migrate subdatabases. + for (subdb_name, subdb_info) in &subdbs { + let root_page = Rc::new(self.get_page(subdb_info.md_root)?); + let pairs = self.get_pairs(root_page)?; + let db = env.open_db(Some(str::from_utf8(subdb_name)?))?; + for (key, value) in pairs { + // If we knew that the target database was empty, we could specify + // WriteFlags::APPEND to speed up the migration. + txn.put(db, &key, &value, WriteFlags::empty())?; + } + } + + txn.commit()?; + + Ok(()) + } + + fn get_subdbs(&mut self, root_page: Rc<Page>) -> MigrateResult<HashMap<Vec<u8>, Database>> { + let mut subdbs = HashMap::new(); + let mut pages = vec![root_page]; + + while let Some(page) = pages.pop() { + match &*page { + Page::BRANCH(nodes) => { + for branch in nodes { + pages.push(Rc::new(self.get_page(branch.mp_pgno)?)); + } + } + Page::LEAF(nodes) => { + for leaf in nodes { + if let LeafNode::SubData { key, db, .. } = leaf { + subdbs.insert(key.to_vec(), db.clone()); + }; + } + } + _ => { + return Err(MigrateError::UnexpectedPageVariant); + } + } + } + + Ok(subdbs) + } + + fn get_pairs(&mut self, root_page: Rc<Page>) -> MigrateResult<BTreeMap<Vec<u8>, Vec<u8>>> { + let mut pairs = BTreeMap::new(); + let mut pages = vec![root_page]; + + while let Some(page) = pages.pop() { + match &*page { + Page::BRANCH(nodes) => { + for branch in nodes { + pages.push(Rc::new(self.get_page(branch.mp_pgno)?)); + } + } + Page::LEAF(nodes) => { + for leaf in nodes { + match leaf { + LeafNode::Regular { key, value, .. } => { + pairs.insert(key.to_vec(), value.to_vec()); + } + LeafNode::BigData { + mv_size, + key, + overflow_pgno, + .. + } => { + // Perhaps we could reduce memory consumption during a + // migration by waiting to read big data until it's time + // to write it to the new database. + let value = self.read_data( + *overflow_pgno * u64::from(PAGESIZE) + + page_header_size(self.bits), + *mv_size as usize, + )?; + pairs.insert(key.to_vec(), value); + } + LeafNode::SubData { .. } => { + // We don't include subdatabase leaves in pairs, since + // there's no architecture-neutral representation of them, + // and in any case they're meta-data that should get + // recreated when we migrate the subdatabases themselves. + // + // If we wanted to create identical dumps to those + // produced by `mdb_dump`, however, we could allow + // consumers to specify that they'd like to include these + // records. + } + }; + } + } + _ => { + return Err(MigrateError::UnexpectedPageVariant); + } + } + } + + Ok(pairs) + } + + fn read_data(&mut self, offset: u64, size: usize) -> MigrateResult<Vec<u8>> { + self.file.seek(SeekFrom::Start(offset))?; + let mut buf: Vec<u8> = vec![0; size]; + self.file.read_exact(&mut buf[0..size])?; + Ok(buf.to_vec()) + } + + fn get_page(&mut self, page_no: u64) -> MigrateResult<Page> { + Page::new( + self.read_data(page_no * u64::from(PAGESIZE), usize::from(PAGESIZE))?, + self.bits, + ) + } + + fn get_meta_data(&mut self) -> MigrateResult<MetaData> { + let (page0, page1) = (self.get_page(0)?, self.get_page(1)?); + + match (page0, page1) { + (Page::META(meta0), Page::META(meta1)) => { + let meta = if meta1.mm_txnid > meta0.mm_txnid { + meta1 + } else { + meta0 + }; + if meta.mm_magic != 0xBE_EF_C0_DE { + return Err(MigrateError::InvalidMagicNum); + } + if meta.mm_version != 1 && meta.mm_version != 999 { + return Err(MigrateError::InvalidDataVersion); + } + Ok(meta) + } + _ => Err(MigrateError::UnexpectedPageVariant), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::{env, fs, mem::size_of}; + + use lmdb::{Environment, Error as LmdbError}; + use tempfile::{tempdir, tempfile}; + + fn compare_files(ref_file: &mut File, new_file: &mut File) -> MigrateResult<()> { + ref_file.seek(SeekFrom::Start(0))?; + new_file.seek(SeekFrom::Start(0))?; + + let ref_buf = &mut [0; 1024]; + let new_buf = &mut [0; 1024]; + + loop { + match ref_file.read(ref_buf) { + Err(err) => panic!("{}", err), + Ok(ref_len) => match new_file.read(new_buf) { + Err(err) => panic!("{}", err), + Ok(new_len) => { + assert_eq!(ref_len, new_len); + if ref_len == 0 { + break; + }; + assert_eq!(ref_buf[0..ref_len], new_buf[0..new_len]); + } + }, + } + } + + Ok(()) + } + + #[test] + fn test_dump_32() -> MigrateResult<()> { + let cwd = env::current_dir()?; + let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?; + let test_env_path: PathBuf = [cwd, "tests", "envs", "ref_env_32"].iter().collect(); + + // Dump data from the test env to a new dump file. + let mut migrator = Migrator::new(&test_env_path)?; + let mut new_dump_file = tempfile()?; + migrator.dump(None, &new_dump_file)?; + + // Open the reference dump file. + let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump.txt"].iter().collect(); + let mut ref_dump_file = File::open(ref_dump_file_path)?; + + // Compare the new dump file to the reference dump file. + compare_files(&mut ref_dump_file, &mut new_dump_file)?; + + Ok(()) + } + + #[test] + fn test_dump_32_subdb() -> MigrateResult<()> { + let cwd = env::current_dir()?; + let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?; + let test_env_path: PathBuf = [cwd, "tests", "envs", "ref_env_32"].iter().collect(); + + // Dump data from the test env to a new dump file. + let mut migrator = Migrator::new(&test_env_path)?; + let mut new_dump_file = tempfile()?; + migrator.dump(Some("subdb"), &new_dump_file)?; + + // Open the reference dump file. + let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump_subdb.txt"] + .iter() + .collect(); + let mut ref_dump_file = File::open(ref_dump_file_path)?; + + // Compare the new dump file to the reference dump file. + compare_files(&mut ref_dump_file, &mut new_dump_file)?; + + Ok(()) + } + + #[test] + fn test_dump_64() -> MigrateResult<()> { + let cwd = env::current_dir()?; + let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?; + let test_env_path: PathBuf = [cwd, "tests", "envs", "ref_env_64"].iter().collect(); + + // Dump data from the test env to a new dump file. + let mut migrator = Migrator::new(&test_env_path)?; + let mut new_dump_file = tempfile()?; + migrator.dump(None, &new_dump_file)?; + + // Open the reference dump file. + let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump.txt"].iter().collect(); + let mut ref_dump_file = File::open(ref_dump_file_path)?; + + // Compare the new dump file to the reference dump file. + compare_files(&mut ref_dump_file, &mut new_dump_file)?; + + Ok(()) + } + + #[test] + fn test_dump_64_subdb() -> MigrateResult<()> { + let cwd = env::current_dir()?; + let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?; + let test_env_path: PathBuf = [cwd, "tests", "envs", "ref_env_64"].iter().collect(); + + // Dump data from the test env to a new dump file. + let mut migrator = Migrator::new(&test_env_path)?; + let mut new_dump_file = tempfile()?; + migrator.dump(Some("subdb"), &new_dump_file)?; + + // Open the reference dump file. + let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump_subdb.txt"] + .iter() + .collect(); + let mut ref_dump_file = File::open(ref_dump_file_path)?; + + // Compare the new dump file to the reference dump file. + compare_files(&mut ref_dump_file, &mut new_dump_file)?; + + Ok(()) + } + + #[test] + fn test_migrate_64() -> MigrateResult<()> { + let cwd = env::current_dir()?; + let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?; + let test_env_path: PathBuf = [cwd, "tests", "envs", "ref_env_64"].iter().collect(); + + // Migrate data from the old env to a new one. + let new_env = tempdir()?; + let mut migrator = Migrator::new(&test_env_path)?; + migrator.migrate(new_env.path())?; + + // Dump data from the new env to a new dump file. + let mut migrator = Migrator::new(new_env.path())?; + let mut new_dump_file = tempfile()?; + migrator.dump(Some("subdb"), &new_dump_file)?; + + // Open the reference dump file. + let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump_subdb.txt"] + .iter() + .collect(); + let mut ref_dump_file = File::open(ref_dump_file_path)?; + + // Compare the new dump file to the reference dump file. + compare_files(&mut ref_dump_file, &mut new_dump_file)?; + + Ok(()) + } + + #[test] + fn test_migrate_32() -> MigrateResult<()> { + let cwd = env::current_dir()?; + let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?; + let test_env_path: PathBuf = [cwd, "tests", "envs", "ref_env_32"].iter().collect(); + + // Migrate data from the old env to a new one. + let new_env = tempdir()?; + let mut migrator = Migrator::new(&test_env_path)?; + migrator.migrate(new_env.path())?; + + // Dump data from the new env to a new dump file. + let mut migrator = Migrator::new(new_env.path())?; + let mut new_dump_file = tempfile()?; + migrator.dump(Some("subdb"), &new_dump_file)?; + + // Open the reference dump file. + let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump_subdb.txt"] + .iter() + .collect(); + let mut ref_dump_file = File::open(ref_dump_file_path)?; + + // Compare the new dump file to the reference dump file. + compare_files(&mut ref_dump_file, &mut new_dump_file)?; + + Ok(()) + } + + #[test] + fn test_migrate_and_replace() -> MigrateResult<()> { + let test_env_name = match size_of::<usize>() { + 4 => "ref_env_64", + 8 => "ref_env_32", + _ => panic!("only 32- and 64-bit depths are supported"), + }; + + let cwd = env::current_dir()?; + let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?; + let test_env_path: PathBuf = [cwd, "tests", "envs", test_env_name].iter().collect(); + + let old_env = tempdir()?; + fs::copy( + test_env_path.join("data.mdb"), + old_env.path().join("data.mdb"), + )?; + fs::copy( + test_env_path.join("lock.mdb"), + old_env.path().join("lock.mdb"), + )?; + + // Confirm that it isn't possible to open the old environment with LMDB. + assert_eq!( + match Environment::new().open(old_env.path()) { + Err(err) => err, + _ => panic!("opening the environment should have failed"), + }, + LmdbError::Invalid + ); + + // Migrate data from the old env to a new one. + let new_env = tempdir()?; + let mut migrator = Migrator::new(old_env.path())?; + migrator.migrate(new_env.path())?; + + // Dump data from the new env to a new dump file. + let mut migrator = Migrator::new(new_env.path())?; + let mut new_dump_file = tempfile()?; + migrator.dump(Some("subdb"), &new_dump_file)?; + + // Open the reference dump file. + let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump_subdb.txt"] + .iter() + .collect(); + let mut ref_dump_file = File::open(ref_dump_file_path)?; + + // Compare the new dump file to the reference dump file. + compare_files(&mut ref_dump_file, &mut new_dump_file)?; + + // Overwrite the old env's files with the new env's files and confirm that it's now + // possible to open the old env with LMDB. + fs::copy( + new_env.path().join("data.mdb"), + old_env.path().join("data.mdb"), + )?; + fs::copy( + new_env.path().join("lock.mdb"), + old_env.path().join("lock.mdb"), + )?; + assert!(Environment::new().open(old_env.path()).is_ok()); + + Ok(()) + } +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/arch_migrator_error.rs b/third_party/rust/rkv/src/backend/impl_lmdb/arch_migrator_error.rs new file mode 100644 index 0000000000..e23bb49770 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/arch_migrator_error.rs @@ -0,0 +1,79 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{io, num, str}; + +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum MigrateError { + #[error("database not found: {0:?}")] + DatabaseNotFound(String), + + #[error("{0}")] + FromString(String), + + #[error("couldn't determine bit depth")] + IndeterminateBitDepth, + + #[error("I/O error: {0:?}")] + IoError(#[from] io::Error), + + #[error("invalid DatabaseFlags bits")] + InvalidDatabaseBits, + + #[error("invalid data version")] + InvalidDataVersion, + + #[error("invalid magic number")] + InvalidMagicNum, + + #[error("invalid NodeFlags bits")] + InvalidNodeBits, + + #[error("invalid PageFlags bits")] + InvalidPageBits, + + #[error("invalid page number")] + InvalidPageNum, + + #[error("lmdb backend error: {0}")] + LmdbError(#[from] lmdb::Error), + + #[error("string conversion error")] + StringConversionError, + + #[error("TryFromInt error: {0:?}")] + TryFromIntError(#[from] num::TryFromIntError), + + #[error("unexpected Page variant")] + UnexpectedPageVariant, + + #[error("unexpected PageHeader variant")] + UnexpectedPageHeaderVariant, + + #[error("unsupported PageHeader variant")] + UnsupportedPageHeaderVariant, + + #[error("UTF8 error: {0:?}")] + Utf8Error(#[from] str::Utf8Error), +} + +impl From<&str> for MigrateError { + fn from(e: &str) -> MigrateError { + MigrateError::FromString(e.to_string()) + } +} + +impl From<String> for MigrateError { + fn from(e: String) -> MigrateError { + MigrateError::FromString(e) + } +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/cursor.rs b/third_party/rust/rkv/src/backend/impl_lmdb/cursor.rs new file mode 100644 index 0000000000..760abce451 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/cursor.rs @@ -0,0 +1,69 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use lmdb::Cursor; + +use super::IterImpl; +use crate::backend::traits::BackendRoCursor; + +#[derive(Debug)] +pub struct RoCursorImpl<'c>(pub(crate) lmdb::RoCursor<'c>); + +impl<'c> BackendRoCursor<'c> for RoCursorImpl<'c> { + type Iter = IterImpl<'c, lmdb::RoCursor<'c>>; + + fn into_iter(self) -> Self::Iter { + // We call RoCursor.iter() instead of RoCursor.iter_start() because + // the latter panics when there are no items in the store, whereas the + // former returns an iterator that yields no items. And since we create + // the Cursor and don't change its position, we can be sure that a call + // to Cursor.iter() will start at the beginning. + IterImpl::new(self.0, lmdb::RoCursor::iter) + } + + fn into_iter_from<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + IterImpl::new(self.0, |cursor| cursor.iter_from(key)) + } + + fn into_iter_dup_of<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + IterImpl::new(self.0, |cursor| cursor.iter_dup_of(key)) + } +} + +#[derive(Debug)] +pub struct RwCursorImpl<'c>(pub(crate) lmdb::RwCursor<'c>); + +impl<'c> BackendRoCursor<'c> for RwCursorImpl<'c> { + type Iter = IterImpl<'c, lmdb::RwCursor<'c>>; + + fn into_iter(self) -> Self::Iter { + IterImpl::new(self.0, lmdb::RwCursor::iter) + } + + fn into_iter_from<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + IterImpl::new(self.0, |cursor| cursor.iter_from(key)) + } + + fn into_iter_dup_of<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + IterImpl::new(self.0, |cursor| cursor.iter_dup_of(key)) + } +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/database.rs b/third_party/rust/rkv/src/backend/impl_lmdb/database.rs new file mode 100644 index 0000000000..8edee5c2c3 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/database.rs @@ -0,0 +1,16 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use crate::backend::traits::BackendDatabase; + +#[derive(Debug, Eq, PartialEq, Copy, Clone)] +pub struct DatabaseImpl(pub(crate) lmdb::Database); + +impl BackendDatabase for DatabaseImpl {} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/environment.rs b/third_party/rust/rkv/src/backend/impl_lmdb/environment.rs new file mode 100644 index 0000000000..e528fad057 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/environment.rs @@ -0,0 +1,299 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{ + fs, + path::{Path, PathBuf}, +}; + +use lmdb::Error as LmdbError; + +use super::{ + DatabaseFlagsImpl, DatabaseImpl, EnvironmentFlagsImpl, ErrorImpl, InfoImpl, RoTransactionImpl, + RwTransactionImpl, StatImpl, +}; +use crate::backend::traits::{ + BackendEnvironment, BackendEnvironmentBuilder, BackendInfo, BackendIter, BackendRoCursor, + BackendRoCursorTransaction, BackendStat, +}; + +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub struct EnvironmentBuilderImpl { + builder: lmdb::EnvironmentBuilder, + env_path_type: EnvironmentPathType, + env_lock_type: EnvironmentLockType, + env_db_type: EnvironmentDefaultDbType, + make_dir_if_needed: bool, +} + +impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl { + type Environment = EnvironmentImpl; + type Error = ErrorImpl; + type Flags = EnvironmentFlagsImpl; + + fn new() -> EnvironmentBuilderImpl { + EnvironmentBuilderImpl { + builder: lmdb::Environment::new(), + env_path_type: EnvironmentPathType::SubDir, + env_lock_type: EnvironmentLockType::Lockfile, + env_db_type: EnvironmentDefaultDbType::SingleDatabase, + make_dir_if_needed: false, + } + } + + fn set_flags<T>(&mut self, flags: T) -> &mut Self + where + T: Into<Self::Flags>, + { + let flags = flags.into(); + if flags.0 == lmdb::EnvironmentFlags::NO_SUB_DIR { + self.env_path_type = EnvironmentPathType::NoSubDir; + } + if flags.0 == lmdb::EnvironmentFlags::NO_LOCK { + self.env_lock_type = EnvironmentLockType::NoLockfile; + } + self.builder.set_flags(flags.0); + self + } + + fn set_max_readers(&mut self, max_readers: u32) -> &mut Self { + self.builder.set_max_readers(max_readers); + self + } + + fn set_max_dbs(&mut self, max_dbs: u32) -> &mut Self { + if max_dbs > 0 { + self.env_db_type = EnvironmentDefaultDbType::MultipleNamedDatabases + } + self.builder.set_max_dbs(max_dbs); + self + } + + fn set_map_size(&mut self, size: usize) -> &mut Self { + self.builder.set_map_size(size); + self + } + + fn set_make_dir_if_needed(&mut self, make_dir_if_needed: bool) -> &mut Self { + self.make_dir_if_needed = make_dir_if_needed; + self + } + + fn set_discard_if_corrupted(&mut self, _discard_if_corrupted: bool) -> &mut Self { + // Unfortunately, when opening a database, LMDB doesn't handle all the ways it could have + // been corrupted. Prefer using the `SafeMode` backend if this is important. + unimplemented!(); + } + + fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error> { + match self.env_path_type { + EnvironmentPathType::NoSubDir => { + if !path.is_file() { + return Err(ErrorImpl::UnsuitableEnvironmentPath(path.into())); + } + } + EnvironmentPathType::SubDir => { + if !path.is_dir() { + if !self.make_dir_if_needed { + return Err(ErrorImpl::UnsuitableEnvironmentPath(path.into())); + } + fs::create_dir_all(path)?; + } + } + } + + self.builder + .open(path) + .map_err(ErrorImpl::LmdbError) + .and_then(|lmdbenv| { + EnvironmentImpl::new( + path, + self.env_path_type, + self.env_lock_type, + self.env_db_type, + lmdbenv, + ) + }) + } +} + +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub enum EnvironmentPathType { + SubDir, + NoSubDir, +} + +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub enum EnvironmentLockType { + Lockfile, + NoLockfile, +} + +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub enum EnvironmentDefaultDbType { + SingleDatabase, + MultipleNamedDatabases, +} + +#[derive(Debug)] +pub struct EnvironmentImpl { + path: PathBuf, + env_path_type: EnvironmentPathType, + env_lock_type: EnvironmentLockType, + env_db_type: EnvironmentDefaultDbType, + lmdbenv: lmdb::Environment, +} + +impl EnvironmentImpl { + pub(crate) fn new( + path: &Path, + env_path_type: EnvironmentPathType, + env_lock_type: EnvironmentLockType, + env_db_type: EnvironmentDefaultDbType, + lmdbenv: lmdb::Environment, + ) -> Result<EnvironmentImpl, ErrorImpl> { + Ok(EnvironmentImpl { + path: path.to_path_buf(), + env_path_type, + env_lock_type, + env_db_type, + lmdbenv, + }) + } +} + +impl<'e> BackendEnvironment<'e> for EnvironmentImpl { + type Database = DatabaseImpl; + type Error = ErrorImpl; + type Flags = DatabaseFlagsImpl; + type Info = InfoImpl; + type RoTransaction = RoTransactionImpl<'e>; + type RwTransaction = RwTransactionImpl<'e>; + type Stat = StatImpl; + + fn get_dbs(&self) -> Result<Vec<Option<String>>, Self::Error> { + if self.env_db_type == EnvironmentDefaultDbType::SingleDatabase { + return Ok(vec![None]); + } + let db = self + .lmdbenv + .open_db(None) + .map(DatabaseImpl) + .map_err(ErrorImpl::LmdbError)?; + let reader = self.begin_ro_txn()?; + let cursor = reader.open_ro_cursor(&db)?; + let mut iter = cursor.into_iter(); + let mut store = vec![]; + while let Some(result) = iter.next() { + let (key, _) = result?; + let name = String::from_utf8(key.to_owned()) + .map_err(|_| ErrorImpl::LmdbError(lmdb::Error::Corrupted))?; + store.push(Some(name)); + } + Ok(store) + } + + fn open_db(&self, name: Option<&str>) -> Result<Self::Database, Self::Error> { + self.lmdbenv + .open_db(name) + .map(DatabaseImpl) + .map_err(ErrorImpl::LmdbError) + } + + fn create_db( + &self, + name: Option<&str>, + flags: Self::Flags, + ) -> Result<Self::Database, Self::Error> { + self.lmdbenv + .create_db(name, flags.0) + .map(DatabaseImpl) + .map_err(ErrorImpl::LmdbError) + } + + fn begin_ro_txn(&'e self) -> Result<Self::RoTransaction, Self::Error> { + self.lmdbenv + .begin_ro_txn() + .map(RoTransactionImpl) + .map_err(ErrorImpl::LmdbError) + } + + fn begin_rw_txn(&'e self) -> Result<Self::RwTransaction, Self::Error> { + self.lmdbenv + .begin_rw_txn() + .map(RwTransactionImpl) + .map_err(ErrorImpl::LmdbError) + } + + fn sync(&self, force: bool) -> Result<(), Self::Error> { + self.lmdbenv.sync(force).map_err(ErrorImpl::LmdbError) + } + + fn stat(&self) -> Result<Self::Stat, Self::Error> { + self.lmdbenv + .stat() + .map(StatImpl) + .map_err(ErrorImpl::LmdbError) + } + + fn info(&self) -> Result<Self::Info, Self::Error> { + self.lmdbenv + .info() + .map(InfoImpl) + .map_err(ErrorImpl::LmdbError) + } + + fn freelist(&self) -> Result<usize, Self::Error> { + self.lmdbenv.freelist().map_err(ErrorImpl::LmdbError) + } + + fn load_ratio(&self) -> Result<Option<f32>, Self::Error> { + let stat = self.stat()?; + let info = self.info()?; + let freelist = self.freelist()?; + + let last_pgno = info.last_pgno() + 1; // pgno is 0 based. + let total_pgs = info.map_size() / stat.page_size(); + if freelist > last_pgno { + return Err(ErrorImpl::LmdbError(LmdbError::Corrupted)); + } + let used_pgs = last_pgno - freelist; + Ok(Some(used_pgs as f32 / total_pgs as f32)) + } + + fn set_map_size(&self, size: usize) -> Result<(), Self::Error> { + self.lmdbenv + .set_map_size(size) + .map_err(ErrorImpl::LmdbError) + } + + fn get_files_on_disk(&self) -> Vec<PathBuf> { + let mut store = vec![]; + + if self.env_path_type == EnvironmentPathType::NoSubDir { + // The option NO_SUB_DIR could change the default directory layout; therefore this should + // probably return the path used to create environment, along with the custom lockfile + // when available. + unimplemented!(); + } + + let mut db_filename = self.path.clone(); + db_filename.push("data.mdb"); + store.push(db_filename); + + if self.env_lock_type == EnvironmentLockType::Lockfile { + let mut lock_filename = self.path.clone(); + lock_filename.push("lock.mdb"); + store.push(lock_filename); + } + + store + } +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/error.rs b/third_party/rust/rkv/src/backend/impl_lmdb/error.rs new file mode 100644 index 0000000000..a2fd8a7859 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/error.rs @@ -0,0 +1,57 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{fmt, io, path::PathBuf}; + +use crate::{backend::traits::BackendError, error::StoreError}; + +#[derive(Debug)] +pub enum ErrorImpl { + LmdbError(lmdb::Error), + UnsuitableEnvironmentPath(PathBuf), + IoError(io::Error), +} + +impl BackendError for ErrorImpl {} + +impl fmt::Display for ErrorImpl { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + ErrorImpl::LmdbError(e) => e.fmt(fmt), + ErrorImpl::UnsuitableEnvironmentPath(_) => write!(fmt, "UnsuitableEnvironmentPath"), + ErrorImpl::IoError(e) => e.fmt(fmt), + } + } +} + +impl Into<StoreError> for ErrorImpl { + fn into(self) -> StoreError { + match self { + ErrorImpl::LmdbError(lmdb::Error::Corrupted) => StoreError::DatabaseCorrupted, + ErrorImpl::LmdbError(lmdb::Error::NotFound) => StoreError::KeyValuePairNotFound, + ErrorImpl::LmdbError(lmdb::Error::BadValSize) => StoreError::KeyValuePairBadSize, + ErrorImpl::LmdbError(lmdb::Error::Invalid) => StoreError::FileInvalid, + ErrorImpl::LmdbError(lmdb::Error::MapFull) => StoreError::MapFull, + ErrorImpl::LmdbError(lmdb::Error::DbsFull) => StoreError::DbsFull, + ErrorImpl::LmdbError(lmdb::Error::ReadersFull) => StoreError::ReadersFull, + ErrorImpl::LmdbError(error) => StoreError::LmdbError(error), + ErrorImpl::UnsuitableEnvironmentPath(path) => { + StoreError::UnsuitableEnvironmentPath(path) + } + ErrorImpl::IoError(error) => StoreError::IoError(error), + } + } +} + +impl From<io::Error> for ErrorImpl { + fn from(e: io::Error) -> ErrorImpl { + ErrorImpl::IoError(e) + } +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/flags.rs b/third_party/rust/rkv/src/backend/impl_lmdb/flags.rs new file mode 100644 index 0000000000..7a16bb7c46 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/flags.rs @@ -0,0 +1,123 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use crate::backend::{ + common::{DatabaseFlags, EnvironmentFlags, WriteFlags}, + traits::{BackendDatabaseFlags, BackendEnvironmentFlags, BackendFlags, BackendWriteFlags}, +}; + +#[derive(Debug, Eq, PartialEq, Copy, Clone, Default)] +pub struct EnvironmentFlagsImpl(pub(crate) lmdb::EnvironmentFlags); + +impl BackendFlags for EnvironmentFlagsImpl { + fn empty() -> EnvironmentFlagsImpl { + EnvironmentFlagsImpl(lmdb::EnvironmentFlags::empty()) + } +} + +impl BackendEnvironmentFlags for EnvironmentFlagsImpl { + fn set(&mut self, flag: EnvironmentFlags, value: bool) { + self.0.set(flag.into(), value) + } +} + +impl Into<EnvironmentFlagsImpl> for EnvironmentFlags { + fn into(self) -> EnvironmentFlagsImpl { + EnvironmentFlagsImpl(self.into()) + } +} + +impl Into<lmdb::EnvironmentFlags> for EnvironmentFlags { + fn into(self) -> lmdb::EnvironmentFlags { + match self { + EnvironmentFlags::FIXED_MAP => lmdb::EnvironmentFlags::FIXED_MAP, + EnvironmentFlags::NO_SUB_DIR => lmdb::EnvironmentFlags::NO_SUB_DIR, + EnvironmentFlags::WRITE_MAP => lmdb::EnvironmentFlags::WRITE_MAP, + EnvironmentFlags::READ_ONLY => lmdb::EnvironmentFlags::READ_ONLY, + EnvironmentFlags::NO_META_SYNC => lmdb::EnvironmentFlags::NO_META_SYNC, + EnvironmentFlags::NO_SYNC => lmdb::EnvironmentFlags::NO_SYNC, + EnvironmentFlags::MAP_ASYNC => lmdb::EnvironmentFlags::MAP_ASYNC, + EnvironmentFlags::NO_TLS => lmdb::EnvironmentFlags::NO_TLS, + EnvironmentFlags::NO_LOCK => lmdb::EnvironmentFlags::NO_LOCK, + EnvironmentFlags::NO_READAHEAD => lmdb::EnvironmentFlags::NO_READAHEAD, + EnvironmentFlags::NO_MEM_INIT => lmdb::EnvironmentFlags::NO_MEM_INIT, + } + } +} + +#[derive(Debug, Eq, PartialEq, Copy, Clone, Default)] +pub struct DatabaseFlagsImpl(pub(crate) lmdb::DatabaseFlags); + +impl BackendFlags for DatabaseFlagsImpl { + fn empty() -> DatabaseFlagsImpl { + DatabaseFlagsImpl(lmdb::DatabaseFlags::empty()) + } +} + +impl BackendDatabaseFlags for DatabaseFlagsImpl { + fn set(&mut self, flag: DatabaseFlags, value: bool) { + self.0.set(flag.into(), value) + } +} + +impl Into<DatabaseFlagsImpl> for DatabaseFlags { + fn into(self) -> DatabaseFlagsImpl { + DatabaseFlagsImpl(self.into()) + } +} + +impl Into<lmdb::DatabaseFlags> for DatabaseFlags { + fn into(self) -> lmdb::DatabaseFlags { + match self { + DatabaseFlags::REVERSE_KEY => lmdb::DatabaseFlags::REVERSE_KEY, + #[cfg(feature = "db-dup-sort")] + DatabaseFlags::DUP_SORT => lmdb::DatabaseFlags::DUP_SORT, + #[cfg(feature = "db-dup-sort")] + DatabaseFlags::DUP_FIXED => lmdb::DatabaseFlags::DUP_FIXED, + #[cfg(feature = "db-int-key")] + DatabaseFlags::INTEGER_KEY => lmdb::DatabaseFlags::INTEGER_KEY, + DatabaseFlags::INTEGER_DUP => lmdb::DatabaseFlags::INTEGER_DUP, + DatabaseFlags::REVERSE_DUP => lmdb::DatabaseFlags::REVERSE_DUP, + } + } +} + +#[derive(Debug, Eq, PartialEq, Copy, Clone, Default)] +pub struct WriteFlagsImpl(pub(crate) lmdb::WriteFlags); + +impl BackendFlags for WriteFlagsImpl { + fn empty() -> WriteFlagsImpl { + WriteFlagsImpl(lmdb::WriteFlags::empty()) + } +} + +impl BackendWriteFlags for WriteFlagsImpl { + fn set(&mut self, flag: WriteFlags, value: bool) { + self.0.set(flag.into(), value) + } +} + +impl Into<WriteFlagsImpl> for WriteFlags { + fn into(self) -> WriteFlagsImpl { + WriteFlagsImpl(self.into()) + } +} + +impl Into<lmdb::WriteFlags> for WriteFlags { + fn into(self) -> lmdb::WriteFlags { + match self { + WriteFlags::NO_OVERWRITE => lmdb::WriteFlags::NO_OVERWRITE, + WriteFlags::NO_DUP_DATA => lmdb::WriteFlags::NO_DUP_DATA, + WriteFlags::CURRENT => lmdb::WriteFlags::CURRENT, + WriteFlags::APPEND => lmdb::WriteFlags::APPEND, + WriteFlags::APPEND_DUP => lmdb::WriteFlags::APPEND_DUP, + } + } +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/info.rs b/third_party/rust/rkv/src/backend/impl_lmdb/info.rs new file mode 100644 index 0000000000..6188065c07 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/info.rs @@ -0,0 +1,35 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use crate::backend::traits::BackendInfo; + +pub struct InfoImpl(pub(crate) lmdb::Info); + +impl BackendInfo for InfoImpl { + fn map_size(&self) -> usize { + self.0.map_size() + } + + fn last_pgno(&self) -> usize { + self.0.last_pgno() + } + + fn last_txnid(&self) -> usize { + self.0.last_txnid() + } + + fn max_readers(&self) -> usize { + self.0.max_readers() as usize + } + + fn num_readers(&self) -> usize { + self.0.num_readers() as usize + } +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/iter.rs b/third_party/rust/rkv/src/backend/impl_lmdb/iter.rs new file mode 100644 index 0000000000..519d361d4f --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/iter.rs @@ -0,0 +1,41 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use super::ErrorImpl; +use crate::backend::traits::BackendIter; + +pub struct IterImpl<'i, C> { + // LMDB semantics dictate that a cursor must be valid for the entire lifetime + // of an iterator. In other words, cursors must not be dropped while an + // iterator built from it is alive. Unfortunately, the LMDB crate API does + // not express this through the type system, so we must enforce it somehow. + #[allow(dead_code)] + cursor: C, + iter: lmdb::Iter<'i>, +} + +impl<'i, C> IterImpl<'i, C> { + pub(crate) fn new( + mut cursor: C, + to_iter: impl FnOnce(&mut C) -> lmdb::Iter<'i>, + ) -> IterImpl<'i, C> { + let iter = to_iter(&mut cursor); + IterImpl { cursor, iter } + } +} + +impl<'i, C> BackendIter<'i> for IterImpl<'i, C> { + type Error = ErrorImpl; + + #[allow(clippy::type_complexity)] + fn next(&mut self) -> Option<Result<(&'i [u8], &'i [u8]), Self::Error>> { + self.iter.next().map(|e| e.map_err(ErrorImpl::LmdbError)) + } +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/stat.rs b/third_party/rust/rkv/src/backend/impl_lmdb/stat.rs new file mode 100644 index 0000000000..b0de8c5051 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/stat.rs @@ -0,0 +1,39 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use crate::backend::traits::BackendStat; + +pub struct StatImpl(pub(crate) lmdb::Stat); + +impl BackendStat for StatImpl { + fn page_size(&self) -> usize { + self.0.page_size() as usize + } + + fn depth(&self) -> usize { + self.0.depth() as usize + } + + fn branch_pages(&self) -> usize { + self.0.branch_pages() + } + + fn leaf_pages(&self) -> usize { + self.0.leaf_pages() + } + + fn overflow_pages(&self) -> usize { + self.0.overflow_pages() + } + + fn entries(&self) -> usize { + self.0.entries() + } +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/transaction.rs b/third_party/rust/rkv/src/backend/impl_lmdb/transaction.rs new file mode 100644 index 0000000000..21752763fd --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/transaction.rs @@ -0,0 +1,107 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use lmdb::Transaction; + +use super::{DatabaseImpl, ErrorImpl, RoCursorImpl, WriteFlagsImpl}; +use crate::backend::traits::{ + BackendRoCursorTransaction, BackendRoTransaction, BackendRwCursorTransaction, + BackendRwTransaction, +}; + +#[derive(Debug)] +pub struct RoTransactionImpl<'t>(pub(crate) lmdb::RoTransaction<'t>); + +impl<'t> BackendRoTransaction for RoTransactionImpl<'t> { + type Database = DatabaseImpl; + type Error = ErrorImpl; + + fn get(&self, db: &Self::Database, key: &[u8]) -> Result<&[u8], Self::Error> { + self.0.get(db.0, &key).map_err(ErrorImpl::LmdbError) + } + + fn abort(self) { + self.0.abort() + } +} + +impl<'t> BackendRoCursorTransaction<'t> for RoTransactionImpl<'t> { + type RoCursor = RoCursorImpl<'t>; + + fn open_ro_cursor(&'t self, db: &Self::Database) -> Result<Self::RoCursor, Self::Error> { + self.0 + .open_ro_cursor(db.0) + .map(RoCursorImpl) + .map_err(ErrorImpl::LmdbError) + } +} + +#[derive(Debug)] +pub struct RwTransactionImpl<'t>(pub(crate) lmdb::RwTransaction<'t>); + +impl<'t> BackendRwTransaction for RwTransactionImpl<'t> { + type Database = DatabaseImpl; + type Error = ErrorImpl; + type Flags = WriteFlagsImpl; + + fn get(&self, db: &Self::Database, key: &[u8]) -> Result<&[u8], Self::Error> { + self.0.get(db.0, &key).map_err(ErrorImpl::LmdbError) + } + + fn put( + &mut self, + db: &Self::Database, + key: &[u8], + value: &[u8], + flags: Self::Flags, + ) -> Result<(), Self::Error> { + self.0 + .put(db.0, &key, &value, flags.0) + .map_err(ErrorImpl::LmdbError) + } + + #[cfg(not(feature = "db-dup-sort"))] + fn del(&mut self, db: &Self::Database, key: &[u8]) -> Result<(), Self::Error> { + self.0.del(db.0, &key, None).map_err(ErrorImpl::LmdbError) + } + + #[cfg(feature = "db-dup-sort")] + fn del( + &mut self, + db: &Self::Database, + key: &[u8], + value: Option<&[u8]>, + ) -> Result<(), Self::Error> { + self.0.del(db.0, &key, value).map_err(ErrorImpl::LmdbError) + } + + fn clear_db(&mut self, db: &Self::Database) -> Result<(), Self::Error> { + self.0.clear_db(db.0).map_err(ErrorImpl::LmdbError) + } + + fn commit(self) -> Result<(), Self::Error> { + self.0.commit().map_err(ErrorImpl::LmdbError) + } + + fn abort(self) { + self.0.abort() + } +} + +impl<'t> BackendRwCursorTransaction<'t> for RwTransactionImpl<'t> { + type RoCursor = RoCursorImpl<'t>; + + fn open_ro_cursor(&'t self, db: &Self::Database) -> Result<Self::RoCursor, Self::Error> { + self.0 + .open_ro_cursor(db.0) + .map(RoCursorImpl) + .map_err(ErrorImpl::LmdbError) + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe.rs b/third_party/rust/rkv/src/backend/impl_safe.rs new file mode 100644 index 0000000000..66ca012973 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe.rs @@ -0,0 +1,30 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +mod cursor; +mod database; +mod environment; +mod error; +mod flags; +mod info; +mod iter; +mod snapshot; +mod stat; +mod transaction; + +pub use cursor::{RoCursorImpl, RwCursorImpl}; +pub use database::DatabaseImpl; +pub use environment::{EnvironmentBuilderImpl, EnvironmentImpl}; +pub use error::ErrorImpl; +pub use flags::{DatabaseFlagsImpl, EnvironmentFlagsImpl, WriteFlagsImpl}; +pub use info::InfoImpl; +pub use iter::IterImpl; +pub use stat::StatImpl; +pub use transaction::{RoTransactionImpl, RwTransactionImpl}; diff --git a/third_party/rust/rkv/src/backend/impl_safe/cursor.rs b/third_party/rust/rkv/src/backend/impl_safe/cursor.rs new file mode 100644 index 0000000000..c3bfefad94 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/cursor.rs @@ -0,0 +1,98 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use super::{snapshot::Snapshot, IterImpl}; +use crate::backend::traits::BackendRoCursor; + +#[derive(Debug)] +pub struct RoCursorImpl<'c>(pub(crate) &'c Snapshot); + +#[cfg(not(feature = "db-dup-sort"))] +impl<'c> BackendRoCursor<'c> for RoCursorImpl<'c> { + type Iter = IterImpl<'c>; + + fn into_iter(self) -> Self::Iter { + IterImpl(Box::new(self.0.iter())) + } + + fn into_iter_from<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + IterImpl(Box::new( + self.0.iter().skip_while(move |&(k, _)| k < key.as_ref()), + )) + } + + fn into_iter_dup_of<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + IterImpl(Box::new( + self.0.iter().filter(move |&(k, _)| k == key.as_ref()), + )) + } +} + +#[cfg(feature = "db-dup-sort")] +impl<'c> BackendRoCursor<'c> for RoCursorImpl<'c> { + type Iter = IterImpl<'c>; + + fn into_iter(self) -> Self::Iter { + let flattened = self + .0 + .iter() + .flat_map(|(key, values)| values.map(move |value| (key, value))); + IterImpl(Box::new(flattened)) + } + + fn into_iter_from<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + let skipped = self.0.iter().skip_while(move |&(k, _)| k < key.as_ref()); + let flattened = skipped.flat_map(|(key, values)| values.map(move |value| (key, value))); + IterImpl(Box::new(flattened)) + } + + fn into_iter_dup_of<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + let filtered = self.0.iter().filter(move |&(k, _)| k == key.as_ref()); + let flattened = filtered.flat_map(|(key, values)| values.map(move |value| (key, value))); + IterImpl(Box::new(flattened)) + } +} + +#[derive(Debug)] +pub struct RwCursorImpl<'c>(&'c mut Snapshot); + +impl<'c> BackendRoCursor<'c> for RwCursorImpl<'c> { + type Iter = IterImpl<'c>; + + fn into_iter(self) -> Self::Iter { + unimplemented!() + } + + fn into_iter_from<K>(self, _key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + unimplemented!() + } + + fn into_iter_dup_of<K>(self, _key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + unimplemented!() + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe/database.rs b/third_party/rust/rkv/src/backend/impl_safe/database.rs new file mode 100644 index 0000000000..f8eb4aef76 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/database.rs @@ -0,0 +1,41 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use id_arena::Id; +use serde_derive::{Deserialize, Serialize}; + +use super::{snapshot::Snapshot, DatabaseFlagsImpl}; +use crate::backend::traits::BackendDatabase; + +#[derive(Debug, Eq, PartialEq, Copy, Clone, Hash)] +pub struct DatabaseImpl(pub(crate) Id<Database>); + +impl BackendDatabase for DatabaseImpl {} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Database { + snapshot: Snapshot, +} + +impl Database { + pub(crate) fn new(flags: Option<DatabaseFlagsImpl>, snapshot: Option<Snapshot>) -> Database { + Database { + snapshot: snapshot.unwrap_or_else(|| Snapshot::new(flags)), + } + } + + pub(crate) fn snapshot(&self) -> Snapshot { + self.snapshot.clone() + } + + pub(crate) fn replace(&mut self, snapshot: Snapshot) -> Snapshot { + std::mem::replace(&mut self.snapshot, snapshot) + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe/environment.rs b/third_party/rust/rkv/src/backend/impl_safe/environment.rs new file mode 100644 index 0000000000..e9fde404ec --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/environment.rs @@ -0,0 +1,326 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{ + borrow::Cow, + collections::HashMap, + fs, + ops::DerefMut, + path::{Path, PathBuf}, + sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, +}; + +use id_arena::Arena; +use log::warn; + +use super::{ + database::Database, DatabaseFlagsImpl, DatabaseImpl, EnvironmentFlagsImpl, ErrorImpl, InfoImpl, + RoTransactionImpl, RwTransactionImpl, StatImpl, +}; +use crate::backend::traits::{BackendEnvironment, BackendEnvironmentBuilder}; + +const DEFAULT_DB_FILENAME: &str = "data.safe.bin"; + +type DatabaseArena = Arena<Database>; +type DatabaseNameMap = HashMap<Option<String>, DatabaseImpl>; + +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub struct EnvironmentBuilderImpl { + flags: EnvironmentFlagsImpl, + max_readers: Option<usize>, + max_dbs: Option<usize>, + map_size: Option<usize>, + make_dir_if_needed: bool, + discard_if_corrupted: bool, +} + +impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl { + type Environment = EnvironmentImpl; + type Error = ErrorImpl; + type Flags = EnvironmentFlagsImpl; + + fn new() -> EnvironmentBuilderImpl { + EnvironmentBuilderImpl { + flags: EnvironmentFlagsImpl::empty(), + max_readers: None, + max_dbs: None, + map_size: None, + make_dir_if_needed: false, + discard_if_corrupted: false, + } + } + + fn set_flags<T>(&mut self, flags: T) -> &mut Self + where + T: Into<Self::Flags>, + { + self.flags = flags.into(); + self + } + + fn set_max_readers(&mut self, max_readers: u32) -> &mut Self { + self.max_readers = Some(max_readers as usize); + self + } + + fn set_max_dbs(&mut self, max_dbs: u32) -> &mut Self { + self.max_dbs = Some(max_dbs as usize); + self + } + + fn set_map_size(&mut self, map_size: usize) -> &mut Self { + self.map_size = Some(map_size); + self + } + + fn set_make_dir_if_needed(&mut self, make_dir_if_needed: bool) -> &mut Self { + self.make_dir_if_needed = make_dir_if_needed; + self + } + + fn set_discard_if_corrupted(&mut self, discard_if_corrupted: bool) -> &mut Self { + self.discard_if_corrupted = discard_if_corrupted; + self + } + + fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error> { + // Technically NO_SUB_DIR should change these checks here, but they're both currently + // unimplemented with this storage backend. + if !path.is_dir() { + if !self.make_dir_if_needed { + return Err(ErrorImpl::UnsuitableEnvironmentPath(path.into())); + } + fs::create_dir_all(path)?; + } + let mut env = EnvironmentImpl::new( + path, + self.flags, + self.max_readers, + self.max_dbs, + self.map_size, + )?; + env.read_from_disk(self.discard_if_corrupted)?; + Ok(env) + } +} + +#[derive(Debug)] +pub(crate) struct EnvironmentDbs { + pub(crate) arena: DatabaseArena, + pub(crate) name_map: DatabaseNameMap, +} + +#[derive(Debug)] +pub(crate) struct EnvironmentDbsRefMut<'a> { + pub(crate) arena: &'a mut DatabaseArena, + pub(crate) name_map: &'a mut DatabaseNameMap, +} + +impl<'a> From<&'a mut EnvironmentDbs> for EnvironmentDbsRefMut<'a> { + fn from(dbs: &mut EnvironmentDbs) -> EnvironmentDbsRefMut { + EnvironmentDbsRefMut { + arena: &mut dbs.arena, + name_map: &mut dbs.name_map, + } + } +} + +#[derive(Debug)] +pub struct EnvironmentImpl { + path: PathBuf, + max_dbs: usize, + dbs: RwLock<EnvironmentDbs>, + ro_txns: Arc<()>, + rw_txns: Arc<()>, +} + +impl EnvironmentImpl { + fn serialize(&self) -> Result<Vec<u8>, ErrorImpl> { + let dbs = self.dbs.read().map_err(|_| ErrorImpl::EnvPoisonError)?; + let data: HashMap<_, _> = dbs + .name_map + .iter() + .map(|(name, id)| (name, &dbs.arena[id.0])) + .collect(); + Ok(bincode::serialize(&data)?) + } + + fn deserialize( + bytes: &[u8], + discard_if_corrupted: bool, + ) -> Result<(DatabaseArena, DatabaseNameMap), ErrorImpl> { + let mut arena = DatabaseArena::new(); + let mut name_map = HashMap::new(); + let data: HashMap<_, _> = match bincode::deserialize(bytes) { + Err(_) if discard_if_corrupted => Ok(HashMap::new()), + result => result, + }?; + for (name, db) in data { + name_map.insert(name, DatabaseImpl(arena.alloc(db))); + } + Ok((arena, name_map)) + } +} + +impl EnvironmentImpl { + pub(crate) fn new( + path: &Path, + flags: EnvironmentFlagsImpl, + max_readers: Option<usize>, + max_dbs: Option<usize>, + map_size: Option<usize>, + ) -> Result<EnvironmentImpl, ErrorImpl> { + if !flags.is_empty() { + warn!("Ignoring `flags={:?}`", flags); + } + if let Some(max_readers) = max_readers { + warn!("Ignoring `max_readers={}`", max_readers); + } + if let Some(map_size) = map_size { + warn!("Ignoring `map_size={}`", map_size); + } + + Ok(EnvironmentImpl { + path: path.to_path_buf(), + max_dbs: max_dbs.unwrap_or(std::usize::MAX), + dbs: RwLock::new(EnvironmentDbs { + arena: DatabaseArena::new(), + name_map: HashMap::new(), + }), + ro_txns: Arc::new(()), + rw_txns: Arc::new(()), + }) + } + + pub(crate) fn read_from_disk(&mut self, discard_if_corrupted: bool) -> Result<(), ErrorImpl> { + let mut path = Cow::from(&self.path); + if fs::metadata(&path)?.is_dir() { + path.to_mut().push(DEFAULT_DB_FILENAME); + }; + if fs::metadata(&path).is_err() { + return Ok(()); + }; + let (arena, name_map) = Self::deserialize(&fs::read(&path)?, discard_if_corrupted)?; + self.dbs = RwLock::new(EnvironmentDbs { arena, name_map }); + Ok(()) + } + + pub(crate) fn write_to_disk(&self) -> Result<(), ErrorImpl> { + let mut path = Cow::from(&self.path); + if fs::metadata(&path)?.is_dir() { + path.to_mut().push(DEFAULT_DB_FILENAME); + }; + fs::write(&path, self.serialize()?)?; + Ok(()) + } + + pub(crate) fn dbs(&self) -> Result<RwLockReadGuard<EnvironmentDbs>, ErrorImpl> { + self.dbs.read().map_err(|_| ErrorImpl::EnvPoisonError) + } + + pub(crate) fn dbs_mut(&self) -> Result<RwLockWriteGuard<EnvironmentDbs>, ErrorImpl> { + self.dbs.write().map_err(|_| ErrorImpl::EnvPoisonError) + } +} + +impl<'e> BackendEnvironment<'e> for EnvironmentImpl { + type Database = DatabaseImpl; + type Error = ErrorImpl; + type Flags = DatabaseFlagsImpl; + type Info = InfoImpl; + type RoTransaction = RoTransactionImpl<'e>; + type RwTransaction = RwTransactionImpl<'e>; + type Stat = StatImpl; + + fn get_dbs(&self) -> Result<Vec<Option<String>>, Self::Error> { + let dbs = self.dbs.read().map_err(|_| ErrorImpl::EnvPoisonError)?; + Ok(dbs.name_map.keys().map(|key| key.to_owned()).collect()) + } + + fn open_db(&self, name: Option<&str>) -> Result<Self::Database, Self::Error> { + if Arc::strong_count(&self.ro_txns) > 1 { + return Err(ErrorImpl::DbsIllegalOpen); + } + // TOOD: don't reallocate `name`. + let key = name.map(String::from); + let dbs = self.dbs.read().map_err(|_| ErrorImpl::EnvPoisonError)?; + let db = dbs.name_map.get(&key).ok_or(ErrorImpl::DbNotFoundError)?; + Ok(*db) + } + + fn create_db( + &self, + name: Option<&str>, + flags: Self::Flags, + ) -> Result<Self::Database, Self::Error> { + if Arc::strong_count(&self.ro_txns) > 1 { + return Err(ErrorImpl::DbsIllegalOpen); + } + // TOOD: don't reallocate `name`. + let key = name.map(String::from); + let mut dbs = self.dbs.write().map_err(|_| ErrorImpl::EnvPoisonError)?; + if dbs.name_map.keys().filter_map(|k| k.as_ref()).count() >= self.max_dbs && name.is_some() { + return Err(ErrorImpl::DbsFull); + } + let parts = EnvironmentDbsRefMut::from(dbs.deref_mut()); + let arena = parts.arena; + let name_map = parts.name_map; + let id = name_map + .entry(key) + .or_insert_with(|| DatabaseImpl(arena.alloc(Database::new(Some(flags), None)))); + Ok(*id) + } + + fn begin_ro_txn(&'e self) -> Result<Self::RoTransaction, Self::Error> { + RoTransactionImpl::new(self, self.ro_txns.clone()) + } + + fn begin_rw_txn(&'e self) -> Result<Self::RwTransaction, Self::Error> { + RwTransactionImpl::new(self, self.rw_txns.clone()) + } + + fn sync(&self, force: bool) -> Result<(), Self::Error> { + warn!("Ignoring `force={}`", force); + self.write_to_disk() + } + + fn stat(&self) -> Result<Self::Stat, Self::Error> { + Ok(StatImpl) + } + + fn info(&self) -> Result<Self::Info, Self::Error> { + Ok(InfoImpl) + } + + fn freelist(&self) -> Result<usize, Self::Error> { + unimplemented!() + } + + fn load_ratio(&self) -> Result<Option<f32>, Self::Error> { + warn!("`load_ratio()` is irrelevant for this storage backend."); + Ok(None) + } + + fn set_map_size(&self, size: usize) -> Result<(), Self::Error> { + warn!( + "`set_map_size({})` is ignored by this storage backend.", + size + ); + Ok(()) + } + + fn get_files_on_disk(&self) -> Vec<PathBuf> { + // Technically NO_SUB_DIR and NO_LOCK should change this output, but + // they're both currently unimplemented with this storage backend. + let mut db_filename = self.path.clone(); + db_filename.push(DEFAULT_DB_FILENAME); + vec![db_filename] + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe/error.rs b/third_party/rust/rkv/src/backend/impl_safe/error.rs new file mode 100644 index 0000000000..f086cdc954 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/error.rs @@ -0,0 +1,79 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{fmt, io, path::PathBuf}; + +use bincode::Error as BincodeError; + +use crate::{backend::traits::BackendError, error::StoreError}; + +#[derive(Debug)] +pub enum ErrorImpl { + KeyValuePairNotFound, + EnvPoisonError, + DbsFull, + DbsIllegalOpen, + DbNotFoundError, + DbIsForeignError, + UnsuitableEnvironmentPath(PathBuf), + IoError(io::Error), + BincodeError(BincodeError), +} + +impl BackendError for ErrorImpl {} + +impl fmt::Display for ErrorImpl { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + ErrorImpl::KeyValuePairNotFound => write!(fmt, "KeyValuePairNotFound (safe mode)"), + ErrorImpl::EnvPoisonError => write!(fmt, "EnvPoisonError (safe mode)"), + ErrorImpl::DbsFull => write!(fmt, "DbsFull (safe mode)"), + ErrorImpl::DbsIllegalOpen => write!(fmt, "DbIllegalOpen (safe mode)"), + ErrorImpl::DbNotFoundError => write!(fmt, "DbNotFoundError (safe mode)"), + ErrorImpl::DbIsForeignError => write!(fmt, "DbIsForeignError (safe mode)"), + ErrorImpl::UnsuitableEnvironmentPath(_) => { + write!(fmt, "UnsuitableEnvironmentPath (safe mode)") + } + ErrorImpl::IoError(e) => e.fmt(fmt), + ErrorImpl::BincodeError(e) => e.fmt(fmt), + } + } +} + +impl Into<StoreError> for ErrorImpl { + fn into(self) -> StoreError { + // The `StoreError::KeyValuePairBadSize` error is unused, because this + // backend supports keys and values of arbitrary sizes. + // The `StoreError::MapFull` and `StoreError::ReadersFull` are + // unimplemented yet, but they should be in the future. + match self { + ErrorImpl::KeyValuePairNotFound => StoreError::KeyValuePairNotFound, + ErrorImpl::BincodeError(_) => StoreError::FileInvalid, + ErrorImpl::DbsFull => StoreError::DbsFull, + ErrorImpl::UnsuitableEnvironmentPath(path) => { + StoreError::UnsuitableEnvironmentPath(path) + } + ErrorImpl::IoError(error) => StoreError::IoError(error), + _ => StoreError::SafeModeError(self), + } + } +} + +impl From<io::Error> for ErrorImpl { + fn from(e: io::Error) -> ErrorImpl { + ErrorImpl::IoError(e) + } +} + +impl From<BincodeError> for ErrorImpl { + fn from(e: BincodeError) -> ErrorImpl { + ErrorImpl::BincodeError(e) + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe/flags.rs b/third_party/rust/rkv/src/backend/impl_safe/flags.rs new file mode 100644 index 0000000000..a0d623614e --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/flags.rs @@ -0,0 +1,124 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use bitflags::bitflags; +use serde_derive::{Deserialize, Serialize}; + +use crate::backend::{ + common::{DatabaseFlags, EnvironmentFlags, WriteFlags}, + traits::{BackendDatabaseFlags, BackendEnvironmentFlags, BackendFlags, BackendWriteFlags}, +}; + +bitflags! { + #[derive(Default, Serialize, Deserialize)] + pub struct EnvironmentFlagsImpl: u32 { + const NIL = 0b0000_0000; + } +} + +impl BackendFlags for EnvironmentFlagsImpl { + fn empty() -> EnvironmentFlagsImpl { + EnvironmentFlagsImpl::empty() + } +} + +impl BackendEnvironmentFlags for EnvironmentFlagsImpl { + fn set(&mut self, flag: EnvironmentFlags, value: bool) { + self.set(flag.into(), value) + } +} + +impl Into<EnvironmentFlagsImpl> for EnvironmentFlags { + fn into(self) -> EnvironmentFlagsImpl { + match self { + EnvironmentFlags::FIXED_MAP => unimplemented!(), + EnvironmentFlags::NO_SUB_DIR => unimplemented!(), + EnvironmentFlags::WRITE_MAP => unimplemented!(), + EnvironmentFlags::READ_ONLY => unimplemented!(), + EnvironmentFlags::NO_META_SYNC => unimplemented!(), + EnvironmentFlags::NO_SYNC => unimplemented!(), + EnvironmentFlags::MAP_ASYNC => unimplemented!(), + EnvironmentFlags::NO_TLS => unimplemented!(), + EnvironmentFlags::NO_LOCK => unimplemented!(), + EnvironmentFlags::NO_READAHEAD => unimplemented!(), + EnvironmentFlags::NO_MEM_INIT => unimplemented!(), + } + } +} + +bitflags! { + #[derive(Default, Serialize, Deserialize)] + pub struct DatabaseFlagsImpl: u32 { + const NIL = 0b0000_0000; + #[cfg(feature = "db-dup-sort")] + const DUP_SORT = 0b0000_0001; + #[cfg(feature = "db-int-key")] + const INTEGER_KEY = 0b0000_0010; + } +} + +impl BackendFlags for DatabaseFlagsImpl { + fn empty() -> DatabaseFlagsImpl { + DatabaseFlagsImpl::empty() + } +} + +impl BackendDatabaseFlags for DatabaseFlagsImpl { + fn set(&mut self, flag: DatabaseFlags, value: bool) { + self.set(flag.into(), value) + } +} + +impl Into<DatabaseFlagsImpl> for DatabaseFlags { + fn into(self) -> DatabaseFlagsImpl { + match self { + DatabaseFlags::REVERSE_KEY => unimplemented!(), + #[cfg(feature = "db-dup-sort")] + DatabaseFlags::DUP_SORT => DatabaseFlagsImpl::DUP_SORT, + #[cfg(feature = "db-dup-sort")] + DatabaseFlags::DUP_FIXED => unimplemented!(), + #[cfg(feature = "db-int-key")] + DatabaseFlags::INTEGER_KEY => DatabaseFlagsImpl::INTEGER_KEY, + DatabaseFlags::INTEGER_DUP => unimplemented!(), + DatabaseFlags::REVERSE_DUP => unimplemented!(), + } + } +} + +bitflags! { + #[derive(Default, Serialize, Deserialize)] + pub struct WriteFlagsImpl: u32 { + const NIL = 0b0000_0000; + } +} + +impl BackendFlags for WriteFlagsImpl { + fn empty() -> WriteFlagsImpl { + WriteFlagsImpl::empty() + } +} + +impl BackendWriteFlags for WriteFlagsImpl { + fn set(&mut self, flag: WriteFlags, value: bool) { + self.set(flag.into(), value) + } +} + +impl Into<WriteFlagsImpl> for WriteFlags { + fn into(self) -> WriteFlagsImpl { + match self { + WriteFlags::NO_OVERWRITE => unimplemented!(), + WriteFlags::NO_DUP_DATA => unimplemented!(), + WriteFlags::CURRENT => unimplemented!(), + WriteFlags::APPEND => unimplemented!(), + WriteFlags::APPEND_DUP => unimplemented!(), + } + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe/info.rs b/third_party/rust/rkv/src/backend/impl_safe/info.rs new file mode 100644 index 0000000000..18f0f51da3 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/info.rs @@ -0,0 +1,35 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use crate::backend::traits::BackendInfo; + +pub struct InfoImpl; + +impl BackendInfo for InfoImpl { + fn map_size(&self) -> usize { + unimplemented!() + } + + fn last_pgno(&self) -> usize { + unimplemented!() + } + + fn last_txnid(&self) -> usize { + unimplemented!() + } + + fn max_readers(&self) -> usize { + unimplemented!() + } + + fn num_readers(&self) -> usize { + unimplemented!() + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe/iter.rs b/third_party/rust/rkv/src/backend/impl_safe/iter.rs new file mode 100644 index 0000000000..a784c00a5b --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/iter.rs @@ -0,0 +1,24 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use super::ErrorImpl; +use crate::backend::traits::BackendIter; + +// FIXME: Use generics instead. +pub struct IterImpl<'i>(pub(crate) Box<dyn Iterator<Item = (&'i [u8], &'i [u8])> + 'i>); + +impl<'i> BackendIter<'i> for IterImpl<'i> { + type Error = ErrorImpl; + + #[allow(clippy::type_complexity)] + fn next(&mut self) -> Option<Result<(&'i [u8], &'i [u8]), Self::Error>> { + self.0.next().map(Ok) + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe/snapshot.rs b/third_party/rust/rkv/src/backend/impl_safe/snapshot.rs new file mode 100644 index 0000000000..95f5690d34 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/snapshot.rs @@ -0,0 +1,141 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::Arc, +}; + +use serde_derive::{Deserialize, Serialize}; + +use super::DatabaseFlagsImpl; + +type Key = Box<[u8]>; +type Value = Box<[u8]>; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Snapshot { + flags: DatabaseFlagsImpl, + #[cfg(not(feature = "db-dup-sort"))] + map: Arc<BTreeMap<Key, Value>>, + #[cfg(feature = "db-dup-sort")] + map: Arc<BTreeMap<Key, BTreeSet<Value>>>, +} + +impl Snapshot { + pub(crate) fn new(flags: Option<DatabaseFlagsImpl>) -> Snapshot { + Snapshot { + flags: flags.unwrap_or_default(), + map: Default::default(), + } + } + + pub(crate) fn flags(&self) -> &DatabaseFlagsImpl { + &self.flags + } + + pub(crate) fn clear(&mut self) { + self.map = Default::default(); + } +} + +#[cfg(not(feature = "db-dup-sort"))] +impl Snapshot { + pub(crate) fn get(&self, key: &[u8]) -> Option<&[u8]> { + self.map.get(key).map(|value| value.as_ref()) + } + + pub(crate) fn put(&mut self, key: &[u8], value: &[u8]) { + let map = Arc::make_mut(&mut self.map); + map.insert(Box::from(key), Box::from(value)); + } + + pub(crate) fn del(&mut self, key: &[u8]) -> Option<()> { + let map = Arc::make_mut(&mut self.map); + map.remove(key).map(|_| ()) + } + + pub(crate) fn iter(&self) -> impl Iterator<Item = (&[u8], &[u8])> { + self.map + .iter() + .map(|(key, value)| (key.as_ref(), value.as_ref())) + } +} + +#[cfg(feature = "db-dup-sort")] +impl Snapshot { + pub(crate) fn get(&self, key: &[u8]) -> Option<&[u8]> { + self.map + .get(key) + .and_then(|v| v.iter().next()) + .map(|v| v.as_ref()) + } + + pub(crate) fn put(&mut self, key: &[u8], value: &[u8]) { + let map = Arc::make_mut(&mut self.map); + match map.get_mut(key) { + None => { + let mut values = BTreeSet::new(); + values.insert(Box::from(value)); + map.insert(Box::from(key), values); + } + Some(values) => { + values.clear(); + values.insert(Box::from(value)); + } + } + } + + pub(crate) fn del(&mut self, key: &[u8]) -> Option<()> { + let map = Arc::make_mut(&mut self.map); + match map.get_mut(key) { + None => None, + Some(values) => { + let was_empty = values.is_empty(); + values.clear(); + Some(()).filter(|_| !was_empty) + } + } + } + + pub(crate) fn iter(&self) -> impl Iterator<Item = (&[u8], impl Iterator<Item = &[u8]>)> { + self.map + .iter() + .map(|(key, values)| (key.as_ref(), values.iter().map(|value| value.as_ref()))) + } +} + +#[cfg(feature = "db-dup-sort")] +impl Snapshot { + pub(crate) fn put_dup(&mut self, key: &[u8], value: &[u8]) { + let map = Arc::make_mut(&mut self.map); + match map.get_mut(key) { + None => { + let mut values = BTreeSet::new(); + values.insert(Box::from(value)); + map.insert(Box::from(key), values); + } + Some(values) => { + values.insert(Box::from(value)); + } + } + } + + pub(crate) fn del_exact(&mut self, key: &[u8], value: &[u8]) -> Option<()> { + let map = Arc::make_mut(&mut self.map); + match map.get_mut(key) { + None => None, + Some(values) => { + let was_removed = values.remove(value); + Some(()).filter(|_| was_removed) + } + } + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe/stat.rs b/third_party/rust/rkv/src/backend/impl_safe/stat.rs new file mode 100644 index 0000000000..c117b56833 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/stat.rs @@ -0,0 +1,39 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use crate::backend::traits::BackendStat; + +pub struct StatImpl; + +impl BackendStat for StatImpl { + fn page_size(&self) -> usize { + unimplemented!() + } + + fn depth(&self) -> usize { + unimplemented!() + } + + fn branch_pages(&self) -> usize { + unimplemented!() + } + + fn leaf_pages(&self) -> usize { + unimplemented!() + } + + fn overflow_pages(&self) -> usize { + unimplemented!() + } + + fn entries(&self) -> usize { + unimplemented!() + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe/transaction.rs b/third_party/rust/rkv/src/backend/impl_safe/transaction.rs new file mode 100644 index 0000000000..d37352a42d --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/transaction.rs @@ -0,0 +1,208 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. +#![allow(dead_code)] // TODO: Get rid of unused struct members + +use std::{collections::HashMap, sync::Arc}; + +use super::{ + snapshot::Snapshot, DatabaseImpl, EnvironmentImpl, ErrorImpl, RoCursorImpl, WriteFlagsImpl, +}; +use crate::backend::traits::{ + BackendRoCursorTransaction, BackendRoTransaction, BackendRwCursorTransaction, + BackendRwTransaction, +}; + +#[derive(Debug)] +pub struct RoTransactionImpl<'t> { + env: &'t EnvironmentImpl, + snapshots: HashMap<DatabaseImpl, Snapshot>, + idx: Arc<()>, +} + +impl<'t> RoTransactionImpl<'t> { + pub(crate) fn new( + env: &'t EnvironmentImpl, + idx: Arc<()>, + ) -> Result<RoTransactionImpl<'t>, ErrorImpl> { + let snapshots = env + .dbs()? + .arena + .iter() + .map(|(id, db)| (DatabaseImpl(id), db.snapshot())) + .collect(); + Ok(RoTransactionImpl { + env, + snapshots, + idx, + }) + } +} + +impl<'t> BackendRoTransaction for RoTransactionImpl<'t> { + type Database = DatabaseImpl; + type Error = ErrorImpl; + + fn get(&self, db: &Self::Database, key: &[u8]) -> Result<&[u8], Self::Error> { + let snapshot = self.snapshots.get(db).ok_or(ErrorImpl::DbIsForeignError)?; + snapshot.get(key).ok_or(ErrorImpl::KeyValuePairNotFound) + } + + fn abort(self) { + // noop + } +} + +impl<'t> BackendRoCursorTransaction<'t> for RoTransactionImpl<'t> { + type RoCursor = RoCursorImpl<'t>; + + fn open_ro_cursor(&'t self, db: &Self::Database) -> Result<Self::RoCursor, Self::Error> { + let snapshot = self.snapshots.get(db).ok_or(ErrorImpl::DbIsForeignError)?; + Ok(RoCursorImpl(snapshot)) + } +} + +#[derive(Debug)] +pub struct RwTransactionImpl<'t> { + env: &'t EnvironmentImpl, + snapshots: HashMap<DatabaseImpl, Snapshot>, + idx: Arc<()>, +} + +impl<'t> RwTransactionImpl<'t> { + pub(crate) fn new( + env: &'t EnvironmentImpl, + idx: Arc<()>, + ) -> Result<RwTransactionImpl<'t>, ErrorImpl> { + let snapshots = env + .dbs()? + .arena + .iter() + .map(|(id, db)| (DatabaseImpl(id), db.snapshot())) + .collect(); + Ok(RwTransactionImpl { + env, + snapshots, + idx, + }) + } +} + +impl<'t> BackendRwTransaction for RwTransactionImpl<'t> { + type Database = DatabaseImpl; + type Error = ErrorImpl; + type Flags = WriteFlagsImpl; + + fn get(&self, db: &Self::Database, key: &[u8]) -> Result<&[u8], Self::Error> { + let snapshot = self.snapshots.get(db).ok_or(ErrorImpl::DbIsForeignError)?; + snapshot.get(key).ok_or(ErrorImpl::KeyValuePairNotFound) + } + + #[cfg(not(feature = "db-dup-sort"))] + fn put( + &mut self, + db: &Self::Database, + key: &[u8], + value: &[u8], + _flags: Self::Flags, + ) -> Result<(), Self::Error> { + let snapshot = self + .snapshots + .get_mut(db) + .ok_or_else(|| ErrorImpl::DbIsForeignError)?; + snapshot.put(key, value); + Ok(()) + } + + #[cfg(feature = "db-dup-sort")] + fn put( + &mut self, + db: &Self::Database, + key: &[u8], + value: &[u8], + _flags: Self::Flags, + ) -> Result<(), Self::Error> { + use super::DatabaseFlagsImpl; + let snapshot = self + .snapshots + .get_mut(db) + .ok_or(ErrorImpl::DbIsForeignError)?; + if snapshot.flags().contains(DatabaseFlagsImpl::DUP_SORT) { + snapshot.put_dup(key, value); + } else { + snapshot.put(key, value); + } + Ok(()) + } + + #[cfg(not(feature = "db-dup-sort"))] + fn del(&mut self, db: &Self::Database, key: &[u8]) -> Result<(), Self::Error> { + let snapshot = self + .snapshots + .get_mut(db) + .ok_or_else(|| ErrorImpl::DbIsForeignError)?; + let deleted = snapshot.del(key); + Ok(deleted.ok_or_else(|| ErrorImpl::KeyValuePairNotFound)?) + } + + #[cfg(feature = "db-dup-sort")] + fn del( + &mut self, + db: &Self::Database, + key: &[u8], + value: Option<&[u8]>, + ) -> Result<(), Self::Error> { + use super::DatabaseFlagsImpl; + let snapshot = self + .snapshots + .get_mut(db) + .ok_or(ErrorImpl::DbIsForeignError)?; + let deleted = match (value, snapshot.flags()) { + (Some(value), flags) if flags.contains(DatabaseFlagsImpl::DUP_SORT) => { + snapshot.del_exact(key, value) + } + _ => snapshot.del(key), + }; + deleted.ok_or(ErrorImpl::KeyValuePairNotFound) + } + + fn clear_db(&mut self, db: &Self::Database) -> Result<(), Self::Error> { + let snapshot = self + .snapshots + .get_mut(db) + .ok_or(ErrorImpl::DbIsForeignError)?; + snapshot.clear(); + Ok(()) + } + + fn commit(self) -> Result<(), Self::Error> { + let mut dbs = self.env.dbs_mut()?; + + for (id, snapshot) in self.snapshots { + let db = dbs.arena.get_mut(id.0).ok_or(ErrorImpl::DbIsForeignError)?; + db.replace(snapshot); + } + + drop(dbs); + self.env.write_to_disk() + } + + fn abort(self) { + // noop + } +} + +impl<'t> BackendRwCursorTransaction<'t> for RwTransactionImpl<'t> { + type RoCursor = RoCursorImpl<'t>; + + fn open_ro_cursor(&'t self, db: &Self::Database) -> Result<Self::RoCursor, Self::Error> { + let snapshot = self.snapshots.get(db).ok_or(ErrorImpl::DbIsForeignError)?; + Ok(RoCursorImpl(snapshot)) + } +} diff --git a/third_party/rust/rkv/src/backend/traits.rs b/third_party/rust/rkv/src/backend/traits.rs new file mode 100644 index 0000000000..1dda15a85c --- /dev/null +++ b/third_party/rust/rkv/src/backend/traits.rs @@ -0,0 +1,202 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{ + fmt::{Debug, Display}, + path::{Path, PathBuf}, +}; + +use crate::{ + backend::common::{DatabaseFlags, EnvironmentFlags, WriteFlags}, + error::StoreError, +}; + +pub trait BackendError: Debug + Display + Into<StoreError> {} + +pub trait BackendDatabase: Debug + Eq + PartialEq + Copy + Clone {} + +pub trait BackendFlags: Debug + Eq + PartialEq + Copy + Clone + Default { + fn empty() -> Self; +} + +pub trait BackendEnvironmentFlags: BackendFlags { + fn set(&mut self, flag: EnvironmentFlags, value: bool); +} + +pub trait BackendDatabaseFlags: BackendFlags { + fn set(&mut self, flag: DatabaseFlags, value: bool); +} + +pub trait BackendWriteFlags: BackendFlags { + fn set(&mut self, flag: WriteFlags, value: bool); +} + +pub trait BackendStat { + fn page_size(&self) -> usize; + + fn depth(&self) -> usize; + + fn branch_pages(&self) -> usize; + + fn leaf_pages(&self) -> usize; + + fn overflow_pages(&self) -> usize; + + fn entries(&self) -> usize; +} + +pub trait BackendInfo { + fn map_size(&self) -> usize; + + fn last_pgno(&self) -> usize; + + fn last_txnid(&self) -> usize; + + fn max_readers(&self) -> usize; + + fn num_readers(&self) -> usize; +} + +pub trait BackendEnvironmentBuilder<'b>: Debug + Eq + PartialEq + Copy + Clone { + type Error: BackendError; + type Environment: BackendEnvironment<'b>; + type Flags: BackendEnvironmentFlags; + + fn new() -> Self; + + fn set_flags<T>(&mut self, flags: T) -> &mut Self + where + T: Into<Self::Flags>; + + fn set_max_dbs(&mut self, max_dbs: u32) -> &mut Self; + + fn set_max_readers(&mut self, max_readers: u32) -> &mut Self; + + fn set_map_size(&mut self, size: usize) -> &mut Self; + + fn set_make_dir_if_needed(&mut self, make_dir_if_needed: bool) -> &mut Self; + + fn set_discard_if_corrupted(&mut self, discard_if_corrupted: bool) -> &mut Self; + + fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error>; +} + +pub trait BackendEnvironment<'e>: Debug { + type Error: BackendError; + type Database: BackendDatabase; + type Flags: BackendDatabaseFlags; + type Stat: BackendStat; + type Info: BackendInfo; + type RoTransaction: BackendRoCursorTransaction<'e, Database = Self::Database>; + type RwTransaction: BackendRwCursorTransaction<'e, Database = Self::Database>; + + fn get_dbs(&self) -> Result<Vec<Option<String>>, Self::Error>; + + fn open_db(&self, name: Option<&str>) -> Result<Self::Database, Self::Error>; + + fn create_db( + &self, + name: Option<&str>, + flags: Self::Flags, + ) -> Result<Self::Database, Self::Error>; + + fn begin_ro_txn(&'e self) -> Result<Self::RoTransaction, Self::Error>; + + fn begin_rw_txn(&'e self) -> Result<Self::RwTransaction, Self::Error>; + + fn sync(&self, force: bool) -> Result<(), Self::Error>; + + fn stat(&self) -> Result<Self::Stat, Self::Error>; + + fn info(&self) -> Result<Self::Info, Self::Error>; + + fn freelist(&self) -> Result<usize, Self::Error>; + + fn load_ratio(&self) -> Result<Option<f32>, Self::Error>; + + fn set_map_size(&self, size: usize) -> Result<(), Self::Error>; + + fn get_files_on_disk(&self) -> Vec<PathBuf>; +} + +pub trait BackendRoTransaction: Debug { + type Error: BackendError; + type Database: BackendDatabase; + + fn get(&self, db: &Self::Database, key: &[u8]) -> Result<&[u8], Self::Error>; + + fn abort(self); +} + +pub trait BackendRwTransaction: Debug { + type Error: BackendError; + type Database: BackendDatabase; + type Flags: BackendWriteFlags; + + fn get(&self, db: &Self::Database, key: &[u8]) -> Result<&[u8], Self::Error>; + + fn put( + &mut self, + db: &Self::Database, + key: &[u8], + value: &[u8], + flags: Self::Flags, + ) -> Result<(), Self::Error>; + + #[cfg(not(feature = "db-dup-sort"))] + fn del(&mut self, db: &Self::Database, key: &[u8]) -> Result<(), Self::Error>; + + #[cfg(feature = "db-dup-sort")] + fn del( + &mut self, + db: &Self::Database, + key: &[u8], + value: Option<&[u8]>, + ) -> Result<(), Self::Error>; + + fn clear_db(&mut self, db: &Self::Database) -> Result<(), Self::Error>; + + fn commit(self) -> Result<(), Self::Error>; + + fn abort(self); +} + +pub trait BackendRoCursorTransaction<'t>: BackendRoTransaction { + type RoCursor: BackendRoCursor<'t>; + + fn open_ro_cursor(&'t self, db: &Self::Database) -> Result<Self::RoCursor, Self::Error>; +} + +pub trait BackendRwCursorTransaction<'t>: BackendRwTransaction { + type RoCursor: BackendRoCursor<'t>; + + fn open_ro_cursor(&'t self, db: &Self::Database) -> Result<Self::RoCursor, Self::Error>; +} + +pub trait BackendRoCursor<'c>: Debug { + type Iter: BackendIter<'c>; + + fn into_iter(self) -> Self::Iter; + + fn into_iter_from<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c; + + fn into_iter_dup_of<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c; +} + +pub trait BackendIter<'i> { + type Error: BackendError; + + #[allow(clippy::type_complexity)] + fn next(&mut self) -> Option<Result<(&'i [u8], &'i [u8]), Self::Error>>; +} |