fedimint_core/db/
mod.rs

1//! Core Fedimint database traits and types
2//!
3//! This module provides the core key-value database for Fedimint.
4//!
5//! # Usage
6//!
7//! To use the database, you typically follow these steps:
8//!
9//! 1. Create a `Database` instance
10//! 2. Begin a transaction
11//! 3. Perform operations within the transaction
12//! 4. Commit the transaction
13//!
14//! ## Example
15//!
16//! ```rust
17//! use fedimint_core::db::mem_impl::MemDatabase;
18//! use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
19//! use fedimint_core::encoding::{Decodable, Encodable};
20//! use fedimint_core::impl_db_record;
21//! use fedimint_core::module::registry::ModuleDecoderRegistry;
22//!
23//! #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
24//! pub struct TestKey(pub u64);
25//! #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
26//! pub struct TestVal(pub u64);
27//!
28//! #[repr(u8)]
29//! #[derive(Clone)]
30//! pub enum TestDbKeyPrefix {
31//!     Test = 0x42,
32//! }
33//!
34//! impl_db_record!(
35//!     key = TestKey,
36//!     value = TestVal,
37//!     db_prefix = TestDbKeyPrefix::Test,
38//! );
39//!
40//! # async fn example() {
41//! // Create a new in-memory database
42//! let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
43//!
44//! // Begin a transaction
45//! let mut tx = db.begin_transaction().await;
46//!
47//! // Perform operations
48//! tx.insert_entry(&TestKey(1), &TestVal(100)).await;
49//! let value = tx.get_value(&TestKey(1)).await;
50//!
51//! // Commit the transaction
52//! tx.commit_tx().await;
53//!
54//! // For operations that may need to be retried due to conflicts, use the
55//! // `autocommit` function:
56//!
57//! db.autocommit(
58//!     |dbtx, _| {
59//!         Box::pin(async move {
60//!             dbtx.insert_entry(&TestKey(1), &TestVal(100)).await;
61//!             anyhow::Ok(())
62//!         })
63//!     },
64//!     None,
65//! )
66//! .await
67//! .unwrap();
68//! # }
69//! ```
70//!
71//! # Isolation of database transactions
72//!
73//! Fedimint requires that the database implementation implement Snapshot
74//! Isolation. Snapshot Isolation is a database isolation level that guarantees
75//! consistent reads from the time that the snapshot was created (at transaction
76//! creation time). Transactions with Snapshot Isolation level will only commit
77//! if there has been no write to the modified keys since the snapshot (i.e.
78//! write-write conflicts are prevented).
79//!
80//! Specifically, Fedimint expects the database implementation to prevent the
81//! following anomalies:
82//!
83//! Non-Readable Write: TX1 writes (K1, V1) at time t but cannot read (K1, V1)
84//! at time (t + i)
85//!
86//! Dirty Read: TX1 is able to read TX2's uncommitted writes.
87//!
88//! Non-Repeatable Read: TX1 reads (K1, V1) at time t and retrieves (K1, V2) at
89//! time (t + i) where V1 != V2.
90//!
91//! Phantom Record: TX1 retrieves X number of records for a prefix at time t and
92//! retrieves Y number of records for the same prefix at time (t + i).
93//!
94//! Lost Writes: TX1 writes (K1, V1) at the same time as TX2 writes (K1, V2). V2
95//! overwrites V1 as the value for K1 (write-write conflict).
96//!
//! | Type     | Non-Readable Write | Dirty Read | Non-Repeatable Read | Phantom Record | Lost Writes |
//! | -------- | ------------------ | ---------- | ------------------- | -------------- | ----------- |
//! | MemoryDB | Prevented          | Prevented  | Prevented           | Prevented      | Possible    |
//! | RocksDB  | Prevented          | Prevented  | Prevented           | Prevented      | Prevented   |
//! | Sqlite   | Prevented          | Prevented  | Prevented           | Prevented      | Prevented   |
104
105use std::any;
106use std::collections::{BTreeMap, BTreeSet};
107use std::error::Error;
108use std::fmt::{self, Debug};
109use std::marker::{self, PhantomData};
110use std::ops::{self, DerefMut, Range};
111use std::path::Path;
112use std::pin::Pin;
113use std::sync::Arc;
114use std::time::Duration;
115
116use bitcoin::hex::DisplayHex as _;
117use fedimint_core::util::BoxFuture;
118use fedimint_logging::LOG_DB;
119use fedimint_util_error::FmtCompact as _;
120use futures::{Stream, StreamExt};
121use macro_rules_attribute::apply;
122use rand::Rng;
123use serde::Serialize;
124use strum_macros::EnumIter;
125use thiserror::Error;
126use tracing::{debug, error, info, instrument, trace, warn};
127
128use crate::core::{ModuleInstanceId, ModuleKind};
129use crate::encoding::{Decodable, Encodable};
130use crate::fmt_utils::AbbreviateHexBytes;
131use crate::task::{MaybeSend, MaybeSync};
132use crate::{async_trait_maybe_send, maybe_add_send, maybe_add_send_sync, timing};
133
134pub mod mem_impl;
135pub mod notifications;
136
137pub use test_utils::*;
138
139use self::notifications::{Notifications, NotifyQueue};
140use crate::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
141
/// Leading byte of the key prefix used for module-isolated partitions.
///
/// `module_instance_id_to_byte_prefix` places this byte in front of the
/// encoded module instance id when building a module's prefix.
pub const MODULE_GLOBAL_PREFIX: u8 = 0xff;

