fedimint_core/db/
mod.rs

1//! Core Fedimint database traits and types
2//!
3//! This module provides the core key-value database for Fedimint.
4//!
5//! # Usage
6//!
7//! To use the database, you typically follow these steps:
8//!
9//! 1. Create a `Database` instance
10//! 2. Begin a transaction
11//! 3. Perform operations within the transaction
12//! 4. Commit the transaction
13//!
14//! ## Example
15//!
16//! ```rust
17//! use fedimint_core::db::mem_impl::MemDatabase;
18//! use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
19//! use fedimint_core::encoding::{Decodable, Encodable};
20//! use fedimint_core::impl_db_record;
21//! use fedimint_core::module::registry::ModuleDecoderRegistry;
22//!
23//! #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
24//! pub struct TestKey(pub u64);
25//! #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
26//! pub struct TestVal(pub u64);
27//!
28//! #[repr(u8)]
29//! #[derive(Clone)]
30//! pub enum TestDbKeyPrefix {
31//!     Test = 0x42,
32//! }
33//!
34//! impl_db_record!(
35//!     key = TestKey,
36//!     value = TestVal,
37//!     db_prefix = TestDbKeyPrefix::Test,
38//! );
39//!
40//! # async fn example() {
41//! // Create a new in-memory database
42//! let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
43//!
44//! // Begin a transaction
45//! let mut tx = db.begin_transaction().await;
46//!
47//! // Perform operations
48//! tx.insert_entry(&TestKey(1), &TestVal(100)).await;
49//! let value = tx.get_value(&TestKey(1)).await;
50//!
51//! // Commit the transaction
52//! tx.commit_tx().await;
53//!
54//! // For operations that may need to be retried due to conflicts, use the
55//! // `autocommit` function:
56//!
57//! db.autocommit(
58//!     |dbtx, _| {
59//!         Box::pin(async move {
60//!             dbtx.insert_entry(&TestKey(1), &TestVal(100)).await;
61//!             anyhow::Ok(())
62//!         })
63//!     },
64//!     None,
65//! )
66//! .await
67//! .unwrap();
68//! # }
69//! ```
70//!
71//! # Isolation of database transactions
72//!
73//! Fedimint requires that the database implementation implement Snapshot
74//! Isolation. Snapshot Isolation is a database isolation level that guarantees
75//! consistent reads from the time that the snapshot was created (at transaction
76//! creation time). Transactions with Snapshot Isolation level will only commit
77//! if there has been no write to the modified keys since the snapshot (i.e.
78//! write-write conflicts are prevented).
79//!
80//! Specifically, Fedimint expects the database implementation to prevent the
81//! following anomalies:
82//!
83//! Non-Readable Write: TX1 writes (K1, V1) at time t but cannot read (K1, V1)
84//! at time (t + i)
85//!
86//! Dirty Read: TX1 is able to read TX2's uncommitted writes.
87//!
88//! Non-Repeatable Read: TX1 reads (K1, V1) at time t and retrieves (K1, V2) at
89//! time (t + i) where V1 != V2.
90//!
91//! Phantom Record: TX1 retrieves X number of records for a prefix at time t and
92//! retrieves Y number of records for the same prefix at time (t + i).
93//!
94//! Lost Writes: TX1 writes (K1, V1) at the same time as TX2 writes (K1, V2). V2
95//! overwrites V1 as the value for K1 (write-write conflict).
96//!
97//! | Type     | Non-Readable Write | Dirty Read | Non-Repeatable Read | Phantom
98//! Record | Lost Writes | | -------- | ------------------ | ---------- |
99//! ------------------- | -------------- | ----------- | | MemoryDB | Prevented
100//! | Prevented  | Prevented           | Prevented      | Possible    |
101//! | RocksDB  | Prevented          | Prevented  | Prevented           |
102//! Prevented      | Prevented   | | Sqlite   | Prevented          | Prevented
103//! | Prevented           | Prevented      | Prevented   |
104
105use std::any;
106use std::collections::{BTreeMap, BTreeSet};
107use std::error::Error;
108use std::fmt::{self, Debug};
109use std::marker::{self, PhantomData};
110use std::ops::{self, DerefMut, Range};
111use std::path::Path;
112use std::pin::Pin;
113use std::sync::Arc;
114use std::time::Duration;
115
116use anyhow::{Context, Result, bail};
117use bitcoin::hex::DisplayHex as _;
118use fedimint_core::util::BoxFuture;
119use fedimint_logging::LOG_DB;
120use futures::{Stream, StreamExt};
121use macro_rules_attribute::apply;
122use rand::Rng;
123use serde::Serialize;
124use strum_macros::EnumIter;
125use thiserror::Error;
126use tracing::{debug, error, info, instrument, trace, warn};
127
128use crate::core::{ModuleInstanceId, ModuleKind};
129use crate::encoding::{Decodable, Encodable};
130use crate::fmt_utils::AbbreviateHexBytes;
131use crate::task::{MaybeSend, MaybeSync};
132use crate::util::FmtCompactAnyhow as _;
133use crate::{async_trait_maybe_send, maybe_add_send, timing};
134
135pub mod mem_impl;
136pub mod notifications;
137
138pub use test_utils::*;
139
140use self::notifications::{Notifications, NotifyQueue};
141use crate::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
142
143pub const MODULE_GLOBAL_PREFIX: u8 = 0xff;
144
145pub trait DatabaseKeyPrefix: Debug {
146    fn to_bytes(&self) -> Vec<u8>;
147}
148
149/// A key + value pair in the database with a unique prefix
150/// Extends `DatabaseKeyPrefix` to prepend the key's prefix.
151pub trait DatabaseRecord: DatabaseKeyPrefix {
152    const DB_PREFIX: u8;
153    const NOTIFY_ON_MODIFY: bool = false;
154    type Key: DatabaseKey + Debug;
155    type Value: DatabaseValue + Debug;
156}
157
158/// A key that can be used to query one or more `DatabaseRecord`
159/// Extends `DatabaseKeyPrefix` to prepend the key's prefix.
160pub trait DatabaseLookup: DatabaseKeyPrefix {
161    type Record: DatabaseRecord;
162}
163
164// Every `DatabaseRecord` is automatically a `DatabaseLookup`
165impl<Record> DatabaseLookup for Record
166where
167    Record: DatabaseRecord + Debug + Decodable + Encodable,
168{
169    type Record = Record;
170}
171
172/// `DatabaseKey` that represents the lookup structure for retrieving key/value
173/// pairs from the database.
174pub trait DatabaseKey: Sized {
175    /// Send a notification to tasks waiting to be notified if the value of
176    /// `DatabaseKey` is modified
177    ///
178    /// For instance, this can be used to be notified when a key in the
179    /// database is created. It is also possible to run a closure with the
180    /// value of the `DatabaseKey` as parameter to verify some changes to
181    /// that value.
182    const NOTIFY_ON_MODIFY: bool = false;
183    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
184}
185
186/// Marker trait for `DatabaseKey`s where `NOTIFY` is true
187pub trait DatabaseKeyWithNotify {}
188
189/// `DatabaseValue` that represents the value structure of database records.
190pub trait DatabaseValue: Sized + Debug {
191    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
192    fn to_bytes(&self) -> Vec<u8>;
193}
194
195pub type PrefixStream<'a> = Pin<Box<maybe_add_send!(dyn Stream<Item = (Vec<u8>, Vec<u8>)> + 'a)>>;
196
197/// Just ignore this type, it's only there to make compiler happy
198///
199/// See <https://users.rust-lang.org/t/argument-requires-that-is-borrowed-for-static/66503/2?u=yandros> for details.
200pub type PhantomBound<'big, 'small> = PhantomData<&'small &'big ()>;
201
202/// Error returned when the autocommit function fails
203#[derive(Debug, Error)]
204pub enum AutocommitError<E> {
205    /// Committing the transaction failed too many times, giving up
206    #[error("Commit Failed: {last_error}")]
207    CommitFailed {
208        /// Number of attempts
209        attempts: usize,
210        /// Last error on commit
211        last_error: anyhow::Error,
212    },
213    /// Error returned by the closure provided to `autocommit`. If returned no
214    /// commit was attempted in that round
215    #[error("Closure error: {error}")]
216    ClosureError {
217        /// The attempt on which the closure returned an error
218        ///
219        /// Values other than 0 typically indicate a logic error since the
220        /// closure given to `autocommit` should not have side effects
221        /// and thus keep succeeding if it succeeded once.
222        attempts: usize,
223        /// Error returned by the closure
224        error: E,
225    },
226}
227
228/// Raw database implementation
229///
230/// This and [`IRawDatabaseTransaction`] are meant to be implemented
231/// by crates like `fedimint-rocksdb` to provide a concrete implementation
232/// of a database to be used by Fedimint.
233///
234/// This is in contrast of [`IDatabase`] which includes extra
235/// functionality that Fedimint needs (and adds) on top of it.
236#[apply(async_trait_maybe_send!)]
237pub trait IRawDatabase: Debug + MaybeSend + MaybeSync + 'static {
238    /// A raw database transaction type
239    type Transaction<'a>: IRawDatabaseTransaction + Debug;
240
241    /// Start a database transaction
242    async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a>;
243
244    // Checkpoint the database to a backup directory
245    fn checkpoint(&self, backup_path: &Path) -> Result<()>;
246}
247
248#[apply(async_trait_maybe_send!)]
249impl<T> IRawDatabase for Box<T>
250where
251    T: IRawDatabase,
252{
253    type Transaction<'a> = <T as IRawDatabase>::Transaction<'a>;
254
255    async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a> {
256        (**self).begin_transaction().await
257    }
258
259    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
260        (**self).checkpoint(backup_path)
261    }
262}
263
264/// An extension trait with convenience operations on [`IRawDatabase`]
265pub trait IRawDatabaseExt: IRawDatabase + Sized {
266    /// Convert to type implementing [`IRawDatabase`] into [`Database`].
267    ///
268    /// When type inference is not an issue, [`Into::into`] can be used instead.
269    fn into_database(self) -> Database {
270        Database::new(self, ModuleRegistry::default())
271    }
272}
273
274impl<T> IRawDatabaseExt for T where T: IRawDatabase {}
275
276impl<T> From<T> for Database
277where
278    T: IRawDatabase,
279{
280    fn from(raw: T) -> Self {
281        Self::new(raw, ModuleRegistry::default())
282    }
283}
284
285/// A database that on top of a raw database operation, implements
286/// key notification system.
287#[apply(async_trait_maybe_send!)]
288pub trait IDatabase: Debug + MaybeSend + MaybeSync + 'static {
289    /// Start a database transaction
290    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a>;
291    /// Register (and wait) for `key` updates
292    async fn register(&self, key: &[u8]);
293    /// Notify about `key` update (creation, modification, deletion)
294    async fn notify(&self, key: &[u8]);
295
296    /// The prefix len of this database refers to the global (as opposed to
297    /// module-isolated) key space
298    fn is_global(&self) -> bool;
299
300    /// Checkpoints the database to a backup directory
301    fn checkpoint(&self, backup_path: &Path) -> Result<()>;
302}
303
304#[apply(async_trait_maybe_send!)]
305impl<T> IDatabase for Arc<T>
306where
307    T: IDatabase + ?Sized,
308{
309    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
310        (**self).begin_transaction().await
311    }
312    async fn register(&self, key: &[u8]) {
313        (**self).register(key).await;
314    }
315    async fn notify(&self, key: &[u8]) {
316        (**self).notify(key).await;
317    }
318
319    fn is_global(&self) -> bool {
320        (**self).is_global()
321    }
322
323    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
324        (**self).checkpoint(backup_path)
325    }
326}
327
328/// Base functionality around [`IRawDatabase`] to make it a [`IDatabase`]
329///
330/// Mostly notification system, but also run-time single-commit handling.
331struct BaseDatabase<RawDatabase> {
332    notifications: Arc<Notifications>,
333    raw: RawDatabase,
334}
335
336impl<RawDatabase> fmt::Debug for BaseDatabase<RawDatabase> {
337    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338        f.write_str("BaseDatabase")
339    }
340}
341
342#[apply(async_trait_maybe_send!)]
343impl<RawDatabase: IRawDatabase + MaybeSend + 'static> IDatabase for BaseDatabase<RawDatabase> {
344    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
345        Box::new(BaseDatabaseTransaction::new(
346            self.raw.begin_transaction().await,
347            self.notifications.clone(),
348        ))
349    }
350    async fn register(&self, key: &[u8]) {
351        self.notifications.register(key).await;
352    }
353    async fn notify(&self, key: &[u8]) {
354        self.notifications.notify(key);
355    }
356
357    fn is_global(&self) -> bool {
358        true
359    }
360
361    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
362        self.raw.checkpoint(backup_path)
363    }
364}
365
366/// A public-facing newtype over `IDatabase`
367///
368/// Notably carries set of module decoders (`ModuleDecoderRegistry`)
369/// and implements common utility function for auto-commits, db isolation,
370/// and other.
371#[derive(Clone, Debug)]
372pub struct Database {
373    inner: Arc<dyn IDatabase + 'static>,
374    module_decoders: ModuleDecoderRegistry,
375}
376
377impl Database {
378    pub fn strong_count(&self) -> usize {
379        Arc::strong_count(&self.inner)
380    }
381
382    pub fn into_inner(self) -> Arc<dyn IDatabase + 'static> {
383        self.inner
384    }
385}
386
387impl Database {
388    /// Creates a new Fedimint database from any object implementing
389    /// [`IDatabase`].
390    ///
391    /// See also [`Database::new_from_arc`].
392    pub fn new(raw: impl IRawDatabase + 'static, module_decoders: ModuleDecoderRegistry) -> Self {
393        let inner = BaseDatabase {
394            raw,
395            notifications: Arc::new(Notifications::new()),
396        };
397        Self::new_from_arc(
398            Arc::new(inner) as Arc<dyn IDatabase + 'static>,
399            module_decoders,
400        )
401    }
402
403    /// Create [`Database`] from an already typed-erased `IDatabase`.
404    pub fn new_from_arc(
405        inner: Arc<dyn IDatabase + 'static>,
406        module_decoders: ModuleDecoderRegistry,
407    ) -> Self {
408        Self {
409            inner,
410            module_decoders,
411        }
412    }
413
414    /// Create [`Database`] isolated to a partition with a given `prefix`
415    pub fn with_prefix(&self, prefix: Vec<u8>) -> Self {
416        Self {
417            inner: Arc::new(PrefixDatabase {
418                inner: self.inner.clone(),
419                global_dbtx_access_token: None,
420                prefix,
421            }),
422            module_decoders: self.module_decoders.clone(),
423        }
424    }
425
426    /// Create [`Database`] isolated to a partition with a prefix for a given
427    /// `module_instance_id`, allowing the module to access `global_dbtx` with
428    /// the right `access_token`
429    pub fn with_prefix_module_id(
430        &self,
431        module_instance_id: ModuleInstanceId,
432    ) -> (Self, GlobalDBTxAccessToken) {
433        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
434        let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
435        (
436            Self {
437                inner: Arc::new(PrefixDatabase {
438                    inner: self.inner.clone(),
439                    global_dbtx_access_token: Some(global_dbtx_access_token),
440                    prefix,
441                }),
442                module_decoders: self.module_decoders.clone(),
443            },
444            global_dbtx_access_token,
445        )
446    }
447
448    pub fn with_decoders(&self, module_decoders: ModuleDecoderRegistry) -> Self {
449        Self {
450            inner: self.inner.clone(),
451            module_decoders,
452        }
453    }
454
455    /// Is this `Database` a global, unpartitioned `Database`
456    pub fn is_global(&self) -> bool {
457        self.inner.is_global()
458    }
459
460    /// `Err` if [`Self::is_global`] is not true
461    pub fn ensure_global(&self) -> Result<()> {
462        if !self.is_global() {
463            bail!("Database instance not global");
464        }
465
466        Ok(())
467    }
468
469    /// `Err` if [`Self::is_global`] is true
470    pub fn ensure_isolated(&self) -> Result<()> {
471        if self.is_global() {
472            bail!("Database instance not isolated");
473        }
474
475        Ok(())
476    }
477
478    /// Begin a new committable database transaction
479    pub async fn begin_transaction<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, Committable>
480    where
481        's: 'tx,
482    {
483        DatabaseTransaction::<Committable>::new(
484            self.inner.begin_transaction().await,
485            self.module_decoders.clone(),
486        )
487    }
488
489    /// Begin a new non-committable database transaction
490    pub async fn begin_transaction_nc<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, NonCommittable>
491    where
492        's: 'tx,
493    {
494        self.begin_transaction().await.into_nc()
495    }
496
497    pub fn checkpoint(&self, backup_path: &Path) -> Result<()> {
498        self.inner.checkpoint(backup_path)
499    }
500
501    /// Runs a closure with a reference to a database transaction and tries to
502    /// commit the transaction if the closure returns `Ok` and rolls it back
503    /// otherwise. If committing fails the closure is run for up to
504    /// `max_attempts` times. If `max_attempts` is `None` it will run
505    /// `usize::MAX` times which is close enough to infinite times.
506    ///
507    /// The closure `tx_fn` provided should not have side effects outside of the
508    /// database transaction provided, or if it does these should be
509    /// idempotent, since the closure might be run multiple times.
510    ///
511    /// # Lifetime Parameters
512    ///
513    /// The higher rank trait bound (HRTB) `'a` that is applied to the the
514    /// mutable reference to the database transaction ensures that the
515    /// reference lives as least as long as the returned future of the
516    /// closure.
517    ///
518    /// Further, the reference to self (`'s`) must outlive the
519    /// `DatabaseTransaction<'dt>`. In other words, the
520    /// `DatabaseTransaction` must live as least as long as `self` and that is
521    /// true as the `DatabaseTransaction` is only dropped at the end of the
522    /// `loop{}`.
523    ///
524    /// # Panics
525    ///
526    /// This function panics when the given number of maximum attempts is zero.
527    /// `max_attempts` must be greater or equal to one.
528    pub async fn autocommit<'s, 'dbtx, F, T, E>(
529        &'s self,
530        tx_fn: F,
531        max_attempts: Option<usize>,
532    ) -> Result<T, AutocommitError<E>>
533    where
534        's: 'dbtx,
535        for<'r, 'o> F: Fn(
536            &'r mut DatabaseTransaction<'o>,
537            PhantomBound<'dbtx, 'o>,
538        ) -> BoxFuture<'r, Result<T, E>>,
539    {
540        assert_ne!(max_attempts, Some(0));
541        let mut curr_attempts: usize = 0;
542
543        loop {
544            // The `checked_add()` function is used to catch the `usize` overflow.
545            // With `usize=32bit` and an assumed time of 1ms per iteration, this would crash
546            // after ~50 days. But if that's the case, something else must be wrong.
547            // With `usize=64bit` it would take much longer, obviously.
548            curr_attempts = curr_attempts
549                .checked_add(1)
550                .expect("db autocommit attempt counter overflowed");
551
552            let mut dbtx = self.begin_transaction().await;
553
554            let tx_fn_res = tx_fn(&mut dbtx.to_ref_nc(), PhantomData).await;
555            let val = match tx_fn_res {
556                Ok(val) => val,
557                Err(err) => {
558                    dbtx.ignore_uncommitted();
559                    return Err(AutocommitError::ClosureError {
560                        attempts: curr_attempts,
561                        error: err,
562                    });
563                }
564            };
565
566            let _timing /* logs on drop */ = timing::TimeReporter::new("autocommit - commit_tx");
567
568            match dbtx.commit_tx_result().await {
569                Ok(()) => {
570                    return Ok(val);
571                }
572                Err(err) => {
573                    if max_attempts.is_some_and(|max_att| max_att <= curr_attempts) {
574                        warn!(
575                            target: LOG_DB,
576                            curr_attempts,
577                            ?err,
578                            "Database commit failed in an autocommit block - terminating"
579                        );
580                        return Err(AutocommitError::CommitFailed {
581                            attempts: curr_attempts,
582                            last_error: err,
583                        });
584                    }
585
586                    let delay = (2u64.pow(curr_attempts.min(7) as u32) * 10).min(1000);
587                    let delay = rand::thread_rng().gen_range(delay..(2 * delay));
588                    warn!(
589                        target: LOG_DB,
590                        curr_attempts,
591                        err = %err.fmt_compact_anyhow(),
592                        delay_ms = %delay,
593                        "Database commit failed in an autocommit block - retrying"
594                    );
595                    crate::runtime::sleep(Duration::from_millis(delay)).await;
596                }
597            }
598        }
599    }
600
601    /// Waits for key to be notified.
602    ///
603    /// Calls the `checker` when value of the key may have changed.
604    /// Returns the value when `checker` returns a `Some(T)`.
605    pub async fn wait_key_check<'a, K, T>(
606        &'a self,
607        key: &K,
608        checker: impl Fn(Option<K::Value>) -> Option<T>,
609    ) -> (T, DatabaseTransaction<'a, Committable>)
610    where
611        K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
612    {
613        let key_bytes = key.to_bytes();
614        loop {
615            // register for notification
616            let notify = self.inner.register(&key_bytes);
617
618            // check for value in db
619            let mut tx = self.inner.begin_transaction().await;
620
621            let maybe_value_bytes = tx
622                .raw_get_bytes(&key_bytes)
623                .await
624                .expect("Unrecoverable error when reading from database")
625                .map(|value_bytes| {
626                    decode_value_expect(&value_bytes, &self.module_decoders, &key_bytes)
627                });
628
629            if let Some(value) = checker(maybe_value_bytes) {
630                return (
631                    value,
632                    DatabaseTransaction::new(tx, self.module_decoders.clone()),
633                );
634            }
635
636            // key not found, try again
637            notify.await;
638            // if miss a notification between await and next register, it is
639            // fine. because we are going check the database
640        }
641    }
642
643    /// Waits for key to be present in database.
644    pub async fn wait_key_exists<K>(&self, key: &K) -> K::Value
645    where
646        K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
647    {
648        self.wait_key_check(key, std::convert::identity).await.0
649    }
650}
651
652fn module_instance_id_to_byte_prefix(module_instance_id: u16) -> Vec<u8> {
653    let mut bytes = vec![MODULE_GLOBAL_PREFIX];
654    bytes.append(&mut module_instance_id.consensus_encode_to_vec());
655    bytes
656}
657
658/// A database that wraps an `inner` one and adds a prefix to all operations,
659/// effectively creating an isolated partition.
660#[derive(Clone, Debug)]
661struct PrefixDatabase<Inner>
662where
663    Inner: Debug,
664{
665    prefix: Vec<u8>,
666    global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
667    inner: Inner,
668}
669
670impl<Inner> PrefixDatabase<Inner>
671where
672    Inner: Debug,
673{
674    // TODO: we should optimize these concatenations, maybe by having an internal
675    // `key: &[&[u8]]` that we flatten once, when passing to lowest layer, or
676    // something
677    fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
678        let mut full_key = self.prefix.clone();
679        full_key.extend_from_slice(key);
680        full_key
681    }
682}
683
684#[apply(async_trait_maybe_send!)]
685impl<Inner> IDatabase for PrefixDatabase<Inner>
686where
687    Inner: Debug + MaybeSend + MaybeSync + 'static + IDatabase,
688{
689    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
690        Box::new(PrefixDatabaseTransaction {
691            inner: self.inner.begin_transaction().await,
692            global_dbtx_access_token: self.global_dbtx_access_token,
693            prefix: self.prefix.clone(),
694        })
695    }
696    async fn register(&self, key: &[u8]) {
697        self.inner.register(&self.get_full_key(key)).await;
698    }
699
700    async fn notify(&self, key: &[u8]) {
701        self.inner.notify(&self.get_full_key(key)).await;
702    }
703
704    fn is_global(&self) -> bool {
705        if self.global_dbtx_access_token.is_some() {
706            false
707        } else {
708            self.inner.is_global()
709        }
710    }
711
712    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
713        self.inner.checkpoint(backup_path)
714    }
715}
716
717/// A database transactions that wraps an `inner` one and adds a prefix to all
718/// operations, effectively creating an isolated partition.
719///
720/// Produced by [`PrefixDatabase`].
721#[derive(Debug)]
722struct PrefixDatabaseTransaction<Inner> {
723    inner: Inner,
724    global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
725    prefix: Vec<u8>,
726}
727
728impl<Inner> PrefixDatabaseTransaction<Inner> {
729    // TODO: we should optimize these concatenations, maybe by having an internal
730    // `key: &[&[u8]]` that we flatten once, when passing to lowest layer, or
731    // something
732    fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
733        let mut full_key = self.prefix.clone();
734        full_key.extend_from_slice(key);
735        full_key
736    }
737
738    fn get_full_range(&self, range: Range<&[u8]>) -> Range<Vec<u8>> {
739        Range {
740            start: self.get_full_key(range.start),
741            end: self.get_full_key(range.end),
742        }
743    }
744
745    fn adapt_prefix_stream(stream: PrefixStream<'_>, prefix_len: usize) -> PrefixStream<'_> {
746        Box::pin(stream.map(move |(k, v)| (k[prefix_len..].to_owned(), v)))
747    }
748}
749
750#[apply(async_trait_maybe_send!)]
751impl<Inner> IDatabaseTransaction for PrefixDatabaseTransaction<Inner>
752where
753    Inner: IDatabaseTransaction,
754{
755    async fn commit_tx(&mut self) -> Result<()> {
756        self.inner.commit_tx().await
757    }
758
759    fn is_global(&self) -> bool {
760        if self.global_dbtx_access_token.is_some() {
761            false
762        } else {
763            self.inner.is_global()
764        }
765    }
766
767    fn global_dbtx(
768        &mut self,
769        access_token: GlobalDBTxAccessToken,
770    ) -> &mut dyn IDatabaseTransaction {
771        if let Some(self_global_dbtx_access_token) = self.global_dbtx_access_token {
772            assert_eq!(
773                access_token, self_global_dbtx_access_token,
774                "Invalid access key used to access global_dbtx"
775            );
776            &mut self.inner
777        } else {
778            self.inner.global_dbtx(access_token)
779        }
780    }
781}
782
783#[apply(async_trait_maybe_send!)]
784impl<Inner> IDatabaseTransactionOpsCore for PrefixDatabaseTransaction<Inner>
785where
786    Inner: IDatabaseTransactionOpsCore,
787{
788    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
789        let key = self.get_full_key(key);
790        self.inner.raw_insert_bytes(&key, value).await
791    }
792
793    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
794        let key = self.get_full_key(key);
795        self.inner.raw_get_bytes(&key).await
796    }
797
798    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
799        let key = self.get_full_key(key);
800        self.inner.raw_remove_entry(&key).await
801    }
802
803    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
804        let key = self.get_full_key(key_prefix);
805        let stream = self.inner.raw_find_by_prefix(&key).await?;
806        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
807    }
808
809    async fn raw_find_by_prefix_sorted_descending(
810        &mut self,
811        key_prefix: &[u8],
812    ) -> Result<PrefixStream<'_>> {
813        let key = self.get_full_key(key_prefix);
814        let stream = self
815            .inner
816            .raw_find_by_prefix_sorted_descending(&key)
817            .await?;
818        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
819    }
820
821    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
822        let range = self.get_full_range(range);
823        let stream = self
824            .inner
825            .raw_find_by_range(Range {
826                start: &range.start,
827                end: &range.end,
828            })
829            .await?;
830        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
831    }
832
833    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
834        let key = self.get_full_key(key_prefix);
835        self.inner.raw_remove_by_prefix(&key).await
836    }
837}
838
839#[apply(async_trait_maybe_send!)]
840impl<Inner> IDatabaseTransactionOps for PrefixDatabaseTransaction<Inner>
841where
842    Inner: IDatabaseTransactionOps,
843{
844    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
845        self.inner.rollback_tx_to_savepoint().await
846    }
847
848    async fn set_tx_savepoint(&mut self) -> Result<()> {
849        self.set_tx_savepoint().await
850    }
851}
852
853/// Core raw a operations database transactions supports
854///
855/// Used to enforce the same signature on all types supporting it
856#[apply(async_trait_maybe_send!)]
857pub trait IDatabaseTransactionOpsCore: MaybeSend {
858    /// Insert entry
859    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>>;
860
861    /// Get key value
862    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
863
864    /// Remove entry by `key`
865    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
866
867    /// Returns an stream of key-value pairs with keys that start with
868    /// `key_prefix`, sorted by key.
869    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>>;
870
871    /// Same as [`Self::raw_find_by_prefix`] but the order is descending by key.
872    async fn raw_find_by_prefix_sorted_descending(
873        &mut self,
874        key_prefix: &[u8],
875    ) -> Result<PrefixStream<'_>>;
876
877    /// Returns an stream of key-value pairs with keys within a `range`, sorted
878    /// by key. [`Range`] is an (half-open) range bounded inclusively below and
879    /// exclusively above.
880    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>>;
881
882    /// Delete keys matching prefix
883    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()>;
884}
885
886#[apply(async_trait_maybe_send!)]
887impl<T> IDatabaseTransactionOpsCore for Box<T>
888where
889    T: IDatabaseTransactionOpsCore + ?Sized,
890{
891    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
892        (**self).raw_insert_bytes(key, value).await
893    }
894
895    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
896        (**self).raw_get_bytes(key).await
897    }
898
899    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
900        (**self).raw_remove_entry(key).await
901    }
902
903    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
904        (**self).raw_find_by_prefix(key_prefix).await
905    }
906
907    async fn raw_find_by_prefix_sorted_descending(
908        &mut self,
909        key_prefix: &[u8],
910    ) -> Result<PrefixStream<'_>> {
911        (**self)
912            .raw_find_by_prefix_sorted_descending(key_prefix)
913            .await
914    }
915
916    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
917        (**self).raw_find_by_range(range).await
918    }
919
920    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
921        (**self).raw_remove_by_prefix(key_prefix).await
922    }
923}
924
925#[apply(async_trait_maybe_send!)]
926impl<T> IDatabaseTransactionOpsCore for &mut T
927where
928    T: IDatabaseTransactionOpsCore + ?Sized,
929{
930    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
931        (**self).raw_insert_bytes(key, value).await
932    }
933
934    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
935        (**self).raw_get_bytes(key).await
936    }
937
938    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
939        (**self).raw_remove_entry(key).await
940    }
941
942    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
943        (**self).raw_find_by_prefix(key_prefix).await
944    }
945
946    async fn raw_find_by_prefix_sorted_descending(
947        &mut self,
948        key_prefix: &[u8],
949    ) -> Result<PrefixStream<'_>> {
950        (**self)
951            .raw_find_by_prefix_sorted_descending(key_prefix)
952            .await
953    }
954
955    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
956        (**self).raw_find_by_range(range).await
957    }
958
959    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
960        (**self).raw_remove_by_prefix(key_prefix).await
961    }
962}
963
964/// Additional operations (only some) database transactions expose, on top of
965/// [`IDatabaseTransactionOpsCore`]
966///
967/// In certain contexts exposing these operations would be a problem, so they
968/// are moved to a separate trait.
969#[apply(async_trait_maybe_send!)]
970pub trait IDatabaseTransactionOps: IDatabaseTransactionOpsCore + MaybeSend {
971    /// Create a savepoint during the transaction that can be rolled back to
972    /// using rollback_tx_to_savepoint. Rolling back to the savepoint will
973    /// atomically remove the writes that were applied since the savepoint
974    /// was created.
975    ///
976    /// Warning: Avoid using this in fedimint client code as not all database
977    /// transaction implementations will support setting a savepoint during
978    /// a transaction.
979    async fn set_tx_savepoint(&mut self) -> Result<()>;
980
981    async fn rollback_tx_to_savepoint(&mut self) -> Result<()>;
982}
983
984#[apply(async_trait_maybe_send!)]
985impl<T> IDatabaseTransactionOps for Box<T>
986where
987    T: IDatabaseTransactionOps + ?Sized,
988{
989    async fn set_tx_savepoint(&mut self) -> Result<()> {
990        (**self).set_tx_savepoint().await
991    }
992
993    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
994        (**self).rollback_tx_to_savepoint().await
995    }
996}
997
998#[apply(async_trait_maybe_send!)]
999impl<T> IDatabaseTransactionOps for &mut T
1000where
1001    T: IDatabaseTransactionOps + ?Sized,
1002{
1003    async fn set_tx_savepoint(&mut self) -> Result<()> {
1004        (**self).set_tx_savepoint().await
1005    }
1006
1007    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1008        (**self).rollback_tx_to_savepoint().await
1009    }
1010}
1011
1012/// Like [`IDatabaseTransactionOpsCore`], but typed
1013///
1014/// Implemented via blanket impl for everything that implements
1015/// [`IDatabaseTransactionOpsCore`] that has decoders (implements
1016/// [`WithDecoders`]).
1017#[apply(async_trait_maybe_send!)]
1018pub trait IDatabaseTransactionOpsCoreTyped<'a> {
1019    async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1020    where
1021        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1022
1023    async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1024    where
1025        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1026        K::Value: MaybeSend + MaybeSync;
1027
1028    async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1029    where
1030        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1031        K::Value: MaybeSend + MaybeSync;
1032
1033    async fn find_by_range<K>(
1034        &mut self,
1035        key_range: Range<K>,
1036    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1037    where
1038        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1039        K::Value: MaybeSend + MaybeSync;
1040
1041    async fn find_by_prefix<KP>(
1042        &mut self,
1043        key_prefix: &KP,
1044    ) -> Pin<
1045        Box<
1046            maybe_add_send!(
1047                dyn Stream<
1048                        Item = (
1049                            KP::Record,
1050                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1051                        ),
1052                    > + '_
1053            ),
1054        >,
1055    >
1056    where
1057        KP: DatabaseLookup + MaybeSend + MaybeSync,
1058        KP::Record: DatabaseKey;
1059
1060    async fn find_by_prefix_sorted_descending<KP>(
1061        &mut self,
1062        key_prefix: &KP,
1063    ) -> Pin<
1064        Box<
1065            maybe_add_send!(
1066                dyn Stream<
1067                        Item = (
1068                            KP::Record,
1069                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1070                        ),
1071                    > + '_
1072            ),
1073        >,
1074    >
1075    where
1076        KP: DatabaseLookup + MaybeSend + MaybeSync,
1077        KP::Record: DatabaseKey;
1078
1079    async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1080    where
1081        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1082
1083    async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1084    where
1085        KP: DatabaseLookup + MaybeSend + MaybeSync;
1086}
1087
1088// blanket implementation of typed ops for anything that implements raw ops and
1089// has decoders
1090#[apply(async_trait_maybe_send!)]
1091impl<'a, T> IDatabaseTransactionOpsCoreTyped<'a> for T
1092where
1093    T: IDatabaseTransactionOpsCore + WithDecoders,
1094{
1095    async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1096    where
1097        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1098    {
1099        let key_bytes = key.to_bytes();
1100        let raw = self
1101            .raw_get_bytes(&key_bytes)
1102            .await
1103            .expect("Unrecoverable error occurred while reading and entry from the database");
1104        raw.map(|value_bytes| {
1105            decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1106        })
1107    }
1108
1109    async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1110    where
1111        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1112        K::Value: MaybeSend + MaybeSync,
1113    {
1114        let key_bytes = key.to_bytes();
1115        self.raw_insert_bytes(&key_bytes, &value.to_bytes())
1116            .await
1117            .expect("Unrecoverable error occurred while inserting entry into the database")
1118            .map(|value_bytes| {
1119                decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1120            })
1121    }
1122
1123    async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1124    where
1125        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1126        K::Value: MaybeSend + MaybeSync,
1127    {
1128        if let Some(prev) = self.insert_entry(key, value).await {
1129            panic!(
1130                "Database overwriting element when expecting insertion of new entry. Key: {key:?} Prev Value: {prev:?}"
1131            );
1132        }
1133    }
1134
1135    async fn find_by_range<K>(
1136        &mut self,
1137        key_range: Range<K>,
1138    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1139    where
1140        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1141        K::Value: MaybeSend + MaybeSync,
1142    {
1143        let decoders = self.decoders().clone();
1144        Box::pin(
1145            self.raw_find_by_range(Range {
1146                start: &key_range.start.to_bytes(),
1147                end: &key_range.end.to_bytes(),
1148            })
1149            .await
1150            .expect("Unrecoverable error occurred while listing entries from the database")
1151            .map(move |(key_bytes, value_bytes)| {
1152                let key = decode_key_expect(&key_bytes, &decoders);
1153                let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1154                (key, value)
1155            }),
1156        )
1157    }
1158
1159    async fn find_by_prefix<KP>(
1160        &mut self,
1161        key_prefix: &KP,
1162    ) -> Pin<
1163        Box<
1164            maybe_add_send!(
1165                dyn Stream<
1166                        Item = (
1167                            KP::Record,
1168                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1169                        ),
1170                    > + '_
1171            ),
1172        >,
1173    >
1174    where
1175        KP: DatabaseLookup + MaybeSend + MaybeSync,
1176        KP::Record: DatabaseKey,
1177    {
1178        let decoders = self.decoders().clone();
1179        Box::pin(
1180            self.raw_find_by_prefix(&key_prefix.to_bytes())
1181                .await
1182                .expect("Unrecoverable error occurred while listing entries from the database")
1183                .map(move |(key_bytes, value_bytes)| {
1184                    let key = decode_key_expect(&key_bytes, &decoders);
1185                    let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1186                    (key, value)
1187                }),
1188        )
1189    }
1190
1191    async fn find_by_prefix_sorted_descending<KP>(
1192        &mut self,
1193        key_prefix: &KP,
1194    ) -> Pin<
1195        Box<
1196            maybe_add_send!(
1197                dyn Stream<
1198                        Item = (
1199                            KP::Record,
1200                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1201                        ),
1202                    > + '_
1203            ),
1204        >,
1205    >
1206    where
1207        KP: DatabaseLookup + MaybeSend + MaybeSync,
1208        KP::Record: DatabaseKey,
1209    {
1210        let decoders = self.decoders().clone();
1211        Box::pin(
1212            self.raw_find_by_prefix_sorted_descending(&key_prefix.to_bytes())
1213                .await
1214                .expect("Unrecoverable error occurred while listing entries from the database")
1215                .map(move |(key_bytes, value_bytes)| {
1216                    let key = decode_key_expect(&key_bytes, &decoders);
1217                    let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1218                    (key, value)
1219                }),
1220        )
1221    }
1222    async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1223    where
1224        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1225    {
1226        let key_bytes = key.to_bytes();
1227        self.raw_remove_entry(&key_bytes)
1228            .await
1229            .expect("Unrecoverable error occurred while inserting removing entry from the database")
1230            .map(|value_bytes| {
1231                decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1232            })
1233    }
1234    async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1235    where
1236        KP: DatabaseLookup + MaybeSend + MaybeSync,
1237    {
1238        self.raw_remove_by_prefix(&key_prefix.to_bytes())
1239            .await
1240            .expect("Unrecoverable error when removing entries from the database");
1241    }
1242}
1243
1244/// A database type that has decoders, which allows it to implement
1245/// [`IDatabaseTransactionOpsCoreTyped`]
1246pub trait WithDecoders {
1247    fn decoders(&self) -> &ModuleDecoderRegistry;
1248}
1249
1250/// Raw database transaction (e.g. rocksdb implementation)
1251#[apply(async_trait_maybe_send!)]
1252pub trait IRawDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
1253    async fn commit_tx(self) -> Result<()>;
1254}
1255
1256/// Fedimint database transaction
1257///
1258/// See [`IDatabase`] for more info.
1259#[apply(async_trait_maybe_send!)]
1260pub trait IDatabaseTransaction: MaybeSend + IDatabaseTransactionOps + fmt::Debug {
1261    /// Commit the transaction
1262    async fn commit_tx(&mut self) -> Result<()>;
1263
1264    /// Is global database
1265    fn is_global(&self) -> bool;
1266
1267    /// Get the global database tx from a module-prefixed database transaction
1268    ///
1269    /// Meant to be called only by core internals, and module developers should
1270    /// not call it directly.
1271    #[doc(hidden)]
1272    fn global_dbtx(&mut self, access_token: GlobalDBTxAccessToken)
1273    -> &mut dyn IDatabaseTransaction;
1274}
1275
1276#[apply(async_trait_maybe_send!)]
1277impl<T> IDatabaseTransaction for Box<T>
1278where
1279    T: IDatabaseTransaction + ?Sized,
1280{
1281    async fn commit_tx(&mut self) -> Result<()> {
1282        (**self).commit_tx().await
1283    }
1284
1285    fn is_global(&self) -> bool {
1286        (**self).is_global()
1287    }
1288
1289    fn global_dbtx(
1290        &mut self,
1291        access_token: GlobalDBTxAccessToken,
1292    ) -> &mut dyn IDatabaseTransaction {
1293        (**self).global_dbtx(access_token)
1294    }
1295}
1296
1297#[apply(async_trait_maybe_send!)]
1298impl<'a, T> IDatabaseTransaction for &'a mut T
1299where
1300    T: IDatabaseTransaction + ?Sized,
1301{
1302    async fn commit_tx(&mut self) -> Result<()> {
1303        (**self).commit_tx().await
1304    }
1305
1306    fn is_global(&self) -> bool {
1307        (**self).is_global()
1308    }
1309
1310    fn global_dbtx(&mut self, access_key: GlobalDBTxAccessToken) -> &mut dyn IDatabaseTransaction {
1311        (**self).global_dbtx(access_key)
1312    }
1313}
1314
1315/// Struct that implements `IRawDatabaseTransaction` and can be wrapped
1316/// easier in other structs since it does not consumed `self` by move.
1317struct BaseDatabaseTransaction<Tx> {
1318    // TODO: merge options
1319    raw: Option<Tx>,
1320    notify_queue: Option<NotifyQueue>,
1321    notifications: Arc<Notifications>,
1322}
1323
1324impl<Tx> fmt::Debug for BaseDatabaseTransaction<Tx>
1325where
1326    Tx: fmt::Debug,
1327{
1328    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1329        f.write_fmt(format_args!(
1330            "BaseDatabaseTransaction{{ raw={:?} }}",
1331            self.raw
1332        ))
1333    }
1334}
1335impl<Tx> BaseDatabaseTransaction<Tx>
1336where
1337    Tx: IRawDatabaseTransaction,
1338{
1339    fn new(dbtx: Tx, notifications: Arc<Notifications>) -> Self {
1340        Self {
1341            raw: Some(dbtx),
1342            notifications,
1343            notify_queue: Some(NotifyQueue::new()),
1344        }
1345    }
1346
1347    fn add_notification_key(&mut self, key: &[u8]) -> Result<()> {
1348        self.notify_queue
1349            .as_mut()
1350            .context("can not call add_notification_key after commit")?
1351            .add(&key);
1352        Ok(())
1353    }
1354}
1355
1356#[apply(async_trait_maybe_send!)]
1357impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOpsCore for BaseDatabaseTransaction<Tx> {
1358    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1359        self.add_notification_key(key)?;
1360        self.raw
1361            .as_mut()
1362            .context("Cannot insert into already consumed transaction")?
1363            .raw_insert_bytes(key, value)
1364            .await
1365    }
1366
1367    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1368        self.raw
1369            .as_mut()
1370            .context("Cannot retrieve from already consumed transaction")?
1371            .raw_get_bytes(key)
1372            .await
1373    }
1374
1375    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1376        self.add_notification_key(key)?;
1377        self.raw
1378            .as_mut()
1379            .context("Cannot remove from already consumed transaction")?
1380            .raw_remove_entry(key)
1381            .await
1382    }
1383
1384    async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1385        self.raw
1386            .as_mut()
1387            .context("Cannot retrieve from already consumed transaction")?
1388            .raw_find_by_range(key_range)
1389            .await
1390    }
1391
1392    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1393        self.raw
1394            .as_mut()
1395            .context("Cannot retrieve from already consumed transaction")?
1396            .raw_find_by_prefix(key_prefix)
1397            .await
1398    }
1399
1400    async fn raw_find_by_prefix_sorted_descending(
1401        &mut self,
1402        key_prefix: &[u8],
1403    ) -> Result<PrefixStream<'_>> {
1404        self.raw
1405            .as_mut()
1406            .context("Cannot retrieve from already consumed transaction")?
1407            .raw_find_by_prefix_sorted_descending(key_prefix)
1408            .await
1409    }
1410
1411    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1412        self.raw
1413            .as_mut()
1414            .context("Cannot remove from already consumed transaction")?
1415            .raw_remove_by_prefix(key_prefix)
1416            .await
1417    }
1418}
1419
1420#[apply(async_trait_maybe_send!)]
1421impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOps for BaseDatabaseTransaction<Tx> {
1422    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1423        self.raw
1424            .as_mut()
1425            .context("Cannot rollback to a savepoint on an already consumed transaction")?
1426            .rollback_tx_to_savepoint()
1427            .await?;
1428        Ok(())
1429    }
1430
1431    async fn set_tx_savepoint(&mut self) -> Result<()> {
1432        self.raw
1433            .as_mut()
1434            .context("Cannot set a tx savepoint on an already consumed transaction")?
1435            .set_tx_savepoint()
1436            .await?;
1437        Ok(())
1438    }
1439}
1440
1441#[apply(async_trait_maybe_send!)]
1442impl<Tx: IRawDatabaseTransaction + fmt::Debug> IDatabaseTransaction
1443    for BaseDatabaseTransaction<Tx>
1444{
1445    async fn commit_tx(&mut self) -> Result<()> {
1446        self.raw
1447            .take()
1448            .context("Cannot commit an already committed transaction")?
1449            .commit_tx()
1450            .await?;
1451        self.notifications.submit_queue(
1452            &self
1453                .notify_queue
1454                .take()
1455                .expect("commit must be called only once"),
1456        );
1457        Ok(())
1458    }
1459
1460    fn is_global(&self) -> bool {
1461        true
1462    }
1463
1464    fn global_dbtx(
1465        &mut self,
1466        _access_token: GlobalDBTxAccessToken,
1467    ) -> &mut dyn IDatabaseTransaction {
1468        panic!("Illegal to call global_dbtx on BaseDatabaseTransaction");
1469    }
1470}
1471
1472/// A helper for tracking and logging on `Drop` any instances of uncommitted
1473/// writes
1474#[derive(Clone)]
1475struct CommitTracker {
1476    /// Is the dbtx committed
1477    is_committed: bool,
1478    /// Does the dbtx have any writes
1479    has_writes: bool,
1480    /// Don't warn-log uncommitted writes
1481    ignore_uncommitted: bool,
1482}
1483
1484impl Drop for CommitTracker {
1485    fn drop(&mut self) {
1486        if self.has_writes && !self.is_committed {
1487            if self.ignore_uncommitted {
1488                trace!(
1489                    target: LOG_DB,
1490                    "DatabaseTransaction has writes and has not called commit, but that's expected."
1491                );
1492            } else {
1493                warn!(
1494                    target: LOG_DB,
1495                    location = ?backtrace::Backtrace::new(),
1496                    "DatabaseTransaction has writes and has not called commit."
1497                );
1498            }
1499        }
1500    }
1501}
1502
1503enum MaybeRef<'a, T> {
1504    Owned(T),
1505    Borrowed(&'a mut T),
1506}
1507
1508impl<'a, T> ops::Deref for MaybeRef<'a, T> {
1509    type Target = T;
1510
1511    fn deref(&self) -> &Self::Target {
1512        match self {
1513            MaybeRef::Owned(o) => o,
1514            MaybeRef::Borrowed(r) => r,
1515        }
1516    }
1517}
1518
1519impl<'a, T> ops::DerefMut for MaybeRef<'a, T> {
1520    fn deref_mut(&mut self) -> &mut Self::Target {
1521        match self {
1522            MaybeRef::Owned(o) => o,
1523            MaybeRef::Borrowed(r) => r,
1524        }
1525    }
1526}
1527
1528/// Session type for [`DatabaseTransaction`] that is allowed to commit
1529///
1530/// Opposite of [`NonCommittable`].
1531pub struct Committable;
1532
1533/// Session type for a [`DatabaseTransaction`] that is not allowed to commit
1534///
1535/// Opposite of [`Committable`].
1536pub struct NonCommittable;
1537
1538/// A high level database transaction handle
1539///
1540/// `Cap` is a session type
1541pub struct DatabaseTransaction<'tx, Cap = NonCommittable> {
1542    tx: Box<dyn IDatabaseTransaction + 'tx>,
1543    decoders: ModuleDecoderRegistry,
1544    commit_tracker: MaybeRef<'tx, CommitTracker>,
1545    on_commit_hooks: MaybeRef<'tx, Vec<Box<maybe_add_send!(dyn FnOnce())>>>,
1546    capability: marker::PhantomData<Cap>,
1547}
1548
1549impl<'tx, Cap> fmt::Debug for DatabaseTransaction<'tx, Cap> {
1550    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1551        f.write_fmt(format_args!(
1552            "DatabaseTransaction {{ tx: {:?}, decoders={:?} }}",
1553            self.tx, self.decoders
1554        ))
1555    }
1556}
1557
1558impl<'tx, Cap> WithDecoders for DatabaseTransaction<'tx, Cap> {
1559    fn decoders(&self) -> &ModuleDecoderRegistry {
1560        &self.decoders
1561    }
1562}
1563
1564#[instrument(target = LOG_DB, level = "trace", skip_all, fields(value_type = std::any::type_name::<V>()), err)]
1565fn decode_value<V: DatabaseValue>(
1566    value_bytes: &[u8],
1567    decoders: &ModuleDecoderRegistry,
1568) -> Result<V, DecodingError> {
1569    trace!(
1570        bytes = %AbbreviateHexBytes(value_bytes),
1571        "decoding value",
1572    );
1573    V::from_bytes(value_bytes, decoders)
1574}
1575
1576fn decode_value_expect<V: DatabaseValue>(
1577    value_bytes: &[u8],
1578    decoders: &ModuleDecoderRegistry,
1579    key_bytes: &[u8],
1580) -> V {
1581    decode_value(value_bytes, decoders).unwrap_or_else(|err| {
1582        panic!(
1583            "Unrecoverable decoding DatabaseValue as {}; err={}, key_bytes={}, val_bytes={}",
1584            any::type_name::<V>(),
1585            err,
1586            AbbreviateHexBytes(key_bytes),
1587            AbbreviateHexBytes(value_bytes),
1588        )
1589    })
1590}
1591
1592fn decode_key_expect<K: DatabaseKey>(key_bytes: &[u8], decoders: &ModuleDecoderRegistry) -> K {
1593    trace!(
1594        bytes = %AbbreviateHexBytes(key_bytes),
1595        "decoding key",
1596    );
1597    K::from_bytes(key_bytes, decoders).unwrap_or_else(|err| {
1598        panic!(
1599            "Unrecoverable decoding DatabaseKey as {}; err={}; bytes={}",
1600            any::type_name::<K>(),
1601            err,
1602            AbbreviateHexBytes(key_bytes)
1603        )
1604    })
1605}
1606
1607impl<'tx, Cap> DatabaseTransaction<'tx, Cap> {
1608    /// Convert into a non-committable version
1609    pub fn into_nc(self) -> DatabaseTransaction<'tx, NonCommittable> {
1610        DatabaseTransaction {
1611            tx: self.tx,
1612            decoders: self.decoders,
1613            commit_tracker: self.commit_tracker,
1614            on_commit_hooks: self.on_commit_hooks,
1615            capability: PhantomData::<NonCommittable>,
1616        }
1617    }
1618
1619    /// Get a reference to a non-committeable version
1620    pub fn to_ref_nc<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, NonCommittable>
1621    where
1622        's: 'a,
1623    {
1624        self.to_ref().into_nc()
1625    }
1626
1627    /// Get [`DatabaseTransaction`] isolated to a `prefix`
1628    pub fn with_prefix<'a: 'tx>(self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1629    where
1630        'tx: 'a,
1631    {
1632        DatabaseTransaction {
1633            tx: Box::new(PrefixDatabaseTransaction {
1634                inner: self.tx,
1635                global_dbtx_access_token: None,
1636                prefix,
1637            }),
1638            decoders: self.decoders,
1639            commit_tracker: self.commit_tracker,
1640            on_commit_hooks: self.on_commit_hooks,
1641            capability: self.capability,
1642        }
1643    }
1644
1645    /// Get [`DatabaseTransaction`] isolated to a prefix of a given
1646    /// `module_instance_id`, allowing the module to access global_dbtx
1647    /// with the right access token.
1648    pub fn with_prefix_module_id<'a: 'tx>(
1649        self,
1650        module_instance_id: ModuleInstanceId,
1651    ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1652    where
1653        'tx: 'a,
1654    {
1655        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1656        let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1657        (
1658            DatabaseTransaction {
1659                tx: Box::new(PrefixDatabaseTransaction {
1660                    inner: self.tx,
1661                    global_dbtx_access_token: Some(global_dbtx_access_token),
1662                    prefix,
1663                }),
1664                decoders: self.decoders,
1665                commit_tracker: self.commit_tracker,
1666                on_commit_hooks: self.on_commit_hooks,
1667                capability: self.capability,
1668            },
1669            global_dbtx_access_token,
1670        )
1671    }
1672
1673    /// Get [`DatabaseTransaction`] to `self`
1674    pub fn to_ref<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, Cap>
1675    where
1676        's: 'a,
1677    {
1678        let decoders = self.decoders.clone();
1679
1680        DatabaseTransaction {
1681            tx: Box::new(&mut self.tx),
1682            decoders,
1683            commit_tracker: match self.commit_tracker {
1684                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1685                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1686            },
1687            on_commit_hooks: match self.on_commit_hooks {
1688                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1689                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1690            },
1691            capability: self.capability,
1692        }
1693    }
1694
1695    /// Get [`DatabaseTransaction`] isolated to a `prefix` of `self`
1696    pub fn to_ref_with_prefix<'a>(&'a mut self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1697    where
1698        'tx: 'a,
1699    {
1700        DatabaseTransaction {
1701            tx: Box::new(PrefixDatabaseTransaction {
1702                inner: &mut self.tx,
1703                global_dbtx_access_token: None,
1704                prefix,
1705            }),
1706            decoders: self.decoders.clone(),
1707            commit_tracker: match self.commit_tracker {
1708                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1709                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1710            },
1711            on_commit_hooks: match self.on_commit_hooks {
1712                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1713                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1714            },
1715            capability: self.capability,
1716        }
1717    }
1718
1719    pub fn to_ref_with_prefix_module_id<'a>(
1720        &'a mut self,
1721        module_instance_id: ModuleInstanceId,
1722    ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1723    where
1724        'tx: 'a,
1725    {
1726        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1727        let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1728        (
1729            DatabaseTransaction {
1730                tx: Box::new(PrefixDatabaseTransaction {
1731                    inner: &mut self.tx,
1732                    global_dbtx_access_token: Some(global_dbtx_access_token),
1733                    prefix,
1734                }),
1735                decoders: self.decoders.clone(),
1736                commit_tracker: match self.commit_tracker {
1737                    MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1738                    MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1739                },
1740                on_commit_hooks: match self.on_commit_hooks {
1741                    MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1742                    MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1743                },
1744                capability: self.capability,
1745            },
1746            global_dbtx_access_token,
1747        )
1748    }
1749
1750    /// Is this `Database` a global, unpartitioned `Database`
1751    pub fn is_global(&self) -> bool {
1752        self.tx.is_global()
1753    }
1754
1755    /// `Err` if [`Self::is_global`] is not true
1756    pub fn ensure_global(&self) -> Result<()> {
1757        if !self.is_global() {
1758            bail!("Database instance not global");
1759        }
1760
1761        Ok(())
1762    }
1763
1764    /// `Err` if [`Self::is_global`] is true
1765    pub fn ensure_isolated(&self) -> Result<()> {
1766        if self.is_global() {
1767            bail!("Database instance not isolated");
1768        }
1769
1770        Ok(())
1771    }
1772
1773    /// Cancel the tx to avoid debugging warnings about uncommitted writes
1774    pub fn ignore_uncommitted(&mut self) -> &mut Self {
1775        self.commit_tracker.ignore_uncommitted = true;
1776        self
1777    }
1778
1779    /// Create warnings about uncommitted writes
1780    pub fn warn_uncommitted(&mut self) -> &mut Self {
1781        self.commit_tracker.ignore_uncommitted = false;
1782        self
1783    }
1784
1785    /// Register a hook that will be run after commit succeeds.
1786    #[instrument(target = LOG_DB, level = "trace", skip_all)]
1787    pub fn on_commit(&mut self, f: maybe_add_send!(impl FnOnce() + 'static)) {
1788        self.on_commit_hooks.push(Box::new(f));
1789    }
1790
1791    pub fn global_dbtx<'a>(
1792        &'a mut self,
1793        access_token: GlobalDBTxAccessToken,
1794    ) -> DatabaseTransaction<'a, Cap>
1795    where
1796        'tx: 'a,
1797    {
1798        let decoders = self.decoders.clone();
1799
1800        DatabaseTransaction {
1801            tx: Box::new(self.tx.global_dbtx(access_token)),
1802            decoders,
1803            commit_tracker: match self.commit_tracker {
1804                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1805                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1806            },
1807            on_commit_hooks: match self.on_commit_hooks {
1808                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1809                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1810            },
1811            capability: self.capability,
1812        }
1813    }
1814}
1815
1816/// Code used to access `global_dbtx`
1817#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1818pub struct GlobalDBTxAccessToken(u32);
1819
1820impl GlobalDBTxAccessToken {
1821    /// Calculate an access code for accessing global_dbtx from a prefixed
1822    /// database tx
1823    ///
1824    /// Since we need to do it at runtime, we want the user modules not to be
1825    /// able to call `global_dbtx` too easily. But at the same time we don't
1826    /// need to be paranoid.
1827    ///
1828    /// This must be deterministic during whole instance of the software running
1829    /// (because it's being rederived independently in multiple codepahs) , but
1830    /// it could be somewhat randomized between different runs and releases.
1831    fn from_prefix(prefix: &[u8]) -> Self {
1832        Self(prefix.iter().fold(0, |acc, b| acc + u32::from(*b)) + 513)
1833    }
1834}
1835
1836impl<'tx> DatabaseTransaction<'tx, Committable> {
1837    pub fn new(dbtx: Box<dyn IDatabaseTransaction + 'tx>, decoders: ModuleDecoderRegistry) -> Self {
1838        Self {
1839            tx: dbtx,
1840            decoders,
1841            commit_tracker: MaybeRef::Owned(CommitTracker {
1842                is_committed: false,
1843                has_writes: false,
1844                ignore_uncommitted: false,
1845            }),
1846            on_commit_hooks: MaybeRef::Owned(vec![]),
1847            capability: PhantomData,
1848        }
1849    }
1850
1851    pub async fn commit_tx_result(mut self) -> Result<()> {
1852        self.commit_tracker.is_committed = true;
1853        let commit_result = self.tx.commit_tx().await;
1854
1855        // Run commit hooks in case commit was successful
1856        if commit_result.is_ok() {
1857            for hook in self.on_commit_hooks.deref_mut().drain(..) {
1858                hook();
1859            }
1860        }
1861
1862        commit_result
1863    }
1864
1865    pub async fn commit_tx(mut self) {
1866        self.commit_tracker.is_committed = true;
1867        self.commit_tx_result()
1868            .await
1869            .expect("Unrecoverable error occurred while committing to the database.");
1870    }
1871}
1872
1873#[apply(async_trait_maybe_send!)]
1874impl<'a, Cap> IDatabaseTransactionOpsCore for DatabaseTransaction<'a, Cap>
1875where
1876    Cap: Send,
1877{
1878    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1879        self.commit_tracker.has_writes = true;
1880        self.tx.raw_insert_bytes(key, value).await
1881    }
1882
1883    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1884        self.tx.raw_get_bytes(key).await
1885    }
1886
1887    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1888        self.tx.raw_remove_entry(key).await
1889    }
1890
1891    async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1892        self.tx.raw_find_by_range(key_range).await
1893    }
1894
1895    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1896        self.tx.raw_find_by_prefix(key_prefix).await
1897    }
1898
1899    async fn raw_find_by_prefix_sorted_descending(
1900        &mut self,
1901        key_prefix: &[u8],
1902    ) -> Result<PrefixStream<'_>> {
1903        self.tx
1904            .raw_find_by_prefix_sorted_descending(key_prefix)
1905            .await
1906    }
1907
1908    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1909        self.commit_tracker.has_writes = true;
1910        self.tx.raw_remove_by_prefix(key_prefix).await
1911    }
1912}
1913#[apply(async_trait_maybe_send!)]
1914impl<'a> IDatabaseTransactionOps for DatabaseTransaction<'a, Committable> {
1915    async fn set_tx_savepoint(&mut self) -> Result<()> {
1916        self.tx.set_tx_savepoint().await
1917    }
1918
1919    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1920        self.tx.rollback_tx_to_savepoint().await
1921    }
1922}
1923
1924impl<T> DatabaseKeyPrefix for T
1925where
1926    T: DatabaseLookup + crate::encoding::Encodable + Debug,
1927{
1928    fn to_bytes(&self) -> Vec<u8> {
1929        let mut data = vec![<Self as DatabaseLookup>::Record::DB_PREFIX];
1930        data.append(&mut self.consensus_encode_to_vec());
1931        data
1932    }
1933}
1934
1935impl<T> DatabaseKey for T
1936where
1937    // Note: key can only be `T` that can be decoded without modules (even if
1938    // module type is `()`)
1939    T: DatabaseRecord + crate::encoding::Decodable + Sized,
1940{
1941    const NOTIFY_ON_MODIFY: bool = <T as DatabaseRecord>::NOTIFY_ON_MODIFY;
1942    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1943        if data.is_empty() {
1944            // TODO: build better coding errors, pretty useless right now
1945            return Err(DecodingError::wrong_length(1, 0));
1946        }
1947
1948        if data[0] != Self::DB_PREFIX {
1949            return Err(DecodingError::wrong_prefix(Self::DB_PREFIX, data[0]));
1950        }
1951
1952        <Self as crate::encoding::Decodable>::consensus_decode_whole(&data[1..], modules)
1953            .map_err(|decode_error| DecodingError::Other(decode_error.0))
1954    }
1955}
1956
1957impl<T> DatabaseValue for T
1958where
1959    T: Debug + Encodable + Decodable,
1960{
1961    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1962        T::consensus_decode_whole(data, modules).map_err(|e| DecodingError::Other(e.0))
1963    }
1964
1965    fn to_bytes(&self) -> Vec<u8> {
1966        self.consensus_encode_to_vec()
1967    }
1968}
1969
1970/// This is a helper macro that generates the implementations of
1971/// `DatabaseRecord` necessary for reading/writing to the
1972/// database and fetching by prefix.
1973///
1974/// - `key`: This is the type of struct that will be used as the key into the
1975///   database
1976/// - `value`: This is the type of struct that will be used as the value into
1977///   the database
1978/// - `db_prefix`: Required enum expression that is represented as a `u8` and is
1979///   prepended to this key
1980/// - `query_prefix`: Optional type of struct that can be passed zero or more
1981///   times. Every query prefix can be used to query the database via
1982///   `find_by_prefix`
1983///
1984/// # Examples
1985///
1986/// ```
1987/// use fedimint_core::encoding::{Decodable, Encodable};
1988/// use fedimint_core::impl_db_record;
1989///
1990/// #[derive(Debug, Encodable, Decodable)]
1991/// struct MyKey;
1992///
1993/// #[derive(Debug, Encodable, Decodable)]
1994/// struct MyValue;
1995///
1996/// #[repr(u8)]
1997/// #[derive(Clone, Debug)]
1998/// pub enum DbKeyPrefix {
1999///     MyKey = 0x50,
2000/// }
2001///
2002/// impl_db_record!(key = MyKey, value = MyValue, db_prefix = DbKeyPrefix::MyKey);
2003/// ```
2004///
2005/// Use the required parameters and specify one `query_prefix`
2006///
2007/// ```
2008/// use fedimint_core::encoding::{Decodable, Encodable};
2009/// use fedimint_core::{impl_db_lookup, impl_db_record};
2010///
2011/// #[derive(Debug, Encodable, Decodable)]
2012/// struct MyKey;
2013///
2014/// #[derive(Debug, Encodable, Decodable)]
2015/// struct MyValue;
2016///
2017/// #[repr(u8)]
2018/// #[derive(Clone, Debug)]
2019/// pub enum DbKeyPrefix {
2020///     MyKey = 0x50,
2021/// }
2022///
2023/// #[derive(Debug, Encodable, Decodable)]
2024/// struct MyKeyPrefix;
2025///
2026/// impl_db_record!(key = MyKey, value = MyValue, db_prefix = DbKeyPrefix::MyKey,);
2027///
2028/// impl_db_lookup!(key = MyKey, query_prefix = MyKeyPrefix);
2029/// ```
2030#[macro_export]
2031macro_rules! impl_db_record {
2032    (key = $key:ty, value = $val:ty, db_prefix = $db_prefix:expr_2021 $(, notify_on_modify = $notify:tt)? $(,)?) => {
2033        impl $crate::db::DatabaseRecord for $key {
2034            const DB_PREFIX: u8 = $db_prefix as u8;
2035            $(const NOTIFY_ON_MODIFY: bool = $notify;)?
2036            type Key = Self;
2037            type Value = $val;
2038        }
2039        $(
2040            impl_db_record! {
2041                @impl_notify_marker key = $key, notify_on_modify = $notify
2042            }
2043        )?
2044    };
2045    // if notify is set to true
2046    (@impl_notify_marker key = $key:ty, notify_on_modify = true) => {
2047        impl $crate::db::DatabaseKeyWithNotify for $key {}
2048    };
2049    // if notify is set to false
2050    (@impl_notify_marker key = $key:ty, notify_on_modify = false) => {};
2051}
2052
2053#[macro_export]
2054macro_rules! impl_db_lookup{
2055    (key = $key:ty $(, query_prefix = $query_prefix:ty)* $(,)?) => {
2056        $(
2057            impl $crate::db::DatabaseLookup for $query_prefix {
2058                type Record = $key;
2059            }
2060        )*
2061    };
2062}
2063
2064/// Deprecated: Use `DatabaseVersionKey(ModuleInstanceId)` instead.
2065#[derive(Debug, Encodable, Decodable, Serialize)]
2066pub struct DatabaseVersionKeyV0;
2067
2068#[derive(Debug, Encodable, Decodable, Serialize)]
2069pub struct DatabaseVersionKey(pub ModuleInstanceId);
2070
2071#[derive(Debug, Encodable, Decodable, Serialize, Clone, PartialOrd, Ord, PartialEq, Eq, Copy)]
2072pub struct DatabaseVersion(pub u64);
2073
2074impl_db_record!(
2075    key = DatabaseVersionKeyV0,
2076    value = DatabaseVersion,
2077    db_prefix = DbKeyPrefix::DatabaseVersion
2078);
2079
2080impl_db_record!(
2081    key = DatabaseVersionKey,
2082    value = DatabaseVersion,
2083    db_prefix = DbKeyPrefix::DatabaseVersion
2084);
2085
2086impl std::fmt::Display for DatabaseVersion {
2087    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2088        write!(f, "{}", self.0)
2089    }
2090}
2091
2092impl DatabaseVersion {
2093    pub fn increment(&self) -> Self {
2094        Self(self.0 + 1)
2095    }
2096}
2097
2098impl std::fmt::Display for DbKeyPrefix {
2099    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2100        write!(f, "{self:?}")
2101    }
2102}
2103
2104#[repr(u8)]
2105#[derive(Clone, EnumIter, Debug)]
2106pub enum DbKeyPrefix {
2107    DatabaseVersion = 0x50,
2108    ClientBackup = 0x51,
2109}
2110
2111#[derive(Debug, Error)]
2112pub enum DecodingError {
2113    #[error("Key had a wrong prefix, expected {expected} but got {found}")]
2114    WrongPrefix { expected: u8, found: u8 },
2115    #[error("Key had a wrong length, expected {expected} but got {found}")]
2116    WrongLength { expected: usize, found: usize },
2117    #[error("Other decoding error: {0:#}")]
2118    Other(anyhow::Error),
2119}
2120
2121impl DecodingError {
2122    pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2123        Self::Other(anyhow::Error::from(error))
2124    }
2125
2126    pub fn wrong_prefix(expected: u8, found: u8) -> Self {
2127        Self::WrongPrefix { expected, found }
2128    }
2129
2130    pub fn wrong_length(expected: usize, found: usize) -> Self {
2131        Self::WrongLength { expected, found }
2132    }
2133}
2134
2135#[macro_export]
2136macro_rules! push_db_pair_items {
2137    ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
2138        let db_items =
2139            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
2140                .await
2141                .map(|(key, val)| {
2142                    (
2143                        $crate::encoding::Encodable::consensus_encode_to_hex(&key),
2144                        val,
2145                    )
2146                })
2147                .collect::<BTreeMap<String, $value_type>>()
2148                .await;
2149
2150        $map.insert($key_literal.to_string(), Box::new(db_items));
2151    };
2152}
2153
2154#[macro_export]
2155macro_rules! push_db_key_items {
2156    ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $map:ident, $key_literal:literal) => {
2157        let db_items =
2158            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
2159                .await
2160                .map(|(key, _)| key)
2161                .collect::<Vec<$key_type>>()
2162                .await;
2163
2164        $map.insert($key_literal.to_string(), Box::new(db_items));
2165    };
2166}
2167
2168/// `CoreMigrationFn` that modules can implement to "migrate" the database
2169/// to the next database version.
2170pub type CoreMigrationFn = for<'tx> fn(
2171    MigrationContext<'tx>,
2172) -> Pin<
2173    Box<maybe_add_send!(dyn futures::Future<Output = anyhow::Result<()>> + 'tx)>,
2174>;
2175
2176/// Verifies that all database migrations are defined contiguously and returns
2177/// the "current" database version, which is one greater than the last key in
2178/// the map.
2179pub fn get_current_database_version<F>(
2180    migrations: &BTreeMap<DatabaseVersion, F>,
2181) -> DatabaseVersion {
2182    let versions = migrations.keys().copied().collect::<Vec<_>>();
2183
2184    // Verify that all database migrations are defined contiguously. If there is a
2185    // gap, this indicates a programming error and we should panic.
2186    if !versions
2187        .windows(2)
2188        .all(|window| window[0].increment() == window[1])
2189    {
2190        panic!("Database Migrations are not defined contiguously");
2191    }
2192
2193    versions
2194        .last()
2195        .map_or(DatabaseVersion(0), DatabaseVersion::increment)
2196}
2197
2198/// See [`apply_migrations_server_dbtx`]
2199pub async fn apply_migrations_server(
2200    db: &Database,
2201    kind: String,
2202    migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2203) -> Result<(), anyhow::Error> {
2204    let mut global_dbtx = db.begin_transaction().await;
2205    global_dbtx.ensure_global()?;
2206    apply_migrations_server_dbtx(&mut global_dbtx.to_ref_nc(), kind, migrations).await?;
2207    global_dbtx.commit_tx_result().await
2208}
2209
2210/// Applies the database migrations to a non-isolated database.
2211pub async fn apply_migrations_server_dbtx(
2212    global_dbtx: &mut DatabaseTransaction<'_>,
2213    kind: String,
2214    migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2215) -> Result<(), anyhow::Error> {
2216    global_dbtx.ensure_global()?;
2217    apply_migrations_dbtx(global_dbtx, kind, migrations, None, None).await
2218}
2219
2220pub async fn apply_migrations(
2221    db: &Database,
2222    kind: String,
2223    migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2224    module_instance_id: Option<ModuleInstanceId>,
2225    // When used in client side context, we can/should ignore keys that external app
2226    // is allowed to use, and but since this function is shared, we make it optional argument
2227    external_prefixes_above: Option<u8>,
2228) -> Result<(), anyhow::Error> {
2229    let mut dbtx = db.begin_transaction().await;
2230    apply_migrations_dbtx(
2231        &mut dbtx.to_ref_nc(),
2232        kind,
2233        migrations,
2234        module_instance_id,
2235        external_prefixes_above,
2236    )
2237    .await?;
2238
2239    dbtx.commit_tx_result().await
2240}
2241/// `apply_migrations` iterates from the on disk database version for the
2242/// module.
2243///
2244/// `apply_migrations` iterates from the on disk database version for the module
2245/// up to `target_db_version` and executes all of the migrations that exist in
2246/// the migrations map. Each migration in migrations map updates the
2247/// database to have the correct on-disk structures that the code is expecting.
2248/// The entire migration process is atomic (i.e migration from 0->1 and 1->2
2249/// happen atomically). This function is called before the module is initialized
2250/// and as long as the correct migrations are supplied in the migrations map,
2251/// the module will be able to read and write from the database successfully.
2252pub async fn apply_migrations_dbtx(
2253    global_dbtx: &mut DatabaseTransaction<'_>,
2254    kind: String,
2255    migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2256    module_instance_id: Option<ModuleInstanceId>,
2257    // When used in client side context, we can/should ignore keys that external app
2258    // is allowed to use, and but since this function is shared, we make it optional argument
2259    external_prefixes_above: Option<u8>,
2260) -> Result<(), anyhow::Error> {
2261    // Newly created databases will not have any data since they have just been
2262    // instantiated.
2263    let is_new_db = global_dbtx
2264        .raw_find_by_prefix(&[])
2265        .await?
2266        .filter(|(key, _v)| {
2267            std::future::ready(
2268                external_prefixes_above.is_none_or(|external_prefixes_above| {
2269                    !key.is_empty() && key[0] < external_prefixes_above
2270                }),
2271            )
2272        })
2273        .next()
2274        .await
2275        .is_none();
2276
2277    let target_db_version = get_current_database_version(&migrations);
2278
2279    // First write the database version to disk if it does not exist.
2280    create_database_version_dbtx(
2281        global_dbtx,
2282        target_db_version,
2283        module_instance_id,
2284        kind.clone(),
2285        is_new_db,
2286    )
2287    .await?;
2288
2289    let module_instance_id_key = module_instance_id_or_global(module_instance_id);
2290
2291    let disk_version = global_dbtx
2292        .get_value(&DatabaseVersionKey(module_instance_id_key))
2293        .await;
2294
2295    let db_version = if let Some(disk_version) = disk_version {
2296        let mut current_db_version = disk_version;
2297
2298        if current_db_version > target_db_version {
2299            return Err(anyhow::anyhow!(format!(
2300                "On disk database version {current_db_version} for module {kind} was higher than the code database version {target_db_version}."
2301            )));
2302        }
2303
2304        while current_db_version < target_db_version {
2305            if let Some(migration) = migrations.get(&current_db_version) {
2306                info!(target: LOG_DB, ?kind, ?current_db_version, ?target_db_version, "Migrating module...");
2307                migration(MigrationContext {
2308                    dbtx: global_dbtx.to_ref_nc(),
2309                    module_instance_id,
2310                })
2311                .await?;
2312            } else {
2313                warn!(target: LOG_DB, ?current_db_version, "Missing server db migration");
2314            }
2315
2316            current_db_version = current_db_version.increment();
2317            global_dbtx
2318                .insert_entry(
2319                    &DatabaseVersionKey(module_instance_id_key),
2320                    &current_db_version,
2321                )
2322                .await;
2323        }
2324
2325        current_db_version
2326    } else {
2327        target_db_version
2328    };
2329
2330    debug!(target: LOG_DB, ?kind, ?db_version, "DB Version");
2331    Ok(())
2332}
2333
2334pub async fn create_database_version(
2335    db: &Database,
2336    target_db_version: DatabaseVersion,
2337    module_instance_id: Option<ModuleInstanceId>,
2338    kind: String,
2339    is_new_db: bool,
2340) -> Result<(), anyhow::Error> {
2341    let mut dbtx = db.begin_transaction().await;
2342
2343    create_database_version_dbtx(
2344        &mut dbtx.to_ref_nc(),
2345        target_db_version,
2346        module_instance_id,
2347        kind,
2348        is_new_db,
2349    )
2350    .await?;
2351
2352    dbtx.commit_tx_result().await?;
2353    Ok(())
2354}
2355
2356/// Creates the `DatabaseVersion` inside the database if it does not exist. If
2357/// necessary, this function will migrate the legacy database version to the
2358/// expected `DatabaseVersionKey`.
2359pub async fn create_database_version_dbtx(
2360    global_dbtx: &mut DatabaseTransaction<'_>,
2361    target_db_version: DatabaseVersion,
2362    module_instance_id: Option<ModuleInstanceId>,
2363    kind: String,
2364    is_new_db: bool,
2365) -> Result<(), anyhow::Error> {
2366    let key_module_instance_id = module_instance_id_or_global(module_instance_id);
2367
2368    // First check if the module has a `DatabaseVersion` written to
2369    // `DatabaseVersionKey`. If `DatabaseVersion` already exists, there is
2370    // nothing to do.
2371    if global_dbtx
2372        .get_value(&DatabaseVersionKey(key_module_instance_id))
2373        .await
2374        .is_none()
2375    {
2376        // If it exists, read and remove the legacy `DatabaseVersion`, which used to be
2377        // in the module's isolated namespace (but not for fedimint-server or
2378        // fedimint-client).
2379        //
2380        // Otherwise, if the previous database contains data and no legacy database
2381        // version, use `DatabaseVersion(0)` so that all database migrations are
2382        // run. Otherwise, this database can assumed to be new and can use
2383        // `target_db_version` to skip the database migrations.
2384        let current_version_in_module = if let Some(module_instance_id) = module_instance_id {
2385            remove_current_db_version_if_exists(
2386                &mut global_dbtx
2387                    .to_ref_with_prefix_module_id(module_instance_id)
2388                    .0
2389                    .into_nc(),
2390                is_new_db,
2391                target_db_version,
2392            )
2393            .await
2394        } else {
2395            remove_current_db_version_if_exists(
2396                &mut global_dbtx.to_ref().into_nc(),
2397                is_new_db,
2398                target_db_version,
2399            )
2400            .await
2401        };
2402
2403        // Write the previous `DatabaseVersion` to the new `DatabaseVersionKey`
2404        debug!(target: LOG_DB, ?kind, ?current_version_in_module, ?target_db_version, ?is_new_db, "Creating DatabaseVersionKey...");
2405        global_dbtx
2406            .insert_new_entry(
2407                &DatabaseVersionKey(key_module_instance_id),
2408                &current_version_in_module,
2409            )
2410            .await;
2411    }
2412
2413    Ok(())
2414}
2415
2416/// Removes `DatabaseVersion` from `DatabaseVersionKeyV0` if it exists and
2417/// returns the current database version. If the current version does not
2418/// exist, use `target_db_version` if the database is new. Otherwise, return
2419/// `DatabaseVersion(0)` to ensure all migrations are run.
2420async fn remove_current_db_version_if_exists(
2421    version_dbtx: &mut DatabaseTransaction<'_>,
2422    is_new_db: bool,
2423    target_db_version: DatabaseVersion,
2424) -> DatabaseVersion {
2425    // Remove the previous `DatabaseVersion` in the isolated database. If it doesn't
2426    // exist, just use the 0 for the version so that all of the migrations are
2427    // executed.
2428    let current_version_in_module = version_dbtx.remove_entry(&DatabaseVersionKeyV0).await;
2429    match current_version_in_module {
2430        Some(database_version) => database_version,
2431        None if is_new_db => target_db_version,
2432        None => DatabaseVersion(0),
2433    }
2434}
2435
2436/// Helper function to retrieve the `module_instance_id` for modules, otherwise
2437/// return 0xff for the global namespace.
2438fn module_instance_id_or_global(module_instance_id: Option<ModuleInstanceId>) -> ModuleInstanceId {
2439    // Use 0xff for fedimint-server and the `module_instance_id` for each module
2440    module_instance_id.map_or_else(
2441        || MODULE_GLOBAL_PREFIX.into(),
2442        |module_instance_id| module_instance_id,
2443    )
2444}
2445
2446pub struct MigrationContext<'tx> {
2447    dbtx: DatabaseTransaction<'tx>,
2448    module_instance_id: Option<ModuleInstanceId>,
2449}
2450
2451impl<'tx> MigrationContext<'tx> {
2452    pub fn dbtx(&mut self) -> DatabaseTransaction {
2453        if let Some(module_instance_id) = self.module_instance_id {
2454            self.dbtx.to_ref_with_prefix_module_id(module_instance_id).0
2455        } else {
2456            self.dbtx.to_ref_nc()
2457        }
2458    }
2459
2460    pub fn module_instance_id(&self) -> Option<ModuleInstanceId> {
2461        self.module_instance_id
2462    }
2463
2464    #[doc(hidden)]
2465    pub fn __global_dbtx(&mut self) -> &mut DatabaseTransaction<'tx> {
2466        &mut self.dbtx
2467    }
2468}
2469
2470#[allow(unused_imports)]
2471mod test_utils {
2472    use std::collections::BTreeMap;
2473    use std::time::Duration;
2474
2475    use fedimint_core::db::MigrationContext;
2476    use futures::future::ready;
2477    use futures::{Future, FutureExt, StreamExt};
2478    use rand::Rng;
2479    use tokio::join;
2480
2481    use super::{
2482        CoreMigrationFn, Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey,
2483        DatabaseVersionKeyV0, apply_migrations,
2484    };
2485    use crate::core::ModuleKind;
2486    use crate::db::mem_impl::MemDatabase;
2487    use crate::db::{
2488        IDatabaseTransactionOps, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
2489    };
2490    use crate::encoding::{Decodable, Encodable};
2491    use crate::module::registry::ModuleDecoderRegistry;
2492
2493    pub async fn future_returns_shortly<F: Future>(fut: F) -> Option<F::Output> {
2494        crate::runtime::timeout(Duration::from_millis(10), fut)
2495            .await
2496            .ok()
2497    }
2498
2499    #[repr(u8)]
2500    #[derive(Clone)]
2501    pub enum TestDbKeyPrefix {
2502        Test = 0x42,
2503        AltTest = 0x43,
2504        PercentTestKey = 0x25,
2505    }
2506
2507    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
2508    pub(super) struct TestKey(pub u64);
2509
2510    #[derive(Debug, Encodable, Decodable)]
2511    struct DbPrefixTestPrefix;
2512
2513    impl_db_record!(
2514        key = TestKey,
2515        value = TestVal,
2516        db_prefix = TestDbKeyPrefix::Test,
2517        notify_on_modify = true,
2518    );
2519    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);
2520
2521    #[derive(Debug, Encodable, Decodable)]
2522    struct TestKeyV0(u64, u64);
2523
2524    #[derive(Debug, Encodable, Decodable)]
2525    struct DbPrefixTestPrefixV0;
2526
2527    impl_db_record!(
2528        key = TestKeyV0,
2529        value = TestVal,
2530        db_prefix = TestDbKeyPrefix::Test,
2531    );
2532    impl_db_lookup!(key = TestKeyV0, query_prefix = DbPrefixTestPrefixV0);
2533
2534    #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Encodable, Decodable)]
2535    struct AltTestKey(u64);
2536
2537    #[derive(Debug, Encodable, Decodable)]
2538    struct AltDbPrefixTestPrefix;
2539
2540    impl_db_record!(
2541        key = AltTestKey,
2542        value = TestVal,
2543        db_prefix = TestDbKeyPrefix::AltTest,
2544    );
2545    impl_db_lookup!(key = AltTestKey, query_prefix = AltDbPrefixTestPrefix);
2546
2547    #[derive(Debug, Encodable, Decodable)]
2548    struct PercentTestKey(u64);
2549
2550    #[derive(Debug, Encodable, Decodable)]
2551    struct PercentPrefixTestPrefix;
2552
2553    impl_db_record!(
2554        key = PercentTestKey,
2555        value = TestVal,
2556        db_prefix = TestDbKeyPrefix::PercentTestKey,
2557    );
2558
2559    impl_db_lookup!(key = PercentTestKey, query_prefix = PercentPrefixTestPrefix);
2560    #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
2561    pub(super) struct TestVal(pub u64);
2562
2563    const TEST_MODULE_PREFIX: u16 = 1;
2564    const ALT_MODULE_PREFIX: u16 = 2;
2565
2566    pub async fn verify_insert_elements(db: Database) {
2567        let mut dbtx = db.begin_transaction().await;
2568        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2569        assert!(dbtx.insert_entry(&TestKey(2), &TestVal(3)).await.is_none());
2570        dbtx.commit_tx().await;
2571
2572        // Test values were persisted
2573        let mut dbtx = db.begin_transaction().await;
2574        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2575        assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(3)));
2576        dbtx.commit_tx().await;
2577
2578        // Test overwrites work as expected
2579        let mut dbtx = db.begin_transaction().await;
2580        assert_eq!(
2581            dbtx.insert_entry(&TestKey(1), &TestVal(4)).await,
2582            Some(TestVal(2))
2583        );
2584        assert_eq!(
2585            dbtx.insert_entry(&TestKey(2), &TestVal(5)).await,
2586            Some(TestVal(3))
2587        );
2588        dbtx.commit_tx().await;
2589
2590        let mut dbtx = db.begin_transaction().await;
2591        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(4)));
2592        assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(5)));
2593        dbtx.commit_tx().await;
2594    }
2595
2596    pub async fn verify_remove_nonexisting(db: Database) {
2597        let mut dbtx = db.begin_transaction().await;
2598        assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2599        let removed = dbtx.remove_entry(&TestKey(1)).await;
2600        assert!(removed.is_none());
2601
2602        // Commit to suppress the warning message
2603        dbtx.commit_tx().await;
2604    }
2605
2606    pub async fn verify_remove_existing(db: Database) {
2607        let mut dbtx = db.begin_transaction().await;
2608
2609        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2610
2611        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2612
2613        let removed = dbtx.remove_entry(&TestKey(1)).await;
2614        assert_eq!(removed, Some(TestVal(2)));
2615        assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2616
2617        // Commit to suppress the warning message
2618        dbtx.commit_tx().await;
2619    }
2620
2621    pub async fn verify_read_own_writes(db: Database) {
2622        let mut dbtx = db.begin_transaction().await;
2623
2624        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2625
2626        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2627
2628        // Commit to suppress the warning message
2629        dbtx.commit_tx().await;
2630    }
2631
2632    pub async fn verify_prevent_dirty_reads(db: Database) {
2633        let mut dbtx = db.begin_transaction().await;
2634
2635        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2636
2637        // dbtx2 should not be able to see uncommitted changes
2638        let mut dbtx2 = db.begin_transaction().await;
2639        assert_eq!(dbtx2.get_value(&TestKey(1)).await, None);
2640
2641        // Commit to suppress the warning message
2642        dbtx.commit_tx().await;
2643    }
2644
2645    pub async fn verify_find_by_range(db: Database) {
2646        let mut dbtx = db.begin_transaction().await;
2647        dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2648        dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2649        dbtx.insert_entry(&TestKey(56), &TestVal(7777)).await;
2650
2651        dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2652        dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2653
2654        {
2655            let mut module_dbtx = dbtx.to_ref_with_prefix_module_id(2).0;
2656            module_dbtx
2657                .insert_entry(&TestKey(300), &TestVal(3000))
2658                .await;
2659        }
2660
2661        dbtx.commit_tx().await;
2662
2663        // Verify finding by prefix returns the correct set of key pairs
2664        let mut dbtx = db.begin_transaction_nc().await;
2665
2666        let returned_keys = dbtx
2667            .find_by_range(TestKey(55)..TestKey(56))
2668            .await
2669            .collect::<Vec<_>>()
2670            .await;
2671
2672        let expected = vec![(TestKey(55), TestVal(9999))];
2673
2674        assert_eq!(returned_keys, expected);
2675
2676        let returned_keys = dbtx
2677            .find_by_range(TestKey(54)..TestKey(56))
2678            .await
2679            .collect::<Vec<_>>()
2680            .await;
2681
2682        let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2683        assert_eq!(returned_keys, expected);
2684
2685        let returned_keys = dbtx
2686            .find_by_range(TestKey(54)..TestKey(57))
2687            .await
2688            .collect::<Vec<_>>()
2689            .await;
2690
2691        let expected = vec![
2692            (TestKey(54), TestVal(8888)),
2693            (TestKey(55), TestVal(9999)),
2694            (TestKey(56), TestVal(7777)),
2695        ];
2696        assert_eq!(returned_keys, expected);
2697
2698        let mut module_dbtx = dbtx.with_prefix_module_id(2).0;
2699        let test_range = module_dbtx
2700            .find_by_range(TestKey(300)..TestKey(301))
2701            .await
2702            .collect::<Vec<_>>()
2703            .await;
2704        assert!(test_range.len() == 1);
2705    }
2706
2707    pub async fn verify_find_by_prefix(db: Database) {
2708        let mut dbtx = db.begin_transaction().await;
2709        dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2710        dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2711
2712        dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2713        dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2714        dbtx.commit_tx().await;
2715
2716        // Verify finding by prefix returns the correct set of key pairs
2717        let mut dbtx = db.begin_transaction().await;
2718
2719        let returned_keys = dbtx
2720            .find_by_prefix(&DbPrefixTestPrefix)
2721            .await
2722            .collect::<Vec<_>>()
2723            .await;
2724
2725        let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2726        assert_eq!(returned_keys, expected);
2727
2728        let reversed = dbtx
2729            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2730            .await
2731            .collect::<Vec<_>>()
2732            .await;
2733        let mut reversed_expected = expected;
2734        reversed_expected.reverse();
2735        assert_eq!(reversed, reversed_expected);
2736
2737        let returned_keys = dbtx
2738            .find_by_prefix(&AltDbPrefixTestPrefix)
2739            .await
2740            .collect::<Vec<_>>()
2741            .await;
2742
2743        let expected = vec![
2744            (AltTestKey(54), TestVal(6666)),
2745            (AltTestKey(55), TestVal(7777)),
2746        ];
2747        assert_eq!(returned_keys, expected);
2748
2749        let reversed = dbtx
2750            .find_by_prefix_sorted_descending(&AltDbPrefixTestPrefix)
2751            .await
2752            .collect::<Vec<_>>()
2753            .await;
2754        let mut reversed_expected = expected;
2755        reversed_expected.reverse();
2756        assert_eq!(reversed, reversed_expected);
2757    }
2758
2759    pub async fn verify_commit(db: Database) {
2760        let mut dbtx = db.begin_transaction().await;
2761
2762        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2763        dbtx.commit_tx().await;
2764
2765        // Verify dbtx2 can see committed transactions
2766        let mut dbtx2 = db.begin_transaction().await;
2767        assert_eq!(dbtx2.get_value(&TestKey(1)).await, Some(TestVal(2)));
2768    }
2769
2770    pub async fn verify_rollback_to_savepoint(db: Database) {
2771        let mut dbtx_rollback = db.begin_transaction().await;
2772
2773        dbtx_rollback
2774            .insert_entry(&TestKey(20), &TestVal(2000))
2775            .await;
2776
2777        dbtx_rollback
2778            .set_tx_savepoint()
2779            .await
2780            .expect("Error setting transaction savepoint");
2781
2782        dbtx_rollback
2783            .insert_entry(&TestKey(21), &TestVal(2001))
2784            .await;
2785
2786        assert_eq!(
2787            dbtx_rollback.get_value(&TestKey(20)).await,
2788            Some(TestVal(2000))
2789        );
2790        assert_eq!(
2791            dbtx_rollback.get_value(&TestKey(21)).await,
2792            Some(TestVal(2001))
2793        );
2794
2795        dbtx_rollback
2796            .rollback_tx_to_savepoint()
2797            .await
2798            .expect("Error setting transaction savepoint");
2799
2800        assert_eq!(
2801            dbtx_rollback.get_value(&TestKey(20)).await,
2802            Some(TestVal(2000))
2803        );
2804
2805        assert_eq!(dbtx_rollback.get_value(&TestKey(21)).await, None);
2806
2807        // Commit to suppress the warning message
2808        dbtx_rollback.commit_tx().await;
2809    }
2810
2811    pub async fn verify_prevent_nonrepeatable_reads(db: Database) {
2812        let mut dbtx = db.begin_transaction().await;
2813        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2814
2815        let mut dbtx2 = db.begin_transaction().await;
2816
2817        dbtx2.insert_entry(&TestKey(100), &TestVal(101)).await;
2818
2819        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2820
2821        dbtx2.commit_tx().await;
2822
2823        // dbtx should still read None because it is operating over a snapshot
2824        // of the data when the transaction started
2825        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2826
2827        let expected_keys = 0;
2828        let returned_keys = dbtx
2829            .find_by_prefix(&DbPrefixTestPrefix)
2830            .await
2831            .fold(0, |returned_keys, (key, value)| async move {
2832                if key == TestKey(100) {
2833                    assert!(value.eq(&TestVal(101)));
2834                }
2835                returned_keys + 1
2836            })
2837            .await;
2838
2839        assert_eq!(returned_keys, expected_keys);
2840    }
2841
2842    pub async fn verify_snapshot_isolation(db: Database) {
2843        async fn random_yield() {
2844            let times = if rand::thread_rng().gen_bool(0.5) {
2845                0
2846            } else {
2847                10
2848            };
2849            for _ in 0..times {
2850                tokio::task::yield_now().await;
2851            }
2852        }
2853
2854        // This scenario is taken straight out of https://github.com/fedimint/fedimint/issues/5195 bug
2855        for i in 0..1000 {
2856            let base_key = i * 2;
2857            let tx_accepted_key = base_key;
2858            let spent_input_key = base_key + 1;
2859
2860            join!(
2861                async {
2862                    random_yield().await;
2863                    let mut dbtx = db.begin_transaction().await;
2864
2865                    random_yield().await;
2866                    let a = dbtx.get_value(&TestKey(tx_accepted_key)).await;
2867                    random_yield().await;
2868                    // we have 4 operations that can give you the db key,
2869                    // try all of them
2870                    let s = match i % 5 {
2871                        0 => dbtx.get_value(&TestKey(spent_input_key)).await,
2872                        1 => dbtx.remove_entry(&TestKey(spent_input_key)).await,
2873                        2 => {
2874                            dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(200))
2875                                .await
2876                        }
2877                        3 => {
2878                            dbtx.find_by_prefix(&DbPrefixTestPrefix)
2879                                .await
2880                                .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2881                                .map(|(_k, v)| v)
2882                                .next()
2883                                .await
2884                        }
2885                        4 => {
2886                            dbtx.find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2887                                .await
2888                                .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2889                                .map(|(_k, v)| v)
2890                                .next()
2891                                .await
2892                        }
2893                        _ => {
2894                            panic!("woot?");
2895                        }
2896                    };
2897
2898                    match (a, s) {
2899                        (None, None) | (Some(_), Some(_)) => {}
2900                        (None, Some(_)) => panic!("none some?! {i}"),
2901                        (Some(_), None) => panic!("some none?! {i}"),
2902                    }
2903                },
2904                async {
2905                    random_yield().await;
2906
2907                    let mut dbtx = db.begin_transaction().await;
2908                    random_yield().await;
2909                    assert_eq!(dbtx.get_value(&TestKey(tx_accepted_key)).await, None);
2910
2911                    random_yield().await;
2912                    assert_eq!(
2913                        dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(100))
2914                            .await,
2915                        None
2916                    );
2917
2918                    random_yield().await;
2919                    assert_eq!(
2920                        dbtx.insert_entry(&TestKey(tx_accepted_key), &TestVal(100))
2921                            .await,
2922                        None
2923                    );
2924                    random_yield().await;
2925                    dbtx.commit_tx().await;
2926                }
2927            );
2928        }
2929    }
2930
2931    pub async fn verify_phantom_entry(db: Database) {
2932        let mut dbtx = db.begin_transaction().await;
2933
2934        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2935
2936        dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
2937
2938        dbtx.commit_tx().await;
2939
2940        let mut dbtx = db.begin_transaction().await;
2941        let expected_keys = 2;
2942        let returned_keys = dbtx
2943            .find_by_prefix(&DbPrefixTestPrefix)
2944            .await
2945            .fold(0, |returned_keys, (key, value)| async move {
2946                match key {
2947                    TestKey(100) => {
2948                        assert!(value.eq(&TestVal(101)));
2949                    }
2950                    TestKey(101) => {
2951                        assert!(value.eq(&TestVal(102)));
2952                    }
2953                    _ => {}
2954                };
2955                returned_keys + 1
2956            })
2957            .await;
2958
2959        assert_eq!(returned_keys, expected_keys);
2960
2961        let mut dbtx2 = db.begin_transaction().await;
2962
2963        dbtx2.insert_entry(&TestKey(102), &TestVal(103)).await;
2964
2965        dbtx2.commit_tx().await;
2966
2967        let returned_keys = dbtx
2968            .find_by_prefix(&DbPrefixTestPrefix)
2969            .await
2970            .fold(0, |returned_keys, (key, value)| async move {
2971                match key {
2972                    TestKey(100) => {
2973                        assert!(value.eq(&TestVal(101)));
2974                    }
2975                    TestKey(101) => {
2976                        assert!(value.eq(&TestVal(102)));
2977                    }
2978                    _ => {}
2979                };
2980                returned_keys + 1
2981            })
2982            .await;
2983
2984        assert_eq!(returned_keys, expected_keys);
2985    }
2986
2987    pub async fn expect_write_conflict(db: Database) {
2988        let mut dbtx = db.begin_transaction().await;
2989        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2990        dbtx.commit_tx().await;
2991
2992        let mut dbtx2 = db.begin_transaction().await;
2993        let mut dbtx3 = db.begin_transaction().await;
2994
2995        dbtx2.insert_entry(&TestKey(100), &TestVal(102)).await;
2996
2997        // Depending on if the database implementation supports optimistic or
2998        // pessimistic transactions, this test should generate an error here
2999        // (pessimistic) or at commit time (optimistic)
3000        dbtx3.insert_entry(&TestKey(100), &TestVal(103)).await;
3001
3002        dbtx2.commit_tx().await;
3003        dbtx3.commit_tx_result().await.expect_err("Expecting an error to be returned because this transaction is in a write-write conflict with dbtx");
3004    }
3005
3006    pub async fn verify_string_prefix(db: Database) {
3007        let mut dbtx = db.begin_transaction().await;
3008        dbtx.insert_entry(&PercentTestKey(100), &TestVal(101)).await;
3009
3010        assert_eq!(
3011            dbtx.get_value(&PercentTestKey(100)).await,
3012            Some(TestVal(101))
3013        );
3014
3015        dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3016
3017        dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3018
3019        dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3020
3021        // If the wildcard character ('%') is not handled properly, this will make
3022        // find_by_prefix return 5 results instead of 4
3023        dbtx.insert_entry(&TestKey(101), &TestVal(100)).await;
3024
3025        let expected_keys = 4;
3026        let returned_keys = dbtx
3027            .find_by_prefix(&PercentPrefixTestPrefix)
3028            .await
3029            .fold(0, |returned_keys, (key, value)| async move {
3030                if matches!(key, PercentTestKey(101)) {
3031                    assert!(value.eq(&TestVal(100)));
3032                }
3033                returned_keys + 1
3034            })
3035            .await;
3036
3037        assert_eq!(returned_keys, expected_keys);
3038    }
3039
3040    pub async fn verify_remove_by_prefix(db: Database) {
3041        let mut dbtx = db.begin_transaction().await;
3042
3043        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3044
3045        dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3046
3047        dbtx.commit_tx().await;
3048
3049        let mut remove_dbtx = db.begin_transaction().await;
3050        remove_dbtx.remove_by_prefix(&DbPrefixTestPrefix).await;
3051        remove_dbtx.commit_tx().await;
3052
3053        let mut dbtx = db.begin_transaction().await;
3054        let expected_keys = 0;
3055        let returned_keys = dbtx
3056            .find_by_prefix(&DbPrefixTestPrefix)
3057            .await
3058            .fold(0, |returned_keys, (key, value)| async move {
3059                match key {
3060                    TestKey(100) => {
3061                        assert!(value.eq(&TestVal(101)));
3062                    }
3063                    TestKey(101) => {
3064                        assert!(value.eq(&TestVal(102)));
3065                    }
3066                    _ => {}
3067                };
3068                returned_keys + 1
3069            })
3070            .await;
3071
3072        assert_eq!(returned_keys, expected_keys);
3073    }
3074
3075    pub async fn verify_module_db(db: Database, module_db: Database) {
3076        let mut dbtx = db.begin_transaction().await;
3077
3078        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3079
3080        dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3081
3082        dbtx.commit_tx().await;
3083
3084        // verify module_dbtx can only read key/value pairs from its own module
3085        let mut module_dbtx = module_db.begin_transaction().await;
3086        assert_eq!(module_dbtx.get_value(&TestKey(100)).await, None);
3087
3088        assert_eq!(module_dbtx.get_value(&TestKey(101)).await, None);
3089
3090        // verify module_dbtx can read key/value pairs that it wrote
3091        let mut dbtx = db.begin_transaction().await;
3092        assert_eq!(dbtx.get_value(&TestKey(100)).await, Some(TestVal(101)));
3093
3094        assert_eq!(dbtx.get_value(&TestKey(101)).await, Some(TestVal(102)));
3095
3096        let mut module_dbtx = module_db.begin_transaction().await;
3097
3098        module_dbtx.insert_entry(&TestKey(100), &TestVal(103)).await;
3099
3100        module_dbtx.insert_entry(&TestKey(101), &TestVal(104)).await;
3101
3102        module_dbtx.commit_tx().await;
3103
3104        let expected_keys = 2;
3105        let mut dbtx = db.begin_transaction().await;
3106        let returned_keys = dbtx
3107            .find_by_prefix(&DbPrefixTestPrefix)
3108            .await
3109            .fold(0, |returned_keys, (key, value)| async move {
3110                match key {
3111                    TestKey(100) => {
3112                        assert!(value.eq(&TestVal(101)));
3113                    }
3114                    TestKey(101) => {
3115                        assert!(value.eq(&TestVal(102)));
3116                    }
3117                    _ => {}
3118                };
3119                returned_keys + 1
3120            })
3121            .await;
3122
3123        assert_eq!(returned_keys, expected_keys);
3124
3125        let removed = dbtx.remove_entry(&TestKey(100)).await;
3126        assert_eq!(removed, Some(TestVal(101)));
3127        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
3128
3129        let mut module_dbtx = module_db.begin_transaction().await;
3130        assert_eq!(
3131            module_dbtx.get_value(&TestKey(100)).await,
3132            Some(TestVal(103))
3133        );
3134    }
3135
3136    pub async fn verify_module_prefix(db: Database) {
3137        let mut test_dbtx = db.begin_transaction().await;
3138        {
3139            let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3140
3141            test_module_dbtx
3142                .insert_entry(&TestKey(100), &TestVal(101))
3143                .await;
3144
3145            test_module_dbtx
3146                .insert_entry(&TestKey(101), &TestVal(102))
3147                .await;
3148        }
3149
3150        test_dbtx.commit_tx().await;
3151
3152        let mut alt_dbtx = db.begin_transaction().await;
3153        {
3154            let mut alt_module_dbtx = alt_dbtx.to_ref_with_prefix_module_id(ALT_MODULE_PREFIX).0;
3155
3156            alt_module_dbtx
3157                .insert_entry(&TestKey(100), &TestVal(103))
3158                .await;
3159
3160            alt_module_dbtx
3161                .insert_entry(&TestKey(101), &TestVal(104))
3162                .await;
3163        }
3164
3165        alt_dbtx.commit_tx().await;
3166
3167        // verify test_module_dbtx can only see key/value pairs from its own module
3168        let mut test_dbtx = db.begin_transaction().await;
3169        let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3170        assert_eq!(
3171            test_module_dbtx.get_value(&TestKey(100)).await,
3172            Some(TestVal(101))
3173        );
3174
3175        assert_eq!(
3176            test_module_dbtx.get_value(&TestKey(101)).await,
3177            Some(TestVal(102))
3178        );
3179
3180        let expected_keys = 2;
3181        let returned_keys = test_module_dbtx
3182            .find_by_prefix(&DbPrefixTestPrefix)
3183            .await
3184            .fold(0, |returned_keys, (key, value)| async move {
3185                match key {
3186                    TestKey(100) => {
3187                        assert!(value.eq(&TestVal(101)));
3188                    }
3189                    TestKey(101) => {
3190                        assert!(value.eq(&TestVal(102)));
3191                    }
3192                    _ => {}
3193                };
3194                returned_keys + 1
3195            })
3196            .await;
3197
3198        assert_eq!(returned_keys, expected_keys);
3199
3200        let removed = test_module_dbtx.remove_entry(&TestKey(100)).await;
3201        assert_eq!(removed, Some(TestVal(101)));
3202        assert_eq!(test_module_dbtx.get_value(&TestKey(100)).await, None);
3203
3204        // test_dbtx on its own wont find the key because it does not use a module
3205        // prefix
3206        let mut test_dbtx = db.begin_transaction().await;
3207        assert_eq!(test_dbtx.get_value(&TestKey(101)).await, None);
3208
3209        test_dbtx.commit_tx().await;
3210    }
3211
3212    #[cfg(test)]
3213    #[tokio::test]
3214    pub async fn verify_test_migration() {
3215        // Insert a bunch of old dummy data that needs to be migrated to a new version
3216        let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
3217        let expected_test_keys_size: usize = 100;
3218        let mut dbtx = db.begin_transaction().await;
3219        for i in 0..expected_test_keys_size {
3220            dbtx.insert_new_entry(&TestKeyV0(i as u64, (i + 1) as u64), &TestVal(i as u64))
3221                .await;
3222        }
3223
3224        // Will also be migrated to `DatabaseVersionKey`
3225        dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
3226            .await;
3227        dbtx.commit_tx().await;
3228
3229        let mut migrations: BTreeMap<DatabaseVersion, CoreMigrationFn> = BTreeMap::new();
3230
3231        migrations.insert(DatabaseVersion(0), |ctx| {
3232            migrate_test_db_version_0(ctx).boxed()
3233        });
3234
3235        apply_migrations(&db, "TestModule".to_string(), migrations, None, None)
3236            .await
3237            .expect("Error applying migrations for TestModule");
3238
3239        // Verify that the migrations completed successfully
3240        let mut dbtx = db.begin_transaction().await;
3241
3242        // Verify that the old `DatabaseVersion` under `DatabaseVersionKeyV0` migrated
3243        // to `DatabaseVersionKey`
3244        assert!(
3245            dbtx.get_value(&DatabaseVersionKey(MODULE_GLOBAL_PREFIX.into()))
3246                .await
3247                .is_some()
3248        );
3249
3250        // Verify Dummy module migration
3251        let test_keys = dbtx
3252            .find_by_prefix(&DbPrefixTestPrefix)
3253            .await
3254            .collect::<Vec<_>>()
3255            .await;
3256        let test_keys_size = test_keys.len();
3257        assert_eq!(test_keys_size, expected_test_keys_size);
3258        for (key, val) in test_keys {
3259            assert_eq!(key.0, val.0 + 1);
3260        }
3261    }
3262
3263    #[allow(dead_code)]
3264    async fn migrate_test_db_version_0(mut ctx: MigrationContext<'_>) -> Result<(), anyhow::Error> {
3265        let mut dbtx = ctx.dbtx();
3266        let example_keys_v0 = dbtx
3267            .find_by_prefix(&DbPrefixTestPrefixV0)
3268            .await
3269            .collect::<Vec<_>>()
3270            .await;
3271        dbtx.remove_by_prefix(&DbPrefixTestPrefixV0).await;
3272        for (key, val) in example_keys_v0 {
3273            let key_v2 = TestKey(key.1);
3274            dbtx.insert_new_entry(&key_v2, &val).await;
3275        }
3276        Ok(())
3277    }
3278
3279    #[cfg(test)]
3280    #[tokio::test]
3281    async fn test_autocommit() {
3282        use std::marker::PhantomData;
3283        use std::ops::Range;
3284        use std::path::Path;
3285
3286        use anyhow::anyhow;
3287        use async_trait::async_trait;
3288
3289        use crate::ModuleDecoderRegistry;
3290        use crate::db::{
3291            AutocommitError, BaseDatabaseTransaction, IDatabaseTransaction,
3292            IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase,
3293            IRawDatabaseTransaction,
3294        };
3295
3296        #[derive(Debug)]
3297        struct FakeDatabase;
3298
3299        #[async_trait]
3300        impl IRawDatabase for FakeDatabase {
3301            type Transaction<'a> = FakeTransaction<'a>;
3302            async fn begin_transaction(&self) -> FakeTransaction {
3303                FakeTransaction(PhantomData)
3304            }
3305
3306            fn checkpoint(&self, _backup_path: &Path) -> anyhow::Result<()> {
3307                Ok(())
3308            }
3309        }
3310
3311        #[derive(Debug)]
3312        struct FakeTransaction<'a>(PhantomData<&'a ()>);
3313
3314        #[async_trait]
3315        impl<'a> IDatabaseTransactionOpsCore for FakeTransaction<'a> {
3316            async fn raw_insert_bytes(
3317                &mut self,
3318                _key: &[u8],
3319                _value: &[u8],
3320            ) -> anyhow::Result<Option<Vec<u8>>> {
3321                unimplemented!()
3322            }
3323
3324            async fn raw_get_bytes(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
3325                unimplemented!()
3326            }
3327
3328            async fn raw_remove_entry(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
3329                unimplemented!()
3330            }
3331
3332            async fn raw_find_by_range(
3333                &mut self,
3334                _key_range: Range<&[u8]>,
3335            ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3336                unimplemented!()
3337            }
3338
3339            async fn raw_find_by_prefix(
3340                &mut self,
3341                _key_prefix: &[u8],
3342            ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3343                unimplemented!()
3344            }
3345
3346            async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
3347                unimplemented!()
3348            }
3349
3350            async fn raw_find_by_prefix_sorted_descending(
3351                &mut self,
3352                _key_prefix: &[u8],
3353            ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3354                unimplemented!()
3355            }
3356        }
3357
3358        #[async_trait]
3359        impl<'a> IDatabaseTransactionOps for FakeTransaction<'a> {
3360            async fn rollback_tx_to_savepoint(&mut self) -> anyhow::Result<()> {
3361                unimplemented!()
3362            }
3363
3364            async fn set_tx_savepoint(&mut self) -> anyhow::Result<()> {
3365                unimplemented!()
3366            }
3367        }
3368
3369        #[async_trait]
3370        impl<'a> IRawDatabaseTransaction for FakeTransaction<'a> {
3371            async fn commit_tx(self) -> anyhow::Result<()> {
3372                Err(anyhow!("Can't commit!"))
3373            }
3374        }
3375
3376        let db = Database::new(FakeDatabase, ModuleDecoderRegistry::default());
3377        let err = db
3378            .autocommit::<_, _, ()>(|_dbtx, _| Box::pin(async { Ok(()) }), Some(5))
3379            .await
3380            .unwrap_err();
3381
3382        match err {
3383            AutocommitError::CommitFailed {
3384                attempts: failed_attempts,
3385                ..
3386            } => {
3387                assert_eq!(failed_attempts, 5);
3388            }
3389            AutocommitError::ClosureError { .. } => panic!("Closure did not return error"),
3390        }
3391    }
3392}
3393
3394pub async fn find_by_prefix_sorted_descending<'r, 'inner, KP>(
3395    tx: &'r mut (dyn IDatabaseTransaction + 'inner),
3396    decoders: ModuleDecoderRegistry,
3397    key_prefix: &KP,
3398) -> impl Stream<
3399    Item = (
3400        KP::Record,
3401        <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
3402    ),
3403>
3404+ 'r
3405+ use<'r, KP>
3406where
3407    'inner: 'r,
3408    KP: DatabaseLookup,
3409    KP::Record: DatabaseKey,
3410{
3411    debug!(target: LOG_DB, "find by prefix sorted descending");
3412    let prefix_bytes = key_prefix.to_bytes();
3413    tx.raw_find_by_prefix_sorted_descending(&prefix_bytes)
3414        .await
3415        .expect("Error doing prefix search in database")
3416        .map(move |(key_bytes, value_bytes)| {
3417            let key = decode_key_expect(&key_bytes, &decoders);
3418            let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
3419            (key, value)
3420        })
3421}
3422
3423pub async fn verify_module_db_integrity_dbtx(
3424    dbtx: &mut DatabaseTransaction<'_>,
3425    module_id: ModuleInstanceId,
3426    module_kind: ModuleKind,
3427    prefixes: &BTreeSet<u8>,
3428) {
3429    let module_db_prefix = module_instance_id_to_byte_prefix(module_id);
3430    if module_id < 250 {
3431        assert_eq!(module_db_prefix.len(), 2);
3432    }
3433    let mut records = dbtx
3434        .raw_find_by_prefix(&module_db_prefix)
3435        .await
3436        .expect("DB fail");
3437    while let Some((k, v)) = records.next().await {
3438        assert!(
3439            prefixes.contains(&k[module_db_prefix.len()]),
3440            "Unexpected module {module_kind} {module_id} db record found: {}: {}",
3441            k.as_hex(),
3442            v.as_hex()
3443        );
3444    }
3445}
3446
3447#[cfg(test)]
3448mod tests;