fedimint_core/db/
mod.rs

1//! Core Fedimint database traits and types
2//!
3//! This module provides the core key-value database for Fedimint.
4//!
5//! # Usage
6//!
7//! To use the database, you typically follow these steps:
8//!
9//! 1. Create a `Database` instance
10//! 2. Begin a transaction
11//! 3. Perform operations within the transaction
12//! 4. Commit the transaction
13//!
14//! ## Example
15//!
16//! ```rust
17//! use fedimint_core::db::mem_impl::MemDatabase;
18//! use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
19//! use fedimint_core::encoding::{Decodable, Encodable};
20//! use fedimint_core::impl_db_record;
21//! use fedimint_core::module::registry::ModuleDecoderRegistry;
22//!
23//! #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
24//! pub struct TestKey(pub u64);
25//! #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
26//! pub struct TestVal(pub u64);
27//!
28//! #[repr(u8)]
29//! #[derive(Clone)]
30//! pub enum TestDbKeyPrefix {
31//!     Test = 0x42,
32//! }
33//!
34//! impl_db_record!(
35//!     key = TestKey,
36//!     value = TestVal,
37//!     db_prefix = TestDbKeyPrefix::Test,
38//! );
39//!
40//! # async fn example() {
41//! // Create a new in-memory database
42//! let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
43//!
44//! // Begin a transaction
45//! let mut tx = db.begin_transaction().await;
46//!
47//! // Perform operations
48//! tx.insert_entry(&TestKey(1), &TestVal(100)).await;
49//! let value = tx.get_value(&TestKey(1)).await;
50//!
51//! // Commit the transaction
52//! tx.commit_tx().await;
53//!
54//! // For operations that may need to be retried due to conflicts, use the
55//! // `autocommit` function:
56//!
57//! db.autocommit(
58//!     |dbtx, _| {
59//!         Box::pin(async move {
60//!             dbtx.insert_entry(&TestKey(1), &TestVal(100)).await;
61//!             anyhow::Ok(())
62//!         })
63//!     },
64//!     None,
65//! )
66//! .await
67//! .unwrap();
68//! # }
69//! ```
70//!
71//! # Isolation of database transactions
72//!
73//! Fedimint requires that the database implementation implement Snapshot
74//! Isolation. Snapshot Isolation is a database isolation level that guarantees
75//! consistent reads from the time that the snapshot was created (at transaction
76//! creation time). Transactions with Snapshot Isolation level will only commit
77//! if there has been no write to the modified keys since the snapshot (i.e.
78//! write-write conflicts are prevented).
79//!
80//! Specifically, Fedimint expects the database implementation to prevent the
81//! following anomalies:
82//!
83//! Non-Readable Write: TX1 writes (K1, V1) at time t but cannot read (K1, V1)
84//! at time (t + i)
85//!
86//! Dirty Read: TX1 is able to read TX2's uncommitted writes.
87//!
88//! Non-Repeatable Read: TX1 reads (K1, V1) at time t and retrieves (K1, V2) at
89//! time (t + i) where V1 != V2.
90//!
91//! Phantom Record: TX1 retrieves X number of records for a prefix at time t and
92//! retrieves Y number of records for the same prefix at time (t + i).
93//!
94//! Lost Writes: TX1 writes (K1, V1) at the same time as TX2 writes (K1, V2). V2
95//! overwrites V1 as the value for K1 (write-write conflict).
96//!
//! | Type     | Non-Readable Write | Dirty Read | Non-Repeatable Read | Phantom Record | Lost Writes |
//! | -------- | ------------------ | ---------- | ------------------- | -------------- | ----------- |
//! | MemoryDB | Prevented          | Prevented  | Prevented           | Prevented      | Possible    |
//! | RocksDB  | Prevented          | Prevented  | Prevented           | Prevented      | Prevented   |
//! | Sqlite   | Prevented          | Prevented  | Prevented           | Prevented      | Prevented   |
104
105use std::any;
106use std::collections::{BTreeMap, BTreeSet};
107use std::error::Error;
108use std::fmt::{self, Debug};
109use std::marker::{self, PhantomData};
110use std::ops::{self, DerefMut, Range};
111use std::path::Path;
112use std::pin::Pin;
113use std::sync::Arc;
114use std::time::Duration;
115
116use bitcoin::hex::DisplayHex as _;
117use fedimint_core::util::BoxFuture;
118use fedimint_logging::LOG_DB;
119use fedimint_util_error::FmtCompact as _;
120use futures::{Stream, StreamExt};
121use macro_rules_attribute::apply;
122use rand::Rng;
123use serde::Serialize;
124use strum_macros::EnumIter;
125use thiserror::Error;
126use tracing::{debug, error, info, instrument, trace, warn};
127
128use crate::core::{ModuleInstanceId, ModuleKind};
129use crate::encoding::{Decodable, Encodable};
130use crate::fmt_utils::AbbreviateHexBytes;
131use crate::task::{MaybeSend, MaybeSync};
132use crate::{async_trait_maybe_send, maybe_add_send, maybe_add_send_sync, timing};
133
134pub mod mem_impl;
135pub mod notifications;
136
137pub use test_utils::*;
138
139use self::notifications::{Notifications, NotifyQueue};
140use crate::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
141
/// Leading byte of the key prefix built for module-isolated partitions; the
/// encoded module instance id is appended right after it (see
/// `module_instance_id_to_byte_prefix`).
pub const MODULE_GLOBAL_PREFIX: u8 = 0xff;

/// Result type for database operations
pub type DatabaseResult<T> = std::result::Result<T, DatabaseError>;
146
/// A value that can be turned into the raw bytes used to address the
/// database, either as a full key or as a key prefix.
pub trait DatabaseKeyPrefix: Debug {
    /// Returns the raw byte representation used for database lookups.
    fn to_bytes(&self) -> Vec<u8>;
}
150
/// A key + value pair in the database with a unique prefix
/// Extends `DatabaseKeyPrefix` to prepend the key's prefix.
pub trait DatabaseRecord: DatabaseKeyPrefix {
    /// Prefix byte identifying this kind of record.
    const DB_PREFIX: u8;
    /// Opt-in flag for modification notifications; disabled by default.
    // NOTE(review): presumably consulted by the transaction layer when an
    // entry is written — confirm against the code that reads it.
    const NOTIFY_ON_MODIFY: bool = false;
    /// Typed key of the record.
    type Key: DatabaseKey + Debug;
    /// Typed value of the record.
    type Value: DatabaseValue + Debug;
}
159
/// A key that can be used to query one or more `DatabaseRecord`
/// Extends `DatabaseKeyPrefix` to prepend the key's prefix.
pub trait DatabaseLookup: DatabaseKeyPrefix {
    /// The kind of record this lookup resolves to.
    type Record: DatabaseRecord;
}
165
// Every `DatabaseRecord` that is also `Debug + Decodable + Encodable` is
// automatically a `DatabaseLookup` that resolves to itself.
impl<Record> DatabaseLookup for Record
where
    Record: DatabaseRecord + Debug + Decodable + Encodable,
{
    type Record = Record;
}
173
/// `DatabaseKey` that represents the lookup structure for retrieving key/value
/// pairs from the database.
pub trait DatabaseKey: Sized {
    /// Send a notification to tasks waiting to be notified if the value of
    /// `DatabaseKey` is modified
    ///
    /// For instance, this can be used to be notified when a key in the
    /// database is created. It is also possible to run a closure with the
    /// value of the `DatabaseKey` as parameter to verify some changes to
    /// that value.
    const NOTIFY_ON_MODIFY: bool = false;
    /// Reconstructs the key from its raw `data` bytes, using `modules` to
    /// decode any module-specific parts.
    ///
    /// Returns a `DecodingError` when `data` cannot be decoded into `Self`.
    fn from_bytes(
        data: &[u8],
        modules: &ModuleDecoderRegistry,
    ) -> std::result::Result<Self, DecodingError>;
}
190
/// Marker trait for `DatabaseKey`s where `NOTIFY_ON_MODIFY` is true
///
/// Required by the key-waiting helpers on `Database` (`wait_key_check`,
/// `wait_key_exists`).
pub trait DatabaseKeyWithNotify {}
193
/// `DatabaseValue` that represents the value structure of database records.
pub trait DatabaseValue: Sized + Debug {
    /// Reconstructs the value from its raw `data` bytes, using `modules` to
    /// decode any module-specific parts.
    ///
    /// Returns a `DecodingError` when `data` cannot be decoded into `Self`.
    fn from_bytes(
        data: &[u8],
        modules: &ModuleDecoderRegistry,
    ) -> std::result::Result<Self, DecodingError>;
    /// Serializes the value into the raw bytes stored in the database.
    fn to_bytes(&self) -> Vec<u8>;
}
202
/// Boxed, pinned stream of raw `(key, value)` byte pairs, as returned by the
/// prefix and range queries of a database transaction.
pub type PrefixStream<'a> = Pin<Box<maybe_add_send!(dyn Stream<Item = (Vec<u8>, Vec<u8>)> + 'a)>>;

