fedimint_core/db/
mod.rs

1//! Core Fedimint database traits and types
2//!
3//! This module provides the core key-value database for Fedimint.
4//!
5//! # Usage
6//!
7//! To use the database, you typically follow these steps:
8//!
9//! 1. Create a `Database` instance
10//! 2. Begin a transaction
11//! 3. Perform operations within the transaction
12//! 4. Commit the transaction
13//!
14//! ## Example
15//!
16//! ```rust
17//! use fedimint_core::db::mem_impl::MemDatabase;
18//! use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
19//! use fedimint_core::encoding::{Decodable, Encodable};
20//! use fedimint_core::impl_db_record;
21//! use fedimint_core::module::registry::ModuleDecoderRegistry;
22//!
23//! #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
24//! pub struct TestKey(pub u64);
25//! #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
26//! pub struct TestVal(pub u64);
27//!
28//! #[repr(u8)]
29//! #[derive(Clone)]
30//! pub enum TestDbKeyPrefix {
31//!     Test = 0x42,
32//! }
33//!
34//! impl_db_record!(
35//!     key = TestKey,
36//!     value = TestVal,
37//!     db_prefix = TestDbKeyPrefix::Test,
38//! );
39//!
40//! # async fn example() {
41//! // Create a new in-memory database
42//! let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
43//!
44//! // Begin a transaction
45//! let mut tx = db.begin_transaction().await;
46//!
47//! // Perform operations
48//! tx.insert_entry(&TestKey(1), &TestVal(100)).await;
49//! let value = tx.get_value(&TestKey(1)).await;
50//!
51//! // Commit the transaction
52//! tx.commit_tx().await;
53//!
54//! // For operations that may need to be retried due to conflicts, use the
55//! // `autocommit` function:
56//!
57//! db.autocommit(
58//!     |dbtx, _| {
59//!         Box::pin(async move {
60//!             dbtx.insert_entry(&TestKey(1), &TestVal(100)).await;
61//!             anyhow::Ok(())
62//!         })
63//!     },
64//!     None,
65//! )
66//! .await
67//! .unwrap();
68//! # }
69//! ```
70//!
71//! # Isolation of database transactions
72//!
73//! Fedimint requires that the database implementation implement Snapshot
74//! Isolation. Snapshot Isolation is a database isolation level that guarantees
75//! consistent reads from the time that the snapshot was created (at transaction
76//! creation time). Transactions with Snapshot Isolation level will only commit
77//! if there has been no write to the modified keys since the snapshot (i.e.
78//! write-write conflicts are prevented).
79//!
80//! Specifically, Fedimint expects the database implementation to prevent the
81//! following anomalies:
82//!
83//! Non-Readable Write: TX1 writes (K1, V1) at time t but cannot read (K1, V1)
84//! at time (t + i)
85//!
86//! Dirty Read: TX1 is able to read TX2's uncommitted writes.
87//!
88//! Non-Repeatable Read: TX1 reads (K1, V1) at time t and retrieves (K1, V2) at
89//! time (t + i) where V1 != V2.
90//!
91//! Phantom Record: TX1 retrieves X number of records for a prefix at time t and
92//! retrieves Y number of records for the same prefix at time (t + i).
93//!
94//! Lost Writes: TX1 writes (K1, V1) at the same time as TX2 writes (K1, V2). V2
95//! overwrites V1 as the value for K1 (write-write conflict).
96//!
//! | Type     | Non-Readable Write | Dirty Read | Non-Repeatable Read | Phantom Record | Lost Writes |
//! | -------- | ------------------ | ---------- | ------------------- | -------------- | ----------- |
//! | MemoryDB | Prevented          | Prevented  | Prevented           | Prevented      | Possible    |
//! | RocksDB  | Prevented          | Prevented  | Prevented           | Prevented      | Prevented   |
//! | Sqlite   | Prevented          | Prevented  | Prevented           | Prevented      | Prevented   |
104
105use std::any;
106use std::collections::{BTreeMap, BTreeSet};
107use std::error::Error;
108use std::fmt::{self, Debug};
109use std::marker::{self, PhantomData};
110use std::ops::{self, DerefMut, Range};
111use std::path::Path;
112use std::pin::Pin;
113use std::sync::Arc;
114use std::time::Duration;
115
116use anyhow::{Context, Result, bail};
117use bitcoin::hex::DisplayHex as _;
118use fedimint_core::util::BoxFuture;
119use fedimint_logging::LOG_DB;
120use futures::{Stream, StreamExt};
121use macro_rules_attribute::apply;
122use rand::Rng;
123use serde::Serialize;
124use strum_macros::EnumIter;
125use thiserror::Error;
126use tracing::{debug, error, info, instrument, trace, warn};
127
128use crate::core::{ModuleInstanceId, ModuleKind};
129use crate::encoding::{Decodable, Encodable};
130use crate::fmt_utils::AbbreviateHexBytes;
131use crate::task::{MaybeSend, MaybeSync};
132use crate::util::FmtCompactAnyhow as _;
133use crate::{async_trait_maybe_send, maybe_add_send, maybe_add_send_sync, timing};
134
135pub mod mem_impl;
136pub mod notifications;
137
138pub use test_utils::*;
139
140use self::notifications::{Notifications, NotifyQueue};
141use crate::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
142
/// First byte of the key prefix used for module-isolated database
/// partitions; the consensus-encoded module instance id follows it.
pub const MODULE_GLOBAL_PREFIX: u8 = 0xff;
144
/// A value that can be turned into the raw bytes of a database key (or of a
/// key prefix used for lookups).
pub trait DatabaseKeyPrefix: Debug {
    /// Returns the raw bytes used to address the database.
    fn to_bytes(&self) -> Vec<u8>;
}
148
/// A key + value pair in the database with a unique prefix.
///
/// Extends `DatabaseKeyPrefix` to prepend the key's prefix.
pub trait DatabaseRecord: DatabaseKeyPrefix {
    /// The prefix byte identifying this kind of record.
    const DB_PREFIX: u8;
    /// Defaults to `false`.
    // NOTE(review): presumably mirrors `DatabaseKey::NOTIFY_ON_MODIFY` —
    // confirm against the code that reads this constant.
    const NOTIFY_ON_MODIFY: bool = false;
    /// Type of the record's key.
    type Key: DatabaseKey + Debug;
    /// Type of the record's value.
    type Value: DatabaseValue + Debug;
}
157
/// A key that can be used to query one or more `DatabaseRecord`.
///
/// Extends `DatabaseKeyPrefix` to prepend the key's prefix.
pub trait DatabaseLookup: DatabaseKeyPrefix {
    /// The kind of record this lookup key queries.
    type Record: DatabaseRecord;
}
163
164// Every `DatabaseRecord` is automatically a `DatabaseLookup`
165impl<Record> DatabaseLookup for Record
166where
167    Record: DatabaseRecord + Debug + Decodable + Encodable,
168{
169    type Record = Record;
170}
171
/// `DatabaseKey` that represents the lookup structure for retrieving key/value
/// pairs from the database.
pub trait DatabaseKey: Sized {
    /// Send a notification to tasks waiting to be notified if the value of
    /// `DatabaseKey` is modified
    ///
    /// For instance, this can be used to be notified when a key in the
    /// database is created. It is also possible to run a closure with the
    /// value of the `DatabaseKey` as parameter to verify some changes to
    /// that value.
    ///
    /// Defaults to `false`.
    const NOTIFY_ON_MODIFY: bool = false;
    /// Decodes a key from its raw `data` bytes, with `modules` (the module
    /// decoder registry) available to the decoder.
    ///
    /// Returns a [`DecodingError`] if `data` cannot be decoded.
    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
}
185
/// Marker trait for `DatabaseKey`s where `NOTIFY_ON_MODIFY` is true
///
/// Required by the key-waiting helpers [`Database::wait_key_check`] and
/// [`Database::wait_key_exists`].
pub trait DatabaseKeyWithNotify {}
188
/// `DatabaseValue` that represents the value structure of database records.
pub trait DatabaseValue: Sized + Debug {
    /// Decodes a value from its raw `data` bytes, with `modules` (the module
    /// decoder registry) available to the decoder.
    ///
    /// Returns a [`DecodingError`] if `data` cannot be decoded.
    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
    /// Encodes the value into the raw bytes stored in the database.
    fn to_bytes(&self) -> Vec<u8>;
}
194
/// A pinned, boxed stream of raw `(key, value)` byte pairs, as returned by
/// the `raw_find_by_*` transaction operations.
pub type PrefixStream<'a> = Pin<Box<maybe_add_send!(dyn Stream<Item = (Vec<u8>, Vec<u8>)> + 'a)>>;
196
/// Just ignore this type, it's only there to make the compiler happy
///
/// It is passed as the second argument to the closure given to
/// [`Database::autocommit`], purely to relate the two lifetimes `'big` and
/// `'small`; it carries no data.
///
/// See <https://users.rust-lang.org/t/argument-requires-that-is-borrowed-for-static/66503/2?u=yandros> for details.
pub type PhantomBound<'big, 'small> = PhantomData<&'small &'big ()>;
201
/// Error returned when the autocommit function fails
#[derive(Debug, Error)]
pub enum AutocommitError<E> {
    /// Committing the transaction failed too many times, giving up
    #[error("Commit Failed: {last_error}")]
    CommitFailed {
        /// Number of attempts
        attempts: usize,
        /// Last error on commit
        last_error: anyhow::Error,
    },
    /// Error returned by the closure provided to `autocommit`. If returned no
    /// commit was attempted in that round
    #[error("Closure error: {error}")]
    ClosureError {
        /// The attempt on which the closure returned an error
        ///
        /// Attempts are counted starting at 1. Values other than 1 typically
        /// indicate a logic error since the closure given to `autocommit`
        /// should not have side effects and thus keep succeeding if it
        /// succeeded once.
        attempts: usize,
        /// Error returned by the closure
        error: E,
    },
}
227
/// Extension trait for results produced by `autocommit`, i.e.
/// `Result<T, AutocommitError<E>>`.
pub trait AutocommitResultExt<T, E> {
    /// Unwraps the "commit failed" error variant. Use this in cases where
    /// autocommit is instructed to run indefinitely and commit will thus never
    /// fail.
    ///
    /// # Panics
    ///
    /// The implementation for `Result<T, AutocommitError<E>>` panics if the
    /// result is an [`AutocommitError::CommitFailed`].
    fn unwrap_autocommit(self) -> Result<T, E>;
}
234
235impl<T, E> AutocommitResultExt<T, E> for Result<T, AutocommitError<E>> {
236    fn unwrap_autocommit(self) -> Result<T, E> {
237        match self {
238            Ok(value) => Ok(value),
239            Err(AutocommitError::CommitFailed { .. }) => {
240                panic!("`unwrap_autocommit` called on a autocommit result with finite retries");
241            }
242            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
243        }
244    }
245}
246
/// Raw database implementation
///
/// This and [`IRawDatabaseTransaction`] are meant to be implemented
/// by crates like `fedimint-rocksdb` to provide a concrete implementation
/// of a database to be used by Fedimint.
///
/// This is in contrast to [`IDatabase`] which includes extra
/// functionality that Fedimint needs (and adds) on top of it.
#[apply(async_trait_maybe_send!)]
pub trait IRawDatabase: Debug + MaybeSend + MaybeSync + 'static {
    /// A raw database transaction type
    type Transaction<'a>: IRawDatabaseTransaction + Debug;

    /// Start a database transaction
    async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a>;

    /// Checkpoint the database to a backup directory
    fn checkpoint(&self, backup_path: &Path) -> Result<()>;
}
266
267#[apply(async_trait_maybe_send!)]
268impl<T> IRawDatabase for Box<T>
269where
270    T: IRawDatabase,
271{
272    type Transaction<'a> = <T as IRawDatabase>::Transaction<'a>;
273
274    async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a> {
275        (**self).begin_transaction().await
276    }
277
278    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
279        (**self).checkpoint(backup_path)
280    }
281}
282
283/// An extension trait with convenience operations on [`IRawDatabase`]
284pub trait IRawDatabaseExt: IRawDatabase + Sized {
285    /// Convert to type implementing [`IRawDatabase`] into [`Database`].
286    ///
287    /// When type inference is not an issue, [`Into::into`] can be used instead.
288    fn into_database(self) -> Database {
289        Database::new(self, ModuleRegistry::default())
290    }
291}
292
293impl<T> IRawDatabaseExt for T where T: IRawDatabase {}
294
295impl<T> From<T> for Database
296where
297    T: IRawDatabase,
298{
299    fn from(raw: T) -> Self {
300        Self::new(raw, ModuleRegistry::default())
301    }
302}
303
/// A database that, on top of the raw database operations, implements a
/// key notification system.
#[apply(async_trait_maybe_send!)]
pub trait IDatabase: Debug + MaybeSend + MaybeSync + 'static {
    /// Start a database transaction
    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a>;
    /// Register (and wait) for `key` updates
    async fn register(&self, key: &[u8]);
    /// Notify about `key` update (creation, modification, deletion)
    async fn notify(&self, key: &[u8]);

    /// Whether this database refers to the global (as opposed to
    /// module-isolated) key space
    fn is_global(&self) -> bool;

    /// Checkpoints the database to a backup directory
    fn checkpoint(&self, backup_path: &Path) -> Result<()>;
}
322
323#[apply(async_trait_maybe_send!)]
324impl<T> IDatabase for Arc<T>
325where
326    T: IDatabase + ?Sized,
327{
328    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
329        (**self).begin_transaction().await
330    }
331    async fn register(&self, key: &[u8]) {
332        (**self).register(key).await;
333    }
334    async fn notify(&self, key: &[u8]) {
335        (**self).notify(key).await;
336    }
337
338    fn is_global(&self) -> bool {
339        (**self).is_global()
340    }
341
342    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
343        (**self).checkpoint(backup_path)
344    }
345}
346
/// Base functionality around [`IRawDatabase`] to make it a [`IDatabase`]
///
/// Mostly notification system, but also run-time single-commit handling.
struct BaseDatabase<RawDatabase> {
    /// Key notification registry; a clone of this `Arc` is handed to every
    /// transaction started on this database.
    notifications: Arc<Notifications>,
    /// The wrapped raw database.
    raw: RawDatabase,
}
354
355impl<RawDatabase> fmt::Debug for BaseDatabase<RawDatabase> {
356    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
357        f.write_str("BaseDatabase")
358    }
359}
360
361#[apply(async_trait_maybe_send!)]
362impl<RawDatabase: IRawDatabase + MaybeSend + 'static> IDatabase for BaseDatabase<RawDatabase> {
363    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
364        Box::new(BaseDatabaseTransaction::new(
365            self.raw.begin_transaction().await,
366            self.notifications.clone(),
367        ))
368    }
369    async fn register(&self, key: &[u8]) {
370        self.notifications.register(key).await;
371    }
372    async fn notify(&self, key: &[u8]) {
373        self.notifications.notify(key);
374    }
375
376    fn is_global(&self) -> bool {
377        true
378    }
379
380    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
381        self.raw.checkpoint(backup_path)
382    }
383}
384
/// A public-facing newtype over `IDatabase`
///
/// Notably carries set of module decoders (`ModuleDecoderRegistry`)
/// and implements common utility function for auto-commits, db isolation,
/// and other.
#[derive(Clone, Debug)]
pub struct Database {
    /// The type-erased, shared database implementation.
    inner: Arc<dyn IDatabase + 'static>,
    /// Decoders handed to every transaction started from this database and
    /// used when decoding stored values.
    module_decoders: ModuleDecoderRegistry,
}
395
396impl Database {
397    pub fn strong_count(&self) -> usize {
398        Arc::strong_count(&self.inner)
399    }
400
401    pub fn into_inner(self) -> Arc<dyn IDatabase + 'static> {
402        self.inner
403    }
404}
405
406impl Database {
407    /// Creates a new Fedimint database from any object implementing
408    /// [`IDatabase`].
409    ///
410    /// See also [`Database::new_from_arc`].
411    pub fn new(raw: impl IRawDatabase + 'static, module_decoders: ModuleDecoderRegistry) -> Self {
412        let inner = BaseDatabase {
413            raw,
414            notifications: Arc::new(Notifications::new()),
415        };
416        Self::new_from_arc(
417            Arc::new(inner) as Arc<dyn IDatabase + 'static>,
418            module_decoders,
419        )
420    }
421
422    /// Create [`Database`] from an already typed-erased `IDatabase`.
423    pub fn new_from_arc(
424        inner: Arc<dyn IDatabase + 'static>,
425        module_decoders: ModuleDecoderRegistry,
426    ) -> Self {
427        Self {
428            inner,
429            module_decoders,
430        }
431    }
432
433    /// Create [`Database`] isolated to a partition with a given `prefix`
434    pub fn with_prefix(&self, prefix: Vec<u8>) -> Self {
435        Self {
436            inner: Arc::new(PrefixDatabase {
437                inner: self.inner.clone(),
438                global_dbtx_access_token: None,
439                prefix,
440            }),
441            module_decoders: self.module_decoders.clone(),
442        }
443    }
444
445    /// Create [`Database`] isolated to a partition with a prefix for a given
446    /// `module_instance_id`, allowing the module to access `global_dbtx` with
447    /// the right `access_token`
448    pub fn with_prefix_module_id(
449        &self,
450        module_instance_id: ModuleInstanceId,
451    ) -> (Self, GlobalDBTxAccessToken) {
452        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
453        let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
454        (
455            Self {
456                inner: Arc::new(PrefixDatabase {
457                    inner: self.inner.clone(),
458                    global_dbtx_access_token: Some(global_dbtx_access_token),
459                    prefix,
460                }),
461                module_decoders: self.module_decoders.clone(),
462            },
463            global_dbtx_access_token,
464        )
465    }
466
467    pub fn with_decoders(&self, module_decoders: ModuleDecoderRegistry) -> Self {
468        Self {
469            inner: self.inner.clone(),
470            module_decoders,
471        }
472    }
473
474    /// Is this `Database` a global, unpartitioned `Database`
475    pub fn is_global(&self) -> bool {
476        self.inner.is_global()
477    }
478
479    /// `Err` if [`Self::is_global`] is not true
480    pub fn ensure_global(&self) -> Result<()> {
481        if !self.is_global() {
482            bail!("Database instance not global");
483        }
484
485        Ok(())
486    }
487
488    /// `Err` if [`Self::is_global`] is true
489    pub fn ensure_isolated(&self) -> Result<()> {
490        if self.is_global() {
491            bail!("Database instance not isolated");
492        }
493
494        Ok(())
495    }
496
497    /// Begin a new committable database transaction
498    pub async fn begin_transaction<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, Committable>
499    where
500        's: 'tx,
501    {
502        DatabaseTransaction::<Committable>::new(
503            self.inner.begin_transaction().await,
504            self.module_decoders.clone(),
505        )
506    }
507
508    /// Begin a new non-committable database transaction
509    pub async fn begin_transaction_nc<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, NonCommittable>
510    where
511        's: 'tx,
512    {
513        self.begin_transaction().await.into_nc()
514    }
515
516    pub fn checkpoint(&self, backup_path: &Path) -> Result<()> {
517        self.inner.checkpoint(backup_path)
518    }
519
    /// Runs a closure with a reference to a database transaction and tries to
    /// commit the transaction if the closure returns `Ok` and rolls it back
    /// otherwise. If committing fails the closure is run for up to
    /// `max_attempts` times. If `max_attempts` is `None` it will run
    /// `usize::MAX` times which is close enough to infinite times.
    ///
    /// The closure `tx_fn` provided should not have side effects outside of the
    /// database transaction provided, or if it does these should be
    /// idempotent, since the closure might be run multiple times.
    ///
    /// Between failed commit attempts the function sleeps for a randomized,
    /// exponentially growing delay.
    ///
    /// # Errors
    ///
    /// Returns [`AutocommitError::ClosureError`] as soon as `tx_fn` returns an
    /// error (no commit is attempted in that round), and
    /// [`AutocommitError::CommitFailed`] when committing failed on the
    /// `max_attempts`-th attempt.
    ///
    /// # Lifetime Parameters
    ///
    /// The higher rank trait bound (HRTB) `'r` that is applied to the
    /// mutable reference to the database transaction ensures that the
    /// reference lives at least as long as the returned future of the
    /// closure.
    ///
    /// Further, the reference to self (`'s`) must outlive the
    /// `DatabaseTransaction<'dbtx>`. In other words, `self` must live at
    /// least as long as the `DatabaseTransaction` and that is true as the
    /// `DatabaseTransaction` is only dropped at the end of the `loop{}`.
    ///
    /// # Panics
    ///
    /// This function panics when the given number of maximum attempts is zero.
    /// `max_attempts` must be greater or equal to one.
    pub async fn autocommit<'s, 'dbtx, F, T, E>(
        &'s self,
        tx_fn: F,
        max_attempts: Option<usize>,
    ) -> Result<T, AutocommitError<E>>
    where
        's: 'dbtx,
        for<'r, 'o> F: Fn(
            &'r mut DatabaseTransaction<'o>,
            PhantomBound<'dbtx, 'o>,
        ) -> BoxFuture<'r, Result<T, E>>,
    {
        assert_ne!(max_attempts, Some(0));
        let mut curr_attempts: usize = 0;

        loop {
            // The `checked_add()` function is used to catch the `usize` overflow.
            // With `usize=32bit` and an assumed time of 1ms per iteration, this would crash
            // after ~50 days. But if that's the case, something else must be wrong.
            // With `usize=64bit` it would take much longer, obviously.
            curr_attempts = curr_attempts
                .checked_add(1)
                .expect("db autocommit attempt counter overflowed");

            // Every attempt runs on a fresh transaction.
            let mut dbtx = self.begin_transaction().await;

            let tx_fn_res = tx_fn(&mut dbtx.to_ref_nc(), PhantomData).await;
            let val = match tx_fn_res {
                Ok(val) => val,
                Err(err) => {
                    // The closure failed: give up without committing and
                    // without retrying.
                    dbtx.ignore_uncommitted();
                    return Err(AutocommitError::ClosureError {
                        attempts: curr_attempts,
                        error: err,
                    });
                }
            };

            let _timing /* logs on drop */ = timing::TimeReporter::new("autocommit - commit_tx");

            match dbtx.commit_tx_result().await {
                Ok(()) => {
                    return Ok(val);
                }
                Err(err) => {
                    // Out of attempts (never true when `max_attempts` is `None`).
                    if max_attempts.is_some_and(|max_att| max_att <= curr_attempts) {
                        warn!(
                            target: LOG_DB,
                            curr_attempts,
                            err = %err.fmt_compact_anyhow(),
                            "Database commit failed in an autocommit block - terminating"
                        );
                        return Err(AutocommitError::CommitFailed {
                            attempts: curr_attempts,
                            last_error: err,
                        });
                    }

                    // Exponential backoff: base delay of `10 * 2^attempts` ms, capped
                    // at 1000 ms, then randomized into `[delay, 2 * delay)`.
                    let delay = (2u64.pow(curr_attempts.min(7) as u32) * 10).min(1000);
                    let delay = rand::thread_rng().gen_range(delay..(2 * delay));
                    warn!(
                        target: LOG_DB,
                        curr_attempts,
                        err = %err.fmt_compact_anyhow(),
                        delay_ms = %delay,
                        "Database commit failed in an autocommit block - retrying"
                    );
                    crate::runtime::sleep(Duration::from_millis(delay)).await;
                }
            }
        }
    }
619
    /// Waits for key to be notified.
    ///
    /// Calls the `checker` when value of the key may have changed.
    /// Returns the value when `checker` returns a `Some(T)`, together with
    /// the database transaction the value was read in.
    ///
    /// # Panics
    ///
    /// Panics if reading the key from the database fails.
    pub async fn wait_key_check<'a, K, T>(
        &'a self,
        key: &K,
        checker: impl Fn(Option<K::Value>) -> Option<T>,
    ) -> (T, DatabaseTransaction<'a, Committable>)
    where
        K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
    {
        let key_bytes = key.to_bytes();
        loop {
            // register for notification *before* reading the value; the
            // returned future is only awaited after the check below
            let notify = self.inner.register(&key_bytes);

            // check for value in db
            let mut tx = self.inner.begin_transaction().await;

            let maybe_value_bytes = tx
                .raw_get_bytes(&key_bytes)
                .await
                .expect("Unrecoverable error when reading from database")
                .map(|value_bytes| {
                    decode_value_expect(&value_bytes, &self.module_decoders, &key_bytes)
                });

            if let Some(value) = checker(maybe_value_bytes) {
                return (
                    value,
                    DatabaseTransaction::new(tx, self.module_decoders.clone()),
                );
            }

            // `checker` did not accept the current value (or the key is
            // missing); wait for a notification and try again
            notify.await;
            // if we miss a notification between this await and the next
            // register, it is fine, because we are going to check the
            // database again
        }
    }
661
662    /// Waits for key to be present in database.
663    pub async fn wait_key_exists<K>(&self, key: &K) -> K::Value
664    where
665        K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
666    {
667        self.wait_key_check(key, std::convert::identity).await.0
668    }
669}
670
671fn module_instance_id_to_byte_prefix(module_instance_id: u16) -> Vec<u8> {
672    let mut bytes = vec![MODULE_GLOBAL_PREFIX];
673    bytes.append(&mut module_instance_id.consensus_encode_to_vec());
674    bytes
675}
676
/// A database that wraps an `inner` one and adds a prefix to all operations,
/// effectively creating an isolated partition.
#[derive(Clone, Debug)]
struct PrefixDatabase<Inner>
where
    Inner: Debug,
{
    /// Bytes prepended to every key before it is passed to `inner`.
    prefix: Vec<u8>,
    /// When `Some`, this database reports itself as non-global and the token
    /// is passed on to every transaction it starts.
    global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
    /// The wrapped database.
    inner: Inner,
}
688
689impl<Inner> PrefixDatabase<Inner>
690where
691    Inner: Debug,
692{
693    // TODO: we should optimize these concatenations, maybe by having an internal
694    // `key: &[&[u8]]` that we flatten once, when passing to lowest layer, or
695    // something
696    fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
697        let mut full_key = self.prefix.clone();
698        full_key.extend_from_slice(key);
699        full_key
700    }
701}
702
703#[apply(async_trait_maybe_send!)]
704impl<Inner> IDatabase for PrefixDatabase<Inner>
705where
706    Inner: Debug + MaybeSend + MaybeSync + 'static + IDatabase,
707{
708    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
709        Box::new(PrefixDatabaseTransaction {
710            inner: self.inner.begin_transaction().await,
711            global_dbtx_access_token: self.global_dbtx_access_token,
712            prefix: self.prefix.clone(),
713        })
714    }
715    async fn register(&self, key: &[u8]) {
716        self.inner.register(&self.get_full_key(key)).await;
717    }
718
719    async fn notify(&self, key: &[u8]) {
720        self.inner.notify(&self.get_full_key(key)).await;
721    }
722
723    fn is_global(&self) -> bool {
724        if self.global_dbtx_access_token.is_some() {
725            false
726        } else {
727            self.inner.is_global()
728        }
729    }
730
731    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
732        self.inner.checkpoint(backup_path)
733    }
734}
735
/// A database transaction that wraps an `inner` one and adds a prefix to all
/// operations, effectively creating an isolated partition.
///
/// Produced by [`PrefixDatabase`].
#[derive(Debug)]
struct PrefixDatabaseTransaction<Inner> {
    /// The wrapped transaction.
    inner: Inner,
    /// When `Some`, this transaction reports itself as non-global and
    /// `global_dbtx` returns `inner` only for a matching token.
    global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
    /// Bytes prepended to every key before it is passed to `inner`.
    prefix: Vec<u8>,
}
746
747impl<Inner> PrefixDatabaseTransaction<Inner> {
748    // TODO: we should optimize these concatenations, maybe by having an internal
749    // `key: &[&[u8]]` that we flatten once, when passing to lowest layer, or
750    // something
751    fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
752        let mut full_key = self.prefix.clone();
753        full_key.extend_from_slice(key);
754        full_key
755    }
756
757    fn get_full_range(&self, range: Range<&[u8]>) -> Range<Vec<u8>> {
758        Range {
759            start: self.get_full_key(range.start),
760            end: self.get_full_key(range.end),
761        }
762    }
763
764    fn adapt_prefix_stream(stream: PrefixStream<'_>, prefix_len: usize) -> PrefixStream<'_> {
765        Box::pin(stream.map(move |(k, v)| (k[prefix_len..].to_owned(), v)))
766    }
767}
768
769#[apply(async_trait_maybe_send!)]
770impl<Inner> IDatabaseTransaction for PrefixDatabaseTransaction<Inner>
771where
772    Inner: IDatabaseTransaction,
773{
774    async fn commit_tx(&mut self) -> Result<()> {
775        self.inner.commit_tx().await
776    }
777
778    fn is_global(&self) -> bool {
779        if self.global_dbtx_access_token.is_some() {
780            false
781        } else {
782            self.inner.is_global()
783        }
784    }
785
786    fn global_dbtx(
787        &mut self,
788        access_token: GlobalDBTxAccessToken,
789    ) -> &mut dyn IDatabaseTransaction {
790        if let Some(self_global_dbtx_access_token) = self.global_dbtx_access_token {
791            assert_eq!(
792                access_token, self_global_dbtx_access_token,
793                "Invalid access key used to access global_dbtx"
794            );
795            &mut self.inner
796        } else {
797            self.inner.global_dbtx(access_token)
798        }
799    }
800}
801
802#[apply(async_trait_maybe_send!)]
803impl<Inner> IDatabaseTransactionOpsCore for PrefixDatabaseTransaction<Inner>
804where
805    Inner: IDatabaseTransactionOpsCore,
806{
807    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
808        let key = self.get_full_key(key);
809        self.inner.raw_insert_bytes(&key, value).await
810    }
811
812    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
813        let key = self.get_full_key(key);
814        self.inner.raw_get_bytes(&key).await
815    }
816
817    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
818        let key = self.get_full_key(key);
819        self.inner.raw_remove_entry(&key).await
820    }
821
822    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
823        let key = self.get_full_key(key_prefix);
824        let stream = self.inner.raw_find_by_prefix(&key).await?;
825        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
826    }
827
828    async fn raw_find_by_prefix_sorted_descending(
829        &mut self,
830        key_prefix: &[u8],
831    ) -> Result<PrefixStream<'_>> {
832        let key = self.get_full_key(key_prefix);
833        let stream = self
834            .inner
835            .raw_find_by_prefix_sorted_descending(&key)
836            .await?;
837        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
838    }
839
840    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
841        let range = self.get_full_range(range);
842        let stream = self
843            .inner
844            .raw_find_by_range(Range {
845                start: &range.start,
846                end: &range.end,
847            })
848            .await?;
849        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
850    }
851
852    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
853        let key = self.get_full_key(key_prefix);
854        self.inner.raw_remove_by_prefix(&key).await
855    }
856}
857
858#[apply(async_trait_maybe_send!)]
859impl<Inner> IDatabaseTransactionOps for PrefixDatabaseTransaction<Inner>
860where
861    Inner: IDatabaseTransactionOps,
862{
863    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
864        self.inner.rollback_tx_to_savepoint().await
865    }
866
867    async fn set_tx_savepoint(&mut self) -> Result<()> {
868        self.set_tx_savepoint().await
869    }
870}
871
/// Core raw operations a database transaction supports
///
/// Used to enforce the same signature on all types supporting it
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransactionOpsCore: MaybeSend {
    /// Insert entry
    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>>;

    /// Get key value
    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;

    /// Remove entry by `key`
    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;

    /// Returns a stream of key-value pairs with keys that start with
    /// `key_prefix`, sorted by key.
    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>>;

    /// Same as [`Self::raw_find_by_prefix`] but the order is descending by key.
    async fn raw_find_by_prefix_sorted_descending(
        &mut self,
        key_prefix: &[u8],
    ) -> Result<PrefixStream<'_>>;

    /// Returns a stream of key-value pairs with keys within a `range`, sorted
    /// by key. [`Range`] is a (half-open) range bounded inclusively below and
    /// exclusively above.
    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>>;

    /// Delete keys matching prefix
    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()>;
}
904
905#[apply(async_trait_maybe_send!)]
906impl<T> IDatabaseTransactionOpsCore for Box<T>
907where
908    T: IDatabaseTransactionOpsCore + ?Sized,
909{
910    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
911        (**self).raw_insert_bytes(key, value).await
912    }
913
914    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
915        (**self).raw_get_bytes(key).await
916    }
917
918    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
919        (**self).raw_remove_entry(key).await
920    }
921
922    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
923        (**self).raw_find_by_prefix(key_prefix).await
924    }
925
926    async fn raw_find_by_prefix_sorted_descending(
927        &mut self,
928        key_prefix: &[u8],
929    ) -> Result<PrefixStream<'_>> {
930        (**self)
931            .raw_find_by_prefix_sorted_descending(key_prefix)
932            .await
933    }
934
935    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
936        (**self).raw_find_by_range(range).await
937    }
938
939    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
940        (**self).raw_remove_by_prefix(key_prefix).await
941    }
942}
943
944#[apply(async_trait_maybe_send!)]
945impl<T> IDatabaseTransactionOpsCore for &mut T
946where
947    T: IDatabaseTransactionOpsCore + ?Sized,
948{
949    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
950        (**self).raw_insert_bytes(key, value).await
951    }
952
953    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
954        (**self).raw_get_bytes(key).await
955    }
956
957    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
958        (**self).raw_remove_entry(key).await
959    }
960
961    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
962        (**self).raw_find_by_prefix(key_prefix).await
963    }
964
965    async fn raw_find_by_prefix_sorted_descending(
966        &mut self,
967        key_prefix: &[u8],
968    ) -> Result<PrefixStream<'_>> {
969        (**self)
970            .raw_find_by_prefix_sorted_descending(key_prefix)
971            .await
972    }
973
974    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
975        (**self).raw_find_by_range(range).await
976    }
977
978    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
979        (**self).raw_remove_by_prefix(key_prefix).await
980    }
981}
982
/// Additional operations (only some) database transactions expose, on top of
/// [`IDatabaseTransactionOpsCore`]
///
/// In certain contexts exposing these operations would be a problem, so they
/// are moved to a separate trait.
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransactionOps: IDatabaseTransactionOpsCore + MaybeSend {
    /// Create a savepoint during the transaction that can be rolled back to
    /// using rollback_tx_to_savepoint. Rolling back to the savepoint will
    /// atomically remove the writes that were applied since the savepoint
    /// was created.
    ///
    /// Warning: Avoid using this in fedimint client code as not all database
    /// transaction implementations will support setting a savepoint during
    /// a transaction.
    async fn set_tx_savepoint(&mut self) -> Result<()>;

    /// Roll the transaction back to the savepoint created with
    /// [`Self::set_tx_savepoint`], removing the writes applied since then.
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()>;
}
1002
1003#[apply(async_trait_maybe_send!)]
1004impl<T> IDatabaseTransactionOps for Box<T>
1005where
1006    T: IDatabaseTransactionOps + ?Sized,
1007{
1008    async fn set_tx_savepoint(&mut self) -> Result<()> {
1009        (**self).set_tx_savepoint().await
1010    }
1011
1012    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1013        (**self).rollback_tx_to_savepoint().await
1014    }
1015}
1016
1017#[apply(async_trait_maybe_send!)]
1018impl<T> IDatabaseTransactionOps for &mut T
1019where
1020    T: IDatabaseTransactionOps + ?Sized,
1021{
1022    async fn set_tx_savepoint(&mut self) -> Result<()> {
1023        (**self).set_tx_savepoint().await
1024    }
1025
1026    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1027        (**self).rollback_tx_to_savepoint().await
1028    }
1029}
1030
/// Like [`IDatabaseTransactionOpsCore`], but typed
///
/// Implemented via blanket impl for everything that implements
/// [`IDatabaseTransactionOpsCore`] that has decoders (implements
/// [`WithDecoders`]).
///
/// The blanket implementation panics (rather than returning an error) when
/// the underlying raw operation fails or when stored bytes cannot be decoded.
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransactionOpsCoreTyped<'a> {
    /// Look up `key` and return its decoded value, or `None` if the raw
    /// lookup finds nothing.
    async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;

    /// Store `value` under `key`, returning the decoded previous value if the
    /// raw insert reported one.
    async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync;

    /// Store `value` under `key`, expecting that no entry existed before.
    ///
    /// The blanket implementation panics if an entry was already present.
    async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync;

    /// Stream the decoded `(key, value)` pairs whose keys fall within
    /// `key_range` (see [`IDatabaseTransactionOpsCore::raw_find_by_range`]).
    async fn find_by_range<K>(
        &mut self,
        key_range: Range<K>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync;

    /// Stream the decoded `(key, value)` pairs whose keys start with
    /// `key_prefix` (see [`IDatabaseTransactionOpsCore::raw_find_by_prefix`]).
    async fn find_by_prefix<KP>(
        &mut self,
        key_prefix: &KP,
    ) -> Pin<
        Box<
            maybe_add_send!(
                dyn Stream<
                        Item = (
                            KP::Record,
                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
                        ),
                    > + '_
            ),
        >,
    >
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync,
        KP::Record: DatabaseKey;

    /// Same as [`Self::find_by_prefix`], but backed by
    /// [`IDatabaseTransactionOpsCore::raw_find_by_prefix_sorted_descending`].
    async fn find_by_prefix_sorted_descending<KP>(
        &mut self,
        key_prefix: &KP,
    ) -> Pin<
        Box<
            maybe_add_send!(
                dyn Stream<
                        Item = (
                            KP::Record,
                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
                        ),
                    > + '_
            ),
        >,
    >
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync,
        KP::Record: DatabaseKey;

    /// Remove the entry stored under `key`, returning its decoded value if
    /// the raw removal reported one.
    async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;

    /// Remove all entries whose keys start with `key_prefix`.
    async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync;
}
1106
1107// blanket implementation of typed ops for anything that implements raw ops and
1108// has decoders
1109#[apply(async_trait_maybe_send!)]
1110impl<T> IDatabaseTransactionOpsCoreTyped<'_> for T
1111where
1112    T: IDatabaseTransactionOpsCore + WithDecoders,
1113{
1114    async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1115    where
1116        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1117    {
1118        let key_bytes = key.to_bytes();
1119        let raw = self
1120            .raw_get_bytes(&key_bytes)
1121            .await
1122            .expect("Unrecoverable error occurred while reading and entry from the database");
1123        raw.map(|value_bytes| {
1124            decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1125        })
1126    }
1127
1128    async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1129    where
1130        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1131        K::Value: MaybeSend + MaybeSync,
1132    {
1133        let key_bytes = key.to_bytes();
1134        self.raw_insert_bytes(&key_bytes, &value.to_bytes())
1135            .await
1136            .expect("Unrecoverable error occurred while inserting entry into the database")
1137            .map(|value_bytes| {
1138                decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1139            })
1140    }
1141
1142    async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1143    where
1144        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1145        K::Value: MaybeSend + MaybeSync,
1146    {
1147        if let Some(prev) = self.insert_entry(key, value).await {
1148            panic!(
1149                "Database overwriting element when expecting insertion of new entry. Key: {key:?} Prev Value: {prev:?}"
1150            );
1151        }
1152    }
1153
1154    async fn find_by_range<K>(
1155        &mut self,
1156        key_range: Range<K>,
1157    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1158    where
1159        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1160        K::Value: MaybeSend + MaybeSync,
1161    {
1162        let decoders = self.decoders().clone();
1163        Box::pin(
1164            self.raw_find_by_range(Range {
1165                start: &key_range.start.to_bytes(),
1166                end: &key_range.end.to_bytes(),
1167            })
1168            .await
1169            .expect("Unrecoverable error occurred while listing entries from the database")
1170            .map(move |(key_bytes, value_bytes)| {
1171                let key = decode_key_expect(&key_bytes, &decoders);
1172                let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1173                (key, value)
1174            }),
1175        )
1176    }
1177
1178    async fn find_by_prefix<KP>(
1179        &mut self,
1180        key_prefix: &KP,
1181    ) -> Pin<
1182        Box<
1183            maybe_add_send!(
1184                dyn Stream<
1185                        Item = (
1186                            KP::Record,
1187                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1188                        ),
1189                    > + '_
1190            ),
1191        >,
1192    >
1193    where
1194        KP: DatabaseLookup + MaybeSend + MaybeSync,
1195        KP::Record: DatabaseKey,
1196    {
1197        let decoders = self.decoders().clone();
1198        Box::pin(
1199            self.raw_find_by_prefix(&key_prefix.to_bytes())
1200                .await
1201                .expect("Unrecoverable error occurred while listing entries from the database")
1202                .map(move |(key_bytes, value_bytes)| {
1203                    let key = decode_key_expect(&key_bytes, &decoders);
1204                    let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1205                    (key, value)
1206                }),
1207        )
1208    }
1209
1210    async fn find_by_prefix_sorted_descending<KP>(
1211        &mut self,
1212        key_prefix: &KP,
1213    ) -> Pin<
1214        Box<
1215            maybe_add_send!(
1216                dyn Stream<
1217                        Item = (
1218                            KP::Record,
1219                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1220                        ),
1221                    > + '_
1222            ),
1223        >,
1224    >
1225    where
1226        KP: DatabaseLookup + MaybeSend + MaybeSync,
1227        KP::Record: DatabaseKey,
1228    {
1229        let decoders = self.decoders().clone();
1230        Box::pin(
1231            self.raw_find_by_prefix_sorted_descending(&key_prefix.to_bytes())
1232                .await
1233                .expect("Unrecoverable error occurred while listing entries from the database")
1234                .map(move |(key_bytes, value_bytes)| {
1235                    let key = decode_key_expect(&key_bytes, &decoders);
1236                    let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1237                    (key, value)
1238                }),
1239        )
1240    }
1241    async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1242    where
1243        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1244    {
1245        let key_bytes = key.to_bytes();
1246        self.raw_remove_entry(&key_bytes)
1247            .await
1248            .expect("Unrecoverable error occurred while inserting removing entry from the database")
1249            .map(|value_bytes| {
1250                decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1251            })
1252    }
1253    async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1254    where
1255        KP: DatabaseLookup + MaybeSend + MaybeSync,
1256    {
1257        self.raw_remove_by_prefix(&key_prefix.to_bytes())
1258            .await
1259            .expect("Unrecoverable error when removing entries from the database");
1260    }
1261}
1262
/// A database type that has decoders, which allows it to implement
/// [`IDatabaseTransactionOpsCoreTyped`]
pub trait WithDecoders {
    /// The decoder registry the typed operations use to decode keys and
    /// values read from the database.
    fn decoders(&self) -> &ModuleDecoderRegistry;
}
1268
/// Raw database transaction (e.g. rocksdb implementation)
#[apply(async_trait_maybe_send!)]
pub trait IRawDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
    /// Commit the transaction.
    ///
    /// Takes `self` by value, so a raw transaction cannot be used after it
    /// has been committed.
    async fn commit_tx(self) -> Result<()>;
}
1274
/// Fedimint database transaction
///
/// See [`IDatabase`] for more info.
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransaction: MaybeSend + IDatabaseTransactionOps + fmt::Debug {
    /// Commit the transaction
    ///
    /// Unlike [`IRawDatabaseTransaction::commit_tx`] this takes `&mut self`,
    /// so implementations have to handle a repeated call themselves.
    async fn commit_tx(&mut self) -> Result<()>;

    /// Is global database
    ///
    /// `true` for a transaction on the whole, unprefixed database.
    fn is_global(&self) -> bool;

    /// Get the global database tx from a module-prefixed database transaction
    ///
    /// Meant to be called only by core internals, and module developers should
    /// not call it directly.
    #[doc(hidden)]
    fn global_dbtx(&mut self, access_token: GlobalDBTxAccessToken)
    -> &mut dyn IDatabaseTransaction;
}
1294
1295#[apply(async_trait_maybe_send!)]
1296impl<T> IDatabaseTransaction for Box<T>
1297where
1298    T: IDatabaseTransaction + ?Sized,
1299{
1300    async fn commit_tx(&mut self) -> Result<()> {
1301        (**self).commit_tx().await
1302    }
1303
1304    fn is_global(&self) -> bool {
1305        (**self).is_global()
1306    }
1307
1308    fn global_dbtx(
1309        &mut self,
1310        access_token: GlobalDBTxAccessToken,
1311    ) -> &mut dyn IDatabaseTransaction {
1312        (**self).global_dbtx(access_token)
1313    }
1314}
1315
1316#[apply(async_trait_maybe_send!)]
1317impl<'a, T> IDatabaseTransaction for &'a mut T
1318where
1319    T: IDatabaseTransaction + ?Sized,
1320{
1321    async fn commit_tx(&mut self) -> Result<()> {
1322        (**self).commit_tx().await
1323    }
1324
1325    fn is_global(&self) -> bool {
1326        (**self).is_global()
1327    }
1328
1329    fn global_dbtx(&mut self, access_key: GlobalDBTxAccessToken) -> &mut dyn IDatabaseTransaction {
1330        (**self).global_dbtx(access_key)
1331    }
1332}
1333
/// Struct that wraps an `IRawDatabaseTransaction` and can be wrapped
/// easier in other structs since it does not consume `self` by move
/// on commit.
struct BaseDatabaseTransaction<Tx> {
    // TODO: merge options
    /// The raw transaction; taken (left as `None`) by `commit_tx`.
    raw: Option<Tx>,
    /// Keys touched by inserts/removals; taken and submitted to
    /// `notifications` by `commit_tx`.
    notify_queue: Option<NotifyQueue>,
    /// Receives `notify_queue` once the raw commit has succeeded.
    notifications: Arc<Notifications>,
}
1342
1343impl<Tx> fmt::Debug for BaseDatabaseTransaction<Tx>
1344where
1345    Tx: fmt::Debug,
1346{
1347    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1348        f.write_fmt(format_args!(
1349            "BaseDatabaseTransaction{{ raw={:?} }}",
1350            self.raw
1351        ))
1352    }
1353}
1354impl<Tx> BaseDatabaseTransaction<Tx>
1355where
1356    Tx: IRawDatabaseTransaction,
1357{
1358    fn new(dbtx: Tx, notifications: Arc<Notifications>) -> Self {
1359        Self {
1360            raw: Some(dbtx),
1361            notifications,
1362            notify_queue: Some(NotifyQueue::new()),
1363        }
1364    }
1365
1366    fn add_notification_key(&mut self, key: &[u8]) -> Result<()> {
1367        self.notify_queue
1368            .as_mut()
1369            .context("can not call add_notification_key after commit")?
1370            .add(&key);
1371        Ok(())
1372    }
1373}
1374
1375#[apply(async_trait_maybe_send!)]
1376impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOpsCore for BaseDatabaseTransaction<Tx> {
1377    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1378        self.add_notification_key(key)?;
1379        self.raw
1380            .as_mut()
1381            .context("Cannot insert into already consumed transaction")?
1382            .raw_insert_bytes(key, value)
1383            .await
1384    }
1385
1386    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1387        self.raw
1388            .as_mut()
1389            .context("Cannot retrieve from already consumed transaction")?
1390            .raw_get_bytes(key)
1391            .await
1392    }
1393
1394    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1395        self.add_notification_key(key)?;
1396        self.raw
1397            .as_mut()
1398            .context("Cannot remove from already consumed transaction")?
1399            .raw_remove_entry(key)
1400            .await
1401    }
1402
1403    async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1404        self.raw
1405            .as_mut()
1406            .context("Cannot retrieve from already consumed transaction")?
1407            .raw_find_by_range(key_range)
1408            .await
1409    }
1410
1411    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1412        self.raw
1413            .as_mut()
1414            .context("Cannot retrieve from already consumed transaction")?
1415            .raw_find_by_prefix(key_prefix)
1416            .await
1417    }
1418
1419    async fn raw_find_by_prefix_sorted_descending(
1420        &mut self,
1421        key_prefix: &[u8],
1422    ) -> Result<PrefixStream<'_>> {
1423        self.raw
1424            .as_mut()
1425            .context("Cannot retrieve from already consumed transaction")?
1426            .raw_find_by_prefix_sorted_descending(key_prefix)
1427            .await
1428    }
1429
1430    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1431        self.raw
1432            .as_mut()
1433            .context("Cannot remove from already consumed transaction")?
1434            .raw_remove_by_prefix(key_prefix)
1435            .await
1436    }
1437}
1438
1439#[apply(async_trait_maybe_send!)]
1440impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOps for BaseDatabaseTransaction<Tx> {
1441    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1442        self.raw
1443            .as_mut()
1444            .context("Cannot rollback to a savepoint on an already consumed transaction")?
1445            .rollback_tx_to_savepoint()
1446            .await?;
1447        Ok(())
1448    }
1449
1450    async fn set_tx_savepoint(&mut self) -> Result<()> {
1451        self.raw
1452            .as_mut()
1453            .context("Cannot set a tx savepoint on an already consumed transaction")?
1454            .set_tx_savepoint()
1455            .await?;
1456        Ok(())
1457    }
1458}
1459
1460#[apply(async_trait_maybe_send!)]
1461impl<Tx: IRawDatabaseTransaction + fmt::Debug> IDatabaseTransaction
1462    for BaseDatabaseTransaction<Tx>
1463{
1464    async fn commit_tx(&mut self) -> Result<()> {
1465        self.raw
1466            .take()
1467            .context("Cannot commit an already committed transaction")?
1468            .commit_tx()
1469            .await?;
1470        self.notifications.submit_queue(
1471            &self
1472                .notify_queue
1473                .take()
1474                .expect("commit must be called only once"),
1475        );
1476        Ok(())
1477    }
1478
1479    fn is_global(&self) -> bool {
1480        true
1481    }
1482
1483    fn global_dbtx(
1484        &mut self,
1485        _access_token: GlobalDBTxAccessToken,
1486    ) -> &mut dyn IDatabaseTransaction {
1487        panic!("Illegal to call global_dbtx on BaseDatabaseTransaction");
1488    }
1489}
1490
/// A helper for tracking and logging on `Drop` any instances of uncommitted
/// writes
///
/// When dropped with `has_writes` set and `is_committed` unset, it logs a
/// warning with a backtrace, or only a trace message if `ignore_uncommitted`
/// is set.
#[derive(Clone)]
struct CommitTracker {
    /// Is the dbtx committed
    is_committed: bool,
    /// Does the dbtx have any writes
    has_writes: bool,
    /// Don't warn-log uncommitted writes
    ignore_uncommitted: bool,
}
1502
1503impl Drop for CommitTracker {
1504    fn drop(&mut self) {
1505        if self.has_writes && !self.is_committed {
1506            if self.ignore_uncommitted {
1507                trace!(
1508                    target: LOG_DB,
1509                    "DatabaseTransaction has writes and has not called commit, but that's expected."
1510                );
1511            } else {
1512                warn!(
1513                    target: LOG_DB,
1514                    location = ?backtrace::Backtrace::new(),
1515                    "DatabaseTransaction has writes and has not called commit."
1516                );
1517            }
1518        }
1519    }
1520}
1521
/// Either an owned `T` or a mutable borrow of one.
///
/// Dereferences to `T` in both cases, so users do not need to care which
/// variant they hold.
enum MaybeRef<'a, T> {
    /// The value is stored inline.
    Owned(T),
    /// The value lives elsewhere and is mutably borrowed for `'a`.
    Borrowed(&'a mut T),
}