/// Result type for database operations, with [`DatabaseError`] as the error
pub type DatabaseResult<T> = std::result::Result<T, DatabaseError>;
146
/// A value that can be serialized into the raw bytes of a database key or
/// key prefix.
pub trait DatabaseKeyPrefix: Debug {
    /// Returns the raw byte representation that is handed to the database
    /// (e.g. `Database::wait_key_check` reads the entry stored under it).
    fn to_bytes(&self) -> Vec<u8>;
}
150
/// A key + value pair in the database with a unique prefix
/// Extends `DatabaseKeyPrefix` to prepend the key's prefix.
pub trait DatabaseRecord: DatabaseKeyPrefix {
    /// Single-byte prefix identifying this record type.
    const DB_PREFIX: u8;
    /// Opt-in flag for modification notifications; `false` unless overridden.
    // NOTE(review): how this relates to `DatabaseKey::NOTIFY_ON_MODIFY` is not
    // visible from here — confirm which of the two the notification code reads.
    const NOTIFY_ON_MODIFY: bool = false;
    /// Type of the record's key.
    type Key: DatabaseKey + Debug;
    /// Type of the record's value.
    type Value: DatabaseValue + Debug;
}
159
/// A key that can be used to query one or more `DatabaseRecord`
/// Extends `DatabaseKeyPrefix` to prepend the key's prefix.
pub trait DatabaseLookup: DatabaseKeyPrefix {
    /// The record type this lookup resolves to.
    type Record: DatabaseRecord;
}
165
166// Every `DatabaseRecord` is automatically a `DatabaseLookup`
167impl<Record> DatabaseLookup for Record
168where
169    Record: DatabaseRecord + Debug + Decodable + Encodable,
170{
171    type Record = Record;
172}
173
/// `DatabaseKey` that represents the lookup structure for retrieving key/value
/// pairs from the database.
pub trait DatabaseKey: Sized {
    /// Send a notification to tasks waiting to be notified if the value of
    /// `DatabaseKey` is modified
    ///
    /// For instance, this can be used to be notified when a key in the
    /// database is created. It is also possible to run a closure with the
    /// value of the `DatabaseKey` as parameter to verify some changes to
    /// that value.
    ///
    /// Defaults to `false`.
    const NOTIFY_ON_MODIFY: bool = false;
    /// Decodes a key from its raw `data` bytes.
    ///
    /// `modules` is the decoder registry made available to the decoding.
    ///
    /// # Errors
    ///
    /// Returns a [`DecodingError`] when `data` cannot be decoded into `Self`.
    fn from_bytes(
        data: &[u8],
        modules: &ModuleDecoderRegistry,
    ) -> std::result::Result<Self, DecodingError>;
}
190
/// Marker trait for `DatabaseKey`s where `NOTIFY_ON_MODIFY` is true
///
/// `Database::wait_key_check` and `Database::wait_key_exists` require their
/// key type to implement it.
pub trait DatabaseKeyWithNotify {}
193
/// `DatabaseValue` that represents the value structure of database records.
pub trait DatabaseValue: Sized + Debug {
    /// Decodes a value from its raw `data` bytes.
    ///
    /// `modules` is the decoder registry made available to the decoding.
    ///
    /// # Errors
    ///
    /// Returns a [`DecodingError`] when `data` cannot be decoded into `Self`.
    fn from_bytes(
        data: &[u8],
        modules: &ModuleDecoderRegistry,
    ) -> std::result::Result<Self, DecodingError>;
    /// Returns the raw byte representation of this value.
    fn to_bytes(&self) -> Vec<u8>;
}
202
/// Pinned, boxed stream of raw `(key, value)` byte pairs.
///
/// This is what the `raw_find_by_*` operations of
/// [`IDatabaseTransactionOpsCore`] return.
pub type PrefixStream<'a> = Pin<Box<maybe_add_send!(dyn Stream<Item = (Vec<u8>, Vec<u8>)> + 'a)>>;
204
/// Just ignore this type, it's only there to make compiler happy
///
/// The nested reference `&'small &'big ()` is only well-formed when
/// `'big: 'small`, so mentioning this type in a signature carries that
/// lifetime bound along. `Database::autocommit` uses it in the signature of
/// the closure it accepts.
///
/// See <https://users.rust-lang.org/t/argument-requires-that-is-borrowed-for-static/66503/2?u=yandros> for details.
pub type PhantomBound<'big, 'small> = PhantomData<&'small &'big ()>;
209
/// Error returned when the autocommit function fails
#[derive(Debug, Error)]
pub enum AutocommitError<E> {
    /// Committing the transaction failed too many times, giving up
    #[error("Commit Failed: {last_error}")]
    CommitFailed {
        /// Number of attempts made, counting from 1
        attempts: usize,
        /// Last error on commit
        last_error: DatabaseError,
    },
    /// Error returned by the closure provided to `autocommit`. If returned no
    /// commit was attempted in that round
    #[error("Closure error: {error}")]
    ClosureError {
        /// The attempt on which the closure returned an error
        ///
        /// `autocommit` counts attempts starting from 1, so the first run of
        /// the closure reports `1` here. Values other than 1 typically
        /// indicate a logic error since the closure given to `autocommit`
        /// should not have side effects and thus keep succeeding if it
        /// succeeded once.
        attempts: usize,
        /// Error returned by the closure
        error: E,
    },
}
235
/// Convenience extension for the `Result` returned by `Database::autocommit`.
pub trait AutocommitResultExt<T, E> {
    /// Unwraps the "commit failed" error variant. Use this in cases where
    /// autocommit is instructed to run indefinitely and commit will thus never
    /// fail.
    ///
    /// # Panics
    ///
    /// The implementation for `Result<T, AutocommitError<E>>` panics when the
    /// result is an [`AutocommitError::CommitFailed`].
    fn unwrap_autocommit(self) -> std::result::Result<T, E>;
}
242
243impl<T, E> AutocommitResultExt<T, E> for std::result::Result<T, AutocommitError<E>> {
244    fn unwrap_autocommit(self) -> std::result::Result<T, E> {
245        match self {
246            Ok(value) => Ok(value),
247            Err(AutocommitError::CommitFailed { .. }) => {
248                panic!("`unwrap_autocommit` called on a autocommit result with finite retries");
249            }
250            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
251        }
252    }
253}
254
/// Raw database implementation
///
/// This and [`IRawDatabaseTransaction`] are meant to be implemented
/// by crates like `fedimint-rocksdb` to provide a concrete implementation
/// of a database to be used by Fedimint.
///
/// This is in contrast to [`IDatabase`] which includes extra
/// functionality that Fedimint needs (and adds) on top of it.
#[apply(async_trait_maybe_send!)]
pub trait IRawDatabase: Debug + MaybeSend + MaybeSync + 'static {
    /// A raw database transaction type
    type Transaction<'a>: IRawDatabaseTransaction + Debug;

    /// Start a database transaction
    async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a>;

    /// Checkpoint the database to a backup directory
    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()>;
}
274
275#[apply(async_trait_maybe_send!)]
276impl<T> IRawDatabase for Box<T>
277where
278    T: IRawDatabase,
279{
280    type Transaction<'a> = <T as IRawDatabase>::Transaction<'a>;
281
282    async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a> {
283        (**self).begin_transaction().await
284    }
285
286    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
287        (**self).checkpoint(backup_path)
288    }
289}
290
291/// An extension trait with convenience operations on [`IRawDatabase`]
292pub trait IRawDatabaseExt: IRawDatabase + Sized {
293    /// Convert to type implementing [`IRawDatabase`] into [`Database`].
294    ///
295    /// When type inference is not an issue, [`Into::into`] can be used instead.
296    fn into_database(self) -> Database {
297        Database::new(self, ModuleRegistry::default())
298    }
299}
300
301impl<T> IRawDatabaseExt for T where T: IRawDatabase {}
302
303impl<T> From<T> for Database
304where
305    T: IRawDatabase,
306{
307    fn from(raw: T) -> Self {
308        Self::new(raw, ModuleRegistry::default())
309    }
310}
311
/// A database that, on top of the raw database operations, implements a
/// key notification system.
#[apply(async_trait_maybe_send!)]
pub trait IDatabase: Debug + MaybeSend + MaybeSync + 'static {
    /// Start a database transaction
    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a>;
    /// Register (and wait) for `key` updates
    async fn register(&self, key: &[u8]);
    /// Notify about `key` update (creation, modification, deletion)
    async fn notify(&self, key: &[u8]);

    /// Whether this database refers to the global (as opposed to
    /// module-isolated) key space
    fn is_global(&self) -> bool;

    /// Checkpoints the database to a backup directory
    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()>;
}
330
331#[apply(async_trait_maybe_send!)]
332impl<T> IDatabase for Arc<T>
333where
334    T: IDatabase + ?Sized,
335{
336    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
337        (**self).begin_transaction().await
338    }
339    async fn register(&self, key: &[u8]) {
340        (**self).register(key).await;
341    }
342    async fn notify(&self, key: &[u8]) {
343        (**self).notify(key).await;
344    }
345
346    fn is_global(&self) -> bool {
347        (**self).is_global()
348    }
349
350    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
351        (**self).checkpoint(backup_path)
352    }
353}
354
/// Base functionality around [`IRawDatabase`] to make it a [`IDatabase`]
///
/// Mostly notification system, but also run-time single-commit handling.
struct BaseDatabase<RawDatabase> {
    /// Shared key-notification registry; every transaction started from this
    /// database receives a clone of this handle.
    notifications: Arc<Notifications>,
    /// The wrapped raw database implementation.
    raw: RawDatabase,
}
362
363impl<RawDatabase> fmt::Debug for BaseDatabase<RawDatabase> {
364    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
365        f.write_str("BaseDatabase")
366    }
367}
368
369#[apply(async_trait_maybe_send!)]
370impl<RawDatabase: IRawDatabase + MaybeSend + 'static> IDatabase for BaseDatabase<RawDatabase> {
371    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
372        Box::new(BaseDatabaseTransaction::new(
373            self.raw.begin_transaction().await,
374            self.notifications.clone(),
375        ))
376    }
377    async fn register(&self, key: &[u8]) {
378        self.notifications.register(key).await;
379    }
380    async fn notify(&self, key: &[u8]) {
381        self.notifications.notify(key);
382    }
383
384    fn is_global(&self) -> bool {
385        true
386    }
387
388    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
389        self.raw.checkpoint(backup_path)
390    }
391}
392
/// A public-facing newtype over `IDatabase`
///
/// Notably carries set of module decoders (`ModuleDecoderRegistry`)
/// and implements common utility function for auto-commits, db isolation,
/// and other.
#[derive(Clone, Debug)]
pub struct Database {
    /// Type-erased, reference-counted database handle; cloning a `Database`
    /// clones this `Arc`.
    inner: Arc<dyn IDatabase + 'static>,
    /// Decoders passed along to every transaction started from this handle.
    module_decoders: ModuleDecoderRegistry,
}
403
404impl Database {
405    pub fn strong_count(&self) -> usize {
406        Arc::strong_count(&self.inner)
407    }
408
409    pub fn into_inner(self) -> Arc<dyn IDatabase + 'static> {
410        self.inner
411    }
412}
413
414impl Database {
415    /// Creates a new Fedimint database from any object implementing
416    /// [`IDatabase`].
417    ///
418    /// See also [`Database::new_from_arc`].
419    pub fn new(raw: impl IRawDatabase + 'static, module_decoders: ModuleDecoderRegistry) -> Self {
420        let inner = BaseDatabase {
421            raw,
422            notifications: Arc::new(Notifications::new()),
423        };
424        Self::new_from_arc(
425            Arc::new(inner) as Arc<dyn IDatabase + 'static>,
426            module_decoders,
427        )
428    }
429
430    /// Create [`Database`] from an already typed-erased `IDatabase`.
431    pub fn new_from_arc(
432        inner: Arc<dyn IDatabase + 'static>,
433        module_decoders: ModuleDecoderRegistry,
434    ) -> Self {
435        Self {
436            inner,
437            module_decoders,
438        }
439    }
440
441    /// Create [`Database`] isolated to a partition with a given `prefix`
442    pub fn with_prefix(&self, prefix: Vec<u8>) -> Self {
443        Self {
444            inner: Arc::new(PrefixDatabase {
445                inner: self.inner.clone(),
446                global_dbtx_access_token: None,
447                prefix,
448            }),
449            module_decoders: self.module_decoders.clone(),
450        }
451    }
452
453    /// Create [`Database`] isolated to a partition with a prefix for a given
454    /// `module_instance_id`, allowing the module to access `global_dbtx` with
455    /// the right `access_token`
456    pub fn with_prefix_module_id(
457        &self,
458        module_instance_id: ModuleInstanceId,
459    ) -> (Self, GlobalDBTxAccessToken) {
460        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
461        let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
462        (
463            Self {
464                inner: Arc::new(PrefixDatabase {
465                    inner: self.inner.clone(),
466                    global_dbtx_access_token: Some(global_dbtx_access_token),
467                    prefix,
468                }),
469                module_decoders: self.module_decoders.clone(),
470            },
471            global_dbtx_access_token,
472        )
473    }
474
475    pub fn with_decoders(&self, module_decoders: ModuleDecoderRegistry) -> Self {
476        Self {
477            inner: self.inner.clone(),
478            module_decoders,
479        }
480    }
481
482    /// Is this `Database` a global, unpartitioned `Database`
483    pub fn is_global(&self) -> bool {
484        self.inner.is_global()
485    }
486
487    /// `Err` if [`Self::is_global`] is not true
488    pub fn ensure_global(&self) -> DatabaseResult<()> {
489        if !self.is_global() {
490            return Err(DatabaseError::Other(anyhow::anyhow!(
491                "Database instance not global"
492            )));
493        }
494
495        Ok(())
496    }
497
498    /// `Err` if [`Self::is_global`] is true
499    pub fn ensure_isolated(&self) -> DatabaseResult<()> {
500        if self.is_global() {
501            return Err(DatabaseError::Other(anyhow::anyhow!(
502                "Database instance not isolated"
503            )));
504        }
505
506        Ok(())
507    }
508
509    /// Begin a new committable database transaction
510    pub async fn begin_transaction<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, Committable>
511    where
512        's: 'tx,
513    {
514        DatabaseTransaction::<Committable>::new(
515            self.inner.begin_transaction().await,
516            self.module_decoders.clone(),
517        )
518    }
519
520    /// Begin a new non-committable database transaction
521    pub async fn begin_transaction_nc<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, NonCommittable>
522    where
523        's: 'tx,
524    {
525        self.begin_transaction().await.into_nc()
526    }
527
528    pub fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
529        self.inner.checkpoint(backup_path)
530    }
531
532    /// Runs a closure with a reference to a database transaction and tries to
533    /// commit the transaction if the closure returns `Ok` and rolls it back
534    /// otherwise. If committing fails the closure is run for up to
535    /// `max_attempts` times. If `max_attempts` is `None` it will run
536    /// `usize::MAX` times which is close enough to infinite times.
537    ///
538    /// The closure `tx_fn` provided should not have side effects outside of the
539    /// database transaction provided, or if it does these should be
540    /// idempotent, since the closure might be run multiple times.
541    ///
542    /// # Lifetime Parameters
543    ///
544    /// The higher rank trait bound (HRTB) `'a` that is applied to the the
545    /// mutable reference to the database transaction ensures that the
546    /// reference lives as least as long as the returned future of the
547    /// closure.
548    ///
549    /// Further, the reference to self (`'s`) must outlive the
550    /// `DatabaseTransaction<'dt>`. In other words, the
551    /// `DatabaseTransaction` must live as least as long as `self` and that is
552    /// true as the `DatabaseTransaction` is only dropped at the end of the
553    /// `loop{}`.
554    ///
555    /// # Panics
556    ///
557    /// This function panics when the given number of maximum attempts is zero.
558    /// `max_attempts` must be greater or equal to one.
559    pub async fn autocommit<'s, 'dbtx, F, T, E>(
560        &'s self,
561        tx_fn: F,
562        max_attempts: Option<usize>,
563    ) -> std::result::Result<T, AutocommitError<E>>
564    where
565        's: 'dbtx,
566        for<'r, 'o> F: Fn(
567            &'r mut DatabaseTransaction<'o>,
568            PhantomBound<'dbtx, 'o>,
569        ) -> BoxFuture<'r, std::result::Result<T, E>>,
570    {
571        assert_ne!(max_attempts, Some(0));
572        let mut curr_attempts: usize = 0;
573
574        loop {
575            // The `checked_add()` function is used to catch the `usize` overflow.
576            // With `usize=32bit` and an assumed time of 1ms per iteration, this would crash
577            // after ~50 days. But if that's the case, something else must be wrong.
578            // With `usize=64bit` it would take much longer, obviously.
579            curr_attempts = curr_attempts
580                .checked_add(1)
581                .expect("db autocommit attempt counter overflowed");
582
583            let mut dbtx = self.begin_transaction().await;
584
585            let tx_fn_res = tx_fn(&mut dbtx.to_ref_nc(), PhantomData).await;
586            let val = match tx_fn_res {
587                Ok(val) => val,
588                Err(err) => {
589                    dbtx.ignore_uncommitted();
590                    return Err(AutocommitError::ClosureError {
591                        attempts: curr_attempts,
592                        error: err,
593                    });
594                }
595            };
596
597            let _timing /* logs on drop */ = timing::TimeReporter::new("autocommit - commit_tx");
598
599            match dbtx.commit_tx_result().await {
600                Ok(()) => {
601                    return Ok(val);
602                }
603                Err(err) => {
604                    if max_attempts.is_some_and(|max_att| max_att <= curr_attempts) {
605                        warn!(
606                            target: LOG_DB,
607                            curr_attempts,
608                            err = %err.fmt_compact(),
609                            "Database commit failed in an autocommit block - terminating"
610                        );
611                        return Err(AutocommitError::CommitFailed {
612                            attempts: curr_attempts,
613                            last_error: err,
614                        });
615                    }
616
617                    let delay = (2u64.pow(curr_attempts.min(7) as u32) * 10).min(1000);
618                    let delay = rand::thread_rng().gen_range(delay..(2 * delay));
619                    warn!(
620                        target: LOG_DB,
621                        curr_attempts,
622                        err = %err.fmt_compact(),
623                        delay_ms = %delay,
624                        "Database commit failed in an autocommit block - retrying"
625                    );
626                    crate::runtime::sleep(Duration::from_millis(delay)).await;
627                }
628            }
629        }
630    }
631
    /// Waits for key to be notified.
    ///
    /// Calls the `checker` when value of the key may have changed.
    /// Returns the value when `checker` returns a `Some(T)`.
    ///
    /// Alongside the value, the transaction in which the accepted value was
    /// read is returned, wrapped as a committable [`DatabaseTransaction`].
    ///
    /// # Panics
    ///
    /// Panics if reading the key from the database fails.
    pub async fn wait_key_check<'a, K, T>(
        &'a self,
        key: &K,
        checker: impl Fn(Option<K::Value>) -> Option<T>,
    ) -> (T, DatabaseTransaction<'a, Committable>)
    where
        K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
    {
        let key_bytes = key.to_bytes();
        loop {
            // register for notification
            // NOTE(review): `IDatabase::register` is an `async fn`, so this only
            // creates the future; it is first polled at `notify.await` below.
            // Confirm that a notification fired between the read and that
            // first poll cannot be lost.
            let notify = self.inner.register(&key_bytes);

            // check for value in db
            let mut tx = self.inner.begin_transaction().await;

            // read the raw bytes (if any) and decode them into `K::Value`
            let maybe_value_bytes = tx
                .raw_get_bytes(&key_bytes)
                .await
                .expect("Unrecoverable error when reading from database")
                .map(|value_bytes| {
                    decode_value_expect(&value_bytes, &self.module_decoders, &key_bytes)
                });

            if let Some(value) = checker(maybe_value_bytes) {
                return (
                    value,
                    DatabaseTransaction::new(tx, self.module_decoders.clone()),
                );
            }

            // `checker` did not accept the current state; wait for a
            // notification and try again
            notify.await;
            // Missing a notification between this await and the next
            // `register` is fine, because the next iteration checks the
            // database again.
        }
    }
673
674    /// Waits for key to be present in database.
675    pub async fn wait_key_exists<K>(&self, key: &K) -> K::Value
676    where
677        K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
678    {
679        self.wait_key_check(key, std::convert::identity).await.0
680    }
681}
682
683fn module_instance_id_to_byte_prefix(module_instance_id: u16) -> Vec<u8> {
684    let mut bytes = vec![MODULE_GLOBAL_PREFIX];
685    bytes.append(&mut module_instance_id.consensus_encode_to_vec());
686    bytes
687}
688
/// A database that wraps an `inner` one and adds a prefix to all operations,
/// effectively creating an isolated partition.
#[derive(Clone, Debug)]
struct PrefixDatabase<Inner>
where
    Inner: Debug,
{
    /// Bytes put in front of every key before it is passed to `inner`.
    prefix: Vec<u8>,
    /// When set, this partition reports itself as non-global, and its
    /// transactions check tokens passed to `global_dbtx` against this one.
    global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
    /// The wrapped database.
    inner: Inner,
}
700
701impl<Inner> PrefixDatabase<Inner>
702where
703    Inner: Debug,
704{
705    // TODO: we should optimize these concatenations, maybe by having an internal
706    // `key: &[&[u8]]` that we flatten once, when passing to lowest layer, or
707    // something
708    fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
709        let mut full_key = self.prefix.clone();
710        full_key.extend_from_slice(key);
711        full_key
712    }
713}
714
715#[apply(async_trait_maybe_send!)]
716impl<Inner> IDatabase for PrefixDatabase<Inner>
717where
718    Inner: Debug + MaybeSend + MaybeSync + 'static + IDatabase,
719{
720    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
721        Box::new(PrefixDatabaseTransaction {
722            inner: self.inner.begin_transaction().await,
723            global_dbtx_access_token: self.global_dbtx_access_token,
724            prefix: self.prefix.clone(),
725        })
726    }
727    async fn register(&self, key: &[u8]) {
728        self.inner.register(&self.get_full_key(key)).await;
729    }
730
731    async fn notify(&self, key: &[u8]) {
732        self.inner.notify(&self.get_full_key(key)).await;
733    }
734
735    fn is_global(&self) -> bool {
736        if self.global_dbtx_access_token.is_some() {
737            false
738        } else {
739            self.inner.is_global()
740        }
741    }
742
743    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
744        self.inner.checkpoint(backup_path)
745    }
746}
747
/// A database transaction that wraps an `inner` one and adds a prefix to all
/// operations, effectively creating an isolated partition.
///
/// Produced by [`PrefixDatabase`].
#[derive(Debug)]
struct PrefixDatabaseTransaction<Inner> {
    /// The wrapped transaction.
    inner: Inner,
    /// Token that `global_dbtx` callers must present; copied from the
    /// [`PrefixDatabase`] that produced this transaction.
    global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
    /// Bytes put in front of every key and stripped from returned keys.
    prefix: Vec<u8>,
}
758
759impl<Inner> PrefixDatabaseTransaction<Inner> {
760    // TODO: we should optimize these concatenations, maybe by having an internal
761    // `key: &[&[u8]]` that we flatten once, when passing to lowest layer, or
762    // something
763    fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
764        let mut full_key = self.prefix.clone();
765        full_key.extend_from_slice(key);
766        full_key
767    }
768
769    fn get_full_range(&self, range: Range<&[u8]>) -> Range<Vec<u8>> {
770        Range {
771            start: self.get_full_key(range.start),
772            end: self.get_full_key(range.end),
773        }
774    }
775
776    fn adapt_prefix_stream(stream: PrefixStream<'_>, prefix_len: usize) -> PrefixStream<'_> {
777        Box::pin(stream.map(move |(k, v)| (k[prefix_len..].to_owned(), v)))
778    }
779}
780
781#[apply(async_trait_maybe_send!)]
782impl<Inner> IDatabaseTransaction for PrefixDatabaseTransaction<Inner>
783where
784    Inner: IDatabaseTransaction,
785{
786    async fn commit_tx(&mut self) -> DatabaseResult<()> {
787        self.inner.commit_tx().await
788    }
789
790    fn is_global(&self) -> bool {
791        if self.global_dbtx_access_token.is_some() {
792            false
793        } else {
794            self.inner.is_global()
795        }
796    }
797
798    fn global_dbtx(
799        &mut self,
800        access_token: GlobalDBTxAccessToken,
801    ) -> &mut dyn IDatabaseTransaction {
802        if let Some(self_global_dbtx_access_token) = self.global_dbtx_access_token {
803            assert_eq!(
804                access_token, self_global_dbtx_access_token,
805                "Invalid access key used to access global_dbtx"
806            );
807            &mut self.inner
808        } else {
809            self.inner.global_dbtx(access_token)
810        }
811    }
812}
813
814#[apply(async_trait_maybe_send!)]
815impl<Inner> IDatabaseTransactionOpsCore for PrefixDatabaseTransaction<Inner>
816where
817    Inner: IDatabaseTransactionOpsCore,
818{
819    async fn raw_insert_bytes(
820        &mut self,
821        key: &[u8],
822        value: &[u8],
823    ) -> DatabaseResult<Option<Vec<u8>>> {
824        let key = self.get_full_key(key);
825        self.inner.raw_insert_bytes(&key, value).await
826    }
827
828    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
829        let key = self.get_full_key(key);
830        self.inner.raw_get_bytes(&key).await
831    }
832
833    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
834        let key = self.get_full_key(key);
835        self.inner.raw_remove_entry(&key).await
836    }
837
838    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
839        let key = self.get_full_key(key_prefix);
840        let stream = self.inner.raw_find_by_prefix(&key).await?;
841        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
842    }
843
844    async fn raw_find_by_prefix_sorted_descending(
845        &mut self,
846        key_prefix: &[u8],
847    ) -> DatabaseResult<PrefixStream<'_>> {
848        let key = self.get_full_key(key_prefix);
849        let stream = self
850            .inner
851            .raw_find_by_prefix_sorted_descending(&key)
852            .await?;
853        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
854    }
855
856    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
857        let range = self.get_full_range(range);
858        let stream = self
859            .inner
860            .raw_find_by_range(Range {
861                start: &range.start,
862                end: &range.end,
863            })
864            .await?;
865        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
866    }
867
868    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
869        let key = self.get_full_key(key_prefix);
870        self.inner.raw_remove_by_prefix(&key).await
871    }
872}
873
874impl<Inner> IDatabaseTransactionOps for PrefixDatabaseTransaction<Inner> where
875    Inner: IDatabaseTransactionOps
876{
877}
878
/// Core raw operations a database transaction supports
///
/// Used to enforce the same signature on all types supporting it
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransactionOpsCore: MaybeSend {
    /// Insert entry
    async fn raw_insert_bytes(
        &mut self,
        key: &[u8],
        value: &[u8],
    ) -> DatabaseResult<Option<Vec<u8>>>;

    /// Get key value
    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>>;

    /// Remove entry by `key`
    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>>;

    /// Returns a stream of key-value pairs with keys that start with
    /// `key_prefix`, sorted by key.
    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>>;

    /// Same as [`Self::raw_find_by_prefix`] but the order is descending by key.
    async fn raw_find_by_prefix_sorted_descending(
        &mut self,
        key_prefix: &[u8],
    ) -> DatabaseResult<PrefixStream<'_>>;

    /// Returns a stream of key-value pairs with keys within a `range`, sorted
    /// by key. [`Range`] is a (half-open) range bounded inclusively below and
    /// exclusively above.
    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>>;

    /// Delete keys matching prefix
    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()>;
}
915
916#[apply(async_trait_maybe_send!)]
917impl<T> IDatabaseTransactionOpsCore for Box<T>
918where
919    T: IDatabaseTransactionOpsCore + ?Sized,
920{
921    async fn raw_insert_bytes(
922        &mut self,
923        key: &[u8],
924        value: &[u8],
925    ) -> DatabaseResult<Option<Vec<u8>>> {
926        (**self).raw_insert_bytes(key, value).await
927    }
928
929    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
930        (**self).raw_get_bytes(key).await
931    }
932
933    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
934        (**self).raw_remove_entry(key).await
935    }
936
937    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
938        (**self).raw_find_by_prefix(key_prefix).await
939    }
940
941    async fn raw_find_by_prefix_sorted_descending(
942        &mut self,
943        key_prefix: &[u8],
944    ) -> DatabaseResult<PrefixStream<'_>> {
945        (**self)
946            .raw_find_by_prefix_sorted_descending(key_prefix)
947            .await
948    }
949
950    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
951        (**self).raw_find_by_range(range).await
952    }
953
954    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
955        (**self).raw_remove_by_prefix(key_prefix).await
956    }
957}
958
959#[apply(async_trait_maybe_send!)]
960impl<T> IDatabaseTransactionOpsCore for &mut T
961where
962    T: IDatabaseTransactionOpsCore + ?Sized,
963{
964    async fn raw_insert_bytes(
965        &mut self,
966        key: &[u8],
967        value: &[u8],
968    ) -> DatabaseResult<Option<Vec<u8>>> {
969        (**self).raw_insert_bytes(key, value).await
970    }
971
972    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
973        (**self).raw_get_bytes(key).await
974    }
975
976    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
977        (**self).raw_remove_entry(key).await
978    }
979
980    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
981        (**self).raw_find_by_prefix(key_prefix).await
982    }
983
984    async fn raw_find_by_prefix_sorted_descending(
985        &mut self,
986        key_prefix: &[u8],
987    ) -> DatabaseResult<PrefixStream<'_>> {
988        (**self)
989            .raw_find_by_prefix_sorted_descending(key_prefix)
990            .await
991    }
992
993    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
994        (**self).raw_find_by_range(range).await
995    }
996
997    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
998        (**self).raw_remove_by_prefix(key_prefix).await
999    }
1000}
1001
/// Additional operations (only some) database transactions expose, on top of
/// [`IDatabaseTransactionOpsCore`]
///
/// In certain contexts exposing these operations would be a problem, so they
/// are moved to a separate trait.
///
/// The trait currently declares no methods of its own; it only marks the
/// types that are allowed to expose such operations.
pub trait IDatabaseTransactionOps: IDatabaseTransactionOpsCore + MaybeSend {}
1008
1009impl<T> IDatabaseTransactionOps for Box<T> where T: IDatabaseTransactionOps + ?Sized {}
1010
1011impl<T> IDatabaseTransactionOps for &mut T where T: IDatabaseTransactionOps + ?Sized {}
1012
/// Like [`IDatabaseTransactionOpsCore`], but typed
///
/// Implemented via blanket impl for everything that implements
/// [`IDatabaseTransactionOpsCore`] that has decoders (implements
/// [`WithDecoders`]).
///
/// The methods return plain values rather than `Result`s: the blanket
/// implementation panics on database errors and on undecodable keys/values.
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransactionOpsCoreTyped<'a> {
    /// Get the decoded value stored under `key`, or `None` if there is none
    async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;

    /// Store `value` under `key`
    ///
    /// Returns the decoded bytes handed back by the underlying raw insert;
    /// [`Self::insert_new_entry`] treats a `Some` as the previous value.
    async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync;

    /// Store `value` under `key`, expecting that no entry existed before
    ///
    /// The blanket implementation panics if the insert reports a previous
    /// value.
    async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync;

    /// Stream the decoded `(key, value)` pairs whose keys fall within
    /// `key_range`
    async fn find_by_range<K>(
        &mut self,
        key_range: Range<K>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync;

    /// Stream the decoded `(key, value)` pairs whose keys start with
    /// `key_prefix`
    async fn find_by_prefix<KP>(
        &mut self,
        key_prefix: &KP,
    ) -> Pin<
        Box<
            maybe_add_send!(
                dyn Stream<
                        Item = (
                            KP::Record,
                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
                        ),
                    > + '_
            ),
        >,
    >
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync,
        KP::Record: DatabaseKey;

    /// Same as [`Self::find_by_prefix`], but backed by the raw
    /// descending-order lookup
    async fn find_by_prefix_sorted_descending<KP>(
        &mut self,
        key_prefix: &KP,
    ) -> Pin<
        Box<
            maybe_add_send!(
                dyn Stream<
                        Item = (
                            KP::Record,
                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
                        ),
                    > + '_
            ),
        >,
    >
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync,
        KP::Record: DatabaseKey;

    /// Remove the entry stored under `key`, returning the decoded bytes
    /// handed back by the underlying raw removal (if any)
    async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;

    /// Remove all entries whose keys start with `key_prefix`
    async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync;
}
1088
1089// blanket implementation of typed ops for anything that implements raw ops and
1090// has decoders
1091#[apply(async_trait_maybe_send!)]
1092impl<T> IDatabaseTransactionOpsCoreTyped<'_> for T
1093where
1094    T: IDatabaseTransactionOpsCore + WithDecoders,
1095{
1096    async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1097    where
1098        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1099    {
1100        let key_bytes = key.to_bytes();
1101        let raw = self
1102            .raw_get_bytes(&key_bytes)
1103            .await
1104            .expect("Unrecoverable error occurred while reading and entry from the database");
1105        raw.map(|value_bytes| {
1106            decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1107        })
1108    }
1109
1110    async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1111    where
1112        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1113        K::Value: MaybeSend + MaybeSync,
1114    {
1115        let key_bytes = key.to_bytes();
1116        self.raw_insert_bytes(&key_bytes, &value.to_bytes())
1117            .await
1118            .expect("Unrecoverable error occurred while inserting entry into the database")
1119            .map(|value_bytes| {
1120                decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1121            })
1122    }
1123
1124    async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1125    where
1126        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1127        K::Value: MaybeSend + MaybeSync,
1128    {
1129        if let Some(prev) = self.insert_entry(key, value).await {
1130            panic!(
1131                "Database overwriting element when expecting insertion of new entry. Key: {key:?} Prev Value: {prev:?}"
1132            );
1133        }
1134    }
1135
1136    async fn find_by_range<K>(
1137        &mut self,
1138        key_range: Range<K>,
1139    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1140    where
1141        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1142        K::Value: MaybeSend + MaybeSync,
1143    {
1144        let decoders = self.decoders().clone();
1145        Box::pin(
1146            self.raw_find_by_range(Range {
1147                start: &key_range.start.to_bytes(),
1148                end: &key_range.end.to_bytes(),
1149            })
1150            .await
1151            .expect("Unrecoverable error occurred while listing entries from the database")
1152            .map(move |(key_bytes, value_bytes)| {
1153                let key = decode_key_expect(&key_bytes, &decoders);
1154                let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1155                (key, value)
1156            }),
1157        )
1158    }
1159
1160    async fn find_by_prefix<KP>(
1161        &mut self,
1162        key_prefix: &KP,
1163    ) -> Pin<
1164        Box<
1165            maybe_add_send!(
1166                dyn Stream<
1167                        Item = (
1168                            KP::Record,
1169                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1170                        ),
1171                    > + '_
1172            ),
1173        >,
1174    >
1175    where
1176        KP: DatabaseLookup + MaybeSend + MaybeSync,
1177        KP::Record: DatabaseKey,
1178    {
1179        let decoders = self.decoders().clone();
1180        Box::pin(
1181            self.raw_find_by_prefix(&key_prefix.to_bytes())
1182                .await
1183                .expect("Unrecoverable error occurred while listing entries from the database")
1184                .map(move |(key_bytes, value_bytes)| {
1185                    let key = decode_key_expect(&key_bytes, &decoders);
1186                    let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1187                    (key, value)
1188                }),
1189        )
1190    }
1191
1192    async fn find_by_prefix_sorted_descending<KP>(
1193        &mut self,
1194        key_prefix: &KP,
1195    ) -> Pin<
1196        Box<
1197            maybe_add_send!(
1198                dyn Stream<
1199                        Item = (
1200                            KP::Record,
1201                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1202                        ),
1203                    > + '_
1204            ),
1205        >,
1206    >
1207    where
1208        KP: DatabaseLookup + MaybeSend + MaybeSync,
1209        KP::Record: DatabaseKey,
1210    {
1211        let decoders = self.decoders().clone();
1212        Box::pin(
1213            self.raw_find_by_prefix_sorted_descending(&key_prefix.to_bytes())
1214                .await
1215                .expect("Unrecoverable error occurred while listing entries from the database")
1216                .map(move |(key_bytes, value_bytes)| {
1217                    let key = decode_key_expect(&key_bytes, &decoders);
1218                    let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1219                    (key, value)
1220                }),
1221        )
1222    }
1223    async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1224    where
1225        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1226    {
1227        let key_bytes = key.to_bytes();
1228        self.raw_remove_entry(&key_bytes)
1229            .await
1230            .expect("Unrecoverable error occurred while inserting removing entry from the database")
1231            .map(|value_bytes| {
1232                decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1233            })
1234    }
1235    async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1236    where
1237        KP: DatabaseLookup + MaybeSend + MaybeSync,
1238    {
1239        self.raw_remove_by_prefix(&key_prefix.to_bytes())
1240            .await
1241            .expect("Unrecoverable error when removing entries from the database");
1242    }
1243}
1244
/// A database type that has decoders, which allows it to implement
/// [`IDatabaseTransactionOpsCoreTyped`]
pub trait WithDecoders {
    /// The decoder registry used when decoding keys and values read from the
    /// database
    fn decoders(&self) -> &ModuleDecoderRegistry;
}
1250
/// Raw database transaction (e.g. rocksdb implementation)
#[apply(async_trait_maybe_send!)]
pub trait IRawDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
    /// Commit the transaction, consuming it
    ///
    /// Unlike [`IDatabaseTransaction::commit_tx`], this takes `self` by value.
    async fn commit_tx(self) -> DatabaseResult<()>;
}
1256
/// Fedimint database transaction
///
/// See [`IDatabase`] for more info.
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransaction: MaybeSend + IDatabaseTransactionOps + fmt::Debug {
    /// Commit the transaction
    ///
    /// Takes `&mut self` (rather than `self`) so that implementors can be
    /// wrapped in other structs and used behind `Box`/`&mut`.
    async fn commit_tx(&mut self) -> DatabaseResult<()>;

    /// Is global database
    fn is_global(&self) -> bool;

    /// Get the global database tx from a module-prefixed database transaction
    ///
    /// Meant to be called only by core internals, and module developers should
    /// not call it directly.
    #[doc(hidden)]
    fn global_dbtx(&mut self, access_token: GlobalDBTxAccessToken)
    -> &mut dyn IDatabaseTransaction;
}
1276
1277#[apply(async_trait_maybe_send!)]
1278impl<T> IDatabaseTransaction for Box<T>
1279where
1280    T: IDatabaseTransaction + ?Sized,
1281{
1282    async fn commit_tx(&mut self) -> DatabaseResult<()> {
1283        (**self).commit_tx().await
1284    }
1285
1286    fn is_global(&self) -> bool {
1287        (**self).is_global()
1288    }
1289
1290    fn global_dbtx(
1291        &mut self,
1292        access_token: GlobalDBTxAccessToken,
1293    ) -> &mut dyn IDatabaseTransaction {
1294        (**self).global_dbtx(access_token)
1295    }
1296}
1297
1298#[apply(async_trait_maybe_send!)]
1299impl<'a, T> IDatabaseTransaction for &'a mut T
1300where
1301    T: IDatabaseTransaction + ?Sized,
1302{
1303    async fn commit_tx(&mut self) -> DatabaseResult<()> {
1304        (**self).commit_tx().await
1305    }
1306
1307    fn is_global(&self) -> bool {
1308        (**self).is_global()
1309    }
1310
1311    fn global_dbtx(&mut self, access_key: GlobalDBTxAccessToken) -> &mut dyn IDatabaseTransaction {
1312        (**self).global_dbtx(access_key)
1313    }
1314}
1315
/// Struct that implements `IRawDatabaseTransaction` and can be wrapped
/// easier in other structs since it does not consume `self` by move.
struct BaseDatabaseTransaction<Tx> {
    // TODO: merge options
    /// The wrapped raw transaction; taken out (leaving `None`) on commit
    raw: Option<Tx>,
    /// Keys touched by inserts/removals; taken out and submitted to
    /// `notifications` after a successful commit
    notify_queue: Option<NotifyQueue>,
    /// Receiver of the notification queue after a successful commit
    notifications: Arc<Notifications>,
}
1324
1325impl<Tx> fmt::Debug for BaseDatabaseTransaction<Tx>
1326where
1327    Tx: fmt::Debug,
1328{
1329    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1330        f.write_fmt(format_args!(
1331            "BaseDatabaseTransaction{{ raw={:?} }}",
1332            self.raw
1333        ))
1334    }
1335}
1336impl<Tx> BaseDatabaseTransaction<Tx>
1337where
1338    Tx: IRawDatabaseTransaction,
1339{
1340    fn new(dbtx: Tx, notifications: Arc<Notifications>) -> Self {
1341        Self {
1342            raw: Some(dbtx),
1343            notifications,
1344            notify_queue: Some(NotifyQueue::new()),
1345        }
1346    }
1347
1348    fn add_notification_key(&mut self, key: &[u8]) -> DatabaseResult<()> {
1349        self.notify_queue
1350            .as_mut()
1351            .ok_or(DatabaseError::TransactionConsumed)?
1352            .add(key);
1353        Ok(())
1354    }
1355}
1356
1357#[apply(async_trait_maybe_send!)]
1358impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOpsCore for BaseDatabaseTransaction<Tx> {
1359    async fn raw_insert_bytes(
1360        &mut self,
1361        key: &[u8],
1362        value: &[u8],
1363    ) -> DatabaseResult<Option<Vec<u8>>> {
1364        self.add_notification_key(key)?;
1365        self.raw
1366            .as_mut()
1367            .ok_or(DatabaseError::TransactionConsumed)?
1368            .raw_insert_bytes(key, value)
1369            .await
1370    }
1371
1372    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1373        self.raw
1374            .as_mut()
1375            .ok_or(DatabaseError::TransactionConsumed)?
1376            .raw_get_bytes(key)
1377            .await
1378    }
1379
1380    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1381        self.add_notification_key(key)?;
1382        self.raw
1383            .as_mut()
1384            .ok_or(DatabaseError::TransactionConsumed)?
1385            .raw_remove_entry(key)
1386            .await
1387    }
1388
1389    async fn raw_find_by_range(
1390        &mut self,
1391        key_range: Range<&[u8]>,
1392    ) -> DatabaseResult<PrefixStream<'_>> {
1393        self.raw
1394            .as_mut()
1395            .ok_or(DatabaseError::TransactionConsumed)?
1396            .raw_find_by_range(key_range)
1397            .await
1398    }
1399
1400    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
1401        self.raw
1402            .as_mut()
1403            .ok_or(DatabaseError::TransactionConsumed)?
1404            .raw_find_by_prefix(key_prefix)
1405            .await
1406    }
1407
1408    async fn raw_find_by_prefix_sorted_descending(
1409        &mut self,
1410        key_prefix: &[u8],
1411    ) -> DatabaseResult<PrefixStream<'_>> {
1412        self.raw
1413            .as_mut()
1414            .ok_or(DatabaseError::TransactionConsumed)?
1415            .raw_find_by_prefix_sorted_descending(key_prefix)
1416            .await
1417    }
1418
1419    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
1420        self.raw
1421            .as_mut()
1422            .ok_or(DatabaseError::TransactionConsumed)?
1423            .raw_remove_by_prefix(key_prefix)
1424            .await
1425    }
1426}
1427
1428impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOps for BaseDatabaseTransaction<Tx> {}
1429
1430#[apply(async_trait_maybe_send!)]
1431impl<Tx: IRawDatabaseTransaction + fmt::Debug> IDatabaseTransaction
1432    for BaseDatabaseTransaction<Tx>
1433{
1434    async fn commit_tx(&mut self) -> DatabaseResult<()> {
1435        self.raw
1436            .take()
1437            .ok_or(DatabaseError::TransactionConsumed)?
1438            .commit_tx()
1439            .await?;
1440        self.notifications.submit_queue(
1441            &self
1442                .notify_queue
1443                .take()
1444                .expect("commit must be called only once"),
1445        );
1446        Ok(())
1447    }
1448
1449    fn is_global(&self) -> bool {
1450        true
1451    }
1452
1453    fn global_dbtx(
1454        &mut self,
1455        _access_token: GlobalDBTxAccessToken,
1456    ) -> &mut dyn IDatabaseTransaction {
1457        panic!("Illegal to call global_dbtx on BaseDatabaseTransaction");
1458    }
1459}
1460
/// A helper for tracking and logging on `Drop` any instances of uncommitted
/// writes
///
/// When dropped with `has_writes` set and `is_committed` unset, it emits a
/// log message: a trace if `ignore_uncommitted` is set, a warning otherwise.
#[derive(Clone)]
struct CommitTracker {
    /// Is the dbtx committed
    is_committed: bool,
    /// Does the dbtx have any writes
    has_writes: bool,
    /// Don't warn-log uncommitted writes
    ignore_uncommitted: bool,
}
1472
1473impl Drop for CommitTracker {
1474    fn drop(&mut self) {
1475        if self.has_writes && !self.is_committed {
1476            if self.ignore_uncommitted {
1477                trace!(
1478                    target: LOG_DB,
1479                    "DatabaseTransaction has writes and has not called commit, but that's expected."
1480                );
1481            } else {
1482                warn!(
1483                    target: LOG_DB,
1484                    location = ?backtrace::Backtrace::new(),
1485                    "DatabaseTransaction has writes and has not called commit."
1486                );
1487            }
1488        }
1489    }
1490}
1491
/// Either an owned `T` or a mutable borrow of one.
///
/// Dereferences to `T` in both cases, so users do not need to care which
/// variant they hold.
enum MaybeRef<'a, T> {
    Owned(T),
    Borrowed(&'a mut T),
}