/// Just ignore this type, it's only there to make compiler happy
///
/// See <https://users.rust-lang.org/t/argument-requires-that-is-borrowed-for-static/66503/2?u=yandros> for details.
pub type PhantomBound<'big, 'small> = PhantomData<&'small &'big ()>;
209
/// Error returned when the autocommit function fails
#[derive(Debug, Error)]
pub enum AutocommitError<E> {
    /// Committing the transaction failed too many times, giving up
    #[error("Commit Failed: {last_error}")]
    CommitFailed {
        /// Number of attempts
        attempts: usize,
        /// Last error on commit
        last_error: DatabaseError,
    },
    /// Error returned by the closure provided to `autocommit`. If returned no
    /// commit was attempted in that round
    #[error("Closure error: {error}")]
    ClosureError {
        /// The attempt on which the closure returned an error
        ///
        /// Attempts are counted starting at 1. Values greater than 1
        /// typically indicate a logic error since the closure given to
        /// `autocommit` should not have side effects and thus keep
        /// succeeding if it succeeded once.
        attempts: usize,
        /// Error returned by the closure
        error: E,
    },
}
235
/// Extension methods for results returned by `Database::autocommit`.
pub trait AutocommitResultExt<T, E> {
    /// Unwraps the "commit failed" error variant. Use this in cases where
    /// autocommit is instructed to run indefinitely and commit will thus never
    /// fail.
    ///
    /// # Panics
    ///
    /// Panics if the result is an [`AutocommitError::CommitFailed`].
    fn unwrap_autocommit(self) -> std::result::Result<T, E>;
}
242
243impl<T, E> AutocommitResultExt<T, E> for std::result::Result<T, AutocommitError<E>> {
244    fn unwrap_autocommit(self) -> std::result::Result<T, E> {
245        match self {
246            Ok(value) => Ok(value),
247            Err(AutocommitError::CommitFailed { .. }) => {
248                panic!("`unwrap_autocommit` called on a autocommit result with finite retries");
249            }
250            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
251        }
252    }
253}
254
/// Raw database implementation
///
/// This and [`IRawDatabaseTransaction`] are meant to be implemented
/// by crates like `fedimint-rocksdb` to provide a concrete implementation
/// of a database to be used by Fedimint.
///
/// This is in contrast of [`IDatabase`] which includes extra
/// functionality that Fedimint needs (and adds) on top of it.
#[apply(async_trait_maybe_send!)]
pub trait IRawDatabase: Debug + MaybeSend + MaybeSync + 'static {
    /// A raw database transaction type
    type Transaction<'a>: IRawDatabaseTransaction + Debug;

    /// Start a database transaction
    async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a>;

    /// Checkpoint the database to a backup directory
    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()>;
}
274
275#[apply(async_trait_maybe_send!)]
276impl<T> IRawDatabase for Box<T>
277where
278    T: IRawDatabase,
279{
280    type Transaction<'a> = <T as IRawDatabase>::Transaction<'a>;
281
282    async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a> {
283        (**self).begin_transaction().await
284    }
285
286    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
287        (**self).checkpoint(backup_path)
288    }
289}
290
291/// An extension trait with convenience operations on [`IRawDatabase`]
292pub trait IRawDatabaseExt: IRawDatabase + Sized {
293    /// Convert to type implementing [`IRawDatabase`] into [`Database`].
294    ///
295    /// When type inference is not an issue, [`Into::into`] can be used instead.
296    fn into_database(self) -> Database {
297        Database::new(self, ModuleRegistry::default())
298    }
299}
300
301impl<T> IRawDatabaseExt for T where T: IRawDatabase {}
302
303impl<T> From<T> for Database
304where
305    T: IRawDatabase,
306{
307    fn from(raw: T) -> Self {
308        Self::new(raw, ModuleRegistry::default())
309    }
310}
311
/// A database that, on top of raw database operations, implements
/// a key notification system.
#[apply(async_trait_maybe_send!)]
pub trait IDatabase: Debug + MaybeSend + MaybeSync + 'static {
    /// Start a database transaction
    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a>;
    /// Register (and wait) for `key` updates
    async fn register(&self, key: &[u8]);
    /// Notify about `key` update (creation, modification, deletion)
    async fn notify(&self, key: &[u8]);

    /// Returns `true` if this database refers to the global (as opposed to
    /// module-isolated) key space
    fn is_global(&self) -> bool;

    /// Checkpoints the database to a backup directory
    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()>;
}
330
331#[apply(async_trait_maybe_send!)]
332impl<T> IDatabase for Arc<T>
333where
334    T: IDatabase + ?Sized,
335{
336    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
337        (**self).begin_transaction().await
338    }
339    async fn register(&self, key: &[u8]) {
340        (**self).register(key).await;
341    }
342    async fn notify(&self, key: &[u8]) {
343        (**self).notify(key).await;
344    }
345
346    fn is_global(&self) -> bool {
347        (**self).is_global()
348    }
349
350    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
351        (**self).checkpoint(backup_path)
352    }
353}
354
/// Base functionality around [`IRawDatabase`] to make it a [`IDatabase`]
///
/// Mostly notification system, but also run-time single-commit handling.
struct BaseDatabase<RawDatabase> {
    // Shared notification registry; a handle to it is given to every
    // transaction started from this database.
    notifications: Arc<Notifications>,
    // The wrapped raw database backend.
    raw: RawDatabase,
}
362
363impl<RawDatabase> fmt::Debug for BaseDatabase<RawDatabase> {
364    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
365        f.write_str("BaseDatabase")
366    }
367}
368
369#[apply(async_trait_maybe_send!)]
370impl<RawDatabase: IRawDatabase + MaybeSend + 'static> IDatabase for BaseDatabase<RawDatabase> {
371    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
372        Box::new(BaseDatabaseTransaction::new(
373            self.raw.begin_transaction().await,
374            self.notifications.clone(),
375        ))
376    }
377    async fn register(&self, key: &[u8]) {
378        self.notifications.register(key).await;
379    }
380    async fn notify(&self, key: &[u8]) {
381        self.notifications.notify(key);
382    }
383
384    fn is_global(&self) -> bool {
385        true
386    }
387
388    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
389        self.raw.checkpoint(backup_path)
390    }
391}
392
/// A public-facing newtype over `IDatabase`
///
/// Notably carries set of module decoders (`ModuleDecoderRegistry`)
/// and implements common utility function for auto-commits, db isolation,
/// and other.
#[derive(Clone, Debug)]
pub struct Database {
    // Type-erased, shared database implementation; cloning a `Database`
    // clones this handle, not the underlying database.
    inner: Arc<dyn IDatabase + 'static>,
    // Decoders handed to every transaction started from this handle.
    module_decoders: ModuleDecoderRegistry,
}
403
404impl Database {
405    pub fn strong_count(&self) -> usize {
406        Arc::strong_count(&self.inner)
407    }
408
409    pub fn into_inner(self) -> Arc<dyn IDatabase + 'static> {
410        self.inner
411    }
412}
413
414impl Database {
415    /// Creates a new Fedimint database from any object implementing
416    /// [`IDatabase`].
417    ///
418    /// See also [`Database::new_from_arc`].
419    pub fn new(raw: impl IRawDatabase + 'static, module_decoders: ModuleDecoderRegistry) -> Self {
420        let inner = BaseDatabase {
421            raw,
422            notifications: Arc::new(Notifications::new()),
423        };
424        Self::new_from_arc(
425            Arc::new(inner) as Arc<dyn IDatabase + 'static>,
426            module_decoders,
427        )
428    }
429
430    /// Create [`Database`] from an already typed-erased `IDatabase`.
431    pub fn new_from_arc(
432        inner: Arc<dyn IDatabase + 'static>,
433        module_decoders: ModuleDecoderRegistry,
434    ) -> Self {
435        Self {
436            inner,
437            module_decoders,
438        }
439    }
440
441    /// Create [`Database`] isolated to a partition with a given `prefix`
442    pub fn with_prefix(&self, prefix: Vec<u8>) -> Self {
443        Self {
444            inner: Arc::new(PrefixDatabase {
445                inner: self.inner.clone(),
446                global_dbtx_access_token: None,
447                prefix,
448            }),
449            module_decoders: self.module_decoders.clone(),
450        }
451    }
452
453    /// Create [`Database`] isolated to a partition with a prefix for a given
454    /// `module_instance_id`, allowing the module to access `global_dbtx` with
455    /// the right `access_token`
456    pub fn with_prefix_module_id(
457        &self,
458        module_instance_id: ModuleInstanceId,
459    ) -> (Self, GlobalDBTxAccessToken) {
460        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
461        let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
462        (
463            Self {
464                inner: Arc::new(PrefixDatabase {
465                    inner: self.inner.clone(),
466                    global_dbtx_access_token: Some(global_dbtx_access_token),
467                    prefix,
468                }),
469                module_decoders: self.module_decoders.clone(),
470            },
471            global_dbtx_access_token,
472        )
473    }
474
475    pub fn with_decoders(&self, module_decoders: ModuleDecoderRegistry) -> Self {
476        Self {
477            inner: self.inner.clone(),
478            module_decoders,
479        }
480    }
481
482    /// Is this `Database` a global, unpartitioned `Database`
483    pub fn is_global(&self) -> bool {
484        self.inner.is_global()
485    }
486
487    /// `Err` if [`Self::is_global`] is not true
488    pub fn ensure_global(&self) -> DatabaseResult<()> {
489        if !self.is_global() {
490            return Err(DatabaseError::Other(anyhow::anyhow!(
491                "Database instance not global"
492            )));
493        }
494
495        Ok(())
496    }
497
498    /// `Err` if [`Self::is_global`] is true
499    pub fn ensure_isolated(&self) -> DatabaseResult<()> {
500        if self.is_global() {
501            return Err(DatabaseError::Other(anyhow::anyhow!(
502                "Database instance not isolated"
503            )));
504        }
505
506        Ok(())
507    }
508
509    /// Begin a new committable database transaction
510    pub async fn begin_transaction<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, Committable>
511    where
512        's: 'tx,
513    {
514        DatabaseTransaction::<Committable>::new(
515            self.inner.begin_transaction().await,
516            self.module_decoders.clone(),
517        )
518    }
519
520    /// Begin a new non-committable database transaction
521    pub async fn begin_transaction_nc<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, NonCommittable>
522    where
523        's: 'tx,
524    {
525        self.begin_transaction().await.into_nc()
526    }
527
528    pub fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
529        self.inner.checkpoint(backup_path)
530    }
531
532    /// Runs a closure with a reference to a database transaction and tries to
533    /// commit the transaction if the closure returns `Ok` and rolls it back
534    /// otherwise. If committing fails the closure is run for up to
535    /// `max_attempts` times. If `max_attempts` is `None` it will run
536    /// `usize::MAX` times which is close enough to infinite times.
537    ///
538    /// The closure `tx_fn` provided should not have side effects outside of the
539    /// database transaction provided, or if it does these should be
540    /// idempotent, since the closure might be run multiple times.
541    ///
542    /// # Lifetime Parameters
543    ///
544    /// The higher rank trait bound (HRTB) `'a` that is applied to the the
545    /// mutable reference to the database transaction ensures that the
546    /// reference lives as least as long as the returned future of the
547    /// closure.
548    ///
549    /// Further, the reference to self (`'s`) must outlive the
550    /// `DatabaseTransaction<'dt>`. In other words, the
551    /// `DatabaseTransaction` must live as least as long as `self` and that is
552    /// true as the `DatabaseTransaction` is only dropped at the end of the
553    /// `loop{}`.
554    ///
555    /// # Panics
556    ///
557    /// This function panics when the given number of maximum attempts is zero.
558    /// `max_attempts` must be greater or equal to one.
559    pub async fn autocommit<'s, 'dbtx, F, T, E>(
560        &'s self,
561        tx_fn: F,
562        max_attempts: Option<usize>,
563    ) -> std::result::Result<T, AutocommitError<E>>
564    where
565        's: 'dbtx,
566        for<'r, 'o> F: Fn(
567            &'r mut DatabaseTransaction<'o>,
568            PhantomBound<'dbtx, 'o>,
569        ) -> BoxFuture<'r, std::result::Result<T, E>>,
570    {
571        assert_ne!(max_attempts, Some(0));
572        let mut curr_attempts: usize = 0;
573
574        loop {
575            // The `checked_add()` function is used to catch the `usize` overflow.
576            // With `usize=32bit` and an assumed time of 1ms per iteration, this would crash
577            // after ~50 days. But if that's the case, something else must be wrong.
578            // With `usize=64bit` it would take much longer, obviously.
579            curr_attempts = curr_attempts
580                .checked_add(1)
581                .expect("db autocommit attempt counter overflowed");
582
583            let mut dbtx = self.begin_transaction().await;
584
585            let tx_fn_res = tx_fn(&mut dbtx.to_ref_nc(), PhantomData).await;
586            let val = match tx_fn_res {
587                Ok(val) => val,
588                Err(err) => {
589                    dbtx.ignore_uncommitted();
590                    return Err(AutocommitError::ClosureError {
591                        attempts: curr_attempts,
592                        error: err,
593                    });
594                }
595            };
596
597            let _timing /* logs on drop */ = timing::TimeReporter::new("autocommit - commit_tx");
598
599            match dbtx.commit_tx_result().await {
600                Ok(()) => {
601                    return Ok(val);
602                }
603                Err(err) => {
604                    if max_attempts.is_some_and(|max_att| max_att <= curr_attempts) {
605                        warn!(
606                            target: LOG_DB,
607                            curr_attempts,
608                            err = %err.fmt_compact(),
609                            "Database commit failed in an autocommit block - terminating"
610                        );
611                        return Err(AutocommitError::CommitFailed {
612                            attempts: curr_attempts,
613                            last_error: err,
614                        });
615                    }
616
617                    let delay = (2u64.pow(curr_attempts.min(7) as u32) * 10).min(1000);
618                    let delay = rand::thread_rng().gen_range(delay..(2 * delay));
619                    warn!(
620                        target: LOG_DB,
621                        curr_attempts,
622                        err = %err.fmt_compact(),
623                        delay_ms = %delay,
624                        "Database commit failed in an autocommit block - retrying"
625                    );
626                    crate::runtime::sleep(Duration::from_millis(delay)).await;
627                }
628            }
629        }
630    }
631
632    /// Waits for key to be notified.
633    ///
634    /// Calls the `checker` when value of the key may have changed.
635    /// Returns the value when `checker` returns a `Some(T)`.
636    pub async fn wait_key_check<'a, K, T>(
637        &'a self,
638        key: &K,
639        checker: impl Fn(Option<K::Value>) -> Option<T>,
640    ) -> (T, DatabaseTransaction<'a, Committable>)
641    where
642        K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
643    {
644        let key_bytes = key.to_bytes();
645        loop {
646            // register for notification
647            let notify = self.inner.register(&key_bytes);
648
649            // check for value in db
650            let mut tx = self.inner.begin_transaction().await;
651
652            let maybe_value_bytes = tx
653                .raw_get_bytes(&key_bytes)
654                .await
655                .expect("Unrecoverable error when reading from database")
656                .map(|value_bytes| {
657                    decode_value_expect(&value_bytes, &self.module_decoders, &key_bytes)
658                });
659
660            if let Some(value) = checker(maybe_value_bytes) {
661                return (
662                    value,
663                    DatabaseTransaction::new(tx, self.module_decoders.clone()),
664                );
665            }
666
667            // key not found, try again
668            notify.await;
669            // if miss a notification between await and next register, it is
670            // fine. because we are going check the database
671        }
672    }
673
674    /// Waits for key to be present in database.
675    pub async fn wait_key_exists<K>(&self, key: &K) -> K::Value
676    where
677        K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
678    {
679        self.wait_key_check(key, std::convert::identity).await.0
680    }
681}
682
683fn module_instance_id_to_byte_prefix(module_instance_id: u16) -> Vec<u8> {
684    let mut bytes = vec![MODULE_GLOBAL_PREFIX];
685    bytes.append(&mut module_instance_id.consensus_encode_to_vec());
686    bytes
687}
688
/// A database that wraps an `inner` one and adds a prefix to all operations,
/// effectively creating an isolated partition.
#[derive(Clone, Debug)]
struct PrefixDatabase<Inner>
where
    Inner: Debug,
{
    // Bytes prepended to every key before it is passed to `inner`.
    prefix: Vec<u8>,
    // Token that transactions of this partition require to hand out the
    // global transaction; `None` for plain prefix partitions.
    global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
    // The wrapped database all operations are forwarded to.
    inner: Inner,
}
700
701impl<Inner> PrefixDatabase<Inner>
702where
703    Inner: Debug,
704{
705    // TODO: we should optimize these concatenations, maybe by having an internal
706    // `key: &[&[u8]]` that we flatten once, when passing to lowest layer, or
707    // something
708    fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
709        let mut full_key = self.prefix.clone();
710        full_key.extend_from_slice(key);
711        full_key
712    }
713}
714
715#[apply(async_trait_maybe_send!)]
716impl<Inner> IDatabase for PrefixDatabase<Inner>
717where
718    Inner: Debug + MaybeSend + MaybeSync + 'static + IDatabase,
719{
720    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
721        Box::new(PrefixDatabaseTransaction {
722            inner: self.inner.begin_transaction().await,
723            global_dbtx_access_token: self.global_dbtx_access_token,
724            prefix: self.prefix.clone(),
725        })
726    }
727    async fn register(&self, key: &[u8]) {
728        self.inner.register(&self.get_full_key(key)).await;
729    }
730
731    async fn notify(&self, key: &[u8]) {
732        self.inner.notify(&self.get_full_key(key)).await;
733    }
734
735    fn is_global(&self) -> bool {
736        if self.global_dbtx_access_token.is_some() {
737            false
738        } else {
739            self.inner.is_global()
740        }
741    }
742
743    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
744        self.inner.checkpoint(backup_path)
745    }
746}
747
/// A database transaction that wraps an `inner` one and adds a prefix to all
/// operations, effectively creating an isolated partition.
///
/// Produced by [`PrefixDatabase`].
#[derive(Debug)]
struct PrefixDatabaseTransaction<Inner> {
    // The wrapped transaction all operations are forwarded to.
    inner: Inner,
    // Token a caller must present to obtain the global transaction; `None`
    // when this partition was created without one.
    global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
    // Bytes prepended to every key before it is passed to `inner`.
    prefix: Vec<u8>,
}
758
759impl<Inner> PrefixDatabaseTransaction<Inner> {
760    // TODO: we should optimize these concatenations, maybe by having an internal
761    // `key: &[&[u8]]` that we flatten once, when passing to lowest layer, or
762    // something
763    fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
764        let mut full_key = self.prefix.clone();
765        full_key.extend_from_slice(key);
766        full_key
767    }
768
769    fn get_full_range(&self, range: Range<&[u8]>) -> Range<Vec<u8>> {
770        Range {
771            start: self.get_full_key(range.start),
772            end: self.get_full_key(range.end),
773        }
774    }
775
776    fn adapt_prefix_stream(stream: PrefixStream<'_>, prefix_len: usize) -> PrefixStream<'_> {
777        Box::pin(stream.map(move |(k, v)| (k[prefix_len..].to_owned(), v)))
778    }
779}
780
781#[apply(async_trait_maybe_send!)]
782impl<Inner> IDatabaseTransaction for PrefixDatabaseTransaction<Inner>
783where
784    Inner: IDatabaseTransaction,
785{
786    async fn commit_tx(&mut self) -> DatabaseResult<()> {
787        self.inner.commit_tx().await
788    }
789
790    fn is_global(&self) -> bool {
791        if self.global_dbtx_access_token.is_some() {
792            false
793        } else {
794            self.inner.is_global()
795        }
796    }
797
798    fn global_dbtx(
799        &mut self,
800        access_token: GlobalDBTxAccessToken,
801    ) -> &mut dyn IDatabaseTransaction {
802        if let Some(self_global_dbtx_access_token) = self.global_dbtx_access_token {
803            assert_eq!(
804                access_token, self_global_dbtx_access_token,
805                "Invalid access key used to access global_dbtx"
806            );
807            &mut self.inner
808        } else {
809            self.inner.global_dbtx(access_token)
810        }
811    }
812}
813
814#[apply(async_trait_maybe_send!)]
815impl<Inner> IDatabaseTransactionOpsCore for PrefixDatabaseTransaction<Inner>
816where
817    Inner: IDatabaseTransactionOpsCore,
818{
819    async fn raw_insert_bytes(
820        &mut self,
821        key: &[u8],
822        value: &[u8],
823    ) -> DatabaseResult<Option<Vec<u8>>> {
824        let key = self.get_full_key(key);
825        self.inner.raw_insert_bytes(&key, value).await
826    }
827
828    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
829        let key = self.get_full_key(key);
830        self.inner.raw_get_bytes(&key).await
831    }
832
833    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
834        let key = self.get_full_key(key);
835        self.inner.raw_remove_entry(&key).await
836    }
837
838    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
839        let key = self.get_full_key(key_prefix);
840        let stream = self.inner.raw_find_by_prefix(&key).await?;
841        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
842    }
843
844    async fn raw_find_by_prefix_sorted_descending(
845        &mut self,
846        key_prefix: &[u8],
847    ) -> DatabaseResult<PrefixStream<'_>> {
848        let key = self.get_full_key(key_prefix);
849        let stream = self
850            .inner
851            .raw_find_by_prefix_sorted_descending(&key)
852            .await?;
853        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
854    }
855
856    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
857        let range = self.get_full_range(range);
858        let stream = self
859            .inner
860            .raw_find_by_range(Range {
861                start: &range.start,
862                end: &range.end,
863            })
864            .await?;
865        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
866    }
867
868    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
869        let key = self.get_full_key(key_prefix);
870        self.inner.raw_remove_by_prefix(&key).await
871    }
872}
873
874#[apply(async_trait_maybe_send!)]
875impl<Inner> IDatabaseTransactionOps for PrefixDatabaseTransaction<Inner>
876where
877    Inner: IDatabaseTransactionOps,
878{
879    async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()> {
880        self.inner.rollback_tx_to_savepoint().await
881    }
882
883    async fn set_tx_savepoint(&mut self) -> DatabaseResult<()> {
884        self.set_tx_savepoint().await
885    }
886}
887
888/// Core raw a operations database transactions supports
889///
890/// Used to enforce the same signature on all types supporting it
891#[apply(async_trait_maybe_send!)]
892pub trait IDatabaseTransactionOpsCore: MaybeSend {
893    /// Insert entry
894    async fn raw_insert_bytes(
895        &mut self,
896        key: &[u8],
897        value: &[u8],
898    ) -> DatabaseResult<Option<Vec<u8>>>;
899
900    /// Get key value
901    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>>;
902
903    /// Remove entry by `key`
904    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>>;
905
906    /// Returns an stream of key-value pairs with keys that start with
907    /// `key_prefix`, sorted by key.
908    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>>;
909
910    /// Same as [`Self::raw_find_by_prefix`] but the order is descending by key.
911    async fn raw_find_by_prefix_sorted_descending(
912        &mut self,
913        key_prefix: &[u8],
914    ) -> DatabaseResult<PrefixStream<'_>>;
915
916    /// Returns an stream of key-value pairs with keys within a `range`, sorted
917    /// by key. [`Range`] is an (half-open) range bounded inclusively below and
918    /// exclusively above.
919    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>>;
920
921    /// Delete keys matching prefix
922    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()>;
923}
924
925#[apply(async_trait_maybe_send!)]
926impl<T> IDatabaseTransactionOpsCore for Box<T>
927where
928    T: IDatabaseTransactionOpsCore + ?Sized,
929{
930    async fn raw_insert_bytes(
931        &mut self,
932        key: &[u8],
933        value: &[u8],
934    ) -> DatabaseResult<Option<Vec<u8>>> {
935        (**self).raw_insert_bytes(key, value).await
936    }
937
938    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
939        (**self).raw_get_bytes(key).await
940    }
941
942    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
943        (**self).raw_remove_entry(key).await
944    }
945
946    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
947        (**self).raw_find_by_prefix(key_prefix).await
948    }
949
950    async fn raw_find_by_prefix_sorted_descending(
951        &mut self,
952        key_prefix: &[u8],
953    ) -> DatabaseResult<PrefixStream<'_>> {
954        (**self)
955            .raw_find_by_prefix_sorted_descending(key_prefix)
956            .await
957    }
958
959    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
960        (**self).raw_find_by_range(range).await
961    }
962
963    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
964        (**self).raw_remove_by_prefix(key_prefix).await
965    }
966}
967
968#[apply(async_trait_maybe_send!)]
969impl<T> IDatabaseTransactionOpsCore for &mut T
970where
971    T: IDatabaseTransactionOpsCore + ?Sized,
972{
973    async fn raw_insert_bytes(
974        &mut self,
975        key: &[u8],
976        value: &[u8],
977    ) -> DatabaseResult<Option<Vec<u8>>> {
978        (**self).raw_insert_bytes(key, value).await
979    }
980
981    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
982        (**self).raw_get_bytes(key).await
983    }
984
985    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
986        (**self).raw_remove_entry(key).await
987    }
988
989    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
990        (**self).raw_find_by_prefix(key_prefix).await
991    }
992
993    async fn raw_find_by_prefix_sorted_descending(
994        &mut self,
995        key_prefix: &[u8],
996    ) -> DatabaseResult<PrefixStream<'_>> {
997        (**self)
998            .raw_find_by_prefix_sorted_descending(key_prefix)
999            .await
1000    }
1001
1002    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
1003        (**self).raw_find_by_range(range).await
1004    }
1005
1006    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
1007        (**self).raw_remove_by_prefix(key_prefix).await
1008    }
1009}
1010
1011/// Additional operations (only some) database transactions expose, on top of
1012/// [`IDatabaseTransactionOpsCore`]
1013///
1014/// In certain contexts exposing these operations would be a problem, so they
1015/// are moved to a separate trait.
1016#[apply(async_trait_maybe_send!)]
1017pub trait IDatabaseTransactionOps: IDatabaseTransactionOpsCore + MaybeSend {
1018    /// Create a savepoint during the transaction that can be rolled back to
1019    /// using rollback_tx_to_savepoint. Rolling back to the savepoint will
1020    /// atomically remove the writes that were applied since the savepoint
1021    /// was created.
1022    ///
1023    /// Warning: Avoid using this in fedimint client code as not all database
1024    /// transaction implementations will support setting a savepoint during
1025    /// a transaction.
1026    async fn set_tx_savepoint(&mut self) -> DatabaseResult<()>;
1027
1028    async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()>;
1029}
1030
1031#[apply(async_trait_maybe_send!)]
1032impl<T> IDatabaseTransactionOps for Box<T>
1033where
1034    T: IDatabaseTransactionOps + ?Sized,
1035{
1036    async fn set_tx_savepoint(&mut self) -> DatabaseResult<()> {
1037        (**self).set_tx_savepoint().await
1038    }
1039
1040    async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()> {
1041        (**self).rollback_tx_to_savepoint().await
1042    }
1043}
1044
1045#[apply(async_trait_maybe_send!)]
1046impl<T> IDatabaseTransactionOps for &mut T
1047where
1048    T: IDatabaseTransactionOps + ?Sized,
1049{
1050    async fn set_tx_savepoint(&mut self) -> DatabaseResult<()> {
1051        (**self).set_tx_savepoint().await
1052    }
1053
1054    async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()> {
1055        (**self).rollback_tx_to_savepoint().await
1056    }
1057}
1058
1059/// Like [`IDatabaseTransactionOpsCore`], but typed
1060///
1061/// Implemented via blanket impl for everything that implements
1062/// [`IDatabaseTransactionOpsCore`] that has decoders (implements
1063/// [`WithDecoders`]).
1064#[apply(async_trait_maybe_send!)]
1065pub trait IDatabaseTransactionOpsCoreTyped<'a> {
1066    async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1067    where
1068        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1069
1070    async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1071    where
1072        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1073        K::Value: MaybeSend + MaybeSync;
1074
1075    async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1076    where
1077        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1078        K::Value: MaybeSend + MaybeSync;
1079
1080    async fn find_by_range<K>(
1081        &mut self,
1082        key_range: Range<K>,
1083    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1084    where
1085        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1086        K::Value: MaybeSend + MaybeSync;
1087
1088    async fn find_by_prefix<KP>(
1089        &mut self,
1090        key_prefix: &KP,
1091    ) -> Pin<
1092        Box<
1093            maybe_add_send!(
1094                dyn Stream<
1095                        Item = (
1096                            KP::Record,
1097                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1098                        ),
1099                    > + '_
1100            ),
1101        >,
1102    >
1103    where
1104        KP: DatabaseLookup + MaybeSend + MaybeSync,
1105        KP::Record: DatabaseKey;
1106
1107    async fn find_by_prefix_sorted_descending<KP>(
1108        &mut self,
1109        key_prefix: &KP,
1110    ) -> Pin<
1111        Box<
1112            maybe_add_send!(
1113                dyn Stream<
1114                        Item = (
1115                            KP::Record,
1116                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1117                        ),
1118                    > + '_
1119            ),
1120        >,
1121    >
1122    where
1123        KP: DatabaseLookup + MaybeSend + MaybeSync,
1124        KP::Record: DatabaseKey;
1125
1126    async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1127    where
1128        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1129
1130    async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1131    where
1132        KP: DatabaseLookup + MaybeSend + MaybeSync;
1133}
1134
1135// blanket implementation of typed ops for anything that implements raw ops and
1136// has decoders
1137#[apply(async_trait_maybe_send!)]
1138impl<T> IDatabaseTransactionOpsCoreTyped<'_> for T
1139where
1140    T: IDatabaseTransactionOpsCore + WithDecoders,
1141{
1142    async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1143    where
1144        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1145    {
1146        let key_bytes = key.to_bytes();
1147        let raw = self
1148            .raw_get_bytes(&key_bytes)
1149            .await
1150            .expect("Unrecoverable error occurred while reading and entry from the database");
1151        raw.map(|value_bytes| {
1152            decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1153        })
1154    }
1155
1156    async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1157    where
1158        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1159        K::Value: MaybeSend + MaybeSync,
1160    {
1161        let key_bytes = key.to_bytes();
1162        self.raw_insert_bytes(&key_bytes, &value.to_bytes())
1163            .await
1164            .expect("Unrecoverable error occurred while inserting entry into the database")
1165            .map(|value_bytes| {
1166                decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1167            })
1168    }
1169
1170    async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1171    where
1172        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1173        K::Value: MaybeSend + MaybeSync,
1174    {
1175        if let Some(prev) = self.insert_entry(key, value).await {
1176            panic!(
1177                "Database overwriting element when expecting insertion of new entry. Key: {key:?} Prev Value: {prev:?}"
1178            );
1179        }
1180    }
1181
1182    async fn find_by_range<K>(
1183        &mut self,
1184        key_range: Range<K>,
1185    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1186    where
1187        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1188        K::Value: MaybeSend + MaybeSync,
1189    {
1190        let decoders = self.decoders().clone();
1191        Box::pin(
1192            self.raw_find_by_range(Range {
1193                start: &key_range.start.to_bytes(),
1194                end: &key_range.end.to_bytes(),
1195            })
1196            .await
1197            .expect("Unrecoverable error occurred while listing entries from the database")
1198            .map(move |(key_bytes, value_bytes)| {
1199                let key = decode_key_expect(&key_bytes, &decoders);
1200                let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1201                (key, value)
1202            }),
1203        )
1204    }
1205
1206    async fn find_by_prefix<KP>(
1207        &mut self,
1208        key_prefix: &KP,
1209    ) -> Pin<
1210        Box<
1211            maybe_add_send!(
1212                dyn Stream<
1213                        Item = (
1214                            KP::Record,
1215                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1216                        ),
1217                    > + '_
1218            ),
1219        >,
1220    >
1221    where
1222        KP: DatabaseLookup + MaybeSend + MaybeSync,
1223        KP::Record: DatabaseKey,
1224    {
1225        let decoders = self.decoders().clone();
1226        Box::pin(
1227            self.raw_find_by_prefix(&key_prefix.to_bytes())
1228                .await
1229                .expect("Unrecoverable error occurred while listing entries from the database")
1230                .map(move |(key_bytes, value_bytes)| {
1231                    let key = decode_key_expect(&key_bytes, &decoders);
1232                    let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1233                    (key, value)
1234                }),
1235        )
1236    }
1237
1238    async fn find_by_prefix_sorted_descending<KP>(
1239        &mut self,
1240        key_prefix: &KP,
1241    ) -> Pin<
1242        Box<
1243            maybe_add_send!(
1244                dyn Stream<
1245                        Item = (
1246                            KP::Record,
1247                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1248                        ),
1249                    > + '_
1250            ),
1251        >,
1252    >
1253    where
1254        KP: DatabaseLookup + MaybeSend + MaybeSync,
1255        KP::Record: DatabaseKey,
1256    {
1257        let decoders = self.decoders().clone();
1258        Box::pin(
1259            self.raw_find_by_prefix_sorted_descending(&key_prefix.to_bytes())
1260                .await
1261                .expect("Unrecoverable error occurred while listing entries from the database")
1262                .map(move |(key_bytes, value_bytes)| {
1263                    let key = decode_key_expect(&key_bytes, &decoders);
1264                    let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1265                    (key, value)
1266                }),
1267        )
1268    }
1269    async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1270    where
1271        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1272    {
1273        let key_bytes = key.to_bytes();
1274        self.raw_remove_entry(&key_bytes)
1275            .await
1276            .expect("Unrecoverable error occurred while inserting removing entry from the database")
1277            .map(|value_bytes| {
1278                decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1279            })
1280    }
1281    async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1282    where
1283        KP: DatabaseLookup + MaybeSend + MaybeSync,
1284    {
1285        self.raw_remove_by_prefix(&key_prefix.to_bytes())
1286            .await
1287            .expect("Unrecoverable error when removing entries from the database");
1288    }
1289}
1290
1291/// A database type that has decoders, which allows it to implement
1292/// [`IDatabaseTransactionOpsCoreTyped`]
1293pub trait WithDecoders {
1294    fn decoders(&self) -> &ModuleDecoderRegistry;
1295}
1296
1297/// Raw database transaction (e.g. rocksdb implementation)
1298#[apply(async_trait_maybe_send!)]
1299pub trait IRawDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
1300    async fn commit_tx(self) -> DatabaseResult<()>;
1301}
1302
1303/// Fedimint database transaction
1304///
1305/// See [`IDatabase`] for more info.
1306#[apply(async_trait_maybe_send!)]
1307pub trait IDatabaseTransaction: MaybeSend + IDatabaseTransactionOps + fmt::Debug {
1308    /// Commit the transaction
1309    async fn commit_tx(&mut self) -> DatabaseResult<()>;
1310
1311    /// Is global database
1312    fn is_global(&self) -> bool;
1313
1314    /// Get the global database tx from a module-prefixed database transaction
1315    ///
1316    /// Meant to be called only by core internals, and module developers should
1317    /// not call it directly.
1318    #[doc(hidden)]
1319    fn global_dbtx(&mut self, access_token: GlobalDBTxAccessToken)
1320    -> &mut dyn IDatabaseTransaction;
1321}
1322
1323#[apply(async_trait_maybe_send!)]
1324impl<T> IDatabaseTransaction for Box<T>
1325where
1326    T: IDatabaseTransaction + ?Sized,
1327{
1328    async fn commit_tx(&mut self) -> DatabaseResult<()> {
1329        (**self).commit_tx().await
1330    }
1331
1332    fn is_global(&self) -> bool {
1333        (**self).is_global()
1334    }
1335
1336    fn global_dbtx(
1337        &mut self,
1338        access_token: GlobalDBTxAccessToken,
1339    ) -> &mut dyn IDatabaseTransaction {
1340        (**self).global_dbtx(access_token)
1341    }
1342}
1343
1344#[apply(async_trait_maybe_send!)]
1345impl<'a, T> IDatabaseTransaction for &'a mut T
1346where
1347    T: IDatabaseTransaction + ?Sized,
1348{
1349    async fn commit_tx(&mut self) -> DatabaseResult<()> {
1350        (**self).commit_tx().await
1351    }
1352
1353    fn is_global(&self) -> bool {
1354        (**self).is_global()
1355    }
1356
1357    fn global_dbtx(&mut self, access_key: GlobalDBTxAccessToken) -> &mut dyn IDatabaseTransaction {
1358        (**self).global_dbtx(access_key)
1359    }
1360}
1361
1362/// Struct that implements `IRawDatabaseTransaction` and can be wrapped
1363/// easier in other structs since it does not consumed `self` by move.
1364struct BaseDatabaseTransaction<Tx> {
1365    // TODO: merge options
1366    raw: Option<Tx>,
1367    notify_queue: Option<NotifyQueue>,
1368    notifications: Arc<Notifications>,
1369}
1370
1371impl<Tx> fmt::Debug for BaseDatabaseTransaction<Tx>
1372where
1373    Tx: fmt::Debug,
1374{
1375    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1376        f.write_fmt(format_args!(
1377            "BaseDatabaseTransaction{{ raw={:?} }}",
1378            self.raw
1379        ))
1380    }
1381}
1382impl<Tx> BaseDatabaseTransaction<Tx>
1383where
1384    Tx: IRawDatabaseTransaction,
1385{
1386    fn new(dbtx: Tx, notifications: Arc<Notifications>) -> Self {
1387        Self {
1388            raw: Some(dbtx),
1389            notifications,
1390            notify_queue: Some(NotifyQueue::new()),
1391        }
1392    }
1393
1394    fn add_notification_key(&mut self, key: &[u8]) -> DatabaseResult<()> {
1395        self.notify_queue
1396            .as_mut()
1397            .ok_or(DatabaseError::TransactionConsumed)?
1398            .add(key);
1399        Ok(())
1400    }
1401}
1402
1403#[apply(async_trait_maybe_send!)]
1404impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOpsCore for BaseDatabaseTransaction<Tx> {
1405    async fn raw_insert_bytes(
1406        &mut self,
1407        key: &[u8],
1408        value: &[u8],
1409    ) -> DatabaseResult<Option<Vec<u8>>> {
1410        self.add_notification_key(key)?;
1411        self.raw
1412            .as_mut()
1413            .ok_or(DatabaseError::TransactionConsumed)?
1414            .raw_insert_bytes(key, value)
1415            .await
1416    }
1417
1418    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1419        self.raw
1420            .as_mut()
1421            .ok_or(DatabaseError::TransactionConsumed)?
1422            .raw_get_bytes(key)
1423            .await
1424    }
1425
1426    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1427        self.add_notification_key(key)?;
1428        self.raw
1429            .as_mut()
1430            .ok_or(DatabaseError::TransactionConsumed)?
1431            .raw_remove_entry(key)
1432            .await
1433    }
1434
1435    async fn raw_find_by_range(
1436        &mut self,
1437        key_range: Range<&[u8]>,
1438    ) -> DatabaseResult<PrefixStream<'_>> {
1439        self.raw
1440            .as_mut()
1441            .ok_or(DatabaseError::TransactionConsumed)?
1442            .raw_find_by_range(key_range)
1443            .await
1444    }
1445
1446    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
1447        self.raw
1448            .as_mut()
1449            .ok_or(DatabaseError::TransactionConsumed)?
1450            .raw_find_by_prefix(key_prefix)
1451            .await
1452    }
1453
1454    async fn raw_find_by_prefix_sorted_descending(
1455        &mut self,
1456        key_prefix: &[u8],
1457    ) -> DatabaseResult<PrefixStream<'_>> {
1458        self.raw
1459            .as_mut()
1460            .ok_or(DatabaseError::TransactionConsumed)?
1461            .raw_find_by_prefix_sorted_descending(key_prefix)
1462            .await
1463    }
1464
1465    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
1466        self.raw
1467            .as_mut()
1468            .ok_or(DatabaseError::TransactionConsumed)?
1469            .raw_remove_by_prefix(key_prefix)
1470            .await
1471    }
1472}
1473
1474#[apply(async_trait_maybe_send!)]
1475impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOps for BaseDatabaseTransaction<Tx> {
1476    async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()> {
1477        self.raw
1478            .as_mut()
1479            .ok_or(DatabaseError::TransactionConsumed)?
1480            .rollback_tx_to_savepoint()
1481            .await?;
1482        Ok(())
1483    }
1484
1485    async fn set_tx_savepoint(&mut self) -> DatabaseResult<()> {
1486        self.raw
1487            .as_mut()
1488            .ok_or(DatabaseError::TransactionConsumed)?
1489            .set_tx_savepoint()
1490            .await?;
1491        Ok(())
1492    }
1493}
1494
1495#[apply(async_trait_maybe_send!)]
1496impl<Tx: IRawDatabaseTransaction + fmt::Debug> IDatabaseTransaction
1497    for BaseDatabaseTransaction<Tx>
1498{
1499    async fn commit_tx(&mut self) -> DatabaseResult<()> {
1500        self.raw
1501            .take()
1502            .ok_or(DatabaseError::TransactionConsumed)?
1503            .commit_tx()
1504            .await?;
1505        self.notifications.submit_queue(
1506            &self
1507                .notify_queue
1508                .take()
1509                .expect("commit must be called only once"),
1510        );
1511        Ok(())
1512    }
1513
1514    fn is_global(&self) -> bool {
1515        true
1516    }
1517
1518    fn global_dbtx(
1519        &mut self,
1520        _access_token: GlobalDBTxAccessToken,
1521    ) -> &mut dyn IDatabaseTransaction {
1522        panic!("Illegal to call global_dbtx on BaseDatabaseTransaction");
1523    }
1524}
1525
/// Bookkeeping used to detect, and log on `Drop`, transactions that were
/// written to but never committed
#[derive(Clone)]
struct CommitTracker {
    /// Whether the dbtx has been committed
    is_committed: bool,
    /// Whether the dbtx has any writes
    has_writes: bool,
    /// When set, uncommitted writes are not warn-logged
    ignore_uncommitted: bool,
}
1537
1538impl Drop for CommitTracker {
1539    fn drop(&mut self) {
1540        if self.has_writes && !self.is_committed {
1541            if self.ignore_uncommitted {
1542                trace!(
1543                    target: LOG_DB,
1544                    "DatabaseTransaction has writes and has not called commit, but that's expected."
1545                );
1546            } else {
1547                warn!(
1548                    target: LOG_DB,
1549                    location = ?backtrace::Backtrace::new(),
1550                    "DatabaseTransaction has writes and has not called commit."
1551                );
1552            }
1553        }
1554    }
1555}
1556
/// Either an owned `T` or a mutable borrow of a `T` that lives elsewhere.
enum MaybeRef<'a, T> {
    /// The value is stored inline.
    Owned(T),
    /// The value is borrowed mutably for `'a`.
    Borrowed(&'a mut T),
}
1561
1562impl<T> ops::Deref for MaybeRef<'_, T> {
1563    type Target = T;
1564
1565    fn deref(&self) -> &Self::Target {
1566        match self {
1567            MaybeRef::Owned(o) => o,
1568            MaybeRef::Borrowed(r) => r,
1569        }
1570    }
1571}
1572
1573impl<T> ops::DerefMut for MaybeRef<'_, T> {
1574    fn deref_mut(&mut self) -> &mut Self::Target {
1575        match self {
1576            MaybeRef::Owned(o) => o,
1577            MaybeRef::Borrowed(r) => r,
1578        }
1579    }
1580}
1581
/// Session type marking a [`DatabaseTransaction`] that is allowed to commit
///
/// The counterpart of [`NonCommittable`].
pub struct Committable;
1586
/// Session type marking a [`DatabaseTransaction`] that is not allowed to
/// commit
///
/// The counterpart of [`Committable`].
pub struct NonCommittable;
1591
1592/// A high level database transaction handle
1593///
1594/// `Cap` is a session type
1595pub struct DatabaseTransaction<'tx, Cap = NonCommittable> {
1596    tx: Box<dyn IDatabaseTransaction + 'tx>,
1597    decoders: ModuleDecoderRegistry,
1598    commit_tracker: MaybeRef<'tx, CommitTracker>,
1599    on_commit_hooks: MaybeRef<'tx, Vec<Box<maybe_add_send!(dyn FnOnce())>>>,
1600    capability: marker::PhantomData<Cap>,
1601}
1602
1603impl<Cap> fmt::Debug for DatabaseTransaction<'_, Cap> {
1604    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1605        f.write_fmt(format_args!(
1606            "DatabaseTransaction {{ tx: {:?}, decoders={:?} }}",
1607            self.tx, self.decoders
1608        ))
1609    }
1610}
1611
1612impl<Cap> WithDecoders for DatabaseTransaction<'_, Cap> {
1613    fn decoders(&self) -> &ModuleDecoderRegistry {
1614        &self.decoders
1615    }
1616}
1617
1618#[instrument(target = LOG_DB, level = "trace", skip_all, fields(value_type = std::any::type_name::<V>()), err)]
1619fn decode_value<V: DatabaseValue>(
1620    value_bytes: &[u8],
1621    decoders: &ModuleDecoderRegistry,
1622) -> std::result::Result<V, DecodingError> {
1623    trace!(
1624        bytes = %AbbreviateHexBytes(value_bytes),
1625        "decoding value",
1626    );
1627    V::from_bytes(value_bytes, decoders)
1628}
1629
1630#[track_caller]
1631fn decode_value_expect<V: DatabaseValue>(
1632    value_bytes: &[u8],
1633    decoders: &ModuleDecoderRegistry,
1634    key_bytes: &[u8],
1635) -> V {
1636    decode_value(value_bytes, decoders).unwrap_or_else(|err| {
1637        panic!(
1638            "Unrecoverable decoding DatabaseValue as {}; err={}, key_bytes={}, val_bytes={}",
1639            any::type_name::<V>(),
1640            err,
1641            AbbreviateHexBytes(key_bytes),
1642            AbbreviateHexBytes(value_bytes),
1643        )
1644    })
1645}
1646
1647#[track_caller]
1648fn decode_key_expect<K: DatabaseKey>(key_bytes: &[u8], decoders: &ModuleDecoderRegistry) -> K {
1649    trace!(
1650        bytes = %AbbreviateHexBytes(key_bytes),
1651        "decoding key",
1652    );
1653    K::from_bytes(key_bytes, decoders).unwrap_or_else(|err| {
1654        panic!(
1655            "Unrecoverable decoding DatabaseKey as {}; err={}; bytes={}",
1656            any::type_name::<K>(),
1657            err,
1658            AbbreviateHexBytes(key_bytes)
1659        )
1660    })
1661}
1662
1663impl<'tx, Cap> DatabaseTransaction<'tx, Cap> {
1664    /// Convert into a non-committable version
1665    pub fn into_nc(self) -> DatabaseTransaction<'tx, NonCommittable> {
1666        DatabaseTransaction {
1667            tx: self.tx,
1668            decoders: self.decoders,
1669            commit_tracker: self.commit_tracker,
1670            on_commit_hooks: self.on_commit_hooks,
1671            capability: PhantomData::<NonCommittable>,
1672        }
1673    }
1674
1675    /// Get a reference to a non-committeable version
1676    pub fn to_ref_nc<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, NonCommittable>
1677    where
1678        's: 'a,
1679    {
1680        self.to_ref().into_nc()
1681    }
1682
1683    /// Get [`DatabaseTransaction`] isolated to a `prefix`
1684    pub fn with_prefix<'a: 'tx>(self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1685    where
1686        'tx: 'a,
1687    {
1688        DatabaseTransaction {
1689            tx: Box::new(PrefixDatabaseTransaction {
1690                inner: self.tx,
1691                global_dbtx_access_token: None,
1692                prefix,
1693            }),
1694            decoders: self.decoders,
1695            commit_tracker: self.commit_tracker,
1696            on_commit_hooks: self.on_commit_hooks,
1697            capability: self.capability,
1698        }
1699    }
1700
1701    /// Get [`DatabaseTransaction`] isolated to a prefix of a given
1702    /// `module_instance_id`, allowing the module to access global_dbtx
1703    /// with the right access token.
1704    pub fn with_prefix_module_id<'a: 'tx>(
1705        self,
1706        module_instance_id: ModuleInstanceId,
1707    ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1708    where
1709        'tx: 'a,
1710    {
1711        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1712        let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1713        (
1714            DatabaseTransaction {
1715                tx: Box::new(PrefixDatabaseTransaction {
1716                    inner: self.tx,
1717                    global_dbtx_access_token: Some(global_dbtx_access_token),
1718                    prefix,
1719                }),
1720                decoders: self.decoders,
1721                commit_tracker: self.commit_tracker,
1722                on_commit_hooks: self.on_commit_hooks,
1723                capability: self.capability,
1724            },
1725            global_dbtx_access_token,
1726        )
1727    }
1728
1729    /// Get [`DatabaseTransaction`] to `self`
1730    pub fn to_ref<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, Cap>
1731    where
1732        's: 'a,
1733    {
1734        let decoders = self.decoders.clone();
1735
1736        DatabaseTransaction {
1737            tx: Box::new(&mut self.tx),
1738            decoders,
1739            commit_tracker: match self.commit_tracker {
1740                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1741                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1742            },
1743            on_commit_hooks: match self.on_commit_hooks {
1744                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1745                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1746            },
1747            capability: self.capability,
1748        }
1749    }
1750
1751    /// Get [`DatabaseTransaction`] isolated to a `prefix` of `self`
1752    pub fn to_ref_with_prefix<'a>(&'a mut self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1753    where
1754        'tx: 'a,
1755    {
1756        DatabaseTransaction {
1757            tx: Box::new(PrefixDatabaseTransaction {
1758                inner: &mut self.tx,
1759                global_dbtx_access_token: None,
1760                prefix,
1761            }),
1762            decoders: self.decoders.clone(),
1763            commit_tracker: match self.commit_tracker {
1764                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1765                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1766            },
1767            on_commit_hooks: match self.on_commit_hooks {
1768                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1769                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1770            },
1771            capability: self.capability,
1772        }
1773    }
1774
1775    pub fn to_ref_with_prefix_module_id<'a>(
1776        &'a mut self,
1777        module_instance_id: ModuleInstanceId,
1778    ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1779    where
1780        'tx: 'a,
1781    {
1782        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1783        let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1784        (
1785            DatabaseTransaction {
1786                tx: Box::new(PrefixDatabaseTransaction {
1787                    inner: &mut self.tx,
1788                    global_dbtx_access_token: Some(global_dbtx_access_token),
1789                    prefix,
1790                }),
1791                decoders: self.decoders.clone(),
1792                commit_tracker: match self.commit_tracker {
1793                    MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1794                    MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1795                },
1796                on_commit_hooks: match self.on_commit_hooks {
1797                    MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1798                    MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1799                },
1800                capability: self.capability,
1801            },
1802            global_dbtx_access_token,
1803        )
1804    }
1805
    /// Is this `Database` a global, unpartitioned `Database`
    ///
    /// The answer is delegated to the underlying transaction implementation.
    pub fn is_global(&self) -> bool {
        self.tx.is_global()
    }
1810
1811    /// `Err` if [`Self::is_global`] is not true
1812    pub fn ensure_global(&self) -> DatabaseResult<()> {
1813        if !self.is_global() {
1814            return Err(DatabaseError::Other(anyhow::anyhow!(
1815                "Database instance not global"
1816            )));
1817        }
1818
1819        Ok(())
1820    }
1821
1822    /// `Err` if [`Self::is_global`] is true
1823    pub fn ensure_isolated(&self) -> DatabaseResult<()> {
1824        if self.is_global() {
1825            return Err(DatabaseError::Other(anyhow::anyhow!(
1826                "Database instance not isolated"
1827            )));
1828        }
1829
1830        Ok(())
1831    }
1832
    /// Cancel the tx to avoid debugging warnings about uncommitted writes
    ///
    /// This only sets the commit tracker's `ignore_uncommitted` flag; nothing
    /// is rolled back here. Returns `self` so calls can be chained.
    pub fn ignore_uncommitted(&mut self) -> &mut Self {
        self.commit_tracker.ignore_uncommitted = true;
        self
    }
1838
    /// Create warnings about uncommitted writes
    ///
    /// Clears the commit tracker's `ignore_uncommitted` flag, undoing
    /// [`Self::ignore_uncommitted`]. Returns `self` so calls can be chained.
    pub fn warn_uncommitted(&mut self) -> &mut Self {
        self.commit_tracker.ignore_uncommitted = false;
        self
    }
1844
    /// Register a hook that will be run after commit succeeds.
    ///
    /// The closure is boxed and appended to the hook list. Hooks are drained
    /// and called in registration order by `commit_tx_result`, and only when
    /// the commit returned `Ok`.
    #[instrument(target = LOG_DB, level = "trace", skip_all)]
    pub fn on_commit(&mut self, f: maybe_add_send!(impl FnOnce() + 'static)) {
        self.on_commit_hooks.push(Box::new(f));
    }
1850
1851    pub fn global_dbtx<'a>(
1852        &'a mut self,
1853        access_token: GlobalDBTxAccessToken,
1854    ) -> DatabaseTransaction<'a, Cap>
1855    where
1856        'tx: 'a,
1857    {
1858        let decoders = self.decoders.clone();
1859
1860        DatabaseTransaction {
1861            tx: Box::new(self.tx.global_dbtx(access_token)),
1862            decoders,
1863            commit_tracker: match self.commit_tracker {
1864                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1865                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1866            },
1867            on_commit_hooks: match self.on_commit_hooks {
1868                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1869                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1870            },
1871            capability: self.capability,
1872        }
1873    }
1874}
1875
/// Code used to access `global_dbtx`
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct GlobalDBTxAccessToken(u32);

impl GlobalDBTxAccessToken {
    /// Calculate an access code for accessing `global_dbtx` from a prefixed
    /// database tx
    ///
    /// Since we need to do it at runtime, we want the user modules not to be
    /// able to call `global_dbtx` too easily. But at the same time we don't
    /// need to be paranoid.
    ///
    /// This must be deterministic during the whole run of the software
    /// (because it is rederived independently in multiple codepaths), but it
    /// could be somewhat randomized between different runs and releases.
    ///
    /// The code is the sum of the prefix bytes plus a fixed offset of 513.
    fn from_prefix(prefix: &[u8]) -> Self {
        let byte_sum: u32 = prefix.iter().copied().map(u32::from).sum();
        Self(byte_sum + 513)
    }
}
1895
1896impl<'tx> DatabaseTransaction<'tx, Committable> {
1897    pub fn new(dbtx: Box<dyn IDatabaseTransaction + 'tx>, decoders: ModuleDecoderRegistry) -> Self {
1898        Self {
1899            tx: dbtx,
1900            decoders,
1901            commit_tracker: MaybeRef::Owned(CommitTracker {
1902                is_committed: false,
1903                has_writes: false,
1904                ignore_uncommitted: false,
1905            }),
1906            on_commit_hooks: MaybeRef::Owned(vec![]),
1907            capability: PhantomData,
1908        }
1909    }
1910
1911    pub async fn commit_tx_result(mut self) -> DatabaseResult<()> {
1912        self.commit_tracker.is_committed = true;
1913        let commit_result = self.tx.commit_tx().await;
1914
1915        // Run commit hooks in case commit was successful
1916        if commit_result.is_ok() {
1917            for hook in self.on_commit_hooks.deref_mut().drain(..) {
1918                hook();
1919            }
1920        }
1921
1922        commit_result
1923    }
1924
1925    pub async fn commit_tx(mut self) {
1926        self.commit_tracker.is_committed = true;
1927        self.commit_tx_result()
1928            .await
1929            .expect("Unrecoverable error occurred while committing to the database.");
1930    }
1931}
1932
1933#[apply(async_trait_maybe_send!)]
1934impl<Cap> IDatabaseTransactionOpsCore for DatabaseTransaction<'_, Cap>
1935where
1936    Cap: Send,
1937{
1938    async fn raw_insert_bytes(
1939        &mut self,
1940        key: &[u8],
1941        value: &[u8],
1942    ) -> DatabaseResult<Option<Vec<u8>>> {
1943        self.commit_tracker.has_writes = true;
1944        self.tx.raw_insert_bytes(key, value).await
1945    }
1946
1947    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1948        self.tx.raw_get_bytes(key).await
1949    }
1950
1951    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1952        self.tx.raw_remove_entry(key).await
1953    }
1954
1955    async fn raw_find_by_range(
1956        &mut self,
1957        key_range: Range<&[u8]>,
1958    ) -> DatabaseResult<PrefixStream<'_>> {
1959        self.tx.raw_find_by_range(key_range).await
1960    }
1961
1962    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
1963        self.tx.raw_find_by_prefix(key_prefix).await
1964    }
1965
1966    async fn raw_find_by_prefix_sorted_descending(
1967        &mut self,
1968        key_prefix: &[u8],
1969    ) -> DatabaseResult<PrefixStream<'_>> {
1970        self.tx
1971            .raw_find_by_prefix_sorted_descending(key_prefix)
1972            .await
1973    }
1974
1975    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
1976        self.commit_tracker.has_writes = true;
1977        self.tx.raw_remove_by_prefix(key_prefix).await
1978    }
1979}
/// Savepoint operations, available only on `Committable` transactions.
///
/// Both methods forward directly to the wrapped transaction.
#[apply(async_trait_maybe_send!)]
impl IDatabaseTransactionOps for DatabaseTransaction<'_, Committable> {
    async fn set_tx_savepoint(&mut self) -> DatabaseResult<()> {
        self.tx.set_tx_savepoint().await
    }

    async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()> {
        self.tx.rollback_tx_to_savepoint().await
    }
}
1990
1991impl<T> DatabaseKeyPrefix for T
1992where
1993    T: DatabaseLookup + crate::encoding::Encodable + Debug,
1994{
1995    fn to_bytes(&self) -> Vec<u8> {
1996        let mut data = vec![<Self as DatabaseLookup>::Record::DB_PREFIX];
1997        data.append(&mut self.consensus_encode_to_vec());
1998        data
1999    }
2000}
2001
2002impl<T> DatabaseKey for T
2003where
2004    // Note: key can only be `T` that can be decoded without modules (even if
2005    // module type is `()`)
2006    T: DatabaseRecord + crate::encoding::Decodable + Sized,
2007{
2008    const NOTIFY_ON_MODIFY: bool = <T as DatabaseRecord>::NOTIFY_ON_MODIFY;
2009    fn from_bytes(
2010        data: &[u8],
2011        modules: &ModuleDecoderRegistry,
2012    ) -> std::result::Result<Self, DecodingError> {
2013        if data.is_empty() {
2014            // TODO: build better coding errors, pretty useless right now
2015            return Err(DecodingError::wrong_length(1, 0));
2016        }
2017
2018        if data[0] != Self::DB_PREFIX {
2019            return Err(DecodingError::wrong_prefix(Self::DB_PREFIX, data[0]));
2020        }
2021
2022        <Self as crate::encoding::Decodable>::consensus_decode_whole(&data[1..], modules)
2023            .map_err(|decode_error| DecodingError::Other(decode_error.0))
2024    }
2025}
2026
2027impl<T> DatabaseValue for T
2028where
2029    T: Debug + Encodable + Decodable,
2030{
2031    fn from_bytes(
2032        data: &[u8],
2033        modules: &ModuleDecoderRegistry,
2034    ) -> std::result::Result<Self, DecodingError> {
2035        T::consensus_decode_whole(data, modules).map_err(|e| DecodingError::Other(e.0))
2036    }
2037
2038    fn to_bytes(&self) -> Vec<u8> {
2039        self.consensus_encode_to_vec()
2040    }
2041}
2042
/// This is a helper macro that generates the implementation of
/// `DatabaseRecord` necessary for reading/writing to the database.
///
/// - `key`: This is the type of struct that will be used as the key into the
///   database
/// - `value`: This is the type of struct that will be used as the value into
///   the database
/// - `db_prefix`: Required enum expression that is represented as a `u8` and is
///   prepended to this key
/// - `notify_on_modify`: Optional, must be the literal `true` or `false`. When
///   given, it sets the record's `NOTIFY_ON_MODIFY` constant, and `true`
///   additionally implements the `DatabaseKeyWithNotify` marker trait for the
///   key. When omitted, the constant is not emitted by this macro.
///
/// Query prefixes (for fetching by prefix via `find_by_prefix`) are not
/// declared here; use [`impl_db_lookup!`](crate::impl_db_lookup) for that.
///
/// # Examples
///
/// ```
/// use fedimint_core::encoding::{Decodable, Encodable};
/// use fedimint_core::impl_db_record;
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyKey;
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyValue;
///
/// #[repr(u8)]
/// #[derive(Clone, Debug)]
/// pub enum DbKeyPrefix {
///     MyKey = 0x50,
/// }
///
/// impl_db_record!(key = MyKey, value = MyValue, db_prefix = DbKeyPrefix::MyKey);
/// ```
///
/// Use the required parameters and declare one `query_prefix` with
/// `impl_db_lookup!`
///
/// ```
/// use fedimint_core::encoding::{Decodable, Encodable};
/// use fedimint_core::{impl_db_lookup, impl_db_record};
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyKey;
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyValue;
///
/// #[repr(u8)]
/// #[derive(Clone, Debug)]
/// pub enum DbKeyPrefix {
///     MyKey = 0x50,
/// }
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyKeyPrefix;
///
/// impl_db_record!(key = MyKey, value = MyValue, db_prefix = DbKeyPrefix::MyKey,);
///
/// impl_db_lookup!(key = MyKey, query_prefix = MyKeyPrefix);
/// ```
#[macro_export]
macro_rules! impl_db_record {
    // Public entry point; `notify_on_modify` and the trailing comma are optional
    (key = $key:ty, value = $val:ty, db_prefix = $db_prefix:expr_2021 $(, notify_on_modify = $notify:tt)? $(,)?) => {
        impl $crate::db::DatabaseRecord for $key {
            const DB_PREFIX: u8 = $db_prefix as u8;
            $(const NOTIFY_ON_MODIFY: bool = $notify;)?
            type Key = Self;
            type Value = $val;
        }
        // Only when `notify_on_modify` was given: dispatch to the internal
        // rules below to decide whether the marker trait is implemented
        $(
            impl_db_record! {
                @impl_notify_marker key = $key, notify_on_modify = $notify
            }
        )?
    };
    // if notify is set to true
    (@impl_notify_marker key = $key:ty, notify_on_modify = true) => {
        impl $crate::db::DatabaseKeyWithNotify for $key {}
    };
    // if notify is set to false
    (@impl_notify_marker key = $key:ty, notify_on_modify = false) => {};
}
2125
/// Helper macro that implements `DatabaseLookup` for query prefix types.
///
/// - `key`: The record key type the prefixes look up; it becomes the
///   `Record` associated type of every generated implementation
/// - `query_prefix`: Type of the prefix struct; may be passed zero or more
///   times, one `DatabaseLookup` implementation is generated per occurrence
///
/// See the examples on [`impl_db_record!`](crate::impl_db_record).
#[macro_export]
macro_rules! impl_db_lookup{
    (key = $key:ty $(, query_prefix = $query_prefix:ty)* $(,)?) => {
        $(
            impl $crate::db::DatabaseLookup for $query_prefix {
                type Record = $key;
            }
        )*
    };
}
2136
/// Deprecated: Use `DatabaseVersionKey(ModuleInstanceId)` instead.
///
/// Legacy key under which the [`DatabaseVersion`] used to be stored; it
/// carries no module id of its own.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct DatabaseVersionKeyV0;

