summaryrefslogtreecommitdiffstats
path: root/toolkit/components/kvstore/src/task.rs
diff options
context:
space:
mode:
Diffstat (limited to 'toolkit/components/kvstore/src/task.rs')
-rw-r--r--toolkit/components/kvstore/src/task.rs727
1 files changed, 727 insertions, 0 deletions
diff --git a/toolkit/components/kvstore/src/task.rs b/toolkit/components/kvstore/src/task.rs
new file mode 100644
index 0000000000..3608dc9665
--- /dev/null
+++ b/toolkit/components/kvstore/src/task.rs
@@ -0,0 +1,727 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+extern crate xpcom;
+
+use crossbeam_utils::atomic::AtomicCell;
+use error::KeyValueError;
+use moz_task::Task;
+use nserror::{nsresult, NS_ERROR_FAILURE};
+use nsstring::nsCString;
+use owned_value::owned_to_variant;
+use rkv::backend::{BackendInfo, SafeMode, SafeModeDatabase, SafeModeEnvironment};
+use rkv::{OwnedValue, StoreError, StoreOptions, Value};
+use std::{
+ path::Path,
+ str,
+ sync::{Arc, RwLock},
+};
+use storage_variant::VariantType;
+use xpcom::{
+ interfaces::{
+ nsIKeyValueDatabaseCallback, nsIKeyValueEnumeratorCallback, nsIKeyValueVariantCallback,
+ nsIKeyValueVoidCallback, nsIVariant,
+ },
+ RefPtr, ThreadBoundRefPtr,
+};
+use KeyValueDatabase;
+use KeyValueEnumerator;
+use KeyValuePairResult;
+
+type Manager = rkv::Manager<SafeModeEnvironment>;
+type Rkv = rkv::Rkv<SafeModeEnvironment>;
+type SingleStore = rkv::SingleStore<SafeModeDatabase>;
+
+/// A macro to generate a done() implementation for a Task.
+/// Takes one argument that specifies the type of the Task's callback function:
+/// value: a callback function that takes a value
+/// void: the callback function doesn't take a value
+///
+/// The "value" variant calls self.convert() to convert a successful result
+/// into the value to pass to the callback function. So if you generate done()
+/// for a callback that takes a value, ensure you also implement convert()!
+macro_rules! task_done {
+ (value) => {
+ fn done(&self) -> Result<(), nsresult> {
+ // If TaskRunnable calls Task.done() to return a result on the
+ // main thread before TaskRunnable returns on the database thread,
+ // then the Task will get dropped on the database thread.
+ //
+ // But the callback is an nsXPCWrappedJS that isn't safe to release
+ // on the database thread. So we move it out of the Task here to ensure
+ // it gets released on the main thread.
+ let threadbound = self.callback.swap(None).ok_or(NS_ERROR_FAILURE)?;
+ let callback = threadbound.get_ref().ok_or(NS_ERROR_FAILURE)?;
+
+ match self.result.swap(None) {
+ Some(Ok(value)) => unsafe { callback.Resolve(self.convert(value)?.coerce()) },
+ Some(Err(err)) => unsafe { callback.Reject(&*nsCString::from(err.to_string())) },
+ None => unsafe { callback.Reject(&*nsCString::from("unexpected")) },
+ }
+ .to_result()
+ }
+ };
+
+ (void) => {
+ fn done(&self) -> Result<(), nsresult> {
+ // If TaskRunnable calls Task.done() to return a result on the
+ // main thread before TaskRunnable returns on the database thread,
+ // then the Task will get dropped on the database thread.
+ //
+ // But the callback is an nsXPCWrappedJS that isn't safe to release
+ // on the database thread. So we move it out of the Task here to ensure
+ // it gets released on the main thread.
+ let threadbound = self.callback.swap(None).ok_or(NS_ERROR_FAILURE)?;
+ let callback = threadbound.get_ref().ok_or(NS_ERROR_FAILURE)?;
+
+ match self.result.swap(None) {
+ Some(Ok(())) => unsafe { callback.Resolve() },
+ Some(Err(err)) => unsafe { callback.Reject(&*nsCString::from(err.to_string())) },
+ None => unsafe { callback.Reject(&*nsCString::from("unexpected")) },
+ }
+ .to_result()
+ }
+ };
+}
+
+/// A tuple comprising an Arc<RwLock<Rkv>> and a SingleStore, which is
+/// the result of GetOrCreateTask. We declare this type because otherwise
+/// Clippy complains "error: very complex type used. Consider factoring
+/// parts into `type` definitions" (i.e. clippy::type-complexity) when we
+/// declare the type of `GetOrCreateTask::result`.
+type RkvStoreTuple = (Arc<RwLock<Rkv>>, SingleStore);
+
+// The threshold for active resizing.
+const RESIZE_RATIO: f32 = 0.85;
+
+/// The threshold (50 MB) to switch the resizing policy from the double size to
+/// the constant increment for active resizing.
+const INCREMENTAL_RESIZE_THRESHOLD: usize = 52_428_800;
+
+/// The incremental resize step (5 MB)
+const INCREMENTAL_RESIZE_STEP: usize = 5_242_880;
+
+/// The RKV disk page size and mask.
+const PAGE_SIZE: usize = 4096;
+const PAGE_SIZE_MASK: usize = 0b_1111_1111_1111;
+
+/// Round the non-zero size to the multiple of page size greater or equal.
+///
+/// It does not handle the special cases such as size zero and overflow,
+/// because even if that happens (extremely unlikely though), RKV will
+/// ignore the new size if it's smaller than the current size.
+///
+/// E.g:
+/// [ 1 - 4096] -> 4096,
+/// [4097 - 8192] -> 8192,
+/// [8193 - 12286] -> 12286,
+fn round_to_pagesize(size: usize) -> usize {
+ if size & PAGE_SIZE_MASK == 0 {
+ size
+ } else {
+ (size & !PAGE_SIZE_MASK) + PAGE_SIZE
+ }
+}
+
+/// Kvstore employes two auto resizing strategies: active and passive resizing.
+/// They work together to liberate consumers from having to guess the "proper"
+/// size of the store upfront. See more detail about this in Bug 1543861.
+///
+/// Active resizing that is performed at the store startup.
+///
+/// It either increases the size in double, or by a constant size if its size
+/// reaches INCREMENTAL_RESIZE_THRESHOLD.
+///
+/// Note that on Linux / MAC OSX, the increased size would only take effect if
+/// there is a write transaction committed afterwards.
+fn active_resize(env: &Rkv) -> Result<(), StoreError> {
+ let info = env.info()?;
+ let current_size = info.map_size();
+
+ let size = if current_size < INCREMENTAL_RESIZE_THRESHOLD {
+ current_size << 1
+ } else {
+ current_size + INCREMENTAL_RESIZE_STEP
+ };
+
+ env.set_map_size(size)?;
+ Ok(())
+}
+
+/// Passive resizing that is performed when the MAP_FULL error occurs. It
+/// increases the store with a `wanted` size.
+///
+/// Note that the `wanted` size must be rounded to a multiple of page size
+/// by using `round_to_pagesize`.
+fn passive_resize(env: &Rkv, wanted: usize) -> Result<(), StoreError> {
+ let info = env.info()?;
+ let current_size = info.map_size();
+ env.set_map_size(current_size + wanted)?;
+ Ok(())
+}
+
+pub struct GetOrCreateTask {
+ callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueDatabaseCallback>>>,
+ path: nsCString,
+ name: nsCString,
+ result: AtomicCell<Option<Result<RkvStoreTuple, KeyValueError>>>,
+}
+
+impl GetOrCreateTask {
+ pub fn new(
+ callback: RefPtr<nsIKeyValueDatabaseCallback>,
+ path: nsCString,
+ name: nsCString,
+ ) -> GetOrCreateTask {
+ GetOrCreateTask {
+ callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
+ path,
+ name,
+ result: AtomicCell::default(),
+ }
+ }
+
+ fn convert(&self, result: RkvStoreTuple) -> Result<RefPtr<KeyValueDatabase>, KeyValueError> {
+ Ok(KeyValueDatabase::new(result.0, result.1)?)
+ }
+}
+
+impl Task for GetOrCreateTask {
+ fn run(&self) {
+ // We do the work within a closure that returns a Result so we can
+ // use the ? operator to simplify the implementation.
+ self.result
+ .store(Some(|| -> Result<RkvStoreTuple, KeyValueError> {
+ let store;
+ let mut manager = Manager::singleton().write()?;
+ // Note that path canonicalization is diabled to work around crashes on Fennec:
+ // https://bugzilla.mozilla.org/show_bug.cgi?id=1531887
+ let path = Path::new(str::from_utf8(&self.path)?);
+ let rkv = manager.get_or_create(path, Rkv::new::<SafeMode>)?;
+ {
+ let env = rkv.read()?;
+ let load_ratio = env.load_ratio()?.unwrap_or(0.0);
+ if load_ratio > RESIZE_RATIO {
+ active_resize(&env)?;
+ }
+ store = env.open_single(str::from_utf8(&self.name)?, StoreOptions::create())?;
+ }
+ Ok((rkv, store))
+ }()));
+ }
+
+ task_done!(value);
+}
+
+pub struct PutTask {
+ callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVoidCallback>>>,
+ rkv: Arc<RwLock<Rkv>>,
+ store: SingleStore,
+ key: nsCString,
+ value: OwnedValue,
+ result: AtomicCell<Option<Result<(), KeyValueError>>>,
+}
+
+impl PutTask {
+ pub fn new(
+ callback: RefPtr<nsIKeyValueVoidCallback>,
+ rkv: Arc<RwLock<Rkv>>,
+ store: SingleStore,
+ key: nsCString,
+ value: OwnedValue,
+ ) -> PutTask {
+ PutTask {
+ callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
+ rkv,
+ store,
+ key,
+ value,
+ result: AtomicCell::default(),
+ }
+ }
+}
+
+impl Task for PutTask {
+ fn run(&self) {
+ // We do the work within a closure that returns a Result so we can
+ // use the ? operator to simplify the implementation.
+ self.result.store(Some(|| -> Result<(), KeyValueError> {
+ let env = self.rkv.read()?;
+ let key = str::from_utf8(&self.key)?;
+ let v = Value::from(&self.value);
+ let mut resized = false;
+
+ // Use a loop here in case we want to retry from a recoverable
+ // error such as `StoreError::MapFull`.
+ loop {
+ let mut writer = env.write()?;
+
+ match self.store.put(&mut writer, key, &v) {
+ Ok(_) => (),
+
+ // Only handle the first MapFull error via passive resizing.
+ // Propogate the subsequent MapFull error.
+ Err(StoreError::MapFull) if !resized => {
+ // abort the failed transaction for resizing.
+ writer.abort();
+
+ // calculate the size of pairs and resize the store accordingly.
+ let pair_size =
+ key.len() + v.serialized_size().map_err(StoreError::from)? as usize;
+ let wanted = round_to_pagesize(pair_size);
+ passive_resize(&env, wanted)?;
+ resized = true;
+ continue;
+ }
+
+ Err(err) => return Err(KeyValueError::StoreError(err)),
+ }
+
+ // Ignore errors caused by simultaneous access.
+ // We intend to investigate/revert this in bug 1810212.
+ match writer.commit() {
+ Err(StoreError::IoError(e)) if e.kind() == std::io::ErrorKind::NotFound => {
+ // Explicitly ignore errors from simultaneous access.
+ }
+ Err(e) => return Err(From::from(e)),
+ _ => (),
+ };
+ break;
+ }
+
+ Ok(())
+ }()));
+ }
+
+ task_done!(void);
+}
+
+pub struct WriteManyTask {
+ callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVoidCallback>>>,
+ rkv: Arc<RwLock<Rkv>>,
+ store: SingleStore,
+ pairs: Vec<(nsCString, Option<OwnedValue>)>,
+ result: AtomicCell<Option<Result<(), KeyValueError>>>,
+}
+
+impl WriteManyTask {
+ pub fn new(
+ callback: RefPtr<nsIKeyValueVoidCallback>,
+ rkv: Arc<RwLock<Rkv>>,
+ store: SingleStore,
+ pairs: Vec<(nsCString, Option<OwnedValue>)>,
+ ) -> WriteManyTask {
+ WriteManyTask {
+ callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
+ rkv,
+ store,
+ pairs,
+ result: AtomicCell::default(),
+ }
+ }
+
+ fn calc_pair_size(&self) -> Result<usize, StoreError> {
+ let mut total = 0;
+
+ for (key, value) in self.pairs.iter() {
+ if let Some(val) = value {
+ total += key.len();
+ total += Value::from(val)
+ .serialized_size()
+ .map_err(StoreError::from)? as usize;
+ }
+ }
+
+ Ok(total)
+ }
+}
+
+impl Task for WriteManyTask {
+ fn run(&self) {
+ // We do the work within a closure that returns a Result so we can
+ // use the ? operator to simplify the implementation.
+ self.result.store(Some(|| -> Result<(), KeyValueError> {
+ let env = self.rkv.read()?;
+ let mut resized = false;
+
+ // Use a loop here in case we want to retry from a recoverable
+ // error such as `StoreError::MapFull`.
+ 'outer: loop {
+ let mut writer = env.write()?;
+
+ for (key, value) in self.pairs.iter() {
+ let key = str::from_utf8(key)?;
+ match value {
+ // To put.
+ Some(val) => {
+ match self.store.put(&mut writer, key, &Value::from(val)) {
+ Ok(_) => (),
+
+ // Only handle the first MapFull error via passive resizing.
+ // Propogate the subsequent MapFull error.
+ Err(StoreError::MapFull) if !resized => {
+ // Abort the failed transaction for resizing.
+ writer.abort();
+
+ // Calculate the size of pairs and resize accordingly.
+ let pair_size = self.calc_pair_size()?;
+ let wanted = round_to_pagesize(pair_size);
+ passive_resize(&env, wanted)?;
+ resized = true;
+ continue 'outer;
+ }
+
+ Err(err) => return Err(KeyValueError::StoreError(err)),
+ }
+ }
+ // To delete.
+ None => {
+ match self.store.delete(&mut writer, key) {
+ Ok(_) => (),
+
+ // RKV fails with an error if the key to delete wasn't found,
+ // and Rkv returns that error, but we ignore it, as we expect most
+ // of our consumers to want this behavior.
+ Err(StoreError::KeyValuePairNotFound) => (),
+
+ Err(err) => return Err(KeyValueError::StoreError(err)),
+ };
+ }
+ }
+ }
+
+ // Ignore errors caused by simultaneous access.
+ // We intend to investigate/revert this in bug 1810212.
+ match writer.commit() {
+ Err(StoreError::IoError(e)) if e.kind() == std::io::ErrorKind::NotFound => {
+ // Explicitly ignore errors from simultaneous access.
+ }
+ Err(e) => return Err(From::from(e)),
+ _ => (),
+ };
+ break; // 'outer: loop
+ }
+
+ Ok(())
+ }()));
+ }
+
+ task_done!(void);
+}
+
+pub struct GetTask {
+ callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVariantCallback>>>,
+ rkv: Arc<RwLock<Rkv>>,
+ store: SingleStore,
+ key: nsCString,
+ default_value: Option<OwnedValue>,
+ result: AtomicCell<Option<Result<Option<OwnedValue>, KeyValueError>>>,
+}
+
+impl GetTask {
+ pub fn new(
+ callback: RefPtr<nsIKeyValueVariantCallback>,
+ rkv: Arc<RwLock<Rkv>>,
+ store: SingleStore,
+ key: nsCString,
+ default_value: Option<OwnedValue>,
+ ) -> GetTask {
+ GetTask {
+ callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
+ rkv,
+ store,
+ key,
+ default_value,
+ result: AtomicCell::default(),
+ }
+ }
+
+ fn convert(&self, result: Option<OwnedValue>) -> Result<RefPtr<nsIVariant>, KeyValueError> {
+ Ok(match result {
+ Some(val) => owned_to_variant(val)?,
+ None => ().into_variant(),
+ })
+ }
+}
+
+impl Task for GetTask {
+ fn run(&self) {
+ // We do the work within a closure that returns a Result so we can
+ // use the ? operator to simplify the implementation.
+ self.result
+ .store(Some(|| -> Result<Option<OwnedValue>, KeyValueError> {
+ let key = str::from_utf8(&self.key)?;
+ let env = self.rkv.read()?;
+ let reader = env.read()?;
+ let value = self.store.get(&reader, key)?;
+
+ Ok(match value {
+ Some(value) => Some(OwnedValue::from(&value)),
+ None => match self.default_value {
+ Some(ref val) => Some(val.clone()),
+ None => None,
+ },
+ })
+ }()));
+ }
+
+ task_done!(value);
+}
+
+pub struct HasTask {
+ callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVariantCallback>>>,
+ rkv: Arc<RwLock<Rkv>>,
+ store: SingleStore,
+ key: nsCString,
+ result: AtomicCell<Option<Result<bool, KeyValueError>>>,
+}
+
+impl HasTask {
+ pub fn new(
+ callback: RefPtr<nsIKeyValueVariantCallback>,
+ rkv: Arc<RwLock<Rkv>>,
+ store: SingleStore,
+ key: nsCString,
+ ) -> HasTask {
+ HasTask {
+ callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
+ rkv,
+ store,
+ key,
+ result: AtomicCell::default(),
+ }
+ }
+
+ fn convert(&self, result: bool) -> Result<RefPtr<nsIVariant>, KeyValueError> {
+ Ok(result.into_variant())
+ }
+}
+
+impl Task for HasTask {
+ fn run(&self) {
+ // We do the work within a closure that returns a Result so we can
+ // use the ? operator to simplify the implementation.
+ self.result.store(Some(|| -> Result<bool, KeyValueError> {
+ let key = str::from_utf8(&self.key)?;
+ let env = self.rkv.read()?;
+ let reader = env.read()?;
+ let value = self.store.get(&reader, key)?;
+ Ok(value.is_some())
+ }()));
+ }
+
+ task_done!(value);
+}
+
+pub struct DeleteTask {
+ callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVoidCallback>>>,
+ rkv: Arc<RwLock<Rkv>>,
+ store: SingleStore,
+ key: nsCString,
+ result: AtomicCell<Option<Result<(), KeyValueError>>>,
+}
+
+impl DeleteTask {
+ pub fn new(
+ callback: RefPtr<nsIKeyValueVoidCallback>,
+ rkv: Arc<RwLock<Rkv>>,
+ store: SingleStore,
+ key: nsCString,
+ ) -> DeleteTask {
+ DeleteTask {
+ callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
+ rkv,
+ store,
+ key,
+ result: AtomicCell::default(),
+ }
+ }
+}
+
+impl Task for DeleteTask {
+ fn run(&self) {
+ // We do the work within a closure that returns a Result so we can
+ // use the ? operator to simplify the implementation.
+ self.result.store(Some(|| -> Result<(), KeyValueError> {
+ let key = str::from_utf8(&self.key)?;
+ let env = self.rkv.read()?;
+ let mut writer = env.write()?;
+
+ match self.store.delete(&mut writer, key) {
+ Ok(_) => (),
+
+ // RKV fails with an error if the key to delete wasn't found,
+ // and Rkv returns that error, but we ignore it, as we expect most
+ // of our consumers to want this behavior.
+ Err(StoreError::KeyValuePairNotFound) => (),
+
+ Err(err) => return Err(KeyValueError::StoreError(err)),
+ };
+
+ // Ignore errors caused by simultaneous access.
+ // We intend to investigate/revert this in bug 1810212.
+ match writer.commit() {
+ Err(StoreError::IoError(e)) if e.kind() == std::io::ErrorKind::NotFound => {
+ // Explicitly ignore errors from simultaneous access.
+ }
+ Err(e) => return Err(From::from(e)),
+ _ => (),
+ };
+
+ Ok(())
+ }()));
+ }
+
+ task_done!(void);
+}
+
+pub struct ClearTask {
+ callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVoidCallback>>>,
+ rkv: Arc<RwLock<Rkv>>,
+ store: SingleStore,
+ result: AtomicCell<Option<Result<(), KeyValueError>>>,
+}
+
+impl ClearTask {
+ pub fn new(
+ callback: RefPtr<nsIKeyValueVoidCallback>,
+ rkv: Arc<RwLock<Rkv>>,
+ store: SingleStore,
+ ) -> ClearTask {
+ ClearTask {
+ callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
+ rkv,
+ store,
+ result: AtomicCell::default(),
+ }
+ }
+}
+
+impl Task for ClearTask {
+ fn run(&self) {
+ // We do the work within a closure that returns a Result so we can
+ // use the ? operator to simplify the implementation.
+ self.result.store(Some(|| -> Result<(), KeyValueError> {
+ let env = self.rkv.read()?;
+ let mut writer = env.write()?;
+ self.store.clear(&mut writer)?;
+ // Ignore errors caused by simultaneous access.
+ // We intend to investigate/revert this in bug 1810212.
+ match writer.commit() {
+ Err(StoreError::IoError(e)) if e.kind() == std::io::ErrorKind::NotFound => {
+ // Explicitly ignore errors from simultaneous access.
+ }
+ Err(e) => return Err(From::from(e)),
+ _ => (),
+ };
+
+ Ok(())
+ }()));
+ }
+
+ task_done!(void);
+}
+
+pub struct EnumerateTask {
+ callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueEnumeratorCallback>>>,
+ rkv: Arc<RwLock<Rkv>>,
+ store: SingleStore,
+ from_key: nsCString,
+ to_key: nsCString,
+ result: AtomicCell<Option<Result<Vec<KeyValuePairResult>, KeyValueError>>>,
+}
+
+impl EnumerateTask {
+ pub fn new(
+ callback: RefPtr<nsIKeyValueEnumeratorCallback>,
+ rkv: Arc<RwLock<Rkv>>,
+ store: SingleStore,
+ from_key: nsCString,
+ to_key: nsCString,
+ ) -> EnumerateTask {
+ EnumerateTask {
+ callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
+ rkv,
+ store,
+ from_key,
+ to_key,
+ result: AtomicCell::default(),
+ }
+ }
+
+ fn convert(
+ &self,
+ result: Vec<KeyValuePairResult>,
+ ) -> Result<RefPtr<KeyValueEnumerator>, KeyValueError> {
+ Ok(KeyValueEnumerator::new(result))
+ }
+}
+
+impl Task for EnumerateTask {
+ fn run(&self) {
+ // We do the work within a closure that returns a Result so we can
+ // use the ? operator to simplify the implementation.
+ self.result.store(Some(
+ || -> Result<Vec<KeyValuePairResult>, KeyValueError> {
+ let env = self.rkv.read()?;
+ let reader = env.read()?;
+ let from_key = str::from_utf8(&self.from_key)?;
+ let to_key = str::from_utf8(&self.to_key)?;
+
+ let iterator = if from_key.is_empty() {
+ self.store.iter_start(&reader)?
+ } else {
+ self.store.iter_from(&reader, &from_key)?
+ };
+
+ // Ideally, we'd enumerate pairs lazily, as the consumer calls
+ // nsIKeyValueEnumerator.getNext(), which calls our
+ // KeyValueEnumerator.get_next() implementation. But KeyValueEnumerator
+ // can't reference the Iter because Rust "cannot #[derive(xpcom)]
+ // on a generic type," and the Iter requires a lifetime parameter,
+ // which would make KeyValueEnumerator generic.
+ //
+ // Our fallback approach is to eagerly collect the iterator
+ // into a collection that KeyValueEnumerator owns. Fixing this so we
+ // enumerate pairs lazily is bug 1499252.
+ let pairs: Vec<KeyValuePairResult> = iterator
+ // Convert the key to a string so we can compare it to the "to" key.
+ // For forward compatibility, we don't fail here if we can't convert
+ // a key to UTF-8. Instead, we store the Err in the collection
+ // and fail lazily in KeyValueEnumerator.get_next().
+ .map(|result| match result {
+ Ok((key, val)) => Ok((str::from_utf8(&key), val)),
+ Err(err) => Err(err),
+ })
+ // Stop iterating once we reach the to_key, if any.
+ .take_while(|result| match result {
+ Ok((key, _val)) => {
+ if to_key.is_empty() {
+ true
+ } else {
+ match *key {
+ Ok(key) => key < to_key,
+ Err(_err) => true,
+ }
+ }
+ }
+ Err(_) => true,
+ })
+ // Convert the key/value pair to owned.
+ .map(|result| match result {
+ Ok((key, val)) => match (key, val) {
+ (Ok(key), val) => Ok((key.to_owned(), OwnedValue::from(&val))),
+ (Err(err), _) => Err(err.into()),
+ },
+ Err(err) => Err(KeyValueError::StoreError(err)),
+ })
+ .collect();
+
+ Ok(pairs)
+ }(),
+ ));
+ }
+
+ task_done!(value);
+}