impl<T> ops::Deref for MaybeRef<'_, T> {
    type Target = T;

    /// Shared access to the value, whichever variant holds it.
    fn deref(&self) -> &T {
        match self {
            Self::Owned(value) => value,
            Self::Borrowed(reference) => reference,
        }
    }
}

impl<T> ops::DerefMut for MaybeRef<'_, T> {
    /// Exclusive access to the value, whichever variant holds it.
    fn deref_mut(&mut self) -> &mut T {
        match self {
            Self::Owned(value) => value,
            Self::Borrowed(reference) => reference,
        }
    }
}
1546
/// Session type for [`DatabaseTransaction`] that is allowed to commit
///
/// Opposite of [`NonCommittable`].
///
/// A data-less marker meant to be used as the `Cap` type parameter of
/// [`DatabaseTransaction`].
pub struct Committable;
1551
/// Session type for a [`DatabaseTransaction`] that is not allowed to commit
///
/// Opposite of [`Committable`].
///
/// A data-less marker; it is the default for the `Cap` type parameter of
/// [`DatabaseTransaction`].
pub struct NonCommittable;
1556
/// A high level database transaction handle
///
/// `Cap` is a session type
pub struct DatabaseTransaction<'tx, Cap = NonCommittable> {
    /// The underlying transaction (possibly a prefixing wrapper or a borrow
    /// of another handle's transaction).
    tx: Box<dyn IDatabaseTransaction + 'tx>,
    /// Decoders exposed through [`WithDecoders`] for the typed operations.
    decoders: ModuleDecoderRegistry,
    /// Commit/write tracking; borrowed from the parent handle when this
    /// handle was created through one of the `to_ref*` methods.
    commit_tracker: MaybeRef<'tx, CommitTracker>,
    /// Closures registered via `on_commit`; borrowed from the parent handle
    /// when this handle was created through one of the `to_ref*` methods.
    on_commit_hooks: MaybeRef<'tx, Vec<Box<maybe_add_send!(dyn FnOnce())>>>,
    /// Zero-sized marker carrying the `Cap` session type.
    capability: marker::PhantomData<Cap>,
}
1567
1568impl<Cap> fmt::Debug for DatabaseTransaction<'_, Cap> {
1569    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1570        f.write_fmt(format_args!(
1571            "DatabaseTransaction {{ tx: {:?}, decoders={:?} }}",
1572            self.tx, self.decoders
1573        ))
1574    }
1575}
1576
// Having decoders is what gives `DatabaseTransaction` the typed operations of
// `IDatabaseTransactionOpsCoreTyped` through its blanket impl.
impl<Cap> WithDecoders for DatabaseTransaction<'_, Cap> {
    /// The decoder registry this handle was created with.
    fn decoders(&self) -> &ModuleDecoderRegistry {
        &self.decoders
    }
}
1582
/// Decode `value_bytes` as a `V` using `decoders`.
///
/// Emits a trace event with the (abbreviated) bytes first; the `instrument`
/// attribute additionally records the target type name and, via `err`, any
/// returned decoding error.
#[instrument(target = LOG_DB, level = "trace", skip_all, fields(value_type = std::any::type_name::<V>()), err)]
fn decode_value<V: DatabaseValue>(
    value_bytes: &[u8],
    decoders: &ModuleDecoderRegistry,
) -> Result<V, DecodingError> {
    trace!(
        bytes = %AbbreviateHexBytes(value_bytes),
        "decoding value",
    );
    V::from_bytes(value_bytes, decoders)
}
1594
1595#[track_caller]
1596fn decode_value_expect<V: DatabaseValue>(
1597    value_bytes: &[u8],
1598    decoders: &ModuleDecoderRegistry,
1599    key_bytes: &[u8],
1600) -> V {
1601    decode_value(value_bytes, decoders).unwrap_or_else(|err| {
1602        panic!(
1603            "Unrecoverable decoding DatabaseValue as {}; err={}, key_bytes={}, val_bytes={}",
1604            any::type_name::<V>(),
1605            err,
1606            AbbreviateHexBytes(key_bytes),
1607            AbbreviateHexBytes(value_bytes),
1608        )
1609    })
1610}
1611
1612#[track_caller]
1613fn decode_key_expect<K: DatabaseKey>(key_bytes: &[u8], decoders: &ModuleDecoderRegistry) -> K {
1614    trace!(
1615        bytes = %AbbreviateHexBytes(key_bytes),
1616        "decoding key",
1617    );
1618    K::from_bytes(key_bytes, decoders).unwrap_or_else(|err| {
1619        panic!(
1620            "Unrecoverable decoding DatabaseKey as {}; err={}; bytes={}",
1621            any::type_name::<K>(),
1622            err,
1623            AbbreviateHexBytes(key_bytes)
1624        )
1625    })
1626}
1627
1628impl<'tx, Cap> DatabaseTransaction<'tx, Cap> {
1629    /// Convert into a non-committable version
1630    pub fn into_nc(self) -> DatabaseTransaction<'tx, NonCommittable> {
1631        DatabaseTransaction {
1632            tx: self.tx,
1633            decoders: self.decoders,
1634            commit_tracker: self.commit_tracker,
1635            on_commit_hooks: self.on_commit_hooks,
1636            capability: PhantomData::<NonCommittable>,
1637        }
1638    }
1639
1640    /// Get a reference to a non-committeable version
1641    pub fn to_ref_nc<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, NonCommittable>
1642    where
1643        's: 'a,
1644    {
1645        self.to_ref().into_nc()
1646    }
1647
1648    /// Get [`DatabaseTransaction`] isolated to a `prefix`
1649    pub fn with_prefix<'a: 'tx>(self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1650    where
1651        'tx: 'a,
1652    {
1653        DatabaseTransaction {
1654            tx: Box::new(PrefixDatabaseTransaction {
1655                inner: self.tx,
1656                global_dbtx_access_token: None,
1657                prefix,
1658            }),
1659            decoders: self.decoders,
1660            commit_tracker: self.commit_tracker,
1661            on_commit_hooks: self.on_commit_hooks,
1662            capability: self.capability,
1663        }
1664    }
1665
1666    /// Get [`DatabaseTransaction`] isolated to a prefix of a given
1667    /// `module_instance_id`, allowing the module to access global_dbtx
1668    /// with the right access token.
1669    pub fn with_prefix_module_id<'a: 'tx>(
1670        self,
1671        module_instance_id: ModuleInstanceId,
1672    ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1673    where
1674        'tx: 'a,
1675    {
1676        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1677        let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1678        (
1679            DatabaseTransaction {
1680                tx: Box::new(PrefixDatabaseTransaction {
1681                    inner: self.tx,
1682                    global_dbtx_access_token: Some(global_dbtx_access_token),
1683                    prefix,
1684                }),
1685                decoders: self.decoders,
1686                commit_tracker: self.commit_tracker,
1687                on_commit_hooks: self.on_commit_hooks,
1688                capability: self.capability,
1689            },
1690            global_dbtx_access_token,
1691        )
1692    }
1693
1694    /// Get [`DatabaseTransaction`] to `self`
1695    pub fn to_ref<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, Cap>
1696    where
1697        's: 'a,
1698    {
1699        let decoders = self.decoders.clone();
1700
1701        DatabaseTransaction {
1702            tx: Box::new(&mut self.tx),
1703            decoders,
1704            commit_tracker: match self.commit_tracker {
1705                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1706                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1707            },
1708            on_commit_hooks: match self.on_commit_hooks {
1709                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1710                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1711            },
1712            capability: self.capability,
1713        }
1714    }
1715
1716    /// Get [`DatabaseTransaction`] isolated to a `prefix` of `self`
1717    pub fn to_ref_with_prefix<'a>(&'a mut self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1718    where
1719        'tx: 'a,
1720    {
1721        DatabaseTransaction {
1722            tx: Box::new(PrefixDatabaseTransaction {
1723                inner: &mut self.tx,
1724                global_dbtx_access_token: None,
1725                prefix,
1726            }),
1727            decoders: self.decoders.clone(),
1728            commit_tracker: match self.commit_tracker {
1729                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1730                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1731            },
1732            on_commit_hooks: match self.on_commit_hooks {
1733                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1734                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1735            },
1736            capability: self.capability,
1737        }
1738    }
1739
1740    pub fn to_ref_with_prefix_module_id<'a>(
1741        &'a mut self,
1742        module_instance_id: ModuleInstanceId,
1743    ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1744    where
1745        'tx: 'a,
1746    {
1747        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1748        let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1749        (
1750            DatabaseTransaction {
1751                tx: Box::new(PrefixDatabaseTransaction {
1752                    inner: &mut self.tx,
1753                    global_dbtx_access_token: Some(global_dbtx_access_token),
1754                    prefix,
1755                }),
1756                decoders: self.decoders.clone(),
1757                commit_tracker: match self.commit_tracker {
1758                    MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1759                    MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1760                },
1761                on_commit_hooks: match self.on_commit_hooks {
1762                    MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1763                    MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1764                },
1765                capability: self.capability,
1766            },
1767            global_dbtx_access_token,
1768        )
1769    }
1770
1771    /// Is this `Database` a global, unpartitioned `Database`
1772    pub fn is_global(&self) -> bool {
1773        self.tx.is_global()
1774    }
1775
1776    /// `Err` if [`Self::is_global`] is not true
1777    pub fn ensure_global(&self) -> Result<()> {
1778        if !self.is_global() {
1779            bail!("Database instance not global");
1780        }
1781
1782        Ok(())
1783    }
1784
1785    /// `Err` if [`Self::is_global`] is true
1786    pub fn ensure_isolated(&self) -> Result<()> {
1787        if self.is_global() {
1788            bail!("Database instance not isolated");
1789        }
1790
1791        Ok(())
1792    }
1793
1794    /// Cancel the tx to avoid debugging warnings about uncommitted writes
1795    pub fn ignore_uncommitted(&mut self) -> &mut Self {
1796        self.commit_tracker.ignore_uncommitted = true;
1797        self
1798    }
1799
1800    /// Create warnings about uncommitted writes
1801    pub fn warn_uncommitted(&mut self) -> &mut Self {
1802        self.commit_tracker.ignore_uncommitted = false;
1803        self
1804    }
1805
1806    /// Register a hook that will be run after commit succeeds.
1807    #[instrument(target = LOG_DB, level = "trace", skip_all)]
1808    pub fn on_commit(&mut self, f: maybe_add_send!(impl FnOnce() + 'static)) {
1809        self.on_commit_hooks.push(Box::new(f));
1810    }
1811
1812    pub fn global_dbtx<'a>(
1813        &'a mut self,
1814        access_token: GlobalDBTxAccessToken,
1815    ) -> DatabaseTransaction<'a, Cap>
1816    where
1817        'tx: 'a,
1818    {
1819        let decoders = self.decoders.clone();
1820
1821        DatabaseTransaction {
1822            tx: Box::new(self.tx.global_dbtx(access_token)),
1823            decoders,
1824            commit_tracker: match self.commit_tracker {
1825                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1826                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1827            },
1828            on_commit_hooks: match self.on_commit_hooks {
1829                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1830                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1831            },
1832            capability: self.capability,
1833        }
1834    }
1835}
1836
/// Code used to access `global_dbtx`
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct GlobalDBTxAccessToken(u32);

impl GlobalDBTxAccessToken {
    /// Derive the access code for `global_dbtx` from a database key prefix.
    ///
    /// The code is the sum of all prefix bytes plus a fixed offset of 513.
    ///
    /// Since this has to happen at runtime, the aim is only to keep user
    /// modules from calling `global_dbtx` too easily; there is no need to be
    /// paranoid about it.
    ///
    /// The result must be deterministic for the whole lifetime of a running
    /// instance (it is rederived independently in multiple code paths), but
    /// it could be somewhat randomized between different runs and releases.
    fn from_prefix(prefix: &[u8]) -> Self {
        let byte_sum: u32 = prefix.iter().copied().map(u32::from).sum();
        Self(byte_sum + 513)
    }
}
1856
1857impl<'tx> DatabaseTransaction<'tx, Committable> {
1858    pub fn new(dbtx: Box<dyn IDatabaseTransaction + 'tx>, decoders: ModuleDecoderRegistry) -> Self {
1859        Self {
1860            tx: dbtx,
1861            decoders,
1862            commit_tracker: MaybeRef::Owned(CommitTracker {
1863                is_committed: false,
1864                has_writes: false,
1865                ignore_uncommitted: false,
1866            }),
1867            on_commit_hooks: MaybeRef::Owned(vec![]),
1868            capability: PhantomData,
1869        }
1870    }
1871
1872    pub async fn commit_tx_result(mut self) -> Result<()> {
1873        self.commit_tracker.is_committed = true;
1874        let commit_result = self.tx.commit_tx().await;
1875
1876        // Run commit hooks in case commit was successful
1877        if commit_result.is_ok() {
1878            for hook in self.on_commit_hooks.deref_mut().drain(..) {
1879                hook();
1880            }
1881        }
1882
1883        commit_result
1884    }
1885
1886    pub async fn commit_tx(mut self) {
1887        self.commit_tracker.is_committed = true;
1888        self.commit_tx_result()
1889            .await
1890            .expect("Unrecoverable error occurred while committing to the database.");
1891    }
1892}
1893
1894#[apply(async_trait_maybe_send!)]
1895impl<Cap> IDatabaseTransactionOpsCore for DatabaseTransaction<'_, Cap>
1896where
1897    Cap: Send,
1898{
1899    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1900        self.commit_tracker.has_writes = true;
1901        self.tx.raw_insert_bytes(key, value).await
1902    }
1903
1904    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1905        self.tx.raw_get_bytes(key).await
1906    }
1907
1908    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1909        self.tx.raw_remove_entry(key).await
1910    }
1911
1912    async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1913        self.tx.raw_find_by_range(key_range).await
1914    }
1915
1916    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1917        self.tx.raw_find_by_prefix(key_prefix).await
1918    }
1919
1920    async fn raw_find_by_prefix_sorted_descending(
1921        &mut self,
1922        key_prefix: &[u8],
1923    ) -> Result<PrefixStream<'_>> {
1924        self.tx
1925            .raw_find_by_prefix_sorted_descending(key_prefix)
1926            .await
1927    }
1928
1929    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1930        self.commit_tracker.has_writes = true;
1931        self.tx.raw_remove_by_prefix(key_prefix).await
1932    }
1933}
1934#[apply(async_trait_maybe_send!)]
1935impl IDatabaseTransactionOps for DatabaseTransaction<'_, Committable> {
1936    async fn set_tx_savepoint(&mut self) -> Result<()> {
1937        self.tx.set_tx_savepoint().await
1938    }
1939
1940    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1941        self.tx.rollback_tx_to_savepoint().await
1942    }
1943}
1944
1945impl<T> DatabaseKeyPrefix for T
1946where
1947    T: DatabaseLookup + crate::encoding::Encodable + Debug,
1948{
1949    fn to_bytes(&self) -> Vec<u8> {
1950        let mut data = vec![<Self as DatabaseLookup>::Record::DB_PREFIX];
1951        data.append(&mut self.consensus_encode_to_vec());
1952        data
1953    }
1954}
1955
1956impl<T> DatabaseKey for T
1957where
1958    // Note: key can only be `T` that can be decoded without modules (even if
1959    // module type is `()`)
1960    T: DatabaseRecord + crate::encoding::Decodable + Sized,
1961{
1962    const NOTIFY_ON_MODIFY: bool = <T as DatabaseRecord>::NOTIFY_ON_MODIFY;
1963    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1964        if data.is_empty() {
1965            // TODO: build better coding errors, pretty useless right now
1966            return Err(DecodingError::wrong_length(1, 0));
1967        }
1968
1969        if data[0] != Self::DB_PREFIX {
1970            return Err(DecodingError::wrong_prefix(Self::DB_PREFIX, data[0]));
1971        }
1972
1973        <Self as crate::encoding::Decodable>::consensus_decode_whole(&data[1..], modules)
1974            .map_err(|decode_error| DecodingError::Other(decode_error.0))
1975    }
1976}
1977
1978impl<T> DatabaseValue for T
1979where
1980    T: Debug + Encodable + Decodable,
1981{
1982    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1983        T::consensus_decode_whole(data, modules).map_err(|e| DecodingError::Other(e.0))
1984    }
1985
1986    fn to_bytes(&self) -> Vec<u8> {
1987        self.consensus_encode_to_vec()
1988    }
1989}
1990
/// This is a helper macro that generates the implementation of
/// `DatabaseRecord` necessary for reading/writing to the database.
///
/// - `key`: This is the type of struct that will be used as the key into the
///   database
/// - `value`: This is the type of struct that will be used as the value into
///   the database
/// - `db_prefix`: Required enum expression that is represented as a `u8` and is
///   prepended to this key
/// - `notify_on_modify`: Optional, must be the literal `true` or `false`. It is
///   used as the record's `NOTIFY_ON_MODIFY` constant; with `true` the key
///   additionally implements `DatabaseKeyWithNotify`
///
/// Query prefixes, which can be used to query the database via
/// `find_by_prefix`, are declared separately with `impl_db_lookup!`.
///
/// # Examples
///
/// ```
/// use fedimint_core::encoding::{Decodable, Encodable};
/// use fedimint_core::impl_db_record;
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyKey;
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyValue;
///
/// #[repr(u8)]
/// #[derive(Clone, Debug)]
/// pub enum DbKeyPrefix {
///     MyKey = 0x50,
/// }
///
/// impl_db_record!(key = MyKey, value = MyValue, db_prefix = DbKeyPrefix::MyKey);
/// ```
///
/// Use the required parameters and declare one `query_prefix` with
/// `impl_db_lookup!`
///
/// ```
/// use fedimint_core::encoding::{Decodable, Encodable};
/// use fedimint_core::{impl_db_lookup, impl_db_record};
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyKey;
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyValue;
///
/// #[repr(u8)]
/// #[derive(Clone, Debug)]
/// pub enum DbKeyPrefix {
///     MyKey = 0x50,
/// }
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyKeyPrefix;
///
/// impl_db_record!(key = MyKey, value = MyValue, db_prefix = DbKeyPrefix::MyKey,);
///
/// impl_db_lookup!(key = MyKey, query_prefix = MyKeyPrefix);
/// ```
#[macro_export]
macro_rules! impl_db_record {
    (key = $key:ty, value = $val:ty, db_prefix = $db_prefix:expr_2021 $(, notify_on_modify = $notify:tt)? $(,)?) => {
        impl $crate::db::DatabaseRecord for $key {
            const DB_PREFIX: u8 = $db_prefix as u8;
            $(const NOTIFY_ON_MODIFY: bool = $notify;)?
            type Key = Self;
            type Value = $val;
        }
        $(
            // Qualified with `$crate` so the internal rule is found even when
            // the caller invoked the macro by path without importing it.
            $crate::impl_db_record! {
                @impl_notify_marker key = $key, notify_on_modify = $notify
            }
        )?
    };
    // if notify is set to true
    (@impl_notify_marker key = $key:ty, notify_on_modify = true) => {
        impl $crate::db::DatabaseKeyWithNotify for $key {}
    };
    // if notify is set to false
    (@impl_notify_marker key = $key:ty, notify_on_modify = false) => {};
}
2073
/// Implements `DatabaseLookup` for every listed `query_prefix` type, using
/// `key` as the lookup's `Record`.
///
/// - `key`: The record key type the lookups resolve to
/// - `query_prefix`: Zero or more types, each given as
///   `query_prefix = SomeType`
#[macro_export]
macro_rules! impl_db_lookup {
    (key = $record:ty $(, query_prefix = $lookup:ty)* $(,)?) => {
        $(
            impl $crate::db::DatabaseLookup for $lookup {
                type Record = $record;
            }
        )*
    };
}
2084
/// Deprecated: Use `DatabaseVersionKey(ModuleInstanceId)` instead.
///
/// Legacy unit key for the stored [`DatabaseVersion`]. It is registered under
/// the same `db_prefix` as [`DatabaseVersionKey`].
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct DatabaseVersionKeyV0;