/// Key under which the [`DatabaseVersion`] of the given module instance is
/// stored.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct DatabaseVersionKey(pub ModuleInstanceId);

/// Version number of the on-disk database layout.
#[derive(Debug, Encodable, Decodable, Serialize, Clone, PartialOrd, Ord, PartialEq, Eq, Copy)]
pub struct DatabaseVersion(pub u64);

// Both the legacy and the current version key share the same
// `DbKeyPrefix::DatabaseVersion` prefix byte; they differ only in the bytes
// that follow it.
impl_db_record!(
    key = DatabaseVersionKeyV0,
    value = DatabaseVersion,
    db_prefix = DbKeyPrefix::DatabaseVersion
);

impl_db_record!(
    key = DatabaseVersionKey,
    value = DatabaseVersion,
    db_prefix = DbKeyPrefix::DatabaseVersion
);
2158
2159impl std::fmt::Display for DatabaseVersion {
2160    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2161        write!(f, "{}", self.0)
2162    }
2163}
2164
impl DatabaseVersion {
    /// Returns the version that directly follows `self` (`self.0 + 1`).
    pub fn increment(&self) -> Self {
        Self(self.0 + 1)
    }
}
2170
/// Displays the prefix using its `Debug` representation.
impl std::fmt::Display for DbKeyPrefix {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{self:?}")
    }
}
2176
/// Key prefix bytes for records declared by this module.
///
/// The discriminant is what `impl_db_record!` stores as a record's
/// `DB_PREFIX`, i.e. the first byte of the serialized key.
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
    /// Prefix of the database version records (`DatabaseVersionKeyV0` and
    /// `DatabaseVersionKey`)
    DatabaseVersion = 0x50,
    ClientBackup = 0x51,
}
2183
/// Error returned when decoding a database key or value from raw bytes fails
#[derive(Debug, Error)]
pub enum DecodingError {
    /// The first byte of the key did not match the expected prefix byte
    #[error("Key had a wrong prefix, expected {expected} but got {found}")]
    WrongPrefix { expected: u8, found: u8 },
    /// The key did not have the expected number of bytes
    #[error("Key had a wrong length, expected {expected} but got {found}")]
    WrongLength { expected: usize, found: usize },
    /// Any other decoding failure
    #[error("Other decoding error: {0:#}")]
    Other(anyhow::Error),
}
2193
2194impl DecodingError {
2195    pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2196        Self::Other(anyhow::Error::from(error))
2197    }
2198
2199    pub fn wrong_prefix(expected: u8, found: u8) -> Self {
2200        Self::WrongPrefix { expected, found }
2201    }
2202
2203    pub fn wrong_length(expected: usize, found: usize) -> Self {
2204        Self::WrongLength { expected, found }
2205    }
2206}
2207
/// Error type for database operations
#[derive(Debug, Error)]
pub enum DatabaseError {
    /// Write-write conflict during optimistic transaction commit.
    /// This occurs when two transactions attempt to modify the same key.
    #[error("Write-write conflict detected")]
    WriteConflict,

