summaryrefslogtreecommitdiffstats
path: root/third_party/rust/sync15/src/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/sync15/src/client.rs')
-rw-r--r--third_party/rust/sync15/src/client.rs452
1 files changed, 452 insertions, 0 deletions
diff --git a/third_party/rust/sync15/src/client.rs b/third_party/rust/sync15/src/client.rs
new file mode 100644
index 0000000000..98ed0dbd5d
--- /dev/null
+++ b/third_party/rust/sync15/src/client.rs
@@ -0,0 +1,452 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::bso_record::{BsoRecord, EncryptedBso};
+use crate::error::{self, ErrorKind, ErrorResponse};
+use crate::record_types::MetaGlobalRecord;
+use crate::request::{
+ BatchPoster, CollectionRequest, InfoCollections, InfoConfiguration, PostQueue, PostResponse,
+ PostResponseHandler,
+};
+use crate::token;
+use crate::util::ServerTimestamp;
+use serde_json::Value;
+use std::str::FromStr;
+use std::sync::atomic::{AtomicU32, Ordering};
+use url::Url;
+use viaduct::{
+ header_names::{self, AUTHORIZATION},
+ Method, Request, Response,
+};
+
+/// A response from a GET request on a Sync15StorageClient, encapsulating all
+/// the variants users of this client needs to care about.
+#[derive(Debug, Clone)]
+pub enum Sync15ClientResponse<T> {
+ Success {
+ status: u16,
+ record: T,
+ last_modified: ServerTimestamp,
+ route: String,
+ },
+ Error(ErrorResponse),
+}
+
+fn parse_seconds(seconds_str: &str) -> Option<u32> {
+ let secs = seconds_str.parse::<f64>().ok()?.ceil();
+ // Note: u32 doesn't impl TryFrom<f64> :(
+ if !secs.is_finite() || secs < 0.0 || secs >= f64::from(u32::max_value()) {
+ Some(secs as u32)
+ } else {
+ log::warn!("invalid backoff value: {}", secs);
+ None
+ }
+}
+
+impl<T> Sync15ClientResponse<T> {
+ pub fn from_response(resp: Response, backoff_listener: &BackoffListener) -> error::Result<Self>
+ where
+ for<'a> T: serde::de::Deserialize<'a>,
+ {
+ let route: String = resp.url.path().into();
+ // Android seems to respect retry_after even on success requests, so we
+ // will too if it's present. This also lets us handle both backoff-like
+ // properties in the same place.
+ let retry_after = resp
+ .headers
+ .get(header_names::RETRY_AFTER)
+ .and_then(parse_seconds);
+
+ let backoff = resp
+ .headers
+ .get(header_names::X_WEAVE_BACKOFF)
+ .and_then(parse_seconds);
+
+ if let Some(b) = backoff {
+ backoff_listener.note_backoff(b);
+ }
+ if let Some(ra) = retry_after {
+ backoff_listener.note_retry_after(ra);
+ }
+
+ Ok(if resp.is_success() {
+ let record: T = resp.json()?;
+ let last_modified = resp
+ .headers
+ .get(header_names::X_LAST_MODIFIED)
+ .and_then(|s| ServerTimestamp::from_str(s).ok())
+ .ok_or_else(|| ErrorKind::MissingServerTimestamp)?;
+ log::info!(
+ "Successful request to \"{}\", incoming x-last-modified={:?}",
+ route,
+ last_modified
+ );
+
+ Sync15ClientResponse::Success {
+ status: resp.status,
+ record,
+ last_modified,
+ route,
+ }
+ } else {
+ let status = resp.status;
+ log::info!("Request \"{}\" was an error (status={})", route, status);
+ match status {
+ 404 => Sync15ClientResponse::Error(ErrorResponse::NotFound { route }),
+ 401 => Sync15ClientResponse::Error(ErrorResponse::Unauthorized { route }),
+ 412 => Sync15ClientResponse::Error(ErrorResponse::PreconditionFailed { route }),
+ 500..=600 => {
+ Sync15ClientResponse::Error(ErrorResponse::ServerError { route, status })
+ }
+ _ => Sync15ClientResponse::Error(ErrorResponse::RequestFailed { route, status }),
+ }
+ })
+ }
+
+ pub fn create_storage_error(self) -> ErrorKind {
+ let inner = match self {
+ Sync15ClientResponse::Success { status, route, .. } => {
+ // This should never happen as callers are expected to have
+ // already special-cased this response, so warn if it does.
+ // (or maybe we could panic?)
+ log::warn!("Converting success response into an error");
+ ErrorResponse::RequestFailed { status, route }
+ }
+ Sync15ClientResponse::Error(e) => e,
+ };
+ ErrorKind::StorageHttpError(inner)
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct Sync15StorageClientInit {
+ pub key_id: String,
+ pub access_token: String,
+ pub tokenserver_url: Url,
+}
+
+/// A trait containing the methods required to run through the setup state
+/// machine. This is factored out into a separate trait to make mocking
+/// easier.
+pub trait SetupStorageClient {
+ fn fetch_info_configuration(&self) -> error::Result<Sync15ClientResponse<InfoConfiguration>>;
+ fn fetch_info_collections(&self) -> error::Result<Sync15ClientResponse<InfoCollections>>;
+ fn fetch_meta_global(&self) -> error::Result<Sync15ClientResponse<MetaGlobalRecord>>;
+ fn fetch_crypto_keys(&self) -> error::Result<Sync15ClientResponse<EncryptedBso>>;
+
+ fn put_meta_global(
+ &self,
+ xius: ServerTimestamp,
+ global: &MetaGlobalRecord,
+ ) -> error::Result<ServerTimestamp>;
+ fn put_crypto_keys(&self, xius: ServerTimestamp, keys: &EncryptedBso) -> error::Result<()>;
+ fn wipe_all_remote(&self) -> error::Result<()>;
+}
+
+#[derive(Debug, Default)]
+pub struct BackoffState {
+ pub backoff_secs: AtomicU32,
+ pub retry_after_secs: AtomicU32,
+}
+
+pub(crate) type BackoffListener = std::sync::Arc<BackoffState>;
+
+pub(crate) fn new_backoff_listener() -> BackoffListener {
+ std::sync::Arc::new(BackoffState::default())
+}
+
+impl BackoffState {
+ pub fn note_backoff(&self, noted: u32) {
+ crate::util::atomic_update_max(&self.backoff_secs, noted)
+ }
+
+ pub fn note_retry_after(&self, noted: u32) {
+ crate::util::atomic_update_max(&self.retry_after_secs, noted)
+ }
+
+ pub fn get_backoff_secs(&self) -> u32 {
+ self.backoff_secs.load(Ordering::SeqCst)
+ }
+
+ pub fn get_retry_after_secs(&self) -> u32 {
+ self.retry_after_secs.load(Ordering::SeqCst)
+ }
+
+ pub fn get_required_wait(&self, ignore_soft_backoff: bool) -> Option<std::time::Duration> {
+ let bo = self.get_backoff_secs();
+ let ra = self.get_retry_after_secs();
+ let secs = u64::from(if ignore_soft_backoff { ra } else { bo.max(ra) });
+ if secs > 0 {
+ Some(std::time::Duration::from_secs(secs))
+ } else {
+ None
+ }
+ }
+
+ pub fn reset(&self) {
+ self.backoff_secs.store(0, Ordering::SeqCst);
+ self.retry_after_secs.store(0, Ordering::SeqCst);
+ }
+}
+
+#[derive(Debug)]
+pub struct Sync15StorageClient {
+ tsc: token::TokenProvider,
+ pub(crate) backoff: BackoffListener,
+}
+
+impl SetupStorageClient for Sync15StorageClient {
+ fn fetch_info_configuration(&self) -> error::Result<Sync15ClientResponse<InfoConfiguration>> {
+ self.relative_storage_request(Method::Get, "info/configuration")
+ }
+
+ fn fetch_info_collections(&self) -> error::Result<Sync15ClientResponse<InfoCollections>> {
+ self.relative_storage_request(Method::Get, "info/collections")
+ }
+
+ fn fetch_meta_global(&self) -> error::Result<Sync15ClientResponse<MetaGlobalRecord>> {
+ // meta/global is a Bso, so there's an extra dance to do.
+ let got: Sync15ClientResponse<BsoRecord<MetaGlobalRecord>> =
+ self.relative_storage_request(Method::Get, "storage/meta/global")?;
+ Ok(match got {
+ Sync15ClientResponse::Success {
+ record,
+ last_modified,
+ route,
+ status,
+ } => {
+ log::debug!(
+ "Got meta global with modified = {}; last-modified = {}",
+ record.modified,
+ last_modified
+ );
+ Sync15ClientResponse::Success {
+ record: record.payload,
+ last_modified,
+ route,
+ status,
+ }
+ }
+ Sync15ClientResponse::Error(e) => Sync15ClientResponse::Error(e),
+ })
+ }
+
+ fn fetch_crypto_keys(&self) -> error::Result<Sync15ClientResponse<EncryptedBso>> {
+ self.relative_storage_request(Method::Get, "storage/crypto/keys")
+ }
+
+ fn put_meta_global(
+ &self,
+ xius: ServerTimestamp,
+ global: &MetaGlobalRecord,
+ ) -> error::Result<ServerTimestamp> {
+ let bso = BsoRecord::new_record("global".into(), "meta".into(), global);
+ self.put("storage/meta/global", xius, &bso)
+ }
+
+ fn put_crypto_keys(&self, xius: ServerTimestamp, keys: &EncryptedBso) -> error::Result<()> {
+ self.put("storage/crypto/keys", xius, keys)?;
+ Ok(())
+ }
+
+ fn wipe_all_remote(&self) -> error::Result<()> {
+ let s = self.tsc.api_endpoint()?;
+ let url = Url::parse(&s)?;
+
+ let req = self.build_request(Method::Delete, url)?;
+ match self.exec_request::<Value>(req, false) {
+ Ok(Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }))
+ | Ok(Sync15ClientResponse::Success { .. }) => Ok(()),
+ Ok(resp) => Err(resp.create_storage_error().into()),
+ Err(e) => Err(e),
+ }
+ }
+}
+
+impl Sync15StorageClient {
+ pub fn new(init_params: Sync15StorageClientInit) -> error::Result<Sync15StorageClient> {
+ rc_crypto::ensure_initialized();
+ let tsc = token::TokenProvider::new(
+ init_params.tokenserver_url,
+ init_params.access_token,
+ init_params.key_id,
+ )?;
+ Ok(Sync15StorageClient {
+ tsc,
+ backoff: new_backoff_listener(),
+ })
+ }
+
+ pub fn get_encrypted_records(
+ &self,
+ collection_request: &CollectionRequest,
+ ) -> error::Result<Sync15ClientResponse<Vec<EncryptedBso>>> {
+ self.collection_request(Method::Get, collection_request)
+ }
+
+ #[inline]
+ fn authorized(&self, req: Request) -> error::Result<Request> {
+ let hawk_header_value = self.tsc.authorization(&req)?;
+ Ok(req.header(AUTHORIZATION, hawk_header_value)?)
+ }
+
+ // TODO: probably want a builder-like API to do collection requests (e.g. something
+ // that occupies roughly the same conceptual role as the Collection class in desktop)
+ fn build_request(&self, method: Method, url: Url) -> error::Result<Request> {
+ self.authorized(Request::new(method, url).header(header_names::ACCEPT, "application/json")?)
+ }
+
+ fn relative_storage_request<P, T>(
+ &self,
+ method: Method,
+ relative_path: P,
+ ) -> error::Result<Sync15ClientResponse<T>>
+ where
+ P: AsRef<str>,
+ for<'a> T: serde::de::Deserialize<'a>,
+ {
+ let s = self.tsc.api_endpoint()? + "/";
+ let url = Url::parse(&s)?.join(relative_path.as_ref())?;
+ self.exec_request(self.build_request(method, url)?, false)
+ }
+
+ fn exec_request<T>(
+ &self,
+ req: Request,
+ require_success: bool,
+ ) -> error::Result<Sync15ClientResponse<T>>
+ where
+ for<'a> T: serde::de::Deserialize<'a>,
+ {
+ log::trace!(
+ "request: {} {} ({:?})",
+ req.method,
+ req.url.path(),
+ req.url.query()
+ );
+ let resp = req.send()?;
+
+ let result = Sync15ClientResponse::from_response(resp, &self.backoff)?;
+ match result {
+ Sync15ClientResponse::Success { .. } => Ok(result),
+ _ => {
+ if require_success {
+ Err(result.create_storage_error().into())
+ } else {
+ Ok(result)
+ }
+ }
+ }
+ }
+
+ fn collection_request<T>(
+ &self,
+ method: Method,
+ r: &CollectionRequest,
+ ) -> error::Result<Sync15ClientResponse<T>>
+ where
+ for<'a> T: serde::de::Deserialize<'a>,
+ {
+ let url = r.build_url(Url::parse(&self.tsc.api_endpoint()?)?)?;
+ self.exec_request(self.build_request(method, url)?, false)
+ }
+
+ pub fn new_post_queue<'a, F: PostResponseHandler>(
+ &'a self,
+ coll: &str,
+ config: &InfoConfiguration,
+ ts: ServerTimestamp,
+ on_response: F,
+ ) -> error::Result<PostQueue<PostWrapper<'a>, F>> {
+ let pw = PostWrapper {
+ client: self,
+ coll: coll.into(),
+ };
+ Ok(PostQueue::new(config, ts, pw, on_response))
+ }
+
+ fn put<P, B>(
+ &self,
+ relative_path: P,
+ xius: ServerTimestamp,
+ body: &B,
+ ) -> error::Result<ServerTimestamp>
+ where
+ P: AsRef<str>,
+ B: serde::ser::Serialize,
+ {
+ let s = self.tsc.api_endpoint()? + "/";
+ let url = Url::parse(&s)?.join(relative_path.as_ref())?;
+
+ let req = self
+ .build_request(Method::Put, url)?
+ .json(body)
+ .header(header_names::X_IF_UNMODIFIED_SINCE, format!("{}", xius))?;
+
+ let resp = self.exec_request::<Value>(req, true)?;
+ // Note: we pass `true` for require_success, so this panic never happens.
+ if let Sync15ClientResponse::Success { last_modified, .. } = resp {
+ Ok(last_modified)
+ } else {
+ unreachable!("Error returned exec_request when `require_success` was true");
+ }
+ }
+
+ pub fn hashed_uid(&self) -> error::Result<String> {
+ self.tsc.hashed_uid()
+ }
+
+ pub(crate) fn wipe_remote_engine(&self, engine: &str) -> error::Result<()> {
+ let s = self.tsc.api_endpoint()? + "/";
+ let url = Url::parse(&s)?.join(&format!("storage/{}", engine))?;
+ log::debug!("Wiping: {:?}", url);
+ let req = self.build_request(Method::Delete, url)?;
+ match self.exec_request::<Value>(req, false) {
+ Ok(Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }))
+ | Ok(Sync15ClientResponse::Success { .. }) => Ok(()),
+ Ok(resp) => Err(resp.create_storage_error().into()),
+ Err(e) => Err(e),
+ }
+ }
+}
+
+pub struct PostWrapper<'a> {
+ client: &'a Sync15StorageClient,
+ coll: String,
+}
+
+impl<'a> BatchPoster for PostWrapper<'a> {
+ fn post<T, O>(
+ &self,
+ bytes: Vec<u8>,
+ xius: ServerTimestamp,
+ batch: Option<String>,
+ commit: bool,
+ _: &PostQueue<T, O>,
+ ) -> error::Result<PostResponse> {
+ let url = CollectionRequest::new(self.coll.clone())
+ .batch(batch)
+ .commit(commit)
+ .build_url(Url::parse(&self.client.tsc.api_endpoint()?)?)?;
+
+ let req = self
+ .client
+ .build_request(Method::Post, url)?
+ .header(header_names::CONTENT_TYPE, "application/json")?
+ .header(header_names::X_IF_UNMODIFIED_SINCE, format!("{}", xius))?
+ .body(bytes);
+ self.client.exec_request(req, false)
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ #[test]
+ fn test_send() {
+ fn ensure_send<T: Send>() {}
+ // Compile will fail if not send.
+ ensure_send::<Sync15StorageClient>();
+ }
+}