// path: root/intl/l10n/rust/l10nregistry-rs/src/registry/asynchronous.rs
// blob: bfcff941b59c2eba45e4d0cf8f96a2c000e99ff9 (plain)
use std::{
    pin::Pin,
    task::{Context, Poll},
};

use crate::{
    env::ErrorReporter,
    errors::{L10nRegistryError, L10nRegistrySetupError},
    fluent::{FluentBundle, FluentError},
    registry::{BundleAdapter, L10nRegistry, MetaSources},
    solver::{AsyncTester, ParallelProblemSolver},
    source::{ResourceOption, ResourceStatus},
};

use fluent_fallback::{generator::BundleStream, types::ResourceId};
use futures::{
    stream::{Collect, FuturesOrdered},
    Stream, StreamExt,
};
use std::future::Future;
use unic_langid::LanguageIdentifier;

impl<P, B> L10nRegistry<P, B>
where
    P: Clone,
    B: Clone,
{
    /// Builds a [`GenerateBundles`] stream that only considers a single locale.
    ///
    /// This is a thin convenience wrapper around [`Self::generate_bundles`] and is
    /// useful for testing various configurations.
    ///
    /// # Errors
    ///
    /// Returns the [`L10nRegistrySetupError`] produced when the metasources cannot be
    /// borrowed.
    #[cfg(feature = "test-fluent")]
    pub fn generate_bundles_for_lang(
        &self,
        langid: LanguageIdentifier,
        resource_ids: Vec<ResourceId>,
    ) -> Result<GenerateBundles<P, B>, L10nRegistrySetupError> {
        self.generate_bundles(vec![langid].into_iter(), resource_ids)
    }

    /// Asynchronously generate the bundles.
    ///
    /// The returned [`GenerateBundles`] walks the given `locales` in order and tries
    /// to produce bundles for the requested `resource_ids`.
    ///
    /// # Errors
    ///
    /// Returns the [`L10nRegistrySetupError`] produced when the metasources cannot be
    /// borrowed.
    pub fn generate_bundles(
        &self,
        locales: std::vec::IntoIter<LanguageIdentifier>,
        resource_ids: Vec<ResourceId>,
    ) -> Result<GenerateBundles<P, B>, L10nRegistrySetupError> {
        let registry = self.clone();
        // Cheaply create an immutable shallow copy of the [MetaSources], so that the
        // stream does not observe later mutations of the registry.
        let metasources = self.try_borrow_metasources()?.clone();

        Ok(GenerateBundles::new(
            registry,
            locales,
            resource_ids,
            metasources,
        ))
    }
}

/// This enum contains the various states the [GenerateBundles] can be in during the
/// asynchronous generation step.
enum State<P, B> {
    /// No locale is currently being processed. This is the state a freshly created
    /// [GenerateBundles] starts in, and the one it returns to once the metasources
    /// for a locale have been exhausted.
    Empty,
    /// A locale is selected, but its solver has been temporarily moved out of the
    /// state (see `take_solver` / `put_back_solver`).
    Locale(LanguageIdentifier),
    /// A locale is selected and a solver for the current metasource is available.
    Solver {
        /// The locale bundles are currently being generated for.
        locale: LanguageIdentifier,
        /// The solver that searches for resource/source orderings.
        solver: ParallelProblemSolver<GenerateBundles<P, B>>,
    },
}

impl<P, B> Default for State<P, B> {
    fn default() -> Self {
        Self::Empty
    }
}

impl<P, B> State<P, B> {
    fn get_locale(&self) -> &LanguageIdentifier {
        match self {
            Self::Locale(locale) => locale,
            Self::Solver { locale, .. } => locale,
            Self::Empty => unreachable!("Attempting to get a locale for an empty state."),
        }
    }

    fn take_solver(&mut self) -> ParallelProblemSolver<GenerateBundles<P, B>> {
        replace_with::replace_with_or_default_and_return(self, |self_| match self_ {
            Self::Solver { locale, solver } => (solver, Self::Locale(locale)),
            _ => unreachable!("Attempting to take a solver in an invalid state."),
        })
    }

    fn put_back_solver(&mut self, solver: ParallelProblemSolver<GenerateBundles<P, B>>) {
        replace_with::replace_with_or_default(self, |self_| match self_ {
            Self::Locale(locale) => Self::Solver { locale, solver },
            _ => unreachable!("Attempting to put back a solver in an invalid state."),
        })
    }
}