/// Key under which the [`DatabaseVersion`] belonging to a `ModuleInstanceId`
/// is stored.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct DatabaseVersionKey(pub ModuleInstanceId);

/// Version number of the database contents, a thin wrapper around `u64`.
#[derive(Debug, Encodable, Decodable, Serialize, Clone, PartialOrd, Ord, PartialEq, Eq, Copy)]
pub struct DatabaseVersion(pub u64);

// Legacy record: unit key -> version.
impl_db_record!(
    key = DatabaseVersionKeyV0,
    value = DatabaseVersion,
    db_prefix = DbKeyPrefix::DatabaseVersion
);

// Current record: module instance id -> version.
impl_db_record!(
    key = DatabaseVersionKey,
    value = DatabaseVersion,
    db_prefix = DbKeyPrefix::DatabaseVersion
);
2106
2107impl std::fmt::Display for DatabaseVersion {
2108    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2109        write!(f, "{}", self.0)
2110    }
2111}
2112
2113impl DatabaseVersion {
2114    pub fn increment(&self) -> Self {
2115        Self(self.0 + 1)
2116    }
2117}
2118
2119impl std::fmt::Display for DbKeyPrefix {
2120    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2121        write!(f, "{self:?}")
2122    }
2123}
2124
/// Key prefixes declared by this module; each variant's discriminant is the
/// byte used as `db_prefix` for the corresponding record.
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
    /// Prefix of the `DatabaseVersionKeyV0` / `DatabaseVersionKey` records
    DatabaseVersion = 0x50,
    // NOTE(review): not referenced in the visible part of this file; presumably
    // used by the client backup record — confirm.
    ClientBackup = 0x51,
}
2131
/// Errors produced while decoding database keys and values from raw bytes.
#[derive(Debug, Error)]
pub enum DecodingError {
    /// The first byte of the key did not match the record's `DB_PREFIX`
    #[error("Key had a wrong prefix, expected {expected} but got {found}")]
    WrongPrefix { expected: u8, found: u8 },
    /// The key did not have the expected number of bytes
    #[error("Key had a wrong length, expected {expected} but got {found}")]
    WrongLength { expected: usize, found: usize },
    /// Any other failure, e.g. an error from consensus decoding
    #[error("Other decoding error: {0:#}")]
    Other(anyhow::Error),
}
2141
2142impl DecodingError {
2143    pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2144        Self::Other(anyhow::Error::from(error))
2145    }
2146
2147    pub fn wrong_prefix(expected: u8, found: u8) -> Self {
2148        Self::WrongPrefix { expected, found }
2149    }
2150
2151    pub fn wrong_length(expected: usize, found: usize) -> Self {
2152        Self::WrongLength { expected, found }
2153    }
2154}
2155
/// Collects every `(key, value)` pair found under `$prefix_type` into a
/// `BTreeMap` keyed by the hex consensus encoding of the key, and inserts the
/// boxed map into `$map` under `$key_literal`.
///
/// Arguments, in order: transaction ident, prefix expression, key type (not
/// used by the expansion), value type, target map ident, map key literal.
///
/// The expansion refers to `BTreeMap` unqualified and awaits, so it must be
/// used in an async context with `BTreeMap` in scope.
#[macro_export]
macro_rules! push_db_pair_items {
    ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
        let hex_keyed_items =
            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
                .await
                .map(|(db_key, db_value)| {
                    let hex_key = $crate::encoding::Encodable::consensus_encode_to_hex(&db_key);
                    (hex_key, db_value)
                })
                .collect::<BTreeMap<String, $value_type>>()
                .await;

        $map.insert($key_literal.to_string(), Box::new(hex_keyed_items));
    };
}
2174
/// Collects every key found under `$prefix_type` into a `Vec<$key_type>`
/// (values are discarded) and inserts the boxed vector into `$map` under
/// `$key_literal`.
///
/// Arguments, in order: transaction ident, prefix expression, key type,
/// target map ident, map key literal. The expansion awaits, so it must be
/// used in an async context.
#[macro_export]
macro_rules! push_db_key_items {
    ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $map:ident, $key_literal:literal) => {
        let found_keys =
            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
                .await
                .map(|(db_key, _db_value)| db_key)
                .collect::<Vec<$key_type>>()
                .await;

        $map.insert($key_literal.to_string(), Box::new(found_keys));
    };
}
2188
/// Context passed to the db migration _functions_ (pay attention to `Fn` in the
/// name)
///
/// Typically should not be referred to directly, and instead by a type-alias,
/// where the inner-context is set.
///
/// Notably it has the (optional) module id (inaccessible to the modules
/// directly, but used internally) and an inner context `C` injected by the
/// outer-layer.
///
/// `C` is generic, as in different layers / scopes (server vs client, etc.) a
/// different (module-typed, type erased, server/client, etc.) contexts might be
/// needed, while the database migration logic is kind of generic over that.
pub struct DbMigrationFnContext<'tx, C> {
    // Global transaction; the constructor panics if it is not global
    dbtx: DatabaseTransaction<'tx>,
    // `Some` when migrating a module, `None` otherwise
    module_instance_id: Option<ModuleInstanceId>,
    // Inner context injected by the outer layer
    ctx: C,
    // Private marker field so the struct can only be built via `new`
    __please_use_constructor: (),
}
2208
2209impl<'tx, C> DbMigrationFnContext<'tx, C> {
2210    pub fn new(
2211        dbtx: DatabaseTransaction<'tx>,
2212        module_instance_id: Option<ModuleInstanceId>,
2213        ctx: C,
2214    ) -> Self {
2215        dbtx.ensure_global().expect("Must pass global dbtx");
2216        Self {
2217            dbtx,
2218            module_instance_id,
2219            ctx,
2220            // this is a constructor
2221            __please_use_constructor: (),
2222        }
2223    }
2224
2225    pub fn map<R>(self, f: impl FnOnce(C) -> R) -> DbMigrationFnContext<'tx, R> {
2226        DbMigrationFnContext::new(self.dbtx, self.module_instance_id, f(self.ctx))
2227    }
2228
2229    // TODO: this method is currently visible to the module itself, and it shouldn't
2230    #[doc(hidden)]
2231    pub fn split_dbtx_ctx<'s>(&'s mut self) -> (&'s mut DatabaseTransaction<'tx>, &'s C) {
2232        let Self { dbtx, ctx, .. } = self;
2233
2234        (dbtx, ctx)
2235    }
2236
2237    pub fn dbtx(&'_ mut self) -> DatabaseTransaction<'_> {
2238        if let Some(module_instance_id) = self.module_instance_id {
2239            self.dbtx.to_ref_with_prefix_module_id(module_instance_id).0
2240        } else {
2241            self.dbtx.to_ref_nc()
2242        }
2243    }
2244
2245    // TODO: this method is currently visible to the module itself, and it shouldn't
2246    #[doc(hidden)]
2247    pub fn module_instance_id(&self) -> Option<ModuleInstanceId> {
2248        self.module_instance_id
2249    }
2250}
2251
/// [`DbMigrationFn`] with no extra context (ATM gateway)
pub type GeneralDbMigrationFn = DbMigrationFn<()>;
/// [`DbMigrationFnContext`] with no extra context, the argument type of
/// [`GeneralDbMigrationFn`]
pub type GeneralDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, ()>;