    /// The transaction has already been consumed (committed or dropped).
    /// Operations cannot be performed on a consumed transaction.
    #[error("Transaction already consumed")]
    TransactionConsumed,

    /// Error from the underlying database backend (e.g., RocksDB I/O errors).
    ///
    /// Thanks to `#[from]`, a `Box<dyn Error + Send + Sync>` converts into
    /// this variant automatically.
    #[error("Database backend error: {0}")]
    DatabaseBackend(#[from] Box<dyn Error + Send + Sync>),

    /// Other database error
    ///
    /// This is also the variant an `anyhow::Error` converts into.
    #[error("Database error: {0:#}")]
    Other(anyhow::Error),
}
2229
2230impl DatabaseError {
2231    /// Create a DatabaseError from any error type
2232    pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2233        Self::Other(anyhow::Error::from(error))
2234    }
2235
2236    /// Create a DatabaseBackend error from any error type
2237    pub fn backend<E: Error + Send + Sync + 'static>(error: E) -> Self {
2238        Self::DatabaseBackend(Box::new(error))
2239    }
2240}
2241
/// Lets `?` turn an `anyhow::Error` into a [`DatabaseError::Other`].
impl From<anyhow::Error> for DatabaseError {
    fn from(error: anyhow::Error) -> Self {
        Self::Other(error)
    }
}
2247
/// Collects all key/value pairs found under a prefix and stores them in a map
/// under a string label.
///
/// - `$dbtx`: identifier of the transaction passed to `find_by_prefix`
/// - `$prefix_type`: expression for the prefix to query; it is borrowed by
///   the macro
/// - `$key_type`: accepted for symmetry with `push_db_key_items!`, currently
///   unused by the expansion
/// - `$value_type`: value type of the collected `BTreeMap`
/// - `$map`: identifier of the map receiving the boxed result
/// - `$key_literal`: literal used (via `to_string`) as the entry's key
///
/// Keys are rendered with `consensus_encode_to_hex`. The expansion uses
/// `.await`, so it must be invoked in an async context, and it refers to
/// `BTreeMap` unqualified, so that name must be in scope at the call site.
/// NOTE(review): the `map`/`collect` calls presumably also need a stream
/// extension trait in scope at the call site; confirm against callers.
#[macro_export]
macro_rules! push_db_pair_items {
    ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
        let db_items =
            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
                .await
                .map(|(key, val)| {
                    (
                        $crate::encoding::Encodable::consensus_encode_to_hex(&key),
                        val,
                    )
                })
                .collect::<BTreeMap<String, $value_type>>()
                .await;

        $map.insert($key_literal.to_string(), Box::new(db_items));
    };
}
2266
/// Collects all keys found under a prefix (values are discarded) and stores
/// them in a map under a string label.
///
/// - `$dbtx`: identifier of the transaction passed to `find_by_prefix`
/// - `$prefix_type`: expression for the prefix to query; it is borrowed by
///   the macro
/// - `$key_type`: element type of the collected `Vec`
/// - `$map`: identifier of the map receiving the boxed result
/// - `$key_literal`: literal used (via `to_string`) as the entry's key
///
/// The expansion uses `.await`, so it must be invoked in an async context.
/// NOTE(review): the `map`/`collect` calls presumably need a stream extension
/// trait in scope at the call site; confirm against callers.
#[macro_export]
macro_rules! push_db_key_items {
    ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $map:ident, $key_literal:literal) => {
        let db_items =
            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
                .await
                .map(|(key, _)| key)
                .collect::<Vec<$key_type>>()
                .await;

        $map.insert($key_literal.to_string(), Box::new(db_items));
    };
}
2280
/// Context passed to the db migration _functions_ (pay attention to `Fn` in the
/// name)
///
/// Typically should not be referred to directly, and instead by a type-alias,
/// where the inner-context is set.
///
/// Notably it has the (optional) module id (inaccessible to the modules
/// directly, but used internally) and an inner context `C` injected by the
/// outer-layer.
///
/// `C` is generic, as in different layers / scopes (server vs client, etc.) a
/// different (module-typed, type erased, server/client, etc.) contexts might be
/// needed, while the database migration logic is kind of generic over that.
pub struct DbMigrationFnContext<'tx, C> {
    // Global (non-isolated) transaction; the constructor enforces this
    dbtx: DatabaseTransaction<'tx>,
    // `Some` when migrating a module, `None` otherwise
    module_instance_id: Option<ModuleInstanceId>,
    // Inner context injected by the outer layer
    ctx: C,
    // Private unit field that prevents construction by struct literal outside
    // this module
    __please_use_constructor: (),
}
2300
2301impl<'tx, C> DbMigrationFnContext<'tx, C> {
2302    pub fn new(
2303        dbtx: DatabaseTransaction<'tx>,
2304        module_instance_id: Option<ModuleInstanceId>,
2305        ctx: C,
2306    ) -> Self {
2307        dbtx.ensure_global().expect("Must pass global dbtx");
2308        Self {
2309            dbtx,
2310            module_instance_id,
2311            ctx,
2312            // this is a constructor
2313            __please_use_constructor: (),
2314        }
2315    }
2316
2317    pub fn map<R>(self, f: impl FnOnce(C) -> R) -> DbMigrationFnContext<'tx, R> {
2318        DbMigrationFnContext::new(self.dbtx, self.module_instance_id, f(self.ctx))
2319    }
2320
2321    // TODO: this method is currently visible to the module itself, and it shouldn't
2322    #[doc(hidden)]
2323    pub fn split_dbtx_ctx<'s>(&'s mut self) -> (&'s mut DatabaseTransaction<'tx>, &'s C) {
2324        let Self { dbtx, ctx, .. } = self;
2325
2326        (dbtx, ctx)
2327    }
2328
2329    pub fn dbtx(&'_ mut self) -> DatabaseTransaction<'_> {
2330        if let Some(module_instance_id) = self.module_instance_id {
2331            self.dbtx.to_ref_with_prefix_module_id(module_instance_id).0
2332        } else {
2333            self.dbtx.to_ref_nc()
2334        }
2335    }
2336
2337    // TODO: this method is currently visible to the module itself, and it shouldn't
2338    #[doc(hidden)]
2339    pub fn module_instance_id(&self) -> Option<ModuleInstanceId> {
2340        self.module_instance_id
2341    }
2342}
2343
/// [`DbMigrationFn`] with no extra context (ATM gateway)
pub type GeneralDbMigrationFn = DbMigrationFn<()>;
/// [`DbMigrationFnContext`] received by a [`GeneralDbMigrationFn`]
pub type GeneralDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, ()>;

