/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ extern crate xpcom; use crossbeam_utils::atomic::AtomicCell; use error::KeyValueError; use moz_task::Task; use nserror::{nsresult, NS_ERROR_FAILURE}; use nsstring::nsCString; use owned_value::owned_to_variant; use rkv::backend::{BackendInfo, SafeMode, SafeModeDatabase, SafeModeEnvironment}; use rkv::{OwnedValue, StoreError, StoreOptions, Value}; use std::{ path::Path, str, sync::{Arc, RwLock}, }; use storage_variant::VariantType; use xpcom::{ interfaces::{ nsIKeyValueDatabaseCallback, nsIKeyValueEnumeratorCallback, nsIKeyValueVariantCallback, nsIKeyValueVoidCallback, nsIVariant, }, RefPtr, ThreadBoundRefPtr, }; use KeyValueDatabase; use KeyValueEnumerator; use KeyValuePairResult; type Manager = rkv::Manager; type Rkv = rkv::Rkv; type SingleStore = rkv::SingleStore; /// A macro to generate a done() implementation for a Task. /// Takes one argument that specifies the type of the Task's callback function: /// value: a callback function that takes a value /// void: the callback function doesn't take a value /// /// The "value" variant calls self.convert() to convert a successful result /// into the value to pass to the callback function. So if you generate done() /// for a callback that takes a value, ensure you also implement convert()! macro_rules! task_done { (value) => { fn done(&self) -> Result<(), nsresult> { // If TaskRunnable calls Task.done() to return a result on the // main thread before TaskRunnable returns on the database thread, // then the Task will get dropped on the database thread. // // But the callback is an nsXPCWrappedJS that isn't safe to release // on the database thread. So we move it out of the Task here to ensure // it gets released on the main thread. let threadbound = self.callback.swap(None).ok_or(NS_ERROR_FAILURE)?; let callback = threadbound.get_ref().ok_or(NS_ERROR_FAILURE)?; match self.result.swap(None) { Some(Ok(value)) => unsafe { callback.Resolve(self.convert(value)?.coerce()) }, Some(Err(err)) => unsafe { callback.Reject(&*nsCString::from(err.to_string())) }, None => unsafe { callback.Reject(&*nsCString::from("unexpected")) }, } .to_result() } }; (void) => { fn done(&self) -> Result<(), nsresult> { // If TaskRunnable calls Task.done() to return a result on the // main thread before TaskRunnable returns on the database thread, // then the Task will get dropped on the database thread. // // But the callback is an nsXPCWrappedJS that isn't safe to release // on the database thread. So we move it out of the Task here to ensure // it gets released on the main thread. let threadbound = self.callback.swap(None).ok_or(NS_ERROR_FAILURE)?; let callback = threadbound.get_ref().ok_or(NS_ERROR_FAILURE)?; match self.result.swap(None) { Some(Ok(())) => unsafe { callback.Resolve() }, Some(Err(err)) => unsafe { callback.Reject(&*nsCString::from(err.to_string())) }, None => unsafe { callback.Reject(&*nsCString::from("unexpected")) }, } .to_result() } }; } /// A tuple comprising an Arc> and a SingleStore, which is /// the result of GetOrCreateTask. We declare this type because otherwise /// Clippy complains "error: very complex type used. Consider factoring /// parts into `type` definitions" (i.e. clippy::type-complexity) when we /// declare the type of `GetOrCreateTask::result`. type RkvStoreTuple = (Arc>, SingleStore); // The threshold for active resizing. const RESIZE_RATIO: f32 = 0.85; /// The threshold (50 MB) to switch the resizing policy from the double size to /// the constant increment for active resizing. const INCREMENTAL_RESIZE_THRESHOLD: usize = 52_428_800; /// The incremental resize step (5 MB) const INCREMENTAL_RESIZE_STEP: usize = 5_242_880; /// The RKV disk page size and mask. const PAGE_SIZE: usize = 4096; const PAGE_SIZE_MASK: usize = 0b_1111_1111_1111; /// Round the non-zero size to the multiple of page size greater or equal. /// /// It does not handle the special cases such as size zero and overflow, /// because even if that happens (extremely unlikely though), RKV will /// ignore the new size if it's smaller than the current size. /// /// E.g: /// [ 1 - 4096] -> 4096, /// [4097 - 8192] -> 8192, /// [8193 - 12286] -> 12286, fn round_to_pagesize(size: usize) -> usize { if size & PAGE_SIZE_MASK == 0 { size } else { (size & !PAGE_SIZE_MASK) + PAGE_SIZE } } /// Kvstore employes two auto resizing strategies: active and passive resizing. /// They work together to liberate consumers from having to guess the "proper" /// size of the store upfront. See more detail about this in Bug 1543861. /// /// Active resizing that is performed at the store startup. /// /// It either increases the size in double, or by a constant size if its size /// reaches INCREMENTAL_RESIZE_THRESHOLD. /// /// Note that on Linux / MAC OSX, the increased size would only take effect if /// there is a write transaction committed afterwards. fn active_resize(env: &Rkv) -> Result<(), StoreError> { let info = env.info()?; let current_size = info.map_size(); let size = if current_size < INCREMENTAL_RESIZE_THRESHOLD { current_size << 1 } else { current_size + INCREMENTAL_RESIZE_STEP }; env.set_map_size(size)?; Ok(()) } /// Passive resizing that is performed when the MAP_FULL error occurs. It /// increases the store with a `wanted` size. /// /// Note that the `wanted` size must be rounded to a multiple of page size /// by using `round_to_pagesize`. fn passive_resize(env: &Rkv, wanted: usize) -> Result<(), StoreError> { let info = env.info()?; let current_size = info.map_size(); env.set_map_size(current_size + wanted)?; Ok(()) } pub struct GetOrCreateTask { callback: AtomicCell>>, path: nsCString, name: nsCString, result: AtomicCell>>, } impl GetOrCreateTask { pub fn new( callback: RefPtr, path: nsCString, name: nsCString, ) -> GetOrCreateTask { GetOrCreateTask { callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))), path, name, result: AtomicCell::default(), } } fn convert(&self, result: RkvStoreTuple) -> Result, KeyValueError> { Ok(KeyValueDatabase::new(result.0, result.1)?) } } impl Task for GetOrCreateTask { fn run(&self) { // We do the work within a closure that returns a Result so we can // use the ? operator to simplify the implementation. self.result .store(Some(|| -> Result { let store; let mut manager = Manager::singleton().write()?; // Note that path canonicalization is diabled to work around crashes on Fennec: // https://bugzilla.mozilla.org/show_bug.cgi?id=1531887 let path = Path::new(str::from_utf8(&self.path)?); let rkv = manager.get_or_create(path, Rkv::new::)?; { let env = rkv.read()?; let load_ratio = env.load_ratio()?.unwrap_or(0.0); if load_ratio > RESIZE_RATIO { active_resize(&env)?; } store = env.open_single(str::from_utf8(&self.name)?, StoreOptions::create())?; } Ok((rkv, store)) }())); } task_done!(value); } pub struct PutTask { callback: AtomicCell>>, rkv: Arc>, store: SingleStore, key: nsCString, value: OwnedValue, result: AtomicCell>>, } impl PutTask { pub fn new( callback: RefPtr, rkv: Arc>, store: SingleStore, key: nsCString, value: OwnedValue, ) -> PutTask { PutTask { callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))), rkv, store, key, value, result: AtomicCell::default(), } } } impl Task for PutTask { fn run(&self) { // We do the work within a closure that returns a Result so we can // use the ? operator to simplify the implementation. self.result.store(Some(|| -> Result<(), KeyValueError> { let env = self.rkv.read()?; let key = str::from_utf8(&self.key)?; let v = Value::from(&self.value); let mut resized = false; // Use a loop here in case we want to retry from a recoverable // error such as `StoreError::MapFull`. loop { let mut writer = env.write()?; match self.store.put(&mut writer, key, &v) { Ok(_) => (), // Only handle the first MapFull error via passive resizing. // Propogate the subsequent MapFull error. Err(StoreError::MapFull) if !resized => { // abort the failed transaction for resizing. writer.abort(); // calculate the size of pairs and resize the store accordingly. let pair_size = key.len() + v.serialized_size().map_err(StoreError::from)? as usize; let wanted = round_to_pagesize(pair_size); passive_resize(&env, wanted)?; resized = true; continue; } Err(err) => return Err(KeyValueError::StoreError(err)), } // Ignore errors caused by simultaneous access. // We intend to investigate/revert this in bug 1810212. match writer.commit() { Err(StoreError::IoError(e)) if e.kind() == std::io::ErrorKind::NotFound => { // Explicitly ignore errors from simultaneous access. } Err(e) => return Err(From::from(e)), _ => (), }; break; } Ok(()) }())); } task_done!(void); } pub struct WriteManyTask { callback: AtomicCell>>, rkv: Arc>, store: SingleStore, pairs: Vec<(nsCString, Option)>, result: AtomicCell>>, } impl WriteManyTask { pub fn new( callback: RefPtr, rkv: Arc>, store: SingleStore, pairs: Vec<(nsCString, Option)>, ) -> WriteManyTask { WriteManyTask { callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))), rkv, store, pairs, result: AtomicCell::default(), } } fn calc_pair_size(&self) -> Result { let mut total = 0; for (key, value) in self.pairs.iter() { if let Some(val) = value { total += key.len(); total += Value::from(val) .serialized_size() .map_err(StoreError::from)? as usize; } } Ok(total) } } impl Task for WriteManyTask { fn run(&self) { // We do the work within a closure that returns a Result so we can // use the ? operator to simplify the implementation. self.result.store(Some(|| -> Result<(), KeyValueError> { let env = self.rkv.read()?; let mut resized = false; // Use a loop here in case we want to retry from a recoverable // error such as `StoreError::MapFull`. 'outer: loop { let mut writer = env.write()?; for (key, value) in self.pairs.iter() { let key = str::from_utf8(key)?; match value { // To put. Some(val) => { match self.store.put(&mut writer, key, &Value::from(val)) { Ok(_) => (), // Only handle the first MapFull error via passive resizing. // Propogate the subsequent MapFull error. Err(StoreError::MapFull) if !resized => { // Abort the failed transaction for resizing. writer.abort(); // Calculate the size of pairs and resize accordingly. let pair_size = self.calc_pair_size()?; let wanted = round_to_pagesize(pair_size); passive_resize(&env, wanted)?; resized = true; continue 'outer; } Err(err) => return Err(KeyValueError::StoreError(err)), } } // To delete. None => { match self.store.delete(&mut writer, key) { Ok(_) => (), // RKV fails with an error if the key to delete wasn't found, // and Rkv returns that error, but we ignore it, as we expect most // of our consumers to want this behavior. Err(StoreError::KeyValuePairNotFound) => (), Err(err) => return Err(KeyValueError::StoreError(err)), }; } } } // Ignore errors caused by simultaneous access. // We intend to investigate/revert this in bug 1810212. match writer.commit() { Err(StoreError::IoError(e)) if e.kind() == std::io::ErrorKind::NotFound => { // Explicitly ignore errors from simultaneous access. } Err(e) => return Err(From::from(e)), _ => (), }; break; // 'outer: loop } Ok(()) }())); } task_done!(void); } pub struct GetTask { callback: AtomicCell>>, rkv: Arc>, store: SingleStore, key: nsCString, default_value: Option, result: AtomicCell, KeyValueError>>>, } impl GetTask { pub fn new( callback: RefPtr, rkv: Arc>, store: SingleStore, key: nsCString, default_value: Option, ) -> GetTask { GetTask { callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))), rkv, store, key, default_value, result: AtomicCell::default(), } } fn convert(&self, result: Option) -> Result, KeyValueError> { Ok(match result { Some(val) => owned_to_variant(val)?, None => ().into_variant(), }) } } impl Task for GetTask { fn run(&self) { // We do the work within a closure that returns a Result so we can // use the ? operator to simplify the implementation. self.result .store(Some(|| -> Result, KeyValueError> { let key = str::from_utf8(&self.key)?; let env = self.rkv.read()?; let reader = env.read()?; let value = self.store.get(&reader, key)?; Ok(match value { Some(value) => Some(OwnedValue::from(&value)), None => match self.default_value { Some(ref val) => Some(val.clone()), None => None, }, }) }())); } task_done!(value); } pub struct HasTask { callback: AtomicCell>>, rkv: Arc>, store: SingleStore, key: nsCString, result: AtomicCell>>, } impl HasTask { pub fn new( callback: RefPtr, rkv: Arc>, store: SingleStore, key: nsCString, ) -> HasTask { HasTask { callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))), rkv, store, key, result: AtomicCell::default(), } } fn convert(&self, result: bool) -> Result, KeyValueError> { Ok(result.into_variant()) } } impl Task for HasTask { fn run(&self) { // We do the work within a closure that returns a Result so we can // use the ? operator to simplify the implementation. self.result.store(Some(|| -> Result { let key = str::from_utf8(&self.key)?; let env = self.rkv.read()?; let reader = env.read()?; let value = self.store.get(&reader, key)?; Ok(value.is_some()) }())); } task_done!(value); } pub struct DeleteTask { callback: AtomicCell>>, rkv: Arc>, store: SingleStore, key: nsCString, result: AtomicCell>>, } impl DeleteTask { pub fn new( callback: RefPtr, rkv: Arc>, store: SingleStore, key: nsCString, ) -> DeleteTask { DeleteTask { callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))), rkv, store, key, result: AtomicCell::default(), } } } impl Task for DeleteTask { fn run(&self) { // We do the work within a closure that returns a Result so we can // use the ? operator to simplify the implementation. self.result.store(Some(|| -> Result<(), KeyValueError> { let key = str::from_utf8(&self.key)?; let env = self.rkv.read()?; let mut writer = env.write()?; match self.store.delete(&mut writer, key) { Ok(_) => (), // RKV fails with an error if the key to delete wasn't found, // and Rkv returns that error, but we ignore it, as we expect most // of our consumers to want this behavior. Err(StoreError::KeyValuePairNotFound) => (), Err(err) => return Err(KeyValueError::StoreError(err)), }; // Ignore errors caused by simultaneous access. // We intend to investigate/revert this in bug 1810212. match writer.commit() { Err(StoreError::IoError(e)) if e.kind() == std::io::ErrorKind::NotFound => { // Explicitly ignore errors from simultaneous access. } Err(e) => return Err(From::from(e)), _ => (), }; Ok(()) }())); } task_done!(void); } pub struct ClearTask { callback: AtomicCell>>, rkv: Arc>, store: SingleStore, result: AtomicCell>>, } impl ClearTask { pub fn new( callback: RefPtr, rkv: Arc>, store: SingleStore, ) -> ClearTask { ClearTask { callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))), rkv, store, result: AtomicCell::default(), } } } impl Task for ClearTask { fn run(&self) { // We do the work within a closure that returns a Result so we can // use the ? operator to simplify the implementation. self.result.store(Some(|| -> Result<(), KeyValueError> { let env = self.rkv.read()?; let mut writer = env.write()?; self.store.clear(&mut writer)?; // Ignore errors caused by simultaneous access. // We intend to investigate/revert this in bug 1810212. match writer.commit() { Err(StoreError::IoError(e)) if e.kind() == std::io::ErrorKind::NotFound => { // Explicitly ignore errors from simultaneous access. } Err(e) => return Err(From::from(e)), _ => (), }; Ok(()) }())); } task_done!(void); } pub struct EnumerateTask { callback: AtomicCell>>, rkv: Arc>, store: SingleStore, from_key: nsCString, to_key: nsCString, result: AtomicCell, KeyValueError>>>, } impl EnumerateTask { pub fn new( callback: RefPtr, rkv: Arc>, store: SingleStore, from_key: nsCString, to_key: nsCString, ) -> EnumerateTask { EnumerateTask { callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))), rkv, store, from_key, to_key, result: AtomicCell::default(), } } fn convert( &self, result: Vec, ) -> Result, KeyValueError> { Ok(KeyValueEnumerator::new(result)) } } impl Task for EnumerateTask { fn run(&self) { // We do the work within a closure that returns a Result so we can // use the ? operator to simplify the implementation. self.result.store(Some( || -> Result, KeyValueError> { let env = self.rkv.read()?; let reader = env.read()?; let from_key = str::from_utf8(&self.from_key)?; let to_key = str::from_utf8(&self.to_key)?; let iterator = if from_key.is_empty() { self.store.iter_start(&reader)? } else { self.store.iter_from(&reader, &from_key)? }; // Ideally, we'd enumerate pairs lazily, as the consumer calls // nsIKeyValueEnumerator.getNext(), which calls our // KeyValueEnumerator.get_next() implementation. But KeyValueEnumerator // can't reference the Iter because Rust "cannot #[derive(xpcom)] // on a generic type," and the Iter requires a lifetime parameter, // which would make KeyValueEnumerator generic. // // Our fallback approach is to eagerly collect the iterator // into a collection that KeyValueEnumerator owns. Fixing this so we // enumerate pairs lazily is bug 1499252. let pairs: Vec = iterator // Convert the key to a string so we can compare it to the "to" key. // For forward compatibility, we don't fail here if we can't convert // a key to UTF-8. Instead, we store the Err in the collection // and fail lazily in KeyValueEnumerator.get_next(). .map(|result| match result { Ok((key, val)) => Ok((str::from_utf8(&key), val)), Err(err) => Err(err), }) // Stop iterating once we reach the to_key, if any. .take_while(|result| match result { Ok((key, _val)) => { if to_key.is_empty() { true } else { match *key { Ok(key) => key < to_key, Err(_err) => true, } } } Err(_) => true, }) // Convert the key/value pair to owned. .map(|result| match result { Ok((key, val)) => match (key, val) { (Ok(key), val) => Ok((key.to_owned(), OwnedValue::from(&val))), (Err(err), _) => Err(err.into()), }, Err(err) => Err(KeyValueError::StoreError(err)), }) .collect(); Ok(pairs) }(), )); } task_done!(value); }