/// [`DbMigrationFn`] used by core client
///
/// NOTE: client _module_ migrations are handled using separate structs due to
/// state machine migrations
pub type ClientCoreDbMigrationFn = DbMigrationFn<()>;
/// [`DbMigrationFnContext`] with no extra context, the argument type of
/// [`ClientCoreDbMigrationFn`]
pub type ClientCoreDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, ()>;
2262
/// Migration function that modules can implement to "migrate" the database
/// to the next database version.
///
/// It is a boxed closure that takes a [`DbMigrationFnContext`] by value and
/// returns a pinned, boxed future resolving to `anyhow::Result<()>`.
///
/// It is parametrized over `C` (contents), which is extra data/type/interface
/// custom for different part of the codebase, e.g.:
///
/// * server core
/// * server modules
/// * client core
/// * gateway core
pub type DbMigrationFn<C> = Box<
    maybe_add_send_sync!(
        dyn for<'tx> Fn(
            DbMigrationFnContext<'tx, C>,
        ) -> Pin<
            Box<maybe_add_send!(dyn futures::Future<Output = anyhow::Result<()>> + 'tx)>,
        >
    ),
>;
2282
2283/// Verifies that all database migrations are defined contiguously and returns
2284/// the "current" database version, which is one greater than the last key in
2285/// the map.
2286pub fn get_current_database_version<F>(
2287    migrations: &BTreeMap<DatabaseVersion, F>,
2288) -> DatabaseVersion {
2289    let versions = migrations.keys().copied().collect::<Vec<_>>();
2290
2291    // Verify that all database migrations are defined contiguously. If there is a
2292    // gap, this indicates a programming error and we should panic.
2293    if !versions
2294        .windows(2)
2295        .all(|window| window[0].increment() == window[1])
2296    {
2297        panic!("Database Migrations are not defined contiguously");
2298    }
2299
2300    versions
2301        .last()
2302        .map_or(DatabaseVersion(0), DatabaseVersion::increment)
2303}
2304
2305pub async fn apply_migrations<C>(
2306    db: &Database,
2307    ctx: C,
2308    kind: String,
2309    migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
2310    module_instance_id: Option<ModuleInstanceId>,
2311    // When used in client side context, we can/should ignore keys that external app
2312    // is allowed to use, and but since this function is shared, we make it optional argument
2313    external_prefixes_above: Option<u8>,
2314) -> Result<(), anyhow::Error>
2315where
2316    C: Clone,
2317{
2318    let mut dbtx = db.begin_transaction().await;
2319    apply_migrations_dbtx(
2320        &mut dbtx.to_ref_nc(),
2321        ctx,
2322        kind,
2323        migrations,
2324        module_instance_id,
2325        external_prefixes_above,
2326    )
2327    .await?;
2328
2329    dbtx.commit_tx_result().await
2330}
2331/// `apply_migrations` iterates from the on disk database version for the
2332/// module.
2333///
2334/// `apply_migrations` iterates from the on disk database version for the module
2335/// up to `target_db_version` and executes all of the migrations that exist in
2336/// the migrations map. Each migration in migrations map updates the
2337/// database to have the correct on-disk structures that the code is expecting.
2338/// The entire migration process is atomic (i.e migration from 0->1 and 1->2
2339/// happen atomically). This function is called before the module is initialized
2340/// and as long as the correct migrations are supplied in the migrations map,
2341/// the module will be able to read and write from the database successfully.
2342pub async fn apply_migrations_dbtx<C>(
2343    global_dbtx: &mut DatabaseTransaction<'_>,
2344    ctx: C,
2345    kind: String,
2346    migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
2347    module_instance_id: Option<ModuleInstanceId>,
2348    // When used in client side context, we can/should ignore keys that external app
2349    // is allowed to use, and but since this function is shared, we make it optional argument
2350    external_prefixes_above: Option<u8>,
2351) -> Result<(), anyhow::Error>
2352where
2353    C: Clone,
2354{
2355    // Newly created databases will not have any data since they have just been
2356    // instantiated.
2357    let is_new_db = global_dbtx
2358        .raw_find_by_prefix(&[])
2359        .await?
2360        .filter(|(key, _v)| {
2361            std::future::ready(
2362                external_prefixes_above.is_none_or(|external_prefixes_above| {
2363                    !key.is_empty() && key[0] < external_prefixes_above
2364                }),
2365            )
2366        })
2367        .next()
2368        .await
2369        .is_none();
2370
2371    let target_db_version = get_current_database_version(&migrations);
2372
2373    // First write the database version to disk if it does not exist.
2374    create_database_version_dbtx(
2375        global_dbtx,
2376        target_db_version,
2377        module_instance_id,
2378        kind.clone(),
2379        is_new_db,
2380    )
2381    .await?;
2382
2383    let module_instance_id_key = module_instance_id_or_global(module_instance_id);
2384
2385    let disk_version = global_dbtx
2386        .get_value(&DatabaseVersionKey(module_instance_id_key))
2387        .await;
2388
2389    let db_version = if let Some(disk_version) = disk_version {
2390        let mut current_db_version = disk_version;
2391
2392        if current_db_version > target_db_version {
2393            return Err(anyhow::anyhow!(format!(
2394                "On disk database version {current_db_version} for module {kind} was higher than the code database version {target_db_version}."
2395            )));
2396        }
2397
2398        while current_db_version < target_db_version {
2399            if let Some(migration) = migrations.get(&current_db_version) {
2400                info!(target: LOG_DB, ?kind, ?current_db_version, ?target_db_version, "Migrating module...");
2401                migration(DbMigrationFnContext::new(
2402                    global_dbtx.to_ref_nc(),
2403                    module_instance_id,
2404                    ctx.clone(),
2405                ))
2406                .await?;
2407            } else {
2408                warn!(target: LOG_DB, ?current_db_version, "Missing server db migration");
2409            }
2410
2411            current_db_version = current_db_version.increment();
2412
2413            global_dbtx
2414                .insert_entry(
2415                    &DatabaseVersionKey(module_instance_id_key),
2416                    &current_db_version,
2417                )
2418                .await;
2419        }
2420
2421        current_db_version
2422    } else {
2423        target_db_version
2424    };
2425
2426    debug!(target: LOG_DB, ?kind, ?db_version, "DB Version");
2427    Ok(())
2428}
2429
2430pub async fn create_database_version(
2431    db: &Database,
2432    target_db_version: DatabaseVersion,
2433    module_instance_id: Option<ModuleInstanceId>,
2434    kind: String,
2435    is_new_db: bool,
2436) -> Result<(), anyhow::Error> {
2437    let mut dbtx = db.begin_transaction().await;
2438
2439    create_database_version_dbtx(
2440        &mut dbtx.to_ref_nc(),
2441        target_db_version,
2442        module_instance_id,
2443        kind,
2444        is_new_db,
2445    )
2446    .await?;
2447
2448    dbtx.commit_tx_result().await?;
2449    Ok(())
2450}
2451
2452/// Creates the `DatabaseVersion` inside the database if it does not exist. If
2453/// necessary, this function will migrate the legacy database version to the
2454/// expected `DatabaseVersionKey`.
2455pub async fn create_database_version_dbtx(
2456    global_dbtx: &mut DatabaseTransaction<'_>,
2457    target_db_version: DatabaseVersion,
2458    module_instance_id: Option<ModuleInstanceId>,
2459    kind: String,
2460    is_new_db: bool,
2461) -> Result<(), anyhow::Error> {
2462    let key_module_instance_id = module_instance_id_or_global(module_instance_id);
2463
2464    // First check if the module has a `DatabaseVersion` written to
2465    // `DatabaseVersionKey`. If `DatabaseVersion` already exists, there is
2466    // nothing to do.
2467    if global_dbtx
2468        .get_value(&DatabaseVersionKey(key_module_instance_id))
2469        .await
2470        .is_none()
2471    {
2472        // If it exists, read and remove the legacy `DatabaseVersion`, which used to be
2473        // in the module's isolated namespace (but not for fedimint-server or
2474        // fedimint-client).
2475        //
2476        // Otherwise, if the previous database contains data and no legacy database
2477        // version, use `DatabaseVersion(0)` so that all database migrations are
2478        // run. Otherwise, this database can assumed to be new and can use
2479        // `target_db_version` to skip the database migrations.
2480        let current_version_in_module = if let Some(module_instance_id) = module_instance_id {
2481            remove_current_db_version_if_exists(
2482                &mut global_dbtx
2483                    .to_ref_with_prefix_module_id(module_instance_id)
2484                    .0
2485                    .into_nc(),
2486                is_new_db,
2487                target_db_version,
2488            )
2489            .await
2490        } else {
2491            remove_current_db_version_if_exists(
2492                &mut global_dbtx.to_ref().into_nc(),
2493                is_new_db,
2494                target_db_version,
2495            )
2496            .await
2497        };
2498
2499        // Write the previous `DatabaseVersion` to the new `DatabaseVersionKey`
2500        debug!(target: LOG_DB, ?kind, ?current_version_in_module, ?target_db_version, ?is_new_db, "Creating DatabaseVersionKey...");
2501        global_dbtx
2502            .insert_new_entry(
2503                &DatabaseVersionKey(key_module_instance_id),
2504                &current_version_in_module,
2505            )
2506            .await;
2507    }
2508
2509    Ok(())
2510}
2511
2512/// Removes `DatabaseVersion` from `DatabaseVersionKeyV0` if it exists and
2513/// returns the current database version. If the current version does not
2514/// exist, use `target_db_version` if the database is new. Otherwise, return
2515/// `DatabaseVersion(0)` to ensure all migrations are run.
2516async fn remove_current_db_version_if_exists(
2517    version_dbtx: &mut DatabaseTransaction<'_>,
2518    is_new_db: bool,
2519    target_db_version: DatabaseVersion,
2520) -> DatabaseVersion {
2521    // Remove the previous `DatabaseVersion` in the isolated database. If it doesn't
2522    // exist, just use the 0 for the version so that all of the migrations are
2523    // executed.
2524    let current_version_in_module = version_dbtx.remove_entry(&DatabaseVersionKeyV0).await;
2525    match current_version_in_module {
2526        Some(database_version) => database_version,
2527        None if is_new_db => target_db_version,
2528        None => DatabaseVersion(0),
2529    }
2530}
2531
2532/// Helper function to retrieve the `module_instance_id` for modules, otherwise
2533/// return 0xff for the global namespace.
2534fn module_instance_id_or_global(module_instance_id: Option<ModuleInstanceId>) -> ModuleInstanceId {
2535    // Use 0xff for fedimint-server and the `module_instance_id` for each module
2536    module_instance_id.map_or_else(
2537        || MODULE_GLOBAL_PREFIX.into(),
2538        |module_instance_id| module_instance_id,
2539    )
2540}
2541#[allow(unused_imports)]
2542mod test_utils {
2543    use std::collections::BTreeMap;
2544    use std::time::Duration;
2545
2546    use fedimint_core::db::DbMigrationFnContext;
2547    use futures::future::ready;
2548    use futures::{Future, FutureExt, StreamExt};
2549    use rand::Rng;
2550    use tokio::join;
2551
2552    use super::{
2553        Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey, DatabaseVersionKeyV0,
2554        DbMigrationFn, apply_migrations,
2555    };
2556    use crate::core::ModuleKind;
2557    use crate::db::mem_impl::MemDatabase;
2558    use crate::db::{
2559        IDatabaseTransactionOps, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
2560    };
2561    use crate::encoding::{Decodable, Encodable};
2562    use crate::module::registry::ModuleDecoderRegistry;
2563
2564    pub async fn future_returns_shortly<F: Future>(fut: F) -> Option<F::Output> {
2565        crate::runtime::timeout(Duration::from_millis(10), fut)
2566            .await
2567            .ok()
2568    }
2569
    /// Key prefixes used by the test records below.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 0x42,
        AltTest = 0x43,
        // 0x25 is the ASCII code of '%'
        PercentTestKey = 0x25,
    }

    /// Main test key; its record is registered with `notify_on_modify = true`.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub u64);

    /// Lookup type for [`TestKey`] records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    /// Two-field key registered under the same prefix as [`TestKey`].
    #[derive(Debug, Encodable, Decodable)]
    struct TestKeyV0(u64, u64);

    /// Lookup type for [`TestKeyV0`] records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixV0;

    impl_db_record!(
        key = TestKeyV0,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
    );
    impl_db_lookup!(key = TestKeyV0, query_prefix = DbPrefixTestPrefixV0);

    /// Test key stored under the separate `AltTest` prefix.
    #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Encodable, Decodable)]
    struct AltTestKey(u64);

    /// Lookup type for [`AltTestKey`] records.
    #[derive(Debug, Encodable, Decodable)]
    struct AltDbPrefixTestPrefix;

    impl_db_record!(
        key = AltTestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::AltTest,
    );
    impl_db_lookup!(key = AltTestKey, query_prefix = AltDbPrefixTestPrefix);

    /// Test key stored under the `PercentTestKey` prefix.
    #[derive(Debug, Encodable, Decodable)]
    struct PercentTestKey(u64);

    /// Lookup type for [`PercentTestKey`] records.
    #[derive(Debug, Encodable, Decodable)]
    struct PercentPrefixTestPrefix;

    impl_db_record!(
        key = PercentTestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::PercentTestKey,
    );

    impl_db_lookup!(key = PercentTestKey, query_prefix = PercentPrefixTestPrefix);
    /// Value type shared by all test records.
    #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
    pub(super) struct TestVal(pub u64);

    // Two distinct `u16` prefix values; presumably used as module instance ids
    // by tests further down — confirm against their uses.
    const TEST_MODULE_PREFIX: u16 = 1;
    const ALT_MODULE_PREFIX: u16 = 2;