impl<T> ops::Deref for MaybeRef<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        match self {
            Self::Borrowed(borrowed) => &**borrowed,
            Self::Owned(owned) => owned,
        }
    }
}

impl<T> ops::DerefMut for MaybeRef<'_, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        match self {
            Self::Borrowed(borrowed) => &mut **borrowed,
            Self::Owned(owned) => owned,
        }
    }
}
1516
/// Session type for [`DatabaseTransaction`] that is allowed to commit
///
/// Opposite of [`NonCommittable`].
///
/// Used only as a type-level marker (the `Cap` parameter of
/// [`DatabaseTransaction`]); it carries no data.
pub struct Committable;
1521
/// Session type for a [`DatabaseTransaction`] that is not allowed to commit
///
/// Opposite of [`Committable`].
///
/// Used only as a type-level marker (the default `Cap` parameter of
/// [`DatabaseTransaction`]); it carries no data.
pub struct NonCommittable;
1526
/// A high level database transaction handle
///
/// `Cap` is a session type
pub struct DatabaseTransaction<'tx, Cap = NonCommittable> {
    /// The underlying (possibly prefixed or borrowed) transaction
    tx: Box<dyn IDatabaseTransaction + 'tx>,
    /// Decoders used to decode keys and values (see [`WithDecoders`])
    decoders: ModuleDecoderRegistry,
    /// Commit/write tracking state; borrowed when this handle was derived
    /// from another handle by reference
    commit_tracker: MaybeRef<'tx, CommitTracker>,
    /// Hooks registered via `on_commit`; borrowed when this handle was derived
    /// from another handle by reference
    on_commit_hooks: MaybeRef<'tx, Vec<Box<maybe_add_send!(dyn FnOnce())>>>,
    /// Zero-sized marker carrying the `Cap` session type
    capability: marker::PhantomData<Cap>,
}
1537
1538impl<Cap> fmt::Debug for DatabaseTransaction<'_, Cap> {
1539    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1540        f.write_fmt(format_args!(
1541            "DatabaseTransaction {{ tx: {:?}, decoders={:?} }}",
1542            self.tx, self.decoders
1543        ))
1544    }
1545}
1546
// Having decoders is what gives `DatabaseTransaction` access to the typed
// operations (via the blanket impl of `IDatabaseTransactionOpsCoreTyped`).
impl<Cap> WithDecoders for DatabaseTransaction<'_, Cap> {
    /// Returns the decoder registry stored in this handle.
    fn decoders(&self) -> &ModuleDecoderRegistry {
        &self.decoders
    }
}
1552
/// Decodes `value_bytes` as a `V` using `decoders`.
///
/// Emits a trace event with the (abbreviated) bytes being decoded; the
/// `instrument` attribute additionally records the value type name and is
/// configured with `err` for the error case.
#[instrument(target = LOG_DB, level = "trace", skip_all, fields(value_type = std::any::type_name::<V>()), err)]
fn decode_value<V: DatabaseValue>(
    value_bytes: &[u8],
    decoders: &ModuleDecoderRegistry,
) -> std::result::Result<V, DecodingError> {
    trace!(
        bytes = %AbbreviateHexBytes(value_bytes),
        "decoding value",
    );
    V::from_bytes(value_bytes, decoders)
}
1564
1565#[track_caller]
1566fn decode_value_expect<V: DatabaseValue>(
1567    value_bytes: &[u8],
1568    decoders: &ModuleDecoderRegistry,
1569    key_bytes: &[u8],
1570) -> V {
1571    decode_value(value_bytes, decoders).unwrap_or_else(|err| {
1572        panic!(
1573            "Unrecoverable decoding DatabaseValue as {}; err={}, key_bytes={}, val_bytes={}",
1574            any::type_name::<V>(),
1575            err,
1576            AbbreviateHexBytes(key_bytes),
1577            AbbreviateHexBytes(value_bytes),
1578        )
1579    })
1580}
1581
1582#[track_caller]
1583fn decode_key_expect<K: DatabaseKey>(key_bytes: &[u8], decoders: &ModuleDecoderRegistry) -> K {
1584    trace!(
1585        bytes = %AbbreviateHexBytes(key_bytes),
1586        "decoding key",
1587    );
1588    K::from_bytes(key_bytes, decoders).unwrap_or_else(|err| {
1589        panic!(
1590            "Unrecoverable decoding DatabaseKey as {}; err={}; bytes={}",
1591            any::type_name::<K>(),
1592            err,
1593            AbbreviateHexBytes(key_bytes)
1594        )
1595    })
1596}
1597
1598impl<'tx, Cap> DatabaseTransaction<'tx, Cap> {
1599    /// Convert into a non-committable version
1600    pub fn into_nc(self) -> DatabaseTransaction<'tx, NonCommittable> {
1601        DatabaseTransaction {
1602            tx: self.tx,
1603            decoders: self.decoders,
1604            commit_tracker: self.commit_tracker,
1605            on_commit_hooks: self.on_commit_hooks,
1606            capability: PhantomData::<NonCommittable>,
1607        }
1608    }
1609
1610    /// Get a reference to a non-committeable version
1611    pub fn to_ref_nc<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, NonCommittable>
1612    where
1613        's: 'a,
1614    {
1615        self.to_ref().into_nc()
1616    }
1617
1618    /// Get [`DatabaseTransaction`] isolated to a `prefix`
1619    pub fn with_prefix<'a: 'tx>(self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1620    where
1621        'tx: 'a,
1622    {
1623        DatabaseTransaction {
1624            tx: Box::new(PrefixDatabaseTransaction {
1625                inner: self.tx,
1626                global_dbtx_access_token: None,
1627                prefix,
1628            }),
1629            decoders: self.decoders,
1630            commit_tracker: self.commit_tracker,
1631            on_commit_hooks: self.on_commit_hooks,
1632            capability: self.capability,
1633        }
1634    }
1635
1636    /// Get [`DatabaseTransaction`] isolated to a prefix of a given
1637    /// `module_instance_id`, allowing the module to access global_dbtx
1638    /// with the right access token.
1639    pub fn with_prefix_module_id<'a: 'tx>(
1640        self,
1641        module_instance_id: ModuleInstanceId,
1642    ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1643    where
1644        'tx: 'a,
1645    {
1646        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1647        let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1648        (
1649            DatabaseTransaction {
1650                tx: Box::new(PrefixDatabaseTransaction {
1651                    inner: self.tx,
1652                    global_dbtx_access_token: Some(global_dbtx_access_token),
1653                    prefix,
1654                }),
1655                decoders: self.decoders,
1656                commit_tracker: self.commit_tracker,
1657                on_commit_hooks: self.on_commit_hooks,
1658                capability: self.capability,
1659            },
1660            global_dbtx_access_token,
1661        )
1662    }
1663
1664    /// Get [`DatabaseTransaction`] to `self`
1665    pub fn to_ref<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, Cap>
1666    where
1667        's: 'a,
1668    {
1669        let decoders = self.decoders.clone();
1670
1671        DatabaseTransaction {
1672            tx: Box::new(&mut self.tx),
1673            decoders,
1674            commit_tracker: match self.commit_tracker {
1675                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1676                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1677            },
1678            on_commit_hooks: match self.on_commit_hooks {
1679                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1680                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1681            },
1682            capability: self.capability,
1683        }
1684    }
1685
1686    /// Get [`DatabaseTransaction`] isolated to a `prefix` of `self`
1687    pub fn to_ref_with_prefix<'a>(&'a mut self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1688    where
1689        'tx: 'a,
1690    {
1691        DatabaseTransaction {
1692            tx: Box::new(PrefixDatabaseTransaction {
1693                inner: &mut self.tx,
1694                global_dbtx_access_token: None,
1695                prefix,
1696            }),
1697            decoders: self.decoders.clone(),
1698            commit_tracker: match self.commit_tracker {
1699                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1700                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1701            },
1702            on_commit_hooks: match self.on_commit_hooks {
1703                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1704                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1705            },
1706            capability: self.capability,
1707        }
1708    }
1709
1710    pub fn to_ref_with_prefix_module_id<'a>(
1711        &'a mut self,
1712        module_instance_id: ModuleInstanceId,
1713    ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1714    where
1715        'tx: 'a,
1716    {
1717        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1718        let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1719        (
1720            DatabaseTransaction {
1721                tx: Box::new(PrefixDatabaseTransaction {
1722                    inner: &mut self.tx,
1723                    global_dbtx_access_token: Some(global_dbtx_access_token),
1724                    prefix,
1725                }),
1726                decoders: self.decoders.clone(),
1727                commit_tracker: match self.commit_tracker {
1728                    MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1729                    MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1730                },
1731                on_commit_hooks: match self.on_commit_hooks {
1732                    MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1733                    MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1734                },
1735                capability: self.capability,
1736            },
1737            global_dbtx_access_token,
1738        )
1739    }
1740
1741    /// Is this `Database` a global, unpartitioned `Database`
1742    pub fn is_global(&self) -> bool {
1743        self.tx.is_global()
1744    }
1745
1746    /// `Err` if [`Self::is_global`] is not true
1747    pub fn ensure_global(&self) -> DatabaseResult<()> {
1748        if !self.is_global() {
1749            return Err(DatabaseError::Other(anyhow::anyhow!(
1750                "Database instance not global"
1751            )));
1752        }
1753
1754        Ok(())
1755    }
1756
1757    /// `Err` if [`Self::is_global`] is true
1758    pub fn ensure_isolated(&self) -> DatabaseResult<()> {
1759        if self.is_global() {
1760            return Err(DatabaseError::Other(anyhow::anyhow!(
1761                "Database instance not isolated"
1762            )));
1763        }
1764
1765        Ok(())
1766    }
1767
1768    /// Cancel the tx to avoid debugging warnings about uncommitted writes
1769    pub fn ignore_uncommitted(&mut self) -> &mut Self {
1770        self.commit_tracker.ignore_uncommitted = true;
1771        self
1772    }
1773
    /// Re-enable the warnings about uncommitted writes for this transaction.
    ///
    /// Clears the `ignore_uncommitted` flag on the commit tracker (the
    /// opposite of [`Self::ignore_uncommitted`]). Returns `self` so the call
    /// can be chained.
    pub fn warn_uncommitted(&mut self) -> &mut Self {
        self.commit_tracker.ignore_uncommitted = false;
        self
    }
1779
    /// Register a hook that will be run after commit succeeds.
    ///
    /// The closure is boxed and appended to `on_commit_hooks`; nothing is
    /// executed at registration time.
    #[instrument(target = LOG_DB, level = "trace", skip_all)]
    pub fn on_commit(&mut self, f: maybe_add_send!(impl FnOnce() + 'static)) {
        self.on_commit_hooks.push(Box::new(f));
    }
1785
1786    pub fn global_dbtx<'a>(
1787        &'a mut self,
1788        access_token: GlobalDBTxAccessToken,
1789    ) -> DatabaseTransaction<'a, Cap>
1790    where
1791        'tx: 'a,
1792    {
1793        let decoders = self.decoders.clone();
1794
1795        DatabaseTransaction {
1796            tx: Box::new(self.tx.global_dbtx(access_token)),
1797            decoders,
1798            commit_tracker: match self.commit_tracker {
1799                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1800                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1801            },
1802            on_commit_hooks: match self.on_commit_hooks {
1803                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1804                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1805            },
1806            capability: self.capability,
1807        }
1808    }
1809}
1810
/// Code used to access `global_dbtx`
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct GlobalDBTxAccessToken(u32);

impl GlobalDBTxAccessToken {
    /// Calculate an access code for accessing `global_dbtx` from a prefixed
    /// database tx.
    ///
    /// The code is the sum of all prefix bytes plus the constant `513`.
    ///
    /// Since we need to do it at runtime, we want the user modules not to be
    /// able to call `global_dbtx` too easily. But at the same time we don't
    /// need to be paranoid.
    ///
    /// This must be deterministic during the whole run of the software
    /// (because it is rederived independently in multiple code paths), but
    /// it could be somewhat randomized between different runs and releases.
    fn from_prefix(prefix: &[u8]) -> Self {
        let byte_sum: u32 = prefix.iter().copied().map(u32::from).sum();
        Self(byte_sum + 513)
    }
}
1830
1831impl<'tx> DatabaseTransaction<'tx, Committable> {
1832    pub fn new(dbtx: Box<dyn IDatabaseTransaction + 'tx>, decoders: ModuleDecoderRegistry) -> Self {
1833        Self {
1834            tx: dbtx,
1835            decoders,
1836            commit_tracker: MaybeRef::Owned(CommitTracker {
1837                is_committed: false,
1838                has_writes: false,
1839                ignore_uncommitted: false,
1840            }),
1841            on_commit_hooks: MaybeRef::Owned(vec![]),
1842            capability: PhantomData,
1843        }
1844    }
1845
1846    pub async fn commit_tx_result(mut self) -> DatabaseResult<()> {
1847        self.commit_tracker.is_committed = true;
1848        let commit_result = self.tx.commit_tx().await;
1849
1850        // Run commit hooks in case commit was successful
1851        if commit_result.is_ok() {
1852            for hook in self.on_commit_hooks.deref_mut().drain(..) {
1853                hook();
1854            }
1855        }
1856
1857        commit_result
1858    }
1859
1860    pub async fn commit_tx(mut self) {
1861        self.commit_tracker.is_committed = true;
1862        self.commit_tx_result()
1863            .await
1864            .expect("Unrecoverable error occurred while committing to the database.");
1865    }
1866}
1867
1868#[apply(async_trait_maybe_send!)]
1869impl<Cap> IDatabaseTransactionOpsCore for DatabaseTransaction<'_, Cap>
1870where
1871    Cap: Send,
1872{
1873    async fn raw_insert_bytes(
1874        &mut self,
1875        key: &[u8],
1876        value: &[u8],
1877    ) -> DatabaseResult<Option<Vec<u8>>> {
1878        self.commit_tracker.has_writes = true;
1879        self.tx.raw_insert_bytes(key, value).await
1880    }
1881
1882    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1883        self.tx.raw_get_bytes(key).await
1884    }
1885
1886    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1887        self.tx.raw_remove_entry(key).await
1888    }
1889
1890    async fn raw_find_by_range(
1891        &mut self,
1892        key_range: Range<&[u8]>,
1893    ) -> DatabaseResult<PrefixStream<'_>> {
1894        self.tx.raw_find_by_range(key_range).await
1895    }
1896
1897    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
1898        self.tx.raw_find_by_prefix(key_prefix).await
1899    }
1900
1901    async fn raw_find_by_prefix_sorted_descending(
1902        &mut self,
1903        key_prefix: &[u8],
1904    ) -> DatabaseResult<PrefixStream<'_>> {
1905        self.tx
1906            .raw_find_by_prefix_sorted_descending(key_prefix)
1907            .await
1908    }
1909
1910    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
1911        self.commit_tracker.has_writes = true;
1912        self.tx.raw_remove_by_prefix(key_prefix).await
1913    }
1914}
// Only `Committable` transactions implement `IDatabaseTransactionOps`. The
// body is empty, so nothing beyond what the trait already provides is
// implemented here (presumably supertraits/default methods - confirm against
// the trait definition).
impl IDatabaseTransactionOps for DatabaseTransaction<'_, Committable> {}
1916
1917impl<T> DatabaseKeyPrefix for T
1918where
1919    T: DatabaseLookup + crate::encoding::Encodable + Debug,
1920{
1921    fn to_bytes(&self) -> Vec<u8> {
1922        let mut data = vec![<Self as DatabaseLookup>::Record::DB_PREFIX];
1923        data.append(&mut self.consensus_encode_to_vec());
1924        data
1925    }
1926}
1927
1928impl<T> DatabaseKey for T
1929where
1930    // Note: key can only be `T` that can be decoded without modules (even if
1931    // module type is `()`)
1932    T: DatabaseRecord + crate::encoding::Decodable + Sized,
1933{
1934    const NOTIFY_ON_MODIFY: bool = <T as DatabaseRecord>::NOTIFY_ON_MODIFY;
1935    fn from_bytes(
1936        data: &[u8],
1937        modules: &ModuleDecoderRegistry,
1938    ) -> std::result::Result<Self, DecodingError> {
1939        if data.is_empty() {
1940            // TODO: build better coding errors, pretty useless right now
1941            return Err(DecodingError::wrong_length(1, 0));
1942        }
1943
1944        if data[0] != Self::DB_PREFIX {
1945            return Err(DecodingError::wrong_prefix(Self::DB_PREFIX, data[0]));
1946        }
1947
1948        <Self as crate::encoding::Decodable>::consensus_decode_whole(&data[1..], modules)
1949            .map_err(|decode_error| DecodingError::Other(decode_error.0))
1950    }
1951}
1952
1953impl<T> DatabaseValue for T
1954where
1955    T: Debug + Encodable + Decodable,
1956{
1957    fn from_bytes(
1958        data: &[u8],
1959        modules: &ModuleDecoderRegistry,
1960    ) -> std::result::Result<Self, DecodingError> {
1961        T::consensus_decode_whole(data, modules).map_err(|e| DecodingError::Other(e.0))
1962    }
1963
1964    fn to_bytes(&self) -> Vec<u8> {
1965        self.consensus_encode_to_vec()
1966    }
1967}
1968
/// This is a helper macro that generates the implementation of
/// `DatabaseRecord` necessary for reading/writing to the database.
///
/// - `key`: This is the type of struct that will be used as the key into the
///   database
/// - `value`: This is the type of struct that will be used as the value into
///   the database
/// - `db_prefix`: Required enum expression that is represented as a `u8` and is
///   prepended to this key
/// - `notify_on_modify`: Optional, literally `true` or `false`. When given, it
///   sets `DatabaseRecord::NOTIFY_ON_MODIFY` for the key; `true` additionally
///   implements the `DatabaseKeyWithNotify` marker trait for the key
///
/// Query prefixes (types usable with `find_by_prefix`) are not declared here;
/// use [`impl_db_lookup!`](crate::impl_db_lookup) with `query_prefix` for
/// that.
///
/// # Examples
///
/// ```
/// use fedimint_core::encoding::{Decodable, Encodable};
/// use fedimint_core::impl_db_record;
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyKey;
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyValue;
///
/// #[repr(u8)]
/// #[derive(Clone, Debug)]
/// pub enum DbKeyPrefix {
///     MyKey = 0x50,
/// }
///
/// impl_db_record!(key = MyKey, value = MyValue, db_prefix = DbKeyPrefix::MyKey);
/// ```
///
/// Use the required parameters and declare one `query_prefix` with
/// `impl_db_lookup!`
///
/// ```
/// use fedimint_core::encoding::{Decodable, Encodable};
/// use fedimint_core::{impl_db_lookup, impl_db_record};
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyKey;
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyValue;
///
/// #[repr(u8)]
/// #[derive(Clone, Debug)]
/// pub enum DbKeyPrefix {
///     MyKey = 0x50,
/// }
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyKeyPrefix;
///
/// impl_db_record!(key = MyKey, value = MyValue, db_prefix = DbKeyPrefix::MyKey,);
///
/// impl_db_lookup!(key = MyKey, query_prefix = MyKeyPrefix);
/// ```
#[macro_export]
macro_rules! impl_db_record {
    (key = $key:ty, value = $val:ty, db_prefix = $db_prefix:expr_2021 $(, notify_on_modify = $notify:tt)? $(,)?) => {
        impl $crate::db::DatabaseRecord for $key {
            const DB_PREFIX: u8 = $db_prefix as u8;
            $(const NOTIFY_ON_MODIFY: bool = $notify;)?
            type Key = Self;
            type Value = $val;
        }
        $(
            // Recurse through `$crate::` so the internal rule resolves even
            // when the caller invoked the macro by path and does not have
            // `impl_db_record` itself in scope.
            $crate::impl_db_record! {
                @impl_notify_marker key = $key, notify_on_modify = $notify
            }
        )?
    };
    // if notify is set to true
    (@impl_notify_marker key = $key:ty, notify_on_modify = true) => {
        impl $crate::db::DatabaseKeyWithNotify for $key {}
    };
    // if notify is set to false
    (@impl_notify_marker key = $key:ty, notify_on_modify = false) => {};
}
2051
/// Implements `DatabaseLookup` for every `query_prefix` type given, with
/// `key` as the associated `Record`.
///
/// `query_prefix = Type` may be repeated zero or more times; a trailing comma
/// is accepted.
#[macro_export]
macro_rules! impl_db_lookup {
    (key = $record:ty $(, query_prefix = $lookup:ty)* $(,)?) => {
        $(
            impl $crate::db::DatabaseLookup for $lookup {
                type Record = $record;
            }
        )*
    };
}
2062
/// Deprecated: Use `DatabaseVersionKey(ModuleInstanceId)` instead.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct DatabaseVersionKeyV0;

