diff options
Diffstat (limited to '')
44 files changed, 6819 insertions, 0 deletions
diff --git a/third_party/rust/rkv/src/backend.rs b/third_party/rust/rkv/src/backend.rs new file mode 100644 index 0000000000..63fa08a40c --- /dev/null +++ b/third_party/rust/rkv/src/backend.rs @@ -0,0 +1,40 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +mod common; +#[cfg(feature = "lmdb")] +mod impl_lmdb; +mod impl_safe; +mod traits; + +pub use common::*; +pub use traits::*; + +#[cfg(feature = "lmdb")] +pub use impl_lmdb::{ + ArchMigrateError as LmdbArchMigrateError, ArchMigrateResult as LmdbArchMigrateResult, + ArchMigrator as LmdbArchMigrator, DatabaseFlagsImpl as LmdbDatabaseFlags, + DatabaseImpl as LmdbDatabase, EnvironmentBuilderImpl as Lmdb, + EnvironmentFlagsImpl as LmdbEnvironmentFlags, EnvironmentImpl as LmdbEnvironment, + ErrorImpl as LmdbError, InfoImpl as LmdbInfo, IterImpl as LmdbIter, + RoCursorImpl as LmdbRoCursor, RoTransactionImpl as LmdbRoTransaction, + RwCursorImpl as LmdbRwCursor, RwTransactionImpl as LmdbRwTransaction, StatImpl as LmdbStat, + WriteFlagsImpl as LmdbWriteFlags, +}; + +pub use impl_safe::{ + DatabaseFlagsImpl as SafeModeDatabaseFlags, DatabaseImpl as SafeModeDatabase, + EnvironmentBuilderImpl as SafeMode, EnvironmentFlagsImpl as SafeModeEnvironmentFlags, + EnvironmentImpl as SafeModeEnvironment, ErrorImpl as SafeModeError, InfoImpl as SafeModeInfo, + IterImpl as SafeModeIter, RoCursorImpl as SafeModeRoCursor, + RoTransactionImpl as SafeModeRoTransaction, RwCursorImpl as SafeModeRwCursor, + RwTransactionImpl as SafeModeRwTransaction, StatImpl as SafeModeStat, + WriteFlagsImpl as SafeModeWriteFlags, +}; diff --git a/third_party/rust/rkv/src/backend/common.rs b/third_party/rust/rkv/src/backend/common.rs new file mode 100644 index 0000000000..bea3839d03 --- /dev/null +++ b/third_party/rust/rkv/src/backend/common.rs @@ -0,0 +1,44 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. +#![allow(non_camel_case_types)] + +pub enum EnvironmentFlags { + FIXED_MAP, + NO_SUB_DIR, + WRITE_MAP, + READ_ONLY, + NO_META_SYNC, + NO_SYNC, + MAP_ASYNC, + NO_TLS, + NO_LOCK, + NO_READAHEAD, + NO_MEM_INIT, +} + +pub enum DatabaseFlags { + REVERSE_KEY, + #[cfg(feature = "db-dup-sort")] + DUP_SORT, + #[cfg(feature = "db-dup-sort")] + DUP_FIXED, + #[cfg(feature = "db-int-key")] + INTEGER_KEY, + INTEGER_DUP, + REVERSE_DUP, +} + +pub enum WriteFlags { + NO_OVERWRITE, + NO_DUP_DATA, + CURRENT, + APPEND, + APPEND_DUP, +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb.rs b/third_party/rust/rkv/src/backend/impl_lmdb.rs new file mode 100644 index 0000000000..5364214598 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb.rs @@ -0,0 +1,34 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +mod arch_migrator; +mod arch_migrator_error; +mod cursor; +mod database; +mod environment; +mod error; +mod flags; +mod info; +mod iter; +mod stat; +mod transaction; + +pub use arch_migrator::{ + MigrateError as ArchMigrateError, MigrateResult as ArchMigrateResult, Migrator as ArchMigrator, +}; +pub use cursor::{RoCursorImpl, RwCursorImpl}; +pub use database::DatabaseImpl; +pub use environment::{EnvironmentBuilderImpl, EnvironmentImpl}; +pub use error::ErrorImpl; +pub use flags::{DatabaseFlagsImpl, EnvironmentFlagsImpl, WriteFlagsImpl}; +pub use info::InfoImpl; +pub use iter::IterImpl; +pub use stat::StatImpl; +pub use transaction::{RoTransactionImpl, RwTransactionImpl}; diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/arch_migrator.rs b/third_party/rust/rkv/src/backend/impl_lmdb/arch_migrator.rs new file mode 100644 index 0000000000..6a91db1a3b --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/arch_migrator.rs @@ -0,0 +1,998 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. +#![allow(dead_code)] // TODO: Get rid of unused struct members +#![allow(clippy::upper_case_acronyms)] // TODO: Consider renaming things like `BRANCH` + +//! A utility for migrating data from one LMDB environment to another. Notably, this tool +//! can migrate data from an enviroment created with a different bit-depth than the +//! current rkv consumer, which enables the consumer to retrieve data from an environment +//! that can't be read directly using the rkv APIs. +//! +//! The utility supports both 32-bit and 64-bit LMDB source environments, and it +//! automatically migrates data in both the default database and any named (sub) +//! databases. It also migrates the source environment's "map size" and "max DBs" +//! configuration options to the destination environment. +//! +//! The destination environment must be at the rkv consumer's bit depth and should be +//! empty of data. It can be an empty directory, in which case the utility will create a +//! new LMDB environment within the directory. +//! +//! The tool currently has these limitations: +//! +//! 1. It doesn't support migration from environments created with +//! `EnvironmentFlags::NO_SUB_DIR`. To migrate such an environment, create a +//! temporary directory, copy the environment's data file to a file called data.mdb in +//! the temporary directory, then migrate the temporary directory as the source +//! environment. +//! 2. It doesn't support migration from databases created with DatabaseFlags::DUP_SORT` +//! (with or without `DatabaseFlags::DUP_FIXED`). +//! 3. It doesn't account for existing data in the destination environment, which means +//! that it can overwrite data (causing data loss) or fail to migrate data if the +//! destination environment contains existing data. +//! +//! ## Basic Usage +//! +//! Call `Migrator::new()` with the path to the source environment to create a `Migrator` +//! instance; then call the instance's `migrate()` method with the path to the destination +//! environment to migrate data from the source to the destination environment. For +//! example, this snippet migrates data from the tests/envs/ref_env_32 environment to a +//! new environment in a temporary directory: +//! +//! ``` +//! use rkv::migrator::LmdbArchMigrator as Migrator; +//! use std::path::Path; +//! use tempfile::tempdir; +//! let mut migrator = Migrator::new(Path::new("tests/envs/ref_env_32")).unwrap(); +//! migrator.migrate(&tempdir().unwrap().path()).unwrap(); +//! ``` +//! +//! Both `Migrator::new()` and `migrate()` return a `MigrateResult` that is either an +//! `Ok()` result or an `Err<MigrateError>`, where `MigrateError` is an enum whose +//! variants identify specific kinds of migration failures. + +use std::{ + collections::{BTreeMap, HashMap}, + convert::TryFrom, + fs::File, + io::{Cursor, Read, Seek, SeekFrom, Write}, + path::{Path, PathBuf}, + rc::Rc, + str, +}; + +use bitflags::bitflags; +use byteorder::{LittleEndian, ReadBytesExt}; +use lmdb::{DatabaseFlags, Environment, Transaction, WriteFlags}; + +pub use super::arch_migrator_error::MigrateError; + +const PAGESIZE: u16 = 4096; + +// The magic number is 0xBEEFC0DE, which is 0xDEC0EFBE in little-endian. It appears at +// offset 12 on 32-bit systems and 16 on 64-bit systems. We don't support big-endian +// migration, but presumably we could do so by detecting the order of the bytes. +const MAGIC: [u8; 4] = [0xDE, 0xC0, 0xEF, 0xBE]; + +pub type MigrateResult<T> = Result<T, MigrateError>; + +bitflags! { + #[derive(Default)] + struct PageFlags: u16 { + const BRANCH = 0x01; + const LEAF = 0x02; + const OVERFLOW = 0x04; + const META = 0x08; + const DIRTY = 0x10; + const LEAF2 = 0x20; + const SUBP = 0x40; + const LOOSE = 0x4000; + const KEEP = 0x8000; + } +} + +bitflags! { + #[derive(Default)] + struct NodeFlags: u16 { + const BIGDATA = 0x01; + const SUBDATA = 0x02; + const DUPDATA = 0x04; + } +} + +// The bit depth of the executable that created an LMDB environment. The Migrator +// determines this automatically based on the location of the magic number in data.mdb. +#[derive(Clone, Copy, PartialEq)] +enum Bits { + U32, + U64, +} + +impl Bits { + // The size of usize for the bit-depth represented by the enum variant. + fn size(self) -> usize { + match self { + Bits::U32 => 4, + Bits::U64 => 8, + } + } +} + +// The equivalent of PAGEHDRSZ in LMDB, except that this one varies by bits. +fn page_header_size(bits: Bits) -> u64 { + match bits { + Bits::U32 => 12, + Bits::U64 => 16, + } +} + +// The equivalent of P_INVALID in LMDB, except that this one varies by bits. +fn validate_page_num(page_num: u64, bits: Bits) -> MigrateResult<()> { + let invalid_page_num = match bits { + Bits::U32 => u64::from(!0u32), + Bits::U64 => !0u64, + }; + + if page_num == invalid_page_num { + return Err(MigrateError::InvalidPageNum); + } + + Ok(()) +} + +#[derive(Clone, Debug, Default)] +struct Database { + md_pad: u32, + md_flags: DatabaseFlags, + md_depth: u16, + md_branch_pages: u64, + md_leaf_pages: u64, + md_overflow_pages: u64, + md_entries: u64, + md_root: u64, +} + +impl Database { + fn new(cursor: &mut Cursor<&[u8]>, bits: Bits) -> MigrateResult<Database> { + Ok(Database { + md_pad: cursor.read_u32::<LittleEndian>()?, + md_flags: DatabaseFlags::from_bits(cursor.read_u16::<LittleEndian>()?.into()) + .ok_or(MigrateError::InvalidDatabaseBits)?, + md_depth: cursor.read_u16::<LittleEndian>()?, + md_branch_pages: cursor.read_uint::<LittleEndian>(bits.size())?, + md_leaf_pages: cursor.read_uint::<LittleEndian>(bits.size())?, + md_overflow_pages: cursor.read_uint::<LittleEndian>(bits.size())?, + md_entries: cursor.read_uint::<LittleEndian>(bits.size())?, + md_root: cursor.read_uint::<LittleEndian>(bits.size())?, + }) + } +} + +#[derive(Debug, Default)] +struct Databases { + free: Database, + main: Database, +} + +#[derive(Debug, Default)] +struct MetaData { + mm_magic: u32, + mm_version: u32, + mm_address: u64, + mm_mapsize: u64, + mm_dbs: Databases, + mm_last_pg: u64, + mm_txnid: u64, +} + +#[derive(Debug)] +enum LeafNode { + Regular { + mn_lo: u16, + mn_hi: u16, + mn_flags: NodeFlags, + mn_ksize: u16, + mv_size: u32, + key: Vec<u8>, + value: Vec<u8>, + }, + BigData { + mn_lo: u16, + mn_hi: u16, + mn_flags: NodeFlags, + mn_ksize: u16, + mv_size: u32, + key: Vec<u8>, + overflow_pgno: u64, + }, + SubData { + mn_lo: u16, + mn_hi: u16, + mn_flags: NodeFlags, + mn_ksize: u16, + mv_size: u32, + key: Vec<u8>, + value: Vec<u8>, + db: Database, + }, +} + +#[derive(Debug, Default)] +struct BranchNode { + mp_pgno: u64, + mn_ksize: u16, + mn_data: Vec<u8>, +} + +#[derive(Debug)] +enum PageHeader { + Regular { + mp_pgno: u64, + mp_flags: PageFlags, + pb_lower: u16, + pb_upper: u16, + }, + Overflow { + mp_pgno: u64, + mp_flags: PageFlags, + pb_pages: u32, + }, +} + +#[derive(Debug)] +enum Page { + META(MetaData), + LEAF(Vec<LeafNode>), + BRANCH(Vec<BranchNode>), +} + +impl Page { + fn new(buf: Vec<u8>, bits: Bits) -> MigrateResult<Page> { + let mut cursor = std::io::Cursor::new(&buf[..]); + + match Self::parse_page_header(&mut cursor, bits)? { + PageHeader::Regular { + mp_flags, pb_lower, .. + } => { + if mp_flags.contains(PageFlags::LEAF2) || mp_flags.contains(PageFlags::SUBP) { + // We don't yet support DUPFIXED and DUPSORT databases. + return Err(MigrateError::UnsupportedPageHeaderVariant); + } + + if mp_flags.contains(PageFlags::META) { + let meta_data = Self::parse_meta_data(&mut cursor, bits)?; + Ok(Page::META(meta_data)) + } else if mp_flags.contains(PageFlags::LEAF) { + let nodes = Self::parse_leaf_nodes(&mut cursor, pb_lower, bits)?; + Ok(Page::LEAF(nodes)) + } else if mp_flags.contains(PageFlags::BRANCH) { + let nodes = Self::parse_branch_nodes(&mut cursor, pb_lower, bits)?; + Ok(Page::BRANCH(nodes)) + } else { + Err(MigrateError::UnexpectedPageHeaderVariant) + } + } + PageHeader::Overflow { .. } => { + // There isn't anything to do, nor should we try to instantiate + // a page of this type, as we only access them when reading + // a value that is too large to fit into a leaf node. + Err(MigrateError::UnexpectedPageHeaderVariant) + } + } + } + + fn parse_page_header(cursor: &mut Cursor<&[u8]>, bits: Bits) -> MigrateResult<PageHeader> { + let mp_pgno = cursor.read_uint::<LittleEndian>(bits.size())?; + let _mp_pad = cursor.read_u16::<LittleEndian>()?; + let mp_flags = PageFlags::from_bits(cursor.read_u16::<LittleEndian>()?) + .ok_or(MigrateError::InvalidPageBits)?; + + if mp_flags.contains(PageFlags::OVERFLOW) { + let pb_pages = cursor.read_u32::<LittleEndian>()?; + Ok(PageHeader::Overflow { + mp_pgno, + mp_flags, + pb_pages, + }) + } else { + let pb_lower = cursor.read_u16::<LittleEndian>()?; + let pb_upper = cursor.read_u16::<LittleEndian>()?; + Ok(PageHeader::Regular { + mp_pgno, + mp_flags, + pb_lower, + pb_upper, + }) + } + } + + fn parse_meta_data(cursor: &mut Cursor<&[u8]>, bits: Bits) -> MigrateResult<MetaData> { + cursor.seek(SeekFrom::Start(page_header_size(bits)))?; + + Ok(MetaData { + mm_magic: cursor.read_u32::<LittleEndian>()?, + mm_version: cursor.read_u32::<LittleEndian>()?, + mm_address: cursor.read_uint::<LittleEndian>(bits.size())?, + mm_mapsize: cursor.read_uint::<LittleEndian>(bits.size())?, + mm_dbs: Databases { + free: Database::new(cursor, bits)?, + main: Database::new(cursor, bits)?, + }, + mm_last_pg: cursor.read_uint::<LittleEndian>(bits.size())?, + mm_txnid: cursor.read_uint::<LittleEndian>(bits.size())?, + }) + } + + fn parse_leaf_nodes( + cursor: &mut Cursor<&[u8]>, + pb_lower: u16, + bits: Bits, + ) -> MigrateResult<Vec<LeafNode>> { + cursor.set_position(page_header_size(bits)); + let num_keys = Self::num_keys(pb_lower, bits); + let mp_ptrs = Self::parse_mp_ptrs(cursor, num_keys)?; + + let mut leaf_nodes = Vec::with_capacity(num_keys as usize); + + for mp_ptr in mp_ptrs { + cursor.set_position(u64::from(mp_ptr)); + leaf_nodes.push(Self::parse_leaf_node(cursor, bits)?); + } + + Ok(leaf_nodes) + } + + fn parse_leaf_node(cursor: &mut Cursor<&[u8]>, bits: Bits) -> MigrateResult<LeafNode> { + // The order of the mn_lo and mn_hi fields is endian-dependent and would be + // reversed in an LMDB environment created on a big-endian system. + let mn_lo = cursor.read_u16::<LittleEndian>()?; + let mn_hi = cursor.read_u16::<LittleEndian>()?; + + let mn_flags = NodeFlags::from_bits(cursor.read_u16::<LittleEndian>()?) + .ok_or(MigrateError::InvalidNodeBits)?; + let mn_ksize = cursor.read_u16::<LittleEndian>()?; + + let start = usize::try_from(cursor.position())?; + let end = usize::try_from(cursor.position() + u64::from(mn_ksize))?; + let key = cursor.get_ref()[start..end].to_vec(); + cursor.set_position(end as u64); + + let mv_size = Self::leaf_node_size(mn_lo, mn_hi); + if mn_flags.contains(NodeFlags::BIGDATA) { + let overflow_pgno = cursor.read_uint::<LittleEndian>(bits.size())?; + Ok(LeafNode::BigData { + mn_lo, + mn_hi, + mn_flags, + mn_ksize, + mv_size, + key, + overflow_pgno, + }) + } else if mn_flags.contains(NodeFlags::SUBDATA) { + let start = usize::try_from(cursor.position())?; + let end = usize::try_from(cursor.position() + u64::from(mv_size))?; + let value = cursor.get_ref()[start..end].to_vec(); + let mut cursor = std::io::Cursor::new(&value[..]); + let db = Database::new(&mut cursor, bits)?; + validate_page_num(db.md_root, bits)?; + Ok(LeafNode::SubData { + mn_lo, + mn_hi, + mn_flags, + mn_ksize, + mv_size, + key, + value, + db, + }) + } else { + let start = usize::try_from(cursor.position())?; + let end = usize::try_from(cursor.position() + u64::from(mv_size))?; + let value = cursor.get_ref()[start..end].to_vec(); + Ok(LeafNode::Regular { + mn_lo, + mn_hi, + mn_flags, + mn_ksize, + mv_size, + key, + value, + }) + } + } + + fn leaf_node_size(mn_lo: u16, mn_hi: u16) -> u32 { + u32::from(mn_lo) + ((u32::from(mn_hi)) << 16) + } + + fn parse_branch_nodes( + cursor: &mut Cursor<&[u8]>, + pb_lower: u16, + bits: Bits, + ) -> MigrateResult<Vec<BranchNode>> { + let num_keys = Self::num_keys(pb_lower, bits); + let mp_ptrs = Self::parse_mp_ptrs(cursor, num_keys)?; + + let mut branch_nodes = Vec::with_capacity(num_keys as usize); + + for mp_ptr in mp_ptrs { + cursor.set_position(u64::from(mp_ptr)); + branch_nodes.push(Self::parse_branch_node(cursor, bits)?) + } + + Ok(branch_nodes) + } + + fn parse_branch_node(cursor: &mut Cursor<&[u8]>, bits: Bits) -> MigrateResult<BranchNode> { + // The order of the mn_lo and mn_hi fields is endian-dependent and would be + // reversed in an LMDB environment created on a big-endian system. + let mn_lo = cursor.read_u16::<LittleEndian>()?; + let mn_hi = cursor.read_u16::<LittleEndian>()?; + + let mn_flags = cursor.read_u16::<LittleEndian>()?; + + // Branch nodes overload the mn_lo, mn_hi, and mn_flags fields to store the page + // number, so we derive the number from those fields. + let mp_pgno = Self::branch_node_page_num(mn_lo, mn_hi, mn_flags, bits); + + let mn_ksize = cursor.read_u16::<LittleEndian>()?; + + let position = cursor.position(); + let start = usize::try_from(position)?; + let end = usize::try_from(position + u64::from(mn_ksize))?; + let mn_data = cursor.get_ref()[start..end].to_vec(); + cursor.set_position(end as u64); + + Ok(BranchNode { + mp_pgno, + mn_ksize, + mn_data, + }) + } + + fn branch_node_page_num(mn_lo: u16, mn_hi: u16, mn_flags: u16, bits: Bits) -> u64 { + let mut page_num = u64::from(u32::from(mn_lo) + (u32::from(mn_hi) << 16)); + if bits == Bits::U64 { + page_num += u64::from(mn_flags) << 32; + } + page_num + } + + fn parse_mp_ptrs(cursor: &mut Cursor<&[u8]>, num_keys: u64) -> MigrateResult<Vec<u16>> { + let mut mp_ptrs = Vec::with_capacity(num_keys as usize); + for _ in 0..num_keys { + mp_ptrs.push(cursor.read_u16::<LittleEndian>()?); + } + Ok(mp_ptrs) + } + + fn num_keys(pb_lower: u16, bits: Bits) -> u64 { + (u64::from(pb_lower) - page_header_size(bits)) >> 1 + } +} + +pub struct Migrator { + file: File, + bits: Bits, +} + +impl Migrator { + /// Create a new Migrator for the LMDB environment at the given path. This tries to + /// open the data.mdb file in the environment and determine the bit depth of the + /// executable that created it, so it can fail and return an Err if the file can't be + /// opened or the depth determined. + pub fn new(path: &Path) -> MigrateResult<Migrator> { + let mut path = PathBuf::from(path); + path.push("data.mdb"); + let mut file = File::open(&path)?; + + file.seek(SeekFrom::Start(page_header_size(Bits::U32)))?; + let mut buf = [0; 4]; + file.read_exact(&mut buf)?; + + let bits = if buf == MAGIC { + Bits::U32 + } else { + file.seek(SeekFrom::Start(page_header_size(Bits::U64)))?; + file.read_exact(&mut buf)?; + if buf == MAGIC { + Bits::U64 + } else { + return Err(MigrateError::IndeterminateBitDepth); + } + }; + + Ok(Migrator { file, bits }) + } + + /// Dump the data in one of the databases in the LMDB environment. If the `database` + /// paremeter is None, then we dump the data in the main database. If it's the name + /// of a subdatabase, then we dump the data in that subdatabase. + /// + /// Note that the output isn't identical to that of the `mdb_dump` utility, since + /// `mdb_dump` includes subdatabase key/value pairs when dumping the main database, + /// and those values are architecture-dependent, since they contain pointer-sized + /// data. + /// + /// If we wanted to support identical output, we could parameterize inclusion of + /// subdatabase pairs in get_pairs() and include them when dumping data, while + /// continuing to exclude them when migrating data. + pub fn dump<T: Write>(&mut self, database: Option<&str>, mut out: T) -> MigrateResult<()> { + let meta_data = self.get_meta_data()?; + let root_page_num = meta_data.mm_dbs.main.md_root; + let root_page = Rc::new(self.get_page(root_page_num)?); + + let pairs; + if let Some(database) = database { + let subdbs = self.get_subdbs(root_page)?; + let database = subdbs + .get(database.as_bytes()) + .ok_or_else(|| MigrateError::DatabaseNotFound(database.to_string()))?; + let root_page_num = database.md_root; + let root_page = Rc::new(self.get_page(root_page_num)?); + pairs = self.get_pairs(root_page)?; + } else { + pairs = self.get_pairs(root_page)?; + } + + out.write_all(b"VERSION=3\n")?; + out.write_all(b"format=bytevalue\n")?; + if let Some(database) = database { + writeln!(out, "database={database}")?; + } + out.write_all(b"type=btree\n")?; + writeln!(out, "mapsize={}", meta_data.mm_mapsize)?; + out.write_all(b"maxreaders=126\n")?; + out.write_all(b"db_pagesize=4096\n")?; + out.write_all(b"HEADER=END\n")?; + + for (key, value) in pairs { + out.write_all(b" ")?; + for byte in key { + write!(out, "{byte:02x}")?; + } + out.write_all(b"\n")?; + out.write_all(b" ")?; + for byte in value { + write!(out, "{byte:02x}")?; + } + out.write_all(b"\n")?; + } + + out.write_all(b"DATA=END\n")?; + + Ok(()) + } + + /// Migrate all data in all of databases in the existing LMDB environment to a new + /// environment. This includes all key/value pairs in the main database that aren't + /// metadata about subdatabases and all key/value pairs in all subdatabases. + /// + /// We also set the map size and maximum databases of the new environment to their + /// values for the existing environment. But we don't set other metadata, and we + /// don't check that the new environment is empty before migrating data. + /// + /// Thus it's possible for this to overwrite existing data or fail to migrate data if + /// the new environment isn't empty. It's the consumer's responsibility to ensure + /// that data can be safely migrated to the new environment. In general, this means + /// that environment should be empty. + pub fn migrate(&mut self, dest: &Path) -> MigrateResult<()> { + let meta_data = self.get_meta_data()?; + let root_page_num = meta_data.mm_dbs.main.md_root; + validate_page_num(root_page_num, self.bits)?; + let root_page = Rc::new(self.get_page(root_page_num)?); + let subdbs = self.get_subdbs(Rc::clone(&root_page))?; + + let env = Environment::new() + .set_map_size(meta_data.mm_mapsize as usize) + .set_max_dbs(subdbs.len() as u32) + .open(dest)?; + + // Create the databases before we open a read-write transaction, since database + // creation requires its own read-write transaction, which would hang while + // awaiting completion of an existing one. + env.create_db(None, meta_data.mm_dbs.main.md_flags)?; + for (subdb_name, subdb_info) in &subdbs { + env.create_db(Some(str::from_utf8(subdb_name)?), subdb_info.md_flags)?; + } + + // Now open the read-write transaction that we'll use to migrate all the data. + let mut txn = env.begin_rw_txn()?; + + // Migrate the main database. + let pairs = self.get_pairs(root_page)?; + let db = env.open_db(None)?; + for (key, value) in pairs { + // If we knew that the target database was empty, we could specify + // WriteFlags::APPEND to speed up the migration. + txn.put(db, &key, &value, WriteFlags::empty())?; + } + + // Migrate subdatabases. + for (subdb_name, subdb_info) in &subdbs { + let root_page = Rc::new(self.get_page(subdb_info.md_root)?); + let pairs = self.get_pairs(root_page)?; + let db = env.open_db(Some(str::from_utf8(subdb_name)?))?; + for (key, value) in pairs { + // If we knew that the target database was empty, we could specify + // WriteFlags::APPEND to speed up the migration. + txn.put(db, &key, &value, WriteFlags::empty())?; + } + } + + txn.commit()?; + + Ok(()) + } + + fn get_subdbs(&mut self, root_page: Rc<Page>) -> MigrateResult<HashMap<Vec<u8>, Database>> { + let mut subdbs = HashMap::new(); + let mut pages = vec![root_page]; + + while let Some(page) = pages.pop() { + match &*page { + Page::BRANCH(nodes) => { + for branch in nodes { + pages.push(Rc::new(self.get_page(branch.mp_pgno)?)); + } + } + Page::LEAF(nodes) => { + for leaf in nodes { + if let LeafNode::SubData { key, db, .. } = leaf { + subdbs.insert(key.to_vec(), db.clone()); + }; + } + } + _ => { + return Err(MigrateError::UnexpectedPageVariant); + } + } + } + + Ok(subdbs) + } + + fn get_pairs(&mut self, root_page: Rc<Page>) -> MigrateResult<BTreeMap<Vec<u8>, Vec<u8>>> { + let mut pairs = BTreeMap::new(); + let mut pages = vec![root_page]; + + while let Some(page) = pages.pop() { + match &*page { + Page::BRANCH(nodes) => { + for branch in nodes { + pages.push(Rc::new(self.get_page(branch.mp_pgno)?)); + } + } + Page::LEAF(nodes) => { + for leaf in nodes { + match leaf { + LeafNode::Regular { key, value, .. } => { + pairs.insert(key.to_vec(), value.to_vec()); + } + LeafNode::BigData { + mv_size, + key, + overflow_pgno, + .. + } => { + // Perhaps we could reduce memory consumption during a + // migration by waiting to read big data until it's time + // to write it to the new database. + let value = self.read_data( + *overflow_pgno * u64::from(PAGESIZE) + + page_header_size(self.bits), + *mv_size as usize, + )?; + pairs.insert(key.to_vec(), value); + } + LeafNode::SubData { .. } => { + // We don't include subdatabase leaves in pairs, since + // there's no architecture-neutral representation of them, + // and in any case they're meta-data that should get + // recreated when we migrate the subdatabases themselves. + // + // If we wanted to create identical dumps to those + // produced by `mdb_dump`, however, we could allow + // consumers to specify that they'd like to include these + // records. + } + }; + } + } + _ => { + return Err(MigrateError::UnexpectedPageVariant); + } + } + } + + Ok(pairs) + } + + fn read_data(&mut self, offset: u64, size: usize) -> MigrateResult<Vec<u8>> { + self.file.seek(SeekFrom::Start(offset))?; + let mut buf: Vec<u8> = vec![0; size]; + self.file.read_exact(&mut buf[0..size])?; + Ok(buf.to_vec()) + } + + fn get_page(&mut self, page_no: u64) -> MigrateResult<Page> { + Page::new( + self.read_data(page_no * u64::from(PAGESIZE), usize::from(PAGESIZE))?, + self.bits, + ) + } + + fn get_meta_data(&mut self) -> MigrateResult<MetaData> { + let (page0, page1) = (self.get_page(0)?, self.get_page(1)?); + + match (page0, page1) { + (Page::META(meta0), Page::META(meta1)) => { + let meta = if meta1.mm_txnid > meta0.mm_txnid { + meta1 + } else { + meta0 + }; + if meta.mm_magic != 0xBE_EF_C0_DE { + return Err(MigrateError::InvalidMagicNum); + } + if meta.mm_version != 1 && meta.mm_version != 999 { + return Err(MigrateError::InvalidDataVersion); + } + Ok(meta) + } + _ => Err(MigrateError::UnexpectedPageVariant), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::{env, fs, mem::size_of}; + + use lmdb::{Environment, Error as LmdbError}; + use tempfile::{tempdir, tempfile}; + + fn compare_files(ref_file: &mut File, new_file: &mut File) -> MigrateResult<()> { + ref_file.seek(SeekFrom::Start(0))?; + new_file.seek(SeekFrom::Start(0))?; + + let ref_buf = &mut [0; 1024]; + let new_buf = &mut [0; 1024]; + + loop { + match ref_file.read(ref_buf) { + Err(err) => panic!("{}", err), + Ok(ref_len) => match new_file.read(new_buf) { + Err(err) => panic!("{}", err), + Ok(new_len) => { + assert_eq!(ref_len, new_len); + if ref_len == 0 { + break; + }; + assert_eq!(ref_buf[0..ref_len], new_buf[0..new_len]); + } + }, + } + } + + Ok(()) + } + + #[test] + fn test_dump_32() -> MigrateResult<()> { + let cwd = env::current_dir()?; + let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?; + let test_env_path: PathBuf = [cwd, "tests", "envs", "ref_env_32"].iter().collect(); + + // Dump data from the test env to a new dump file. + let mut migrator = Migrator::new(&test_env_path)?; + let mut new_dump_file = tempfile()?; + migrator.dump(None, &new_dump_file)?; + + // Open the reference dump file. + let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump.txt"].iter().collect(); + let mut ref_dump_file = File::open(ref_dump_file_path)?; + + // Compare the new dump file to the reference dump file. + compare_files(&mut ref_dump_file, &mut new_dump_file)?; + + Ok(()) + } + + #[test] + fn test_dump_32_subdb() -> MigrateResult<()> { + let cwd = env::current_dir()?; + let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?; + let test_env_path: PathBuf = [cwd, "tests", "envs", "ref_env_32"].iter().collect(); + + // Dump data from the test env to a new dump file. + let mut migrator = Migrator::new(&test_env_path)?; + let mut new_dump_file = tempfile()?; + migrator.dump(Some("subdb"), &new_dump_file)?; + + // Open the reference dump file. + let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump_subdb.txt"] + .iter() + .collect(); + let mut ref_dump_file = File::open(ref_dump_file_path)?; + + // Compare the new dump file to the reference dump file. + compare_files(&mut ref_dump_file, &mut new_dump_file)?; + + Ok(()) + } + + #[test] + fn test_dump_64() -> MigrateResult<()> { + let cwd = env::current_dir()?; + let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?; + let test_env_path: PathBuf = [cwd, "tests", "envs", "ref_env_64"].iter().collect(); + + // Dump data from the test env to a new dump file. + let mut migrator = Migrator::new(&test_env_path)?; + let mut new_dump_file = tempfile()?; + migrator.dump(None, &new_dump_file)?; + + // Open the reference dump file. + let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump.txt"].iter().collect(); + let mut ref_dump_file = File::open(ref_dump_file_path)?; + + // Compare the new dump file to the reference dump file. + compare_files(&mut ref_dump_file, &mut new_dump_file)?; + + Ok(()) + } + + #[test] + fn test_dump_64_subdb() -> MigrateResult<()> { + let cwd = env::current_dir()?; + let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?; + let test_env_path: PathBuf = [cwd, "tests", "envs", "ref_env_64"].iter().collect(); + + // Dump data from the test env to a new dump file. + let mut migrator = Migrator::new(&test_env_path)?; + let mut new_dump_file = tempfile()?; + migrator.dump(Some("subdb"), &new_dump_file)?; + + // Open the reference dump file. + let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump_subdb.txt"] + .iter() + .collect(); + let mut ref_dump_file = File::open(ref_dump_file_path)?; + + // Compare the new dump file to the reference dump file. + compare_files(&mut ref_dump_file, &mut new_dump_file)?; + + Ok(()) + } + + #[test] + fn test_migrate_64() -> MigrateResult<()> { + let cwd = env::current_dir()?; + let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?; + let test_env_path: PathBuf = [cwd, "tests", "envs", "ref_env_64"].iter().collect(); + + // Migrate data from the old env to a new one. + let new_env = tempdir()?; + let mut migrator = Migrator::new(&test_env_path)?; + migrator.migrate(new_env.path())?; + + // Dump data from the new env to a new dump file. + let mut migrator = Migrator::new(new_env.path())?; + let mut new_dump_file = tempfile()?; + migrator.dump(Some("subdb"), &new_dump_file)?; + + // Open the reference dump file. + let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump_subdb.txt"] + .iter() + .collect(); + let mut ref_dump_file = File::open(ref_dump_file_path)?; + + // Compare the new dump file to the reference dump file. + compare_files(&mut ref_dump_file, &mut new_dump_file)?; + + Ok(()) + } + + #[test] + fn test_migrate_32() -> MigrateResult<()> { + let cwd = env::current_dir()?; + let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?; + let test_env_path: PathBuf = [cwd, "tests", "envs", "ref_env_32"].iter().collect(); + + // Migrate data from the old env to a new one. + let new_env = tempdir()?; + let mut migrator = Migrator::new(&test_env_path)?; + migrator.migrate(new_env.path())?; + + // Dump data from the new env to a new dump file. + let mut migrator = Migrator::new(new_env.path())?; + let mut new_dump_file = tempfile()?; + migrator.dump(Some("subdb"), &new_dump_file)?; + + // Open the reference dump file. + let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump_subdb.txt"] + .iter() + .collect(); + let mut ref_dump_file = File::open(ref_dump_file_path)?; + + // Compare the new dump file to the reference dump file. + compare_files(&mut ref_dump_file, &mut new_dump_file)?; + + Ok(()) + } + + #[test] + fn test_migrate_and_replace() -> MigrateResult<()> { + let test_env_name = match size_of::<usize>() { + 4 => "ref_env_64", + 8 => "ref_env_32", + _ => panic!("only 32- and 64-bit depths are supported"), + }; + + let cwd = env::current_dir()?; + let cwd = cwd.to_str().ok_or(MigrateError::StringConversionError)?; + let test_env_path: PathBuf = [cwd, "tests", "envs", test_env_name].iter().collect(); + + let old_env = tempdir()?; + fs::copy( + test_env_path.join("data.mdb"), + old_env.path().join("data.mdb"), + )?; + fs::copy( + test_env_path.join("lock.mdb"), + old_env.path().join("lock.mdb"), + )?; + + // Confirm that it isn't possible to open the old environment with LMDB. + assert_eq!( + match Environment::new().open(old_env.path()) { + Err(err) => err, + _ => panic!("opening the environment should have failed"), + }, + LmdbError::Invalid + ); + + // Migrate data from the old env to a new one. + let new_env = tempdir()?; + let mut migrator = Migrator::new(old_env.path())?; + migrator.migrate(new_env.path())?; + + // Dump data from the new env to a new dump file. + let mut migrator = Migrator::new(new_env.path())?; + let mut new_dump_file = tempfile()?; + migrator.dump(Some("subdb"), &new_dump_file)?; + + // Open the reference dump file. + let ref_dump_file_path: PathBuf = [cwd, "tests", "envs", "ref_dump_subdb.txt"] + .iter() + .collect(); + let mut ref_dump_file = File::open(ref_dump_file_path)?; + + // Compare the new dump file to the reference dump file. + compare_files(&mut ref_dump_file, &mut new_dump_file)?; + + // Overwrite the old env's files with the new env's files and confirm that it's now + // possible to open the old env with LMDB. + fs::copy( + new_env.path().join("data.mdb"), + old_env.path().join("data.mdb"), + )?; + fs::copy( + new_env.path().join("lock.mdb"), + old_env.path().join("lock.mdb"), + )?; + assert!(Environment::new().open(old_env.path()).is_ok()); + + Ok(()) + } +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/arch_migrator_error.rs b/third_party/rust/rkv/src/backend/impl_lmdb/arch_migrator_error.rs new file mode 100644 index 0000000000..e23bb49770 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/arch_migrator_error.rs @@ -0,0 +1,79 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{io, num, str}; + +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum MigrateError { + #[error("database not found: {0:?}")] + DatabaseNotFound(String), + + #[error("{0}")] + FromString(String), + + #[error("couldn't determine bit depth")] + IndeterminateBitDepth, + + #[error("I/O error: {0:?}")] + IoError(#[from] io::Error), + + #[error("invalid DatabaseFlags bits")] + InvalidDatabaseBits, + + #[error("invalid data version")] + InvalidDataVersion, + + #[error("invalid magic number")] + InvalidMagicNum, + + #[error("invalid NodeFlags bits")] + InvalidNodeBits, + + #[error("invalid PageFlags bits")] + InvalidPageBits, + + #[error("invalid page number")] + InvalidPageNum, + + #[error("lmdb backend error: {0}")] + LmdbError(#[from] lmdb::Error), + + #[error("string conversion error")] + StringConversionError, + + #[error("TryFromInt error: {0:?}")] + TryFromIntError(#[from] num::TryFromIntError), + + #[error("unexpected Page variant")] + UnexpectedPageVariant, + + #[error("unexpected PageHeader variant")] + UnexpectedPageHeaderVariant, + + #[error("unsupported PageHeader variant")] + UnsupportedPageHeaderVariant, + + #[error("UTF8 error: {0:?}")] + Utf8Error(#[from] str::Utf8Error), +} + +impl From<&str> for MigrateError { + fn from(e: &str) -> MigrateError { + MigrateError::FromString(e.to_string()) + } +} + +impl From<String> for MigrateError { + fn from(e: String) -> MigrateError { + MigrateError::FromString(e) + } +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/cursor.rs b/third_party/rust/rkv/src/backend/impl_lmdb/cursor.rs new file mode 100644 index 0000000000..760abce451 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/cursor.rs @@ -0,0 +1,69 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use lmdb::Cursor; + +use super::IterImpl; +use crate::backend::traits::BackendRoCursor; + +#[derive(Debug)] +pub struct RoCursorImpl<'c>(pub(crate) lmdb::RoCursor<'c>); + +impl<'c> BackendRoCursor<'c> for RoCursorImpl<'c> { + type Iter = IterImpl<'c, lmdb::RoCursor<'c>>; + + fn into_iter(self) -> Self::Iter { + // We call RoCursor.iter() instead of RoCursor.iter_start() because + // the latter panics when there are no items in the store, whereas the + // former returns an iterator that yields no items. And since we create + // the Cursor and don't change its position, we can be sure that a call + // to Cursor.iter() will start at the beginning. + IterImpl::new(self.0, lmdb::RoCursor::iter) + } + + fn into_iter_from<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + IterImpl::new(self.0, |cursor| cursor.iter_from(key)) + } + + fn into_iter_dup_of<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + IterImpl::new(self.0, |cursor| cursor.iter_dup_of(key)) + } +} + +#[derive(Debug)] +pub struct RwCursorImpl<'c>(pub(crate) lmdb::RwCursor<'c>); + +impl<'c> BackendRoCursor<'c> for RwCursorImpl<'c> { + type Iter = IterImpl<'c, lmdb::RwCursor<'c>>; + + fn into_iter(self) -> Self::Iter { + IterImpl::new(self.0, lmdb::RwCursor::iter) + } + + fn into_iter_from<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + IterImpl::new(self.0, |cursor| cursor.iter_from(key)) + } + + fn into_iter_dup_of<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + IterImpl::new(self.0, |cursor| cursor.iter_dup_of(key)) + } +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/database.rs b/third_party/rust/rkv/src/backend/impl_lmdb/database.rs new file mode 100644 index 0000000000..8edee5c2c3 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/database.rs @@ -0,0 +1,16 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use crate::backend::traits::BackendDatabase; + +#[derive(Debug, Eq, PartialEq, Copy, Clone)] +pub struct DatabaseImpl(pub(crate) lmdb::Database); + +impl BackendDatabase for DatabaseImpl {} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/environment.rs b/third_party/rust/rkv/src/backend/impl_lmdb/environment.rs new file mode 100644 index 0000000000..e528fad057 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/environment.rs @@ -0,0 +1,299 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{ + fs, + path::{Path, PathBuf}, +}; + +use lmdb::Error as LmdbError; + +use super::{ + DatabaseFlagsImpl, DatabaseImpl, EnvironmentFlagsImpl, ErrorImpl, InfoImpl, RoTransactionImpl, + RwTransactionImpl, StatImpl, +}; +use crate::backend::traits::{ + BackendEnvironment, BackendEnvironmentBuilder, BackendInfo, BackendIter, BackendRoCursor, + BackendRoCursorTransaction, BackendStat, +}; + +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub struct EnvironmentBuilderImpl { + builder: lmdb::EnvironmentBuilder, + env_path_type: EnvironmentPathType, + env_lock_type: EnvironmentLockType, + env_db_type: EnvironmentDefaultDbType, + make_dir_if_needed: bool, +} + +impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl { + type Environment = EnvironmentImpl; + type Error = ErrorImpl; + type Flags = EnvironmentFlagsImpl; + + fn new() -> EnvironmentBuilderImpl { + EnvironmentBuilderImpl { + builder: lmdb::Environment::new(), + env_path_type: EnvironmentPathType::SubDir, + env_lock_type: EnvironmentLockType::Lockfile, + env_db_type: EnvironmentDefaultDbType::SingleDatabase, + make_dir_if_needed: false, + } + } + + fn set_flags<T>(&mut self, flags: T) -> &mut Self + where + T: Into<Self::Flags>, + { + let flags = flags.into(); + if flags.0 == lmdb::EnvironmentFlags::NO_SUB_DIR { + self.env_path_type = EnvironmentPathType::NoSubDir; + } + if flags.0 == lmdb::EnvironmentFlags::NO_LOCK { + self.env_lock_type = EnvironmentLockType::NoLockfile; + } + self.builder.set_flags(flags.0); + self + } + + fn set_max_readers(&mut self, max_readers: u32) -> &mut Self { + self.builder.set_max_readers(max_readers); + self + } + + fn set_max_dbs(&mut self, max_dbs: u32) -> &mut Self { + if max_dbs > 0 { + self.env_db_type = EnvironmentDefaultDbType::MultipleNamedDatabases + } + self.builder.set_max_dbs(max_dbs); + self + } + + fn set_map_size(&mut self, size: usize) -> &mut Self { + self.builder.set_map_size(size); + self + } + + fn set_make_dir_if_needed(&mut self, make_dir_if_needed: bool) -> &mut Self { + self.make_dir_if_needed = make_dir_if_needed; + self + } + + fn set_discard_if_corrupted(&mut self, _discard_if_corrupted: bool) -> &mut Self { + // Unfortunately, when opening a database, LMDB doesn't handle all the ways it could have + // been corrupted. Prefer using the `SafeMode` backend if this is important. + unimplemented!(); + } + + fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error> { + match self.env_path_type { + EnvironmentPathType::NoSubDir => { + if !path.is_file() { + return Err(ErrorImpl::UnsuitableEnvironmentPath(path.into())); + } + } + EnvironmentPathType::SubDir => { + if !path.is_dir() { + if !self.make_dir_if_needed { + return Err(ErrorImpl::UnsuitableEnvironmentPath(path.into())); + } + fs::create_dir_all(path)?; + } + } + } + + self.builder + .open(path) + .map_err(ErrorImpl::LmdbError) + .and_then(|lmdbenv| { + EnvironmentImpl::new( + path, + self.env_path_type, + self.env_lock_type, + self.env_db_type, + lmdbenv, + ) + }) + } +} + +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub enum EnvironmentPathType { + SubDir, + NoSubDir, +} + +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub enum EnvironmentLockType { + Lockfile, + NoLockfile, +} + +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub enum EnvironmentDefaultDbType { + SingleDatabase, + MultipleNamedDatabases, +} + +#[derive(Debug)] +pub struct EnvironmentImpl { + path: PathBuf, + env_path_type: EnvironmentPathType, + env_lock_type: EnvironmentLockType, + env_db_type: EnvironmentDefaultDbType, + lmdbenv: lmdb::Environment, +} + +impl EnvironmentImpl { + pub(crate) fn new( + path: &Path, + env_path_type: EnvironmentPathType, + env_lock_type: EnvironmentLockType, + env_db_type: EnvironmentDefaultDbType, + lmdbenv: lmdb::Environment, + ) -> Result<EnvironmentImpl, ErrorImpl> { + Ok(EnvironmentImpl { + path: path.to_path_buf(), + env_path_type, + env_lock_type, + env_db_type, + lmdbenv, + }) + } +} + +impl<'e> BackendEnvironment<'e> for EnvironmentImpl { + type Database = DatabaseImpl; + type Error = ErrorImpl; + type Flags = DatabaseFlagsImpl; + type Info = InfoImpl; + type RoTransaction = RoTransactionImpl<'e>; + type RwTransaction = RwTransactionImpl<'e>; + type Stat = StatImpl; + + fn get_dbs(&self) -> Result<Vec<Option<String>>, Self::Error> { + if self.env_db_type == EnvironmentDefaultDbType::SingleDatabase { + return Ok(vec![None]); + } + let db = self + .lmdbenv + .open_db(None) + .map(DatabaseImpl) + .map_err(ErrorImpl::LmdbError)?; + let reader = self.begin_ro_txn()?; + let cursor = reader.open_ro_cursor(&db)?; + let mut iter = cursor.into_iter(); + let mut store = vec![]; + while let Some(result) = iter.next() { + let (key, _) = result?; + let name = String::from_utf8(key.to_owned()) + .map_err(|_| ErrorImpl::LmdbError(lmdb::Error::Corrupted))?; + store.push(Some(name)); + } + Ok(store) + } + + fn open_db(&self, name: Option<&str>) -> Result<Self::Database, Self::Error> { + self.lmdbenv + .open_db(name) + .map(DatabaseImpl) + .map_err(ErrorImpl::LmdbError) + } + + fn create_db( + &self, + name: Option<&str>, + flags: Self::Flags, + ) -> Result<Self::Database, Self::Error> { + self.lmdbenv + .create_db(name, flags.0) + .map(DatabaseImpl) + .map_err(ErrorImpl::LmdbError) + } + + fn begin_ro_txn(&'e self) -> Result<Self::RoTransaction, Self::Error> { + self.lmdbenv + .begin_ro_txn() + .map(RoTransactionImpl) + .map_err(ErrorImpl::LmdbError) + } + + fn begin_rw_txn(&'e self) -> Result<Self::RwTransaction, Self::Error> { + self.lmdbenv + .begin_rw_txn() + .map(RwTransactionImpl) + .map_err(ErrorImpl::LmdbError) + } + + fn sync(&self, force: bool) -> Result<(), Self::Error> { + self.lmdbenv.sync(force).map_err(ErrorImpl::LmdbError) + } + + fn stat(&self) -> Result<Self::Stat, Self::Error> { + self.lmdbenv + .stat() + .map(StatImpl) + .map_err(ErrorImpl::LmdbError) + } + + fn info(&self) -> Result<Self::Info, Self::Error> { + self.lmdbenv + .info() + .map(InfoImpl) + .map_err(ErrorImpl::LmdbError) + } + + fn freelist(&self) -> Result<usize, Self::Error> { + self.lmdbenv.freelist().map_err(ErrorImpl::LmdbError) + } + + fn load_ratio(&self) -> Result<Option<f32>, Self::Error> { + let stat = self.stat()?; + let info = self.info()?; + let freelist = self.freelist()?; + + let last_pgno = info.last_pgno() + 1; // pgno is 0 based. + let total_pgs = info.map_size() / stat.page_size(); + if freelist > last_pgno { + return Err(ErrorImpl::LmdbError(LmdbError::Corrupted)); + } + let used_pgs = last_pgno - freelist; + Ok(Some(used_pgs as f32 / total_pgs as f32)) + } + + fn set_map_size(&self, size: usize) -> Result<(), Self::Error> { + self.lmdbenv + .set_map_size(size) + .map_err(ErrorImpl::LmdbError) + } + + fn get_files_on_disk(&self) -> Vec<PathBuf> { + let mut store = vec![]; + + if self.env_path_type == EnvironmentPathType::NoSubDir { + // The option NO_SUB_DIR could change the default directory layout; therefore this should + // probably return the path used to create environment, along with the custom lockfile + // when available. + unimplemented!(); + } + + let mut db_filename = self.path.clone(); + db_filename.push("data.mdb"); + store.push(db_filename); + + if self.env_lock_type == EnvironmentLockType::Lockfile { + let mut lock_filename = self.path.clone(); + lock_filename.push("lock.mdb"); + store.push(lock_filename); + } + + store + } +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/error.rs b/third_party/rust/rkv/src/backend/impl_lmdb/error.rs new file mode 100644 index 0000000000..a2fd8a7859 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/error.rs @@ -0,0 +1,57 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{fmt, io, path::PathBuf}; + +use crate::{backend::traits::BackendError, error::StoreError}; + +#[derive(Debug)] +pub enum ErrorImpl { + LmdbError(lmdb::Error), + UnsuitableEnvironmentPath(PathBuf), + IoError(io::Error), +} + +impl BackendError for ErrorImpl {} + +impl fmt::Display for ErrorImpl { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + ErrorImpl::LmdbError(e) => e.fmt(fmt), + ErrorImpl::UnsuitableEnvironmentPath(_) => write!(fmt, "UnsuitableEnvironmentPath"), + ErrorImpl::IoError(e) => e.fmt(fmt), + } + } +} + +impl Into<StoreError> for ErrorImpl { + fn into(self) -> StoreError { + match self { + ErrorImpl::LmdbError(lmdb::Error::Corrupted) => StoreError::DatabaseCorrupted, + ErrorImpl::LmdbError(lmdb::Error::NotFound) => StoreError::KeyValuePairNotFound, + ErrorImpl::LmdbError(lmdb::Error::BadValSize) => StoreError::KeyValuePairBadSize, + ErrorImpl::LmdbError(lmdb::Error::Invalid) => StoreError::FileInvalid, + ErrorImpl::LmdbError(lmdb::Error::MapFull) => StoreError::MapFull, + ErrorImpl::LmdbError(lmdb::Error::DbsFull) => StoreError::DbsFull, + ErrorImpl::LmdbError(lmdb::Error::ReadersFull) => StoreError::ReadersFull, + ErrorImpl::LmdbError(error) => StoreError::LmdbError(error), + ErrorImpl::UnsuitableEnvironmentPath(path) => { + StoreError::UnsuitableEnvironmentPath(path) + } + ErrorImpl::IoError(error) => StoreError::IoError(error), + } + } +} + +impl From<io::Error> for ErrorImpl { + fn from(e: io::Error) -> ErrorImpl { + ErrorImpl::IoError(e) + } +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/flags.rs b/third_party/rust/rkv/src/backend/impl_lmdb/flags.rs new file mode 100644 index 0000000000..7a16bb7c46 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/flags.rs @@ -0,0 +1,123 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use crate::backend::{ + common::{DatabaseFlags, EnvironmentFlags, WriteFlags}, + traits::{BackendDatabaseFlags, BackendEnvironmentFlags, BackendFlags, BackendWriteFlags}, +}; + +#[derive(Debug, Eq, PartialEq, Copy, Clone, Default)] +pub struct EnvironmentFlagsImpl(pub(crate) lmdb::EnvironmentFlags); + +impl BackendFlags for EnvironmentFlagsImpl { + fn empty() -> EnvironmentFlagsImpl { + EnvironmentFlagsImpl(lmdb::EnvironmentFlags::empty()) + } +} + +impl BackendEnvironmentFlags for EnvironmentFlagsImpl { + fn set(&mut self, flag: EnvironmentFlags, value: bool) { + self.0.set(flag.into(), value) + } +} + +impl Into<EnvironmentFlagsImpl> for EnvironmentFlags { + fn into(self) -> EnvironmentFlagsImpl { + EnvironmentFlagsImpl(self.into()) + } +} + +impl Into<lmdb::EnvironmentFlags> for EnvironmentFlags { + fn into(self) -> lmdb::EnvironmentFlags { + match self { + EnvironmentFlags::FIXED_MAP => lmdb::EnvironmentFlags::FIXED_MAP, + EnvironmentFlags::NO_SUB_DIR => lmdb::EnvironmentFlags::NO_SUB_DIR, + EnvironmentFlags::WRITE_MAP => lmdb::EnvironmentFlags::WRITE_MAP, + EnvironmentFlags::READ_ONLY => lmdb::EnvironmentFlags::READ_ONLY, + EnvironmentFlags::NO_META_SYNC => lmdb::EnvironmentFlags::NO_META_SYNC, + EnvironmentFlags::NO_SYNC => lmdb::EnvironmentFlags::NO_SYNC, + EnvironmentFlags::MAP_ASYNC => lmdb::EnvironmentFlags::MAP_ASYNC, + EnvironmentFlags::NO_TLS => lmdb::EnvironmentFlags::NO_TLS, + EnvironmentFlags::NO_LOCK => lmdb::EnvironmentFlags::NO_LOCK, + EnvironmentFlags::NO_READAHEAD => lmdb::EnvironmentFlags::NO_READAHEAD, + EnvironmentFlags::NO_MEM_INIT => lmdb::EnvironmentFlags::NO_MEM_INIT, + } + } +} + +#[derive(Debug, Eq, PartialEq, Copy, Clone, Default)] +pub struct DatabaseFlagsImpl(pub(crate) lmdb::DatabaseFlags); + +impl BackendFlags for DatabaseFlagsImpl { + fn empty() -> DatabaseFlagsImpl { + DatabaseFlagsImpl(lmdb::DatabaseFlags::empty()) + } +} + +impl BackendDatabaseFlags for DatabaseFlagsImpl { + fn set(&mut self, flag: DatabaseFlags, value: bool) { + self.0.set(flag.into(), value) + } +} + +impl Into<DatabaseFlagsImpl> for DatabaseFlags { + fn into(self) -> DatabaseFlagsImpl { + DatabaseFlagsImpl(self.into()) + } +} + +impl Into<lmdb::DatabaseFlags> for DatabaseFlags { + fn into(self) -> lmdb::DatabaseFlags { + match self { + DatabaseFlags::REVERSE_KEY => lmdb::DatabaseFlags::REVERSE_KEY, + #[cfg(feature = "db-dup-sort")] + DatabaseFlags::DUP_SORT => lmdb::DatabaseFlags::DUP_SORT, + #[cfg(feature = "db-dup-sort")] + DatabaseFlags::DUP_FIXED => lmdb::DatabaseFlags::DUP_FIXED, + #[cfg(feature = "db-int-key")] + DatabaseFlags::INTEGER_KEY => lmdb::DatabaseFlags::INTEGER_KEY, + DatabaseFlags::INTEGER_DUP => lmdb::DatabaseFlags::INTEGER_DUP, + DatabaseFlags::REVERSE_DUP => lmdb::DatabaseFlags::REVERSE_DUP, + } + } +} + +#[derive(Debug, Eq, PartialEq, Copy, Clone, Default)] +pub struct WriteFlagsImpl(pub(crate) lmdb::WriteFlags); + +impl BackendFlags for WriteFlagsImpl { + fn empty() -> WriteFlagsImpl { + WriteFlagsImpl(lmdb::WriteFlags::empty()) + } +} + +impl BackendWriteFlags for WriteFlagsImpl { + fn set(&mut self, flag: WriteFlags, value: bool) { + self.0.set(flag.into(), value) + } +} + +impl Into<WriteFlagsImpl> for WriteFlags { + fn into(self) -> WriteFlagsImpl { + WriteFlagsImpl(self.into()) + } +} + +impl Into<lmdb::WriteFlags> for WriteFlags { + fn into(self) -> lmdb::WriteFlags { + match self { + WriteFlags::NO_OVERWRITE => lmdb::WriteFlags::NO_OVERWRITE, + WriteFlags::NO_DUP_DATA => lmdb::WriteFlags::NO_DUP_DATA, + WriteFlags::CURRENT => lmdb::WriteFlags::CURRENT, + WriteFlags::APPEND => lmdb::WriteFlags::APPEND, + WriteFlags::APPEND_DUP => lmdb::WriteFlags::APPEND_DUP, + } + } +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/info.rs b/third_party/rust/rkv/src/backend/impl_lmdb/info.rs new file mode 100644 index 0000000000..6188065c07 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/info.rs @@ -0,0 +1,35 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use crate::backend::traits::BackendInfo; + +pub struct InfoImpl(pub(crate) lmdb::Info); + +impl BackendInfo for InfoImpl { + fn map_size(&self) -> usize { + self.0.map_size() + } + + fn last_pgno(&self) -> usize { + self.0.last_pgno() + } + + fn last_txnid(&self) -> usize { + self.0.last_txnid() + } + + fn max_readers(&self) -> usize { + self.0.max_readers() as usize + } + + fn num_readers(&self) -> usize { + self.0.num_readers() as usize + } +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/iter.rs b/third_party/rust/rkv/src/backend/impl_lmdb/iter.rs new file mode 100644 index 0000000000..519d361d4f --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/iter.rs @@ -0,0 +1,41 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use super::ErrorImpl; +use crate::backend::traits::BackendIter; + +pub struct IterImpl<'i, C> { + // LMDB semantics dictate that a cursor must be valid for the entire lifetime + // of an iterator. In other words, cursors must not be dropped while an + // iterator built from it is alive. Unfortunately, the LMDB crate API does + // not express this through the type system, so we must enforce it somehow. + #[allow(dead_code)] + cursor: C, + iter: lmdb::Iter<'i>, +} + +impl<'i, C> IterImpl<'i, C> { + pub(crate) fn new( + mut cursor: C, + to_iter: impl FnOnce(&mut C) -> lmdb::Iter<'i>, + ) -> IterImpl<'i, C> { + let iter = to_iter(&mut cursor); + IterImpl { cursor, iter } + } +} + +impl<'i, C> BackendIter<'i> for IterImpl<'i, C> { + type Error = ErrorImpl; + + #[allow(clippy::type_complexity)] + fn next(&mut self) -> Option<Result<(&'i [u8], &'i [u8]), Self::Error>> { + self.iter.next().map(|e| e.map_err(ErrorImpl::LmdbError)) + } +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/stat.rs b/third_party/rust/rkv/src/backend/impl_lmdb/stat.rs new file mode 100644 index 0000000000..b0de8c5051 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/stat.rs @@ -0,0 +1,39 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use crate::backend::traits::BackendStat; + +pub struct StatImpl(pub(crate) lmdb::Stat); + +impl BackendStat for StatImpl { + fn page_size(&self) -> usize { + self.0.page_size() as usize + } + + fn depth(&self) -> usize { + self.0.depth() as usize + } + + fn branch_pages(&self) -> usize { + self.0.branch_pages() + } + + fn leaf_pages(&self) -> usize { + self.0.leaf_pages() + } + + fn overflow_pages(&self) -> usize { + self.0.overflow_pages() + } + + fn entries(&self) -> usize { + self.0.entries() + } +} diff --git a/third_party/rust/rkv/src/backend/impl_lmdb/transaction.rs b/third_party/rust/rkv/src/backend/impl_lmdb/transaction.rs new file mode 100644 index 0000000000..21752763fd --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_lmdb/transaction.rs @@ -0,0 +1,107 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use lmdb::Transaction; + +use super::{DatabaseImpl, ErrorImpl, RoCursorImpl, WriteFlagsImpl}; +use crate::backend::traits::{ + BackendRoCursorTransaction, BackendRoTransaction, BackendRwCursorTransaction, + BackendRwTransaction, +}; + +#[derive(Debug)] +pub struct RoTransactionImpl<'t>(pub(crate) lmdb::RoTransaction<'t>); + +impl<'t> BackendRoTransaction for RoTransactionImpl<'t> { + type Database = DatabaseImpl; + type Error = ErrorImpl; + + fn get(&self, db: &Self::Database, key: &[u8]) -> Result<&[u8], Self::Error> { + self.0.get(db.0, &key).map_err(ErrorImpl::LmdbError) + } + + fn abort(self) { + self.0.abort() + } +} + +impl<'t> BackendRoCursorTransaction<'t> for RoTransactionImpl<'t> { + type RoCursor = RoCursorImpl<'t>; + + fn open_ro_cursor(&'t self, db: &Self::Database) -> Result<Self::RoCursor, Self::Error> { + self.0 + .open_ro_cursor(db.0) + .map(RoCursorImpl) + .map_err(ErrorImpl::LmdbError) + } +} + +#[derive(Debug)] +pub struct RwTransactionImpl<'t>(pub(crate) lmdb::RwTransaction<'t>); + +impl<'t> BackendRwTransaction for RwTransactionImpl<'t> { + type Database = DatabaseImpl; + type Error = ErrorImpl; + type Flags = WriteFlagsImpl; + + fn get(&self, db: &Self::Database, key: &[u8]) -> Result<&[u8], Self::Error> { + self.0.get(db.0, &key).map_err(ErrorImpl::LmdbError) + } + + fn put( + &mut self, + db: &Self::Database, + key: &[u8], + value: &[u8], + flags: Self::Flags, + ) -> Result<(), Self::Error> { + self.0 + .put(db.0, &key, &value, flags.0) + .map_err(ErrorImpl::LmdbError) + } + + #[cfg(not(feature = "db-dup-sort"))] + fn del(&mut self, db: &Self::Database, key: &[u8]) -> Result<(), Self::Error> { + self.0.del(db.0, &key, None).map_err(ErrorImpl::LmdbError) + } + + #[cfg(feature = "db-dup-sort")] + fn del( + &mut self, + db: &Self::Database, + key: &[u8], + value: Option<&[u8]>, + ) -> Result<(), Self::Error> { + self.0.del(db.0, &key, value).map_err(ErrorImpl::LmdbError) + } + + fn clear_db(&mut self, db: &Self::Database) -> Result<(), Self::Error> { + self.0.clear_db(db.0).map_err(ErrorImpl::LmdbError) + } + + fn commit(self) -> Result<(), Self::Error> { + self.0.commit().map_err(ErrorImpl::LmdbError) + } + + fn abort(self) { + self.0.abort() + } +} + +impl<'t> BackendRwCursorTransaction<'t> for RwTransactionImpl<'t> { + type RoCursor = RoCursorImpl<'t>; + + fn open_ro_cursor(&'t self, db: &Self::Database) -> Result<Self::RoCursor, Self::Error> { + self.0 + .open_ro_cursor(db.0) + .map(RoCursorImpl) + .map_err(ErrorImpl::LmdbError) + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe.rs b/third_party/rust/rkv/src/backend/impl_safe.rs new file mode 100644 index 0000000000..66ca012973 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe.rs @@ -0,0 +1,30 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +mod cursor; +mod database; +mod environment; +mod error; +mod flags; +mod info; +mod iter; +mod snapshot; +mod stat; +mod transaction; + +pub use cursor::{RoCursorImpl, RwCursorImpl}; +pub use database::DatabaseImpl; +pub use environment::{EnvironmentBuilderImpl, EnvironmentImpl}; +pub use error::ErrorImpl; +pub use flags::{DatabaseFlagsImpl, EnvironmentFlagsImpl, WriteFlagsImpl}; +pub use info::InfoImpl; +pub use iter::IterImpl; +pub use stat::StatImpl; +pub use transaction::{RoTransactionImpl, RwTransactionImpl}; diff --git a/third_party/rust/rkv/src/backend/impl_safe/cursor.rs b/third_party/rust/rkv/src/backend/impl_safe/cursor.rs new file mode 100644 index 0000000000..c3bfefad94 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/cursor.rs @@ -0,0 +1,98 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use super::{snapshot::Snapshot, IterImpl}; +use crate::backend::traits::BackendRoCursor; + +#[derive(Debug)] +pub struct RoCursorImpl<'c>(pub(crate) &'c Snapshot); + +#[cfg(not(feature = "db-dup-sort"))] +impl<'c> BackendRoCursor<'c> for RoCursorImpl<'c> { + type Iter = IterImpl<'c>; + + fn into_iter(self) -> Self::Iter { + IterImpl(Box::new(self.0.iter())) + } + + fn into_iter_from<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + IterImpl(Box::new( + self.0.iter().skip_while(move |&(k, _)| k < key.as_ref()), + )) + } + + fn into_iter_dup_of<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + IterImpl(Box::new( + self.0.iter().filter(move |&(k, _)| k == key.as_ref()), + )) + } +} + +#[cfg(feature = "db-dup-sort")] +impl<'c> BackendRoCursor<'c> for RoCursorImpl<'c> { + type Iter = IterImpl<'c>; + + fn into_iter(self) -> Self::Iter { + let flattened = self + .0 + .iter() + .flat_map(|(key, values)| values.map(move |value| (key, value))); + IterImpl(Box::new(flattened)) + } + + fn into_iter_from<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + let skipped = self.0.iter().skip_while(move |&(k, _)| k < key.as_ref()); + let flattened = skipped.flat_map(|(key, values)| values.map(move |value| (key, value))); + IterImpl(Box::new(flattened)) + } + + fn into_iter_dup_of<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + let filtered = self.0.iter().filter(move |&(k, _)| k == key.as_ref()); + let flattened = filtered.flat_map(|(key, values)| values.map(move |value| (key, value))); + IterImpl(Box::new(flattened)) + } +} + +#[derive(Debug)] +pub struct RwCursorImpl<'c>(&'c mut Snapshot); + +impl<'c> BackendRoCursor<'c> for RwCursorImpl<'c> { + type Iter = IterImpl<'c>; + + fn into_iter(self) -> Self::Iter { + unimplemented!() + } + + fn into_iter_from<K>(self, _key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + unimplemented!() + } + + fn into_iter_dup_of<K>(self, _key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c, + { + unimplemented!() + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe/database.rs b/third_party/rust/rkv/src/backend/impl_safe/database.rs new file mode 100644 index 0000000000..f8eb4aef76 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/database.rs @@ -0,0 +1,41 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use id_arena::Id; +use serde_derive::{Deserialize, Serialize}; + +use super::{snapshot::Snapshot, DatabaseFlagsImpl}; +use crate::backend::traits::BackendDatabase; + +#[derive(Debug, Eq, PartialEq, Copy, Clone, Hash)] +pub struct DatabaseImpl(pub(crate) Id<Database>); + +impl BackendDatabase for DatabaseImpl {} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Database { + snapshot: Snapshot, +} + +impl Database { + pub(crate) fn new(flags: Option<DatabaseFlagsImpl>, snapshot: Option<Snapshot>) -> Database { + Database { + snapshot: snapshot.unwrap_or_else(|| Snapshot::new(flags)), + } + } + + pub(crate) fn snapshot(&self) -> Snapshot { + self.snapshot.clone() + } + + pub(crate) fn replace(&mut self, snapshot: Snapshot) -> Snapshot { + std::mem::replace(&mut self.snapshot, snapshot) + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe/environment.rs b/third_party/rust/rkv/src/backend/impl_safe/environment.rs new file mode 100644 index 0000000000..17cfb87f1e --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/environment.rs @@ -0,0 +1,332 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{ + borrow::Cow, + collections::HashMap, + fs, + ops::DerefMut, + path::{Path, PathBuf}, + sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, +}; + +use id_arena::Arena; +use log::warn; + +use super::{ + database::Database, DatabaseFlagsImpl, DatabaseImpl, EnvironmentFlagsImpl, ErrorImpl, InfoImpl, + RoTransactionImpl, RwTransactionImpl, StatImpl, +}; +use crate::backend::traits::{BackendEnvironment, BackendEnvironmentBuilder}; + +const DEFAULT_DB_FILENAME: &str = "data.safe.bin"; + +type DatabaseArena = Arena<Database>; +type DatabaseNameMap = HashMap<Option<String>, DatabaseImpl>; + +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub struct EnvironmentBuilderImpl { + flags: EnvironmentFlagsImpl, + max_readers: Option<usize>, + max_dbs: Option<usize>, + map_size: Option<usize>, + make_dir_if_needed: bool, + discard_if_corrupted: bool, +} + +impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl { + type Environment = EnvironmentImpl; + type Error = ErrorImpl; + type Flags = EnvironmentFlagsImpl; + + fn new() -> EnvironmentBuilderImpl { + EnvironmentBuilderImpl { + flags: EnvironmentFlagsImpl::empty(), + max_readers: None, + max_dbs: None, + map_size: None, + make_dir_if_needed: false, + discard_if_corrupted: false, + } + } + + fn set_flags<T>(&mut self, flags: T) -> &mut Self + where + T: Into<Self::Flags>, + { + self.flags = flags.into(); + self + } + + fn set_max_readers(&mut self, max_readers: u32) -> &mut Self { + self.max_readers = Some(max_readers as usize); + self + } + + fn set_max_dbs(&mut self, max_dbs: u32) -> &mut Self { + self.max_dbs = Some(max_dbs as usize); + self + } + + fn set_map_size(&mut self, map_size: usize) -> &mut Self { + self.map_size = Some(map_size); + self + } + + fn set_make_dir_if_needed(&mut self, make_dir_if_needed: bool) -> &mut Self { + self.make_dir_if_needed = make_dir_if_needed; + self + } + + fn set_discard_if_corrupted(&mut self, discard_if_corrupted: bool) -> &mut Self { + self.discard_if_corrupted = discard_if_corrupted; + self + } + + fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error> { + // Technically NO_SUB_DIR should change these checks here, but they're both currently + // unimplemented with this storage backend. + if !path.is_dir() { + if !self.make_dir_if_needed { + return Err(ErrorImpl::UnsuitableEnvironmentPath(path.into())); + } + fs::create_dir_all(path)?; + } + let mut env = EnvironmentImpl::new( + path, + self.flags, + self.max_readers, + self.max_dbs, + self.map_size, + )?; + env.read_from_disk(self.discard_if_corrupted)?; + Ok(env) + } +} + +#[derive(Debug)] +pub(crate) struct EnvironmentDbs { + pub(crate) arena: DatabaseArena, + pub(crate) name_map: DatabaseNameMap, +} + +#[derive(Debug)] +pub(crate) struct EnvironmentDbsRefMut<'a> { + pub(crate) arena: &'a mut DatabaseArena, + pub(crate) name_map: &'a mut DatabaseNameMap, +} + +impl<'a> From<&'a mut EnvironmentDbs> for EnvironmentDbsRefMut<'a> { + fn from(dbs: &mut EnvironmentDbs) -> EnvironmentDbsRefMut { + EnvironmentDbsRefMut { + arena: &mut dbs.arena, + name_map: &mut dbs.name_map, + } + } +} + +#[derive(Debug)] +pub struct EnvironmentImpl { + path: PathBuf, + max_dbs: usize, + dbs: RwLock<EnvironmentDbs>, + ro_txns: Arc<()>, + rw_txns: Arc<()>, +} + +impl EnvironmentImpl { + fn serialize(&self) -> Result<Vec<u8>, ErrorImpl> { + let dbs = self.dbs.read().map_err(|_| ErrorImpl::EnvPoisonError)?; + let data: HashMap<_, _> = dbs + .name_map + .iter() + .map(|(name, id)| (name, &dbs.arena[id.0])) + .collect(); + Ok(bincode::serialize(&data)?) + } + + fn deserialize( + bytes: &[u8], + discard_if_corrupted: bool, + ) -> Result<(DatabaseArena, DatabaseNameMap), ErrorImpl> { + let mut arena = DatabaseArena::new(); + let mut name_map = HashMap::new(); + let data: HashMap<_, _> = match bincode::deserialize(bytes) { + Err(_) if discard_if_corrupted => Ok(HashMap::new()), + result => result, + }?; + for (name, db) in data { + name_map.insert(name, DatabaseImpl(arena.alloc(db))); + } + Ok((arena, name_map)) + } +} + +impl EnvironmentImpl { + pub(crate) fn new( + path: &Path, + flags: EnvironmentFlagsImpl, + max_readers: Option<usize>, + max_dbs: Option<usize>, + map_size: Option<usize>, + ) -> Result<EnvironmentImpl, ErrorImpl> { + if !flags.is_empty() { + warn!("Ignoring `flags={:?}`", flags); + } + if let Some(max_readers) = max_readers { + warn!("Ignoring `max_readers={}`", max_readers); + } + if let Some(map_size) = map_size { + warn!("Ignoring `map_size={}`", map_size); + } + + Ok(EnvironmentImpl { + path: path.to_path_buf(), + max_dbs: max_dbs.unwrap_or(std::usize::MAX), + dbs: RwLock::new(EnvironmentDbs { + arena: DatabaseArena::new(), + name_map: HashMap::new(), + }), + ro_txns: Arc::new(()), + rw_txns: Arc::new(()), + }) + } + + pub(crate) fn read_from_disk(&mut self, discard_if_corrupted: bool) -> Result<(), ErrorImpl> { + let mut path = Cow::from(&self.path); + if fs::metadata(&path)?.is_dir() { + path.to_mut().push(DEFAULT_DB_FILENAME); + }; + if fs::metadata(&path).is_err() { + return Ok(()); + }; + let (arena, name_map) = Self::deserialize(&fs::read(&path)?, discard_if_corrupted)?; + self.dbs = RwLock::new(EnvironmentDbs { arena, name_map }); + Ok(()) + } + + pub(crate) fn write_to_disk(&self) -> Result<(), ErrorImpl> { + let mut path = Cow::from(&self.path); + if fs::metadata(&path)?.is_dir() { + path.to_mut().push(DEFAULT_DB_FILENAME); + }; + + // Write to a temp file first. + let tmp_path = path.with_extension("tmp"); + fs::write(&tmp_path, self.serialize()?)?; + + // Atomically move that file to the database file. + fs::rename(tmp_path, path)?; + Ok(()) + } + + pub(crate) fn dbs(&self) -> Result<RwLockReadGuard<EnvironmentDbs>, ErrorImpl> { + self.dbs.read().map_err(|_| ErrorImpl::EnvPoisonError) + } + + pub(crate) fn dbs_mut(&self) -> Result<RwLockWriteGuard<EnvironmentDbs>, ErrorImpl> { + self.dbs.write().map_err(|_| ErrorImpl::EnvPoisonError) + } +} + +impl<'e> BackendEnvironment<'e> for EnvironmentImpl { + type Database = DatabaseImpl; + type Error = ErrorImpl; + type Flags = DatabaseFlagsImpl; + type Info = InfoImpl; + type RoTransaction = RoTransactionImpl<'e>; + type RwTransaction = RwTransactionImpl<'e>; + type Stat = StatImpl; + + fn get_dbs(&self) -> Result<Vec<Option<String>>, Self::Error> { + let dbs = self.dbs.read().map_err(|_| ErrorImpl::EnvPoisonError)?; + Ok(dbs.name_map.keys().map(|key| key.to_owned()).collect()) + } + + fn open_db(&self, name: Option<&str>) -> Result<Self::Database, Self::Error> { + if Arc::strong_count(&self.ro_txns) > 1 { + return Err(ErrorImpl::DbsIllegalOpen); + } + // TOOD: don't reallocate `name`. + let key = name.map(String::from); + let dbs = self.dbs.read().map_err(|_| ErrorImpl::EnvPoisonError)?; + let db = dbs.name_map.get(&key).ok_or(ErrorImpl::DbNotFoundError)?; + Ok(*db) + } + + fn create_db( + &self, + name: Option<&str>, + flags: Self::Flags, + ) -> Result<Self::Database, Self::Error> { + if Arc::strong_count(&self.ro_txns) > 1 { + return Err(ErrorImpl::DbsIllegalOpen); + } + // TOOD: don't reallocate `name`. + let key = name.map(String::from); + let mut dbs = self.dbs.write().map_err(|_| ErrorImpl::EnvPoisonError)?; + if dbs.name_map.keys().filter_map(|k| k.as_ref()).count() >= self.max_dbs && name.is_some() { + return Err(ErrorImpl::DbsFull); + } + let parts = EnvironmentDbsRefMut::from(dbs.deref_mut()); + let arena = parts.arena; + let name_map = parts.name_map; + let id = name_map + .entry(key) + .or_insert_with(|| DatabaseImpl(arena.alloc(Database::new(Some(flags), None)))); + Ok(*id) + } + + fn begin_ro_txn(&'e self) -> Result<Self::RoTransaction, Self::Error> { + RoTransactionImpl::new(self, self.ro_txns.clone()) + } + + fn begin_rw_txn(&'e self) -> Result<Self::RwTransaction, Self::Error> { + RwTransactionImpl::new(self, self.rw_txns.clone()) + } + + fn sync(&self, force: bool) -> Result<(), Self::Error> { + warn!("Ignoring `force={}`", force); + self.write_to_disk() + } + + fn stat(&self) -> Result<Self::Stat, Self::Error> { + Ok(StatImpl) + } + + fn info(&self) -> Result<Self::Info, Self::Error> { + Ok(InfoImpl) + } + + fn freelist(&self) -> Result<usize, Self::Error> { + unimplemented!() + } + + fn load_ratio(&self) -> Result<Option<f32>, Self::Error> { + warn!("`load_ratio()` is irrelevant for this storage backend."); + Ok(None) + } + + fn set_map_size(&self, size: usize) -> Result<(), Self::Error> { + warn!( + "`set_map_size({})` is ignored by this storage backend.", + size + ); + Ok(()) + } + + fn get_files_on_disk(&self) -> Vec<PathBuf> { + // Technically NO_SUB_DIR and NO_LOCK should change this output, but + // they're both currently unimplemented with this storage backend. + let mut db_filename = self.path.clone(); + db_filename.push(DEFAULT_DB_FILENAME); + vec![db_filename] + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe/error.rs b/third_party/rust/rkv/src/backend/impl_safe/error.rs new file mode 100644 index 0000000000..f086cdc954 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/error.rs @@ -0,0 +1,79 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{fmt, io, path::PathBuf}; + +use bincode::Error as BincodeError; + +use crate::{backend::traits::BackendError, error::StoreError}; + +#[derive(Debug)] +pub enum ErrorImpl { + KeyValuePairNotFound, + EnvPoisonError, + DbsFull, + DbsIllegalOpen, + DbNotFoundError, + DbIsForeignError, + UnsuitableEnvironmentPath(PathBuf), + IoError(io::Error), + BincodeError(BincodeError), +} + +impl BackendError for ErrorImpl {} + +impl fmt::Display for ErrorImpl { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + ErrorImpl::KeyValuePairNotFound => write!(fmt, "KeyValuePairNotFound (safe mode)"), + ErrorImpl::EnvPoisonError => write!(fmt, "EnvPoisonError (safe mode)"), + ErrorImpl::DbsFull => write!(fmt, "DbsFull (safe mode)"), + ErrorImpl::DbsIllegalOpen => write!(fmt, "DbIllegalOpen (safe mode)"), + ErrorImpl::DbNotFoundError => write!(fmt, "DbNotFoundError (safe mode)"), + ErrorImpl::DbIsForeignError => write!(fmt, "DbIsForeignError (safe mode)"), + ErrorImpl::UnsuitableEnvironmentPath(_) => { + write!(fmt, "UnsuitableEnvironmentPath (safe mode)") + } + ErrorImpl::IoError(e) => e.fmt(fmt), + ErrorImpl::BincodeError(e) => e.fmt(fmt), + } + } +} + +impl Into<StoreError> for ErrorImpl { + fn into(self) -> StoreError { + // The `StoreError::KeyValuePairBadSize` error is unused, because this + // backend supports keys and values of arbitrary sizes. + // The `StoreError::MapFull` and `StoreError::ReadersFull` are + // unimplemented yet, but they should be in the future. + match self { + ErrorImpl::KeyValuePairNotFound => StoreError::KeyValuePairNotFound, + ErrorImpl::BincodeError(_) => StoreError::FileInvalid, + ErrorImpl::DbsFull => StoreError::DbsFull, + ErrorImpl::UnsuitableEnvironmentPath(path) => { + StoreError::UnsuitableEnvironmentPath(path) + } + ErrorImpl::IoError(error) => StoreError::IoError(error), + _ => StoreError::SafeModeError(self), + } + } +} + +impl From<io::Error> for ErrorImpl { + fn from(e: io::Error) -> ErrorImpl { + ErrorImpl::IoError(e) + } +} + +impl From<BincodeError> for ErrorImpl { + fn from(e: BincodeError) -> ErrorImpl { + ErrorImpl::BincodeError(e) + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe/flags.rs b/third_party/rust/rkv/src/backend/impl_safe/flags.rs new file mode 100644 index 0000000000..a0d623614e --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/flags.rs @@ -0,0 +1,124 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use bitflags::bitflags; +use serde_derive::{Deserialize, Serialize}; + +use crate::backend::{ + common::{DatabaseFlags, EnvironmentFlags, WriteFlags}, + traits::{BackendDatabaseFlags, BackendEnvironmentFlags, BackendFlags, BackendWriteFlags}, +}; + +bitflags! { + #[derive(Default, Serialize, Deserialize)] + pub struct EnvironmentFlagsImpl: u32 { + const NIL = 0b0000_0000; + } +} + +impl BackendFlags for EnvironmentFlagsImpl { + fn empty() -> EnvironmentFlagsImpl { + EnvironmentFlagsImpl::empty() + } +} + +impl BackendEnvironmentFlags for EnvironmentFlagsImpl { + fn set(&mut self, flag: EnvironmentFlags, value: bool) { + self.set(flag.into(), value) + } +} + +impl Into<EnvironmentFlagsImpl> for EnvironmentFlags { + fn into(self) -> EnvironmentFlagsImpl { + match self { + EnvironmentFlags::FIXED_MAP => unimplemented!(), + EnvironmentFlags::NO_SUB_DIR => unimplemented!(), + EnvironmentFlags::WRITE_MAP => unimplemented!(), + EnvironmentFlags::READ_ONLY => unimplemented!(), + EnvironmentFlags::NO_META_SYNC => unimplemented!(), + EnvironmentFlags::NO_SYNC => unimplemented!(), + EnvironmentFlags::MAP_ASYNC => unimplemented!(), + EnvironmentFlags::NO_TLS => unimplemented!(), + EnvironmentFlags::NO_LOCK => unimplemented!(), + EnvironmentFlags::NO_READAHEAD => unimplemented!(), + EnvironmentFlags::NO_MEM_INIT => unimplemented!(), + } + } +} + +bitflags! { + #[derive(Default, Serialize, Deserialize)] + pub struct DatabaseFlagsImpl: u32 { + const NIL = 0b0000_0000; + #[cfg(feature = "db-dup-sort")] + const DUP_SORT = 0b0000_0001; + #[cfg(feature = "db-int-key")] + const INTEGER_KEY = 0b0000_0010; + } +} + +impl BackendFlags for DatabaseFlagsImpl { + fn empty() -> DatabaseFlagsImpl { + DatabaseFlagsImpl::empty() + } +} + +impl BackendDatabaseFlags for DatabaseFlagsImpl { + fn set(&mut self, flag: DatabaseFlags, value: bool) { + self.set(flag.into(), value) + } +} + +impl Into<DatabaseFlagsImpl> for DatabaseFlags { + fn into(self) -> DatabaseFlagsImpl { + match self { + DatabaseFlags::REVERSE_KEY => unimplemented!(), + #[cfg(feature = "db-dup-sort")] + DatabaseFlags::DUP_SORT => DatabaseFlagsImpl::DUP_SORT, + #[cfg(feature = "db-dup-sort")] + DatabaseFlags::DUP_FIXED => unimplemented!(), + #[cfg(feature = "db-int-key")] + DatabaseFlags::INTEGER_KEY => DatabaseFlagsImpl::INTEGER_KEY, + DatabaseFlags::INTEGER_DUP => unimplemented!(), + DatabaseFlags::REVERSE_DUP => unimplemented!(), + } + } +} + +bitflags! { + #[derive(Default, Serialize, Deserialize)] + pub struct WriteFlagsImpl: u32 { + const NIL = 0b0000_0000; + } +} + +impl BackendFlags for WriteFlagsImpl { + fn empty() -> WriteFlagsImpl { + WriteFlagsImpl::empty() + } +} + +impl BackendWriteFlags for WriteFlagsImpl { + fn set(&mut self, flag: WriteFlags, value: bool) { + self.set(flag.into(), value) + } +} + +impl Into<WriteFlagsImpl> for WriteFlags { + fn into(self) -> WriteFlagsImpl { + match self { + WriteFlags::NO_OVERWRITE => unimplemented!(), + WriteFlags::NO_DUP_DATA => unimplemented!(), + WriteFlags::CURRENT => unimplemented!(), + WriteFlags::APPEND => unimplemented!(), + WriteFlags::APPEND_DUP => unimplemented!(), + } + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe/info.rs b/third_party/rust/rkv/src/backend/impl_safe/info.rs new file mode 100644 index 0000000000..18f0f51da3 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/info.rs @@ -0,0 +1,35 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use crate::backend::traits::BackendInfo; + +pub struct InfoImpl; + +impl BackendInfo for InfoImpl { + fn map_size(&self) -> usize { + unimplemented!() + } + + fn last_pgno(&self) -> usize { + unimplemented!() + } + + fn last_txnid(&self) -> usize { + unimplemented!() + } + + fn max_readers(&self) -> usize { + unimplemented!() + } + + fn num_readers(&self) -> usize { + unimplemented!() + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe/iter.rs b/third_party/rust/rkv/src/backend/impl_safe/iter.rs new file mode 100644 index 0000000000..a784c00a5b --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/iter.rs @@ -0,0 +1,24 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use super::ErrorImpl; +use crate::backend::traits::BackendIter; + +// FIXME: Use generics instead. +pub struct IterImpl<'i>(pub(crate) Box<dyn Iterator<Item = (&'i [u8], &'i [u8])> + 'i>); + +impl<'i> BackendIter<'i> for IterImpl<'i> { + type Error = ErrorImpl; + + #[allow(clippy::type_complexity)] + fn next(&mut self) -> Option<Result<(&'i [u8], &'i [u8]), Self::Error>> { + self.0.next().map(Ok) + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe/snapshot.rs b/third_party/rust/rkv/src/backend/impl_safe/snapshot.rs new file mode 100644 index 0000000000..95f5690d34 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/snapshot.rs @@ -0,0 +1,141 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::Arc, +}; + +use serde_derive::{Deserialize, Serialize}; + +use super::DatabaseFlagsImpl; + +type Key = Box<[u8]>; +type Value = Box<[u8]>; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Snapshot { + flags: DatabaseFlagsImpl, + #[cfg(not(feature = "db-dup-sort"))] + map: Arc<BTreeMap<Key, Value>>, + #[cfg(feature = "db-dup-sort")] + map: Arc<BTreeMap<Key, BTreeSet<Value>>>, +} + +impl Snapshot { + pub(crate) fn new(flags: Option<DatabaseFlagsImpl>) -> Snapshot { + Snapshot { + flags: flags.unwrap_or_default(), + map: Default::default(), + } + } + + pub(crate) fn flags(&self) -> &DatabaseFlagsImpl { + &self.flags + } + + pub(crate) fn clear(&mut self) { + self.map = Default::default(); + } +} + +#[cfg(not(feature = "db-dup-sort"))] +impl Snapshot { + pub(crate) fn get(&self, key: &[u8]) -> Option<&[u8]> { + self.map.get(key).map(|value| value.as_ref()) + } + + pub(crate) fn put(&mut self, key: &[u8], value: &[u8]) { + let map = Arc::make_mut(&mut self.map); + map.insert(Box::from(key), Box::from(value)); + } + + pub(crate) fn del(&mut self, key: &[u8]) -> Option<()> { + let map = Arc::make_mut(&mut self.map); + map.remove(key).map(|_| ()) + } + + pub(crate) fn iter(&self) -> impl Iterator<Item = (&[u8], &[u8])> { + self.map + .iter() + .map(|(key, value)| (key.as_ref(), value.as_ref())) + } +} + +#[cfg(feature = "db-dup-sort")] +impl Snapshot { + pub(crate) fn get(&self, key: &[u8]) -> Option<&[u8]> { + self.map + .get(key) + .and_then(|v| v.iter().next()) + .map(|v| v.as_ref()) + } + + pub(crate) fn put(&mut self, key: &[u8], value: &[u8]) { + let map = Arc::make_mut(&mut self.map); + match map.get_mut(key) { + None => { + let mut values = BTreeSet::new(); + values.insert(Box::from(value)); + map.insert(Box::from(key), values); + } + Some(values) => { + values.clear(); + values.insert(Box::from(value)); + } + } + } + + pub(crate) fn del(&mut self, key: &[u8]) -> Option<()> { + let map = Arc::make_mut(&mut self.map); + match map.get_mut(key) { + None => None, + Some(values) => { + let was_empty = values.is_empty(); + values.clear(); + Some(()).filter(|_| !was_empty) + } + } + } + + pub(crate) fn iter(&self) -> impl Iterator<Item = (&[u8], impl Iterator<Item = &[u8]>)> { + self.map + .iter() + .map(|(key, values)| (key.as_ref(), values.iter().map(|value| value.as_ref()))) + } +} + +#[cfg(feature = "db-dup-sort")] +impl Snapshot { + pub(crate) fn put_dup(&mut self, key: &[u8], value: &[u8]) { + let map = Arc::make_mut(&mut self.map); + match map.get_mut(key) { + None => { + let mut values = BTreeSet::new(); + values.insert(Box::from(value)); + map.insert(Box::from(key), values); + } + Some(values) => { + values.insert(Box::from(value)); + } + } + } + + pub(crate) fn del_exact(&mut self, key: &[u8], value: &[u8]) -> Option<()> { + let map = Arc::make_mut(&mut self.map); + match map.get_mut(key) { + None => None, + Some(values) => { + let was_removed = values.remove(value); + Some(()).filter(|_| was_removed) + } + } + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe/stat.rs b/third_party/rust/rkv/src/backend/impl_safe/stat.rs new file mode 100644 index 0000000000..c117b56833 --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/stat.rs @@ -0,0 +1,39 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use crate::backend::traits::BackendStat; + +pub struct StatImpl; + +impl BackendStat for StatImpl { + fn page_size(&self) -> usize { + unimplemented!() + } + + fn depth(&self) -> usize { + unimplemented!() + } + + fn branch_pages(&self) -> usize { + unimplemented!() + } + + fn leaf_pages(&self) -> usize { + unimplemented!() + } + + fn overflow_pages(&self) -> usize { + unimplemented!() + } + + fn entries(&self) -> usize { + unimplemented!() + } +} diff --git a/third_party/rust/rkv/src/backend/impl_safe/transaction.rs b/third_party/rust/rkv/src/backend/impl_safe/transaction.rs new file mode 100644 index 0000000000..d37352a42d --- /dev/null +++ b/third_party/rust/rkv/src/backend/impl_safe/transaction.rs @@ -0,0 +1,208 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. +#![allow(dead_code)] // TODO: Get rid of unused struct members + +use std::{collections::HashMap, sync::Arc}; + +use super::{ + snapshot::Snapshot, DatabaseImpl, EnvironmentImpl, ErrorImpl, RoCursorImpl, WriteFlagsImpl, +}; +use crate::backend::traits::{ + BackendRoCursorTransaction, BackendRoTransaction, BackendRwCursorTransaction, + BackendRwTransaction, +}; + +#[derive(Debug)] +pub struct RoTransactionImpl<'t> { + env: &'t EnvironmentImpl, + snapshots: HashMap<DatabaseImpl, Snapshot>, + idx: Arc<()>, +} + +impl<'t> RoTransactionImpl<'t> { + pub(crate) fn new( + env: &'t EnvironmentImpl, + idx: Arc<()>, + ) -> Result<RoTransactionImpl<'t>, ErrorImpl> { + let snapshots = env + .dbs()? + .arena + .iter() + .map(|(id, db)| (DatabaseImpl(id), db.snapshot())) + .collect(); + Ok(RoTransactionImpl { + env, + snapshots, + idx, + }) + } +} + +impl<'t> BackendRoTransaction for RoTransactionImpl<'t> { + type Database = DatabaseImpl; + type Error = ErrorImpl; + + fn get(&self, db: &Self::Database, key: &[u8]) -> Result<&[u8], Self::Error> { + let snapshot = self.snapshots.get(db).ok_or(ErrorImpl::DbIsForeignError)?; + snapshot.get(key).ok_or(ErrorImpl::KeyValuePairNotFound) + } + + fn abort(self) { + // noop + } +} + +impl<'t> BackendRoCursorTransaction<'t> for RoTransactionImpl<'t> { + type RoCursor = RoCursorImpl<'t>; + + fn open_ro_cursor(&'t self, db: &Self::Database) -> Result<Self::RoCursor, Self::Error> { + let snapshot = self.snapshots.get(db).ok_or(ErrorImpl::DbIsForeignError)?; + Ok(RoCursorImpl(snapshot)) + } +} + +#[derive(Debug)] +pub struct RwTransactionImpl<'t> { + env: &'t EnvironmentImpl, + snapshots: HashMap<DatabaseImpl, Snapshot>, + idx: Arc<()>, +} + +impl<'t> RwTransactionImpl<'t> { + pub(crate) fn new( + env: &'t EnvironmentImpl, + idx: Arc<()>, + ) -> Result<RwTransactionImpl<'t>, ErrorImpl> { + let snapshots = env + .dbs()? + .arena + .iter() + .map(|(id, db)| (DatabaseImpl(id), db.snapshot())) + .collect(); + Ok(RwTransactionImpl { + env, + snapshots, + idx, + }) + } +} + +impl<'t> BackendRwTransaction for RwTransactionImpl<'t> { + type Database = DatabaseImpl; + type Error = ErrorImpl; + type Flags = WriteFlagsImpl; + + fn get(&self, db: &Self::Database, key: &[u8]) -> Result<&[u8], Self::Error> { + let snapshot = self.snapshots.get(db).ok_or(ErrorImpl::DbIsForeignError)?; + snapshot.get(key).ok_or(ErrorImpl::KeyValuePairNotFound) + } + + #[cfg(not(feature = "db-dup-sort"))] + fn put( + &mut self, + db: &Self::Database, + key: &[u8], + value: &[u8], + _flags: Self::Flags, + ) -> Result<(), Self::Error> { + let snapshot = self + .snapshots + .get_mut(db) + .ok_or_else(|| ErrorImpl::DbIsForeignError)?; + snapshot.put(key, value); + Ok(()) + } + + #[cfg(feature = "db-dup-sort")] + fn put( + &mut self, + db: &Self::Database, + key: &[u8], + value: &[u8], + _flags: Self::Flags, + ) -> Result<(), Self::Error> { + use super::DatabaseFlagsImpl; + let snapshot = self + .snapshots + .get_mut(db) + .ok_or(ErrorImpl::DbIsForeignError)?; + if snapshot.flags().contains(DatabaseFlagsImpl::DUP_SORT) { + snapshot.put_dup(key, value); + } else { + snapshot.put(key, value); + } + Ok(()) + } + + #[cfg(not(feature = "db-dup-sort"))] + fn del(&mut self, db: &Self::Database, key: &[u8]) -> Result<(), Self::Error> { + let snapshot = self + .snapshots + .get_mut(db) + .ok_or_else(|| ErrorImpl::DbIsForeignError)?; + let deleted = snapshot.del(key); + Ok(deleted.ok_or_else(|| ErrorImpl::KeyValuePairNotFound)?) + } + + #[cfg(feature = "db-dup-sort")] + fn del( + &mut self, + db: &Self::Database, + key: &[u8], + value: Option<&[u8]>, + ) -> Result<(), Self::Error> { + use super::DatabaseFlagsImpl; + let snapshot = self + .snapshots + .get_mut(db) + .ok_or(ErrorImpl::DbIsForeignError)?; + let deleted = match (value, snapshot.flags()) { + (Some(value), flags) if flags.contains(DatabaseFlagsImpl::DUP_SORT) => { + snapshot.del_exact(key, value) + } + _ => snapshot.del(key), + }; + deleted.ok_or(ErrorImpl::KeyValuePairNotFound) + } + + fn clear_db(&mut self, db: &Self::Database) -> Result<(), Self::Error> { + let snapshot = self + .snapshots + .get_mut(db) + .ok_or(ErrorImpl::DbIsForeignError)?; + snapshot.clear(); + Ok(()) + } + + fn commit(self) -> Result<(), Self::Error> { + let mut dbs = self.env.dbs_mut()?; + + for (id, snapshot) in self.snapshots { + let db = dbs.arena.get_mut(id.0).ok_or(ErrorImpl::DbIsForeignError)?; + db.replace(snapshot); + } + + drop(dbs); + self.env.write_to_disk() + } + + fn abort(self) { + // noop + } +} + +impl<'t> BackendRwCursorTransaction<'t> for RwTransactionImpl<'t> { + type RoCursor = RoCursorImpl<'t>; + + fn open_ro_cursor(&'t self, db: &Self::Database) -> Result<Self::RoCursor, Self::Error> { + let snapshot = self.snapshots.get(db).ok_or(ErrorImpl::DbIsForeignError)?; + Ok(RoCursorImpl(snapshot)) + } +} diff --git a/third_party/rust/rkv/src/backend/traits.rs b/third_party/rust/rkv/src/backend/traits.rs new file mode 100644 index 0000000000..1dda15a85c --- /dev/null +++ b/third_party/rust/rkv/src/backend/traits.rs @@ -0,0 +1,202 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{ + fmt::{Debug, Display}, + path::{Path, PathBuf}, +}; + +use crate::{ + backend::common::{DatabaseFlags, EnvironmentFlags, WriteFlags}, + error::StoreError, +}; + +pub trait BackendError: Debug + Display + Into<StoreError> {} + +pub trait BackendDatabase: Debug + Eq + PartialEq + Copy + Clone {} + +pub trait BackendFlags: Debug + Eq + PartialEq + Copy + Clone + Default { + fn empty() -> Self; +} + +pub trait BackendEnvironmentFlags: BackendFlags { + fn set(&mut self, flag: EnvironmentFlags, value: bool); +} + +pub trait BackendDatabaseFlags: BackendFlags { + fn set(&mut self, flag: DatabaseFlags, value: bool); +} + +pub trait BackendWriteFlags: BackendFlags { + fn set(&mut self, flag: WriteFlags, value: bool); +} + +pub trait BackendStat { + fn page_size(&self) -> usize; + + fn depth(&self) -> usize; + + fn branch_pages(&self) -> usize; + + fn leaf_pages(&self) -> usize; + + fn overflow_pages(&self) -> usize; + + fn entries(&self) -> usize; +} + +pub trait BackendInfo { + fn map_size(&self) -> usize; + + fn last_pgno(&self) -> usize; + + fn last_txnid(&self) -> usize; + + fn max_readers(&self) -> usize; + + fn num_readers(&self) -> usize; +} + +pub trait BackendEnvironmentBuilder<'b>: Debug + Eq + PartialEq + Copy + Clone { + type Error: BackendError; + type Environment: BackendEnvironment<'b>; + type Flags: BackendEnvironmentFlags; + + fn new() -> Self; + + fn set_flags<T>(&mut self, flags: T) -> &mut Self + where + T: Into<Self::Flags>; + + fn set_max_dbs(&mut self, max_dbs: u32) -> &mut Self; + + fn set_max_readers(&mut self, max_readers: u32) -> &mut Self; + + fn set_map_size(&mut self, size: usize) -> &mut Self; + + fn set_make_dir_if_needed(&mut self, make_dir_if_needed: bool) -> &mut Self; + + fn set_discard_if_corrupted(&mut self, discard_if_corrupted: bool) -> &mut Self; + + fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error>; +} + +pub trait BackendEnvironment<'e>: Debug { + type Error: BackendError; + type Database: BackendDatabase; + type Flags: BackendDatabaseFlags; + type Stat: BackendStat; + type Info: BackendInfo; + type RoTransaction: BackendRoCursorTransaction<'e, Database = Self::Database>; + type RwTransaction: BackendRwCursorTransaction<'e, Database = Self::Database>; + + fn get_dbs(&self) -> Result<Vec<Option<String>>, Self::Error>; + + fn open_db(&self, name: Option<&str>) -> Result<Self::Database, Self::Error>; + + fn create_db( + &self, + name: Option<&str>, + flags: Self::Flags, + ) -> Result<Self::Database, Self::Error>; + + fn begin_ro_txn(&'e self) -> Result<Self::RoTransaction, Self::Error>; + + fn begin_rw_txn(&'e self) -> Result<Self::RwTransaction, Self::Error>; + + fn sync(&self, force: bool) -> Result<(), Self::Error>; + + fn stat(&self) -> Result<Self::Stat, Self::Error>; + + fn info(&self) -> Result<Self::Info, Self::Error>; + + fn freelist(&self) -> Result<usize, Self::Error>; + + fn load_ratio(&self) -> Result<Option<f32>, Self::Error>; + + fn set_map_size(&self, size: usize) -> Result<(), Self::Error>; + + fn get_files_on_disk(&self) -> Vec<PathBuf>; +} + +pub trait BackendRoTransaction: Debug { + type Error: BackendError; + type Database: BackendDatabase; + + fn get(&self, db: &Self::Database, key: &[u8]) -> Result<&[u8], Self::Error>; + + fn abort(self); +} + +pub trait BackendRwTransaction: Debug { + type Error: BackendError; + type Database: BackendDatabase; + type Flags: BackendWriteFlags; + + fn get(&self, db: &Self::Database, key: &[u8]) -> Result<&[u8], Self::Error>; + + fn put( + &mut self, + db: &Self::Database, + key: &[u8], + value: &[u8], + flags: Self::Flags, + ) -> Result<(), Self::Error>; + + #[cfg(not(feature = "db-dup-sort"))] + fn del(&mut self, db: &Self::Database, key: &[u8]) -> Result<(), Self::Error>; + + #[cfg(feature = "db-dup-sort")] + fn del( + &mut self, + db: &Self::Database, + key: &[u8], + value: Option<&[u8]>, + ) -> Result<(), Self::Error>; + + fn clear_db(&mut self, db: &Self::Database) -> Result<(), Self::Error>; + + fn commit(self) -> Result<(), Self::Error>; + + fn abort(self); +} + +pub trait BackendRoCursorTransaction<'t>: BackendRoTransaction { + type RoCursor: BackendRoCursor<'t>; + + fn open_ro_cursor(&'t self, db: &Self::Database) -> Result<Self::RoCursor, Self::Error>; +} + +pub trait BackendRwCursorTransaction<'t>: BackendRwTransaction { + type RoCursor: BackendRoCursor<'t>; + + fn open_ro_cursor(&'t self, db: &Self::Database) -> Result<Self::RoCursor, Self::Error>; +} + +pub trait BackendRoCursor<'c>: Debug { + type Iter: BackendIter<'c>; + + fn into_iter(self) -> Self::Iter; + + fn into_iter_from<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c; + + fn into_iter_dup_of<K>(self, key: K) -> Self::Iter + where + K: AsRef<[u8]> + 'c; +} + +pub trait BackendIter<'i> { + type Error: BackendError; + + #[allow(clippy::type_complexity)] + fn next(&mut self) -> Option<Result<(&'i [u8], &'i [u8]), Self::Error>>; +} diff --git a/third_party/rust/rkv/src/bin/dump.rs b/third_party/rust/rkv/src/bin/dump.rs new file mode 100644 index 0000000000..c0e12b986f --- /dev/null +++ b/third_party/rust/rkv/src/bin/dump.rs @@ -0,0 +1,47 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{env::args, io, path::Path}; + +use rkv::migrator::{LmdbArchMigrateError, LmdbArchMigrator}; + +fn main() -> Result<(), LmdbArchMigrateError> { + let mut cli_args = args(); + let mut db_name = None; + let mut env_path = None; + + // The first arg is the name of the program, which we can ignore. + cli_args.next(); + + while let Some(arg) = cli_args.next() { + if &arg[0..1] == "-" { + match &arg[1..] { + "s" => { + db_name = match cli_args.next() { + None => return Err("-s must be followed by database name".into()), + Some(str) => Some(str), + }; + } + str => return Err(format!("arg -{str} not recognized").into()), + } + } else { + if env_path.is_some() { + return Err("must provide only one path to the LMDB environment".into()); + } + env_path = Some(arg); + } + } + + let env_path = env_path.ok_or("must provide a path to the LMDB environment")?; + let mut migrator = LmdbArchMigrator::new(Path::new(&env_path))?; + migrator.dump(db_name.as_deref(), io::stdout()).unwrap(); + + Ok(()) +} diff --git a/third_party/rust/rkv/src/bin/rand.rs b/third_party/rust/rkv/src/bin/rand.rs new file mode 100644 index 0000000000..56d578c22e --- /dev/null +++ b/third_party/rust/rkv/src/bin/rand.rs @@ -0,0 +1,106 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +//! A command-line utility to create an LMDB environment containing random data. +//! It requires one flag, `-s path/to/environment`, which specifies the location +//! where the tool should create the environment. Optionally, you may specify +//! the number of key/value pairs to create via the `-n <number>` flag +//! (for which the default value is 50). + +use std::{env::args, fs, fs::File, io::Read, path::Path}; + +use rkv::{ + backend::{BackendEnvironmentBuilder, Lmdb}, + Rkv, StoreOptions, Value, +}; + +fn main() { + let mut args = args(); + let mut database = None; + let mut path = None; + let mut num_pairs = 50; + + // The first arg is the name of the program, which we can ignore. + args.next(); + + while let Some(arg) = args.next() { + if &arg[0..1] == "-" { + match &arg[1..] { + "s" => { + database = match args.next() { + None => panic!("-s must be followed by database arg"), + Some(str) => Some(str), + }; + } + "n" => { + num_pairs = match args.next() { + None => panic!("-s must be followed by number of pairs"), + Some(str) => str.parse().expect("number"), + }; + } + str => panic!("arg -{} not recognized", str), + } + } else { + if path.is_some() { + panic!("must provide only one path to the LMDB environment"); + } + path = Some(arg); + } + } + + if path.is_none() { + panic!("must provide a path to the LMDB environment"); + } + + let path = path.unwrap(); + fs::create_dir_all(&path).expect("dir created"); + + let mut builder = Rkv::environment_builder::<Lmdb>(); + builder.set_max_dbs(2); + // Allocate enough map to accommodate the largest random collection. + // We currently do this by allocating twice the maximum possible size + // of the pairs (assuming maximum key and value sizes). + builder.set_map_size((511 + 65535) * num_pairs * 2); + let rkv = Rkv::from_builder(Path::new(&path), builder).expect("Rkv"); + let store = rkv + .open_single(database.as_deref(), StoreOptions::create()) + .expect("opened"); + let mut writer = rkv.write().expect("writer"); + + // Generate random values for the number of keys and key/value lengths. + // On Linux, "Just use /dev/urandom!" <https://www.2uo.de/myths-about-urandom/>. + // On macOS it doesn't matter (/dev/random and /dev/urandom are identical). + let mut random = File::open("/dev/urandom").unwrap(); + let mut nums = [0u8; 4]; + random.read_exact(&mut nums).unwrap(); + + // Generate 0–255 pairs. + for _ in 0..num_pairs { + // Generate key and value lengths. The key must be 1–511 bytes long. + // The value length can be 0 and is essentially unbounded; we generate + // value lengths of 0–0xffff (65535). + // NB: the modulus method for generating a random number within a range + // introduces distribution skew, but we don't need it to be perfect. + let key_len = ((u16::from(nums[0]) + (u16::from(nums[1]) << 8)) % 511 + 1) as usize; + let value_len = (u16::from(nums[2]) + (u16::from(nums[3]) << 8)) as usize; + + let mut key: Vec<u8> = vec![0; key_len]; + random.read_exact(&mut key[0..key_len]).unwrap(); + + let mut value: Vec<u8> = vec![0; value_len]; + random.read_exact(&mut value[0..value_len]).unwrap(); + + store + .put(&mut writer, key, &Value::Blob(&value)) + .expect("wrote"); + } + + writer.commit().expect("committed"); +} diff --git a/third_party/rust/rkv/src/env.rs b/third_party/rust/rkv/src/env.rs new file mode 100644 index 0000000000..32e4fe5035 --- /dev/null +++ b/third_party/rust/rkv/src/env.rs @@ -0,0 +1,320 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{ + fs, + os::raw::c_uint, + path::{Path, PathBuf}, +}; + +#[cfg(any(feature = "db-dup-sort", feature = "db-int-key"))] +use crate::backend::{BackendDatabaseFlags, DatabaseFlags}; +use crate::{ + backend::{ + BackendEnvironment, BackendEnvironmentBuilder, BackendRoCursorTransaction, + BackendRwCursorTransaction, SafeModeError, + }, + error::{CloseError, StoreError}, + readwrite::{Reader, Writer}, + store::{single::SingleStore, CloseOptions, Options as StoreOptions}, +}; + +#[cfg(feature = "db-dup-sort")] +use crate::store::multi::MultiStore; + +#[cfg(feature = "db-int-key")] +use crate::store::integer::IntegerStore; +#[cfg(feature = "db-int-key")] +use crate::store::keys::PrimitiveInt; + +#[cfg(all(feature = "db-dup-sort", feature = "db-int-key"))] +use crate::store::integermulti::MultiIntegerStore; + +pub static DEFAULT_MAX_DBS: c_uint = 5; + +/// Wrapper around an `Environment` (e.g. such as an `LMDB` or `SafeMode` environment). +#[derive(Debug)] +pub struct Rkv<E> { + _path: PathBuf, + env: E, +} + +/// Static methods. +impl<'e, E> Rkv<E> +where + E: BackendEnvironment<'e>, +{ + pub fn environment_builder<B>() -> B + where + B: BackendEnvironmentBuilder<'e, Environment = E>, + { + B::new() + } + + /// Return a new Rkv environment that supports up to `DEFAULT_MAX_DBS` open databases. + #[allow(clippy::new_ret_no_self)] + pub fn new<B>(path: &Path) -> Result<Rkv<E>, StoreError> + where + B: BackendEnvironmentBuilder<'e, Environment = E>, + { + Rkv::with_capacity::<B>(path, DEFAULT_MAX_DBS) + } + + /// Return a new Rkv environment that supports the specified number of open databases. + pub fn with_capacity<B>(path: &Path, max_dbs: c_uint) -> Result<Rkv<E>, StoreError> + where + B: BackendEnvironmentBuilder<'e, Environment = E>, + { + let mut builder = B::new(); + builder.set_max_dbs(max_dbs); + + // Future: set flags, maximum size, etc. here if necessary. + Rkv::from_builder(path, builder) + } + + /// Return a new Rkv environment from the provided builder. + pub fn from_builder<B>(path: &Path, builder: B) -> Result<Rkv<E>, StoreError> + where + B: BackendEnvironmentBuilder<'e, Environment = E>, + { + Ok(Rkv { + _path: path.into(), + env: builder.open(path).map_err(|e| e.into())?, + }) + } +} + +/// Store creation methods. +impl<'e, E> Rkv<E> +where + E: BackendEnvironment<'e>, +{ + /// Return all created databases. + pub fn get_dbs(&self) -> Result<Vec<Option<String>>, StoreError> { + self.env.get_dbs().map_err(|e| e.into()) + } + + /// Create or Open an existing database in (&[u8] -> Single Value) mode. + /// Note: that create=true cannot be called concurrently with other operations so if + /// you are sure that the database exists, call this with create=false. + pub fn open_single<'s, T>( + &self, + name: T, + opts: StoreOptions<E::Flags>, + ) -> Result<SingleStore<E::Database>, StoreError> + where + T: Into<Option<&'s str>>, + { + self.open(name, opts).map(SingleStore::new) + } + + /// Create or Open an existing database in (Integer -> Single Value) mode. + /// Note: that create=true cannot be called concurrently with other operations so if + /// you are sure that the database exists, call this with create=false. + #[cfg(feature = "db-int-key")] + pub fn open_integer<'s, T, K>( + &self, + name: T, + mut opts: StoreOptions<E::Flags>, + ) -> Result<IntegerStore<E::Database, K>, StoreError> + where + K: PrimitiveInt, + T: Into<Option<&'s str>>, + { + opts.flags.set(DatabaseFlags::INTEGER_KEY, true); + self.open(name, opts).map(IntegerStore::new) + } + + /// Create or Open an existing database in (&[u8] -> Multiple Values) mode. + /// Note: that create=true cannot be called concurrently with other operations so if + /// you are sure that the database exists, call this with create=false. + #[cfg(feature = "db-dup-sort")] + pub fn open_multi<'s, T>( + &self, + name: T, + mut opts: StoreOptions<E::Flags>, + ) -> Result<MultiStore<E::Database>, StoreError> + where + T: Into<Option<&'s str>>, + { + opts.flags.set(DatabaseFlags::DUP_SORT, true); + self.open(name, opts).map(MultiStore::new) + } + + /// Create or Open an existing database in (Integer -> Multiple Values) mode. + /// Note: that create=true cannot be called concurrently with other operations so if + /// you are sure that the database exists, call this with create=false. + #[cfg(all(feature = "db-dup-sort", feature = "db-int-key"))] + pub fn open_multi_integer<'s, T, K>( + &self, + name: T, + mut opts: StoreOptions<E::Flags>, + ) -> Result<MultiIntegerStore<E::Database, K>, StoreError> + where + K: PrimitiveInt, + T: Into<Option<&'s str>>, + { + opts.flags.set(DatabaseFlags::INTEGER_KEY, true); + opts.flags.set(DatabaseFlags::DUP_SORT, true); + self.open(name, opts).map(MultiIntegerStore::new) + } + + fn open<'s, T>(&self, name: T, opts: StoreOptions<E::Flags>) -> Result<E::Database, StoreError> + where + T: Into<Option<&'s str>>, + { + if opts.create { + self.env + .create_db(name.into(), opts.flags) + .map_err(|e| match e.into() { + #[cfg(feature = "lmdb")] + StoreError::LmdbError(lmdb::Error::BadRslot) => { + StoreError::open_during_transaction() + } + StoreError::SafeModeError(SafeModeError::DbsIllegalOpen) => { + StoreError::open_during_transaction() + } + e => e, + }) + } else { + self.env.open_db(name.into()).map_err(|e| match e.into() { + #[cfg(feature = "lmdb")] + StoreError::LmdbError(lmdb::Error::BadRslot) => { + StoreError::open_during_transaction() + } + StoreError::SafeModeError(SafeModeError::DbsIllegalOpen) => { + StoreError::open_during_transaction() + } + e => e, + }) + } + } +} + +/// Read and write accessors. +impl<'e, E> Rkv<E> +where + E: BackendEnvironment<'e>, +{ + /// Create a read transaction. There can be multiple concurrent readers for an + /// environment, up to the maximum specified by LMDB (default 126), and you can open + /// readers while a write transaction is active. + pub fn read<T>(&'e self) -> Result<Reader<T>, StoreError> + where + E: BackendEnvironment<'e, RoTransaction = T>, + T: BackendRoCursorTransaction<'e, Database = E::Database>, + { + Ok(Reader::new(self.env.begin_ro_txn().map_err(|e| e.into())?)) + } + + /// Create a write transaction. There can be only one write transaction active at any + /// given time, so trying to create a second one will block until the first is + /// committed or aborted. + pub fn write<T>(&'e self) -> Result<Writer<T>, StoreError> + where + E: BackendEnvironment<'e, RwTransaction = T>, + T: BackendRwCursorTransaction<'e, Database = E::Database>, + { + Ok(Writer::new(self.env.begin_rw_txn().map_err(|e| e.into())?)) + } +} + +/// Other environment methods. +impl<'e, E> Rkv<E> +where + E: BackendEnvironment<'e>, +{ + /// Flush the data buffers to disk. This call is only useful, when the environment was + /// open with either `NO_SYNC`, `NO_META_SYNC` or `MAP_ASYNC` (see below). The call is + /// not valid if the environment was opened with `READ_ONLY`. + /// + /// Data is always written to disk when `transaction.commit()` is called, but the + /// operating system may keep it buffered. LMDB always flushes the OS buffers upon + /// commit as well, unless the environment was opened with `NO_SYNC` or in part + /// `NO_META_SYNC`. + /// + /// `force`: if true, force a synchronous flush. Otherwise if the environment has the + /// `NO_SYNC` flag set the flushes will be omitted, and with `MAP_ASYNC` they will + /// be asynchronous. + pub fn sync(&self, force: bool) -> Result<(), StoreError> { + self.env.sync(force).map_err(|e| e.into()) + } + + /// Retrieve statistics about this environment. + /// + /// It includes: + /// * Page size in bytes + /// * B-tree depth + /// * Number of internal (non-leaf) pages + /// * Number of leaf pages + /// * Number of overflow pages + /// * Number of data entries + pub fn stat(&self) -> Result<E::Stat, StoreError> { + self.env.stat().map_err(|e| e.into()) + } + + /// Retrieve information about this environment. + /// + /// It includes: + /// * Map size in bytes + /// * The last used page number + /// * The last transaction ID + /// * Max number of readers allowed + /// * Number of readers in use + pub fn info(&self) -> Result<E::Info, StoreError> { + self.env.info().map_err(|e| e.into()) + } + + /// Retrieve the load ratio (# of used pages / total pages) about this environment. + /// + /// With the formular: (last_page_no - freelist_pages) / total_pages. + /// A value of `None` means that the backend doesn't ever need to be resized. + pub fn load_ratio(&self) -> Result<Option<f32>, StoreError> { + self.env.load_ratio().map_err(|e| e.into()) + } + + /// Sets the size of the memory map to use for the environment. + /// + /// This can be used to resize the map when the environment is already open. You can + /// also use `Rkv::environment_builder()` to set the map size during the `Rkv` + /// initialization. + /// + /// Note: + /// + /// * No active transactions allowed when performing resizing in this process. It's up + /// to the consumer to enforce that. + /// + /// * The size should be a multiple of the OS page size. Any attempt to set a size + /// smaller than the space already consumed by the environment will be silently + /// changed to the current size of the used space. + /// + /// * In the multi-process case, once a process resizes the map, other processes need + /// to either re-open the environment, or call set_map_size with size 0 to update + /// the environment. Otherwise, new transaction creation will fail with + /// `LmdbError::MapResized`. + pub fn set_map_size(&self, size: usize) -> Result<(), StoreError> { + self.env.set_map_size(size).map_err(Into::into) + } + + /// Closes this environment and optionally deletes all its files from disk. Doesn't + /// delete the folder used when opening the environment. + pub fn close(self, options: CloseOptions) -> Result<(), CloseError> { + let files = self.env.get_files_on_disk(); + drop(self); + + if options.delete { + for file in files { + fs::remove_file(file)?; + } + } + + Ok(()) + } +} diff --git a/third_party/rust/rkv/src/error.rs b/third_party/rust/rkv/src/error.rs new file mode 100644 index 0000000000..f50dda73e4 --- /dev/null +++ b/third_party/rust/rkv/src/error.rs @@ -0,0 +1,150 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{io, path::PathBuf, sync, thread, thread::ThreadId}; + +use thiserror::Error; + +pub use crate::backend::SafeModeError; +use crate::value::Type; + +#[derive(Debug, Error)] +pub enum DataError { + #[error("unknown type tag: {0}")] + UnknownType(u8), + + #[error("unexpected type tag: expected {expected}, got {actual}")] + UnexpectedType { expected: Type, actual: Type }, + + #[error("empty data; expected tag")] + Empty, + + #[error("invalid value for type {value_type}: {err}")] + DecodingError { + value_type: Type, + err: Box<bincode::ErrorKind>, + }, + + #[error("couldn't encode value: {0}")] + EncodingError(#[from] Box<bincode::ErrorKind>), + + #[error("invalid uuid bytes")] + InvalidUuid, +} + +#[derive(Debug, Error)] +pub enum StoreError { + #[error("manager poisoned")] + ManagerPoisonError, + + #[error("database corrupted")] + DatabaseCorrupted, + + #[error("key/value pair not found")] + KeyValuePairNotFound, + + #[error("unsupported size of key/DB name/data")] + KeyValuePairBadSize, + + #[error("file is not a valid database")] + FileInvalid, + + #[error("environment mapsize reached")] + MapFull, + + #[error("environment maxdbs reached")] + DbsFull, + + #[error("environment maxreaders reached")] + ReadersFull, + + #[error("I/O error: {0:?}")] + IoError(#[from] io::Error), + + #[error("environment path does not exist or not the right type: {0:?}")] + UnsuitableEnvironmentPath(PathBuf), + + #[error("data error: {0:?}")] + DataError(#[from] DataError), + + #[cfg(feature = "lmdb")] + #[error("lmdb backend error: {0}")] + LmdbError(lmdb::Error), + + #[error("safe mode backend error: {0}")] + SafeModeError(SafeModeError), + + #[error("read transaction already exists in thread {0:?}")] + ReadTransactionAlreadyExists(ThreadId), + + #[error("attempted to open DB during transaction in thread {0:?}")] + OpenAttemptedDuringTransaction(ThreadId), +} + +impl StoreError { + pub fn open_during_transaction() -> StoreError { + StoreError::OpenAttemptedDuringTransaction(thread::current().id()) + } + + pub fn read_transaction_already_exists() -> StoreError { + StoreError::ReadTransactionAlreadyExists(thread::current().id()) + } +} + +impl<T> From<sync::PoisonError<T>> for StoreError { + fn from(_: sync::PoisonError<T>) -> StoreError { + StoreError::ManagerPoisonError + } +} + +#[derive(Debug, Error)] +pub enum CloseError { + #[error("manager poisoned")] + ManagerPoisonError, + + #[error("close attempted while manager has an environment still open")] + EnvironmentStillOpen, + + #[error("close attempted while an environment not known to the manager is still open")] + UnknownEnvironmentStillOpen, + + #[error("I/O error: {0:?}")] + IoError(#[from] io::Error), +} + +impl<T> From<sync::PoisonError<T>> for CloseError { + fn from(_: sync::PoisonError<T>) -> CloseError { + CloseError::ManagerPoisonError + } +} + +#[derive(Debug, Error)] +pub enum MigrateError { + #[error("store error: {0}")] + StoreError(#[from] StoreError), + + #[error("close error: {0}")] + CloseError(#[from] CloseError), + + #[error("manager poisoned")] + ManagerPoisonError, + + #[error("source is empty")] + SourceEmpty, + + #[error("destination is not empty")] + DestinationNotEmpty, +} + +impl<T> From<sync::PoisonError<T>> for MigrateError { + fn from(_: sync::PoisonError<T>) -> MigrateError { + MigrateError::ManagerPoisonError + } +} diff --git a/third_party/rust/rkv/src/helpers.rs b/third_party/rust/rkv/src/helpers.rs new file mode 100644 index 0000000000..89dcdf8dd7 --- /dev/null +++ b/third_party/rust/rkv/src/helpers.rs @@ -0,0 +1,43 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{ + io, + path::{Path, PathBuf}, +}; + +use url::Url; + +use crate::{error::StoreError, value::Value}; + +pub(crate) fn read_transform(value: Result<&[u8], StoreError>) -> Result<Value, StoreError> { + match value { + Ok(bytes) => Value::from_tagged_slice(bytes).map_err(StoreError::DataError), + Err(e) => Err(e), + } +} + +// Workaround the UNC path on Windows, see https://github.com/rust-lang/rust/issues/42869. +// Otherwise, `Env::from_builder()` will panic with error_no(123). +pub(crate) fn canonicalize_path<'p, P>(path: P) -> io::Result<PathBuf> +where + P: Into<&'p Path>, +{ + let canonical = path.into().canonicalize()?; + + Ok(if cfg!(target_os = "windows") { + let map_err = |_| io::Error::new(io::ErrorKind::Other, "path canonicalization error"); + Url::from_file_path(&canonical) + .and_then(|url| url.to_file_path()) + .map_err(map_err)? + } else { + canonical + }) +} diff --git a/third_party/rust/rkv/src/lib.rs b/third_party/rust/rkv/src/lib.rs new file mode 100644 index 0000000000..0c8951e191 --- /dev/null +++ b/third_party/rust/rkv/src/lib.rs @@ -0,0 +1,236 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. +#![allow(clippy::from_over_into)] // TODO: `Into` implementations in [safe/lmdb]/flags.rs + +//! A simple, humane, typed key-value storage solution. It supports multiple backend +//! engines with varying guarantees, such as [LMDB](http://www.lmdb.tech/doc/) for +//! performance, or "SafeMode" for reliability. +//! +//! It aims to achieve the following: +//! +//! - Avoid sharp edges (e.g., obscure error codes for common situations). +//! - Correctly restrict access to one handle per process via a +//! [Manager](struct.Manager.html). +//! - Use Rust's type system to make single-typed key stores safe and ergonomic. +//! - Encode and decode values via [bincode](https://docs.rs/bincode/)/[serde](https://docs.rs/serde/) +//! and type tags, achieving platform-independent storage and input/output flexibility. +//! +//! It exposes these primary abstractions: +//! +//! - [Manager](struct.Manager.html): a singleton that controls access to environments +//! - [Rkv](struct.Rkv.html): an environment contains a set of key/value databases +//! - [SingleStore](store/single/struct.SingleStore.html): a database contains a set of +//! key/value pairs +//! +//! Keys can be anything that implements `AsRef<[u8]>` or integers +//! (when accessing an [IntegerStore](store/integer/struct.IntegerStore.html)). +//! +//! Values can be any of the types defined by the [Value](value/enum.Value.html) enum, +//! including: +//! +//! - booleans (`Value::Bool`) +//! - integers (`Value::I64`, `Value::U64`) +//! - floats (`Value::F64`) +//! - strings (`Value::Str`) +//! - blobs (`Value::Blob`) +//! +//! See [Value](value/enum.Value.html) for the complete list of supported types. +//! +//! ## Basic Usage +//! ``` +//! use rkv::{Manager, Rkv, SingleStore, Value, StoreOptions}; +//! use rkv::backend::{SafeMode, SafeModeEnvironment}; +//! use std::fs; +//! use tempfile::Builder; +//! +//! // First determine the path to the environment, which is represented on disk as a +//! // directory containing two files: +//! // +//! // * a data file containing the key/value stores +//! // * a lock file containing metadata about current transactions +//! // +//! // In this example, we use the `tempfile` crate to create the directory. +//! // +//! let root = Builder::new().prefix("simple-db").tempdir().unwrap(); +//! fs::create_dir_all(root.path()).unwrap(); +//! let path = root.path(); +//! +//! // The `Manager` enforces that each process opens the same environment at most once by +//! // caching a handle to each environment that it opens. Use it to retrieve the handle +//! // to an opened environment—or create one if it hasn't already been opened: +//! let mut manager = Manager::<SafeModeEnvironment>::singleton().write().unwrap(); +//! let created_arc = manager.get_or_create(path, Rkv::new::<SafeMode>).unwrap(); +//! let env = created_arc.read().unwrap(); +//! +//! // Then you can use the environment handle to get a handle to a datastore: +//! let store = env.open_single("mydb", StoreOptions::create()).unwrap(); +//! +//! { +//! // Use a write transaction to mutate the store via a `Writer`. There can be only +//! // one writer for a given environment, so opening a second one will block until +//! // the first completes. +//! let mut writer = env.write().unwrap(); +//! +//! // Keys are `AsRef<[u8]>`, while values are `Value` enum instances. Use the `Blob` +//! // variant to store arbitrary collections of bytes. Putting data returns a +//! // `Result<(), StoreError>`, where StoreError is an enum identifying the reason +//! // for a failure. +//! store.put(&mut writer, "int", &Value::I64(1234)).unwrap(); +//! store.put(&mut writer, "uint", &Value::U64(1234_u64)).unwrap(); +//! store.put(&mut writer, "float", &Value::F64(1234.0.into())).unwrap(); +//! store.put(&mut writer, "instant", &Value::Instant(1528318073700)).unwrap(); +//! store.put(&mut writer, "boolean", &Value::Bool(true)).unwrap(); +//! store.put(&mut writer, "string", &Value::Str("Héllo, wörld!")).unwrap(); +//! store.put(&mut writer, "json", &Value::Json(r#"{"foo":"bar", "number": 1}"#)).unwrap(); +//! store.put(&mut writer, "blob", &Value::Blob(b"blob")).unwrap(); +//! +//! // You must commit a write transaction before the writer goes out of scope, or the +//! // transaction will abort and the data won't persist. +//! writer.commit().unwrap(); +//! } +//! +//! { +//! // Use a read transaction to query the store via a `Reader`. There can be multiple +//! // concurrent readers for a store, and readers never block on a writer nor other +//! // readers. +//! let reader = env.read().expect("reader"); +//! +//! // Keys are `AsRef<u8>`, and the return value is `Result<Option<Value>, StoreError>`. +//! println!("Get int {:?}", store.get(&reader, "int").unwrap()); +//! println!("Get uint {:?}", store.get(&reader, "uint").unwrap()); +//! println!("Get float {:?}", store.get(&reader, "float").unwrap()); +//! println!("Get instant {:?}", store.get(&reader, "instant").unwrap()); +//! println!("Get boolean {:?}", store.get(&reader, "boolean").unwrap()); +//! println!("Get string {:?}", store.get(&reader, "string").unwrap()); +//! println!("Get json {:?}", store.get(&reader, "json").unwrap()); +//! println!("Get blob {:?}", store.get(&reader, "blob").unwrap()); +//! +//! // Retrieving a non-existent value returns `Ok(None)`. +//! println!("Get non-existent value {:?}", store.get(&reader, "non-existent").unwrap()); +//! +//! // A read transaction will automatically close once the reader goes out of scope, +//! // so isn't necessary to close it explicitly, although you can do so by calling +//! // `Reader.abort()`. +//! } +//! +//! { +//! // Aborting a write transaction rolls back the change(s). +//! let mut writer = env.write().unwrap(); +//! store.put(&mut writer, "foo", &Value::Str("bar")).unwrap(); +//! writer.abort(); +//! let reader = env.read().expect("reader"); +//! println!("It should be None! ({:?})", store.get(&reader, "foo").unwrap()); +//! } +//! +//! { +//! // Explicitly aborting a transaction is not required unless an early abort is +//! // desired, since both read and write transactions will implicitly be aborted once +//! // they go out of scope. +//! { +//! let mut writer = env.write().unwrap(); +//! store.put(&mut writer, "foo", &Value::Str("bar")).unwrap(); +//! } +//! let reader = env.read().expect("reader"); +//! println!("It should be None! ({:?})", store.get(&reader, "foo").unwrap()); +//! } +//! +//! { +//! // Deleting a key/value pair also requires a write transaction. +//! let mut writer = env.write().unwrap(); +//! store.put(&mut writer, "foo", &Value::Str("bar")).unwrap(); +//! store.put(&mut writer, "bar", &Value::Str("baz")).unwrap(); +//! store.delete(&mut writer, "foo").unwrap(); +//! +//! // A write transaction also supports reading, and the version of the store that it +//! // reads includes the changes it has made regardless of the commit state of that +//! // transaction. + +//! // In the code above, "foo" and "bar" were put into the store, then "foo" was +//! // deleted so only "bar" will return a result when the database is queried via the +//! // writer. +//! println!("It should be None! ({:?})", store.get(&writer, "foo").unwrap()); +//! println!("Get bar ({:?})", store.get(&writer, "bar").unwrap()); +//! +//! // But a reader won't see that change until the write transaction is committed. +//! { +//! let reader = env.read().expect("reader"); +//! println!("Get foo {:?}", store.get(&reader, "foo").unwrap()); +//! println!("Get bar {:?}", store.get(&reader, "bar").unwrap()); +//! } +//! writer.commit().unwrap(); +//! { +//! let reader = env.read().expect("reader"); +//! println!("It should be None! ({:?})", store.get(&reader, "foo").unwrap()); +//! println!("Get bar {:?}", store.get(&reader, "bar").unwrap()); +//! } +//! +//! // Committing a transaction consumes the writer, preventing you from reusing it by +//! // failing at compile time with an error. This line would report "error[E0382]: +//! // borrow of moved value: `writer`". +//! // store.put(&mut writer, "baz", &Value::Str("buz")).unwrap(); +//! } +//! +//! { +//! // Clearing all the entries in the store with a write transaction. +//! { +//! let mut writer = env.write().unwrap(); +//! store.put(&mut writer, "foo", &Value::Str("bar")).unwrap(); +//! store.put(&mut writer, "bar", &Value::Str("baz")).unwrap(); +//! writer.commit().unwrap(); +//! } +//! +//! { +//! let mut writer = env.write().unwrap(); +//! store.clear(&mut writer).unwrap(); +//! writer.commit().unwrap(); +//! } +//! +//! { +//! let reader = env.read().expect("reader"); +//! println!("It should be None! ({:?})", store.get(&reader, "foo").unwrap()); +//! println!("It should be None! ({:?})", store.get(&reader, "bar").unwrap()); +//! } +//! +//! } +//! +//! ``` + +mod env; +mod error; +mod helpers; +mod manager; +mod readwrite; + +pub mod backend; +#[cfg(feature = "lmdb")] +pub mod migrator; +pub mod store; +pub mod value; + +pub use backend::{DatabaseFlags, EnvironmentFlags, WriteFlags}; +pub use env::Rkv; +pub use error::{DataError, MigrateError, StoreError}; +pub use manager::Manager; +#[cfg(feature = "lmdb")] +pub use migrator::Migrator; +pub use readwrite::{Readable, Reader, Writer}; +pub use store::{keys::EncodableKey, single::SingleStore, CloseOptions, Options as StoreOptions}; +pub use value::{OwnedValue, Value}; + +#[cfg(feature = "db-dup-sort")] +pub use store::multi::MultiStore; + +#[cfg(feature = "db-int-key")] +pub use store::integer::IntegerStore; +#[cfg(feature = "db-int-key")] +pub use store::keys::PrimitiveInt; + +#[cfg(all(feature = "db-dup-sort", feature = "db-int-key"))] +pub use store::integermulti::MultiIntegerStore; diff --git a/third_party/rust/rkv/src/manager.rs b/third_party/rust/rkv/src/manager.rs new file mode 100644 index 0000000000..045564497c --- /dev/null +++ b/third_party/rust/rkv/src/manager.rs @@ -0,0 +1,247 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::{ + collections::{btree_map::Entry, BTreeMap}, + os::raw::c_uint, + path::{Path, PathBuf}, + result, + sync::{Arc, RwLock}, +}; + +use lazy_static::lazy_static; + +#[cfg(feature = "lmdb")] +use crate::backend::LmdbEnvironment; +use crate::{ + backend::{BackendEnvironment, BackendEnvironmentBuilder, SafeModeEnvironment}, + error::{CloseError, StoreError}, + helpers::canonicalize_path, + store::CloseOptions, + Rkv, +}; + +type Result<T> = result::Result<T, StoreError>; +type CloseResult<T> = result::Result<T, CloseError>; +type SharedRkv<E> = Arc<RwLock<Rkv<E>>>; + +#[cfg(feature = "lmdb")] +lazy_static! { + static ref MANAGER_LMDB: RwLock<Manager<LmdbEnvironment>> = RwLock::new(Manager::new()); +} + +lazy_static! { + static ref MANAGER_SAFE_MODE: RwLock<Manager<SafeModeEnvironment>> = + RwLock::new(Manager::new()); +} + +/// A process is only permitted to have one open handle to each Rkv environment. This +/// manager exists to enforce that constraint: don't open environments directly. +/// +/// By default, path canonicalization is enabled for identifying RKV instances. This +/// is true by default, because it helps enforce the constraints guaranteed by +/// this manager. However, path canonicalization might crash in some fringe +/// circumstances, so the `no-canonicalize-path` feature offers the possibility of +/// disabling it. See: https://bugzilla.mozilla.org/show_bug.cgi?id=1531887 +/// +/// When path canonicalization is disabled, you *must* ensure an RKV environment is +/// always created or retrieved with the same path. +pub struct Manager<E> { + environments: BTreeMap<PathBuf, SharedRkv<E>>, +} + +impl<'e, E> Manager<E> +where + E: BackendEnvironment<'e>, +{ + fn new() -> Manager<E> { + Manager { + environments: Default::default(), + } + } + + /// Return the open env at `path`, returning `None` if it has not already been opened. + pub fn get<'p, P>(&self, path: P) -> Result<Option<SharedRkv<E>>> + where + P: Into<&'p Path>, + { + let canonical = if cfg!(feature = "no-canonicalize-path") { + path.into().to_path_buf() + } else { + canonicalize_path(path)? + }; + Ok(self.environments.get(&canonical).cloned()) + } + + /// Return the open env at `path`, or create it by calling `f`. + pub fn get_or_create<'p, F, P>(&mut self, path: P, f: F) -> Result<SharedRkv<E>> + where + F: FnOnce(&Path) -> Result<Rkv<E>>, + P: Into<&'p Path>, + { + let canonical = if cfg!(feature = "no-canonicalize-path") { + path.into().to_path_buf() + } else { + canonicalize_path(path)? + }; + Ok(match self.environments.entry(canonical) { + Entry::Occupied(e) => e.get().clone(), + Entry::Vacant(e) => { + let k = Arc::new(RwLock::new(f(e.key().as_path())?)); + e.insert(k).clone() + } + }) + } + + /// Return the open env at `path` with `capacity`, or create it by calling `f`. + pub fn get_or_create_with_capacity<'p, F, P>( + &mut self, + path: P, + capacity: c_uint, + f: F, + ) -> Result<SharedRkv<E>> + where + F: FnOnce(&Path, c_uint) -> Result<Rkv<E>>, + P: Into<&'p Path>, + { + let canonical = if cfg!(feature = "no-canonicalize-path") { + path.into().to_path_buf() + } else { + canonicalize_path(path)? + }; + Ok(match self.environments.entry(canonical) { + Entry::Occupied(e) => e.get().clone(), + Entry::Vacant(e) => { + let k = Arc::new(RwLock::new(f(e.key().as_path(), capacity)?)); + e.insert(k).clone() + } + }) + } + + /// Return a new Rkv environment from the builder, or create it by calling `f`. + pub fn get_or_create_from_builder<'p, F, P, B>( + &mut self, + path: P, + builder: B, + f: F, + ) -> Result<SharedRkv<E>> + where + F: FnOnce(&Path, B) -> Result<Rkv<E>>, + P: Into<&'p Path>, + B: BackendEnvironmentBuilder<'e, Environment = E>, + { + let canonical = if cfg!(feature = "no-canonicalize-path") { + path.into().to_path_buf() + } else { + canonicalize_path(path)? + }; + Ok(match self.environments.entry(canonical) { + Entry::Occupied(e) => e.get().clone(), + Entry::Vacant(e) => { + let k = Arc::new(RwLock::new(f(e.key().as_path(), builder)?)); + e.insert(k).clone() + } + }) + } + + /// Tries to close the specified environment. + /// Returns an error when other users of this environment still exist. + pub fn try_close<'p, P>(&mut self, path: P, options: CloseOptions) -> CloseResult<()> + where + P: Into<&'p Path>, + { + let canonical = if cfg!(feature = "no-canonicalize-path") { + path.into().to_path_buf() + } else { + canonicalize_path(path)? + }; + match self.environments.entry(canonical) { + Entry::Vacant(_) => Ok(()), + Entry::Occupied(e) if Arc::strong_count(e.get()) > 1 => { + Err(CloseError::EnvironmentStillOpen) + } + Entry::Occupied(e) => { + let env = Arc::try_unwrap(e.remove()) + .map_err(|_| CloseError::UnknownEnvironmentStillOpen)?; + env.into_inner()?.close(options)?; + Ok(()) + } + } + } +} + +#[cfg(feature = "lmdb")] +impl Manager<LmdbEnvironment> { + pub fn singleton() -> &'static RwLock<Manager<LmdbEnvironment>> { + &MANAGER_LMDB + } +} + +impl Manager<SafeModeEnvironment> { + pub fn singleton() -> &'static RwLock<Manager<SafeModeEnvironment>> { + &MANAGER_SAFE_MODE + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::*; + + use std::fs; + + use tempfile::Builder; + + #[cfg(feature = "lmdb")] + use backend::Lmdb; + + /// Test that one can mutate managed Rkv instances in surprising ways. + #[cfg(feature = "lmdb")] + #[test] + fn test_mutate_managed_rkv() { + let mut manager = Manager::<LmdbEnvironment>::new(); + + let root1 = Builder::new() + .prefix("test_mutate_managed_rkv_1") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root1.path()).expect("dir created"); + let path1 = root1.path(); + let arc = manager + .get_or_create(path1, Rkv::new::<Lmdb>) + .expect("created"); + + // Arc<RwLock<>> has interior mutability, so we can replace arc's Rkv instance with a new + // instance that has a different path. + let root2 = Builder::new() + .prefix("test_mutate_managed_rkv_2") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root2.path()).expect("dir created"); + let path2 = root2.path(); + { + let mut rkv = arc.write().expect("guard"); + let rkv2 = Rkv::new::<Lmdb>(path2).expect("Rkv"); + *rkv = rkv2; + } + + // Arc now has a different internal Rkv with path2, but it's still mapped to path1 in + // manager, so its pointer is equal to a new Arc for path1. + let path1_arc = manager.get(path1).expect("success").expect("existed"); + assert!(Arc::ptr_eq(&path1_arc, &arc)); + + // Meanwhile, a new Arc for path2 has a different pointer, even though its Rkv's path is + // the same as arc's current path. + let path2_arc = manager + .get_or_create(path2, Rkv::new::<Lmdb>) + .expect("success"); + assert!(!Arc::ptr_eq(&path2_arc, &arc)); + } +} diff --git a/third_party/rust/rkv/src/migrator.rs b/third_party/rust/rkv/src/migrator.rs new file mode 100644 index 0000000000..6dc8d60371 --- /dev/null +++ b/third_party/rust/rkv/src/migrator.rs @@ -0,0 +1,179 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +//! A simple utility for migrating data from one RVK environment to another. Notably, this +//! tool can migrate data from an enviroment created with a different backend than the +//! current RKV consumer (e.g from Lmdb to SafeMode). +//! +//! The utility doesn't support migrating between 32-bit and 64-bit LMDB environments yet, +//! see `arch_migrator` if this is needed. However, this utility is ultimately intended to +//! handle all possible migrations. +//! +//! The destination environment should be empty of data, otherwise an error is returned. +//! +//! There are 3 versions of the migration methods: +//! * `migrate_<src>_to_<dst>`, where `<src>` and `<dst>` are the source and destination +//! environment types. You're responsive with opening both these environments, handling +//! all errors, and performing any cleanup if necessary. +//! * `open_and_migrate_<src>_to_<dst>`, which is similar the the above, but automatically +//! attempts to open the source environment and delete all of its supporting files if +//! there's no other environment open at that path. You're still responsible with +//! handling all errors. +//! * `easy_migrate_<src>_to_<dst>` which is similar to the above, but ignores the +//! migration and doesn't delete any files if the source environment is invalid +//! (corrupted), unavailable (path not accessible or incompatible with configuration), +//! or empty (database has no records). +//! +//! The tool currently has these limitations: +//! +//! 1. It doesn't support migration from environments created with +//! `EnvironmentFlags::NO_SUB_DIR`. To migrate such an environment, create a temporary +//! directory, copy the environment's data files in the temporary directory, then +//! migrate the temporary directory as the source environment. +//! 2. It doesn't support migration from databases created with DatabaseFlags::DUP_SORT` +//! (with or without `DatabaseFlags::DUP_FIXED`) nor with `DatabaseFlags::INTEGER_KEY`. +//! This effectively means that migration is limited to `SingleStore`s. +//! 3. It doesn't allow for existing data in the destination environment, which means that +//! it cannot overwrite nor append data. + +use crate::{ + backend::{LmdbEnvironment, SafeModeEnvironment}, + error::MigrateError, + Rkv, StoreOptions, +}; + +pub use crate::backend::{LmdbArchMigrateError, LmdbArchMigrateResult, LmdbArchMigrator}; + +// FIXME: should parametrize this instead. + +macro_rules! fn_migrator { + ($name:tt, $src_env:ty, $dst_env:ty) => { + /// Migrate all data in all of databases from the source environment to the destination + /// environment. This includes all key/value pairs in the main database that aren't + /// metadata about subdatabases and all key/value pairs in all subdatabases. + /// + /// Other backend-specific metadata such as map size or maximum databases left intact on + /// the given environments. + /// + /// The destination environment should be empty of data, otherwise an error is returned. + pub fn $name<S, D>(src_env: S, dst_env: D) -> Result<(), MigrateError> + where + S: std::ops::Deref<Target = Rkv<$src_env>>, + D: std::ops::Deref<Target = Rkv<$dst_env>>, + { + let src_dbs = src_env.get_dbs().unwrap(); + if src_dbs.is_empty() { + return Err(MigrateError::SourceEmpty); + } + let dst_dbs = dst_env.get_dbs().unwrap(); + if !dst_dbs.is_empty() { + return Err(MigrateError::DestinationNotEmpty); + } + for name in src_dbs { + let src_store = src_env.open_single(name.as_deref(), StoreOptions::default())?; + let dst_store = dst_env.open_single(name.as_deref(), StoreOptions::create())?; + let reader = src_env.read()?; + let mut writer = dst_env.write()?; + let mut iter = src_store.iter_start(&reader)?; + while let Some(Ok((key, value))) = iter.next() { + dst_store.put(&mut writer, key, &value).expect("wrote"); + } + writer.commit()?; + } + Ok(()) + } + }; + + (open $migrate:tt, $name:tt, $builder:tt, $src_env:ty, $dst_env:ty) => { + /// Same as the the `migrate_x_to_y` migration method above, but automatically attempts + /// to open the source environment. Finally, deletes all of its supporting files if + /// there's no other environment open at that path and the migration succeeded. + pub fn $name<F, D>(path: &std::path::Path, build: F, dst_env: D) -> Result<(), MigrateError> + where + F: FnOnce(crate::backend::$builder) -> crate::backend::$builder, + D: std::ops::Deref<Target = Rkv<$dst_env>>, + { + use crate::{backend::*, CloseOptions}; + + let mut manager = crate::Manager::<$src_env>::singleton().write()?; + let mut builder = Rkv::<$src_env>::environment_builder::<$builder>(); + builder.set_max_dbs(crate::env::DEFAULT_MAX_DBS); + builder = build(builder); + + let src_env = + manager.get_or_create_from_builder(path, builder, Rkv::from_builder::<$builder>)?; + Migrator::$migrate(src_env.read()?, dst_env)?; + + drop(src_env); + manager.try_close(path, CloseOptions::delete_files_on_disk())?; + + Ok(()) + } + }; + + (easy $migrate:tt, $name:tt, $src_env:ty, $dst_env:ty) => { + /// Same as the `open_and_migrate_x_to_y` migration method above, but ignores the + /// migration and doesn't delete any files if the following conditions apply: + /// - Source environment is invalid/corrupted, unavailable, or empty. + /// - Destination environment is not empty. + /// Use this instead of the other migration methods if: + /// - You're not concerned by throwing away old data and starting fresh with a new store. + /// - You'll never want to overwrite data in the new store from the old store. + pub fn $name<D>(path: &std::path::Path, dst_env: D) -> Result<(), MigrateError> + where + D: std::ops::Deref<Target = Rkv<$dst_env>>, + { + match Migrator::$migrate(path, |builder| builder, dst_env) { + // Source environment is an invalid file or corrupted database. + Err(crate::MigrateError::StoreError(crate::StoreError::FileInvalid)) => Ok(()), + Err(crate::MigrateError::StoreError(crate::StoreError::DatabaseCorrupted)) => { + Ok(()) + } + // Path not accessible. + Err(crate::MigrateError::StoreError(crate::StoreError::IoError(_))) => Ok(()), + // Path accessible but incompatible for configuration. + Err(crate::MigrateError::StoreError( + crate::StoreError::UnsuitableEnvironmentPath(_), + )) => Ok(()), + // Couldn't close source environment and delete files on disk (e.g. other stores still open). + Err(crate::MigrateError::CloseError(_)) => Ok(()), + // Nothing to migrate. + Err(crate::MigrateError::SourceEmpty) => Ok(()), + // Migrating would overwrite. + Err(crate::MigrateError::DestinationNotEmpty) => Ok(()), + result => result, + }?; + + Ok(()) + } + }; +} + +macro_rules! fns_migrator { + ($src:tt, $dst:tt) => { + paste::item! { + fns_migrator!([<migrate_ $src _to_ $dst>], $src, $dst); + fns_migrator!([<migrate_ $dst _to_ $src>], $dst, $src); + } + }; + ($name:tt, $src:tt, $dst:tt) => { + paste::item! { + fn_migrator!($name, [<$src:camel Environment>], [<$dst:camel Environment>]); + fn_migrator!(open $name, [<open_and_ $name>], [<$src:camel>], [<$src:camel Environment>], [<$dst:camel Environment>]); + fn_migrator!(easy [<open_and_ $name>], [<easy_ $name>], [<$src:camel Environment>], [<$dst:camel Environment>]); + } + }; +} + +pub struct Migrator; + +impl Migrator { + fns_migrator!(lmdb, safe_mode); +} diff --git a/third_party/rust/rkv/src/readwrite.rs b/third_party/rust/rkv/src/readwrite.rs new file mode 100644 index 0000000000..38b4746210 --- /dev/null +++ b/third_party/rust/rkv/src/readwrite.rs @@ -0,0 +1,154 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use crate::{ + backend::{ + BackendDatabase, BackendRoCursor, BackendRoCursorTransaction, BackendRoTransaction, + BackendRwCursorTransaction, BackendRwTransaction, + }, + error::StoreError, + helpers::read_transform, + value::Value, +}; + +pub struct Reader<T>(T); +pub struct Writer<T>(T); + +pub trait Readable<'r> { + type Database: BackendDatabase; + type RoCursor: BackendRoCursor<'r>; + + fn get<K>(&'r self, db: &Self::Database, k: &K) -> Result<Option<Value<'r>>, StoreError> + where + K: AsRef<[u8]>; + + fn open_ro_cursor(&'r self, db: &Self::Database) -> Result<Self::RoCursor, StoreError>; +} + +impl<'r, T> Readable<'r> for Reader<T> +where + T: BackendRoCursorTransaction<'r>, +{ + type Database = T::Database; + type RoCursor = T::RoCursor; + + fn get<K>(&'r self, db: &T::Database, k: &K) -> Result<Option<Value<'r>>, StoreError> + where + K: AsRef<[u8]>, + { + let bytes = self.0.get(db, k.as_ref()).map_err(|e| e.into()); + match read_transform(bytes).map(Some) { + Err(StoreError::KeyValuePairNotFound) => Ok(None), + result => result, + } + } + + fn open_ro_cursor(&'r self, db: &T::Database) -> Result<T::RoCursor, StoreError> { + self.0.open_ro_cursor(db).map_err(|e| e.into()) + } +} + +impl<T> Reader<T> { + pub(crate) fn new(txn: T) -> Reader<T> { + Reader(txn) + } +} + +impl<T> Reader<T> +where + T: BackendRoTransaction, +{ + pub fn abort(self) { + self.0.abort(); + } +} + +impl<'r, T> Readable<'r> for Writer<T> +where + T: BackendRwCursorTransaction<'r>, +{ + type Database = T::Database; + type RoCursor = T::RoCursor; + + fn get<K>(&'r self, db: &T::Database, k: &K) -> Result<Option<Value<'r>>, StoreError> + where + K: AsRef<[u8]>, + { + let bytes = self.0.get(db, k.as_ref()).map_err(|e| e.into()); + match read_transform(bytes).map(Some) { + Err(StoreError::KeyValuePairNotFound) => Ok(None), + result => result, + } + } + + fn open_ro_cursor(&'r self, db: &T::Database) -> Result<T::RoCursor, StoreError> { + self.0.open_ro_cursor(db).map_err(|e| e.into()) + } +} + +impl<T> Writer<T> { + pub(crate) fn new(txn: T) -> Writer<T> { + Writer(txn) + } +} + +impl<T> Writer<T> +where + T: BackendRwTransaction, +{ + pub fn commit(self) -> Result<(), StoreError> { + self.0.commit().map_err(|e| e.into()) + } + + pub fn abort(self) { + self.0.abort(); + } + + pub(crate) fn put<K>( + &mut self, + db: &T::Database, + k: &K, + v: &Value, + flags: T::Flags, + ) -> Result<(), StoreError> + where + K: AsRef<[u8]>, + { + // TODO: don't allocate twice. + self.0 + .put(db, k.as_ref(), &v.to_bytes()?, flags) + .map_err(|e| e.into()) + } + + #[cfg(not(feature = "db-dup-sort"))] + pub(crate) fn delete<K>(&mut self, db: &T::Database, k: &K) -> Result<(), StoreError> + where + K: AsRef<[u8]>, + { + self.0.del(db, k.as_ref()).map_err(|e| e.into()) + } + + #[cfg(feature = "db-dup-sort")] + pub(crate) fn delete<K>( + &mut self, + db: &T::Database, + k: &K, + v: Option<&[u8]>, + ) -> Result<(), StoreError> + where + K: AsRef<[u8]>, + { + self.0.del(db, k.as_ref(), v).map_err(|e| e.into()) + } + + pub(crate) fn clear(&mut self, db: &T::Database) -> Result<(), StoreError> { + self.0.clear_db(db).map_err(|e| e.into()) + } +} diff --git a/third_party/rust/rkv/src/store.rs b/third_party/rust/rkv/src/store.rs new file mode 100644 index 0000000000..c8ac2c583f --- /dev/null +++ b/third_party/rust/rkv/src/store.rs @@ -0,0 +1,52 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +pub mod keys; +pub mod single; + +#[cfg(feature = "db-dup-sort")] +pub mod multi; + +#[cfg(feature = "db-int-key")] +pub mod integer; + +#[cfg(all(feature = "db-dup-sort", feature = "db-int-key"))] +pub mod integermulti; + +use crate::backend::BackendDatabaseFlags; + +#[derive(Default, Debug, Copy, Clone)] +pub struct Options<F> { + pub create: bool, + pub flags: F, +} + +impl<F> Options<F> +where + F: BackendDatabaseFlags, +{ + pub fn create() -> Options<F> { + Options { + create: true, + flags: F::empty(), + } + } +} + +#[derive(Default, Debug, Copy, Clone)] +pub struct CloseOptions { + pub delete: bool, +} + +impl CloseOptions { + pub fn delete_files_on_disk() -> CloseOptions { + CloseOptions { delete: true } + } +} diff --git a/third_party/rust/rkv/src/store/integer.rs b/third_party/rust/rkv/src/store/integer.rs new file mode 100644 index 0000000000..f150f09c5f --- /dev/null +++ b/third_party/rust/rkv/src/store/integer.rs @@ -0,0 +1,637 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::marker::PhantomData; + +use crate::{ + backend::{BackendDatabase, BackendRwTransaction}, + error::StoreError, + readwrite::{Readable, Writer}, + store::{ + keys::{Key, PrimitiveInt}, + single::SingleStore, + }, + value::Value, +}; + +type EmptyResult = Result<(), StoreError>; + +#[derive(Debug, Eq, PartialEq, Copy, Clone)] +pub struct IntegerStore<D, K> { + inner: SingleStore<D>, + phantom: PhantomData<K>, +} + +impl<D, K> IntegerStore<D, K> +where + D: BackendDatabase, + K: PrimitiveInt, +{ + pub(crate) fn new(db: D) -> IntegerStore<D, K> { + IntegerStore { + inner: SingleStore::new(db), + phantom: PhantomData, + } + } + + pub fn get<'r, R>(&self, reader: &'r R, k: K) -> Result<Option<Value<'r>>, StoreError> + where + R: Readable<'r, Database = D>, + { + self.inner.get(reader, Key::new(&k)?) + } + + pub fn put<T>(&self, writer: &mut Writer<T>, k: K, v: &Value) -> EmptyResult + where + T: BackendRwTransaction<Database = D>, + { + self.inner.put(writer, Key::new(&k)?, v) + } + + pub fn delete<T>(&self, writer: &mut Writer<T>, k: K) -> EmptyResult + where + T: BackendRwTransaction<Database = D>, + { + self.inner.delete(writer, Key::new(&k)?) + } + + pub fn clear<T>(&self, writer: &mut Writer<T>) -> EmptyResult + where + T: BackendRwTransaction<Database = D>, + { + self.inner.clear(writer) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::*; + + use std::fs; + + use tempfile::Builder; + + #[test] + fn test_integer_keys() { + let root = Builder::new() + .prefix("test_integer_keys") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k.open_integer("s", StoreOptions::create()).expect("open"); + + macro_rules! test_integer_keys { + ($type:ty, $key:expr) => {{ + let mut writer = k.write().expect("writer"); + + s.put(&mut writer, $key, &Value::Str("hello!")) + .expect("write"); + assert_eq!( + s.get(&writer, $key).expect("read"), + Some(Value::Str("hello!")) + ); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + assert_eq!( + s.get(&reader, $key).expect("read"), + Some(Value::Str("hello!")) + ); + }}; + } + + test_integer_keys!(u32, std::u32::MIN); + test_integer_keys!(u32, std::u32::MAX); + } + + #[test] + fn test_clear() { + let root = Builder::new() + .prefix("test_integer_clear") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k.open_integer("s", StoreOptions::create()).expect("open"); + + { + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 2, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 3, &Value::Str("hello!")).expect("write"); + assert_eq!(s.get(&writer, 1).expect("read"), Some(Value::Str("hello!"))); + assert_eq!(s.get(&writer, 2).expect("read"), Some(Value::Str("hello!"))); + assert_eq!(s.get(&writer, 3).expect("read"), Some(Value::Str("hello!"))); + writer.commit().expect("committed"); + } + + { + let mut writer = k.write().expect("writer"); + s.clear(&mut writer).expect("cleared"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + assert_eq!(s.get(&reader, 1).expect("read"), None); + assert_eq!(s.get(&reader, 2).expect("read"), None); + assert_eq!(s.get(&reader, 3).expect("read"), None); + } + } + + #[test] + fn test_dup() { + let root = Builder::new() + .prefix("test_integer_dup") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k.open_integer("s", StoreOptions::create()).expect("open"); + + { + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("foo!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("bar!")).expect("write"); + assert_eq!(s.get(&writer, 1).expect("read"), Some(Value::Str("bar!"))); + assert_eq!(s.get(&writer, 2).expect("read"), None); + assert_eq!(s.get(&writer, 3).expect("read"), None); + writer.commit().expect("committed"); + } + + { + let mut writer = k.write().expect("writer"); + s.clear(&mut writer).expect("cleared"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + assert_eq!(s.get(&reader, 1).expect("read"), None); + assert_eq!(s.get(&reader, 2).expect("read"), None); + assert_eq!(s.get(&reader, 3).expect("read"), None); + } + } + + #[test] + fn test_del() { + let root = Builder::new() + .prefix("test_integer_del") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k.open_integer("s", StoreOptions::create()).expect("open"); + + { + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("foo!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("bar!")).expect("write"); + assert_eq!(s.get(&writer, 1).expect("read"), Some(Value::Str("bar!"))); + assert_eq!(s.get(&writer, 2).expect("read"), None); + assert_eq!(s.get(&writer, 3).expect("read"), None); + writer.commit().expect("committed"); + } + + { + let mut writer = k.write().expect("writer"); + s.delete(&mut writer, 1).expect("deleted"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + assert_eq!(s.get(&reader, 1).expect("read"), None); + } + + { + let mut writer = k.write().expect("writer"); + s.delete(&mut writer, 2).expect_err("not deleted"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + assert_eq!(s.get(&reader, 2).expect("read"), None); + } + + { + let mut writer = k.write().expect("writer"); + s.delete(&mut writer, 3).expect_err("not deleted"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + assert_eq!(s.get(&reader, 3).expect("read"), None); + } + } + + #[test] + fn test_persist() { + let root = Builder::new() + .prefix("test_integer_persist") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + { + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k.open_integer("s", StoreOptions::create()).expect("open"); + + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 2, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 3, &Value::Str("hello!")).expect("write"); + assert_eq!(s.get(&writer, 1).expect("read"), Some(Value::Str("hello!"))); + assert_eq!(s.get(&writer, 2).expect("read"), Some(Value::Str("hello!"))); + assert_eq!(s.get(&writer, 3).expect("read"), Some(Value::Str("hello!"))); + writer.commit().expect("committed"); + } + + { + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k.open_integer("s", StoreOptions::create()).expect("open"); + + let reader = k.read().expect("reader"); + assert_eq!(s.get(&reader, 1).expect("read"), Some(Value::Str("hello!"))); + assert_eq!(s.get(&reader, 2).expect("read"), Some(Value::Str("hello!"))); + assert_eq!(s.get(&reader, 3).expect("read"), Some(Value::Str("hello!"))); + } + } + + #[test] + fn test_intertwine_read_write() { + let root = Builder::new() + .prefix("test_integer_intertwine_read_write") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k.open_integer("s", StoreOptions::create()).expect("open"); + + { + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + assert_eq!(s.get(&writer, 1).expect("read"), Some(Value::Str("hello!"))); + assert_eq!(s.get(&writer, 2).expect("read"), None); + assert_eq!(s.get(&writer, 3).expect("read"), None); + writer.commit().expect("committed"); + } + + let reader = k.read().expect("reader"); + let mut writer = k.write().expect("writer"); + + { + assert_eq!(s.get(&reader, 1).expect("read"), Some(Value::Str("hello!"))); + assert_eq!(s.get(&reader, 2).expect("read"), None); + assert_eq!(s.get(&reader, 3).expect("read"), None); + } + + { + s.put(&mut writer, 1, &Value::Str("goodbye!")) + .expect("write"); + s.put(&mut writer, 2, &Value::Str("goodbye!")) + .expect("write"); + s.put(&mut writer, 3, &Value::Str("goodbye!")) + .expect("write"); + assert_eq!( + s.get(&writer, 1).expect("read"), + Some(Value::Str("goodbye!")) + ); + assert_eq!( + s.get(&writer, 2).expect("read"), + Some(Value::Str("goodbye!")) + ); + assert_eq!( + s.get(&writer, 3).expect("read"), + Some(Value::Str("goodbye!")) + ); + writer.commit().expect("committed"); + } + + { + assert_eq!(s.get(&reader, 1).expect("read"), Some(Value::Str("hello!"))); + assert_eq!(s.get(&reader, 2).expect("read"), None); + assert_eq!(s.get(&reader, 3).expect("read"), None); + } + + { + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + assert_eq!(s.get(&writer, 1).expect("read"), Some(Value::Str("hello!"))); + assert_eq!( + s.get(&writer, 2).expect("read"), + Some(Value::Str("goodbye!")) + ); + assert_eq!( + s.get(&writer, 3).expect("read"), + Some(Value::Str("goodbye!")) + ); + writer.commit().expect("committed"); + } + + { + let reader = k.write().expect("reader"); + assert_eq!(s.get(&reader, 1).expect("read"), Some(Value::Str("hello!"))); + assert_eq!( + s.get(&reader, 2).expect("read"), + Some(Value::Str("goodbye!")) + ); + assert_eq!( + s.get(&reader, 3).expect("read"), + Some(Value::Str("goodbye!")) + ); + reader.commit().expect("committed"); + } + } +} + +#[cfg(test)] +mod tests_safe { + use super::*; + use crate::*; + + use std::fs; + + use tempfile::Builder; + + #[test] + fn test_integer_keys() { + let root = Builder::new() + .prefix("test_integer_keys") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k.open_integer("s", StoreOptions::create()).expect("open"); + + macro_rules! test_integer_keys { + ($type:ty, $key:expr) => {{ + let mut writer = k.write().expect("writer"); + + s.put(&mut writer, $key, &Value::Str("hello!")) + .expect("write"); + assert_eq!( + s.get(&writer, $key).expect("read"), + Some(Value::Str("hello!")) + ); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + assert_eq!( + s.get(&reader, $key).expect("read"), + Some(Value::Str("hello!")) + ); + }}; + } + + test_integer_keys!(u32, std::u32::MIN); + test_integer_keys!(u32, std::u32::MAX); + } + + #[test] + fn test_clear() { + let root = Builder::new() + .prefix("test_integer_clear") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k.open_integer("s", StoreOptions::create()).expect("open"); + + { + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 2, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 3, &Value::Str("hello!")).expect("write"); + assert_eq!(s.get(&writer, 1).expect("read"), Some(Value::Str("hello!"))); + assert_eq!(s.get(&writer, 2).expect("read"), Some(Value::Str("hello!"))); + assert_eq!(s.get(&writer, 3).expect("read"), Some(Value::Str("hello!"))); + writer.commit().expect("committed"); + } + + { + let mut writer = k.write().expect("writer"); + s.clear(&mut writer).expect("cleared"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + assert_eq!(s.get(&reader, 1).expect("read"), None); + assert_eq!(s.get(&reader, 2).expect("read"), None); + assert_eq!(s.get(&reader, 3).expect("read"), None); + } + } + + #[test] + fn test_dup() { + let root = Builder::new() + .prefix("test_integer_dup") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k.open_integer("s", StoreOptions::create()).expect("open"); + + { + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("foo!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("bar!")).expect("write"); + assert_eq!(s.get(&writer, 1).expect("read"), Some(Value::Str("bar!"))); + assert_eq!(s.get(&writer, 2).expect("read"), None); + assert_eq!(s.get(&writer, 3).expect("read"), None); + writer.commit().expect("committed"); + } + + { + let mut writer = k.write().expect("writer"); + s.clear(&mut writer).expect("cleared"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + assert_eq!(s.get(&reader, 1).expect("read"), None); + assert_eq!(s.get(&reader, 2).expect("read"), None); + assert_eq!(s.get(&reader, 3).expect("read"), None); + } + } + + #[test] + fn test_del() { + let root = Builder::new() + .prefix("test_integer_del") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k.open_integer("s", StoreOptions::create()).expect("open"); + + { + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("foo!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("bar!")).expect("write"); + assert_eq!(s.get(&writer, 1).expect("read"), Some(Value::Str("bar!"))); + assert_eq!(s.get(&writer, 2).expect("read"), None); + assert_eq!(s.get(&writer, 3).expect("read"), None); + writer.commit().expect("committed"); + } + + { + let mut writer = k.write().expect("writer"); + s.delete(&mut writer, 1).expect("deleted"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + assert_eq!(s.get(&reader, 1).expect("read"), None); + } + + { + let mut writer = k.write().expect("writer"); + s.delete(&mut writer, 2).expect_err("not deleted"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + assert_eq!(s.get(&reader, 2).expect("read"), None); + } + + { + let mut writer = k.write().expect("writer"); + s.delete(&mut writer, 3).expect_err("not deleted"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + assert_eq!(s.get(&reader, 3).expect("read"), None); + } + } + + #[test] + fn test_persist() { + let root = Builder::new() + .prefix("test_integer_persist") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + { + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k.open_integer("s", StoreOptions::create()).expect("open"); + + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 2, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 3, &Value::Str("hello!")).expect("write"); + assert_eq!(s.get(&writer, 1).expect("read"), Some(Value::Str("hello!"))); + assert_eq!(s.get(&writer, 2).expect("read"), Some(Value::Str("hello!"))); + assert_eq!(s.get(&writer, 3).expect("read"), Some(Value::Str("hello!"))); + writer.commit().expect("committed"); + } + + { + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k.open_integer("s", StoreOptions::create()).expect("open"); + + let reader = k.read().expect("reader"); + assert_eq!(s.get(&reader, 1).expect("read"), Some(Value::Str("hello!"))); + assert_eq!(s.get(&reader, 2).expect("read"), Some(Value::Str("hello!"))); + assert_eq!(s.get(&reader, 3).expect("read"), Some(Value::Str("hello!"))); + } + } + + #[test] + fn test_intertwine_read_write() { + let root = Builder::new() + .prefix("test_integer_intertwine_read_write") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k.open_integer("s", StoreOptions::create()).expect("open"); + + { + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + assert_eq!(s.get(&writer, 1).expect("read"), Some(Value::Str("hello!"))); + assert_eq!(s.get(&writer, 2).expect("read"), None); + assert_eq!(s.get(&writer, 3).expect("read"), None); + writer.commit().expect("committed"); + } + + let reader = k.read().expect("reader"); + let mut writer = k.write().expect("writer"); + + { + assert_eq!(s.get(&reader, 1).expect("read"), Some(Value::Str("hello!"))); + assert_eq!(s.get(&reader, 2).expect("read"), None); + assert_eq!(s.get(&reader, 3).expect("read"), None); + } + + { + s.put(&mut writer, 1, &Value::Str("goodbye!")) + .expect("write"); + s.put(&mut writer, 2, &Value::Str("goodbye!")) + .expect("write"); + s.put(&mut writer, 3, &Value::Str("goodbye!")) + .expect("write"); + assert_eq!( + s.get(&writer, 1).expect("read"), + Some(Value::Str("goodbye!")) + ); + assert_eq!( + s.get(&writer, 2).expect("read"), + Some(Value::Str("goodbye!")) + ); + assert_eq!( + s.get(&writer, 3).expect("read"), + Some(Value::Str("goodbye!")) + ); + writer.commit().expect("committed"); + } + + { + assert_eq!(s.get(&reader, 1).expect("read"), Some(Value::Str("hello!"))); + assert_eq!(s.get(&reader, 2).expect("read"), None); + assert_eq!(s.get(&reader, 3).expect("read"), None); + } + + { + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + assert_eq!(s.get(&writer, 1).expect("read"), Some(Value::Str("hello!"))); + assert_eq!( + s.get(&writer, 2).expect("read"), + Some(Value::Str("goodbye!")) + ); + assert_eq!( + s.get(&writer, 3).expect("read"), + Some(Value::Str("goodbye!")) + ); + writer.commit().expect("committed"); + } + + { + let reader = k.write().expect("reader"); + assert_eq!(s.get(&reader, 1).expect("read"), Some(Value::Str("hello!"))); + assert_eq!( + s.get(&reader, 2).expect("read"), + Some(Value::Str("goodbye!")) + ); + assert_eq!( + s.get(&reader, 3).expect("read"), + Some(Value::Str("goodbye!")) + ); + reader.commit().expect("committed"); + } + } +} diff --git a/third_party/rust/rkv/src/store/integermulti.rs b/third_party/rust/rkv/src/store/integermulti.rs new file mode 100644 index 0000000000..71dd0aae04 --- /dev/null +++ b/third_party/rust/rkv/src/store/integermulti.rs @@ -0,0 +1,709 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::marker::PhantomData; + +use crate::{ + backend::{BackendDatabase, BackendIter, BackendRoCursor, BackendRwTransaction}, + error::StoreError, + readwrite::{Readable, Writer}, + store::{ + keys::{Key, PrimitiveInt}, + multi::{Iter, MultiStore}, + }, + value::Value, +}; + +type EmptyResult = Result<(), StoreError>; + +#[derive(Debug, Eq, PartialEq, Copy, Clone)] +pub struct MultiIntegerStore<D, K> { + inner: MultiStore<D>, + phantom: PhantomData<K>, +} + +impl<D, K> MultiIntegerStore<D, K> +where + D: BackendDatabase, + K: PrimitiveInt, +{ + pub(crate) fn new(db: D) -> MultiIntegerStore<D, K> { + MultiIntegerStore { + inner: MultiStore::new(db), + phantom: PhantomData, + } + } + + pub fn get<'r, R, I, C>(&self, reader: &'r R, k: K) -> Result<Iter<'r, I>, StoreError> + where + R: Readable<'r, Database = D, RoCursor = C>, + I: BackendIter<'r>, + C: BackendRoCursor<'r, Iter = I>, + K: 'r, + { + self.inner.get(reader, Key::new(&k)?) + } + + pub fn get_first<'r, R>(&self, reader: &'r R, k: K) -> Result<Option<Value<'r>>, StoreError> + where + R: Readable<'r, Database = D>, + { + self.inner.get_first(reader, Key::new(&k)?) + } + + pub fn put<T>(&self, writer: &mut Writer<T>, k: K, v: &Value) -> EmptyResult + where + T: BackendRwTransaction<Database = D>, + { + self.inner.put(writer, Key::new(&k)?, v) + } + + pub fn put_with_flags<T>( + &self, + writer: &mut Writer<T>, + k: K, + v: &Value, + flags: T::Flags, + ) -> EmptyResult + where + T: BackendRwTransaction<Database = D>, + { + self.inner.put_with_flags(writer, Key::new(&k)?, v, flags) + } + + pub fn delete_all<T>(&self, writer: &mut Writer<T>, k: K) -> EmptyResult + where + T: BackendRwTransaction<Database = D>, + { + self.inner.delete_all(writer, Key::new(&k)?) + } + + pub fn delete<T>(&self, writer: &mut Writer<T>, k: K, v: &Value) -> EmptyResult + where + T: BackendRwTransaction<Database = D>, + { + self.inner.delete(writer, Key::new(&k)?, v) + } + + pub fn clear<T>(&self, writer: &mut Writer<T>) -> EmptyResult + where + T: BackendRwTransaction<Database = D>, + { + self.inner.clear(writer) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::*; + + use std::fs; + + use tempfile::Builder; + + #[test] + fn test_integer_keys() { + let root = Builder::new() + .prefix("test_integer_keys") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k + .open_multi_integer("s", StoreOptions::create()) + .expect("open"); + + macro_rules! test_integer_keys { + ($type:ty, $key:expr) => {{ + let mut writer = k.write().expect("writer"); + + s.put(&mut writer, $key, &Value::Str("hello!")) + .expect("write"); + assert_eq!( + s.get_first(&writer, $key).expect("read"), + Some(Value::Str("hello!")) + ); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + assert_eq!( + s.get_first(&reader, $key).expect("read"), + Some(Value::Str("hello!")) + ); + }}; + } + + test_integer_keys!(u32, std::u32::MIN); + test_integer_keys!(u32, std::u32::MAX); + } + + #[test] + fn test_clear() { + let root = Builder::new() + .prefix("test_multi_integer_clear") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k + .open_multi_integer("s", StoreOptions::create()) + .expect("open"); + + { + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("hello1!")) + .expect("write"); + s.put(&mut writer, 2, &Value::Str("hello!")).expect("write"); + assert_eq!( + s.get_first(&writer, 1).expect("read"), + Some(Value::Str("hello!")) + ); + assert_eq!( + s.get_first(&writer, 2).expect("read"), + Some(Value::Str("hello!")) + ); + assert_eq!(s.get_first(&writer, 3).expect("read"), None); + writer.commit().expect("committed"); + } + + { + let mut writer = k.write().expect("writer"); + s.clear(&mut writer).expect("cleared"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + assert_eq!(s.get_first(&reader, 1).expect("read"), None); + assert_eq!(s.get_first(&reader, 2).expect("read"), None); + assert_eq!(s.get_first(&reader, 3).expect("read"), None); + } + } + + #[test] + fn test_dup() { + let root = Builder::new() + .prefix("test_multi_integer_dup") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k + .open_multi_integer("s", StoreOptions::create()) + .expect("open"); + + { + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("hello1!")) + .expect("write"); + assert_eq!( + s.get_first(&writer, 1).expect("read"), + Some(Value::Str("hello!")) + ); + assert_eq!(s.get_first(&writer, 2).expect("read"), None); + assert_eq!(s.get_first(&writer, 3).expect("read"), None); + writer.commit().expect("committed"); + } + + { + let mut writer = k.write().expect("writer"); + s.clear(&mut writer).expect("cleared"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + assert_eq!(s.get_first(&reader, 1).expect("read"), None); + assert_eq!(s.get_first(&reader, 2).expect("read"), None); + assert_eq!(s.get_first(&reader, 3).expect("read"), None); + } + } + + #[test] + fn test_dup_2() { + let root = Builder::new() + .prefix("test_multi_integer_dup") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k + .open_multi_integer("s", StoreOptions::create()) + .expect("open"); + + { + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("hello1!")) + .expect("write"); + + let mut iter = s.get(&writer, 1).expect("read"); + assert_eq!( + iter.next().expect("first").expect("ok").1, + Value::Str("hello!") + ); + assert_eq!( + iter.next().expect("second").expect("ok").1, + Value::Str("hello1!") + ); + assert!(iter.next().is_none()); + } + } + + #[test] + fn test_del() { + let root = Builder::new() + .prefix("test_multi_integer_dup") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k + .open_multi_integer("s", StoreOptions::create()) + .expect("open"); + + { + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("hello1!")) + .expect("write"); + { + let mut iter = s.get(&writer, 1).expect("read"); + assert_eq!( + iter.next().expect("first").expect("ok").1, + Value::Str("hello!") + ); + assert_eq!( + iter.next().expect("second").expect("ok").1, + Value::Str("hello1!") + ); + assert!(iter.next().is_none()); + } + writer.commit().expect("committed"); + } + + { + let mut writer = k.write().expect("writer"); + s.delete(&mut writer, 1, &Value::Str("hello!")) + .expect("deleted"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + let mut iter = s.get(&reader, 1).expect("read"); + assert_eq!( + iter.next().expect("first").expect("ok").1, + Value::Str("hello1!") + ); + assert!(iter.next().is_none()); + } + + { + let mut writer = k.write().expect("writer"); + s.delete(&mut writer, 1, &Value::Str("hello!")) + .expect_err("deleted"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + let mut iter = s.get(&reader, 1).expect("read"); + assert_eq!( + iter.next().expect("first").expect("ok").1, + Value::Str("hello1!") + ); + assert!(iter.next().is_none()); + } + + { + let mut writer = k.write().expect("writer"); + s.delete(&mut writer, 1, &Value::Str("hello1!")) + .expect("deleted"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + let mut iter = s.get(&reader, 1).expect("read"); + assert!(iter.next().is_none()); + } + + { + let mut writer = k.write().expect("writer"); + s.delete(&mut writer, 1, &Value::Str("hello1!")) + .expect_err("deleted"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + let mut iter = s.get(&reader, 1).expect("read"); + assert!(iter.next().is_none()); + } + } + + #[test] + fn test_persist() { + let root = Builder::new() + .prefix("test_multi_integer_persist") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + { + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k + .open_multi_integer("s", StoreOptions::create()) + .expect("open"); + + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("hello1!")) + .expect("write"); + s.put(&mut writer, 2, &Value::Str("hello!")).expect("write"); + { + let mut iter = s.get(&writer, 1).expect("read"); + assert_eq!( + iter.next().expect("first").expect("ok").1, + Value::Str("hello!") + ); + assert_eq!( + iter.next().expect("second").expect("ok").1, + Value::Str("hello1!") + ); + assert!(iter.next().is_none()); + } + writer.commit().expect("committed"); + } + + { + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k + .open_multi_integer("s", StoreOptions::create()) + .expect("open"); + + let reader = k.read().expect("reader"); + let mut iter = s.get(&reader, 1).expect("read"); + assert_eq!( + iter.next().expect("first").expect("ok").1, + Value::Str("hello!") + ); + assert_eq!( + iter.next().expect("second").expect("ok").1, + Value::Str("hello1!") + ); + assert!(iter.next().is_none()); + } + } +} + +#[cfg(test)] +mod tests_safe { + use super::*; + use crate::*; + + use std::fs; + + use tempfile::Builder; + + #[test] + fn test_integer_keys() { + let root = Builder::new() + .prefix("test_integer_keys") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k + .open_multi_integer("s", StoreOptions::create()) + .expect("open"); + + macro_rules! test_integer_keys { + ($type:ty, $key:expr) => {{ + let mut writer = k.write().expect("writer"); + + s.put(&mut writer, $key, &Value::Str("hello!")) + .expect("write"); + assert_eq!( + s.get_first(&writer, $key).expect("read"), + Some(Value::Str("hello!")) + ); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + assert_eq!( + s.get_first(&reader, $key).expect("read"), + Some(Value::Str("hello!")) + ); + }}; + } + + test_integer_keys!(u32, std::u32::MIN); + test_integer_keys!(u32, std::u32::MAX); + } + + #[test] + fn test_clear() { + let root = Builder::new() + .prefix("test_multi_integer_clear") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k + .open_multi_integer("s", StoreOptions::create()) + .expect("open"); + + { + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("hello1!")) + .expect("write"); + s.put(&mut writer, 2, &Value::Str("hello!")).expect("write"); + assert_eq!( + s.get_first(&writer, 1).expect("read"), + Some(Value::Str("hello!")) + ); + assert_eq!( + s.get_first(&writer, 2).expect("read"), + Some(Value::Str("hello!")) + ); + assert_eq!(s.get_first(&writer, 3).expect("read"), None); + writer.commit().expect("committed"); + } + + { + let mut writer = k.write().expect("writer"); + s.clear(&mut writer).expect("cleared"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + assert_eq!(s.get_first(&reader, 1).expect("read"), None); + assert_eq!(s.get_first(&reader, 2).expect("read"), None); + assert_eq!(s.get_first(&reader, 3).expect("read"), None); + } + } + + #[test] + fn test_dup() { + let root = Builder::new() + .prefix("test_multi_integer_dup") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k + .open_multi_integer("s", StoreOptions::create()) + .expect("open"); + + { + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("hello1!")) + .expect("write"); + assert_eq!( + s.get_first(&writer, 1).expect("read"), + Some(Value::Str("hello!")) + ); + assert_eq!(s.get_first(&writer, 2).expect("read"), None); + assert_eq!(s.get_first(&writer, 3).expect("read"), None); + writer.commit().expect("committed"); + } + + { + let mut writer = k.write().expect("writer"); + s.clear(&mut writer).expect("cleared"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + assert_eq!(s.get_first(&reader, 1).expect("read"), None); + assert_eq!(s.get_first(&reader, 2).expect("read"), None); + assert_eq!(s.get_first(&reader, 3).expect("read"), None); + } + } + + #[test] + fn test_dup_2() { + let root = Builder::new() + .prefix("test_multi_integer_dup") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k + .open_multi_integer("s", StoreOptions::create()) + .expect("open"); + + { + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("hello1!")) + .expect("write"); + + let mut iter = s.get(&writer, 1).expect("read"); + assert_eq!( + iter.next().expect("first").expect("ok").1, + Value::Str("hello!") + ); + assert_eq!( + iter.next().expect("second").expect("ok").1, + Value::Str("hello1!") + ); + assert!(iter.next().is_none()); + } + } + + #[test] + fn test_del() { + let root = Builder::new() + .prefix("test_multi_integer_dup") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k + .open_multi_integer("s", StoreOptions::create()) + .expect("open"); + + { + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("hello1!")) + .expect("write"); + { + let mut iter = s.get(&writer, 1).expect("read"); + assert_eq!( + iter.next().expect("first").expect("ok").1, + Value::Str("hello!") + ); + assert_eq!( + iter.next().expect("second").expect("ok").1, + Value::Str("hello1!") + ); + assert!(iter.next().is_none()); + } + writer.commit().expect("committed"); + } + + { + let mut writer = k.write().expect("writer"); + s.delete(&mut writer, 1, &Value::Str("hello!")) + .expect("deleted"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + let mut iter = s.get(&reader, 1).expect("read"); + assert_eq!( + iter.next().expect("first").expect("ok").1, + Value::Str("hello1!") + ); + assert!(iter.next().is_none()); + } + + { + let mut writer = k.write().expect("writer"); + s.delete(&mut writer, 1, &Value::Str("hello!")) + .expect_err("deleted"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + let mut iter = s.get(&reader, 1).expect("read"); + assert_eq!( + iter.next().expect("first").expect("ok").1, + Value::Str("hello1!") + ); + assert!(iter.next().is_none()); + } + + { + let mut writer = k.write().expect("writer"); + s.delete(&mut writer, 1, &Value::Str("hello1!")) + .expect("deleted"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + let mut iter = s.get(&reader, 1).expect("read"); + assert!(iter.next().is_none()); + } + + { + let mut writer = k.write().expect("writer"); + s.delete(&mut writer, 1, &Value::Str("hello1!")) + .expect_err("deleted"); + writer.commit().expect("committed"); + + let reader = k.read().expect("reader"); + let mut iter = s.get(&reader, 1).expect("read"); + assert!(iter.next().is_none()); + } + } + + #[test] + fn test_persist() { + let root = Builder::new() + .prefix("test_multi_integer_persist") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + { + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k + .open_multi_integer("s", StoreOptions::create()) + .expect("open"); + + let mut writer = k.write().expect("writer"); + s.put(&mut writer, 1, &Value::Str("hello!")).expect("write"); + s.put(&mut writer, 1, &Value::Str("hello1!")) + .expect("write"); + s.put(&mut writer, 2, &Value::Str("hello!")).expect("write"); + { + let mut iter = s.get(&writer, 1).expect("read"); + assert_eq!( + iter.next().expect("first").expect("ok").1, + Value::Str("hello!") + ); + assert_eq!( + iter.next().expect("second").expect("ok").1, + Value::Str("hello1!") + ); + assert!(iter.next().is_none()); + } + writer.commit().expect("committed"); + } + + { + let k = Rkv::new::<backend::SafeMode>(root.path()).expect("new succeeded"); + let s = k + .open_multi_integer("s", StoreOptions::create()) + .expect("open"); + + let reader = k.read().expect("reader"); + let mut iter = s.get(&reader, 1).expect("read"); + assert_eq!( + iter.next().expect("first").expect("ok").1, + Value::Str("hello!") + ); + assert_eq!( + iter.next().expect("second").expect("ok").1, + Value::Str("hello1!") + ); + assert!(iter.next().is_none()); + } + } +} diff --git a/third_party/rust/rkv/src/store/keys.rs b/third_party/rust/rkv/src/store/keys.rs new file mode 100644 index 0000000000..26a13db47a --- /dev/null +++ b/third_party/rust/rkv/src/store/keys.rs @@ -0,0 +1,46 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +mod encodables; +mod primitives; + +use std::marker::PhantomData; + +use crate::error::DataError; + +pub use encodables::*; +pub use primitives::*; + +pub(crate) struct Key<K> { + bytes: Vec<u8>, + phantom: PhantomData<K>, +} + +impl<K> AsRef<[u8]> for Key<K> +where + K: EncodableKey, +{ + fn as_ref(&self) -> &[u8] { + self.bytes.as_ref() + } +} + +impl<K> Key<K> +where + K: EncodableKey, +{ + #[allow(clippy::new_ret_no_self)] + pub fn new(k: &K) -> Result<Key<K>, DataError> { + Ok(Key { + bytes: k.to_bytes()?, + phantom: PhantomData, + }) + } +} diff --git a/third_party/rust/rkv/src/store/keys/encodables.rs b/third_party/rust/rkv/src/store/keys/encodables.rs new file mode 100644 index 0000000000..85e3eacdf5 --- /dev/null +++ b/third_party/rust/rkv/src/store/keys/encodables.rs @@ -0,0 +1,27 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use bincode::serialize; +use serde::Serialize; + +use crate::error::DataError; + +pub trait EncodableKey { + fn to_bytes(&self) -> Result<Vec<u8>, DataError>; +} + +impl<T> EncodableKey for T +where + T: Serialize, +{ + fn to_bytes(&self) -> Result<Vec<u8>, DataError> { + serialize(self).map_err(|e| e.into()) + } +} diff --git a/third_party/rust/rkv/src/store/keys/primitives.rs b/third_party/rust/rkv/src/store/keys/primitives.rs new file mode 100644 index 0000000000..26282b7cbb --- /dev/null +++ b/third_party/rust/rkv/src/store/keys/primitives.rs @@ -0,0 +1,15 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use crate::store::keys::EncodableKey; + +pub trait PrimitiveInt: EncodableKey {} + +impl PrimitiveInt for u32 {} diff --git a/third_party/rust/rkv/src/store/multi.rs b/third_party/rust/rkv/src/store/multi.rs new file mode 100644 index 0000000000..c7f816b1ec --- /dev/null +++ b/third_party/rust/rkv/src/store/multi.rs @@ -0,0 +1,133 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::marker::PhantomData; + +use crate::{ + backend::{BackendDatabase, BackendFlags, BackendIter, BackendRoCursor, BackendRwTransaction}, + error::StoreError, + helpers::read_transform, + readwrite::{Readable, Writer}, + value::Value, +}; + +type EmptyResult = Result<(), StoreError>; + +#[derive(Debug, Eq, PartialEq, Copy, Clone)] +pub struct MultiStore<D> { + db: D, +} + +pub struct Iter<'i, I> { + iter: I, + phantom: PhantomData<&'i ()>, +} + +impl<D> MultiStore<D> +where + D: BackendDatabase, +{ + pub(crate) fn new(db: D) -> MultiStore<D> { + MultiStore { db } + } + + /// Provides a cursor to all of the values for the duplicate entries that match this + /// key + pub fn get<'r, R, I, C, K>(&self, reader: &'r R, k: K) -> Result<Iter<'r, I>, StoreError> + where + R: Readable<'r, Database = D, RoCursor = C>, + I: BackendIter<'r>, + C: BackendRoCursor<'r, Iter = I>, + K: AsRef<[u8]> + 'r, + { + let cursor = reader.open_ro_cursor(&self.db)?; + let iter = cursor.into_iter_dup_of(k); + + Ok(Iter { + iter, + phantom: PhantomData, + }) + } + + /// Provides the first value that matches this key + pub fn get_first<'r, R, K>(&self, reader: &'r R, k: K) -> Result<Option<Value<'r>>, StoreError> + where + R: Readable<'r, Database = D>, + K: AsRef<[u8]>, + { + reader.get(&self.db, &k) + } + + /// Insert a value at the specified key. + /// This put will allow duplicate entries. If you wish to have duplicate entries + /// rejected, use the `put_with_flags` function and specify NO_DUP_DATA + pub fn put<T, K>(&self, writer: &mut Writer<T>, k: K, v: &Value) -> EmptyResult + where + T: BackendRwTransaction<Database = D>, + K: AsRef<[u8]>, + { + writer.put(&self.db, &k, v, T::Flags::empty()) + } + + pub fn put_with_flags<T, K>( + &self, + writer: &mut Writer<T>, + k: K, + v: &Value, + flags: T::Flags, + ) -> EmptyResult + where + T: BackendRwTransaction<Database = D>, + K: AsRef<[u8]>, + { + writer.put(&self.db, &k, v, flags) + } + + pub fn delete_all<T, K>(&self, writer: &mut Writer<T>, k: K) -> EmptyResult + where + T: BackendRwTransaction<Database = D>, + K: AsRef<[u8]>, + { + writer.delete(&self.db, &k, None) + } + + pub fn delete<T, K>(&self, writer: &mut Writer<T>, k: K, v: &Value) -> EmptyResult + where + T: BackendRwTransaction<Database = D>, + K: AsRef<[u8]>, + { + writer.delete(&self.db, &k, Some(&v.to_bytes()?)) + } + + pub fn clear<T>(&self, writer: &mut Writer<T>) -> EmptyResult + where + T: BackendRwTransaction<Database = D>, + { + writer.clear(&self.db) + } +} + +impl<'i, I> Iterator for Iter<'i, I> +where + I: BackendIter<'i>, +{ + type Item = Result<(&'i [u8], Value<'i>), StoreError>; + + fn next(&mut self) -> Option<Self::Item> { + match self.iter.next() { + None => None, + Some(Ok((key, bytes))) => match read_transform(Ok(bytes)) { + Ok(val) => Some(Ok((key, val))), + Err(err) => Some(Err(err)), + }, + Some(Err(err)) => Some(Err(err.into())), + } + } +} diff --git a/third_party/rust/rkv/src/store/single.rs b/third_party/rust/rkv/src/store/single.rs new file mode 100644 index 0000000000..c456e6aaa6 --- /dev/null +++ b/third_party/rust/rkv/src/store/single.rs @@ -0,0 +1,132 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::marker::PhantomData; + +use crate::{ + backend::{BackendDatabase, BackendFlags, BackendIter, BackendRoCursor, BackendRwTransaction}, + error::StoreError, + helpers::read_transform, + readwrite::{Readable, Writer}, + value::Value, +}; + +type EmptyResult = Result<(), StoreError>; + +#[derive(Debug, Eq, PartialEq, Copy, Clone)] +pub struct SingleStore<D> { + db: D, +} + +pub struct Iter<'i, I> { + iter: I, + phantom: PhantomData<&'i ()>, +} + +impl<D> SingleStore<D> +where + D: BackendDatabase, +{ + pub(crate) fn new(db: D) -> SingleStore<D> { + SingleStore { db } + } + + pub fn get<'r, R, K>(&self, reader: &'r R, k: K) -> Result<Option<Value<'r>>, StoreError> + where + R: Readable<'r, Database = D>, + K: AsRef<[u8]>, + { + reader.get(&self.db, &k) + } + + // TODO: flags + pub fn put<T, K>(&self, writer: &mut Writer<T>, k: K, v: &Value) -> EmptyResult + where + T: BackendRwTransaction<Database = D>, + K: AsRef<[u8]>, + { + writer.put(&self.db, &k, v, T::Flags::empty()) + } + + #[cfg(not(feature = "db-dup-sort"))] + pub fn delete<T, K>(&self, writer: &mut Writer<T>, k: K) -> EmptyResult + where + T: BackendRwTransaction<Database = D>, + K: AsRef<[u8]>, + { + writer.delete(&self.db, &k) + } + + #[cfg(feature = "db-dup-sort")] + pub fn delete<T, K>(&self, writer: &mut Writer<T>, k: K) -> EmptyResult + where + T: BackendRwTransaction<Database = D>, + K: AsRef<[u8]>, + { + writer.delete(&self.db, &k, None) + } + + pub fn iter_start<'r, R, I, C>(&self, reader: &'r R) -> Result<Iter<'r, I>, StoreError> + where + R: Readable<'r, Database = D, RoCursor = C>, + I: BackendIter<'r>, + C: BackendRoCursor<'r, Iter = I>, + { + let cursor = reader.open_ro_cursor(&self.db)?; + let iter = cursor.into_iter(); + + Ok(Iter { + iter, + phantom: PhantomData, + }) + } + + pub fn iter_from<'r, R, I, C, K>(&self, reader: &'r R, k: K) -> Result<Iter<'r, I>, StoreError> + where + R: Readable<'r, Database = D, RoCursor = C>, + I: BackendIter<'r>, + C: BackendRoCursor<'r, Iter = I>, + K: AsRef<[u8]> + 'r, + { + let cursor = reader.open_ro_cursor(&self.db)?; + let iter = cursor.into_iter_from(k); + + Ok(Iter { + iter, + phantom: PhantomData, + }) + } + + pub fn clear<T>(&self, writer: &mut Writer<T>) -> EmptyResult + where + D: BackendDatabase, + T: BackendRwTransaction<Database = D>, + { + writer.clear(&self.db) + } +} + +impl<'i, I> Iterator for Iter<'i, I> +where + I: BackendIter<'i>, +{ + type Item = Result<(&'i [u8], Value<'i>), StoreError>; + + fn next(&mut self) -> Option<Self::Item> { + match self.iter.next() { + None => None, + Some(Ok((key, bytes))) => match read_transform(Ok(bytes)) { + Ok(val) => Some(Ok((key, val))), + Err(err) => Some(Err(err)), + }, + Some(Err(err)) => Some(Err(err.into())), + } + } +} diff --git a/third_party/rust/rkv/src/value.rs b/third_party/rust/rkv/src/value.rs new file mode 100644 index 0000000000..762607acd3 --- /dev/null +++ b/third_party/rust/rkv/src/value.rs @@ -0,0 +1,252 @@ +// Copyright 2018-2019 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::fmt; + +use arrayref::array_ref; +use bincode::{deserialize, serialize, serialized_size}; +use ordered_float::OrderedFloat; +use uuid::{Bytes, Uuid}; + +use crate::error::DataError; + +/// We define a set of types, associated with simple integers, to annotate values stored +/// in LMDB. This is to avoid an accidental 'cast' from a value of one type to another. +/// For this reason we don't simply use `deserialize` from the `bincode` crate. +#[repr(u8)] +#[derive(Debug, PartialEq, Eq)] +pub enum Type { + Bool = 1, + U64 = 2, + I64 = 3, + F64 = 4, + Instant = 5, // Millisecond-precision timestamp. + Uuid = 6, + Str = 7, + Json = 8, + Blob = 9, +} + +/// We use manual tagging, because <https://github.com/serde-rs/serde/issues/610>. +impl Type { + pub fn from_tag(tag: u8) -> Result<Type, DataError> { + #![allow(clippy::unnecessary_lazy_evaluations)] + Type::from_primitive(tag).ok_or_else(|| DataError::UnknownType(tag)) + } + + #[allow(clippy::wrong_self_convention)] + pub fn to_tag(self) -> u8 { + self as u8 + } + + fn from_primitive(p: u8) -> Option<Type> { + match p { + 1 => Some(Type::Bool), + 2 => Some(Type::U64), + 3 => Some(Type::I64), + 4 => Some(Type::F64), + 5 => Some(Type::Instant), + 6 => Some(Type::Uuid), + 7 => Some(Type::Str), + 8 => Some(Type::Json), + 9 => Some(Type::Blob), + _ => None, + } + } +} + +impl fmt::Display for Type { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str(match *self { + Type::Bool => "bool", + Type::U64 => "u64", + Type::I64 => "i64", + Type::F64 => "f64", + Type::Instant => "instant", + Type::Uuid => "uuid", + Type::Str => "str", + Type::Json => "json", + Type::Blob => "blob", + }) + } +} + +#[derive(Debug, Eq, PartialEq)] +pub enum Value<'v> { + Bool(bool), + U64(u64), + I64(i64), + F64(OrderedFloat<f64>), + Instant(i64), // Millisecond-precision timestamp. + Uuid(&'v Bytes), + Str(&'v str), + Json(&'v str), + Blob(&'v [u8]), +} + +#[derive(Clone, Debug, PartialEq)] +pub enum OwnedValue { + Bool(bool), + U64(u64), + I64(i64), + F64(f64), + Instant(i64), // Millisecond-precision timestamp. + Uuid(Uuid), + Str(String), + Json(String), // TODO + Blob(Vec<u8>), +} + +fn uuid(bytes: &[u8]) -> Result<Value, DataError> { + if bytes.len() == 16 { + Ok(Value::Uuid(array_ref![bytes, 0, 16])) + } else { + Err(DataError::InvalidUuid) + } +} + +impl<'v> Value<'v> { + pub fn from_tagged_slice(slice: &'v [u8]) -> Result<Value<'v>, DataError> { + let (tag, data) = slice.split_first().ok_or(DataError::Empty)?; + let t = Type::from_tag(*tag)?; + Value::from_type_and_data(t, data) + } + + fn from_type_and_data(t: Type, data: &'v [u8]) -> Result<Value<'v>, DataError> { + if t == Type::Uuid { + return deserialize(data) + .map_err(|e| DataError::DecodingError { + value_type: t, + err: e, + }) + .map(uuid)?; + } + + match t { + Type::Bool => deserialize(data).map(Value::Bool), + Type::U64 => deserialize(data).map(Value::U64), + Type::I64 => deserialize(data).map(Value::I64), + Type::F64 => deserialize(data).map(OrderedFloat).map(Value::F64), + Type::Instant => deserialize(data).map(Value::Instant), + Type::Str => deserialize(data).map(Value::Str), + Type::Json => deserialize(data).map(Value::Json), + Type::Blob => deserialize(data).map(Value::Blob), + Type::Uuid => { + // Processed above to avoid verbose duplication of error transforms. + unreachable!() + } + } + .map_err(|e| DataError::DecodingError { + value_type: t, + err: e, + }) + } + + pub fn to_bytes(&self) -> Result<Vec<u8>, DataError> { + match self { + Value::Bool(v) => serialize(&(Type::Bool.to_tag(), *v)), + Value::U64(v) => serialize(&(Type::U64.to_tag(), *v)), + Value::I64(v) => serialize(&(Type::I64.to_tag(), *v)), + Value::F64(v) => serialize(&(Type::F64.to_tag(), v.0)), + Value::Instant(v) => serialize(&(Type::Instant.to_tag(), *v)), + Value::Str(v) => serialize(&(Type::Str.to_tag(), v)), + Value::Json(v) => serialize(&(Type::Json.to_tag(), v)), + Value::Blob(v) => serialize(&(Type::Blob.to_tag(), v)), + Value::Uuid(v) => serialize(&(Type::Uuid.to_tag(), v)), + } + .map_err(DataError::EncodingError) + } + + pub fn serialized_size(&self) -> Result<u64, DataError> { + match self { + Value::Bool(v) => serialized_size(&(Type::Bool.to_tag(), *v)), + Value::U64(v) => serialized_size(&(Type::U64.to_tag(), *v)), + Value::I64(v) => serialized_size(&(Type::I64.to_tag(), *v)), + Value::F64(v) => serialized_size(&(Type::F64.to_tag(), v.0)), + Value::Instant(v) => serialized_size(&(Type::Instant.to_tag(), *v)), + Value::Str(v) => serialized_size(&(Type::Str.to_tag(), v)), + Value::Json(v) => serialized_size(&(Type::Json.to_tag(), v)), + Value::Blob(v) => serialized_size(&(Type::Blob.to_tag(), v)), + Value::Uuid(v) => serialized_size(&(Type::Uuid.to_tag(), v)), + } + .map_err(DataError::EncodingError) + } +} + +impl<'v> From<&'v Value<'v>> for OwnedValue { + fn from(value: &Value) -> OwnedValue { + match value { + Value::Bool(v) => OwnedValue::Bool(*v), + Value::U64(v) => OwnedValue::U64(*v), + Value::I64(v) => OwnedValue::I64(*v), + Value::F64(v) => OwnedValue::F64(**v), + Value::Instant(v) => OwnedValue::Instant(*v), + Value::Uuid(v) => OwnedValue::Uuid(Uuid::from_bytes(**v)), + Value::Str(v) => OwnedValue::Str((*v).to_string()), + Value::Json(v) => OwnedValue::Json((*v).to_string()), + Value::Blob(v) => OwnedValue::Blob(v.to_vec()), + } + } +} + +impl<'v> From<&'v OwnedValue> for Value<'v> { + fn from(value: &OwnedValue) -> Value { + match value { + OwnedValue::Bool(v) => Value::Bool(*v), + OwnedValue::U64(v) => Value::U64(*v), + OwnedValue::I64(v) => Value::I64(*v), + OwnedValue::F64(v) => Value::F64(OrderedFloat::from(*v)), + OwnedValue::Instant(v) => Value::Instant(*v), + OwnedValue::Uuid(v) => Value::Uuid(v.as_bytes()), + OwnedValue::Str(v) => Value::Str(v), + OwnedValue::Json(v) => Value::Json(v), + OwnedValue::Blob(v) => Value::Blob(v), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_value_serialized_size() { + // | Value enum | tag: 1 byte | value_payload | + // |----------------------------------------------------------| + // | I64 | 1 | 8 | + // | U64 | 1 | 8 | + // | Bool | 1 | 1 | + // | Instant | 1 | 8 | + // | F64 | 1 | 8 | + // | Uuid | 1 | 16 | + // | Str/Blob/Json | 1 |(8: len + sizeof(payload))| + assert_eq!(Value::I64(-1000).serialized_size().unwrap(), 9); + assert_eq!(Value::U64(1000u64).serialized_size().unwrap(), 9); + assert_eq!(Value::Bool(true).serialized_size().unwrap(), 2); + assert_eq!( + Value::Instant(1_558_020_865_224).serialized_size().unwrap(), + 9 + ); + assert_eq!( + Value::F64(OrderedFloat(10000.1)).serialized_size().unwrap(), + 9 + ); + assert_eq!(Value::Str("hello!").serialized_size().unwrap(), 15); + assert_eq!(Value::Str("¡Hola").serialized_size().unwrap(), 15); + assert_eq!(Value::Blob(b"hello!").serialized_size().unwrap(), 15); + assert_eq!( + uuid(b"\x9f\xe2\xc4\xe9\x3f\x65\x4f\xdb\xb2\x4c\x02\xb1\x52\x59\x71\x6c") + .unwrap() + .serialized_size() + .unwrap(), + 17 + ); + } +} |