/// [`DbMigrationFn`] used by core client
///
/// NOTE: client _module_ migrations are handled using separate structs due to
/// state machine migrations
pub type ClientCoreDbMigrationFn = DbMigrationFn<()>;
/// [`DbMigrationFnContext`] received by a [`ClientCoreDbMigrationFn`]
pub type ClientCoreDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, ()>;
2354
/// `DbMigrationFn` that modules can implement to "migrate" the database
/// to the next database version.
///
/// It is a boxed function taking a [`DbMigrationFnContext`] by value and
/// returning a boxed, pinned future that resolves to `anyhow::Result<()>`.
///
/// It is parametrized over `C` (contents), which is extra data/type/interface
/// custom for different part of the codebase, e.g.:
///
/// * server core
/// * server modules
/// * client core
/// * gateway core
pub type DbMigrationFn<C> = Box<
    maybe_add_send_sync!(
        dyn for<'tx> Fn(
            DbMigrationFnContext<'tx, C>,
        ) -> Pin<
            Box<maybe_add_send!(dyn futures::Future<Output = anyhow::Result<()>> + 'tx)>,
        >
    ),
>;
2374
2375/// Verifies that all database migrations are defined contiguously and returns
2376/// the "current" database version, which is one greater than the last key in
2377/// the map.
2378pub fn get_current_database_version<F>(
2379    migrations: &BTreeMap<DatabaseVersion, F>,
2380) -> DatabaseVersion {
2381    let versions = migrations.keys().copied().collect::<Vec<_>>();
2382
2383    // Verify that all database migrations are defined contiguously. If there is a
2384    // gap, this indicates a programming error and we should panic.
2385    if !versions
2386        .windows(2)
2387        .all(|window| window[0].increment() == window[1])
2388    {
2389        panic!("Database Migrations are not defined contiguously");
2390    }
2391
2392    versions
2393        .last()
2394        .map_or(DatabaseVersion(0), DatabaseVersion::increment)
2395}
2396
2397pub async fn apply_migrations<C>(
2398    db: &Database,
2399    ctx: C,
2400    kind: String,
2401    migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
2402    module_instance_id: Option<ModuleInstanceId>,
2403    // When used in client side context, we can/should ignore keys that external app
2404    // is allowed to use, and but since this function is shared, we make it optional argument
2405    external_prefixes_above: Option<u8>,
2406) -> std::result::Result<(), anyhow::Error>
2407where
2408    C: Clone,
2409{
2410    let mut dbtx = db.begin_transaction().await;
2411    apply_migrations_dbtx(
2412        &mut dbtx.to_ref_nc(),
2413        ctx,
2414        kind,
2415        migrations,
2416        module_instance_id,
2417        external_prefixes_above,
2418    )
2419    .await?;
2420
2421    dbtx.commit_tx_result()
2422        .await
2423        .map_err(|e| anyhow::Error::msg(e.to_string()))
2424}
2425/// `apply_migrations` iterates from the on disk database version for the
2426/// module.
2427///
2428/// `apply_migrations` iterates from the on disk database version for the module
2429/// up to `target_db_version` and executes all of the migrations that exist in
2430/// the migrations map. Each migration in migrations map updates the
2431/// database to have the correct on-disk structures that the code is expecting.
2432/// The entire migration process is atomic (i.e migration from 0->1 and 1->2
2433/// happen atomically). This function is called before the module is initialized
2434/// and as long as the correct migrations are supplied in the migrations map,
2435/// the module will be able to read and write from the database successfully.
2436pub async fn apply_migrations_dbtx<C>(
2437    global_dbtx: &mut DatabaseTransaction<'_>,
2438    ctx: C,
2439    kind: String,
2440    migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
2441    module_instance_id: Option<ModuleInstanceId>,
2442    // When used in client side context, we can/should ignore keys that external app
2443    // is allowed to use, and but since this function is shared, we make it optional argument
2444    external_prefixes_above: Option<u8>,
2445) -> std::result::Result<(), anyhow::Error>
2446where
2447    C: Clone,
2448{
2449    // Newly created databases will not have any data since they have just been
2450    // instantiated.
2451    let is_new_db = global_dbtx
2452        .raw_find_by_prefix(&[])
2453        .await?
2454        .filter(|(key, _v)| {
2455            std::future::ready(
2456                external_prefixes_above.is_none_or(|external_prefixes_above| {
2457                    !key.is_empty() && key[0] < external_prefixes_above
2458                }),
2459            )
2460        })
2461        .next()
2462        .await
2463        .is_none();
2464
2465    let target_db_version = get_current_database_version(&migrations);
2466
2467    // First write the database version to disk if it does not exist.
2468    create_database_version_dbtx(
2469        global_dbtx,
2470        target_db_version,
2471        module_instance_id,
2472        kind.clone(),
2473        is_new_db,
2474    )
2475    .await?;
2476
2477    let module_instance_id_key = module_instance_id_or_global(module_instance_id);
2478
2479    let disk_version = global_dbtx
2480        .get_value(&DatabaseVersionKey(module_instance_id_key))
2481        .await;
2482
2483    let db_version = if let Some(disk_version) = disk_version {
2484        let mut current_db_version = disk_version;
2485
2486        if current_db_version > target_db_version {
2487            return Err(anyhow::anyhow!(format!(
2488                "On disk database version {current_db_version} for module {kind} was higher than the code database version {target_db_version}."
2489            )));
2490        }
2491
2492        while current_db_version < target_db_version {
2493            if let Some(migration) = migrations.get(&current_db_version) {
2494                info!(target: LOG_DB, ?kind, ?current_db_version, ?target_db_version, "Migrating module...");
2495                migration(DbMigrationFnContext::new(
2496                    global_dbtx.to_ref_nc(),
2497                    module_instance_id,
2498                    ctx.clone(),
2499                ))
2500                .await?;
2501            } else {
2502                warn!(target: LOG_DB, ?current_db_version, "Missing server db migration");
2503            }
2504
2505            current_db_version = current_db_version.increment();
2506
2507            global_dbtx
2508                .insert_entry(
2509                    &DatabaseVersionKey(module_instance_id_key),
2510                    &current_db_version,
2511                )
2512                .await;
2513        }
2514
2515        current_db_version
2516    } else {
2517        target_db_version
2518    };
2519
2520    debug!(target: LOG_DB, ?kind, ?db_version, "DB Version");
2521    Ok(())
2522}
2523
2524pub async fn create_database_version(
2525    db: &Database,
2526    target_db_version: DatabaseVersion,
2527    module_instance_id: Option<ModuleInstanceId>,
2528    kind: String,
2529    is_new_db: bool,
2530) -> std::result::Result<(), anyhow::Error> {
2531    let mut dbtx = db.begin_transaction().await;
2532
2533    create_database_version_dbtx(
2534        &mut dbtx.to_ref_nc(),
2535        target_db_version,
2536        module_instance_id,
2537        kind,
2538        is_new_db,
2539    )
2540    .await?;
2541
2542    dbtx.commit_tx_result().await?;
2543    Ok(())
2544}
2545
2546/// Creates the `DatabaseVersion` inside the database if it does not exist. If
2547/// necessary, this function will migrate the legacy database version to the
2548/// expected `DatabaseVersionKey`.
2549pub async fn create_database_version_dbtx(
2550    global_dbtx: &mut DatabaseTransaction<'_>,
2551    target_db_version: DatabaseVersion,
2552    module_instance_id: Option<ModuleInstanceId>,
2553    kind: String,
2554    is_new_db: bool,
2555) -> std::result::Result<(), anyhow::Error> {
2556    let key_module_instance_id = module_instance_id_or_global(module_instance_id);
2557
2558    // First check if the module has a `DatabaseVersion` written to
2559    // `DatabaseVersionKey`. If `DatabaseVersion` already exists, there is
2560    // nothing to do.
2561    if global_dbtx
2562        .get_value(&DatabaseVersionKey(key_module_instance_id))
2563        .await
2564        .is_none()
2565    {
2566        // If it exists, read and remove the legacy `DatabaseVersion`, which used to be
2567        // in the module's isolated namespace (but not for fedimint-server or
2568        // fedimint-client).
2569        //
2570        // Otherwise, if the previous database contains data and no legacy database
2571        // version, use `DatabaseVersion(0)` so that all database migrations are
2572        // run. Otherwise, this database can assumed to be new and can use
2573        // `target_db_version` to skip the database migrations.
2574        let current_version_in_module = if let Some(module_instance_id) = module_instance_id {
2575            remove_current_db_version_if_exists(
2576                &mut global_dbtx
2577                    .to_ref_with_prefix_module_id(module_instance_id)
2578                    .0
2579                    .into_nc(),
2580                is_new_db,
2581                target_db_version,
2582            )
2583            .await
2584        } else {
2585            remove_current_db_version_if_exists(
2586                &mut global_dbtx.to_ref().into_nc(),
2587                is_new_db,
2588                target_db_version,
2589            )
2590            .await
2591        };
2592
2593        // Write the previous `DatabaseVersion` to the new `DatabaseVersionKey`
2594        debug!(target: LOG_DB, ?kind, ?current_version_in_module, ?target_db_version, ?is_new_db, "Creating DatabaseVersionKey...");
2595        global_dbtx
2596            .insert_new_entry(
2597                &DatabaseVersionKey(key_module_instance_id),
2598                &current_version_in_module,
2599            )
2600            .await;
2601    }
2602
2603    Ok(())
2604}
2605
2606/// Removes `DatabaseVersion` from `DatabaseVersionKeyV0` if it exists and
2607/// returns the current database version. If the current version does not
2608/// exist, use `target_db_version` if the database is new. Otherwise, return
2609/// `DatabaseVersion(0)` to ensure all migrations are run.
2610async fn remove_current_db_version_if_exists(
2611    version_dbtx: &mut DatabaseTransaction<'_>,
2612    is_new_db: bool,
2613    target_db_version: DatabaseVersion,
2614) -> DatabaseVersion {
2615    // Remove the previous `DatabaseVersion` in the isolated database. If it doesn't
2616    // exist, just use the 0 for the version so that all of the migrations are
2617    // executed.
2618    let current_version_in_module = version_dbtx.remove_entry(&DatabaseVersionKeyV0).await;
2619    match current_version_in_module {
2620        Some(database_version) => database_version,
2621        None if is_new_db => target_db_version,
2622        None => DatabaseVersion(0),
2623    }
2624}
2625
2626/// Helper function to retrieve the `module_instance_id` for modules, otherwise
2627/// return 0xff for the global namespace.
2628fn module_instance_id_or_global(module_instance_id: Option<ModuleInstanceId>) -> ModuleInstanceId {
2629    // Use 0xff for fedimint-server and the `module_instance_id` for each module
2630    module_instance_id.map_or_else(
2631        || MODULE_GLOBAL_PREFIX.into(),
2632        |module_instance_id| module_instance_id,
2633    )
2634}
2635#[allow(unused_imports)]
2636mod test_utils {
2637    use std::collections::BTreeMap;
2638    use std::time::Duration;
2639
2640    use fedimint_core::db::DbMigrationFnContext;
2641    use futures::future::ready;
2642    use futures::{Future, FutureExt, StreamExt};
2643    use rand::Rng;
2644    use tokio::join;
2645
2646    use super::{
2647        Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey, DatabaseVersionKeyV0,
2648        DbMigrationFn, apply_migrations,
2649    };
2650    use crate::core::ModuleKind;
2651    use crate::db::mem_impl::MemDatabase;
2652    use crate::db::{
2653        IDatabaseTransactionOps, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
2654    };
2655    use crate::encoding::{Decodable, Encodable};
2656    use crate::module::registry::ModuleDecoderRegistry;
2657
2658    pub async fn future_returns_shortly<F: Future>(fut: F) -> Option<F::Output> {
2659        crate::runtime::timeout(Duration::from_millis(10), fut)
2660            .await
2661            .ok()
2662    }
2663
    /// Key prefixes used by the test records declared in this module.
    ///
    /// Each variant's `u8` discriminant is passed as `db_prefix` to
    /// `impl_db_record!` below.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        /// Prefix shared by `TestKey` and `TestKeyV0`.
        Test = 0x42,
        /// Prefix of `AltTestKey`.
        AltTest = 0x43,
        /// Prefix of `PercentTestKey`; 0x25 is the ASCII code of `%`.
        PercentTestKey = 0x25,
    }