/// Key under which a [`DatabaseVersion`] is stored, parametrized by a
/// `ModuleInstanceId`.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct DatabaseVersionKey(pub ModuleInstanceId);

/// Database schema version number (a plain `u64` wrapper that is ordered and
/// `Copy`).
#[derive(Debug, Encodable, Decodable, Serialize, Clone, PartialOrd, Ord, PartialEq, Eq, Copy)]
pub struct DatabaseVersion(pub u64);
2072
// Legacy record: `DatabaseVersionKeyV0` -> `DatabaseVersion`.
// NOTE: it shares the `DbKeyPrefix::DatabaseVersion` prefix byte with the
// current record below.
impl_db_record!(
    key = DatabaseVersionKeyV0,
    value = DatabaseVersion,
    db_prefix = DbKeyPrefix::DatabaseVersion
);

// Current record: `DatabaseVersionKey` -> `DatabaseVersion`.
impl_db_record!(
    key = DatabaseVersionKey,
    value = DatabaseVersion,
    db_prefix = DbKeyPrefix::DatabaseVersion
);
2084
2085impl std::fmt::Display for DatabaseVersion {
2086    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2087        write!(f, "{}", self.0)
2088    }
2089}
2090
2091impl DatabaseVersion {
2092    pub fn increment(&self) -> Self {
2093        Self(self.0 + 1)
2094    }
2095}
2096
2097impl std::fmt::Display for DbKeyPrefix {
2098    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2099        write!(f, "{self:?}")
2100    }
2101}
2102
/// Key prefixes declared in this module.
///
/// The `u8` discriminant is the prefix byte: `impl_db_record!` stores
/// `$db_prefix as u8` as the record's `DB_PREFIX`.
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
    /// Prefix of the database version records.
    DatabaseVersion = 0x50,
    // NOTE(review): no record in this part of the file uses this prefix;
    // presumably it is used by client backup code elsewhere - confirm.
    ClientBackup = 0x51,
}
2109
/// Error returned when decoding a database key or value from raw bytes.
#[derive(Debug, Error)]
pub enum DecodingError {
    /// The first byte of the key did not match the expected prefix byte.
    #[error("Key had a wrong prefix, expected {expected} but got {found}")]
    WrongPrefix { expected: u8, found: u8 },
    /// The input had an unexpected length.
    #[error("Key had a wrong length, expected {expected} but got {found}")]
    WrongLength { expected: usize, found: usize },
    /// Any other decoding failure; `{0:#}` prints the wrapped error in
    /// anyhow's alternate format.
    #[error("Other decoding error: {0:#}")]
    Other(anyhow::Error),
}
2119
2120impl DecodingError {
2121    pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2122        Self::Other(anyhow::Error::from(error))
2123    }
2124
2125    pub fn wrong_prefix(expected: u8, found: u8) -> Self {
2126        Self::WrongPrefix { expected, found }
2127    }
2128
2129    pub fn wrong_length(expected: usize, found: usize) -> Self {
2130        Self::WrongLength { expected, found }
2131    }
2132}
2133
/// Error type for database operations
#[derive(Debug, Error)]
pub enum DatabaseError {
    /// Write-write conflict during optimistic transaction commit.
    /// This occurs when two transactions attempt to modify the same key.
    #[error("Write-write conflict detected")]
    WriteConflict,