2636
2637    pub async fn verify_insert_elements(db: Database) {
2638        let mut dbtx = db.begin_transaction().await;
2639        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2640        assert!(dbtx.insert_entry(&TestKey(2), &TestVal(3)).await.is_none());
2641        dbtx.commit_tx().await;
2642
2643        // Test values were persisted
2644        let mut dbtx = db.begin_transaction().await;
2645        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2646        assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(3)));
2647        dbtx.commit_tx().await;
2648
2649        // Test overwrites work as expected
2650        let mut dbtx = db.begin_transaction().await;
2651        assert_eq!(
2652            dbtx.insert_entry(&TestKey(1), &TestVal(4)).await,
2653            Some(TestVal(2))
2654        );
2655        assert_eq!(
2656            dbtx.insert_entry(&TestKey(2), &TestVal(5)).await,
2657            Some(TestVal(3))
2658        );
2659        dbtx.commit_tx().await;
2660
2661        let mut dbtx = db.begin_transaction().await;
2662        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(4)));
2663        assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(5)));
2664        dbtx.commit_tx().await;
2665    }
2666
2667    pub async fn verify_remove_nonexisting(db: Database) {
2668        let mut dbtx = db.begin_transaction().await;
2669        assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2670        let removed = dbtx.remove_entry(&TestKey(1)).await;
2671        assert!(removed.is_none());
2672
2673        // Commit to suppress the warning message
2674        dbtx.commit_tx().await;
2675    }
2676
2677    pub async fn verify_remove_existing(db: Database) {
2678        let mut dbtx = db.begin_transaction().await;
2679
2680        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2681
2682        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2683
2684        let removed = dbtx.remove_entry(&TestKey(1)).await;
2685        assert_eq!(removed, Some(TestVal(2)));
2686        assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2687
2688        // Commit to suppress the warning message
2689        dbtx.commit_tx().await;
2690    }
2691
2692    pub async fn verify_read_own_writes(db: Database) {
2693        let mut dbtx = db.begin_transaction().await;
2694
2695        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2696
2697        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2698
2699        // Commit to suppress the warning message
2700        dbtx.commit_tx().await;
2701    }
2702
2703    pub async fn verify_prevent_dirty_reads(db: Database) {
2704        let mut dbtx = db.begin_transaction().await;
2705
2706        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2707
2708        // dbtx2 should not be able to see uncommitted changes
2709        let mut dbtx2 = db.begin_transaction().await;
2710        assert_eq!(dbtx2.get_value(&TestKey(1)).await, None);
2711
2712        // Commit to suppress the warning message
2713        dbtx.commit_tx().await;
2714    }
2715
2716    pub async fn verify_find_by_range(db: Database) {
2717        let mut dbtx = db.begin_transaction().await;
2718        dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2719        dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2720        dbtx.insert_entry(&TestKey(56), &TestVal(7777)).await;
2721
2722        dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2723        dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2724
2725        {
2726            let mut module_dbtx = dbtx.to_ref_with_prefix_module_id(2).0;
2727            module_dbtx
2728                .insert_entry(&TestKey(300), &TestVal(3000))
2729                .await;
2730        }
2731
2732        dbtx.commit_tx().await;
2733
2734        // Verify finding by prefix returns the correct set of key pairs
2735        let mut dbtx = db.begin_transaction_nc().await;
2736
2737        let returned_keys = dbtx
2738            .find_by_range(TestKey(55)..TestKey(56))
2739            .await
2740            .collect::<Vec<_>>()
2741            .await;
2742
2743        let expected = vec![(TestKey(55), TestVal(9999))];
2744
2745        assert_eq!(returned_keys, expected);
2746
2747        let returned_keys = dbtx
2748            .find_by_range(TestKey(54)..TestKey(56))
2749            .await
2750            .collect::<Vec<_>>()
2751            .await;
2752
2753        let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2754        assert_eq!(returned_keys, expected);
2755
2756        let returned_keys = dbtx
2757            .find_by_range(TestKey(54)..TestKey(57))
2758            .await
2759            .collect::<Vec<_>>()
2760            .await;
2761
2762        let expected = vec![
2763            (TestKey(54), TestVal(8888)),
2764            (TestKey(55), TestVal(9999)),
2765            (TestKey(56), TestVal(7777)),
2766        ];
2767        assert_eq!(returned_keys, expected);
2768
2769        let mut module_dbtx = dbtx.with_prefix_module_id(2).0;
2770        let test_range = module_dbtx
2771            .find_by_range(TestKey(300)..TestKey(301))
2772            .await
2773            .collect::<Vec<_>>()
2774            .await;
2775        assert!(test_range.len() == 1);
2776    }
2777
2778    pub async fn verify_find_by_prefix(db: Database) {
2779        let mut dbtx = db.begin_transaction().await;
2780        dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2781        dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2782
2783        dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2784        dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2785        dbtx.commit_tx().await;
2786
2787        // Verify finding by prefix returns the correct set of key pairs
2788        let mut dbtx = db.begin_transaction().await;
2789
2790        let returned_keys = dbtx
2791            .find_by_prefix(&DbPrefixTestPrefix)
2792            .await
2793            .collect::<Vec<_>>()
2794            .await;
2795
2796        let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2797        assert_eq!(returned_keys, expected);
2798
2799        let reversed = dbtx
2800            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2801            .await
2802            .collect::<Vec<_>>()
2803            .await;
2804        let mut reversed_expected = expected;
2805        reversed_expected.reverse();
2806        assert_eq!(reversed, reversed_expected);
2807
2808        let returned_keys = dbtx
2809            .find_by_prefix(&AltDbPrefixTestPrefix)
2810            .await
2811            .collect::<Vec<_>>()
2812            .await;
2813
2814        let expected = vec![
2815            (AltTestKey(54), TestVal(6666)),
2816            (AltTestKey(55), TestVal(7777)),
2817        ];
2818        assert_eq!(returned_keys, expected);
2819
2820        let reversed = dbtx
2821            .find_by_prefix_sorted_descending(&AltDbPrefixTestPrefix)
2822            .await
2823            .collect::<Vec<_>>()
2824            .await;
2825        let mut reversed_expected = expected;
2826        reversed_expected.reverse();
2827        assert_eq!(reversed, reversed_expected);
2828    }
2829
2830    pub async fn verify_commit(db: Database) {
2831        let mut dbtx = db.begin_transaction().await;
2832
2833        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2834        dbtx.commit_tx().await;
2835
2836        // Verify dbtx2 can see committed transactions
2837        let mut dbtx2 = db.begin_transaction().await;
2838        assert_eq!(dbtx2.get_value(&TestKey(1)).await, Some(TestVal(2)));
2839    }
2840
2841    pub async fn verify_rollback_to_savepoint(db: Database) {
2842        let mut dbtx_rollback = db.begin_transaction().await;
2843
2844        dbtx_rollback
2845            .insert_entry(&TestKey(20), &TestVal(2000))
2846            .await;
2847
2848        dbtx_rollback
2849            .set_tx_savepoint()
2850            .await
2851            .expect("Error setting transaction savepoint");
2852
2853        dbtx_rollback
2854            .insert_entry(&TestKey(21), &TestVal(2001))
2855            .await;
2856
2857        assert_eq!(
2858            dbtx_rollback.get_value(&TestKey(20)).await,
2859            Some(TestVal(2000))
2860        );
2861        assert_eq!(
2862            dbtx_rollback.get_value(&TestKey(21)).await,
2863            Some(TestVal(2001))
2864        );
2865
2866        dbtx_rollback
2867            .rollback_tx_to_savepoint()
2868            .await
2869            .expect("Error setting transaction savepoint");
2870
2871        assert_eq!(
2872            dbtx_rollback.get_value(&TestKey(20)).await,
2873            Some(TestVal(2000))
2874        );
2875
2876        assert_eq!(dbtx_rollback.get_value(&TestKey(21)).await, None);
2877
2878        // Commit to suppress the warning message
2879        dbtx_rollback.commit_tx().await;
2880    }
2881
2882    pub async fn verify_prevent_nonrepeatable_reads(db: Database) {
2883        let mut dbtx = db.begin_transaction().await;
2884        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2885
2886        let mut dbtx2 = db.begin_transaction().await;
2887
2888        dbtx2.insert_entry(&TestKey(100), &TestVal(101)).await;
2889
2890        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2891
2892        dbtx2.commit_tx().await;
2893
2894        // dbtx should still read None because it is operating over a snapshot
2895        // of the data when the transaction started
2896        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2897
2898        let expected_keys = 0;
2899        let returned_keys = dbtx
2900            .find_by_prefix(&DbPrefixTestPrefix)
2901            .await
2902            .fold(0, |returned_keys, (key, value)| async move {
2903                if key == TestKey(100) {
2904                    assert!(value.eq(&TestVal(101)));
2905                }
2906                returned_keys + 1
2907            })
2908            .await;
2909
2910        assert_eq!(returned_keys, expected_keys);
2911    }
2912
    /// Runs 1000 rounds in which a reading transaction and a writing
    /// transaction are polled concurrently via `join!`, with random yields
    /// between steps. The writer inserts `spent_input_key` and then
    /// `tx_accepted_key` and commits; the reader looks up `tx_accepted_key`
    /// first and `spent_input_key` second, and panics if it observes one of the
    /// two entries without the other.
    pub async fn verify_snapshot_isolation(db: Database) {
        /// Yields to the scheduler either zero or ten times, chosen at random,
        /// to vary how the two futures interleave.
        async fn random_yield() {
            let times = if rand::thread_rng().gen_bool(0.5) {
                0
            } else {
                10
            };
            for _ in 0..times {
                tokio::task::yield_now().await;
            }
        }

        // This scenario is taken straight out of https://github.com/fedimint/fedimint/issues/5195 bug
        for i in 0..1000 {
            // Each round uses its own pair of keys so rounds do not interfere
            let base_key = i * 2;
            let tx_accepted_key = base_key;
            let spent_input_key = base_key + 1;

            join!(
                // Reader side
                async {
                    random_yield().await;
                    let mut dbtx = db.begin_transaction().await;

                    random_yield().await;
                    let a = dbtx.get_value(&TestKey(tx_accepted_key)).await;
                    random_yield().await;
                    // we have 5 operations that can give you the db key,
                    // try all of them
                    let s = match i % 5 {
                        0 => dbtx.get_value(&TestKey(spent_input_key)).await,
                        1 => dbtx.remove_entry(&TestKey(spent_input_key)).await,
                        2 => {
                            dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(200))
                                .await
                        }
                        3 => {
                            dbtx.find_by_prefix(&DbPrefixTestPrefix)
                                .await
                                .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
                                .map(|(_k, v)| v)
                                .next()
                                .await
                        }
                        4 => {
                            dbtx.find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                                .await
                                .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
                                .map(|(_k, v)| v)
                                .next()
                                .await
                        }
                        _ => {
                            panic!("woot?");
                        }
                    };

                    // Both entries must be seen together or not at all
                    match (a, s) {
                        (None, None) | (Some(_), Some(_)) => {}
                        (None, Some(_)) => panic!("none some?! {i}"),
                        (Some(_), None) => panic!("some none?! {i}"),
                    }
                },
                // Writer side
                async {
                    random_yield().await;

                    let mut dbtx = db.begin_transaction().await;
                    random_yield().await;
                    assert_eq!(dbtx.get_value(&TestKey(tx_accepted_key)).await, None);

                    random_yield().await;
                    assert_eq!(
                        dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(100))
                            .await,
                        None
                    );

                    random_yield().await;
                    assert_eq!(
                        dbtx.insert_entry(&TestKey(tx_accepted_key), &TestVal(100))
                            .await,
                        None
                    );
                    random_yield().await;
                    dbtx.commit_tx().await;
                }
            );
        }
    }
