summaryrefslogtreecommitdiffstats
path: root/intl/l10n/rust/l10nregistry-rs/src/registry/asynchronous.rs
diff options
context:
space:
mode:
Diffstat (limited to 'intl/l10n/rust/l10nregistry-rs/src/registry/asynchronous.rs')
-rw-r--r--intl/l10n/rust/l10nregistry-rs/src/registry/asynchronous.rs294
1 files changed, 294 insertions, 0 deletions
diff --git a/intl/l10n/rust/l10nregistry-rs/src/registry/asynchronous.rs b/intl/l10n/rust/l10nregistry-rs/src/registry/asynchronous.rs
new file mode 100644
index 0000000000..bfcff941b5
--- /dev/null
+++ b/intl/l10n/rust/l10nregistry-rs/src/registry/asynchronous.rs
@@ -0,0 +1,294 @@
+use std::{
+ pin::Pin,
+ task::{Context, Poll},
+};
+
+use crate::{
+ env::ErrorReporter,
+ errors::{L10nRegistryError, L10nRegistrySetupError},
+ fluent::{FluentBundle, FluentError},
+ registry::{BundleAdapter, L10nRegistry, MetaSources},
+ solver::{AsyncTester, ParallelProblemSolver},
+ source::{ResourceOption, ResourceStatus},
+};
+
+use fluent_fallback::{generator::BundleStream, types::ResourceId};
+use futures::{
+ stream::{Collect, FuturesOrdered},
+ Stream, StreamExt,
+};
+use std::future::Future;
+use unic_langid::LanguageIdentifier;
+
+impl<P, B> L10nRegistry<P, B>
+where
+ P: Clone,
+ B: Clone,
+{
+ /// This method is useful for testing various configurations.
+ #[cfg(feature = "test-fluent")]
+ pub fn generate_bundles_for_lang(
+ &self,
+ langid: LanguageIdentifier,
+ resource_ids: Vec<ResourceId>,
+ ) -> Result<GenerateBundles<P, B>, L10nRegistrySetupError> {
+ let lang_ids = vec![langid];
+
+ Ok(GenerateBundles::new(
+ self.clone(),
+ lang_ids.into_iter(),
+ resource_ids,
+ // Cheaply create an immutable shallow copy of the [MetaSources].
+ self.try_borrow_metasources()?.clone(),
+ ))
+ }
+
+ // Asynchronously generate the bundles.
+ pub fn generate_bundles(
+ &self,
+ locales: std::vec::IntoIter<LanguageIdentifier>,
+ resource_ids: Vec<ResourceId>,
+ ) -> Result<GenerateBundles<P, B>, L10nRegistrySetupError> {
+ Ok(GenerateBundles::new(
+ self.clone(),
+ locales,
+ resource_ids,
+ // Cheaply create an immutable shallow copy of the [MetaSources].
+ self.try_borrow_metasources()?.clone(),
+ ))
+ }
+}
+
+/// This enum contains the various states the [GenerateBundles] can be in during the
+/// asynchronous generation step.
+enum State<P, B> {
+ Empty,
+ Locale(LanguageIdentifier),
+ Solver {
+ locale: LanguageIdentifier,
+ solver: ParallelProblemSolver<GenerateBundles<P, B>>,
+ },
+}
+
+impl<P, B> Default for State<P, B> {
+ fn default() -> Self {
+ Self::Empty
+ }
+}
+
+impl<P, B> State<P, B> {
+ fn get_locale(&self) -> &LanguageIdentifier {
+ match self {
+ Self::Locale(locale) => locale,
+ Self::Solver { locale, .. } => locale,
+ Self::Empty => unreachable!("Attempting to get a locale for an empty state."),
+ }
+ }
+
+ fn take_solver(&mut self) -> ParallelProblemSolver<GenerateBundles<P, B>> {
+ replace_with::replace_with_or_default_and_return(self, |self_| match self_ {
+ Self::Solver { locale, solver } => (solver, Self::Locale(locale)),
+ _ => unreachable!("Attempting to take a solver in an invalid state."),
+ })
+ }
+
+ fn put_back_solver(&mut self, solver: ParallelProblemSolver<GenerateBundles<P, B>>) {
+ replace_with::replace_with_or_default(self, |self_| match self_ {
+ Self::Locale(locale) => Self::Solver { locale, solver },
+ _ => unreachable!("Attempting to put back a solver in an invalid state."),
+ })
+ }
+}
+
+pub struct GenerateBundles<P, B> {
+ /// Do not access the metasources in the registry, as they may be mutated between
+ /// async iterations.
+ reg: L10nRegistry<P, B>,
+ /// This is an immutable shallow copy of the MetaSources that should not be mutated
+ /// during the iteration process. This ensures that the iterator will still be
+ /// valid if the L10nRegistry is mutated while iterating through the sources.
+ metasources: MetaSources,
+ locales: std::vec::IntoIter<LanguageIdentifier>,
+ current_metasource: usize,
+ resource_ids: Vec<ResourceId>,
+ state: State<P, B>,
+}
+
+impl<P, B> GenerateBundles<P, B> {
+ fn new(
+ reg: L10nRegistry<P, B>,
+ locales: std::vec::IntoIter<LanguageIdentifier>,
+ resource_ids: Vec<ResourceId>,
+ metasources: MetaSources,
+ ) -> Self {
+ Self {
+ reg,
+ metasources,
+ locales,
+ current_metasource: 0,
+ resource_ids,
+ state: State::Empty,
+ }
+ }
+}
+
+pub type ResourceSetStream = Collect<FuturesOrdered<ResourceStatus>, Vec<ResourceOption>>;
+pub struct TestResult(ResourceSetStream);
+impl std::marker::Unpin for TestResult {}
+
+impl Future for TestResult {
+ type Output = Vec<bool>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let pinned = Pin::new(&mut self.0);
+ pinned
+ .poll(cx)
+ .map(|set| set.iter().map(|c| !c.is_required_and_missing()).collect())
+ }
+}
+
+impl<'l, P, B> AsyncTester for GenerateBundles<P, B> {
+ type Result = TestResult;
+
+ fn test_async(&self, query: Vec<(usize, usize)>) -> Self::Result {
+ let locale = self.state.get_locale();
+
+ let stream = query
+ .iter()
+ .map(|(res_idx, source_idx)| {
+ let resource_id = &self.resource_ids[*res_idx];
+ self.metasources
+ .filesource(self.current_metasource, *source_idx)
+ .fetch_file(locale, resource_id)
+ })
+ .collect::<FuturesOrdered<_>>();
+ TestResult(stream.collect::<_>())
+ }
+}
+
+#[async_trait::async_trait(?Send)]
+impl<P, B> BundleStream for GenerateBundles<P, B> {
+ async fn prefetch_async(&mut self) {
+ todo!();
+ }
+}
+
+/// Generate [FluentBundles](FluentBundle) asynchronously.
+impl<P, B> Stream for GenerateBundles<P, B>
+where
+ P: ErrorReporter,
+ B: BundleAdapter,
+{
+ type Item = Result<FluentBundle, (FluentBundle, Vec<FluentError>)>;
+
+ /// Asynchronously try and get a solver, and then with the solver generate a bundle.
+ /// If the solver is not ready yet, then this function will return as `Pending`, and
+ /// the Future runner will need to re-enter at a later point to try again.
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ if self.metasources.is_empty() {
+ // There are no metasources available, so no bundles can be generated.
+ return None.into();
+ }
+ loop {
+ if let State::Solver { .. } = self.state {
+ // A solver has already been set up, continue iterating through the
+ // resources and generating a bundle.
+
+ // Pin the solver so that the async try_poll_next can be called.
+ let mut solver = self.state.take_solver();
+ let pinned_solver = Pin::new(&mut solver);
+
+ if let std::task::Poll::Ready(solver_result) =
+ pinned_solver.try_poll_next(cx, &self, false)
+ {
+ // The solver is ready, but may not have generated an ordering.
+
+ if let Ok(Some(order)) = solver_result {
+ // The solver resolved an ordering, and a bundle may be able
+ // to be generated.
+
+ let bundle = self.metasources.bundle_from_order(
+ self.current_metasource,
+ self.state.get_locale().clone(),
+ &order,
+ &self.resource_ids,
+ &self.reg.shared.provider,
+ self.reg.shared.bundle_adapter.as_ref(),
+ );
+
+ self.state.put_back_solver(solver);
+
+ if bundle.is_some() {
+ // The bundle was successfully generated.
+ return bundle.into();
+ }
+
+ // No bundle was generated, continue on.
+ continue;
+ }
+
+ // There is no bundle ordering available.
+
+ if self.current_metasource > 0 {
+ // There are more metasources, create a new solver and try the
+ // next metasource. If there is an error in the solver_result
+ // ignore it for now, since there are more metasources.
+ self.current_metasource -= 1;
+ let solver = ParallelProblemSolver::new(
+ self.resource_ids.len(),
+ self.metasources.get(self.current_metasource).len(),
+ );
+ self.state = State::Solver {
+ locale: self.state.get_locale().clone(),
+ solver,
+ };
+ continue;
+ }
+
+ if let Err(idx) = solver_result {
+ // Since there are no more metasources, and there is an error,
+ // report it instead of ignoring it.
+ self.reg.shared.provider.report_errors(vec![
+ L10nRegistryError::MissingResource {
+ locale: self.state.get_locale().clone(),
+ resource_id: self.resource_ids[idx].clone(),
+ },
+ ]);
+ }
+
+ // There are no more metasources.
+ self.state = State::Empty;
+ continue;
+ }
+
+ // The solver is not ready yet, so exit out of this async task
+ // and mark it as pending. It can be tried again later.
+ self.state.put_back_solver(solver);
+ return std::task::Poll::Pending;
+ }
+
+ // There are no more metasources to search.
+
+ // Try the next locale.
+ if let Some(locale) = self.locales.next() {
+ // Restart at the end of the metasources for this locale, and iterate
+ // backwards.
+ let last_metasource_idx = self.metasources.len() - 1;
+ self.current_metasource = last_metasource_idx;
+
+ let solver = ParallelProblemSolver::new(
+ self.resource_ids.len(),
+ self.metasources.get(self.current_metasource).len(),
+ );
+ self.state = State::Solver { locale, solver };
+
+ // Continue iterating on the next solver.
+ continue;
+ }
+
+ // There are no more locales or metasources to search. This iterator
+ // is done.
+ return None.into();
+ }
+ }
+}