    /// The transaction has already been consumed (committed or dropped).
    /// Operations cannot be performed on a consumed transaction.
    #[error("Transaction already consumed")]
    TransactionConsumed,

    /// Error from the underlying database backend (e.g., RocksDB I/O errors).
    ///
    /// `#[from]` also generates `From<Box<dyn Error + Send + Sync>>`, so such
    /// boxed errors convert into this variant with `?`.
    #[error("Database backend error: {0}")]
    DatabaseBackend(#[from] Box<dyn Error + Send + Sync>),

    /// Other database error
    ///
    /// `{0:#}` prints the wrapped error in anyhow's alternate format.
    #[error("Database error: {0:#}")]
    Other(anyhow::Error),
}
2155
2156impl DatabaseError {
2157    /// Create a DatabaseError from any error type
2158    pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2159        Self::Other(anyhow::Error::from(error))
2160    }
2161
2162    /// Create a DatabaseBackend error from any error type
2163    pub fn backend<E: Error + Send + Sync + 'static>(error: E) -> Self {
2164        Self::DatabaseBackend(Box::new(error))
2165    }
2166}
2167
/// Lets `?` turn an `anyhow::Error` into a [`DatabaseError`]; the error is
/// stored unchanged in the `Other` variant.
impl From<anyhow::Error> for DatabaseError {
    fn from(error: anyhow::Error) -> Self {
        Self::Other(error)
    }
}
2173
/// Collects every key/value pair found under `$prefix_type` and stores the
/// result in `$map`.
///
/// Expands to statements that:
/// 1. run `find_by_prefix($dbtx, &$prefix_type)` and await it,
/// 2. map every key to its consensus encoding as a hex string,
/// 3. collect the pairs into a `BTreeMap<String, $value_type>`,
/// 4. insert the boxed map into `$map` under `$key_literal.to_string()`.
///
/// Must be used in an async context. `BTreeMap` is referenced unqualified, so
/// it has to be in scope at the call site, as do the stream combinators
/// (`map`/`collect`; presumably `futures::StreamExt` - confirm at call sites).
///
/// `$key_type` is accepted but not used by the expansion.
#[macro_export]
macro_rules! push_db_pair_items {
    ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
        let db_items =
            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
                .await
                .map(|(key, val)| {
                    (
                        $crate::encoding::Encodable::consensus_encode_to_hex(&key),
                        val,
                    )
                })
                .collect::<BTreeMap<String, $value_type>>()
                .await;

        $map.insert($key_literal.to_string(), Box::new(db_items));
    };
}
2192
/// Collects every key found under `$prefix_type` (values are discarded) and
/// stores the result in `$map`.
///
/// Expands to statements that run `find_by_prefix($dbtx, &$prefix_type)`,
/// keep only the keys, collect them into a `Vec<$key_type>` and insert the
/// boxed vector into `$map` under `$key_literal.to_string()`.
///
/// Must be used in an async context with the stream combinators
/// (`map`/`collect`) in scope at the call site (presumably
/// `futures::StreamExt` - confirm at call sites).
#[macro_export]
macro_rules! push_db_key_items {
    ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $map:ident, $key_literal:literal) => {
        let db_items =
            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
                .await
                .map(|(key, _)| key)
                .collect::<Vec<$key_type>>()
                .await;

        $map.insert($key_literal.to_string(), Box::new(db_items));
    };
}
2206
/// Context passed to the db migration _functions_ (pay attention to `Fn` in the
/// name)
///
/// Typically should not be referred to directly, and instead by a type-alias,
/// where the inner-context is set.
///
/// Notably it has the (optional) module id (inaccessible to the modules
/// directly, but used internally) and an inner context `C` injected by the
/// outer-layer.
///
/// `C` is generic, as in different layers / scopes (server vs client, etc.) a
/// different (module-typed, type erased, server/client, etc.) contexts might be
/// needed, while the database migration logic is kind of generic over that.
pub struct DbMigrationFnContext<'tx, C> {
    // Global transaction; the constructor checks `ensure_global()` on it.
    dbtx: DatabaseTransaction<'tx>,
    // `Some` when migrating a module: `dbtx()` then hands out a transaction
    // prefixed with this module instance id.
    module_instance_id: Option<ModuleInstanceId>,
    // Inner context injected by the outer layer.
    ctx: C,
    // Private unit field: keeps struct-literal construction inside this
    // module so outside code has to go through `new`.
    __please_use_constructor: (),
}
2226
2227impl<'tx, C> DbMigrationFnContext<'tx, C> {
2228    pub fn new(
2229        dbtx: DatabaseTransaction<'tx>,
2230        module_instance_id: Option<ModuleInstanceId>,
2231        ctx: C,
2232    ) -> Self {
2233        dbtx.ensure_global().expect("Must pass global dbtx");
2234        Self {
2235            dbtx,
2236            module_instance_id,
2237            ctx,
2238            // this is a constructor
2239            __please_use_constructor: (),
2240        }
2241    }
2242
2243    pub fn map<R>(self, f: impl FnOnce(C) -> R) -> DbMigrationFnContext<'tx, R> {
2244        DbMigrationFnContext::new(self.dbtx, self.module_instance_id, f(self.ctx))
2245    }
2246
2247    // TODO: this method is currently visible to the module itself, and it shouldn't
2248    #[doc(hidden)]
2249    pub fn split_dbtx_ctx<'s>(&'s mut self) -> (&'s mut DatabaseTransaction<'tx>, &'s C) {
2250        let Self { dbtx, ctx, .. } = self;
2251
2252        (dbtx, ctx)
2253    }
2254
2255    pub fn dbtx(&'_ mut self) -> DatabaseTransaction<'_> {
2256        if let Some(module_instance_id) = self.module_instance_id {
2257            self.dbtx.to_ref_with_prefix_module_id(module_instance_id).0
2258        } else {
2259            self.dbtx.to_ref_nc()
2260        }
2261    }
2262
2263    // TODO: this method is currently visible to the module itself, and it shouldn't
2264    #[doc(hidden)]
2265    pub fn module_instance_id(&self) -> Option<ModuleInstanceId> {
2266        self.module_instance_id
2267    }
2268}
2269
/// [`DbMigrationFn`] with no extra context (ATM gateway)
pub type GeneralDbMigrationFn = DbMigrationFn<()>;
/// [`DbMigrationFnContext`] matching [`GeneralDbMigrationFn`] (inner context
/// is `()`).
pub type GeneralDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, ()>;
2273
/// [`DbMigrationFn`] used by core client
///
/// NOTE: client _module_ migrations are handled using separate structs due to
/// state machine migrations
pub type ClientCoreDbMigrationFn = DbMigrationFn<()>;
/// [`DbMigrationFnContext`] matching [`ClientCoreDbMigrationFn`] (inner
/// context is `()`).
pub type ClientCoreDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, ()>;
2280
/// Boxed migration function that can be implemented to "migrate" the database
/// to the next database version.
///
/// The function receives a [`DbMigrationFnContext`] for any lifetime `'tx`
/// and returns a pinned, boxed future resolving to `anyhow::Result<()>` that
/// may borrow from the context for `'tx`.
///
/// It is parametrized over `C` (contents), which is extra data/type/interface
/// custom for different part of the codebase, e.g.:
///
/// * server core
/// * server modules
/// * client core
/// * gateway core
pub type DbMigrationFn<C> = Box<
    maybe_add_send_sync!(
        dyn for<'tx> Fn(
            DbMigrationFnContext<'tx, C>,
        ) -> Pin<
            Box<maybe_add_send!(dyn futures::Future<Output = anyhow::Result<()>> + 'tx)>,
        >
    ),