2671
    /// Primary test key: a single `u64` stored under `TestDbKeyPrefix::Test`.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub u64);

    /// Query prefix used with `find_by_prefix`/`remove_by_prefix` to address
    /// `TestKey` records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    // `TestKey` -> `TestVal` record under the `Test` prefix.
    // NOTE(review): `notify_on_modify = true` presumably enables change
    // notifications for this key; confirm against the macro definition.
    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    /// Older two-field layout of the test key, used by the migration test.
    /// It is registered under the same `TestDbKeyPrefix::Test` prefix as
    /// `TestKey`.
    #[derive(Debug, Encodable, Decodable)]
    struct TestKeyV0(u64, u64);

    /// Query prefix for `TestKeyV0` records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixV0;

    // `TestKeyV0` -> `TestVal` record under the `Test` prefix.
    impl_db_record!(
        key = TestKeyV0,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
    );
    impl_db_lookup!(key = TestKeyV0, query_prefix = DbPrefixTestPrefixV0);

    /// Second test key, stored under `TestDbKeyPrefix::AltTest`.
    #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Encodable, Decodable)]
    struct AltTestKey(u64);

    /// Query prefix for `AltTestKey` records.
    #[derive(Debug, Encodable, Decodable)]
    struct AltDbPrefixTestPrefix;

    // `AltTestKey` -> `TestVal` record under the `AltTest` prefix.
    impl_db_record!(
        key = AltTestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::AltTest,
    );
    impl_db_lookup!(key = AltTestKey, query_prefix = AltDbPrefixTestPrefix);

    /// Test key stored under `TestDbKeyPrefix::PercentTestKey` (0x25, `%`).
    #[derive(Debug, Encodable, Decodable)]
    struct PercentTestKey(u64);

    /// Query prefix for `PercentTestKey` records.
    #[derive(Debug, Encodable, Decodable)]
    struct PercentPrefixTestPrefix;

    // `PercentTestKey` -> `TestVal` record under the `PercentTestKey` prefix.
    impl_db_record!(
        key = PercentTestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::PercentTestKey,
    );

    impl_db_lookup!(key = PercentTestKey, query_prefix = PercentPrefixTestPrefix);
    /// Value type shared by every test record in this module.
    #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
    pub(super) struct TestVal(pub u64);