3001
3002    pub async fn verify_phantom_entry(db: Database) {
3003        let mut dbtx = db.begin_transaction().await;
3004
3005        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3006
3007        dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3008
3009        dbtx.commit_tx().await;
3010
3011        let mut dbtx = db.begin_transaction().await;
3012        let expected_keys = 2;
3013        let returned_keys = dbtx
3014            .find_by_prefix(&DbPrefixTestPrefix)
3015            .await
3016            .fold(0, |returned_keys, (key, value)| async move {
3017                match key {
3018                    TestKey(100) => {
3019                        assert!(value.eq(&TestVal(101)));
3020                    }
3021                    TestKey(101) => {
3022                        assert!(value.eq(&TestVal(102)));
3023                    }
3024                    _ => {}
3025                }
3026                returned_keys + 1
3027            })
3028            .await;
3029
3030        assert_eq!(returned_keys, expected_keys);
3031
3032        let mut dbtx2 = db.begin_transaction().await;
3033
3034        dbtx2.insert_entry(&TestKey(102), &TestVal(103)).await;
3035
3036        dbtx2.commit_tx().await;
3037
3038        let returned_keys = dbtx
3039            .find_by_prefix(&DbPrefixTestPrefix)
3040            .await
3041            .fold(0, |returned_keys, (key, value)| async move {
3042                match key {
3043                    TestKey(100) => {
3044                        assert!(value.eq(&TestVal(101)));
3045                    }
3046                    TestKey(101) => {
3047                        assert!(value.eq(&TestVal(102)));
3048                    }
3049                    _ => {}
3050                }
3051                returned_keys + 1
3052            })
3053            .await;
3054
3055        assert_eq!(returned_keys, expected_keys);
3056    }
3057
3058    pub async fn expect_write_conflict(db: Database) {
3059        let mut dbtx = db.begin_transaction().await;
3060        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3061        dbtx.commit_tx().await;
3062
3063        let mut dbtx2 = db.begin_transaction().await;
3064        let mut dbtx3 = db.begin_transaction().await;
3065
3066        dbtx2.insert_entry(&TestKey(100), &TestVal(102)).await;
3067
3068        // Depending on if the database implementation supports optimistic or
3069        // pessimistic transactions, this test should generate an error here
3070        // (pessimistic) or at commit time (optimistic)
3071        dbtx3.insert_entry(&TestKey(100), &TestVal(103)).await;
3072
3073        dbtx2.commit_tx().await;
3074        dbtx3.commit_tx_result().await.expect_err("Expecting an error to be returned because this transaction is in a write-write conflict with dbtx");
3075    }
3076
3077    pub async fn verify_string_prefix(db: Database) {
3078        let mut dbtx = db.begin_transaction().await;
3079        dbtx.insert_entry(&PercentTestKey(100), &TestVal(101)).await;
3080
3081        assert_eq!(
3082            dbtx.get_value(&PercentTestKey(100)).await,
3083            Some(TestVal(101))
3084        );
3085
3086        dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3087
3088        dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3089
3090        dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3091
3092        // If the wildcard character ('%') is not handled properly, this will make
3093        // find_by_prefix return 5 results instead of 4
3094        dbtx.insert_entry(&TestKey(101), &TestVal(100)).await;
3095
3096        let expected_keys = 4;
3097        let returned_keys = dbtx
3098            .find_by_prefix(&PercentPrefixTestPrefix)
3099            .await
3100            .fold(0, |returned_keys, (key, value)| async move {
3101                if matches!(key, PercentTestKey(101)) {
3102                    assert!(value.eq(&TestVal(100)));
3103                }
3104                returned_keys + 1
3105            })
3106            .await;
3107
3108        assert_eq!(returned_keys, expected_keys);
3109    }
3110
3111    pub async fn verify_remove_by_prefix(db: Database) {
3112        let mut dbtx = db.begin_transaction().await;
3113
3114        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3115
3116        dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3117
3118        dbtx.commit_tx().await;
3119
3120        let mut remove_dbtx = db.begin_transaction().await;
3121        remove_dbtx.remove_by_prefix(&DbPrefixTestPrefix).await;
3122        remove_dbtx.commit_tx().await;
3123
3124        let mut dbtx = db.begin_transaction().await;
3125        let expected_keys = 0;
3126        let returned_keys = dbtx
3127            .find_by_prefix(&DbPrefixTestPrefix)
3128            .await
3129            .fold(0, |returned_keys, (key, value)| async move {
3130                match key {
3131                    TestKey(100) => {
3132                        assert!(value.eq(&TestVal(101)));
3133                    }
3134                    TestKey(101) => {
3135                        assert!(value.eq(&TestVal(102)));
3136                    }
3137                    _ => {}
3138                }
3139                returned_keys + 1
3140            })
3141            .await;
3142
3143        assert_eq!(returned_keys, expected_keys);
3144    }
3145
3146    pub async fn verify_module_db(db: Database, module_db: Database) {
3147        let mut dbtx = db.begin_transaction().await;
3148
3149        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3150
3151        dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3152
3153        dbtx.commit_tx().await;
3154
3155        // verify module_dbtx can only read key/value pairs from its own module
3156        let mut module_dbtx = module_db.begin_transaction().await;
3157        assert_eq!(module_dbtx.get_value(&TestKey(100)).await, None);
3158
3159        assert_eq!(module_dbtx.get_value(&TestKey(101)).await, None);
3160
3161        // verify module_dbtx can read key/value pairs that it wrote
3162        let mut dbtx = db.begin_transaction().await;
3163        assert_eq!(dbtx.get_value(&TestKey(100)).await, Some(TestVal(101)));
3164
3165        assert_eq!(dbtx.get_value(&TestKey(101)).await, Some(TestVal(102)));
3166
3167        let mut module_dbtx = module_db.begin_transaction().await;
3168
3169        module_dbtx.insert_entry(&TestKey(100), &TestVal(103)).await;
3170
3171        module_dbtx.insert_entry(&TestKey(101), &TestVal(104)).await;
3172
3173        module_dbtx.commit_tx().await;
3174
3175        let expected_keys = 2;
3176        let mut dbtx = db.begin_transaction().await;
3177        let returned_keys = dbtx
3178            .find_by_prefix(&DbPrefixTestPrefix)
3179            .await
3180            .fold(0, |returned_keys, (key, value)| async move {
3181                match key {
3182                    TestKey(100) => {
3183                        assert!(value.eq(&TestVal(101)));
3184                    }
3185                    TestKey(101) => {
3186                        assert!(value.eq(&TestVal(102)));
3187                    }
3188                    _ => {}
3189                }
3190                returned_keys + 1
3191            })
3192            .await;
3193
3194        assert_eq!(returned_keys, expected_keys);
3195
3196        let removed = dbtx.remove_entry(&TestKey(100)).await;
3197        assert_eq!(removed, Some(TestVal(101)));
3198        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
3199
3200        let mut module_dbtx = module_db.begin_transaction().await;
3201        assert_eq!(
3202            module_dbtx.get_value(&TestKey(100)).await,
3203            Some(TestVal(103))
3204        );
3205    }
3206
3207    pub async fn verify_module_prefix(db: Database) {
3208        let mut test_dbtx = db.begin_transaction().await;
3209        {
3210            let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3211
3212            test_module_dbtx
3213                .insert_entry(&TestKey(100), &TestVal(101))
3214                .await;
3215
3216            test_module_dbtx
3217                .insert_entry(&TestKey(101), &TestVal(102))
3218                .await;
3219        }
3220
3221        test_dbtx.commit_tx().await;
3222
3223        let mut alt_dbtx = db.begin_transaction().await;
3224        {
3225            let mut alt_module_dbtx = alt_dbtx.to_ref_with_prefix_module_id(ALT_MODULE_PREFIX).0;
3226
3227            alt_module_dbtx
3228                .insert_entry(&TestKey(100), &TestVal(103))
3229                .await;
3230
3231            alt_module_dbtx
3232                .insert_entry(&TestKey(101), &TestVal(104))
3233                .await;
3234        }
3235
3236        alt_dbtx.commit_tx().await;
3237
3238        // verify test_module_dbtx can only see key/value pairs from its own module
3239        let mut test_dbtx = db.begin_transaction().await;
3240        let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3241        assert_eq!(
3242            test_module_dbtx.get_value(&TestKey(100)).await,
3243            Some(TestVal(101))
3244        );
3245
3246        assert_eq!(
3247            test_module_dbtx.get_value(&TestKey(101)).await,
3248            Some(TestVal(102))
3249        );
3250
3251        let expected_keys = 2;
3252        let returned_keys = test_module_dbtx
3253            .find_by_prefix(&DbPrefixTestPrefix)
3254            .await
3255            .fold(0, |returned_keys, (key, value)| async move {
3256                match key {
3257                    TestKey(100) => {
3258                        assert!(value.eq(&TestVal(101)));
3259                    }
3260                    TestKey(101) => {
3261                        assert!(value.eq(&TestVal(102)));
3262                    }
3263                    _ => {}
3264                }
3265                returned_keys + 1
3266            })
3267            .await;
3268
3269        assert_eq!(returned_keys, expected_keys);
3270
3271        let removed = test_module_dbtx.remove_entry(&TestKey(100)).await;
3272        assert_eq!(removed, Some(TestVal(101)));
3273        assert_eq!(test_module_dbtx.get_value(&TestKey(100)).await, None);
3274
3275        // test_dbtx on its own wont find the key because it does not use a module
3276        // prefix
3277        let mut test_dbtx = db.begin_transaction().await;
3278        assert_eq!(test_dbtx.get_value(&TestKey(101)).await, None);
3279
3280        test_dbtx.commit_tx().await;
3281    }
3282
    /// Seeds an in-memory database with 100 `TestKeyV0` entries and a
    /// `DatabaseVersionKeyV0` entry at version 0, applies
    /// `migrate_test_db_version_0` through `apply_migrations`, and checks that
    /// a `DatabaseVersionKey` entry exists afterwards and that every migrated
    /// `TestKey` holds the second field of its former `TestKeyV0`.
    #[cfg(test)]
    #[tokio::test]
    pub async fn verify_test_migration() {
        // Insert a bunch of old dummy data that needs to be migrated to a new version
        let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
        let expected_test_keys_size: usize = 100;
        let mut dbtx = db.begin_transaction().await;
        for i in 0..expected_test_keys_size {
            // Key fields are (i, i + 1) and the value is i, so after the
            // migration the key equals the value plus one
            dbtx.insert_new_entry(&TestKeyV0(i as u64, (i + 1) as u64), &TestVal(i as u64))
                .await;
        }

        // Will also be migrated to `DatabaseVersionKey`
        dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
            .await;
        dbtx.commit_tx().await;

        let mut migrations: BTreeMap<DatabaseVersion, DbMigrationFn<()>> = BTreeMap::new();

        // Single migration step, registered for version 0
        migrations.insert(
            DatabaseVersion(0),
            Box::new(|ctx| migrate_test_db_version_0(ctx).boxed()),
        );

        apply_migrations(&db, (), "TestModule".to_string(), migrations, None, None)
            .await
            .expect("Error applying migrations for TestModule");

        // Verify that the migrations completed successfully
        let mut dbtx = db.begin_transaction().await;

        // Verify that the old `DatabaseVersion` under `DatabaseVersionKeyV0` migrated
        // to `DatabaseVersionKey`
        assert!(
            dbtx.get_value(&DatabaseVersionKey(MODULE_GLOBAL_PREFIX.into()))
                .await
                .is_some()
        );

        // Verify Dummy module migration
        let test_keys = dbtx
            .find_by_prefix(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        let test_keys_size = test_keys.len();
        assert_eq!(test_keys_size, expected_test_keys_size);
        for (key, val) in test_keys {
            // The migrated key is the old second field, i.e. value + 1
            assert_eq!(key.0, val.0 + 1);
        }
    }
3334
3335    #[allow(dead_code)]
3336    async fn migrate_test_db_version_0(
3337        mut ctx: DbMigrationFnContext<'_, ()>,
3338    ) -> Result<(), anyhow::Error> {
3339        let mut dbtx = ctx.dbtx();
3340        let example_keys_v0 = dbtx
3341            .find_by_prefix(&DbPrefixTestPrefixV0)
3342            .await
3343            .collect::<Vec<_>>()
3344            .await;
3345        dbtx.remove_by_prefix(&DbPrefixTestPrefixV0).await;
3346        for (key, val) in example_keys_v0 {
3347            let key_v2 = TestKey(key.1);
3348            dbtx.insert_new_entry(&key_v2, &val).await;
3349        }
3350        Ok(())
3351    }
3352
    /// Runs `Database::autocommit` against a fake database whose transactions
    /// always fail to commit, with a limit of 5 attempts, and checks that the
    /// returned error is `AutocommitError::CommitFailed` reporting 5 attempts.
    #[cfg(test)]
    #[tokio::test]
    async fn test_autocommit() {
        use std::marker::PhantomData;
        use std::ops::Range;
        use std::path::Path;

        use anyhow::anyhow;
        use async_trait::async_trait;

        use crate::ModuleDecoderRegistry;
        use crate::db::{
            AutocommitError, BaseDatabaseTransaction, IDatabaseTransaction,
            IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase,
            IRawDatabaseTransaction,
        };

        /// Raw database stub that only hands out `FakeTransaction`s.
        #[derive(Debug)]
        struct FakeDatabase;

        #[async_trait]
        impl IRawDatabase for FakeDatabase {
            type Transaction<'a> = FakeTransaction<'a>;
            async fn begin_transaction(&self) -> FakeTransaction {
                FakeTransaction(PhantomData)
            }

            fn checkpoint(&self, _backup_path: &Path) -> anyhow::Result<()> {
                Ok(())
            }
        }

        /// Transaction stub: every data operation is `unimplemented!()` and
        /// `commit_tx` always returns an error.
        #[derive(Debug)]
        struct FakeTransaction<'a>(PhantomData<&'a ()>);

        // None of the data operations are implemented; the closure passed to
        // `autocommit` below never touches the transaction
        #[async_trait]
        impl IDatabaseTransactionOpsCore for FakeTransaction<'_> {
            async fn raw_insert_bytes(
                &mut self,
                _key: &[u8],
                _value: &[u8],
            ) -> anyhow::Result<Option<Vec<u8>>> {
                unimplemented!()
            }

            async fn raw_get_bytes(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
                unimplemented!()
            }

            async fn raw_remove_entry(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
                unimplemented!()
            }

            async fn raw_find_by_range(
                &mut self,
                _key_range: Range<&[u8]>,
            ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
                unimplemented!()
            }

            async fn raw_find_by_prefix(
                &mut self,
                _key_prefix: &[u8],
            ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
                unimplemented!()
            }

            async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
                unimplemented!()
            }

            async fn raw_find_by_prefix_sorted_descending(
                &mut self,
                _key_prefix: &[u8],
            ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
                unimplemented!()
            }
        }

        #[async_trait]
        impl IDatabaseTransactionOps for FakeTransaction<'_> {
            async fn rollback_tx_to_savepoint(&mut self) -> anyhow::Result<()> {
                unimplemented!()
            }

            async fn set_tx_savepoint(&mut self) -> anyhow::Result<()> {
                unimplemented!()
            }
        }

        #[async_trait]
        impl IRawDatabaseTransaction for FakeTransaction<'_> {
            // Always fails, so every autocommit attempt ends in a commit error
            async fn commit_tx(self) -> anyhow::Result<()> {
                Err(anyhow!("Can't commit!"))
            }
        }

        let db = Database::new(FakeDatabase, ModuleDecoderRegistry::default());
        // The closure itself succeeds; `Some(5)` is the attempt limit
        let err = db
            .autocommit::<_, _, ()>(|_dbtx, _| Box::pin(async { Ok(()) }), Some(5))
            .await
            .unwrap_err();

        match err {
            AutocommitError::CommitFailed {
                attempts: failed_attempts,
                ..
            } => {
                assert_eq!(failed_attempts, 5);
            }
            AutocommitError::ClosureError { .. } => panic!("Closure did not return error"),
        }
    }