>;
2300
2301/// Verifies that all database migrations are defined contiguously and returns
2302/// the "current" database version, which is one greater than the last key in
2303/// the map.
2304pub fn get_current_database_version<F>(
2305    migrations: &BTreeMap<DatabaseVersion, F>,
2306) -> DatabaseVersion {
2307    let versions = migrations.keys().copied().collect::<Vec<_>>();
2308
2309    // Verify that all database migrations are defined contiguously. If there is a
2310    // gap, this indicates a programming error and we should panic.
2311    if !versions
2312        .windows(2)
2313        .all(|window| window[0].increment() == window[1])
2314    {
2315        panic!("Database Migrations are not defined contiguously");
2316    }
2317
2318    versions
2319        .last()
2320        .map_or(DatabaseVersion(0), DatabaseVersion::increment)
2321}
2322
2323pub async fn apply_migrations<C>(
2324    db: &Database,
2325    ctx: C,
2326    kind: String,
2327    migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
2328    module_instance_id: Option<ModuleInstanceId>,
2329    // When used in client side context, we can/should ignore keys that external app
2330    // is allowed to use, and but since this function is shared, we make it optional argument
2331    external_prefixes_above: Option<u8>,
2332) -> std::result::Result<(), anyhow::Error>
2333where
2334    C: Clone,
2335{
2336    let mut dbtx = db.begin_transaction().await;
2337    apply_migrations_dbtx(
2338        &mut dbtx.to_ref_nc(),
2339        ctx,
2340        kind,
2341        migrations,
2342        module_instance_id,
2343        external_prefixes_above,
2344    )
2345    .await?;
2346
2347    dbtx.commit_tx_result()
2348        .await
2349        .map_err(|e| anyhow::Error::msg(e.to_string()))
2350}
2351/// `apply_migrations` iterates from the on disk database version for the
2352/// module.
2353///
2354/// `apply_migrations` iterates from the on disk database version for the module
2355/// up to `target_db_version` and executes all of the migrations that exist in
2356/// the migrations map. Each migration in migrations map updates the
2357/// database to have the correct on-disk structures that the code is expecting.
2358/// The entire migration process is atomic (i.e migration from 0->1 and 1->2
2359/// happen atomically). This function is called before the module is initialized
2360/// and as long as the correct migrations are supplied in the migrations map,
2361/// the module will be able to read and write from the database successfully.
2362pub async fn apply_migrations_dbtx<C>(
2363    global_dbtx: &mut DatabaseTransaction<'_>,
2364    ctx: C,
2365    kind: String,
2366    migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
2367    module_instance_id: Option<ModuleInstanceId>,
2368    // When used in client side context, we can/should ignore keys that external app
2369    // is allowed to use, and but since this function is shared, we make it optional argument
2370    external_prefixes_above: Option<u8>,
2371) -> std::result::Result<(), anyhow::Error>
2372where
2373    C: Clone,
2374{
2375    // Newly created databases will not have any data since they have just been
2376    // instantiated.
2377    let is_new_db = global_dbtx
2378        .raw_find_by_prefix(&[])
2379        .await?
2380        .filter(|(key, _v)| {
2381            std::future::ready(
2382                external_prefixes_above.is_none_or(|external_prefixes_above| {
2383                    !key.is_empty() && key[0] < external_prefixes_above
2384                }),
2385            )
2386        })
2387        .next()
2388        .await
2389        .is_none();
2390
2391    let target_db_version = get_current_database_version(&migrations);
2392
2393    // First write the database version to disk if it does not exist.
2394    create_database_version_dbtx(
2395        global_dbtx,
2396        target_db_version,
2397        module_instance_id,
2398        kind.clone(),
2399        is_new_db,
2400    )
2401    .await?;
2402
2403    let module_instance_id_key = module_instance_id_or_global(module_instance_id);
2404
2405    let disk_version = global_dbtx
2406        .get_value(&DatabaseVersionKey(module_instance_id_key))
2407        .await;
2408
2409    let db_version = if let Some(disk_version) = disk_version {
2410        let mut current_db_version = disk_version;
2411
2412        if current_db_version > target_db_version {
2413            return Err(anyhow::anyhow!(format!(
2414                "On disk database version {current_db_version} for module {kind} was higher than the code database version {target_db_version}."
2415            )));
2416        }
2417
2418        while current_db_version < target_db_version {
2419            if let Some(migration) = migrations.get(&current_db_version) {
2420                info!(target: LOG_DB, ?kind, ?current_db_version, ?target_db_version, "Migrating module...");
2421                migration(DbMigrationFnContext::new(
2422                    global_dbtx.to_ref_nc(),
2423                    module_instance_id,
2424                    ctx.clone(),
2425                ))
2426                .await?;
2427            } else {
2428                warn!(target: LOG_DB, ?current_db_version, "Missing server db migration");
2429            }
2430
2431            current_db_version = current_db_version.increment();
2432
2433            global_dbtx
2434                .insert_entry(
2435                    &DatabaseVersionKey(module_instance_id_key),
2436                    &current_db_version,
2437                )
2438                .await;
2439        }
2440
2441        current_db_version
2442    } else {
2443        target_db_version
2444    };
2445
2446    debug!(target: LOG_DB, ?kind, ?db_version, "DB Version");
2447    Ok(())
2448}
2449
2450pub async fn create_database_version(
2451    db: &Database,
2452    target_db_version: DatabaseVersion,
2453    module_instance_id: Option<ModuleInstanceId>,
2454    kind: String,
2455    is_new_db: bool,
2456) -> std::result::Result<(), anyhow::Error> {
2457    let mut dbtx = db.begin_transaction().await;
2458
2459    create_database_version_dbtx(
2460        &mut dbtx.to_ref_nc(),
2461        target_db_version,
2462        module_instance_id,
2463        kind,
2464        is_new_db,
2465    )
2466    .await?;
2467
2468    dbtx.commit_tx_result().await?;
2469    Ok(())
2470}
2471
2472/// Creates the `DatabaseVersion` inside the database if it does not exist. If
2473/// necessary, this function will migrate the legacy database version to the
2474/// expected `DatabaseVersionKey`.
2475pub async fn create_database_version_dbtx(
2476    global_dbtx: &mut DatabaseTransaction<'_>,
2477    target_db_version: DatabaseVersion,
2478    module_instance_id: Option<ModuleInstanceId>,
2479    kind: String,
2480    is_new_db: bool,
2481) -> std::result::Result<(), anyhow::Error> {
2482    let key_module_instance_id = module_instance_id_or_global(module_instance_id);
2483
2484    // First check if the module has a `DatabaseVersion` written to
2485    // `DatabaseVersionKey`. If `DatabaseVersion` already exists, there is
2486    // nothing to do.
2487    if global_dbtx
2488        .get_value(&DatabaseVersionKey(key_module_instance_id))
2489        .await
2490        .is_none()
2491    {
2492        // If it exists, read and remove the legacy `DatabaseVersion`, which used to be
2493        // in the module's isolated namespace (but not for fedimint-server or
2494        // fedimint-client).
2495        //
2496        // Otherwise, if the previous database contains data and no legacy database
2497        // version, use `DatabaseVersion(0)` so that all database migrations are
2498        // run. Otherwise, this database can assumed to be new and can use
2499        // `target_db_version` to skip the database migrations.
2500        let current_version_in_module = if let Some(module_instance_id) = module_instance_id {
2501            remove_current_db_version_if_exists(
2502                &mut global_dbtx
2503                    .to_ref_with_prefix_module_id(module_instance_id)
2504                    .0
2505                    .into_nc(),
2506                is_new_db,
2507                target_db_version,
2508            )
2509            .await
2510        } else {
2511            remove_current_db_version_if_exists(
2512                &mut global_dbtx.to_ref().into_nc(),
2513                is_new_db,
2514                target_db_version,
2515            )
2516            .await
2517        };
2518
2519        // Write the previous `DatabaseVersion` to the new `DatabaseVersionKey`
2520        debug!(target: LOG_DB, ?kind, ?current_version_in_module, ?target_db_version, ?is_new_db, "Creating DatabaseVersionKey...");
2521        global_dbtx
2522            .insert_new_entry(
2523                &DatabaseVersionKey(key_module_instance_id),
2524                &current_version_in_module,
2525            )
2526            .await;
2527    }
2528
2529    Ok(())
2530}
2531
2532/// Removes `DatabaseVersion` from `DatabaseVersionKeyV0` if it exists and
2533/// returns the current database version. If the current version does not
2534/// exist, use `target_db_version` if the database is new. Otherwise, return
2535/// `DatabaseVersion(0)` to ensure all migrations are run.
2536async fn remove_current_db_version_if_exists(
2537    version_dbtx: &mut DatabaseTransaction<'_>,
2538    is_new_db: bool,
2539    target_db_version: DatabaseVersion,
2540) -> DatabaseVersion {
2541    // Remove the previous `DatabaseVersion` in the isolated database. If it doesn't
2542    // exist, just use the 0 for the version so that all of the migrations are
2543    // executed.
2544    let current_version_in_module = version_dbtx.remove_entry(&DatabaseVersionKeyV0).await;
2545    match current_version_in_module {
2546        Some(database_version) => database_version,
2547        None if is_new_db => target_db_version,
2548        None => DatabaseVersion(0),
2549    }
2550}
2551
2552/// Helper function to retrieve the `module_instance_id` for modules, otherwise
2553/// return 0xff for the global namespace.
2554fn module_instance_id_or_global(module_instance_id: Option<ModuleInstanceId>) -> ModuleInstanceId {
2555    // Use 0xff for fedimint-server and the `module_instance_id` for each module
2556    module_instance_id.map_or_else(
2557        || MODULE_GLOBAL_PREFIX.into(),
2558        |module_instance_id| module_instance_id,
2559    )
2560}
2561#[allow(unused_imports)]
2562mod test_utils {
2563    use std::collections::BTreeMap;
2564    use std::time::Duration;
2565
2566    use fedimint_core::db::DbMigrationFnContext;
2567    use futures::future::ready;
2568    use futures::{Future, FutureExt, StreamExt};
2569    use rand::Rng;
2570    use tokio::join;
2571
2572    use super::{
2573        Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey, DatabaseVersionKeyV0,
2574        DbMigrationFn, apply_migrations,
2575    };
2576    use crate::core::ModuleKind;
2577    use crate::db::mem_impl::MemDatabase;
2578    use crate::db::{
2579        IDatabaseTransactionOps, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
2580    };
2581    use crate::encoding::{Decodable, Encodable};
2582    use crate::module::registry::ModuleDecoderRegistry;
2583
2584    pub async fn future_returns_shortly<F: Future>(fut: F) -> Option<F::Output> {
2585        crate::runtime::timeout(Duration::from_millis(10), fut)
2586            .await
2587            .ok()
2588    }
2589
    /// Prefix bytes used by the test records declared below.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        // Shared by `TestKey` and `TestKeyV0`.
        Test = 0x42,
        // Used by `AltTestKey`.
        AltTest = 0x43,
        // Used by `PercentTestKey`; 0x25 is the ASCII code of '%'.
        PercentTestKey = 0x25,
    }
