diff options
Diffstat (limited to 'toolkit/components/kvstore/src')
-rw-r--r-- | toolkit/components/kvstore/src/error.rs | 89 | ||||
-rw-r--r-- | toolkit/components/kvstore/src/lib.rs | 367 | ||||
-rw-r--r-- | toolkit/components/kvstore/src/owned_value.rs | 72 | ||||
-rw-r--r-- | toolkit/components/kvstore/src/task.rs | 727 |
4 files changed, 1255 insertions, 0 deletions
diff --git a/toolkit/components/kvstore/src/error.rs b/toolkit/components/kvstore/src/error.rs new file mode 100644 index 0000000000..82ed2ce0d8 --- /dev/null +++ b/toolkit/components/kvstore/src/error.rs @@ -0,0 +1,89 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use nserror::{ + nsresult, NS_ERROR_FAILURE, NS_ERROR_NOT_IMPLEMENTED, NS_ERROR_NO_INTERFACE, + NS_ERROR_NULL_POINTER, NS_ERROR_UNEXPECTED, +}; +use nsstring::nsCString; +use rkv::{MigrateError, StoreError}; +use std::{io::Error as IoError, str::Utf8Error, string::FromUtf16Error, sync::PoisonError}; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum KeyValueError { + #[error("error converting string: {0:?}")] + ConvertBytes(#[from] Utf8Error), + + #[error("error converting string: {0:?}")] + ConvertString(#[from] FromUtf16Error), + + #[error("I/O error: {0:?}")] + IoError(#[from] IoError), + + #[error("migrate error: {0:?}")] + MigrateError(#[from] MigrateError), + + #[error("no interface '{0}'")] + NoInterface(&'static str), + + // NB: We can avoid storing the nsCString error description + // once nsresult is a real type with a Display implementation + // per https://bugzilla.mozilla.org/show_bug.cgi?id=1513350. + #[error("error result {0}")] + Nsresult(nsCString, nsresult), + + #[error("arg is null")] + NullPointer, + + #[error("poison error getting read/write lock")] + PoisonError, + + #[error("error reading key/value pair")] + Read, + + #[error("store error: {0:?}")] + StoreError(#[from] StoreError), + + #[error("unsupported owned value type")] + UnsupportedOwned, + + #[error("unexpected value")] + UnexpectedValue, + + #[error("unsupported variant type: {0}")] + UnsupportedVariant(u16), +} + +impl From<nsresult> for KeyValueError { + fn from(result: nsresult) -> KeyValueError { + KeyValueError::Nsresult(result.error_name(), result) + } +} + +impl From<KeyValueError> for nsresult { + fn from(err: KeyValueError) -> nsresult { + match err { + KeyValueError::ConvertBytes(_) => NS_ERROR_FAILURE, + KeyValueError::ConvertString(_) => NS_ERROR_FAILURE, + KeyValueError::IoError(_) => NS_ERROR_FAILURE, + KeyValueError::NoInterface(_) => NS_ERROR_NO_INTERFACE, + KeyValueError::Nsresult(_, result) => result, + KeyValueError::NullPointer => NS_ERROR_NULL_POINTER, + KeyValueError::PoisonError => NS_ERROR_UNEXPECTED, + KeyValueError::Read => NS_ERROR_FAILURE, + KeyValueError::StoreError(_) => NS_ERROR_FAILURE, + KeyValueError::MigrateError(_) => NS_ERROR_FAILURE, + KeyValueError::UnsupportedOwned => NS_ERROR_NOT_IMPLEMENTED, + KeyValueError::UnexpectedValue => NS_ERROR_UNEXPECTED, + KeyValueError::UnsupportedVariant(_) => NS_ERROR_NOT_IMPLEMENTED, + } + } +} + +impl<T> From<PoisonError<T>> for KeyValueError { + fn from(_err: PoisonError<T>) -> KeyValueError { + KeyValueError::PoisonError + } +} diff --git a/toolkit/components/kvstore/src/lib.rs b/toolkit/components/kvstore/src/lib.rs new file mode 100644 index 0000000000..5601ecb12a --- /dev/null +++ b/toolkit/components/kvstore/src/lib.rs @@ -0,0 +1,367 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +extern crate atomic_refcell; +extern crate crossbeam_utils; +#[macro_use] +extern crate cstr; +extern crate libc; +extern crate log; +extern crate moz_task; +extern crate nserror; +extern crate nsstring; +extern crate rkv; +extern crate storage_variant; +extern crate tempfile; +extern crate thin_vec; +extern crate thiserror; +extern crate xpcom; + +mod error; +mod owned_value; +mod task; + +use atomic_refcell::AtomicRefCell; +use error::KeyValueError; +use libc::c_void; +use moz_task::{create_background_task_queue, DispatchOptions, TaskRunnable}; +use nserror::{nsresult, NS_ERROR_FAILURE, NS_OK}; +use nsstring::{nsACString, nsCString}; +use owned_value::{owned_to_variant, variant_to_owned}; +use rkv::backend::{SafeModeDatabase, SafeModeEnvironment}; +use rkv::OwnedValue; +use std::{ + ptr, + sync::{Arc, RwLock}, + vec::IntoIter, +}; +use task::{ + ClearTask, DeleteTask, EnumerateTask, GetOrCreateTask, GetTask, HasTask, PutTask, WriteManyTask, +}; +use thin_vec::ThinVec; +use xpcom::{ + getter_addrefs, + interfaces::{ + nsIKeyValueDatabaseCallback, nsIKeyValueEnumeratorCallback, nsIKeyValuePair, + nsIKeyValueVariantCallback, nsIKeyValueVoidCallback, nsISerialEventTarget, nsIVariant, + }, + nsIID, xpcom, xpcom_method, RefPtr, +}; + +type Rkv = rkv::Rkv<SafeModeEnvironment>; +type SingleStore = rkv::SingleStore<SafeModeDatabase>; +type KeyValuePairResult = Result<(String, OwnedValue), KeyValueError>; + +#[no_mangle] +pub unsafe extern "C" fn nsKeyValueServiceConstructor( + iid: &nsIID, + result: *mut *mut c_void, +) -> nsresult { + *result = ptr::null_mut(); + + let service = KeyValueService::new(); + service.QueryInterface(iid, result) +} + +// For each public XPCOM method in the nsIKeyValue* interfaces, we implement +// a pair of Rust methods: +// +// 1. a method named after the XPCOM (as modified by the XPIDL parser, i.e. +// by capitalization of its initial letter) that returns an nsresult; +// +// 2. a method with a Rust-y name that returns a Result<(), KeyValueError>. +// +// XPCOM calls the first method, which is only responsible for calling +// the second one and converting its Result to an nsresult (logging errors +// in the process). The second method is responsible for doing the work. +// +// For example, given an XPCOM method FooBar, we implement a method FooBar +// that calls a method foo_bar. foo_bar returns a Result<(), KeyValueError>, +// and FooBar converts that to an nsresult. +// +// This design allows us to use Rust idioms like the question mark operator +// to simplify the implementation in the second method while returning XPCOM- +// compatible nsresult values to XPCOM callers. +// +// The XPCOM methods are implemented using the xpcom_method! declarative macro +// from the xpcom crate. + +#[xpcom(implement(nsIKeyValueService), atomic)] +pub struct KeyValueService {} + +impl KeyValueService { + fn new() -> RefPtr<KeyValueService> { + KeyValueService::allocate(InitKeyValueService {}) + } + + xpcom_method!( + get_or_create => GetOrCreate( + callback: *const nsIKeyValueDatabaseCallback, + path: *const nsACString, + name: *const nsACString + ) + ); + + fn get_or_create( + &self, + callback: &nsIKeyValueDatabaseCallback, + path: &nsACString, + name: &nsACString, + ) -> Result<(), nsresult> { + let task = Box::new(GetOrCreateTask::new( + RefPtr::new(callback), + nsCString::from(path), + nsCString::from(name), + )); + + TaskRunnable::new("KVService::GetOrCreate", task)? + .dispatch_background_task_with_options(DispatchOptions::default().may_block(true)) + } +} + +#[xpcom(implement(nsIKeyValueDatabase), atomic)] +pub struct KeyValueDatabase { + rkv: Arc<RwLock<Rkv>>, + store: SingleStore, + queue: RefPtr<nsISerialEventTarget>, +} + +impl KeyValueDatabase { + fn new( + rkv: Arc<RwLock<Rkv>>, + store: SingleStore, + ) -> Result<RefPtr<KeyValueDatabase>, KeyValueError> { + let queue = create_background_task_queue(cstr!("KeyValueDatabase"))?; + Ok(KeyValueDatabase::allocate(InitKeyValueDatabase { + rkv, + store, + queue, + })) + } + + xpcom_method!( + put => Put( + callback: *const nsIKeyValueVoidCallback, + key: *const nsACString, + value: *const nsIVariant + ) + ); + + fn put( + &self, + callback: &nsIKeyValueVoidCallback, + key: &nsACString, + value: &nsIVariant, + ) -> Result<(), nsresult> { + let value = variant_to_owned(value)?.ok_or(KeyValueError::UnexpectedValue)?; + + let task = Box::new(PutTask::new( + RefPtr::new(callback), + Arc::clone(&self.rkv), + self.store, + nsCString::from(key), + value, + )); + + TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Put", task)?, &self.queue) + } + + xpcom_method!( + write_many => WriteMany( + callback: *const nsIKeyValueVoidCallback, + pairs: *const ThinVec<Option<RefPtr<nsIKeyValuePair>>> + ) + ); + + fn write_many( + &self, + callback: &nsIKeyValueVoidCallback, + pairs: &ThinVec<Option<RefPtr<nsIKeyValuePair>>>, + ) -> Result<(), nsresult> { + let mut entries = Vec::with_capacity(pairs.len()); + + for pair in pairs { + let pair = pair + .as_ref() + .ok_or(nsresult::from(KeyValueError::UnexpectedValue))?; + + let mut key = nsCString::new(); + unsafe { pair.GetKey(&mut *key) }.to_result()?; + if key.is_empty() { + return Err(nsresult::from(KeyValueError::UnexpectedValue)); + } + + let val: RefPtr<nsIVariant> = getter_addrefs(|p| unsafe { pair.GetValue(p) })?; + let value = variant_to_owned(&val)?; + entries.push((key, value)); + } + + let task = Box::new(WriteManyTask::new( + RefPtr::new(callback), + Arc::clone(&self.rkv), + self.store, + entries, + )); + + TaskRunnable::dispatch( + TaskRunnable::new("KVDatabase::WriteMany", task)?, + &self.queue, + ) + } + + xpcom_method!( + get => Get( + callback: *const nsIKeyValueVariantCallback, + key: *const nsACString, + default_value: *const nsIVariant + ) + ); + + fn get( + &self, + callback: &nsIKeyValueVariantCallback, + key: &nsACString, + default_value: &nsIVariant, + ) -> Result<(), nsresult> { + let task = Box::new(GetTask::new( + RefPtr::new(callback), + Arc::clone(&self.rkv), + self.store, + nsCString::from(key), + variant_to_owned(default_value)?, + )); + + TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Get", task)?, &self.queue) + } + + xpcom_method!( + has => Has(callback: *const nsIKeyValueVariantCallback, key: *const nsACString) + ); + + fn has(&self, callback: &nsIKeyValueVariantCallback, key: &nsACString) -> Result<(), nsresult> { + let task = Box::new(HasTask::new( + RefPtr::new(callback), + Arc::clone(&self.rkv), + self.store, + nsCString::from(key), + )); + + TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Has", task)?, &self.queue) + } + + xpcom_method!( + delete => Delete(callback: *const nsIKeyValueVoidCallback, key: *const nsACString) + ); + + fn delete(&self, callback: &nsIKeyValueVoidCallback, key: &nsACString) -> Result<(), nsresult> { + let task = Box::new(DeleteTask::new( + RefPtr::new(callback), + Arc::clone(&self.rkv), + self.store, + nsCString::from(key), + )); + + TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Delete", task)?, &self.queue) + } + + xpcom_method!( + clear => Clear(callback: *const nsIKeyValueVoidCallback) + ); + + fn clear(&self, callback: &nsIKeyValueVoidCallback) -> Result<(), nsresult> { + let task = Box::new(ClearTask::new( + RefPtr::new(callback), + Arc::clone(&self.rkv), + self.store, + )); + + TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Clear", task)?, &self.queue) + } + + xpcom_method!( + enumerate => Enumerate( + callback: *const nsIKeyValueEnumeratorCallback, + from_key: *const nsACString, + to_key: *const nsACString + ) + ); + + fn enumerate( + &self, + callback: &nsIKeyValueEnumeratorCallback, + from_key: &nsACString, + to_key: &nsACString, + ) -> Result<(), nsresult> { + let task = Box::new(EnumerateTask::new( + RefPtr::new(callback), + Arc::clone(&self.rkv), + self.store, + nsCString::from(from_key), + nsCString::from(to_key), + )); + + TaskRunnable::dispatch( + TaskRunnable::new("KVDatabase::Enumerate", task)?, + &self.queue, + ) + } +} + +#[xpcom(implement(nsIKeyValueEnumerator), atomic)] +pub struct KeyValueEnumerator { + iter: AtomicRefCell<IntoIter<KeyValuePairResult>>, +} + +impl KeyValueEnumerator { + fn new(pairs: Vec<KeyValuePairResult>) -> RefPtr<KeyValueEnumerator> { + KeyValueEnumerator::allocate(InitKeyValueEnumerator { + iter: AtomicRefCell::new(pairs.into_iter()), + }) + } + + xpcom_method!(has_more_elements => HasMoreElements() -> bool); + + fn has_more_elements(&self) -> Result<bool, KeyValueError> { + Ok(!self.iter.borrow().as_slice().is_empty()) + } + + xpcom_method!(get_next => GetNext() -> *const nsIKeyValuePair); + + fn get_next(&self) -> Result<RefPtr<nsIKeyValuePair>, KeyValueError> { + let mut iter = self.iter.borrow_mut(); + let (key, value) = iter + .next() + .ok_or_else(|| KeyValueError::from(NS_ERROR_FAILURE))??; + + // We fail on retrieval of the key/value pair if the key isn't valid + // UTF-*, if the value is unexpected, or if we encountered a store error + // while retrieving the pair. + Ok(RefPtr::new( + KeyValuePair::new(key, value).coerce::<nsIKeyValuePair>(), + )) + } +} + +#[xpcom(implement(nsIKeyValuePair), atomic)] +pub struct KeyValuePair { + key: String, + value: OwnedValue, +} + +impl KeyValuePair { + fn new(key: String, value: OwnedValue) -> RefPtr<KeyValuePair> { + KeyValuePair::allocate(InitKeyValuePair { key, value }) + } + + xpcom_method!(get_key => GetKey() -> nsACString); + xpcom_method!(get_value => GetValue() -> *const nsIVariant); + + fn get_key(&self) -> Result<nsCString, KeyValueError> { + Ok(nsCString::from(&self.key)) + } + + fn get_value(&self) -> Result<RefPtr<nsIVariant>, KeyValueError> { + Ok(owned_to_variant(self.value.clone())?) + } +} diff --git a/toolkit/components/kvstore/src/owned_value.rs b/toolkit/components/kvstore/src/owned_value.rs new file mode 100644 index 0000000000..a22abcb8da --- /dev/null +++ b/toolkit/components/kvstore/src/owned_value.rs @@ -0,0 +1,72 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use error::KeyValueError; +use nsstring::{nsCString, nsString}; +use rkv::OwnedValue; +use std::convert::TryInto; +use storage_variant::{DataType, NsIVariantExt, VariantType}; +use xpcom::{interfaces::nsIVariant, RefPtr}; + +pub fn owned_to_variant(owned: OwnedValue) -> Result<RefPtr<nsIVariant>, KeyValueError> { + match owned { + OwnedValue::Bool(val) => Ok(val.into_variant()), + OwnedValue::I64(val) => Ok(val.into_variant()), + OwnedValue::F64(val) => Ok(val.into_variant()), + OwnedValue::Str(ref val) => Ok(nsString::from(val).into_variant()), + + // kvstore doesn't (yet?) support these types of OwnedValue, + // and we should never encounter them, but we need to exhaust + // all possible variants of the OwnedValue enum. + OwnedValue::Instant(_) => Err(KeyValueError::UnsupportedOwned), + OwnedValue::Json(_) => Err(KeyValueError::UnsupportedOwned), + OwnedValue::U64(_) => Err(KeyValueError::UnsupportedOwned), + OwnedValue::Uuid(_) => Err(KeyValueError::UnsupportedOwned), + OwnedValue::Blob(_) => Err(KeyValueError::UnsupportedOwned), + } +} + +pub fn variant_to_owned(variant: &nsIVariant) -> Result<Option<OwnedValue>, KeyValueError> { + let data_type = variant.get_data_type(); + + match data_type.try_into() { + Ok(DataType::Int32) => { + let mut val: i32 = 0; + unsafe { variant.GetAsInt32(&mut val) }.to_result()?; + Ok(Some(OwnedValue::I64(val.into()))) + } + Ok(DataType::Int64) => { + let mut val: i64 = 0; + unsafe { variant.GetAsInt64(&mut val) }.to_result()?; + Ok(Some(OwnedValue::I64(val))) + } + Ok(DataType::Double) => { + let mut val: f64 = 0.0; + unsafe { variant.GetAsDouble(&mut val) }.to_result()?; + Ok(Some(OwnedValue::F64(val))) + } + Ok(DataType::CString) + | Ok(DataType::CharStr) + | Ok(DataType::StringSizeIs) + | Ok(DataType::Utf8String) => { + let mut val: nsCString = nsCString::new(); + unsafe { variant.GetAsAUTF8String(&mut *val) }.to_result()?; + let s = std::str::from_utf8(&*val)?; + Ok(Some(OwnedValue::Str(s.into()))) + } + Ok(DataType::AString) | Ok(DataType::WCharStr) | Ok(DataType::WStringSizeIs) => { + let mut val: nsString = nsString::new(); + unsafe { variant.GetAsAString(&mut *val) }.to_result()?; + let str = String::from_utf16(&val)?; + Ok(Some(OwnedValue::Str(str))) + } + Ok(DataType::Bool) => { + let mut val: bool = false; + unsafe { variant.GetAsBool(&mut val) }.to_result()?; + Ok(Some(OwnedValue::Bool(val))) + } + Ok(DataType::Void) | Ok(DataType::EmptyArray) | Ok(DataType::Empty) => Ok(None), + Err(_) => Err(KeyValueError::UnsupportedVariant(data_type)), + } +} diff --git a/toolkit/components/kvstore/src/task.rs b/toolkit/components/kvstore/src/task.rs new file mode 100644 index 0000000000..3608dc9665 --- /dev/null +++ b/toolkit/components/kvstore/src/task.rs @@ -0,0 +1,727 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +extern crate xpcom; + +use crossbeam_utils::atomic::AtomicCell; +use error::KeyValueError; +use moz_task::Task; +use nserror::{nsresult, NS_ERROR_FAILURE}; +use nsstring::nsCString; +use owned_value::owned_to_variant; +use rkv::backend::{BackendInfo, SafeMode, SafeModeDatabase, SafeModeEnvironment}; +use rkv::{OwnedValue, StoreError, StoreOptions, Value}; +use std::{ + path::Path, + str, + sync::{Arc, RwLock}, +}; +use storage_variant::VariantType; +use xpcom::{ + interfaces::{ + nsIKeyValueDatabaseCallback, nsIKeyValueEnumeratorCallback, nsIKeyValueVariantCallback, + nsIKeyValueVoidCallback, nsIVariant, + }, + RefPtr, ThreadBoundRefPtr, +}; +use KeyValueDatabase; +use KeyValueEnumerator; +use KeyValuePairResult; + +type Manager = rkv::Manager<SafeModeEnvironment>; +type Rkv = rkv::Rkv<SafeModeEnvironment>; +type SingleStore = rkv::SingleStore<SafeModeDatabase>; + +/// A macro to generate a done() implementation for a Task. +/// Takes one argument that specifies the type of the Task's callback function: +/// value: a callback function that takes a value +/// void: the callback function doesn't take a value +/// +/// The "value" variant calls self.convert() to convert a successful result +/// into the value to pass to the callback function. So if you generate done() +/// for a callback that takes a value, ensure you also implement convert()! +macro_rules! task_done { + (value) => { + fn done(&self) -> Result<(), nsresult> { + // If TaskRunnable calls Task.done() to return a result on the + // main thread before TaskRunnable returns on the database thread, + // then the Task will get dropped on the database thread. + // + // But the callback is an nsXPCWrappedJS that isn't safe to release + // on the database thread. So we move it out of the Task here to ensure + // it gets released on the main thread. + let threadbound = self.callback.swap(None).ok_or(NS_ERROR_FAILURE)?; + let callback = threadbound.get_ref().ok_or(NS_ERROR_FAILURE)?; + + match self.result.swap(None) { + Some(Ok(value)) => unsafe { callback.Resolve(self.convert(value)?.coerce()) }, + Some(Err(err)) => unsafe { callback.Reject(&*nsCString::from(err.to_string())) }, + None => unsafe { callback.Reject(&*nsCString::from("unexpected")) }, + } + .to_result() + } + }; + + (void) => { + fn done(&self) -> Result<(), nsresult> { + // If TaskRunnable calls Task.done() to return a result on the + // main thread before TaskRunnable returns on the database thread, + // then the Task will get dropped on the database thread. + // + // But the callback is an nsXPCWrappedJS that isn't safe to release + // on the database thread. So we move it out of the Task here to ensure + // it gets released on the main thread. + let threadbound = self.callback.swap(None).ok_or(NS_ERROR_FAILURE)?; + let callback = threadbound.get_ref().ok_or(NS_ERROR_FAILURE)?; + + match self.result.swap(None) { + Some(Ok(())) => unsafe { callback.Resolve() }, + Some(Err(err)) => unsafe { callback.Reject(&*nsCString::from(err.to_string())) }, + None => unsafe { callback.Reject(&*nsCString::from("unexpected")) }, + } + .to_result() + } + }; +} + +/// A tuple comprising an Arc<RwLock<Rkv>> and a SingleStore, which is +/// the result of GetOrCreateTask. We declare this type because otherwise +/// Clippy complains "error: very complex type used. Consider factoring +/// parts into `type` definitions" (i.e. clippy::type-complexity) when we +/// declare the type of `GetOrCreateTask::result`. +type RkvStoreTuple = (Arc<RwLock<Rkv>>, SingleStore); + +// The threshold for active resizing. +const RESIZE_RATIO: f32 = 0.85; + +/// The threshold (50 MB) to switch the resizing policy from the double size to +/// the constant increment for active resizing. +const INCREMENTAL_RESIZE_THRESHOLD: usize = 52_428_800; + +/// The incremental resize step (5 MB) +const INCREMENTAL_RESIZE_STEP: usize = 5_242_880; + +/// The RKV disk page size and mask. +const PAGE_SIZE: usize = 4096; +const PAGE_SIZE_MASK: usize = 0b_1111_1111_1111; + +/// Round the non-zero size to the multiple of page size greater or equal. +/// +/// It does not handle the special cases such as size zero and overflow, +/// because even if that happens (extremely unlikely though), RKV will +/// ignore the new size if it's smaller than the current size. +/// +/// E.g: +/// [ 1 - 4096] -> 4096, +/// [4097 - 8192] -> 8192, +/// [8193 - 12286] -> 12286, +fn round_to_pagesize(size: usize) -> usize { + if size & PAGE_SIZE_MASK == 0 { + size + } else { + (size & !PAGE_SIZE_MASK) + PAGE_SIZE + } +} + +/// Kvstore employes two auto resizing strategies: active and passive resizing. +/// They work together to liberate consumers from having to guess the "proper" +/// size of the store upfront. See more detail about this in Bug 1543861. +/// +/// Active resizing that is performed at the store startup. +/// +/// It either increases the size in double, or by a constant size if its size +/// reaches INCREMENTAL_RESIZE_THRESHOLD. +/// +/// Note that on Linux / MAC OSX, the increased size would only take effect if +/// there is a write transaction committed afterwards. +fn active_resize(env: &Rkv) -> Result<(), StoreError> { + let info = env.info()?; + let current_size = info.map_size(); + + let size = if current_size < INCREMENTAL_RESIZE_THRESHOLD { + current_size << 1 + } else { + current_size + INCREMENTAL_RESIZE_STEP + }; + + env.set_map_size(size)?; + Ok(()) +} + +/// Passive resizing that is performed when the MAP_FULL error occurs. It +/// increases the store with a `wanted` size. +/// +/// Note that the `wanted` size must be rounded to a multiple of page size +/// by using `round_to_pagesize`. +fn passive_resize(env: &Rkv, wanted: usize) -> Result<(), StoreError> { + let info = env.info()?; + let current_size = info.map_size(); + env.set_map_size(current_size + wanted)?; + Ok(()) +} + +pub struct GetOrCreateTask { + callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueDatabaseCallback>>>, + path: nsCString, + name: nsCString, + result: AtomicCell<Option<Result<RkvStoreTuple, KeyValueError>>>, +} + +impl GetOrCreateTask { + pub fn new( + callback: RefPtr<nsIKeyValueDatabaseCallback>, + path: nsCString, + name: nsCString, + ) -> GetOrCreateTask { + GetOrCreateTask { + callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))), + path, + name, + result: AtomicCell::default(), + } + } + + fn convert(&self, result: RkvStoreTuple) -> Result<RefPtr<KeyValueDatabase>, KeyValueError> { + Ok(KeyValueDatabase::new(result.0, result.1)?) + } +} + +impl Task for GetOrCreateTask { + fn run(&self) { + // We do the work within a closure that returns a Result so we can + // use the ? operator to simplify the implementation. + self.result + .store(Some(|| -> Result<RkvStoreTuple, KeyValueError> { + let store; + let mut manager = Manager::singleton().write()?; + // Note that path canonicalization is diabled to work around crashes on Fennec: + // https://bugzilla.mozilla.org/show_bug.cgi?id=1531887 + let path = Path::new(str::from_utf8(&self.path)?); + let rkv = manager.get_or_create(path, Rkv::new::<SafeMode>)?; + { + let env = rkv.read()?; + let load_ratio = env.load_ratio()?.unwrap_or(0.0); + if load_ratio > RESIZE_RATIO { + active_resize(&env)?; + } + store = env.open_single(str::from_utf8(&self.name)?, StoreOptions::create())?; + } + Ok((rkv, store)) + }())); + } + + task_done!(value); +} + +pub struct PutTask { + callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVoidCallback>>>, + rkv: Arc<RwLock<Rkv>>, + store: SingleStore, + key: nsCString, + value: OwnedValue, + result: AtomicCell<Option<Result<(), KeyValueError>>>, +} + +impl PutTask { + pub fn new( + callback: RefPtr<nsIKeyValueVoidCallback>, + rkv: Arc<RwLock<Rkv>>, + store: SingleStore, + key: nsCString, + value: OwnedValue, + ) -> PutTask { + PutTask { + callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))), + rkv, + store, + key, + value, + result: AtomicCell::default(), + } + } +} + +impl Task for PutTask { + fn run(&self) { + // We do the work within a closure that returns a Result so we can + // use the ? operator to simplify the implementation. + self.result.store(Some(|| -> Result<(), KeyValueError> { + let env = self.rkv.read()?; + let key = str::from_utf8(&self.key)?; + let v = Value::from(&self.value); + let mut resized = false; + + // Use a loop here in case we want to retry from a recoverable + // error such as `StoreError::MapFull`. + loop { + let mut writer = env.write()?; + + match self.store.put(&mut writer, key, &v) { + Ok(_) => (), + + // Only handle the first MapFull error via passive resizing. + // Propogate the subsequent MapFull error. + Err(StoreError::MapFull) if !resized => { + // abort the failed transaction for resizing. + writer.abort(); + + // calculate the size of pairs and resize the store accordingly. + let pair_size = + key.len() + v.serialized_size().map_err(StoreError::from)? as usize; + let wanted = round_to_pagesize(pair_size); + passive_resize(&env, wanted)?; + resized = true; + continue; + } + + Err(err) => return Err(KeyValueError::StoreError(err)), + } + + // Ignore errors caused by simultaneous access. + // We intend to investigate/revert this in bug 1810212. + match writer.commit() { + Err(StoreError::IoError(e)) if e.kind() == std::io::ErrorKind::NotFound => { + // Explicitly ignore errors from simultaneous access. + } + Err(e) => return Err(From::from(e)), + _ => (), + }; + break; + } + + Ok(()) + }())); + } + + task_done!(void); +} + +pub struct WriteManyTask { + callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVoidCallback>>>, + rkv: Arc<RwLock<Rkv>>, + store: SingleStore, + pairs: Vec<(nsCString, Option<OwnedValue>)>, + result: AtomicCell<Option<Result<(), KeyValueError>>>, +} + +impl WriteManyTask { + pub fn new( + callback: RefPtr<nsIKeyValueVoidCallback>, + rkv: Arc<RwLock<Rkv>>, + store: SingleStore, + pairs: Vec<(nsCString, Option<OwnedValue>)>, + ) -> WriteManyTask { + WriteManyTask { + callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))), + rkv, + store, + pairs, + result: AtomicCell::default(), + } + } + + fn calc_pair_size(&self) -> Result<usize, StoreError> { + let mut total = 0; + + for (key, value) in self.pairs.iter() { + if let Some(val) = value { + total += key.len(); + total += Value::from(val) + .serialized_size() + .map_err(StoreError::from)? as usize; + } + } + + Ok(total) + } +} + +impl Task for WriteManyTask { + fn run(&self) { + // We do the work within a closure that returns a Result so we can + // use the ? operator to simplify the implementation. + self.result.store(Some(|| -> Result<(), KeyValueError> { + let env = self.rkv.read()?; + let mut resized = false; + + // Use a loop here in case we want to retry from a recoverable + // error such as `StoreError::MapFull`. + 'outer: loop { + let mut writer = env.write()?; + + for (key, value) in self.pairs.iter() { + let key = str::from_utf8(key)?; + match value { + // To put. + Some(val) => { + match self.store.put(&mut writer, key, &Value::from(val)) { + Ok(_) => (), + + // Only handle the first MapFull error via passive resizing. + // Propogate the subsequent MapFull error. + Err(StoreError::MapFull) if !resized => { + // Abort the failed transaction for resizing. + writer.abort(); + + // Calculate the size of pairs and resize accordingly. + let pair_size = self.calc_pair_size()?; + let wanted = round_to_pagesize(pair_size); + passive_resize(&env, wanted)?; + resized = true; + continue 'outer; + } + + Err(err) => return Err(KeyValueError::StoreError(err)), + } + } + // To delete. + None => { + match self.store.delete(&mut writer, key) { + Ok(_) => (), + + // RKV fails with an error if the key to delete wasn't found, + // and Rkv returns that error, but we ignore it, as we expect most + // of our consumers to want this behavior. + Err(StoreError::KeyValuePairNotFound) => (), + + Err(err) => return Err(KeyValueError::StoreError(err)), + }; + } + } + } + + // Ignore errors caused by simultaneous access. + // We intend to investigate/revert this in bug 1810212. + match writer.commit() { + Err(StoreError::IoError(e)) if e.kind() == std::io::ErrorKind::NotFound => { + // Explicitly ignore errors from simultaneous access. + } + Err(e) => return Err(From::from(e)), + _ => (), + }; + break; // 'outer: loop + } + + Ok(()) + }())); + } + + task_done!(void); +} + +pub struct GetTask { + callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVariantCallback>>>, + rkv: Arc<RwLock<Rkv>>, + store: SingleStore, + key: nsCString, + default_value: Option<OwnedValue>, + result: AtomicCell<Option<Result<Option<OwnedValue>, KeyValueError>>>, +} + +impl GetTask { + pub fn new( + callback: RefPtr<nsIKeyValueVariantCallback>, + rkv: Arc<RwLock<Rkv>>, + store: SingleStore, + key: nsCString, + default_value: Option<OwnedValue>, + ) -> GetTask { + GetTask { + callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))), + rkv, + store, + key, + default_value, + result: AtomicCell::default(), + } + } + + fn convert(&self, result: Option<OwnedValue>) -> Result<RefPtr<nsIVariant>, KeyValueError> { + Ok(match result { + Some(val) => owned_to_variant(val)?, + None => ().into_variant(), + }) + } +} + +impl Task for GetTask { + fn run(&self) { + // We do the work within a closure that returns a Result so we can + // use the ? operator to simplify the implementation. + self.result + .store(Some(|| -> Result<Option<OwnedValue>, KeyValueError> { + let key = str::from_utf8(&self.key)?; + let env = self.rkv.read()?; + let reader = env.read()?; + let value = self.store.get(&reader, key)?; + + Ok(match value { + Some(value) => Some(OwnedValue::from(&value)), + None => match self.default_value { + Some(ref val) => Some(val.clone()), + None => None, + }, + }) + }())); + } + + task_done!(value); +} + +pub struct HasTask { + callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVariantCallback>>>, + rkv: Arc<RwLock<Rkv>>, + store: SingleStore, + key: nsCString, + result: AtomicCell<Option<Result<bool, KeyValueError>>>, +} + +impl HasTask { + pub fn new( + callback: RefPtr<nsIKeyValueVariantCallback>, + rkv: Arc<RwLock<Rkv>>, + store: SingleStore, + key: nsCString, + ) -> HasTask { + HasTask { + callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))), + rkv, + store, + key, + result: AtomicCell::default(), + } + } + + fn convert(&self, result: bool) -> Result<RefPtr<nsIVariant>, KeyValueError> { + Ok(result.into_variant()) + } +} + +impl Task for HasTask { + fn run(&self) { + // We do the work within a closure that returns a Result so we can + // use the ? operator to simplify the implementation. + self.result.store(Some(|| -> Result<bool, KeyValueError> { + let key = str::from_utf8(&self.key)?; + let env = self.rkv.read()?; + let reader = env.read()?; + let value = self.store.get(&reader, key)?; + Ok(value.is_some()) + }())); + } + + task_done!(value); +} + +pub struct DeleteTask { + callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVoidCallback>>>, + rkv: Arc<RwLock<Rkv>>, + store: SingleStore, + key: nsCString, + result: AtomicCell<Option<Result<(), KeyValueError>>>, +} + +impl DeleteTask { + pub fn new( + callback: RefPtr<nsIKeyValueVoidCallback>, + rkv: Arc<RwLock<Rkv>>, + store: SingleStore, + key: nsCString, + ) -> DeleteTask { + DeleteTask { + callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))), + rkv, + store, + key, + result: AtomicCell::default(), + } + } +} + +impl Task for DeleteTask { + fn run(&self) { + // We do the work within a closure that returns a Result so we can + // use the ? operator to simplify the implementation. + self.result.store(Some(|| -> Result<(), KeyValueError> { + let key = str::from_utf8(&self.key)?; + let env = self.rkv.read()?; + let mut writer = env.write()?; + + match self.store.delete(&mut writer, key) { + Ok(_) => (), + + // RKV fails with an error if the key to delete wasn't found, + // and Rkv returns that error, but we ignore it, as we expect most + // of our consumers to want this behavior. + Err(StoreError::KeyValuePairNotFound) => (), + + Err(err) => return Err(KeyValueError::StoreError(err)), + }; + + // Ignore errors caused by simultaneous access. + // We intend to investigate/revert this in bug 1810212. + match writer.commit() { + Err(StoreError::IoError(e)) if e.kind() == std::io::ErrorKind::NotFound => { + // Explicitly ignore errors from simultaneous access. + } + Err(e) => return Err(From::from(e)), + _ => (), + }; + + Ok(()) + }())); + } + + task_done!(void); +} + +pub struct ClearTask { + callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVoidCallback>>>, + rkv: Arc<RwLock<Rkv>>, + store: SingleStore, + result: AtomicCell<Option<Result<(), KeyValueError>>>, +} + +impl ClearTask { + pub fn new( + callback: RefPtr<nsIKeyValueVoidCallback>, + rkv: Arc<RwLock<Rkv>>, + store: SingleStore, + ) -> ClearTask { + ClearTask { + callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))), + rkv, + store, + result: AtomicCell::default(), + } + } +} + +impl Task for ClearTask { + fn run(&self) { + // We do the work within a closure that returns a Result so we can + // use the ? operator to simplify the implementation. + self.result.store(Some(|| -> Result<(), KeyValueError> { + let env = self.rkv.read()?; + let mut writer = env.write()?; + self.store.clear(&mut writer)?; + // Ignore errors caused by simultaneous access. + // We intend to investigate/revert this in bug 1810212. + match writer.commit() { + Err(StoreError::IoError(e)) if e.kind() == std::io::ErrorKind::NotFound => { + // Explicitly ignore errors from simultaneous access. + } + Err(e) => return Err(From::from(e)), + _ => (), + }; + + Ok(()) + }())); + } + + task_done!(void); +} + +pub struct EnumerateTask { + callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueEnumeratorCallback>>>, + rkv: Arc<RwLock<Rkv>>, + store: SingleStore, + from_key: nsCString, + to_key: nsCString, + result: AtomicCell<Option<Result<Vec<KeyValuePairResult>, KeyValueError>>>, +} + +impl EnumerateTask { + pub fn new( + callback: RefPtr<nsIKeyValueEnumeratorCallback>, + rkv: Arc<RwLock<Rkv>>, + store: SingleStore, + from_key: nsCString, + to_key: nsCString, + ) -> EnumerateTask { + EnumerateTask { + callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))), + rkv, + store, + from_key, + to_key, + result: AtomicCell::default(), + } + } + + fn convert( + &self, + result: Vec<KeyValuePairResult>, + ) -> Result<RefPtr<KeyValueEnumerator>, KeyValueError> { + Ok(KeyValueEnumerator::new(result)) + } +} + +impl Task for EnumerateTask { + fn run(&self) { + // We do the work within a closure that returns a Result so we can + // use the ? operator to simplify the implementation. + self.result.store(Some( + || -> Result<Vec<KeyValuePairResult>, KeyValueError> { + let env = self.rkv.read()?; + let reader = env.read()?; + let from_key = str::from_utf8(&self.from_key)?; + let to_key = str::from_utf8(&self.to_key)?; + + let iterator = if from_key.is_empty() { + self.store.iter_start(&reader)? + } else { + self.store.iter_from(&reader, &from_key)? + }; + + // Ideally, we'd enumerate pairs lazily, as the consumer calls + // nsIKeyValueEnumerator.getNext(), which calls our + // KeyValueEnumerator.get_next() implementation. But KeyValueEnumerator + // can't reference the Iter because Rust "cannot #[derive(xpcom)] + // on a generic type," and the Iter requires a lifetime parameter, + // which would make KeyValueEnumerator generic. + // + // Our fallback approach is to eagerly collect the iterator + // into a collection that KeyValueEnumerator owns. Fixing this so we + // enumerate pairs lazily is bug 1499252. + let pairs: Vec<KeyValuePairResult> = iterator + // Convert the key to a string so we can compare it to the "to" key. + // For forward compatibility, we don't fail here if we can't convert + // a key to UTF-8. Instead, we store the Err in the collection + // and fail lazily in KeyValueEnumerator.get_next(). + .map(|result| match result { + Ok((key, val)) => Ok((str::from_utf8(&key), val)), + Err(err) => Err(err), + }) + // Stop iterating once we reach the to_key, if any. + .take_while(|result| match result { + Ok((key, _val)) => { + if to_key.is_empty() { + true + } else { + match *key { + Ok(key) => key < to_key, + Err(_err) => true, + } + } + } + Err(_) => true, + }) + // Convert the key/value pair to owned. + .map(|result| match result { + Ok((key, val)) => match (key, val) { + (Ok(key), val) => Ok((key.to_owned(), OwnedValue::from(&val))), + (Err(err), _) => Err(err.into()), + }, + Err(err) => Err(KeyValueError::StoreError(err)), + }) + .collect(); + + Ok(pairs) + }(), + )); + } + + task_done!(value); +} |