diff options
Diffstat (limited to 'intl/l10n/rust/l10nregistry-rs/src/registry/asynchronous.rs')
-rw-r--r-- | intl/l10n/rust/l10nregistry-rs/src/registry/asynchronous.rs | 294 |
1 files changed, 294 insertions, 0 deletions
diff --git a/intl/l10n/rust/l10nregistry-rs/src/registry/asynchronous.rs b/intl/l10n/rust/l10nregistry-rs/src/registry/asynchronous.rs new file mode 100644 index 0000000000..bfcff941b5 --- /dev/null +++ b/intl/l10n/rust/l10nregistry-rs/src/registry/asynchronous.rs @@ -0,0 +1,294 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use crate::{ + env::ErrorReporter, + errors::{L10nRegistryError, L10nRegistrySetupError}, + fluent::{FluentBundle, FluentError}, + registry::{BundleAdapter, L10nRegistry, MetaSources}, + solver::{AsyncTester, ParallelProblemSolver}, + source::{ResourceOption, ResourceStatus}, +}; + +use fluent_fallback::{generator::BundleStream, types::ResourceId}; +use futures::{ + stream::{Collect, FuturesOrdered}, + Stream, StreamExt, +}; +use std::future::Future; +use unic_langid::LanguageIdentifier; + +impl<P, B> L10nRegistry<P, B> +where + P: Clone, + B: Clone, +{ + /// This method is useful for testing various configurations. + #[cfg(feature = "test-fluent")] + pub fn generate_bundles_for_lang( + &self, + langid: LanguageIdentifier, + resource_ids: Vec<ResourceId>, + ) -> Result<GenerateBundles<P, B>, L10nRegistrySetupError> { + let lang_ids = vec![langid]; + + Ok(GenerateBundles::new( + self.clone(), + lang_ids.into_iter(), + resource_ids, + // Cheaply create an immutable shallow copy of the [MetaSources]. + self.try_borrow_metasources()?.clone(), + )) + } + + // Asynchronously generate the bundles. + pub fn generate_bundles( + &self, + locales: std::vec::IntoIter<LanguageIdentifier>, + resource_ids: Vec<ResourceId>, + ) -> Result<GenerateBundles<P, B>, L10nRegistrySetupError> { + Ok(GenerateBundles::new( + self.clone(), + locales, + resource_ids, + // Cheaply create an immutable shallow copy of the [MetaSources]. + self.try_borrow_metasources()?.clone(), + )) + } +} + +/// This enum contains the various states the [GenerateBundles] can be in during the +/// asynchronous generation step. +enum State<P, B> { + Empty, + Locale(LanguageIdentifier), + Solver { + locale: LanguageIdentifier, + solver: ParallelProblemSolver<GenerateBundles<P, B>>, + }, +} + +impl<P, B> Default for State<P, B> { + fn default() -> Self { + Self::Empty + } +} + +impl<P, B> State<P, B> { + fn get_locale(&self) -> &LanguageIdentifier { + match self { + Self::Locale(locale) => locale, + Self::Solver { locale, .. } => locale, + Self::Empty => unreachable!("Attempting to get a locale for an empty state."), + } + } + + fn take_solver(&mut self) -> ParallelProblemSolver<GenerateBundles<P, B>> { + replace_with::replace_with_or_default_and_return(self, |self_| match self_ { + Self::Solver { locale, solver } => (solver, Self::Locale(locale)), + _ => unreachable!("Attempting to take a solver in an invalid state."), + }) + } + + fn put_back_solver(&mut self, solver: ParallelProblemSolver<GenerateBundles<P, B>>) { + replace_with::replace_with_or_default(self, |self_| match self_ { + Self::Locale(locale) => Self::Solver { locale, solver }, + _ => unreachable!("Attempting to put back a solver in an invalid state."), + }) + } +} + +pub struct GenerateBundles<P, B> { + /// Do not access the metasources in the registry, as they may be mutated between + /// async iterations. + reg: L10nRegistry<P, B>, + /// This is an immutable shallow copy of the MetaSources that should not be mutated + /// during the iteration process. This ensures that the iterator will still be + /// valid if the L10nRegistry is mutated while iterating through the sources. + metasources: MetaSources, + locales: std::vec::IntoIter<LanguageIdentifier>, + current_metasource: usize, + resource_ids: Vec<ResourceId>, + state: State<P, B>, +} + +impl<P, B> GenerateBundles<P, B> { + fn new( + reg: L10nRegistry<P, B>, + locales: std::vec::IntoIter<LanguageIdentifier>, + resource_ids: Vec<ResourceId>, + metasources: MetaSources, + ) -> Self { + Self { + reg, + metasources, + locales, + current_metasource: 0, + resource_ids, + state: State::Empty, + } + } +} + +pub type ResourceSetStream = Collect<FuturesOrdered<ResourceStatus>, Vec<ResourceOption>>; +pub struct TestResult(ResourceSetStream); +impl std::marker::Unpin for TestResult {} + +impl Future for TestResult { + type Output = Vec<bool>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let pinned = Pin::new(&mut self.0); + pinned + .poll(cx) + .map(|set| set.iter().map(|c| !c.is_required_and_missing()).collect()) + } +} + +impl<'l, P, B> AsyncTester for GenerateBundles<P, B> { + type Result = TestResult; + + fn test_async(&self, query: Vec<(usize, usize)>) -> Self::Result { + let locale = self.state.get_locale(); + + let stream = query + .iter() + .map(|(res_idx, source_idx)| { + let resource_id = &self.resource_ids[*res_idx]; + self.metasources + .filesource(self.current_metasource, *source_idx) + .fetch_file(locale, resource_id) + }) + .collect::<FuturesOrdered<_>>(); + TestResult(stream.collect::<_>()) + } +} + +#[async_trait::async_trait(?Send)] +impl<P, B> BundleStream for GenerateBundles<P, B> { + async fn prefetch_async(&mut self) { + todo!(); + } +} + +/// Generate [FluentBundles](FluentBundle) asynchronously. +impl<P, B> Stream for GenerateBundles<P, B> +where + P: ErrorReporter, + B: BundleAdapter, +{ + type Item = Result<FluentBundle, (FluentBundle, Vec<FluentError>)>; + + /// Asynchronously try and get a solver, and then with the solver generate a bundle. + /// If the solver is not ready yet, then this function will return as `Pending`, and + /// the Future runner will need to re-enter at a later point to try again. + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + if self.metasources.is_empty() { + // There are no metasources available, so no bundles can be generated. + return None.into(); + } + loop { + if let State::Solver { .. } = self.state { + // A solver has already been set up, continue iterating through the + // resources and generating a bundle. + + // Pin the solver so that the async try_poll_next can be called. + let mut solver = self.state.take_solver(); + let pinned_solver = Pin::new(&mut solver); + + if let std::task::Poll::Ready(solver_result) = + pinned_solver.try_poll_next(cx, &self, false) + { + // The solver is ready, but may not have generated an ordering. + + if let Ok(Some(order)) = solver_result { + // The solver resolved an ordering, and a bundle may be able + // to be generated. + + let bundle = self.metasources.bundle_from_order( + self.current_metasource, + self.state.get_locale().clone(), + &order, + &self.resource_ids, + &self.reg.shared.provider, + self.reg.shared.bundle_adapter.as_ref(), + ); + + self.state.put_back_solver(solver); + + if bundle.is_some() { + // The bundle was successfully generated. + return bundle.into(); + } + + // No bundle was generated, continue on. + continue; + } + + // There is no bundle ordering available. + + if self.current_metasource > 0 { + // There are more metasources, create a new solver and try the + // next metasource. If there is an error in the solver_result + // ignore it for now, since there are more metasources. + self.current_metasource -= 1; + let solver = ParallelProblemSolver::new( + self.resource_ids.len(), + self.metasources.get(self.current_metasource).len(), + ); + self.state = State::Solver { + locale: self.state.get_locale().clone(), + solver, + }; + continue; + } + + if let Err(idx) = solver_result { + // Since there are no more metasources, and there is an error, + // report it instead of ignoring it. + self.reg.shared.provider.report_errors(vec![ + L10nRegistryError::MissingResource { + locale: self.state.get_locale().clone(), + resource_id: self.resource_ids[idx].clone(), + }, + ]); + } + + // There are no more metasources. + self.state = State::Empty; + continue; + } + + // The solver is not ready yet, so exit out of this async task + // and mark it as pending. It can be tried again later. + self.state.put_back_solver(solver); + return std::task::Poll::Pending; + } + + // There are no more metasources to search. + + // Try the next locale. + if let Some(locale) = self.locales.next() { + // Restart at the end of the metasources for this locale, and iterate + // backwards. + let last_metasource_idx = self.metasources.len() - 1; + self.current_metasource = last_metasource_idx; + + let solver = ParallelProblemSolver::new( + self.resource_ids.len(), + self.metasources.get(self.current_metasource).len(), + ); + self.state = State::Solver { locale, solver }; + + // Continue iterating on the next solver. + continue; + } + + // There are no more locales or metasources to search. This iterator + // is done. + return None.into(); + } + } +} |