2727
    /// Module instance id passed to `to_ref_with_prefix_module_id` for the
    /// first test module in `verify_module_prefix`.
    const TEST_MODULE_PREFIX: u16 = 1;
    /// Module instance id passed to `to_ref_with_prefix_module_id` for the
    /// second ("alt") test module in `verify_module_prefix`.
    const ALT_MODULE_PREFIX: u16 = 2;
2730
2731    pub async fn verify_insert_elements(db: Database) {
2732        let mut dbtx = db.begin_transaction().await;
2733        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2734        assert!(dbtx.insert_entry(&TestKey(2), &TestVal(3)).await.is_none());
2735        dbtx.commit_tx().await;
2736
2737        // Test values were persisted
2738        let mut dbtx = db.begin_transaction().await;
2739        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2740        assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(3)));
2741        dbtx.commit_tx().await;
2742
2743        // Test overwrites work as expected
2744        let mut dbtx = db.begin_transaction().await;
2745        assert_eq!(
2746            dbtx.insert_entry(&TestKey(1), &TestVal(4)).await,
2747            Some(TestVal(2))
2748        );
2749        assert_eq!(
2750            dbtx.insert_entry(&TestKey(2), &TestVal(5)).await,
2751            Some(TestVal(3))
2752        );
2753        dbtx.commit_tx().await;
2754
2755        let mut dbtx = db.begin_transaction().await;
2756        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(4)));
2757        assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(5)));
2758        dbtx.commit_tx().await;
2759    }
2760
2761    pub async fn verify_remove_nonexisting(db: Database) {
2762        let mut dbtx = db.begin_transaction().await;
2763        assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2764        let removed = dbtx.remove_entry(&TestKey(1)).await;
2765        assert!(removed.is_none());
2766
2767        // Commit to suppress the warning message
2768        dbtx.commit_tx().await;
2769    }
2770
2771    pub async fn verify_remove_existing(db: Database) {
2772        let mut dbtx = db.begin_transaction().await;
2773
2774        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2775
2776        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2777
2778        let removed = dbtx.remove_entry(&TestKey(1)).await;
2779        assert_eq!(removed, Some(TestVal(2)));
2780        assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2781
2782        // Commit to suppress the warning message
2783        dbtx.commit_tx().await;
2784    }
2785
2786    pub async fn verify_read_own_writes(db: Database) {
2787        let mut dbtx = db.begin_transaction().await;
2788
2789        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2790
2791        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2792
2793        // Commit to suppress the warning message
2794        dbtx.commit_tx().await;
2795    }
2796
2797    pub async fn verify_prevent_dirty_reads(db: Database) {
2798        let mut dbtx = db.begin_transaction().await;
2799
2800        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2801
2802        // dbtx2 should not be able to see uncommitted changes
2803        let mut dbtx2 = db.begin_transaction().await;
2804        assert_eq!(dbtx2.get_value(&TestKey(1)).await, None);
2805
2806        // Commit to suppress the warning message
2807        dbtx.commit_tx().await;
2808    }
2809
2810    pub async fn verify_find_by_range(db: Database) {
2811        let mut dbtx = db.begin_transaction().await;
2812        dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2813        dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2814        dbtx.insert_entry(&TestKey(56), &TestVal(7777)).await;
2815
2816        dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2817        dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2818
2819        {
2820            let mut module_dbtx = dbtx.to_ref_with_prefix_module_id(2).0;
2821            module_dbtx
2822                .insert_entry(&TestKey(300), &TestVal(3000))
2823                .await;
2824        }
2825
2826        dbtx.commit_tx().await;
2827
2828        // Verify finding by prefix returns the correct set of key pairs
2829        let mut dbtx = db.begin_transaction_nc().await;
2830
2831        let returned_keys = dbtx
2832            .find_by_range(TestKey(55)..TestKey(56))
2833            .await
2834            .collect::<Vec<_>>()
2835            .await;
2836
2837        let expected = vec![(TestKey(55), TestVal(9999))];
2838
2839        assert_eq!(returned_keys, expected);
2840
2841        let returned_keys = dbtx
2842            .find_by_range(TestKey(54)..TestKey(56))
2843            .await
2844            .collect::<Vec<_>>()
2845            .await;
2846
2847        let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2848        assert_eq!(returned_keys, expected);
2849
2850        let returned_keys = dbtx
2851            .find_by_range(TestKey(54)..TestKey(57))
2852            .await
2853            .collect::<Vec<_>>()
2854            .await;
2855
2856        let expected = vec![
2857            (TestKey(54), TestVal(8888)),
2858            (TestKey(55), TestVal(9999)),
2859            (TestKey(56), TestVal(7777)),
2860        ];
2861        assert_eq!(returned_keys, expected);
2862
2863        let mut module_dbtx = dbtx.with_prefix_module_id(2).0;
2864        let test_range = module_dbtx
2865            .find_by_range(TestKey(300)..TestKey(301))
2866            .await
2867            .collect::<Vec<_>>()
2868            .await;
2869        assert!(test_range.len() == 1);
2870    }
2871
2872    pub async fn verify_find_by_prefix(db: Database) {
2873        let mut dbtx = db.begin_transaction().await;
2874        dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2875        dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2876
2877        dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2878        dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2879        dbtx.commit_tx().await;
2880
2881        // Verify finding by prefix returns the correct set of key pairs
2882        let mut dbtx = db.begin_transaction().await;
2883
2884        let returned_keys = dbtx
2885            .find_by_prefix(&DbPrefixTestPrefix)
2886            .await
2887            .collect::<Vec<_>>()
2888            .await;
2889
2890        let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2891        assert_eq!(returned_keys, expected);
2892
2893        let reversed = dbtx
2894            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2895            .await
2896            .collect::<Vec<_>>()
2897            .await;
2898        let mut reversed_expected = expected;
2899        reversed_expected.reverse();
2900        assert_eq!(reversed, reversed_expected);
2901
2902        let returned_keys = dbtx
2903            .find_by_prefix(&AltDbPrefixTestPrefix)
2904            .await
2905            .collect::<Vec<_>>()
2906            .await;
2907
2908        let expected = vec![
2909            (AltTestKey(54), TestVal(6666)),
2910            (AltTestKey(55), TestVal(7777)),
2911        ];
2912        assert_eq!(returned_keys, expected);
2913
2914        let reversed = dbtx
2915            .find_by_prefix_sorted_descending(&AltDbPrefixTestPrefix)
2916            .await
2917            .collect::<Vec<_>>()
2918            .await;
2919        let mut reversed_expected = expected;
2920        reversed_expected.reverse();
2921        assert_eq!(reversed, reversed_expected);
2922    }
2923
2924    pub async fn verify_commit(db: Database) {
2925        let mut dbtx = db.begin_transaction().await;
2926
2927        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2928        dbtx.commit_tx().await;
2929
2930        // Verify dbtx2 can see committed transactions
2931        let mut dbtx2 = db.begin_transaction().await;
2932        assert_eq!(dbtx2.get_value(&TestKey(1)).await, Some(TestVal(2)));
2933    }
2934
2935    pub async fn verify_rollback_to_savepoint(db: Database) {
2936        let mut dbtx_rollback = db.begin_transaction().await;
2937
2938        dbtx_rollback
2939            .insert_entry(&TestKey(20), &TestVal(2000))
2940            .await;
2941
2942        dbtx_rollback
2943            .set_tx_savepoint()
2944            .await
2945            .expect("Error setting transaction savepoint");
2946
2947        dbtx_rollback
2948            .insert_entry(&TestKey(21), &TestVal(2001))
2949            .await;
2950
2951        assert_eq!(
2952            dbtx_rollback.get_value(&TestKey(20)).await,
2953            Some(TestVal(2000))
2954        );
2955        assert_eq!(
2956            dbtx_rollback.get_value(&TestKey(21)).await,
2957            Some(TestVal(2001))
2958        );
2959
2960        dbtx_rollback
2961            .rollback_tx_to_savepoint()
2962            .await
2963            .expect("Error setting transaction savepoint");
2964
2965        assert_eq!(
2966            dbtx_rollback.get_value(&TestKey(20)).await,
2967            Some(TestVal(2000))
2968        );
2969
2970        assert_eq!(dbtx_rollback.get_value(&TestKey(21)).await, None);
2971
2972        // Commit to suppress the warning message
2973        dbtx_rollback.commit_tx().await;
2974    }
2975
2976    pub async fn verify_prevent_nonrepeatable_reads(db: Database) {
2977        let mut dbtx = db.begin_transaction().await;
2978        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2979
2980        let mut dbtx2 = db.begin_transaction().await;
2981
2982        dbtx2.insert_entry(&TestKey(100), &TestVal(101)).await;
2983
2984        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2985
2986        dbtx2.commit_tx().await;
2987
2988        // dbtx should still read None because it is operating over a snapshot
2989        // of the data when the transaction started
2990        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2991
2992        let expected_keys = 0;
2993        let returned_keys = dbtx
2994            .find_by_prefix(&DbPrefixTestPrefix)
2995            .await
2996            .fold(0, |returned_keys, (key, value)| async move {
2997                if key == TestKey(100) {
2998                    assert!(value.eq(&TestVal(101)));
2999                }
3000                returned_keys + 1
3001            })
3002            .await;
3003
3004        assert_eq!(returned_keys, expected_keys);
3005    }
3006
    /// Runs 1000 rounds in which a reading and a writing transaction are
    /// polled concurrently via `join!`, with random yields between steps.
    ///
    /// The writer inserts `spent_input_key` and `tx_accepted_key` in one
    /// transaction. The reader looks up both keys and panics if it observes
    /// only one of them.
    pub async fn verify_snapshot_isolation(db: Database) {
        /// Yields to the runtime either 0 or 10 times, chosen with equal
        /// probability, to vary how the two futures interleave.
        async fn random_yield() {
            let times = if rand::thread_rng().gen_bool(0.5) {
                0
            } else {
                10
            };
            for _ in 0..times {
                tokio::task::yield_now().await;
            }
        }

        // This scenario is taken straight out of https://github.com/fedimint/fedimint/issues/5195 bug
        for i in 0..1000 {
            // Every round uses its own pair of adjacent keys
            let base_key = i * 2;
            let tx_accepted_key = base_key;
            let spent_input_key = base_key + 1;

            join!(
                // Reader: its transaction is never committed
                async {
                    random_yield().await;
                    let mut dbtx = db.begin_transaction().await;

                    random_yield().await;
                    let a = dbtx.get_value(&TestKey(tx_accepted_key)).await;
                    random_yield().await;
                    // we have 5 operations that can give you the db key,
                    // try all of them
                    let s = match i % 5 {
                        0 => dbtx.get_value(&TestKey(spent_input_key)).await,
                        1 => dbtx.remove_entry(&TestKey(spent_input_key)).await,
                        2 => {
                            dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(200))
                                .await
                        }
                        3 => {
                            dbtx.find_by_prefix(&DbPrefixTestPrefix)
                                .await
                                .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
                                .map(|(_k, v)| v)
                                .next()
                                .await
                        }
                        4 => {
                            dbtx.find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                                .await
                                .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
                                .map(|(_k, v)| v)
                                .next()
                                .await
                        }
                        _ => {
                            panic!("woot?");
                        }
                    };

                    // Both keys are written by the same transaction, so the
                    // reader must see either both or neither
                    match (a, s) {
                        (None, None) | (Some(_), Some(_)) => {}
                        (None, Some(_)) => panic!("none some?! {i}"),
                        (Some(_), None) => panic!("some none?! {i}"),
                    }
                },
                // Writer: inserts both keys and commits
                async {
                    random_yield().await;

                    let mut dbtx = db.begin_transaction().await;
                    random_yield().await;
                    assert_eq!(dbtx.get_value(&TestKey(tx_accepted_key)).await, None);

                    random_yield().await;
                    assert_eq!(
                        dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(100))
                            .await,
                        None
                    );

                    random_yield().await;
                    assert_eq!(
                        dbtx.insert_entry(&TestKey(tx_accepted_key), &TestVal(100))
                            .await,
                        None
                    );
                    random_yield().await;
                    dbtx.commit_tx().await;
                }
            );
        }
    }