2597
    /// Main test key: a single `u64` stored under the `Test` prefix.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub u64);

    /// Empty lookup prefix for `TestKey` records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    // `TestKey` -> `TestVal`, declared with `notify_on_modify = true`.
    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);
2611
    /// Two-field key that shares the `Test` prefix with `TestKey`
    /// (presumably the "old" key layout for migration tests - confirm against
    /// the tests that use it).
    #[derive(Debug, Encodable, Decodable)]
    struct TestKeyV0(u64, u64);

    /// Empty lookup prefix for `TestKeyV0` records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixV0;

    // `TestKeyV0` -> `TestVal`, without change notifications.
    impl_db_record!(
        key = TestKeyV0,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
    );
    impl_db_lookup!(key = TestKeyV0, query_prefix = DbPrefixTestPrefixV0);
2624
    /// Second test key, stored under the separate `AltTest` prefix.
    #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Encodable, Decodable)]
    struct AltTestKey(u64);

    /// Empty lookup prefix for `AltTestKey` records.
    #[derive(Debug, Encodable, Decodable)]
    struct AltDbPrefixTestPrefix;

    // `AltTestKey` -> `TestVal`, without change notifications.
    impl_db_record!(
        key = AltTestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::AltTest,
    );
    impl_db_lookup!(key = AltTestKey, query_prefix = AltDbPrefixTestPrefix);
2637
    /// Test key stored under the `PercentTestKey` prefix (byte 0x25, '%').
    #[derive(Debug, Encodable, Decodable)]
    struct PercentTestKey(u64);

    /// Empty lookup prefix for `PercentTestKey` records.
    #[derive(Debug, Encodable, Decodable)]
    struct PercentPrefixTestPrefix;

    // `PercentTestKey` -> `TestVal`, without change notifications.
    impl_db_record!(
        key = PercentTestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::PercentTestKey,
    );

    impl_db_lookup!(key = PercentTestKey, query_prefix = PercentPrefixTestPrefix);