3466}
3467
3468pub async fn find_by_prefix_sorted_descending<'r, 'inner, KP>(
3469    tx: &'r mut (dyn IDatabaseTransaction + 'inner),
3470    decoders: ModuleDecoderRegistry,
3471    key_prefix: &KP,
3472) -> impl Stream<
3473    Item = (
3474        KP::Record,
3475        <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
3476    ),
3477>
3478+ 'r
3479+ use<'r, KP>
3480where
3481    'inner: 'r,
3482    KP: DatabaseLookup,
3483    KP::Record: DatabaseKey,
3484{
3485    debug!(target: LOG_DB, "find by prefix sorted descending");
3486    let prefix_bytes = key_prefix.to_bytes();
3487    tx.raw_find_by_prefix_sorted_descending(&prefix_bytes)
3488        .await
3489        .expect("Error doing prefix search in database")
3490        .map(move |(key_bytes, value_bytes)| {
3491            let key = decode_key_expect(&key_bytes, &decoders);
3492            let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
3493            (key, value)
3494        })
3495}
3496
3497pub async fn verify_module_db_integrity_dbtx(
3498    dbtx: &mut DatabaseTransaction<'_>,
3499    module_id: ModuleInstanceId,
3500    module_kind: ModuleKind,
3501    prefixes: &BTreeSet<u8>,
3502) {
3503    let module_db_prefix = module_instance_id_to_byte_prefix(module_id);
3504    if module_id < 250 {
3505        assert_eq!(module_db_prefix.len(), 2);
3506    }
3507    let mut records = dbtx
3508        .raw_find_by_prefix(&module_db_prefix)
3509        .await
3510        .expect("DB fail");
3511    while let Some((k, v)) = records.next().await {
3512        assert!(
3513            prefixes.contains(&k[module_db_prefix.len()]),
3514            "Unexpected module {module_kind} {module_id} db record found: {}: {}",
3515            k.as_hex(),
3516            v.as_hex()
3517        );
3518    }
3519}
3520
3521#[cfg(test)]
3522mod tests;