summaryrefslogtreecommitdiffstats
path: root/third_party/rust/rusqlite/src/session.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/rusqlite/src/session.rs')
-rw-r--r--third_party/rust/rusqlite/src/session.rs938
1 files changed, 938 insertions, 0 deletions
diff --git a/third_party/rust/rusqlite/src/session.rs b/third_party/rust/rusqlite/src/session.rs
new file mode 100644
index 0000000000..f8aa764539
--- /dev/null
+++ b/third_party/rust/rusqlite/src/session.rs
@@ -0,0 +1,938 @@
+//! [Session Extension](https://sqlite.org/sessionintro.html)
+#![allow(non_camel_case_types)]
+
+use std::ffi::CStr;
+use std::io::{Read, Write};
+use std::marker::PhantomData;
+use std::os::raw::{c_char, c_int, c_uchar, c_void};
+use std::panic::{catch_unwind, RefUnwindSafe};
+use std::ptr;
+use std::slice::{from_raw_parts, from_raw_parts_mut};
+
+use fallible_streaming_iterator::FallibleStreamingIterator;
+
+use crate::error::{check, error_from_sqlite_code};
+use crate::ffi;
+use crate::hooks::Action;
+use crate::types::ValueRef;
+use crate::{errmsg_to_string, str_to_cstring, Connection, DatabaseName, Result};
+
+// https://sqlite.org/session.html
+
+/// An instance of this object is a session that can be
+/// used to record changes to a database.
+pub struct Session<'conn> {
+ phantom: PhantomData<&'conn Connection>,
+ s: *mut ffi::sqlite3_session,
+ filter: Option<Box<dyn Fn(&str) -> bool>>,
+}
+
+impl Session<'_> {
+ /// Create a new session object
+ #[inline]
+ pub fn new(db: &Connection) -> Result<Session<'_>> {
+ Session::new_with_name(db, DatabaseName::Main)
+ }
+
+ /// Create a new session object
+ #[inline]
+ pub fn new_with_name<'conn>(
+ db: &'conn Connection,
+ name: DatabaseName<'_>,
+ ) -> Result<Session<'conn>> {
+ let name = name.as_cstring()?;
+
+ let db = db.db.borrow_mut().db;
+
+ let mut s: *mut ffi::sqlite3_session = ptr::null_mut();
+ check(unsafe { ffi::sqlite3session_create(db, name.as_ptr(), &mut s) })?;
+
+ Ok(Session {
+ phantom: PhantomData,
+ s,
+ filter: None,
+ })
+ }
+
+ /// Set a table filter
+ pub fn table_filter<F>(&mut self, filter: Option<F>)
+ where
+ F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
+ {
+ unsafe extern "C" fn call_boxed_closure<F>(
+ p_arg: *mut c_void,
+ tbl_str: *const c_char,
+ ) -> c_int
+ where
+ F: Fn(&str) -> bool + RefUnwindSafe,
+ {
+ use std::str;
+
+ let boxed_filter: *mut F = p_arg as *mut F;
+ let tbl_name = {
+ let c_slice = CStr::from_ptr(tbl_str).to_bytes();
+ str::from_utf8(c_slice)
+ };
+ if let Ok(true) =
+ catch_unwind(|| (*boxed_filter)(tbl_name.expect("non-utf8 table name")))
+ {
+ 1
+ } else {
+ 0
+ }
+ }
+
+ match filter {
+ Some(filter) => {
+ let boxed_filter = Box::new(filter);
+ unsafe {
+ ffi::sqlite3session_table_filter(
+ self.s,
+ Some(call_boxed_closure::<F>),
+ &*boxed_filter as *const F as *mut _,
+ );
+ }
+ self.filter = Some(boxed_filter);
+ }
+ _ => {
+ unsafe { ffi::sqlite3session_table_filter(self.s, None, ptr::null_mut()) }
+ self.filter = None;
+ }
+ };
+ }
+
+ /// Attach a table. `None` means all tables.
+ pub fn attach(&mut self, table: Option<&str>) -> Result<()> {
+ let table = if let Some(table) = table {
+ Some(str_to_cstring(table)?)
+ } else {
+ None
+ };
+ let table = table.as_ref().map(|s| s.as_ptr()).unwrap_or(ptr::null());
+ check(unsafe { ffi::sqlite3session_attach(self.s, table) })
+ }
+
+ /// Generate a Changeset
+ pub fn changeset(&mut self) -> Result<Changeset> {
+ let mut n = 0;
+ let mut cs: *mut c_void = ptr::null_mut();
+ check(unsafe { ffi::sqlite3session_changeset(self.s, &mut n, &mut cs) })?;
+ Ok(Changeset { cs, n })
+ }
+
+ /// Write the set of changes represented by this session to `output`.
+ #[inline]
+ pub fn changeset_strm(&mut self, output: &mut dyn Write) -> Result<()> {
+ let output_ref = &output;
+ check(unsafe {
+ ffi::sqlite3session_changeset_strm(
+ self.s,
+ Some(x_output),
+ output_ref as *const &mut dyn Write as *mut c_void,
+ )
+ })
+ }
+
+ /// Generate a Patchset
+ #[inline]
+ pub fn patchset(&mut self) -> Result<Changeset> {
+ let mut n = 0;
+ let mut ps: *mut c_void = ptr::null_mut();
+ check(unsafe { ffi::sqlite3session_patchset(self.s, &mut n, &mut ps) })?;
+ // TODO Validate: same struct
+ Ok(Changeset { cs: ps, n })
+ }
+
+ /// Write the set of patches represented by this session to `output`.
+ #[inline]
+ pub fn patchset_strm(&mut self, output: &mut dyn Write) -> Result<()> {
+ let output_ref = &output;
+ check(unsafe {
+ ffi::sqlite3session_patchset_strm(
+ self.s,
+ Some(x_output),
+ output_ref as *const &mut dyn Write as *mut c_void,
+ )
+ })
+ }
+
+ /// Load the difference between tables.
+ pub fn diff(&mut self, from: DatabaseName<'_>, table: &str) -> Result<()> {
+ let from = from.as_cstring()?;
+ let table = str_to_cstring(table)?;
+ let table = table.as_ptr();
+ unsafe {
+ let mut errmsg = ptr::null_mut();
+ let r =
+ ffi::sqlite3session_diff(self.s, from.as_ptr(), table, &mut errmsg as *mut *mut _);
+ if r != ffi::SQLITE_OK {
+ let errmsg: *mut c_char = errmsg;
+ let message = errmsg_to_string(&*errmsg);
+ ffi::sqlite3_free(errmsg as *mut c_void);
+ return Err(error_from_sqlite_code(r, Some(message)));
+ }
+ }
+ Ok(())
+ }
+
+ /// Test if a changeset has recorded any changes
+ #[inline]
+ pub fn is_empty(&self) -> bool {
+ unsafe { ffi::sqlite3session_isempty(self.s) != 0 }
+ }
+
+ /// Query the current state of the session
+ #[inline]
+ pub fn is_enabled(&self) -> bool {
+ unsafe { ffi::sqlite3session_enable(self.s, -1) != 0 }
+ }
+
+ /// Enable or disable the recording of changes
+ #[inline]
+ pub fn set_enabled(&mut self, enabled: bool) {
+ unsafe {
+ ffi::sqlite3session_enable(self.s, if enabled { 1 } else { 0 });
+ }
+ }
+
+ /// Query the current state of the indirect flag
+ #[inline]
+ pub fn is_indirect(&self) -> bool {
+ unsafe { ffi::sqlite3session_indirect(self.s, -1) != 0 }
+ }
+
+ /// Set or clear the indirect change flag
+ #[inline]
+ pub fn set_indirect(&mut self, indirect: bool) {
+ unsafe {
+ ffi::sqlite3session_indirect(self.s, if indirect { 1 } else { 0 });
+ }
+ }
+}
+
+impl Drop for Session<'_> {
+ #[inline]
+ fn drop(&mut self) {
+ if self.filter.is_some() {
+ self.table_filter(None::<fn(&str) -> bool>);
+ }
+ unsafe { ffi::sqlite3session_delete(self.s) };
+ }
+}
+
+/// Invert a changeset
+#[inline]
+pub fn invert_strm(input: &mut dyn Read, output: &mut dyn Write) -> Result<()> {
+ let input_ref = &input;
+ let output_ref = &output;
+ check(unsafe {
+ ffi::sqlite3changeset_invert_strm(
+ Some(x_input),
+ input_ref as *const &mut dyn Read as *mut c_void,
+ Some(x_output),
+ output_ref as *const &mut dyn Write as *mut c_void,
+ )
+ })
+}
+
+/// Combine two changesets
+#[inline]
+pub fn concat_strm(
+ input_a: &mut dyn Read,
+ input_b: &mut dyn Read,
+ output: &mut dyn Write,
+) -> Result<()> {
+ let input_a_ref = &input_a;
+ let input_b_ref = &input_b;
+ let output_ref = &output;
+ check(unsafe {
+ ffi::sqlite3changeset_concat_strm(
+ Some(x_input),
+ input_a_ref as *const &mut dyn Read as *mut c_void,
+ Some(x_input),
+ input_b_ref as *const &mut dyn Read as *mut c_void,
+ Some(x_output),
+ output_ref as *const &mut dyn Write as *mut c_void,
+ )
+ })
+}
+
+/// Changeset or Patchset
+pub struct Changeset {
+ cs: *mut c_void,
+ n: c_int,
+}
+
+impl Changeset {
+ /// Invert a changeset
+ #[inline]
+ pub fn invert(&self) -> Result<Changeset> {
+ let mut n = 0;
+ let mut cs = ptr::null_mut();
+ check(unsafe {
+ ffi::sqlite3changeset_invert(self.n, self.cs, &mut n, &mut cs as *mut *mut _)
+ })?;
+ Ok(Changeset { cs, n })
+ }
+
+ /// Create an iterator to traverse a changeset
+ #[inline]
+ pub fn iter(&self) -> Result<ChangesetIter<'_>> {
+ let mut it = ptr::null_mut();
+ check(unsafe { ffi::sqlite3changeset_start(&mut it as *mut *mut _, self.n, self.cs) })?;
+ Ok(ChangesetIter {
+ phantom: PhantomData,
+ it,
+ item: None,
+ })
+ }
+
+ /// Concatenate two changeset objects
+ #[inline]
+ pub fn concat(a: &Changeset, b: &Changeset) -> Result<Changeset> {
+ let mut n = 0;
+ let mut cs = ptr::null_mut();
+ check(unsafe {
+ ffi::sqlite3changeset_concat(a.n, a.cs, b.n, b.cs, &mut n, &mut cs as *mut *mut _)
+ })?;
+ Ok(Changeset { cs, n })
+ }
+}
+
+impl Drop for Changeset {
+ #[inline]
+ fn drop(&mut self) {
+ unsafe {
+ ffi::sqlite3_free(self.cs);
+ }
+ }
+}
+
+/// Cursor for iterating over the elements of a changeset
+/// or patchset.
+pub struct ChangesetIter<'changeset> {
+ phantom: PhantomData<&'changeset Changeset>,
+ it: *mut ffi::sqlite3_changeset_iter,
+ item: Option<ChangesetItem>,
+}
+
+impl ChangesetIter<'_> {
+ /// Create an iterator on `input`
+ #[inline]
+ pub fn start_strm<'input>(input: &&'input mut dyn Read) -> Result<ChangesetIter<'input>> {
+ let mut it = ptr::null_mut();
+ check(unsafe {
+ ffi::sqlite3changeset_start_strm(
+ &mut it as *mut *mut _,
+ Some(x_input),
+ input as *const &mut dyn Read as *mut c_void,
+ )
+ })?;
+ Ok(ChangesetIter {
+ phantom: PhantomData,
+ it,
+ item: None,
+ })
+ }
+}
+
+impl FallibleStreamingIterator for ChangesetIter<'_> {
+ type Error = crate::error::Error;
+ type Item = ChangesetItem;
+
+ #[inline]
+ fn advance(&mut self) -> Result<()> {
+ let rc = unsafe { ffi::sqlite3changeset_next(self.it) };
+ match rc {
+ ffi::SQLITE_ROW => {
+ self.item = Some(ChangesetItem { it: self.it });
+ Ok(())
+ }
+ ffi::SQLITE_DONE => {
+ self.item = None;
+ Ok(())
+ }
+ code => Err(error_from_sqlite_code(code, None)),
+ }
+ }
+
+ #[inline]
+ fn get(&self) -> Option<&ChangesetItem> {
+ self.item.as_ref()
+ }
+}
+
+/// Operation
+pub struct Operation<'item> {
+ table_name: &'item str,
+ number_of_columns: i32,
+ code: Action,
+ indirect: bool,
+}
+
+impl Operation<'_> {
+ /// Returns the table name.
+ #[inline]
+ pub fn table_name(&self) -> &str {
+ self.table_name
+ }
+
+ /// Returns the number of columns in table
+ #[inline]
+ pub fn number_of_columns(&self) -> i32 {
+ self.number_of_columns
+ }
+
+ /// Returns the action code.
+ #[inline]
+ pub fn code(&self) -> Action {
+ self.code
+ }
+
+ /// Returns `true` for an 'indirect' change.
+ #[inline]
+ pub fn indirect(&self) -> bool {
+ self.indirect
+ }
+}
+
+impl Drop for ChangesetIter<'_> {
+ #[inline]
+ fn drop(&mut self) {
+ unsafe {
+ ffi::sqlite3changeset_finalize(self.it);
+ }
+ }
+}
+
+/// An item passed to a conflict-handler by
+/// [`Connection::apply`](crate::Connection::apply), or an item generated by
+/// [`ChangesetIter::next`](ChangesetIter::next).
+// TODO enum ? Delete, Insert, Update, ...
+pub struct ChangesetItem {
+ it: *mut ffi::sqlite3_changeset_iter,
+}
+
+impl ChangesetItem {
+ /// Obtain conflicting row values
+ ///
+ /// May only be called with an `SQLITE_CHANGESET_DATA` or
+ /// `SQLITE_CHANGESET_CONFLICT` conflict handler callback.
+ #[inline]
+ pub fn conflict(&self, col: usize) -> Result<ValueRef<'_>> {
+ unsafe {
+ let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
+ check(ffi::sqlite3changeset_conflict(
+ self.it,
+ col as i32,
+ &mut p_value,
+ ))?;
+ Ok(ValueRef::from_value(p_value))
+ }
+ }
+
+ /// Determine the number of foreign key constraint violations
+ ///
+ /// May only be called with an `SQLITE_CHANGESET_FOREIGN_KEY` conflict
+ /// handler callback.
+ #[inline]
+ pub fn fk_conflicts(&self) -> Result<i32> {
+ unsafe {
+ let mut p_out = 0;
+ check(ffi::sqlite3changeset_fk_conflicts(self.it, &mut p_out))?;
+ Ok(p_out)
+ }
+ }
+
+ /// Obtain new.* Values
+ ///
+ /// May only be called if the type of change is either `SQLITE_UPDATE` or
+ /// `SQLITE_INSERT`.
+ #[inline]
+ pub fn new_value(&self, col: usize) -> Result<ValueRef<'_>> {
+ unsafe {
+ let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
+ check(ffi::sqlite3changeset_new(self.it, col as i32, &mut p_value))?;
+ Ok(ValueRef::from_value(p_value))
+ }
+ }
+
+ /// Obtain old.* Values
+ ///
+ /// May only be called if the type of change is either `SQLITE_DELETE` or
+ /// `SQLITE_UPDATE`.
+ #[inline]
+ pub fn old_value(&self, col: usize) -> Result<ValueRef<'_>> {
+ unsafe {
+ let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
+ check(ffi::sqlite3changeset_old(self.it, col as i32, &mut p_value))?;
+ Ok(ValueRef::from_value(p_value))
+ }
+ }
+
+ /// Obtain the current operation
+ #[inline]
+ pub fn op(&self) -> Result<Operation<'_>> {
+ let mut number_of_columns = 0;
+ let mut code = 0;
+ let mut indirect = 0;
+ let tab = unsafe {
+ let mut pz_tab: *const c_char = ptr::null();
+ check(ffi::sqlite3changeset_op(
+ self.it,
+ &mut pz_tab,
+ &mut number_of_columns,
+ &mut code,
+ &mut indirect,
+ ))?;
+ CStr::from_ptr(pz_tab)
+ };
+ let table_name = tab.to_str()?;
+ Ok(Operation {
+ table_name,
+ number_of_columns,
+ code: Action::from(code),
+ indirect: indirect != 0,
+ })
+ }
+
+ /// Obtain the primary key definition of a table
+ #[inline]
+ pub fn pk(&self) -> Result<&[u8]> {
+ let mut number_of_columns = 0;
+ unsafe {
+ let mut pks: *mut c_uchar = ptr::null_mut();
+ check(ffi::sqlite3changeset_pk(
+ self.it,
+ &mut pks,
+ &mut number_of_columns,
+ ))?;
+ Ok(from_raw_parts(pks, number_of_columns as usize))
+ }
+ }
+}
+
+/// Used to combine two or more changesets or
+/// patchsets
+pub struct Changegroup {
+ cg: *mut ffi::sqlite3_changegroup,
+}
+
+impl Changegroup {
+ /// Create a new change group.
+ #[inline]
+ pub fn new() -> Result<Self> {
+ let mut cg = ptr::null_mut();
+ check(unsafe { ffi::sqlite3changegroup_new(&mut cg) })?;
+ Ok(Changegroup { cg })
+ }
+
+ /// Add a changeset
+ #[inline]
+ pub fn add(&mut self, cs: &Changeset) -> Result<()> {
+ check(unsafe { ffi::sqlite3changegroup_add(self.cg, cs.n, cs.cs) })
+ }
+
+ /// Add a changeset read from `input` to this change group.
+ #[inline]
+ pub fn add_stream(&mut self, input: &mut dyn Read) -> Result<()> {
+ let input_ref = &input;
+ check(unsafe {
+ ffi::sqlite3changegroup_add_strm(
+ self.cg,
+ Some(x_input),
+ input_ref as *const &mut dyn Read as *mut c_void,
+ )
+ })
+ }
+
+ /// Obtain a composite Changeset
+ #[inline]
+ pub fn output(&mut self) -> Result<Changeset> {
+ let mut n = 0;
+ let mut output: *mut c_void = ptr::null_mut();
+ check(unsafe { ffi::sqlite3changegroup_output(self.cg, &mut n, &mut output) })?;
+ Ok(Changeset { cs: output, n })
+ }
+
+ /// Write the combined set of changes to `output`.
+ #[inline]
+ pub fn output_strm(&mut self, output: &mut dyn Write) -> Result<()> {
+ let output_ref = &output;
+ check(unsafe {
+ ffi::sqlite3changegroup_output_strm(
+ self.cg,
+ Some(x_output),
+ output_ref as *const &mut dyn Write as *mut c_void,
+ )
+ })
+ }
+}
+
+impl Drop for Changegroup {
+ #[inline]
+ fn drop(&mut self) {
+ unsafe {
+ ffi::sqlite3changegroup_delete(self.cg);
+ }
+ }
+}
+
+impl Connection {
+ /// Apply a changeset to a database
+ pub fn apply<F, C>(&self, cs: &Changeset, filter: Option<F>, conflict: C) -> Result<()>
+ where
+ F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
+ C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
+ {
+ let db = self.db.borrow_mut().db;
+
+ let filtered = filter.is_some();
+ let tuple = &mut (filter, conflict);
+ check(unsafe {
+ if filtered {
+ ffi::sqlite3changeset_apply(
+ db,
+ cs.n,
+ cs.cs,
+ Some(call_filter::<F, C>),
+ Some(call_conflict::<F, C>),
+ tuple as *mut (Option<F>, C) as *mut c_void,
+ )
+ } else {
+ ffi::sqlite3changeset_apply(
+ db,
+ cs.n,
+ cs.cs,
+ None,
+ Some(call_conflict::<F, C>),
+ tuple as *mut (Option<F>, C) as *mut c_void,
+ )
+ }
+ })
+ }
+
+ /// Apply a changeset to a database
+ pub fn apply_strm<F, C>(
+ &self,
+ input: &mut dyn Read,
+ filter: Option<F>,
+ conflict: C,
+ ) -> Result<()>
+ where
+ F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
+ C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
+ {
+ let input_ref = &input;
+ let db = self.db.borrow_mut().db;
+
+ let filtered = filter.is_some();
+ let tuple = &mut (filter, conflict);
+ check(unsafe {
+ if filtered {
+ ffi::sqlite3changeset_apply_strm(
+ db,
+ Some(x_input),
+ input_ref as *const &mut dyn Read as *mut c_void,
+ Some(call_filter::<F, C>),
+ Some(call_conflict::<F, C>),
+ tuple as *mut (Option<F>, C) as *mut c_void,
+ )
+ } else {
+ ffi::sqlite3changeset_apply_strm(
+ db,
+ Some(x_input),
+ input_ref as *const &mut dyn Read as *mut c_void,
+ None,
+ Some(call_conflict::<F, C>),
+ tuple as *mut (Option<F>, C) as *mut c_void,
+ )
+ }
+ })
+ }
+}
+
+/// Constants passed to the conflict handler
+/// See [here](https://sqlite.org/session.html#SQLITE_CHANGESET_CONFLICT) for details.
+#[allow(missing_docs)]
+#[repr(i32)]
+#[derive(Debug, PartialEq)]
+#[non_exhaustive]
+#[allow(clippy::upper_case_acronyms)]
+pub enum ConflictType {
+ UNKNOWN = -1,
+ SQLITE_CHANGESET_DATA = ffi::SQLITE_CHANGESET_DATA,
+ SQLITE_CHANGESET_NOTFOUND = ffi::SQLITE_CHANGESET_NOTFOUND,
+ SQLITE_CHANGESET_CONFLICT = ffi::SQLITE_CHANGESET_CONFLICT,
+ SQLITE_CHANGESET_CONSTRAINT = ffi::SQLITE_CHANGESET_CONSTRAINT,
+ SQLITE_CHANGESET_FOREIGN_KEY = ffi::SQLITE_CHANGESET_FOREIGN_KEY,
+}
+impl From<i32> for ConflictType {
+ fn from(code: i32) -> ConflictType {
+ match code {
+ ffi::SQLITE_CHANGESET_DATA => ConflictType::SQLITE_CHANGESET_DATA,
+ ffi::SQLITE_CHANGESET_NOTFOUND => ConflictType::SQLITE_CHANGESET_NOTFOUND,
+ ffi::SQLITE_CHANGESET_CONFLICT => ConflictType::SQLITE_CHANGESET_CONFLICT,
+ ffi::SQLITE_CHANGESET_CONSTRAINT => ConflictType::SQLITE_CHANGESET_CONSTRAINT,
+ ffi::SQLITE_CHANGESET_FOREIGN_KEY => ConflictType::SQLITE_CHANGESET_FOREIGN_KEY,
+ _ => ConflictType::UNKNOWN,
+ }
+ }
+}
+
+/// Constants returned by the conflict handler
+/// See [here](https://sqlite.org/session.html#SQLITE_CHANGESET_ABORT) for details.
+#[allow(missing_docs)]
+#[repr(i32)]
+#[derive(Debug, PartialEq)]
+#[non_exhaustive]
+#[allow(clippy::upper_case_acronyms)]
+pub enum ConflictAction {
+ SQLITE_CHANGESET_OMIT = ffi::SQLITE_CHANGESET_OMIT,
+ SQLITE_CHANGESET_REPLACE = ffi::SQLITE_CHANGESET_REPLACE,
+ SQLITE_CHANGESET_ABORT = ffi::SQLITE_CHANGESET_ABORT,
+}
+
+unsafe extern "C" fn call_filter<F, C>(p_ctx: *mut c_void, tbl_str: *const c_char) -> c_int
+where
+ F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
+ C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
+{
+ use std::str;
+
+ let tuple: *mut (Option<F>, C) = p_ctx as *mut (Option<F>, C);
+ let tbl_name = {
+ let c_slice = CStr::from_ptr(tbl_str).to_bytes();
+ str::from_utf8(c_slice)
+ };
+ match *tuple {
+ (Some(ref filter), _) => {
+ if let Ok(true) = catch_unwind(|| filter(tbl_name.expect("illegal table name"))) {
+ 1
+ } else {
+ 0
+ }
+ }
+ _ => unimplemented!(),
+ }
+}
+
+unsafe extern "C" fn call_conflict<F, C>(
+ p_ctx: *mut c_void,
+ e_conflict: c_int,
+ p: *mut ffi::sqlite3_changeset_iter,
+) -> c_int
+where
+ F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
+ C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
+{
+ let tuple: *mut (Option<F>, C) = p_ctx as *mut (Option<F>, C);
+ let conflict_type = ConflictType::from(e_conflict);
+ let item = ChangesetItem { it: p };
+ if let Ok(action) = catch_unwind(|| (*tuple).1(conflict_type, item)) {
+ action as c_int
+ } else {
+ ffi::SQLITE_CHANGESET_ABORT
+ }
+}
+
+unsafe extern "C" fn x_input(p_in: *mut c_void, data: *mut c_void, len: *mut c_int) -> c_int {
+ if p_in.is_null() {
+ return ffi::SQLITE_MISUSE;
+ }
+ let bytes: &mut [u8] = from_raw_parts_mut(data as *mut u8, *len as usize);
+ let input = p_in as *mut &mut dyn Read;
+ match (*input).read(bytes) {
+ Ok(n) => {
+ *len = n as i32; // TODO Validate: n = 0 may not mean the reader will always no longer be able to
+ // produce bytes.
+ ffi::SQLITE_OK
+ }
+ Err(_) => ffi::SQLITE_IOERR_READ, // TODO check if err is a (ru)sqlite Error => propagate
+ }
+}
+
+unsafe extern "C" fn x_output(p_out: *mut c_void, data: *const c_void, len: c_int) -> c_int {
+ if p_out.is_null() {
+ return ffi::SQLITE_MISUSE;
+ }
+ // The sessions module never invokes an xOutput callback with the third
+ // parameter set to a value less than or equal to zero.
+ let bytes: &[u8] = from_raw_parts(data as *const u8, len as usize);
+ let output = p_out as *mut &mut dyn Write;
+ match (*output).write_all(bytes) {
+ Ok(_) => ffi::SQLITE_OK,
+ Err(_) => ffi::SQLITE_IOERR_WRITE, // TODO check if err is a (ru)sqlite Error => propagate
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use fallible_streaming_iterator::FallibleStreamingIterator;
+ use std::io::Read;
+ use std::sync::atomic::{AtomicBool, Ordering};
+
+ use super::{Changeset, ChangesetIter, ConflictAction, ConflictType, Session};
+ use crate::hooks::Action;
+ use crate::{Connection, Result};
+
+ fn one_changeset() -> Result<Changeset> {
+ let db = Connection::open_in_memory()?;
+ db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;
+
+ let mut session = Session::new(&db)?;
+ assert!(session.is_empty());
+
+ session.attach(None)?;
+ db.execute("INSERT INTO foo (t) VALUES (?);", ["bar"])?;
+
+ session.changeset()
+ }
+
+ fn one_changeset_strm() -> Result<Vec<u8>> {
+ let db = Connection::open_in_memory()?;
+ db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;
+
+ let mut session = Session::new(&db)?;
+ assert!(session.is_empty());
+
+ session.attach(None)?;
+ db.execute("INSERT INTO foo (t) VALUES (?);", ["bar"])?;
+
+ let mut output = Vec::new();
+ session.changeset_strm(&mut output)?;
+ Ok(output)
+ }
+
+ #[test]
+ fn test_changeset() -> Result<()> {
+ let changeset = one_changeset()?;
+ let mut iter = changeset.iter()?;
+ let item = iter.next()?;
+ assert!(item.is_some());
+
+ let item = item.unwrap();
+ let op = item.op()?;
+ assert_eq!("foo", op.table_name());
+ assert_eq!(1, op.number_of_columns());
+ assert_eq!(Action::SQLITE_INSERT, op.code());
+ assert!(!op.indirect());
+
+ let pk = item.pk()?;
+ assert_eq!(&[1], pk);
+
+ let new_value = item.new_value(0)?;
+ assert_eq!(Ok("bar"), new_value.as_str());
+ Ok(())
+ }
+
+ #[test]
+ fn test_changeset_strm() -> Result<()> {
+ let output = one_changeset_strm()?;
+ assert!(!output.is_empty());
+ assert_eq!(14, output.len());
+
+ let input: &mut dyn Read = &mut output.as_slice();
+ let mut iter = ChangesetIter::start_strm(&input)?;
+ let item = iter.next()?;
+ assert!(item.is_some());
+ Ok(())
+ }
+
+ #[test]
+ fn test_changeset_apply() -> Result<()> {
+ let changeset = one_changeset()?;
+
+ let db = Connection::open_in_memory()?;
+ db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;
+
+ static CALLED: AtomicBool = AtomicBool::new(false);
+ db.apply(
+ &changeset,
+ None::<fn(&str) -> bool>,
+ |_conflict_type, _item| {
+ CALLED.store(true, Ordering::Relaxed);
+ ConflictAction::SQLITE_CHANGESET_OMIT
+ },
+ )?;
+
+ assert!(!CALLED.load(Ordering::Relaxed));
+ let check = db.query_row("SELECT 1 FROM foo WHERE t = ?", ["bar"], |row| {
+ row.get::<_, i32>(0)
+ })?;
+ assert_eq!(1, check);
+
+ // conflict expected when same changeset applied again on the same db
+ db.apply(
+ &changeset,
+ None::<fn(&str) -> bool>,
+ |conflict_type, item| {
+ CALLED.store(true, Ordering::Relaxed);
+ assert_eq!(ConflictType::SQLITE_CHANGESET_CONFLICT, conflict_type);
+ let conflict = item.conflict(0).unwrap();
+ assert_eq!(Ok("bar"), conflict.as_str());
+ ConflictAction::SQLITE_CHANGESET_OMIT
+ },
+ )?;
+ assert!(CALLED.load(Ordering::Relaxed));
+ Ok(())
+ }
+
+ #[test]
+ fn test_changeset_apply_strm() -> Result<()> {
+ let output = one_changeset_strm()?;
+
+ let db = Connection::open_in_memory()?;
+ db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;
+
+ let mut input = output.as_slice();
+ db.apply_strm(
+ &mut input,
+ None::<fn(&str) -> bool>,
+ |_conflict_type, _item| ConflictAction::SQLITE_CHANGESET_OMIT,
+ )?;
+
+ let check = db.query_row("SELECT 1 FROM foo WHERE t = ?", ["bar"], |row| {
+ row.get::<_, i32>(0)
+ })?;
+ assert_eq!(1, check);
+ Ok(())
+ }
+
+ #[test]
+ fn test_session_empty() -> Result<()> {
+ let db = Connection::open_in_memory()?;
+ db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;
+
+ let mut session = Session::new(&db)?;
+ assert!(session.is_empty());
+
+ session.attach(None)?;
+ db.execute("INSERT INTO foo (t) VALUES (?);", ["bar"])?;
+
+ assert!(!session.is_empty());
+ Ok(())
+ }
+
+ #[test]
+ fn test_session_set_enabled() -> Result<()> {
+ let db = Connection::open_in_memory()?;
+
+ let mut session = Session::new(&db)?;
+ assert!(session.is_enabled());
+ session.set_enabled(false);
+ assert!(!session.is_enabled());
+ Ok(())
+ }
+
+ #[test]
+ fn test_session_set_indirect() -> Result<()> {
+ let db = Connection::open_in_memory()?;
+
+ let mut session = Session::new(&db)?;
+ assert!(!session.is_indirect());
+ session.set_indirect(true);
+ assert!(session.is_indirect());
+ Ok(())
+ }
+}