/// Asynchronous bundle generator returned by `L10nRegistry::generate_bundles`.
///
/// It walks the requested locales in order and, for each locale, walks the
/// metasources from the last one to the first, using a solver to find resource
/// orderings from which bundles can be built.
pub struct GenerateBundles<P, B> {
    /// Do not access the metasources in the registry, as they may be mutated between
    /// async iterations.
    reg: L10nRegistry<P, B>,
    /// This is an immutable shallow copy of the MetaSources that should not be mutated
    /// during the iteration process. This ensures that the iterator will still be
    /// valid if the L10nRegistry is mutated while iterating through the sources.
    metasources: MetaSources,
    /// The locales that have not been processed yet; consumed front to back.
    locales: std::vec::IntoIter<LanguageIdentifier>,
    /// Index into `metasources` of the metasource currently being searched.
    current_metasource: usize,
    /// The resources every generated bundle is built from. Solver queries refer to
    /// these by index.
    resource_ids: Vec<ResourceId>,
    /// Where the generator currently is in its locale/solver state machine.
    state: State<P, B>,
}

impl<P, B> GenerateBundles<P, B> {
    fn new(
        reg: L10nRegistry<P, B>,
        locales: std::vec::IntoIter<LanguageIdentifier>,
        resource_ids: Vec<ResourceId>,
        metasources: MetaSources,
    ) -> Self {
        Self {
            reg,
            metasources,
            locales,
            current_metasource: 0,
            resource_ids,
            state: State::Empty,
        }
    }
}

/// Future that drives a set of [`ResourceStatus`] fetches and collects their results,
/// in the order the fetches were queued, into a `Vec<ResourceOption>`.
pub type ResourceSetStream = Collect<FuturesOrdered<ResourceStatus>, Vec<ResourceOption>>;
/// Wrapper around a [`ResourceSetStream`] whose `Future` implementation resolves to one
/// `bool` per fetched resource.
pub struct TestResult(ResourceSetStream);
// Explicitly mark `TestResult` as `Unpin`: its `Future::poll` implementation mutably
// borrows the inner stream through `Pin<&mut Self>`, which is only allowed for
// `Unpin` types.
impl std::marker::Unpin for TestResult {}

impl Future for TestResult {
    /// One entry per queried resource, in query order: `true` unless the resource is
    /// required and missing.
    type Output = Vec<bool>;

    /// Polls the wrapped resource set and, once every fetch has completed, converts
    /// each result into a pass/fail flag.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match Pin::new(&mut self.0).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(resources) => {
                let passed = resources
                    .iter()
                    .map(|resource| !resource.is_required_and_missing())
                    .collect();
                Poll::Ready(passed)
            }
        }
    }
}

/// Lets the solver test candidate (resource, source) pairs against the current
/// metasource and locale.
impl<P, B> AsyncTester for GenerateBundles<P, B> {
    type Result = TestResult;

    /// Starts fetching every `(resource index, source index)` pair in `query` for the
    /// current locale and returns a future resolving to one `bool` per pair, in the
    /// same order as `query`.
    ///
    /// # Panics
    ///
    /// Panics if no locale is currently selected (the state is empty), or if a
    /// resource index is out of bounds for `resource_ids`.
    fn test_async(&self, query: Vec<(usize, usize)>) -> Self::Result {
        let locale = self.state.get_locale();

        let fetches: FuturesOrdered<_> = query
            .iter()
            .map(|&(res_idx, source_idx)| {
                let resource_id = &self.resource_ids[res_idx];
                self.metasources
                    .filesource(self.current_metasource, source_idx)
                    .fetch_file(locale, resource_id)
            })
            .collect();
        TestResult(fetches.collect())
    }
}

#[async_trait::async_trait(?Send)]
impl<P, B> BundleStream for GenerateBundles<P, B> {
    /// Prefetching is not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()` when the returned future is run.
    async fn prefetch_async(&mut self) {
        todo!();
    }
}