3095
3096    pub async fn verify_phantom_entry(db: Database) {
3097        let mut dbtx = db.begin_transaction().await;
3098
3099        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3100
3101        dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3102
3103        dbtx.commit_tx().await;
3104
3105        let mut dbtx = db.begin_transaction().await;
3106        let expected_keys = 2;
3107        let returned_keys = dbtx
3108            .find_by_prefix(&DbPrefixTestPrefix)
3109            .await
3110            .fold(0, |returned_keys, (key, value)| async move {
3111                match key {
3112                    TestKey(100) => {
3113                        assert!(value.eq(&TestVal(101)));
3114                    }
3115                    TestKey(101) => {
3116                        assert!(value.eq(&TestVal(102)));
3117                    }
3118                    _ => {}
3119                }
3120                returned_keys + 1
3121            })
3122            .await;
3123
3124        assert_eq!(returned_keys, expected_keys);
3125
3126        let mut dbtx2 = db.begin_transaction().await;
3127
3128        dbtx2.insert_entry(&TestKey(102), &TestVal(103)).await;
3129
3130        dbtx2.commit_tx().await;
3131
3132        let returned_keys = dbtx
3133            .find_by_prefix(&DbPrefixTestPrefix)
3134            .await
3135            .fold(0, |returned_keys, (key, value)| async move {
3136                match key {
3137                    TestKey(100) => {
3138                        assert!(value.eq(&TestVal(101)));
3139                    }
3140                    TestKey(101) => {
3141                        assert!(value.eq(&TestVal(102)));
3142                    }
3143                    _ => {}
3144                }
3145                returned_keys + 1
3146            })
3147            .await;
3148
3149        assert_eq!(returned_keys, expected_keys);
3150    }
3151
3152    pub async fn expect_write_conflict(db: Database) {
3153        let mut dbtx = db.begin_transaction().await;
3154        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3155        dbtx.commit_tx().await;
3156
3157        let mut dbtx2 = db.begin_transaction().await;
3158        let mut dbtx3 = db.begin_transaction().await;
3159
3160        dbtx2.insert_entry(&TestKey(100), &TestVal(102)).await;
3161
3162        // Depending on if the database implementation supports optimistic or
3163        // pessimistic transactions, this test should generate an error here
3164        // (pessimistic) or at commit time (optimistic)
3165        dbtx3.insert_entry(&TestKey(100), &TestVal(103)).await;
3166
3167        dbtx2.commit_tx().await;
3168        dbtx3.commit_tx_result().await.expect_err("Expecting an error to be returned because this transaction is in a write-write conflict with dbtx");
3169    }
3170
3171    pub async fn verify_string_prefix(db: Database) {
3172        let mut dbtx = db.begin_transaction().await;
3173        dbtx.insert_entry(&PercentTestKey(100), &TestVal(101)).await;
3174
3175        assert_eq!(
3176            dbtx.get_value(&PercentTestKey(100)).await,
3177            Some(TestVal(101))
3178        );
3179
3180        dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3181
3182        dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3183
3184        dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3185
3186        // If the wildcard character ('%') is not handled properly, this will make
3187        // find_by_prefix return 5 results instead of 4
3188        dbtx.insert_entry(&TestKey(101), &TestVal(100)).await;
3189
3190        let expected_keys = 4;
3191        let returned_keys = dbtx
3192            .find_by_prefix(&PercentPrefixTestPrefix)
3193            .await
3194            .fold(0, |returned_keys, (key, value)| async move {
3195                if matches!(key, PercentTestKey(101)) {
3196                    assert!(value.eq(&TestVal(100)));
3197                }
3198                returned_keys + 1
3199            })
3200            .await;
3201
3202        assert_eq!(returned_keys, expected_keys);
3203    }
3204
3205    pub async fn verify_remove_by_prefix(db: Database) {
3206        let mut dbtx = db.begin_transaction().await;
3207
3208        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3209
3210        dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3211
3212        dbtx.commit_tx().await;
3213
3214        let mut remove_dbtx = db.begin_transaction().await;
3215        remove_dbtx.remove_by_prefix(&DbPrefixTestPrefix).await;
3216        remove_dbtx.commit_tx().await;
3217
3218        let mut dbtx = db.begin_transaction().await;
3219        let expected_keys = 0;
3220        let returned_keys = dbtx
3221            .find_by_prefix(&DbPrefixTestPrefix)
3222            .await
3223            .fold(0, |returned_keys, (key, value)| async move {
3224                match key {
3225                    TestKey(100) => {
3226                        assert!(value.eq(&TestVal(101)));
3227                    }
3228                    TestKey(101) => {
3229                        assert!(value.eq(&TestVal(102)));
3230                    }
3231                    _ => {}
3232                }
3233                returned_keys + 1
3234            })
3235            .await;
3236
3237        assert_eq!(returned_keys, expected_keys);
3238    }
3239
3240    pub async fn verify_module_db(db: Database, module_db: Database) {
3241        let mut dbtx = db.begin_transaction().await;
3242
3243        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3244
3245        dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3246
3247        dbtx.commit_tx().await;
3248
3249        // verify module_dbtx can only read key/value pairs from its own module
3250        let mut module_dbtx = module_db.begin_transaction().await;
3251        assert_eq!(module_dbtx.get_value(&TestKey(100)).await, None);
3252
3253        assert_eq!(module_dbtx.get_value(&TestKey(101)).await, None);
3254
3255        // verify module_dbtx can read key/value pairs that it wrote
3256        let mut dbtx = db.begin_transaction().await;
3257        assert_eq!(dbtx.get_value(&TestKey(100)).await, Some(TestVal(101)));
3258
3259        assert_eq!(dbtx.get_value(&TestKey(101)).await, Some(TestVal(102)));
3260
3261        let mut module_dbtx = module_db.begin_transaction().await;
3262
3263        module_dbtx.insert_entry(&TestKey(100), &TestVal(103)).await;
3264
3265        module_dbtx.insert_entry(&TestKey(101), &TestVal(104)).await;
3266
3267        module_dbtx.commit_tx().await;
3268
3269        let expected_keys = 2;
3270        let mut dbtx = db.begin_transaction().await;
3271        let returned_keys = dbtx
3272            .find_by_prefix(&DbPrefixTestPrefix)
3273            .await
3274            .fold(0, |returned_keys, (key, value)| async move {
3275                match key {
3276                    TestKey(100) => {
3277                        assert!(value.eq(&TestVal(101)));
3278                    }
3279                    TestKey(101) => {
3280                        assert!(value.eq(&TestVal(102)));
3281                    }
3282                    _ => {}
3283                }
3284                returned_keys + 1
3285            })
3286            .await;
3287
3288        assert_eq!(returned_keys, expected_keys);
3289
3290        let removed = dbtx.remove_entry(&TestKey(100)).await;
3291        assert_eq!(removed, Some(TestVal(101)));
3292        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
3293
3294        let mut module_dbtx = module_db.begin_transaction().await;
3295        assert_eq!(
3296            module_dbtx.get_value(&TestKey(100)).await,
3297            Some(TestVal(103))
3298        );
3299    }
3300
3301    pub async fn verify_module_prefix(db: Database) {
3302        let mut test_dbtx = db.begin_transaction().await;
3303        {
3304            let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3305
3306            test_module_dbtx
3307                .insert_entry(&TestKey(100), &TestVal(101))
3308                .await;
3309
3310            test_module_dbtx
3311                .insert_entry(&TestKey(101), &TestVal(102))
3312                .await;
3313        }
3314
3315        test_dbtx.commit_tx().await;
3316
3317        let mut alt_dbtx = db.begin_transaction().await;
3318        {
3319            let mut alt_module_dbtx = alt_dbtx.to_ref_with_prefix_module_id(ALT_MODULE_PREFIX).0;
3320
3321            alt_module_dbtx
3322                .insert_entry(&TestKey(100), &TestVal(103))
3323                .await;
3324
3325            alt_module_dbtx
3326                .insert_entry(&TestKey(101), &TestVal(104))
3327                .await;
3328        }
3329
3330        alt_dbtx.commit_tx().await;
3331
3332        // verify test_module_dbtx can only see key/value pairs from its own module
3333        let mut test_dbtx = db.begin_transaction().await;
3334        let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3335        assert_eq!(
3336            test_module_dbtx.get_value(&TestKey(100)).await,
3337            Some(TestVal(101))
3338        );
3339
3340        assert_eq!(
3341            test_module_dbtx.get_value(&TestKey(101)).await,
3342            Some(TestVal(102))
3343        );
3344
3345        let expected_keys = 2;
3346        let returned_keys = test_module_dbtx
3347            .find_by_prefix(&DbPrefixTestPrefix)
3348            .await
3349            .fold(0, |returned_keys, (key, value)| async move {
3350                match key {
3351                    TestKey(100) => {
3352                        assert!(value.eq(&TestVal(101)));
3353                    }
3354                    TestKey(101) => {
3355                        assert!(value.eq(&TestVal(102)));
3356                    }
3357                    _ => {}
3358                }
3359                returned_keys + 1
3360            })
3361            .await;
3362
3363        assert_eq!(returned_keys, expected_keys);
3364
3365        let removed = test_module_dbtx.remove_entry(&TestKey(100)).await;
3366        assert_eq!(removed, Some(TestVal(101)));
3367        assert_eq!(test_module_dbtx.get_value(&TestKey(100)).await, None);
3368
3369        // test_dbtx on its own wont find the key because it does not use a module
3370        // prefix
3371        let mut test_dbtx = db.begin_transaction().await;
3372        assert_eq!(test_dbtx.get_value(&TestKey(101)).await, None);
3373
3374        test_dbtx.commit_tx().await;
3375    }
3376
3377    #[cfg(test)]
3378    #[tokio::test]
3379    pub async fn verify_test_migration() {
3380        // Insert a bunch of old dummy data that needs to be migrated to a new version
3381        let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
3382        let expected_test_keys_size: usize = 100;
3383        let mut dbtx = db.begin_transaction().await;
3384        for i in 0..expected_test_keys_size {
3385            dbtx.insert_new_entry(&TestKeyV0(i as u64, (i + 1) as u64), &TestVal(i as u64))
3386                .await;
3387        }
3388
3389        // Will also be migrated to `DatabaseVersionKey`
3390        dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
3391            .await;
3392        dbtx.commit_tx().await;
3393
3394        let mut migrations: BTreeMap<DatabaseVersion, DbMigrationFn<()>> = BTreeMap::new();
3395
3396        migrations.insert(
3397            DatabaseVersion(0),
3398            Box::new(|ctx| migrate_test_db_version_0(ctx).boxed()),
3399        );
3400
3401        apply_migrations(&db, (), "TestModule".to_string(), migrations, None, None)
3402            .await
3403            .expect("Error applying migrations for TestModule");
3404
3405        // Verify that the migrations completed successfully
3406        let mut dbtx = db.begin_transaction().await;
3407
3408        // Verify that the old `DatabaseVersion` under `DatabaseVersionKeyV0` migrated
3409        // to `DatabaseVersionKey`
3410        assert!(
3411            dbtx.get_value(&DatabaseVersionKey(MODULE_GLOBAL_PREFIX.into()))
3412                .await
3413                .is_some()
3414        );
3415
3416        // Verify Dummy module migration
3417        let test_keys = dbtx
3418            .find_by_prefix(&DbPrefixTestPrefix)
3419            .await
3420            .collect::<Vec<_>>()
3421            .await;
3422        let test_keys_size = test_keys.len();
3423        assert_eq!(test_keys_size, expected_test_keys_size);
3424        for (key, val) in test_keys {
3425            assert_eq!(key.0, val.0 + 1);
3426        }
3427    }
3428
3429    #[allow(dead_code)]
3430    async fn migrate_test_db_version_0(
3431        mut ctx: DbMigrationFnContext<'_, ()>,
3432    ) -> std::result::Result<(), anyhow::Error> {
3433        let mut dbtx = ctx.dbtx();
3434        let example_keys_v0 = dbtx
3435            .find_by_prefix(&DbPrefixTestPrefixV0)
3436            .await
3437            .collect::<Vec<_>>()
3438            .await;
3439        dbtx.remove_by_prefix(&DbPrefixTestPrefixV0).await;
3440        for (key, val) in example_keys_v0 {
3441            let key_v2 = TestKey(key.1);
3442            dbtx.insert_new_entry(&key_v2, &val).await;
3443        }
3444        Ok(())
3445    }
3446
3447    #[cfg(test)]
3448    #[tokio::test]
3449    async fn test_autocommit() {
3450        use std::marker::PhantomData;
3451        use std::ops::Range;
3452        use std::path::Path;
3453
3454        use anyhow::anyhow;
3455        use async_trait::async_trait;
3456
3457        use crate::ModuleDecoderRegistry;
3458        use crate::db::{
3459            AutocommitError, BaseDatabaseTransaction, DatabaseError, DatabaseResult,
3460            IDatabaseTransaction, IDatabaseTransactionOps, IDatabaseTransactionOpsCore,
3461            IRawDatabase, IRawDatabaseTransaction,
3462        };
3463
3464        #[derive(Debug)]
3465        struct FakeDatabase;
3466
3467        #[async_trait]
3468        impl IRawDatabase for FakeDatabase {
3469            type Transaction<'a> = FakeTransaction<'a>;
3470            async fn begin_transaction(&self) -> FakeTransaction {
3471                FakeTransaction(PhantomData)
3472            }
3473
3474            fn checkpoint(&self, _backup_path: &Path) -> DatabaseResult<()> {
3475                Ok(())
3476            }
3477        }
3478
3479        #[derive(Debug)]
3480        struct FakeTransaction<'a>(PhantomData<&'a ()>);
3481
3482        #[async_trait]
3483        impl IDatabaseTransactionOpsCore for FakeTransaction<'_> {
3484            async fn raw_insert_bytes(
3485                &mut self,
3486                _key: &[u8],
3487                _value: &[u8],
3488            ) -> DatabaseResult<Option<Vec<u8>>> {
3489                unimplemented!()
3490            }
3491
3492            async fn raw_get_bytes(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
3493                unimplemented!()
3494            }
3495
3496            async fn raw_remove_entry(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
3497                unimplemented!()
3498            }
3499
3500            async fn raw_find_by_range(
3501                &mut self,
3502                _key_range: Range<&[u8]>,
3503            ) -> DatabaseResult<crate::db::PrefixStream<'_>> {
3504                unimplemented!()
3505            }
3506
3507            async fn raw_find_by_prefix(
3508                &mut self,
3509                _key_prefix: &[u8],
3510            ) -> DatabaseResult<crate::db::PrefixStream<'_>> {
3511                unimplemented!()
3512            }
3513
3514            async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> DatabaseResult<()> {
3515                unimplemented!()
3516            }
3517
3518            async fn raw_find_by_prefix_sorted_descending(
3519                &mut self,
3520                _key_prefix: &[u8],
3521            ) -> DatabaseResult<crate::db::PrefixStream<'_>> {
3522                unimplemented!()
3523            }
3524        }
3525
3526        #[async_trait]
3527        impl IDatabaseTransactionOps for FakeTransaction<'_> {
3528            async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()> {
3529                unimplemented!()
3530            }
3531
3532            async fn set_tx_savepoint(&mut self) -> DatabaseResult<()> {
3533                unimplemented!()
3534            }
3535        }
3536
3537        #[async_trait]
3538        impl IRawDatabaseTransaction for FakeTransaction<'_> {
3539            async fn commit_tx(self) -> DatabaseResult<()> {
3540                use crate::db::DatabaseError;
3541
3542                Err(DatabaseError::Other(anyhow::anyhow!("Can't commit!")))
3543            }
3544        }
3545
3546        let db = Database::new(FakeDatabase, ModuleDecoderRegistry::default());
3547        let err = db
3548            .autocommit::<_, _, ()>(|_dbtx, _| Box::pin(async { Ok(()) }), Some(5))
3549            .await
3550            .unwrap_err();
3551
3552        match err {
3553            AutocommitError::CommitFailed {
3554                attempts: failed_attempts,
3555                ..
3556            } => {
3557                assert_eq!(failed_attempts, 5);
3558            }
3559            AutocommitError::ClosureError { .. } => panic!("Closure did not return error"),
3560        }
3561    }
3562}
3563
3564pub async fn find_by_prefix_sorted_descending<'r, 'inner, KP>(
3565    tx: &'r mut (dyn IDatabaseTransaction + 'inner),
3566    decoders: ModuleDecoderRegistry,
3567    key_prefix: &KP,
3568) -> impl Stream<
3569    Item = (
3570        KP::Record,
3571        <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
3572    ),
3573>
3574+ 'r
3575+ use<'r, KP>
3576where
3577    'inner: 'r,
3578    KP: DatabaseLookup,
3579    KP::Record: DatabaseKey,
3580{
3581    debug!(target: LOG_DB, "find by prefix sorted descending");
3582    let prefix_bytes = key_prefix.to_bytes();
3583    tx.raw_find_by_prefix_sorted_descending(&prefix_bytes)
3584        .await
3585        .expect("Error doing prefix search in database")
3586        .map(move |(key_bytes, value_bytes)| {
3587            let key = decode_key_expect(&key_bytes, &decoders);
3588            let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
3589            (key, value)
3590        })
3591}
3592
3593pub async fn verify_module_db_integrity_dbtx(
3594    dbtx: &mut DatabaseTransaction<'_>,
3595    module_id: ModuleInstanceId,
3596    module_kind: ModuleKind,
3597    prefixes: &BTreeSet<u8>,
3598) {
3599    let module_db_prefix = module_instance_id_to_byte_prefix(module_id);
3600    if module_id < 250 {
3601        assert_eq!(module_db_prefix.len(), 2);
3602    }
3603    let mut records = dbtx
3604        .raw_find_by_prefix(&module_db_prefix)
3605        .await
3606        .expect("DB fail");
3607    while let Some((k, v)) = records.next().await {
3608        assert!(
3609            prefixes.contains(&k[module_db_prefix.len()]),
3610            "Unexpected module {module_kind} {module_id} db record found: {}: {}",
3611            k.as_hex(),
3612            v.as_hex()
3613        );
3614    }
3615}
3616
3617#[cfg(test)]
3618mod tests;