2651    #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
2652    pub(super) struct TestVal(pub u64);
2653
2654    const TEST_MODULE_PREFIX: u16 = 1;
2655    const ALT_MODULE_PREFIX: u16 = 2;
2656
2657    pub async fn verify_insert_elements(db: Database) {
2658        let mut dbtx = db.begin_transaction().await;
2659        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2660        assert!(dbtx.insert_entry(&TestKey(2), &TestVal(3)).await.is_none());
2661        dbtx.commit_tx().await;
2662
2663        // Test values were persisted
2664        let mut dbtx = db.begin_transaction().await;
2665        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2666        assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(3)));
2667        dbtx.commit_tx().await;
2668
2669        // Test overwrites work as expected
2670        let mut dbtx = db.begin_transaction().await;
2671        assert_eq!(
2672            dbtx.insert_entry(&TestKey(1), &TestVal(4)).await,
2673            Some(TestVal(2))
2674        );
2675        assert_eq!(
2676            dbtx.insert_entry(&TestKey(2), &TestVal(5)).await,
2677            Some(TestVal(3))
2678        );
2679        dbtx.commit_tx().await;
2680
2681        let mut dbtx = db.begin_transaction().await;
2682        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(4)));
2683        assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(5)));
2684        dbtx.commit_tx().await;
2685    }
2686
2687    pub async fn verify_remove_nonexisting(db: Database) {
2688        let mut dbtx = db.begin_transaction().await;
2689        assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2690        let removed = dbtx.remove_entry(&TestKey(1)).await;
2691        assert!(removed.is_none());
2692
2693        // Commit to suppress the warning message
2694        dbtx.commit_tx().await;
2695    }
2696
2697    pub async fn verify_remove_existing(db: Database) {
2698        let mut dbtx = db.begin_transaction().await;
2699
2700        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2701
2702        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2703
2704        let removed = dbtx.remove_entry(&TestKey(1)).await;
2705        assert_eq!(removed, Some(TestVal(2)));
2706        assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2707
2708        // Commit to suppress the warning message
2709        dbtx.commit_tx().await;
2710    }
2711
2712    pub async fn verify_read_own_writes(db: Database) {
2713        let mut dbtx = db.begin_transaction().await;
2714
2715        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2716
2717        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2718
2719        // Commit to suppress the warning message
2720        dbtx.commit_tx().await;
2721    }
2722
2723    pub async fn verify_prevent_dirty_reads(db: Database) {
2724        let mut dbtx = db.begin_transaction().await;
2725
2726        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2727
2728        // dbtx2 should not be able to see uncommitted changes
2729        let mut dbtx2 = db.begin_transaction().await;
2730        assert_eq!(dbtx2.get_value(&TestKey(1)).await, None);
2731
2732        // Commit to suppress the warning message
2733        dbtx.commit_tx().await;
2734    }
2735
2736    pub async fn verify_find_by_range(db: Database) {
2737        let mut dbtx = db.begin_transaction().await;
2738        dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2739        dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2740        dbtx.insert_entry(&TestKey(56), &TestVal(7777)).await;
2741
2742        dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2743        dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2744
2745        {
2746            let mut module_dbtx = dbtx.to_ref_with_prefix_module_id(2).0;
2747            module_dbtx
2748                .insert_entry(&TestKey(300), &TestVal(3000))
2749                .await;
2750        }
2751
2752        dbtx.commit_tx().await;
2753
2754        // Verify finding by prefix returns the correct set of key pairs
2755        let mut dbtx = db.begin_transaction_nc().await;
2756
2757        let returned_keys = dbtx
2758            .find_by_range(TestKey(55)..TestKey(56))
2759            .await
2760            .collect::<Vec<_>>()
2761            .await;
2762
2763        let expected = vec![(TestKey(55), TestVal(9999))];
2764
2765        assert_eq!(returned_keys, expected);
2766
2767        let returned_keys = dbtx
2768            .find_by_range(TestKey(54)..TestKey(56))
2769            .await
2770            .collect::<Vec<_>>()
2771            .await;
2772
2773        let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2774        assert_eq!(returned_keys, expected);
2775
2776        let returned_keys = dbtx
2777            .find_by_range(TestKey(54)..TestKey(57))
2778            .await
2779            .collect::<Vec<_>>()
2780            .await;
2781
2782        let expected = vec![
2783            (TestKey(54), TestVal(8888)),
2784            (TestKey(55), TestVal(9999)),
2785            (TestKey(56), TestVal(7777)),
2786        ];
2787        assert_eq!(returned_keys, expected);
2788
2789        let mut module_dbtx = dbtx.with_prefix_module_id(2).0;
2790        let test_range = module_dbtx
2791            .find_by_range(TestKey(300)..TestKey(301))
2792            .await
2793            .collect::<Vec<_>>()
2794            .await;
2795        assert!(test_range.len() == 1);
2796    }
2797
2798    pub async fn verify_find_by_prefix(db: Database) {
2799        let mut dbtx = db.begin_transaction().await;
2800        dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2801        dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2802
2803        dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2804        dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2805        dbtx.commit_tx().await;
2806
2807        // Verify finding by prefix returns the correct set of key pairs
2808        let mut dbtx = db.begin_transaction().await;
2809
2810        let returned_keys = dbtx
2811            .find_by_prefix(&DbPrefixTestPrefix)
2812            .await
2813            .collect::<Vec<_>>()
2814            .await;
2815
2816        let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2817        assert_eq!(returned_keys, expected);
2818
2819        let reversed = dbtx
2820            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2821            .await
2822            .collect::<Vec<_>>()
2823            .await;
2824        let mut reversed_expected = expected;
2825        reversed_expected.reverse();
2826        assert_eq!(reversed, reversed_expected);
2827
2828        let returned_keys = dbtx
2829            .find_by_prefix(&AltDbPrefixTestPrefix)
2830            .await
2831            .collect::<Vec<_>>()
2832            .await;
2833
2834        let expected = vec![
2835            (AltTestKey(54), TestVal(6666)),
2836            (AltTestKey(55), TestVal(7777)),
2837        ];
2838        assert_eq!(returned_keys, expected);
2839
2840        let reversed = dbtx
2841            .find_by_prefix_sorted_descending(&AltDbPrefixTestPrefix)
2842            .await
2843            .collect::<Vec<_>>()
2844            .await;
2845        let mut reversed_expected = expected;
2846        reversed_expected.reverse();
2847        assert_eq!(reversed, reversed_expected);
2848    }
2849
2850    pub async fn verify_commit(db: Database) {
2851        let mut dbtx = db.begin_transaction().await;
2852
2853        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2854        dbtx.commit_tx().await;
2855
2856        // Verify dbtx2 can see committed transactions
2857        let mut dbtx2 = db.begin_transaction().await;
2858        assert_eq!(dbtx2.get_value(&TestKey(1)).await, Some(TestVal(2)));
2859    }
2860
2861    pub async fn verify_prevent_nonrepeatable_reads(db: Database) {
2862        let mut dbtx = db.begin_transaction().await;
2863        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2864
2865        let mut dbtx2 = db.begin_transaction().await;
2866
2867        dbtx2.insert_entry(&TestKey(100), &TestVal(101)).await;
2868
2869        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2870
2871        dbtx2.commit_tx().await;
2872
2873        // dbtx should still read None because it is operating over a snapshot
2874        // of the data when the transaction started
2875        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2876
2877        let expected_keys = 0;
2878        let returned_keys = dbtx
2879            .find_by_prefix(&DbPrefixTestPrefix)
2880            .await
2881            .fold(0, |returned_keys, (key, value)| async move {
2882                if key == TestKey(100) {
2883                    assert!(value.eq(&TestVal(101)));
2884                }
2885                returned_keys + 1
2886            })
2887            .await;
2888
2889        assert_eq!(returned_keys, expected_keys);
2890    }
2891
2892    pub async fn verify_snapshot_isolation(db: Database) {
2893        async fn random_yield() {
2894            let times = if rand::thread_rng().gen_bool(0.5) {
2895                0
2896            } else {
2897                10
2898            };
2899            for _ in 0..times {
2900                tokio::task::yield_now().await;
2901            }
2902        }
2903
2904        // This scenario is taken straight out of https://github.com/fedimint/fedimint/issues/5195 bug
2905        for i in 0..1000 {
2906            let base_key = i * 2;
2907            let tx_accepted_key = base_key;
2908            let spent_input_key = base_key + 1;
2909
2910            join!(
2911                async {
2912                    random_yield().await;
2913                    let mut dbtx = db.begin_transaction().await;
2914
2915                    random_yield().await;
2916                    let a = dbtx.get_value(&TestKey(tx_accepted_key)).await;
2917                    random_yield().await;
2918                    // we have 4 operations that can give you the db key,
2919                    // try all of them
2920                    let s = match i % 5 {
2921                        0 => dbtx.get_value(&TestKey(spent_input_key)).await,
2922                        1 => dbtx.remove_entry(&TestKey(spent_input_key)).await,
2923                        2 => {
2924                            dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(200))
2925                                .await
2926                        }
2927                        3 => {
2928                            dbtx.find_by_prefix(&DbPrefixTestPrefix)
2929                                .await
2930                                .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2931                                .map(|(_k, v)| v)
2932                                .next()
2933                                .await
2934                        }
2935                        4 => {
2936                            dbtx.find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2937                                .await
2938                                .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2939                                .map(|(_k, v)| v)
2940                                .next()
2941                                .await
2942                        }
2943                        _ => {
2944                            panic!("woot?");
2945                        }
2946                    };
2947
2948                    match (a, s) {
2949                        (None, None) | (Some(_), Some(_)) => {}
2950                        (None, Some(_)) => panic!("none some?! {i}"),
2951                        (Some(_), None) => panic!("some none?! {i}"),
2952                    }
2953                },
2954                async {
2955                    random_yield().await;
2956
2957                    let mut dbtx = db.begin_transaction().await;
2958                    random_yield().await;
2959                    assert_eq!(dbtx.get_value(&TestKey(tx_accepted_key)).await, None);
2960
2961                    random_yield().await;
2962                    assert_eq!(
2963                        dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(100))
2964                            .await,
2965                        None
2966                    );
2967
2968                    random_yield().await;
2969                    assert_eq!(
2970                        dbtx.insert_entry(&TestKey(tx_accepted_key), &TestVal(100))
2971                            .await,
2972                        None
2973                    );
2974                    random_yield().await;
2975                    dbtx.commit_tx().await;
2976                }
2977            );
2978        }
2979    }
2980
2981    pub async fn verify_phantom_entry(db: Database) {
2982        let mut dbtx = db.begin_transaction().await;
2983
2984        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2985
2986        dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
2987
2988        dbtx.commit_tx().await;
2989
2990        let mut dbtx = db.begin_transaction().await;
2991        let expected_keys = 2;
2992        let returned_keys = dbtx
2993            .find_by_prefix(&DbPrefixTestPrefix)
2994            .await
2995            .fold(0, |returned_keys, (key, value)| async move {
2996                match key {
2997                    TestKey(100) => {
2998                        assert!(value.eq(&TestVal(101)));
2999                    }
3000                    TestKey(101) => {
3001                        assert!(value.eq(&TestVal(102)));
3002                    }
3003                    _ => {}
3004                }
3005                returned_keys + 1
3006            })
3007            .await;
3008
3009        assert_eq!(returned_keys, expected_keys);
3010
3011        let mut dbtx2 = db.begin_transaction().await;
3012
3013        dbtx2.insert_entry(&TestKey(102), &TestVal(103)).await;
3014
3015        dbtx2.commit_tx().await;
3016
3017        let returned_keys = dbtx
3018            .find_by_prefix(&DbPrefixTestPrefix)
3019            .await
3020            .fold(0, |returned_keys, (key, value)| async move {
3021                match key {
3022                    TestKey(100) => {
3023                        assert!(value.eq(&TestVal(101)));
3024                    }
3025                    TestKey(101) => {
3026                        assert!(value.eq(&TestVal(102)));
3027                    }
3028                    _ => {}
3029                }
3030                returned_keys + 1
3031            })
3032            .await;
3033
3034        assert_eq!(returned_keys, expected_keys);
3035    }
3036
3037    pub async fn expect_write_conflict(db: Database) {
3038        let mut dbtx = db.begin_transaction().await;
3039        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3040        dbtx.commit_tx().await;
3041
3042        let mut dbtx2 = db.begin_transaction().await;
3043        let mut dbtx3 = db.begin_transaction().await;
3044
3045        dbtx2.insert_entry(&TestKey(100), &TestVal(102)).await;
3046
3047        // Depending on if the database implementation supports optimistic or
3048        // pessimistic transactions, this test should generate an error here
3049        // (pessimistic) or at commit time (optimistic)
3050        dbtx3.insert_entry(&TestKey(100), &TestVal(103)).await;
3051
3052        dbtx2.commit_tx().await;
3053        dbtx3.commit_tx_result().await.expect_err("Expecting an error to be returned because this transaction is in a write-write conflict with dbtx");
3054    }
3055
3056    pub async fn verify_string_prefix(db: Database) {
3057        let mut dbtx = db.begin_transaction().await;
3058        dbtx.insert_entry(&PercentTestKey(100), &TestVal(101)).await;
3059
3060        assert_eq!(
3061            dbtx.get_value(&PercentTestKey(100)).await,
3062            Some(TestVal(101))
3063        );
3064
3065        dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3066
3067        dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3068
3069        dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3070
3071        // If the wildcard character ('%') is not handled properly, this will make
3072        // find_by_prefix return 5 results instead of 4
3073        dbtx.insert_entry(&TestKey(101), &TestVal(100)).await;
3074
3075        let expected_keys = 4;
3076        let returned_keys = dbtx
3077            .find_by_prefix(&PercentPrefixTestPrefix)
3078            .await
3079            .fold(0, |returned_keys, (key, value)| async move {
3080                if matches!(key, PercentTestKey(101)) {
3081                    assert!(value.eq(&TestVal(100)));
3082                }
3083                returned_keys + 1
3084            })
3085            .await;
3086
3087        assert_eq!(returned_keys, expected_keys);
3088    }
3089
3090    pub async fn verify_remove_by_prefix(db: Database) {
3091        let mut dbtx = db.begin_transaction().await;
3092
3093        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3094
3095        dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3096
3097        dbtx.commit_tx().await;
3098
3099        let mut remove_dbtx = db.begin_transaction().await;
3100        remove_dbtx.remove_by_prefix(&DbPrefixTestPrefix).await;
3101        remove_dbtx.commit_tx().await;
3102
3103        let mut dbtx = db.begin_transaction().await;
3104        let expected_keys = 0;
3105        let returned_keys = dbtx
3106            .find_by_prefix(&DbPrefixTestPrefix)
3107            .await
3108            .fold(0, |returned_keys, (key, value)| async move {
3109                match key {
3110                    TestKey(100) => {
3111                        assert!(value.eq(&TestVal(101)));
3112                    }
3113                    TestKey(101) => {
3114                        assert!(value.eq(&TestVal(102)));
3115                    }
3116                    _ => {}
3117                }
3118                returned_keys + 1
3119            })
3120            .await;
3121
3122        assert_eq!(returned_keys, expected_keys);
3123    }
3124
3125    pub async fn verify_module_db(db: Database, module_db: Database) {
3126        let mut dbtx = db.begin_transaction().await;
3127
3128        dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3129
3130        dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3131
3132        dbtx.commit_tx().await;
3133
3134        // verify module_dbtx can only read key/value pairs from its own module
3135        let mut module_dbtx = module_db.begin_transaction().await;
3136        assert_eq!(module_dbtx.get_value(&TestKey(100)).await, None);
3137
3138        assert_eq!(module_dbtx.get_value(&TestKey(101)).await, None);
3139
3140        // verify module_dbtx can read key/value pairs that it wrote
3141        let mut dbtx = db.begin_transaction().await;
3142        assert_eq!(dbtx.get_value(&TestKey(100)).await, Some(TestVal(101)));
3143
3144        assert_eq!(dbtx.get_value(&TestKey(101)).await, Some(TestVal(102)));
3145
3146        let mut module_dbtx = module_db.begin_transaction().await;
3147
3148        module_dbtx.insert_entry(&TestKey(100), &TestVal(103)).await;
3149
3150        module_dbtx.insert_entry(&TestKey(101), &TestVal(104)).await;
3151
3152        module_dbtx.commit_tx().await;
3153
3154        let expected_keys = 2;
3155        let mut dbtx = db.begin_transaction().await;
3156        let returned_keys = dbtx
3157            .find_by_prefix(&DbPrefixTestPrefix)
3158            .await
3159            .fold(0, |returned_keys, (key, value)| async move {
3160                match key {
3161                    TestKey(100) => {
3162                        assert!(value.eq(&TestVal(101)));
3163                    }
3164                    TestKey(101) => {
3165                        assert!(value.eq(&TestVal(102)));
3166                    }
3167                    _ => {}
3168                }
3169                returned_keys + 1
3170            })
3171            .await;
3172
3173        assert_eq!(returned_keys, expected_keys);
3174
3175        let removed = dbtx.remove_entry(&TestKey(100)).await;
3176        assert_eq!(removed, Some(TestVal(101)));
3177        assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
3178
3179        let mut module_dbtx = module_db.begin_transaction().await;
3180        assert_eq!(
3181            module_dbtx.get_value(&TestKey(100)).await,
3182            Some(TestVal(103))
3183        );
3184    }
3185
3186    pub async fn verify_module_prefix(db: Database) {
3187        let mut test_dbtx = db.begin_transaction().await;
3188        {
3189            let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3190
3191            test_module_dbtx
3192                .insert_entry(&TestKey(100), &TestVal(101))
3193                .await;
3194
3195            test_module_dbtx
3196                .insert_entry(&TestKey(101), &TestVal(102))
3197                .await;
3198        }
3199
3200        test_dbtx.commit_tx().await;
3201
3202        let mut alt_dbtx = db.begin_transaction().await;
3203        {
3204            let mut alt_module_dbtx = alt_dbtx.to_ref_with_prefix_module_id(ALT_MODULE_PREFIX).0;
3205
3206            alt_module_dbtx
3207                .insert_entry(&TestKey(100), &TestVal(103))
3208                .await;
3209
3210            alt_module_dbtx
3211                .insert_entry(&TestKey(101), &TestVal(104))
3212                .await;
3213        }
3214
3215        alt_dbtx.commit_tx().await;
3216
3217        // verify test_module_dbtx can only see key/value pairs from its own module
3218        let mut test_dbtx = db.begin_transaction().await;
3219        let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3220        assert_eq!(
3221            test_module_dbtx.get_value(&TestKey(100)).await,
3222            Some(TestVal(101))
3223        );
3224
3225        assert_eq!(
3226            test_module_dbtx.get_value(&TestKey(101)).await,
3227            Some(TestVal(102))
3228        );
3229
3230        let expected_keys = 2;
3231        let returned_keys = test_module_dbtx
3232            .find_by_prefix(&DbPrefixTestPrefix)
3233            .await
3234            .fold(0, |returned_keys, (key, value)| async move {
3235                match key {
3236                    TestKey(100) => {
3237                        assert!(value.eq(&TestVal(101)));
3238                    }
3239                    TestKey(101) => {
3240                        assert!(value.eq(&TestVal(102)));
3241                    }
3242                    _ => {}
3243                }
3244                returned_keys + 1
3245            })
3246            .await;
3247
3248        assert_eq!(returned_keys, expected_keys);
3249
3250        let removed = test_module_dbtx.remove_entry(&TestKey(100)).await;
3251        assert_eq!(removed, Some(TestVal(101)));
3252        assert_eq!(test_module_dbtx.get_value(&TestKey(100)).await, None);
3253
3254        // test_dbtx on its own wont find the key because it does not use a module
3255        // prefix
3256        let mut test_dbtx = db.begin_transaction().await;
3257        assert_eq!(test_dbtx.get_value(&TestKey(101)).await, None);
3258
3259        test_dbtx.commit_tx().await;
3260    }
3261
3262    #[cfg(test)]
3263    #[tokio::test]
3264    pub async fn verify_test_migration() {
3265        // Insert a bunch of old dummy data that needs to be migrated to a new version
3266        let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
3267        let expected_test_keys_size: usize = 100;
3268        let mut dbtx = db.begin_transaction().await;
3269        for i in 0..expected_test_keys_size {
3270            dbtx.insert_new_entry(&TestKeyV0(i as u64, (i + 1) as u64), &TestVal(i as u64))
3271                .await;
3272        }
3273
3274        // Will also be migrated to `DatabaseVersionKey`
3275        dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
3276            .await;
3277        dbtx.commit_tx().await;
3278
3279        let mut migrations: BTreeMap<DatabaseVersion, DbMigrationFn<()>> = BTreeMap::new();
3280
3281        migrations.insert(
3282            DatabaseVersion(0),
3283            Box::new(|ctx| migrate_test_db_version_0(ctx).boxed()),
3284        );
3285
3286        apply_migrations(&db, (), "TestModule".to_string(), migrations, None, None)
3287            .await
3288            .expect("Error applying migrations for TestModule");
3289
3290        // Verify that the migrations completed successfully
3291        let mut dbtx = db.begin_transaction().await;
3292
3293        // Verify that the old `DatabaseVersion` under `DatabaseVersionKeyV0` migrated
3294        // to `DatabaseVersionKey`
3295        assert!(
3296            dbtx.get_value(&DatabaseVersionKey(MODULE_GLOBAL_PREFIX.into()))
3297                .await
3298                .is_some()
3299        );
3300
3301        // Verify Dummy module migration
3302        let test_keys = dbtx
3303            .find_by_prefix(&DbPrefixTestPrefix)
3304            .await
3305            .collect::<Vec<_>>()
3306            .await;
3307        let test_keys_size = test_keys.len();
3308        assert_eq!(test_keys_size, expected_test_keys_size);
3309        for (key, val) in test_keys {
3310            assert_eq!(key.0, val.0 + 1);
3311        }
3312    }
3313
3314    #[allow(dead_code)]
3315    async fn migrate_test_db_version_0(
3316        mut ctx: DbMigrationFnContext<'_, ()>,
3317    ) -> std::result::Result<(), anyhow::Error> {
3318        let mut dbtx = ctx.dbtx();
3319        let example_keys_v0 = dbtx
3320            .find_by_prefix(&DbPrefixTestPrefixV0)
3321            .await
3322            .collect::<Vec<_>>()
3323            .await;
3324        dbtx.remove_by_prefix(&DbPrefixTestPrefixV0).await;
3325        for (key, val) in example_keys_v0 {
3326            let key_v2 = TestKey(key.1);
3327            dbtx.insert_new_entry(&key_v2, &val).await;
3328        }
3329        Ok(())
3330    }
3331
3332    #[cfg(test)]
3333    #[tokio::test]
3334    async fn test_autocommit() {
3335        use std::marker::PhantomData;
3336        use std::ops::Range;
3337        use std::path::Path;
3338
3339        use anyhow::anyhow;
3340        use async_trait::async_trait;
3341
3342        use crate::ModuleDecoderRegistry;
3343        use crate::db::{
3344            AutocommitError, BaseDatabaseTransaction, DatabaseError, DatabaseResult,
3345            IDatabaseTransaction, IDatabaseTransactionOps, IDatabaseTransactionOpsCore,
3346            IRawDatabase, IRawDatabaseTransaction,
3347        };
3348
3349        #[derive(Debug)]
3350        struct FakeDatabase;
3351
3352        #[async_trait]
3353        impl IRawDatabase for FakeDatabase {
3354            type Transaction<'a> = FakeTransaction<'a>;
3355            async fn begin_transaction(&self) -> FakeTransaction {
3356                FakeTransaction(PhantomData)
3357            }
3358
3359            fn checkpoint(&self, _backup_path: &Path) -> DatabaseResult<()> {
3360                Ok(())
3361            }
3362        }
3363
3364        #[derive(Debug)]
3365        struct FakeTransaction<'a>(PhantomData<&'a ()>);
3366
3367        #[async_trait]
3368        impl IDatabaseTransactionOpsCore for FakeTransaction<'_> {
3369            async fn raw_insert_bytes(
3370                &mut self,
3371                _key: &[u8],
3372                _value: &[u8],
3373            ) -> DatabaseResult<Option<Vec<u8>>> {
3374                unimplemented!()
3375            }
3376
3377            async fn raw_get_bytes(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
3378                unimplemented!()
3379            }
3380
3381            async fn raw_remove_entry(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
3382                unimplemented!()
3383            }
3384
3385            async fn raw_find_by_range(
3386                &mut self,
3387                _key_range: Range<&[u8]>,
3388            ) -> DatabaseResult<crate::db::PrefixStream<'_>> {
3389                unimplemented!()
3390            }
3391
3392            async fn raw_find_by_prefix(
3393                &mut self,
3394                _key_prefix: &[u8],
3395            ) -> DatabaseResult<crate::db::PrefixStream<'_>> {
3396                unimplemented!()
3397            }
3398
3399            async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> DatabaseResult<()> {
3400                unimplemented!()
3401            }
3402
3403            async fn raw_find_by_prefix_sorted_descending(
3404                &mut self,
3405                _key_prefix: &[u8],
3406            ) -> DatabaseResult<crate::db::PrefixStream<'_>> {
3407                unimplemented!()
3408            }
3409        }
3410
3411        impl IDatabaseTransactionOps for FakeTransaction<'_> {}
3412
3413        #[async_trait]
3414        impl IRawDatabaseTransaction for FakeTransaction<'_> {
3415            async fn commit_tx(self) -> DatabaseResult<()> {
3416                use crate::db::DatabaseError;
3417
3418                Err(DatabaseError::Other(anyhow::anyhow!("Can't commit!")))
3419            }
3420        }
3421
3422        let db = Database::new(FakeDatabase, ModuleDecoderRegistry::default());
3423        let err = db
3424            .autocommit::<_, _, ()>(|_dbtx, _| Box::pin(async { Ok(()) }), Some(5))
3425            .await
3426            .unwrap_err();
3427
3428        match err {
3429            AutocommitError::CommitFailed {
3430                attempts: failed_attempts,
3431                ..
3432            } => {
3433                assert_eq!(failed_attempts, 5);
3434            }
3435            AutocommitError::ClosureError { .. } => panic!("Closure did not return error"),
3436        }
3437    }
3438}
3439
3440pub async fn find_by_prefix_sorted_descending<'r, 'inner, KP>(
3441    tx: &'r mut (dyn IDatabaseTransaction + 'inner),
3442    decoders: ModuleDecoderRegistry,
3443    key_prefix: &KP,
3444) -> impl Stream<
3445    Item = (
3446        KP::Record,
3447        <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
3448    ),
3449>
3450+ 'r
3451+ use<'r, KP>
3452where
3453    'inner: 'r,
3454    KP: DatabaseLookup,
3455    KP::Record: DatabaseKey,
3456{
3457    debug!(target: LOG_DB, "find by prefix sorted descending");
3458    let prefix_bytes = key_prefix.to_bytes();
3459    tx.raw_find_by_prefix_sorted_descending(&prefix_bytes)
3460        .await
3461        .expect("Error doing prefix search in database")
3462        .map(move |(key_bytes, value_bytes)| {
3463            let key = decode_key_expect(&key_bytes, &decoders);
3464            let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
3465            (key, value)
3466        })
3467}
3468
3469pub async fn verify_module_db_integrity_dbtx(
3470    dbtx: &mut DatabaseTransaction<'_>,
3471    module_id: ModuleInstanceId,
3472    module_kind: ModuleKind,
3473    prefixes: &BTreeSet<u8>,
3474) {
3475    let module_db_prefix = module_instance_id_to_byte_prefix(module_id);
3476    if module_id < 250 {
3477        assert_eq!(module_db_prefix.len(), 2);
3478    }
3479    let mut records = dbtx
3480        .raw_find_by_prefix(&module_db_prefix)
3481        .await
3482        .expect("DB fail");
3483    while let Some((k, v)) = records.next().await {
3484        assert!(
3485            prefixes.contains(&k[module_db_prefix.len()]),
3486            "Unexpected module {module_kind} {module_id} db record found: {}: {}",
3487            k.as_hex(),
3488            v.as_hex()
3489        );
3490    }
3491}
3492
3493#[cfg(test)]
3494mod tests;