/// Generate [FluentBundles](FluentBundle) asynchronously.
///
/// Locales are visited in the order they were supplied. For each locale the
/// metasources are visited from the last index down to index 0, and a fresh solver is
/// created for every (locale, metasource) pair.
impl<P, B> Stream for GenerateBundles<P, B>
where
    P: ErrorReporter,
    B: BundleAdapter,
{
    /// The value produced by `MetaSources::bundle_from_order`: either a bundle, or a
    /// bundle paired with a list of [FluentError]s.
    type Item = Result<FluentBundle, (FluentBundle, Vec<FluentError>)>;

    /// Asynchronously try and get a solver, and then with the solver generate a bundle.
    /// If the solver is not ready yet, then this function will return as `Pending`, and
    /// the Future runner will need to re-enter at a later point to try again.
    ///
    /// Returns `Ready(None)` when there are no metasources at all, or once every
    /// locale has been exhausted.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.metasources.is_empty() {
            // There are no metasources available, so no bundles can be generated.
            return None.into();
        }
        loop {
            if let State::Solver { .. } = self.state {
                // A solver has already been set up, continue iterating through the
                // resources and generating a bundle.

                // Move the solver out of the state (leaving `State::Locale` behind) and
                // pin it so that the async try_poll_next can be called. Every path
                // below must either put the solver back or replace the state.
                let mut solver = self.state.take_solver();
                let pinned_solver = Pin::new(&mut solver);

                // NOTE(review): the meaning of the trailing `false` flag is defined by
                // `ParallelProblemSolver::try_poll_next`, which is not visible here.
                if let std::task::Poll::Ready(solver_result) =
                    pinned_solver.try_poll_next(cx, &self, false)
                {
                    // The solver is ready, but may not have generated an ordering.

                    if let Ok(Some(order)) = solver_result {
                        // The solver resolved an ordering, and a bundle may be able
                        // to be generated.

                        let bundle = self.metasources.bundle_from_order(
                            self.current_metasource,
                            self.state.get_locale().clone(),
                            &order,
                            &self.resource_ids,
                            &self.reg.shared.provider,
                            self.reg.shared.bundle_adapter.as_ref(),
                        );

                        // Keep the same solver so that the next poll resumes the
                        // search for further orderings.
                        self.state.put_back_solver(solver);

                        if bundle.is_some() {
                            // The bundle was successfully generated.
                            return bundle.into();
                        }

                        // No bundle was generated, continue on.
                        continue;
                    }

                    // There is no bundle ordering available.

                    if self.current_metasource > 0 {
                        // There are more metasources, create a new solver and try the
                        // next metasource. If there is an error in the solver_result
                        // ignore it for now, since there are more metasources.
                        self.current_metasource -= 1;
                        let solver = ParallelProblemSolver::new(
                            self.resource_ids.len(),
                            self.metasources.get(self.current_metasource).len(),
                        );
                        // The state is `State::Locale` at this point (the old solver
                        // was taken out above), so the locale is still available.
                        self.state = State::Solver {
                            locale: self.state.get_locale().clone(),
                            solver,
                        };
                        continue;
                    }

                    if let Err(idx) = solver_result {
                        // Since there are no more metasources, and there is an error,
                        // report it instead of ignoring it. `idx` is used as an index
                        // into `resource_ids`.
                        self.reg.shared.provider.report_errors(vec![
                            L10nRegistryError::MissingResource {
                                locale: self.state.get_locale().clone(),
                                resource_id: self.resource_ids[idx].clone(),
                            },
                        ]);
                    }

                    // There are no more metasources. Resetting the state makes the
                    // next loop iteration fall through to the locale selection below.
                    self.state = State::Empty;
                    continue;
                }

                // The solver is not ready yet, so exit out of this async task
                // and mark it as pending. It can be tried again later.
                self.state.put_back_solver(solver);
                return std::task::Poll::Pending;
            }

            // No solver is active: either nothing has been started yet, or the
            // metasources for the previous locale have all been searched.

            // Try the next locale.
            if let Some(locale) = self.locales.next() {
                // Restart at the end of the metasources for this locale, and iterate
                // backwards. The subtraction cannot underflow because the metasources
                // were checked to be non-empty at the top of this function.
                let last_metasource_idx = self.metasources.len() - 1;
                self.current_metasource = last_metasource_idx;

                let solver = ParallelProblemSolver::new(
                    self.resource_ids.len(),
                    self.metasources.get(self.current_metasource).len(),
                );
                self.state = State::Solver { locale, solver };

                // Continue iterating on the next solver.
                continue;
            }

            // There are no more locales or metasources to search. This iterator
            // is done.
            return None.into();
        }
    }
}