1use std::any;
106use std::collections::{BTreeMap, BTreeSet};
107use std::error::Error;
108use std::fmt::{self, Debug};
109use std::marker::{self, PhantomData};
110use std::ops::{self, DerefMut, Range};
111use std::path::Path;
112use std::pin::Pin;
113use std::sync::Arc;
114use std::time::Duration;
115
116use anyhow::{Context, Result, bail};
117use bitcoin::hex::DisplayHex as _;
118use fedimint_core::util::BoxFuture;
119use fedimint_logging::LOG_DB;
120use futures::{Stream, StreamExt};
121use macro_rules_attribute::apply;
122use rand::Rng;
123use serde::Serialize;
124use strum_macros::EnumIter;
125use thiserror::Error;
126use tracing::{debug, error, info, instrument, trace, warn};
127
128use crate::core::{ModuleInstanceId, ModuleKind};
129use crate::encoding::{Decodable, Encodable};
130use crate::fmt_utils::AbbreviateHexBytes;
131use crate::task::{MaybeSend, MaybeSync};
132use crate::util::FmtCompactAnyhow as _;
133use crate::{async_trait_maybe_send, maybe_add_send, maybe_add_send_sync, timing};
134
135pub mod mem_impl;
136pub mod notifications;
137
138pub use test_utils::*;
139
140use self::notifications::{Notifications, NotifyQueue};
141use crate::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
142
143pub const MODULE_GLOBAL_PREFIX: u8 = 0xff;
144
145pub trait DatabaseKeyPrefix: Debug {
146 fn to_bytes(&self) -> Vec<u8>;
147}
148
149pub trait DatabaseRecord: DatabaseKeyPrefix {
152 const DB_PREFIX: u8;
153 const NOTIFY_ON_MODIFY: bool = false;
154 type Key: DatabaseKey + Debug;
155 type Value: DatabaseValue + Debug;
156}
157
158pub trait DatabaseLookup: DatabaseKeyPrefix {
161 type Record: DatabaseRecord;
162}
163
164impl<Record> DatabaseLookup for Record
166where
167 Record: DatabaseRecord + Debug + Decodable + Encodable,
168{
169 type Record = Record;
170}
171
172pub trait DatabaseKey: Sized {
175 const NOTIFY_ON_MODIFY: bool = false;
183 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
184}
185
186pub trait DatabaseKeyWithNotify {}
188
189pub trait DatabaseValue: Sized + Debug {
191 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
192 fn to_bytes(&self) -> Vec<u8>;
193}
194
195pub type PrefixStream<'a> = Pin<Box<maybe_add_send!(dyn Stream<Item = (Vec<u8>, Vec<u8>)> + 'a)>>;
196
197pub type PhantomBound<'big, 'small> = PhantomData<&'small &'big ()>;
201
202#[derive(Debug, Error)]
204pub enum AutocommitError<E> {
205 #[error("Commit Failed: {last_error}")]
207 CommitFailed {
208 attempts: usize,
210 last_error: anyhow::Error,
212 },
213 #[error("Closure error: {error}")]
216 ClosureError {
217 attempts: usize,
223 error: E,
225 },
226}
227
228pub trait AutocommitResultExt<T, E> {
229 fn unwrap_autocommit(self) -> Result<T, E>;
233}
234
235impl<T, E> AutocommitResultExt<T, E> for Result<T, AutocommitError<E>> {
236 fn unwrap_autocommit(self) -> Result<T, E> {
237 match self {
238 Ok(value) => Ok(value),
239 Err(AutocommitError::CommitFailed { .. }) => {
240 panic!("`unwrap_autocommit` called on a autocommit result with finite retries");
241 }
242 Err(AutocommitError::ClosureError { error, .. }) => Err(error),
243 }
244 }
245}
246
247#[apply(async_trait_maybe_send!)]
256pub trait IRawDatabase: Debug + MaybeSend + MaybeSync + 'static {
257 type Transaction<'a>: IRawDatabaseTransaction + Debug;
259
260 async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a>;
262
263 fn checkpoint(&self, backup_path: &Path) -> Result<()>;
265}
266
267#[apply(async_trait_maybe_send!)]
268impl<T> IRawDatabase for Box<T>
269where
270 T: IRawDatabase,
271{
272 type Transaction<'a> = <T as IRawDatabase>::Transaction<'a>;
273
274 async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a> {
275 (**self).begin_transaction().await
276 }
277
278 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
279 (**self).checkpoint(backup_path)
280 }
281}
282
283pub trait IRawDatabaseExt: IRawDatabase + Sized {
285 fn into_database(self) -> Database {
289 Database::new(self, ModuleRegistry::default())
290 }
291}
292
293impl<T> IRawDatabaseExt for T where T: IRawDatabase {}
294
295impl<T> From<T> for Database
296where
297 T: IRawDatabase,
298{
299 fn from(raw: T) -> Self {
300 Self::new(raw, ModuleRegistry::default())
301 }
302}
303
304#[apply(async_trait_maybe_send!)]
307pub trait IDatabase: Debug + MaybeSend + MaybeSync + 'static {
308 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a>;
310 async fn register(&self, key: &[u8]);
312 async fn notify(&self, key: &[u8]);
314
315 fn is_global(&self) -> bool;
318
319 fn checkpoint(&self, backup_path: &Path) -> Result<()>;
321}
322
323#[apply(async_trait_maybe_send!)]
324impl<T> IDatabase for Arc<T>
325where
326 T: IDatabase + ?Sized,
327{
328 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
329 (**self).begin_transaction().await
330 }
331 async fn register(&self, key: &[u8]) {
332 (**self).register(key).await;
333 }
334 async fn notify(&self, key: &[u8]) {
335 (**self).notify(key).await;
336 }
337
338 fn is_global(&self) -> bool {
339 (**self).is_global()
340 }
341
342 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
343 (**self).checkpoint(backup_path)
344 }
345}
346
347struct BaseDatabase<RawDatabase> {
351 notifications: Arc<Notifications>,
352 raw: RawDatabase,
353}
354
355impl<RawDatabase> fmt::Debug for BaseDatabase<RawDatabase> {
356 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
357 f.write_str("BaseDatabase")
358 }
359}
360
361#[apply(async_trait_maybe_send!)]
362impl<RawDatabase: IRawDatabase + MaybeSend + 'static> IDatabase for BaseDatabase<RawDatabase> {
363 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
364 Box::new(BaseDatabaseTransaction::new(
365 self.raw.begin_transaction().await,
366 self.notifications.clone(),
367 ))
368 }
369 async fn register(&self, key: &[u8]) {
370 self.notifications.register(key).await;
371 }
372 async fn notify(&self, key: &[u8]) {
373 self.notifications.notify(key);
374 }
375
376 fn is_global(&self) -> bool {
377 true
378 }
379
380 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
381 self.raw.checkpoint(backup_path)
382 }
383}
384
385#[derive(Clone, Debug)]
391pub struct Database {
392 inner: Arc<dyn IDatabase + 'static>,
393 module_decoders: ModuleDecoderRegistry,
394}
395
396impl Database {
397 pub fn strong_count(&self) -> usize {
398 Arc::strong_count(&self.inner)
399 }
400
401 pub fn into_inner(self) -> Arc<dyn IDatabase + 'static> {
402 self.inner
403 }
404}
405
406impl Database {
407 pub fn new(raw: impl IRawDatabase + 'static, module_decoders: ModuleDecoderRegistry) -> Self {
412 let inner = BaseDatabase {
413 raw,
414 notifications: Arc::new(Notifications::new()),
415 };
416 Self::new_from_arc(
417 Arc::new(inner) as Arc<dyn IDatabase + 'static>,
418 module_decoders,
419 )
420 }
421
422 pub fn new_from_arc(
424 inner: Arc<dyn IDatabase + 'static>,
425 module_decoders: ModuleDecoderRegistry,
426 ) -> Self {
427 Self {
428 inner,
429 module_decoders,
430 }
431 }
432
433 pub fn with_prefix(&self, prefix: Vec<u8>) -> Self {
435 Self {
436 inner: Arc::new(PrefixDatabase {
437 inner: self.inner.clone(),
438 global_dbtx_access_token: None,
439 prefix,
440 }),
441 module_decoders: self.module_decoders.clone(),
442 }
443 }
444
445 pub fn with_prefix_module_id(
449 &self,
450 module_instance_id: ModuleInstanceId,
451 ) -> (Self, GlobalDBTxAccessToken) {
452 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
453 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
454 (
455 Self {
456 inner: Arc::new(PrefixDatabase {
457 inner: self.inner.clone(),
458 global_dbtx_access_token: Some(global_dbtx_access_token),
459 prefix,
460 }),
461 module_decoders: self.module_decoders.clone(),
462 },
463 global_dbtx_access_token,
464 )
465 }
466
467 pub fn with_decoders(&self, module_decoders: ModuleDecoderRegistry) -> Self {
468 Self {
469 inner: self.inner.clone(),
470 module_decoders,
471 }
472 }
473
474 pub fn is_global(&self) -> bool {
476 self.inner.is_global()
477 }
478
479 pub fn ensure_global(&self) -> Result<()> {
481 if !self.is_global() {
482 bail!("Database instance not global");
483 }
484
485 Ok(())
486 }
487
488 pub fn ensure_isolated(&self) -> Result<()> {
490 if self.is_global() {
491 bail!("Database instance not isolated");
492 }
493
494 Ok(())
495 }
496
497 pub async fn begin_transaction<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, Committable>
499 where
500 's: 'tx,
501 {
502 DatabaseTransaction::<Committable>::new(
503 self.inner.begin_transaction().await,
504 self.module_decoders.clone(),
505 )
506 }
507
508 pub async fn begin_transaction_nc<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, NonCommittable>
510 where
511 's: 'tx,
512 {
513 self.begin_transaction().await.into_nc()
514 }
515
516 pub fn checkpoint(&self, backup_path: &Path) -> Result<()> {
517 self.inner.checkpoint(backup_path)
518 }
519
520 pub async fn autocommit<'s, 'dbtx, F, T, E>(
548 &'s self,
549 tx_fn: F,
550 max_attempts: Option<usize>,
551 ) -> Result<T, AutocommitError<E>>
552 where
553 's: 'dbtx,
554 for<'r, 'o> F: Fn(
555 &'r mut DatabaseTransaction<'o>,
556 PhantomBound<'dbtx, 'o>,
557 ) -> BoxFuture<'r, Result<T, E>>,
558 {
559 assert_ne!(max_attempts, Some(0));
560 let mut curr_attempts: usize = 0;
561
562 loop {
563 curr_attempts = curr_attempts
568 .checked_add(1)
569 .expect("db autocommit attempt counter overflowed");
570
571 let mut dbtx = self.begin_transaction().await;
572
573 let tx_fn_res = tx_fn(&mut dbtx.to_ref_nc(), PhantomData).await;
574 let val = match tx_fn_res {
575 Ok(val) => val,
576 Err(err) => {
577 dbtx.ignore_uncommitted();
578 return Err(AutocommitError::ClosureError {
579 attempts: curr_attempts,
580 error: err,
581 });
582 }
583 };
584
585 let _timing = timing::TimeReporter::new("autocommit - commit_tx");
586
587 match dbtx.commit_tx_result().await {
588 Ok(()) => {
589 return Ok(val);
590 }
591 Err(err) => {
592 if max_attempts.is_some_and(|max_att| max_att <= curr_attempts) {
593 warn!(
594 target: LOG_DB,
595 curr_attempts,
596 ?err,
597 "Database commit failed in an autocommit block - terminating"
598 );
599 return Err(AutocommitError::CommitFailed {
600 attempts: curr_attempts,
601 last_error: err,
602 });
603 }
604
605 let delay = (2u64.pow(curr_attempts.min(7) as u32) * 10).min(1000);
606 let delay = rand::thread_rng().gen_range(delay..(2 * delay));
607 warn!(
608 target: LOG_DB,
609 curr_attempts,
610 err = %err.fmt_compact_anyhow(),
611 delay_ms = %delay,
612 "Database commit failed in an autocommit block - retrying"
613 );
614 crate::runtime::sleep(Duration::from_millis(delay)).await;
615 }
616 }
617 }
618 }
619
620 pub async fn wait_key_check<'a, K, T>(
625 &'a self,
626 key: &K,
627 checker: impl Fn(Option<K::Value>) -> Option<T>,
628 ) -> (T, DatabaseTransaction<'a, Committable>)
629 where
630 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
631 {
632 let key_bytes = key.to_bytes();
633 loop {
634 let notify = self.inner.register(&key_bytes);
636
637 let mut tx = self.inner.begin_transaction().await;
639
640 let maybe_value_bytes = tx
641 .raw_get_bytes(&key_bytes)
642 .await
643 .expect("Unrecoverable error when reading from database")
644 .map(|value_bytes| {
645 decode_value_expect(&value_bytes, &self.module_decoders, &key_bytes)
646 });
647
648 if let Some(value) = checker(maybe_value_bytes) {
649 return (
650 value,
651 DatabaseTransaction::new(tx, self.module_decoders.clone()),
652 );
653 }
654
655 notify.await;
657 }
660 }
661
662 pub async fn wait_key_exists<K>(&self, key: &K) -> K::Value
664 where
665 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
666 {
667 self.wait_key_check(key, std::convert::identity).await.0
668 }
669}
670
671fn module_instance_id_to_byte_prefix(module_instance_id: u16) -> Vec<u8> {
672 let mut bytes = vec![MODULE_GLOBAL_PREFIX];
673 bytes.append(&mut module_instance_id.consensus_encode_to_vec());
674 bytes
675}
676
677#[derive(Clone, Debug)]
680struct PrefixDatabase<Inner>
681where
682 Inner: Debug,
683{
684 prefix: Vec<u8>,
685 global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
686 inner: Inner,
687}
688
689impl<Inner> PrefixDatabase<Inner>
690where
691 Inner: Debug,
692{
693 fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
697 let mut full_key = self.prefix.clone();
698 full_key.extend_from_slice(key);
699 full_key
700 }
701}
702
703#[apply(async_trait_maybe_send!)]
704impl<Inner> IDatabase for PrefixDatabase<Inner>
705where
706 Inner: Debug + MaybeSend + MaybeSync + 'static + IDatabase,
707{
708 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
709 Box::new(PrefixDatabaseTransaction {
710 inner: self.inner.begin_transaction().await,
711 global_dbtx_access_token: self.global_dbtx_access_token,
712 prefix: self.prefix.clone(),
713 })
714 }
715 async fn register(&self, key: &[u8]) {
716 self.inner.register(&self.get_full_key(key)).await;
717 }
718
719 async fn notify(&self, key: &[u8]) {
720 self.inner.notify(&self.get_full_key(key)).await;
721 }
722
723 fn is_global(&self) -> bool {
724 if self.global_dbtx_access_token.is_some() {
725 false
726 } else {
727 self.inner.is_global()
728 }
729 }
730
731 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
732 self.inner.checkpoint(backup_path)
733 }
734}
735
736#[derive(Debug)]
741struct PrefixDatabaseTransaction<Inner> {
742 inner: Inner,
743 global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
744 prefix: Vec<u8>,
745}
746
747impl<Inner> PrefixDatabaseTransaction<Inner> {
748 fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
752 let mut full_key = self.prefix.clone();
753 full_key.extend_from_slice(key);
754 full_key
755 }
756
757 fn get_full_range(&self, range: Range<&[u8]>) -> Range<Vec<u8>> {
758 Range {
759 start: self.get_full_key(range.start),
760 end: self.get_full_key(range.end),
761 }
762 }
763
764 fn adapt_prefix_stream(stream: PrefixStream<'_>, prefix_len: usize) -> PrefixStream<'_> {
765 Box::pin(stream.map(move |(k, v)| (k[prefix_len..].to_owned(), v)))
766 }
767}
768
769#[apply(async_trait_maybe_send!)]
770impl<Inner> IDatabaseTransaction for PrefixDatabaseTransaction<Inner>
771where
772 Inner: IDatabaseTransaction,
773{
774 async fn commit_tx(&mut self) -> Result<()> {
775 self.inner.commit_tx().await
776 }
777
778 fn is_global(&self) -> bool {
779 if self.global_dbtx_access_token.is_some() {
780 false
781 } else {
782 self.inner.is_global()
783 }
784 }
785
786 fn global_dbtx(
787 &mut self,
788 access_token: GlobalDBTxAccessToken,
789 ) -> &mut dyn IDatabaseTransaction {
790 if let Some(self_global_dbtx_access_token) = self.global_dbtx_access_token {
791 assert_eq!(
792 access_token, self_global_dbtx_access_token,
793 "Invalid access key used to access global_dbtx"
794 );
795 &mut self.inner
796 } else {
797 self.inner.global_dbtx(access_token)
798 }
799 }
800}
801
802#[apply(async_trait_maybe_send!)]
803impl<Inner> IDatabaseTransactionOpsCore for PrefixDatabaseTransaction<Inner>
804where
805 Inner: IDatabaseTransactionOpsCore,
806{
807 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
808 let key = self.get_full_key(key);
809 self.inner.raw_insert_bytes(&key, value).await
810 }
811
812 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
813 let key = self.get_full_key(key);
814 self.inner.raw_get_bytes(&key).await
815 }
816
817 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
818 let key = self.get_full_key(key);
819 self.inner.raw_remove_entry(&key).await
820 }
821
822 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
823 let key = self.get_full_key(key_prefix);
824 let stream = self.inner.raw_find_by_prefix(&key).await?;
825 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
826 }
827
828 async fn raw_find_by_prefix_sorted_descending(
829 &mut self,
830 key_prefix: &[u8],
831 ) -> Result<PrefixStream<'_>> {
832 let key = self.get_full_key(key_prefix);
833 let stream = self
834 .inner
835 .raw_find_by_prefix_sorted_descending(&key)
836 .await?;
837 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
838 }
839
840 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
841 let range = self.get_full_range(range);
842 let stream = self
843 .inner
844 .raw_find_by_range(Range {
845 start: &range.start,
846 end: &range.end,
847 })
848 .await?;
849 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
850 }
851
852 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
853 let key = self.get_full_key(key_prefix);
854 self.inner.raw_remove_by_prefix(&key).await
855 }
856}
857
858#[apply(async_trait_maybe_send!)]
859impl<Inner> IDatabaseTransactionOps for PrefixDatabaseTransaction<Inner>
860where
861 Inner: IDatabaseTransactionOps,
862{
863 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
864 self.inner.rollback_tx_to_savepoint().await
865 }
866
867 async fn set_tx_savepoint(&mut self) -> Result<()> {
868 self.set_tx_savepoint().await
869 }
870}
871
872#[apply(async_trait_maybe_send!)]
876pub trait IDatabaseTransactionOpsCore: MaybeSend {
877 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>>;
879
880 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
882
883 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
885
886 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>>;
889
890 async fn raw_find_by_prefix_sorted_descending(
892 &mut self,
893 key_prefix: &[u8],
894 ) -> Result<PrefixStream<'_>>;
895
896 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>>;
900
901 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()>;
903}
904
905#[apply(async_trait_maybe_send!)]
906impl<T> IDatabaseTransactionOpsCore for Box<T>
907where
908 T: IDatabaseTransactionOpsCore + ?Sized,
909{
910 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
911 (**self).raw_insert_bytes(key, value).await
912 }
913
914 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
915 (**self).raw_get_bytes(key).await
916 }
917
918 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
919 (**self).raw_remove_entry(key).await
920 }
921
922 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
923 (**self).raw_find_by_prefix(key_prefix).await
924 }
925
926 async fn raw_find_by_prefix_sorted_descending(
927 &mut self,
928 key_prefix: &[u8],
929 ) -> Result<PrefixStream<'_>> {
930 (**self)
931 .raw_find_by_prefix_sorted_descending(key_prefix)
932 .await
933 }
934
935 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
936 (**self).raw_find_by_range(range).await
937 }
938
939 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
940 (**self).raw_remove_by_prefix(key_prefix).await
941 }
942}
943
944#[apply(async_trait_maybe_send!)]
945impl<T> IDatabaseTransactionOpsCore for &mut T
946where
947 T: IDatabaseTransactionOpsCore + ?Sized,
948{
949 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
950 (**self).raw_insert_bytes(key, value).await
951 }
952
953 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
954 (**self).raw_get_bytes(key).await
955 }
956
957 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
958 (**self).raw_remove_entry(key).await
959 }
960
961 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
962 (**self).raw_find_by_prefix(key_prefix).await
963 }
964
965 async fn raw_find_by_prefix_sorted_descending(
966 &mut self,
967 key_prefix: &[u8],
968 ) -> Result<PrefixStream<'_>> {
969 (**self)
970 .raw_find_by_prefix_sorted_descending(key_prefix)
971 .await
972 }
973
974 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
975 (**self).raw_find_by_range(range).await
976 }
977
978 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
979 (**self).raw_remove_by_prefix(key_prefix).await
980 }
981}
982
983#[apply(async_trait_maybe_send!)]
989pub trait IDatabaseTransactionOps: IDatabaseTransactionOpsCore + MaybeSend {
990 async fn set_tx_savepoint(&mut self) -> Result<()>;
999
1000 async fn rollback_tx_to_savepoint(&mut self) -> Result<()>;
1001}
1002
1003#[apply(async_trait_maybe_send!)]
1004impl<T> IDatabaseTransactionOps for Box<T>
1005where
1006 T: IDatabaseTransactionOps + ?Sized,
1007{
1008 async fn set_tx_savepoint(&mut self) -> Result<()> {
1009 (**self).set_tx_savepoint().await
1010 }
1011
1012 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1013 (**self).rollback_tx_to_savepoint().await
1014 }
1015}
1016
1017#[apply(async_trait_maybe_send!)]
1018impl<T> IDatabaseTransactionOps for &mut T
1019where
1020 T: IDatabaseTransactionOps + ?Sized,
1021{
1022 async fn set_tx_savepoint(&mut self) -> Result<()> {
1023 (**self).set_tx_savepoint().await
1024 }
1025
1026 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1027 (**self).rollback_tx_to_savepoint().await
1028 }
1029}
1030
1031#[apply(async_trait_maybe_send!)]
1037pub trait IDatabaseTransactionOpsCoreTyped<'a> {
1038 async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1039 where
1040 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1041
1042 async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1043 where
1044 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1045 K::Value: MaybeSend + MaybeSync;
1046
1047 async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1048 where
1049 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1050 K::Value: MaybeSend + MaybeSync;
1051
1052 async fn find_by_range<K>(
1053 &mut self,
1054 key_range: Range<K>,
1055 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1056 where
1057 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1058 K::Value: MaybeSend + MaybeSync;
1059
1060 async fn find_by_prefix<KP>(
1061 &mut self,
1062 key_prefix: &KP,
1063 ) -> Pin<
1064 Box<
1065 maybe_add_send!(
1066 dyn Stream<
1067 Item = (
1068 KP::Record,
1069 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1070 ),
1071 > + '_
1072 ),
1073 >,
1074 >
1075 where
1076 KP: DatabaseLookup + MaybeSend + MaybeSync,
1077 KP::Record: DatabaseKey;
1078
1079 async fn find_by_prefix_sorted_descending<KP>(
1080 &mut self,
1081 key_prefix: &KP,
1082 ) -> Pin<
1083 Box<
1084 maybe_add_send!(
1085 dyn Stream<
1086 Item = (
1087 KP::Record,
1088 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1089 ),
1090 > + '_
1091 ),
1092 >,
1093 >
1094 where
1095 KP: DatabaseLookup + MaybeSend + MaybeSync,
1096 KP::Record: DatabaseKey;
1097
1098 async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1099 where
1100 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1101
1102 async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1103 where
1104 KP: DatabaseLookup + MaybeSend + MaybeSync;
1105}
1106
1107#[apply(async_trait_maybe_send!)]
1110impl<T> IDatabaseTransactionOpsCoreTyped<'_> for T
1111where
1112 T: IDatabaseTransactionOpsCore + WithDecoders,
1113{
1114 async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1115 where
1116 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1117 {
1118 let key_bytes = key.to_bytes();
1119 let raw = self
1120 .raw_get_bytes(&key_bytes)
1121 .await
1122 .expect("Unrecoverable error occurred while reading and entry from the database");
1123 raw.map(|value_bytes| {
1124 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1125 })
1126 }
1127
1128 async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1129 where
1130 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1131 K::Value: MaybeSend + MaybeSync,
1132 {
1133 let key_bytes = key.to_bytes();
1134 self.raw_insert_bytes(&key_bytes, &value.to_bytes())
1135 .await
1136 .expect("Unrecoverable error occurred while inserting entry into the database")
1137 .map(|value_bytes| {
1138 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1139 })
1140 }
1141
1142 async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1143 where
1144 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1145 K::Value: MaybeSend + MaybeSync,
1146 {
1147 if let Some(prev) = self.insert_entry(key, value).await {
1148 panic!(
1149 "Database overwriting element when expecting insertion of new entry. Key: {key:?} Prev Value: {prev:?}"
1150 );
1151 }
1152 }
1153
1154 async fn find_by_range<K>(
1155 &mut self,
1156 key_range: Range<K>,
1157 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1158 where
1159 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1160 K::Value: MaybeSend + MaybeSync,
1161 {
1162 let decoders = self.decoders().clone();
1163 Box::pin(
1164 self.raw_find_by_range(Range {
1165 start: &key_range.start.to_bytes(),
1166 end: &key_range.end.to_bytes(),
1167 })
1168 .await
1169 .expect("Unrecoverable error occurred while listing entries from the database")
1170 .map(move |(key_bytes, value_bytes)| {
1171 let key = decode_key_expect(&key_bytes, &decoders);
1172 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1173 (key, value)
1174 }),
1175 )
1176 }
1177
1178 async fn find_by_prefix<KP>(
1179 &mut self,
1180 key_prefix: &KP,
1181 ) -> Pin<
1182 Box<
1183 maybe_add_send!(
1184 dyn Stream<
1185 Item = (
1186 KP::Record,
1187 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1188 ),
1189 > + '_
1190 ),
1191 >,
1192 >
1193 where
1194 KP: DatabaseLookup + MaybeSend + MaybeSync,
1195 KP::Record: DatabaseKey,
1196 {
1197 let decoders = self.decoders().clone();
1198 Box::pin(
1199 self.raw_find_by_prefix(&key_prefix.to_bytes())
1200 .await
1201 .expect("Unrecoverable error occurred while listing entries from the database")
1202 .map(move |(key_bytes, value_bytes)| {
1203 let key = decode_key_expect(&key_bytes, &decoders);
1204 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1205 (key, value)
1206 }),
1207 )
1208 }
1209
1210 async fn find_by_prefix_sorted_descending<KP>(
1211 &mut self,
1212 key_prefix: &KP,
1213 ) -> Pin<
1214 Box<
1215 maybe_add_send!(
1216 dyn Stream<
1217 Item = (
1218 KP::Record,
1219 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1220 ),
1221 > + '_
1222 ),
1223 >,
1224 >
1225 where
1226 KP: DatabaseLookup + MaybeSend + MaybeSync,
1227 KP::Record: DatabaseKey,
1228 {
1229 let decoders = self.decoders().clone();
1230 Box::pin(
1231 self.raw_find_by_prefix_sorted_descending(&key_prefix.to_bytes())
1232 .await
1233 .expect("Unrecoverable error occurred while listing entries from the database")
1234 .map(move |(key_bytes, value_bytes)| {
1235 let key = decode_key_expect(&key_bytes, &decoders);
1236 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1237 (key, value)
1238 }),
1239 )
1240 }
1241 async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1242 where
1243 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1244 {
1245 let key_bytes = key.to_bytes();
1246 self.raw_remove_entry(&key_bytes)
1247 .await
1248 .expect("Unrecoverable error occurred while inserting removing entry from the database")
1249 .map(|value_bytes| {
1250 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1251 })
1252 }
1253 async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1254 where
1255 KP: DatabaseLookup + MaybeSend + MaybeSync,
1256 {
1257 self.raw_remove_by_prefix(&key_prefix.to_bytes())
1258 .await
1259 .expect("Unrecoverable error when removing entries from the database");
1260 }
1261}
1262
1263pub trait WithDecoders {
1266 fn decoders(&self) -> &ModuleDecoderRegistry;
1267}
1268
1269#[apply(async_trait_maybe_send!)]
1271pub trait IRawDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
1272 async fn commit_tx(self) -> Result<()>;
1273}
1274
1275#[apply(async_trait_maybe_send!)]
1279pub trait IDatabaseTransaction: MaybeSend + IDatabaseTransactionOps + fmt::Debug {
1280 async fn commit_tx(&mut self) -> Result<()>;
1282
1283 fn is_global(&self) -> bool;
1285
1286 #[doc(hidden)]
1291 fn global_dbtx(&mut self, access_token: GlobalDBTxAccessToken)
1292 -> &mut dyn IDatabaseTransaction;
1293}
1294
1295#[apply(async_trait_maybe_send!)]
1296impl<T> IDatabaseTransaction for Box<T>
1297where
1298 T: IDatabaseTransaction + ?Sized,
1299{
1300 async fn commit_tx(&mut self) -> Result<()> {
1301 (**self).commit_tx().await
1302 }
1303
1304 fn is_global(&self) -> bool {
1305 (**self).is_global()
1306 }
1307
1308 fn global_dbtx(
1309 &mut self,
1310 access_token: GlobalDBTxAccessToken,
1311 ) -> &mut dyn IDatabaseTransaction {
1312 (**self).global_dbtx(access_token)
1313 }
1314}
1315
1316#[apply(async_trait_maybe_send!)]
1317impl<'a, T> IDatabaseTransaction for &'a mut T
1318where
1319 T: IDatabaseTransaction + ?Sized,
1320{
1321 async fn commit_tx(&mut self) -> Result<()> {
1322 (**self).commit_tx().await
1323 }
1324
1325 fn is_global(&self) -> bool {
1326 (**self).is_global()
1327 }
1328
1329 fn global_dbtx(&mut self, access_key: GlobalDBTxAccessToken) -> &mut dyn IDatabaseTransaction {
1330 (**self).global_dbtx(access_key)
1331 }
1332}
1333
1334struct BaseDatabaseTransaction<Tx> {
1337 raw: Option<Tx>,
1339 notify_queue: Option<NotifyQueue>,
1340 notifications: Arc<Notifications>,
1341}
1342
1343impl<Tx> fmt::Debug for BaseDatabaseTransaction<Tx>
1344where
1345 Tx: fmt::Debug,
1346{
1347 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1348 f.write_fmt(format_args!(
1349 "BaseDatabaseTransaction{{ raw={:?} }}",
1350 self.raw
1351 ))
1352 }
1353}
1354impl<Tx> BaseDatabaseTransaction<Tx>
1355where
1356 Tx: IRawDatabaseTransaction,
1357{
1358 fn new(dbtx: Tx, notifications: Arc<Notifications>) -> Self {
1359 Self {
1360 raw: Some(dbtx),
1361 notifications,
1362 notify_queue: Some(NotifyQueue::new()),
1363 }
1364 }
1365
1366 fn add_notification_key(&mut self, key: &[u8]) -> Result<()> {
1367 self.notify_queue
1368 .as_mut()
1369 .context("can not call add_notification_key after commit")?
1370 .add(&key);
1371 Ok(())
1372 }
1373}
1374
1375#[apply(async_trait_maybe_send!)]
1376impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOpsCore for BaseDatabaseTransaction<Tx> {
1377 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1378 self.add_notification_key(key)?;
1379 self.raw
1380 .as_mut()
1381 .context("Cannot insert into already consumed transaction")?
1382 .raw_insert_bytes(key, value)
1383 .await
1384 }
1385
1386 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1387 self.raw
1388 .as_mut()
1389 .context("Cannot retrieve from already consumed transaction")?
1390 .raw_get_bytes(key)
1391 .await
1392 }
1393
1394 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1395 self.add_notification_key(key)?;
1396 self.raw
1397 .as_mut()
1398 .context("Cannot remove from already consumed transaction")?
1399 .raw_remove_entry(key)
1400 .await
1401 }
1402
1403 async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1404 self.raw
1405 .as_mut()
1406 .context("Cannot retrieve from already consumed transaction")?
1407 .raw_find_by_range(key_range)
1408 .await
1409 }
1410
1411 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1412 self.raw
1413 .as_mut()
1414 .context("Cannot retrieve from already consumed transaction")?
1415 .raw_find_by_prefix(key_prefix)
1416 .await
1417 }
1418
1419 async fn raw_find_by_prefix_sorted_descending(
1420 &mut self,
1421 key_prefix: &[u8],
1422 ) -> Result<PrefixStream<'_>> {
1423 self.raw
1424 .as_mut()
1425 .context("Cannot retrieve from already consumed transaction")?
1426 .raw_find_by_prefix_sorted_descending(key_prefix)
1427 .await
1428 }
1429
1430 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1431 self.raw
1432 .as_mut()
1433 .context("Cannot remove from already consumed transaction")?
1434 .raw_remove_by_prefix(key_prefix)
1435 .await
1436 }
1437}
1438
1439#[apply(async_trait_maybe_send!)]
1440impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOps for BaseDatabaseTransaction<Tx> {
1441 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1442 self.raw
1443 .as_mut()
1444 .context("Cannot rollback to a savepoint on an already consumed transaction")?
1445 .rollback_tx_to_savepoint()
1446 .await?;
1447 Ok(())
1448 }
1449
1450 async fn set_tx_savepoint(&mut self) -> Result<()> {
1451 self.raw
1452 .as_mut()
1453 .context("Cannot set a tx savepoint on an already consumed transaction")?
1454 .set_tx_savepoint()
1455 .await?;
1456 Ok(())
1457 }
1458}
1459
1460#[apply(async_trait_maybe_send!)]
1461impl<Tx: IRawDatabaseTransaction + fmt::Debug> IDatabaseTransaction
1462 for BaseDatabaseTransaction<Tx>
1463{
1464 async fn commit_tx(&mut self) -> Result<()> {
1465 self.raw
1466 .take()
1467 .context("Cannot commit an already committed transaction")?
1468 .commit_tx()
1469 .await?;
1470 self.notifications.submit_queue(
1471 &self
1472 .notify_queue
1473 .take()
1474 .expect("commit must be called only once"),
1475 );
1476 Ok(())
1477 }
1478
1479 fn is_global(&self) -> bool {
1480 true
1481 }
1482
1483 fn global_dbtx(
1484 &mut self,
1485 _access_token: GlobalDBTxAccessToken,
1486 ) -> &mut dyn IDatabaseTransaction {
1487 panic!("Illegal to call global_dbtx on BaseDatabaseTransaction");
1488 }
1489}
1490
1491#[derive(Clone)]
1494struct CommitTracker {
1495 is_committed: bool,
1497 has_writes: bool,
1499 ignore_uncommitted: bool,
1501}
1502
1503impl Drop for CommitTracker {
1504 fn drop(&mut self) {
1505 if self.has_writes && !self.is_committed {
1506 if self.ignore_uncommitted {
1507 trace!(
1508 target: LOG_DB,
1509 "DatabaseTransaction has writes and has not called commit, but that's expected."
1510 );
1511 } else {
1512 warn!(
1513 target: LOG_DB,
1514 location = ?backtrace::Backtrace::new(),
1515 "DatabaseTransaction has writes and has not called commit."
1516 );
1517 }
1518 }
1519 }
1520}
1521
1522enum MaybeRef<'a, T> {
1523 Owned(T),
1524 Borrowed(&'a mut T),
1525}
1526
1527impl<T> ops::Deref for MaybeRef<'_, T> {
1528 type Target = T;
1529
1530 fn deref(&self) -> &Self::Target {
1531 match self {
1532 MaybeRef::Owned(o) => o,
1533 MaybeRef::Borrowed(r) => r,
1534 }
1535 }
1536}
1537
1538impl<T> ops::DerefMut for MaybeRef<'_, T> {
1539 fn deref_mut(&mut self) -> &mut Self::Target {
1540 match self {
1541 MaybeRef::Owned(o) => o,
1542 MaybeRef::Borrowed(r) => r,
1543 }
1544 }
1545}
1546
1547pub struct Committable;
1551
1552pub struct NonCommittable;
1556
1557pub struct DatabaseTransaction<'tx, Cap = NonCommittable> {
1561 tx: Box<dyn IDatabaseTransaction + 'tx>,
1562 decoders: ModuleDecoderRegistry,
1563 commit_tracker: MaybeRef<'tx, CommitTracker>,
1564 on_commit_hooks: MaybeRef<'tx, Vec<Box<maybe_add_send!(dyn FnOnce())>>>,
1565 capability: marker::PhantomData<Cap>,
1566}
1567
1568impl<Cap> fmt::Debug for DatabaseTransaction<'_, Cap> {
1569 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1570 f.write_fmt(format_args!(
1571 "DatabaseTransaction {{ tx: {:?}, decoders={:?} }}",
1572 self.tx, self.decoders
1573 ))
1574 }
1575}
1576
1577impl<Cap> WithDecoders for DatabaseTransaction<'_, Cap> {
1578 fn decoders(&self) -> &ModuleDecoderRegistry {
1579 &self.decoders
1580 }
1581}
1582
1583#[instrument(target = LOG_DB, level = "trace", skip_all, fields(value_type = std::any::type_name::<V>()), err)]
1584fn decode_value<V: DatabaseValue>(
1585 value_bytes: &[u8],
1586 decoders: &ModuleDecoderRegistry,
1587) -> Result<V, DecodingError> {
1588 trace!(
1589 bytes = %AbbreviateHexBytes(value_bytes),
1590 "decoding value",
1591 );
1592 V::from_bytes(value_bytes, decoders)
1593}
1594
1595fn decode_value_expect<V: DatabaseValue>(
1596 value_bytes: &[u8],
1597 decoders: &ModuleDecoderRegistry,
1598 key_bytes: &[u8],
1599) -> V {
1600 decode_value(value_bytes, decoders).unwrap_or_else(|err| {
1601 panic!(
1602 "Unrecoverable decoding DatabaseValue as {}; err={}, key_bytes={}, val_bytes={}",
1603 any::type_name::<V>(),
1604 err,
1605 AbbreviateHexBytes(key_bytes),
1606 AbbreviateHexBytes(value_bytes),
1607 )
1608 })
1609}
1610
1611fn decode_key_expect<K: DatabaseKey>(key_bytes: &[u8], decoders: &ModuleDecoderRegistry) -> K {
1612 trace!(
1613 bytes = %AbbreviateHexBytes(key_bytes),
1614 "decoding key",
1615 );
1616 K::from_bytes(key_bytes, decoders).unwrap_or_else(|err| {
1617 panic!(
1618 "Unrecoverable decoding DatabaseKey as {}; err={}; bytes={}",
1619 any::type_name::<K>(),
1620 err,
1621 AbbreviateHexBytes(key_bytes)
1622 )
1623 })
1624}
1625
1626impl<'tx, Cap> DatabaseTransaction<'tx, Cap> {
1627 pub fn into_nc(self) -> DatabaseTransaction<'tx, NonCommittable> {
1629 DatabaseTransaction {
1630 tx: self.tx,
1631 decoders: self.decoders,
1632 commit_tracker: self.commit_tracker,
1633 on_commit_hooks: self.on_commit_hooks,
1634 capability: PhantomData::<NonCommittable>,
1635 }
1636 }
1637
1638 pub fn to_ref_nc<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, NonCommittable>
1640 where
1641 's: 'a,
1642 {
1643 self.to_ref().into_nc()
1644 }
1645
1646 pub fn with_prefix<'a: 'tx>(self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1648 where
1649 'tx: 'a,
1650 {
1651 DatabaseTransaction {
1652 tx: Box::new(PrefixDatabaseTransaction {
1653 inner: self.tx,
1654 global_dbtx_access_token: None,
1655 prefix,
1656 }),
1657 decoders: self.decoders,
1658 commit_tracker: self.commit_tracker,
1659 on_commit_hooks: self.on_commit_hooks,
1660 capability: self.capability,
1661 }
1662 }
1663
1664 pub fn with_prefix_module_id<'a: 'tx>(
1668 self,
1669 module_instance_id: ModuleInstanceId,
1670 ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1671 where
1672 'tx: 'a,
1673 {
1674 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1675 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1676 (
1677 DatabaseTransaction {
1678 tx: Box::new(PrefixDatabaseTransaction {
1679 inner: self.tx,
1680 global_dbtx_access_token: Some(global_dbtx_access_token),
1681 prefix,
1682 }),
1683 decoders: self.decoders,
1684 commit_tracker: self.commit_tracker,
1685 on_commit_hooks: self.on_commit_hooks,
1686 capability: self.capability,
1687 },
1688 global_dbtx_access_token,
1689 )
1690 }
1691
1692 pub fn to_ref<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, Cap>
1694 where
1695 's: 'a,
1696 {
1697 let decoders = self.decoders.clone();
1698
1699 DatabaseTransaction {
1700 tx: Box::new(&mut self.tx),
1701 decoders,
1702 commit_tracker: match self.commit_tracker {
1703 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1704 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1705 },
1706 on_commit_hooks: match self.on_commit_hooks {
1707 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1708 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1709 },
1710 capability: self.capability,
1711 }
1712 }
1713
1714 pub fn to_ref_with_prefix<'a>(&'a mut self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1716 where
1717 'tx: 'a,
1718 {
1719 DatabaseTransaction {
1720 tx: Box::new(PrefixDatabaseTransaction {
1721 inner: &mut self.tx,
1722 global_dbtx_access_token: None,
1723 prefix,
1724 }),
1725 decoders: self.decoders.clone(),
1726 commit_tracker: match self.commit_tracker {
1727 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1728 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1729 },
1730 on_commit_hooks: match self.on_commit_hooks {
1731 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1732 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1733 },
1734 capability: self.capability,
1735 }
1736 }
1737
1738 pub fn to_ref_with_prefix_module_id<'a>(
1739 &'a mut self,
1740 module_instance_id: ModuleInstanceId,
1741 ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1742 where
1743 'tx: 'a,
1744 {
1745 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1746 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1747 (
1748 DatabaseTransaction {
1749 tx: Box::new(PrefixDatabaseTransaction {
1750 inner: &mut self.tx,
1751 global_dbtx_access_token: Some(global_dbtx_access_token),
1752 prefix,
1753 }),
1754 decoders: self.decoders.clone(),
1755 commit_tracker: match self.commit_tracker {
1756 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1757 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1758 },
1759 on_commit_hooks: match self.on_commit_hooks {
1760 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1761 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1762 },
1763 capability: self.capability,
1764 },
1765 global_dbtx_access_token,
1766 )
1767 }
1768
1769 pub fn is_global(&self) -> bool {
1771 self.tx.is_global()
1772 }
1773
1774 pub fn ensure_global(&self) -> Result<()> {
1776 if !self.is_global() {
1777 bail!("Database instance not global");
1778 }
1779
1780 Ok(())
1781 }
1782
1783 pub fn ensure_isolated(&self) -> Result<()> {
1785 if self.is_global() {
1786 bail!("Database instance not isolated");
1787 }
1788
1789 Ok(())
1790 }
1791
1792 pub fn ignore_uncommitted(&mut self) -> &mut Self {
1794 self.commit_tracker.ignore_uncommitted = true;
1795 self
1796 }
1797
1798 pub fn warn_uncommitted(&mut self) -> &mut Self {
1800 self.commit_tracker.ignore_uncommitted = false;
1801 self
1802 }
1803
1804 #[instrument(target = LOG_DB, level = "trace", skip_all)]
1806 pub fn on_commit(&mut self, f: maybe_add_send!(impl FnOnce() + 'static)) {
1807 self.on_commit_hooks.push(Box::new(f));
1808 }
1809
1810 pub fn global_dbtx<'a>(
1811 &'a mut self,
1812 access_token: GlobalDBTxAccessToken,
1813 ) -> DatabaseTransaction<'a, Cap>
1814 where
1815 'tx: 'a,
1816 {
1817 let decoders = self.decoders.clone();
1818
1819 DatabaseTransaction {
1820 tx: Box::new(self.tx.global_dbtx(access_token)),
1821 decoders,
1822 commit_tracker: match self.commit_tracker {
1823 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1824 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1825 },
1826 on_commit_hooks: match self.on_commit_hooks {
1827 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1828 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1829 },
1830 capability: self.capability,
1831 }
1832 }
1833}
1834
1835#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1837pub struct GlobalDBTxAccessToken(u32);
1838
1839impl GlobalDBTxAccessToken {
1840 fn from_prefix(prefix: &[u8]) -> Self {
1851 Self(prefix.iter().fold(0, |acc, b| acc + u32::from(*b)) + 513)
1852 }
1853}
1854
1855impl<'tx> DatabaseTransaction<'tx, Committable> {
1856 pub fn new(dbtx: Box<dyn IDatabaseTransaction + 'tx>, decoders: ModuleDecoderRegistry) -> Self {
1857 Self {
1858 tx: dbtx,
1859 decoders,
1860 commit_tracker: MaybeRef::Owned(CommitTracker {
1861 is_committed: false,
1862 has_writes: false,
1863 ignore_uncommitted: false,
1864 }),
1865 on_commit_hooks: MaybeRef::Owned(vec![]),
1866 capability: PhantomData,
1867 }
1868 }
1869
1870 pub async fn commit_tx_result(mut self) -> Result<()> {
1871 self.commit_tracker.is_committed = true;
1872 let commit_result = self.tx.commit_tx().await;
1873
1874 if commit_result.is_ok() {
1876 for hook in self.on_commit_hooks.deref_mut().drain(..) {
1877 hook();
1878 }
1879 }
1880
1881 commit_result
1882 }
1883
1884 pub async fn commit_tx(mut self) {
1885 self.commit_tracker.is_committed = true;
1886 self.commit_tx_result()
1887 .await
1888 .expect("Unrecoverable error occurred while committing to the database.");
1889 }
1890}
1891
1892#[apply(async_trait_maybe_send!)]
1893impl<Cap> IDatabaseTransactionOpsCore for DatabaseTransaction<'_, Cap>
1894where
1895 Cap: Send,
1896{
1897 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1898 self.commit_tracker.has_writes = true;
1899 self.tx.raw_insert_bytes(key, value).await
1900 }
1901
1902 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1903 self.tx.raw_get_bytes(key).await
1904 }
1905
1906 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1907 self.tx.raw_remove_entry(key).await
1908 }
1909
1910 async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1911 self.tx.raw_find_by_range(key_range).await
1912 }
1913
1914 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1915 self.tx.raw_find_by_prefix(key_prefix).await
1916 }
1917
1918 async fn raw_find_by_prefix_sorted_descending(
1919 &mut self,
1920 key_prefix: &[u8],
1921 ) -> Result<PrefixStream<'_>> {
1922 self.tx
1923 .raw_find_by_prefix_sorted_descending(key_prefix)
1924 .await
1925 }
1926
1927 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1928 self.commit_tracker.has_writes = true;
1929 self.tx.raw_remove_by_prefix(key_prefix).await
1930 }
1931}
1932#[apply(async_trait_maybe_send!)]
1933impl IDatabaseTransactionOps for DatabaseTransaction<'_, Committable> {
1934 async fn set_tx_savepoint(&mut self) -> Result<()> {
1935 self.tx.set_tx_savepoint().await
1936 }
1937
1938 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1939 self.tx.rollback_tx_to_savepoint().await
1940 }
1941}
1942
1943impl<T> DatabaseKeyPrefix for T
1944where
1945 T: DatabaseLookup + crate::encoding::Encodable + Debug,
1946{
1947 fn to_bytes(&self) -> Vec<u8> {
1948 let mut data = vec![<Self as DatabaseLookup>::Record::DB_PREFIX];
1949 data.append(&mut self.consensus_encode_to_vec());
1950 data
1951 }
1952}
1953
1954impl<T> DatabaseKey for T
1955where
1956 T: DatabaseRecord + crate::encoding::Decodable + Sized,
1959{
1960 const NOTIFY_ON_MODIFY: bool = <T as DatabaseRecord>::NOTIFY_ON_MODIFY;
1961 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1962 if data.is_empty() {
1963 return Err(DecodingError::wrong_length(1, 0));
1965 }
1966
1967 if data[0] != Self::DB_PREFIX {
1968 return Err(DecodingError::wrong_prefix(Self::DB_PREFIX, data[0]));
1969 }
1970
1971 <Self as crate::encoding::Decodable>::consensus_decode_whole(&data[1..], modules)
1972 .map_err(|decode_error| DecodingError::Other(decode_error.0))
1973 }
1974}
1975
1976impl<T> DatabaseValue for T
1977where
1978 T: Debug + Encodable + Decodable,
1979{
1980 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1981 T::consensus_decode_whole(data, modules).map_err(|e| DecodingError::Other(e.0))
1982 }
1983
1984 fn to_bytes(&self) -> Vec<u8> {
1985 self.consensus_encode_to_vec()
1986 }
1987}
1988
1989#[macro_export]
2050macro_rules! impl_db_record {
2051 (key = $key:ty, value = $val:ty, db_prefix = $db_prefix:expr_2021 $(, notify_on_modify = $notify:tt)? $(,)?) => {
2052 impl $crate::db::DatabaseRecord for $key {
2053 const DB_PREFIX: u8 = $db_prefix as u8;
2054 $(const NOTIFY_ON_MODIFY: bool = $notify;)?
2055 type Key = Self;
2056 type Value = $val;
2057 }
2058 $(
2059 impl_db_record! {
2060 @impl_notify_marker key = $key, notify_on_modify = $notify
2061 }
2062 )?
2063 };
2064 (@impl_notify_marker key = $key:ty, notify_on_modify = true) => {
2066 impl $crate::db::DatabaseKeyWithNotify for $key {}
2067 };
2068 (@impl_notify_marker key = $key:ty, notify_on_modify = false) => {};
2070}
2071
2072#[macro_export]
2073macro_rules! impl_db_lookup{
2074 (key = $key:ty $(, query_prefix = $query_prefix:ty)* $(,)?) => {
2075 $(
2076 impl $crate::db::DatabaseLookup for $query_prefix {
2077 type Record = $key;
2078 }
2079 )*
2080 };
2081}
2082
2083#[derive(Debug, Encodable, Decodable, Serialize)]
2085pub struct DatabaseVersionKeyV0;
2086
2087#[derive(Debug, Encodable, Decodable, Serialize)]
2088pub struct DatabaseVersionKey(pub ModuleInstanceId);
2089
2090#[derive(Debug, Encodable, Decodable, Serialize, Clone, PartialOrd, Ord, PartialEq, Eq, Copy)]
2091pub struct DatabaseVersion(pub u64);
2092
2093impl_db_record!(
2094 key = DatabaseVersionKeyV0,
2095 value = DatabaseVersion,
2096 db_prefix = DbKeyPrefix::DatabaseVersion
2097);
2098
2099impl_db_record!(
2100 key = DatabaseVersionKey,
2101 value = DatabaseVersion,
2102 db_prefix = DbKeyPrefix::DatabaseVersion
2103);
2104
2105impl std::fmt::Display for DatabaseVersion {
2106 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2107 write!(f, "{}", self.0)
2108 }
2109}
2110
2111impl DatabaseVersion {
2112 pub fn increment(&self) -> Self {
2113 Self(self.0 + 1)
2114 }
2115}
2116
2117impl std::fmt::Display for DbKeyPrefix {
2118 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2119 write!(f, "{self:?}")
2120 }
2121}
2122
2123#[repr(u8)]
2124#[derive(Clone, EnumIter, Debug)]
2125pub enum DbKeyPrefix {
2126 DatabaseVersion = 0x50,
2127 ClientBackup = 0x51,
2128}
2129
2130#[derive(Debug, Error)]
2131pub enum DecodingError {
2132 #[error("Key had a wrong prefix, expected {expected} but got {found}")]
2133 WrongPrefix { expected: u8, found: u8 },
2134 #[error("Key had a wrong length, expected {expected} but got {found}")]
2135 WrongLength { expected: usize, found: usize },
2136 #[error("Other decoding error: {0:#}")]
2137 Other(anyhow::Error),
2138}
2139
2140impl DecodingError {
2141 pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2142 Self::Other(anyhow::Error::from(error))
2143 }
2144
2145 pub fn wrong_prefix(expected: u8, found: u8) -> Self {
2146 Self::WrongPrefix { expected, found }
2147 }
2148
2149 pub fn wrong_length(expected: usize, found: usize) -> Self {
2150 Self::WrongLength { expected, found }
2151 }
2152}
2153
2154#[macro_export]
2155macro_rules! push_db_pair_items {
2156 ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
2157 let db_items =
2158 $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
2159 .await
2160 .map(|(key, val)| {
2161 (
2162 $crate::encoding::Encodable::consensus_encode_to_hex(&key),
2163 val,
2164 )
2165 })
2166 .collect::<BTreeMap<String, $value_type>>()
2167 .await;
2168
2169 $map.insert($key_literal.to_string(), Box::new(db_items));
2170 };
2171}
2172
2173#[macro_export]
2174macro_rules! push_db_key_items {
2175 ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $map:ident, $key_literal:literal) => {
2176 let db_items =
2177 $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
2178 .await
2179 .map(|(key, _)| key)
2180 .collect::<Vec<$key_type>>()
2181 .await;
2182
2183 $map.insert($key_literal.to_string(), Box::new(db_items));
2184 };
2185}
2186
2187pub struct DbMigrationFnContext<'tx, C> {
2201 dbtx: DatabaseTransaction<'tx>,
2202 module_instance_id: Option<ModuleInstanceId>,
2203 ctx: C,
2204 __please_use_constructor: (),
2205}
2206
2207impl<'tx, C> DbMigrationFnContext<'tx, C> {
2208 pub fn new(
2209 dbtx: DatabaseTransaction<'tx>,
2210 module_instance_id: Option<ModuleInstanceId>,
2211 ctx: C,
2212 ) -> Self {
2213 dbtx.ensure_global().expect("Must pass global dbtx");
2214 Self {
2215 dbtx,
2216 module_instance_id,
2217 ctx,
2218 __please_use_constructor: (),
2220 }
2221 }
2222
2223 pub fn map<R>(self, f: impl FnOnce(C) -> R) -> DbMigrationFnContext<'tx, R> {
2224 DbMigrationFnContext::new(self.dbtx, self.module_instance_id, f(self.ctx))
2225 }
2226
2227 #[doc(hidden)]
2229 pub fn split_dbtx_ctx<'s>(&'s mut self) -> (&'s mut DatabaseTransaction<'tx>, &'s C) {
2230 let Self { dbtx, ctx, .. } = self;
2231
2232 (dbtx, ctx)
2233 }
2234
2235 pub fn dbtx(&'_ mut self) -> DatabaseTransaction<'_> {
2236 if let Some(module_instance_id) = self.module_instance_id {
2237 self.dbtx.to_ref_with_prefix_module_id(module_instance_id).0
2238 } else {
2239 self.dbtx.to_ref_nc()
2240 }
2241 }
2242
2243 #[doc(hidden)]
2245 pub fn module_instance_id(&self) -> Option<ModuleInstanceId> {
2246 self.module_instance_id
2247 }
2248}
2249
2250pub type GeneralDbMigrationFn = DbMigrationFn<()>;
2252pub type GeneralDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, ()>;
2253
2254pub type ClientCoreDbMigrationFn = DbMigrationFn<()>;
2259pub type ClientCoreDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, ()>;
2260
2261pub type DbMigrationFn<C> = Box<
2272 maybe_add_send_sync!(
2273 dyn for<'tx> Fn(
2274 DbMigrationFnContext<'tx, C>,
2275 ) -> Pin<
2276 Box<maybe_add_send!(dyn futures::Future<Output = anyhow::Result<()>> + 'tx)>,
2277 >
2278 ),
2279>;
2280
2281pub fn get_current_database_version<F>(
2285 migrations: &BTreeMap<DatabaseVersion, F>,
2286) -> DatabaseVersion {
2287 let versions = migrations.keys().copied().collect::<Vec<_>>();
2288
2289 if !versions
2292 .windows(2)
2293 .all(|window| window[0].increment() == window[1])
2294 {
2295 panic!("Database Migrations are not defined contiguously");
2296 }
2297
2298 versions
2299 .last()
2300 .map_or(DatabaseVersion(0), DatabaseVersion::increment)
2301}
2302
2303pub async fn apply_migrations<C>(
2304 db: &Database,
2305 ctx: C,
2306 kind: String,
2307 migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
2308 module_instance_id: Option<ModuleInstanceId>,
2309 external_prefixes_above: Option<u8>,
2312) -> Result<(), anyhow::Error>
2313where
2314 C: Clone,
2315{
2316 let mut dbtx = db.begin_transaction().await;
2317 apply_migrations_dbtx(
2318 &mut dbtx.to_ref_nc(),
2319 ctx,
2320 kind,
2321 migrations,
2322 module_instance_id,
2323 external_prefixes_above,
2324 )
2325 .await?;
2326
2327 dbtx.commit_tx_result().await
2328}
2329pub async fn apply_migrations_dbtx<C>(
2341 global_dbtx: &mut DatabaseTransaction<'_>,
2342 ctx: C,
2343 kind: String,
2344 migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
2345 module_instance_id: Option<ModuleInstanceId>,
2346 external_prefixes_above: Option<u8>,
2349) -> Result<(), anyhow::Error>
2350where
2351 C: Clone,
2352{
2353 let is_new_db = global_dbtx
2356 .raw_find_by_prefix(&[])
2357 .await?
2358 .filter(|(key, _v)| {
2359 std::future::ready(
2360 external_prefixes_above.is_none_or(|external_prefixes_above| {
2361 !key.is_empty() && key[0] < external_prefixes_above
2362 }),
2363 )
2364 })
2365 .next()
2366 .await
2367 .is_none();
2368
2369 let target_db_version = get_current_database_version(&migrations);
2370
2371 create_database_version_dbtx(
2373 global_dbtx,
2374 target_db_version,
2375 module_instance_id,
2376 kind.clone(),
2377 is_new_db,
2378 )
2379 .await?;
2380
2381 let module_instance_id_key = module_instance_id_or_global(module_instance_id);
2382
2383 let disk_version = global_dbtx
2384 .get_value(&DatabaseVersionKey(module_instance_id_key))
2385 .await;
2386
2387 let db_version = if let Some(disk_version) = disk_version {
2388 let mut current_db_version = disk_version;
2389
2390 if current_db_version > target_db_version {
2391 return Err(anyhow::anyhow!(format!(
2392 "On disk database version {current_db_version} for module {kind} was higher than the code database version {target_db_version}."
2393 )));
2394 }
2395
2396 while current_db_version < target_db_version {
2397 if let Some(migration) = migrations.get(¤t_db_version) {
2398 info!(target: LOG_DB, ?kind, ?current_db_version, ?target_db_version, "Migrating module...");
2399 migration(DbMigrationFnContext::new(
2400 global_dbtx.to_ref_nc(),
2401 module_instance_id,
2402 ctx.clone(),
2403 ))
2404 .await?;
2405 } else {
2406 warn!(target: LOG_DB, ?current_db_version, "Missing server db migration");
2407 }
2408
2409 current_db_version = current_db_version.increment();
2410
2411 global_dbtx
2412 .insert_entry(
2413 &DatabaseVersionKey(module_instance_id_key),
2414 ¤t_db_version,
2415 )
2416 .await;
2417 }
2418
2419 current_db_version
2420 } else {
2421 target_db_version
2422 };
2423
2424 debug!(target: LOG_DB, ?kind, ?db_version, "DB Version");
2425 Ok(())
2426}
2427
2428pub async fn create_database_version(
2429 db: &Database,
2430 target_db_version: DatabaseVersion,
2431 module_instance_id: Option<ModuleInstanceId>,
2432 kind: String,
2433 is_new_db: bool,
2434) -> Result<(), anyhow::Error> {
2435 let mut dbtx = db.begin_transaction().await;
2436
2437 create_database_version_dbtx(
2438 &mut dbtx.to_ref_nc(),
2439 target_db_version,
2440 module_instance_id,
2441 kind,
2442 is_new_db,
2443 )
2444 .await?;
2445
2446 dbtx.commit_tx_result().await?;
2447 Ok(())
2448}
2449
2450pub async fn create_database_version_dbtx(
2454 global_dbtx: &mut DatabaseTransaction<'_>,
2455 target_db_version: DatabaseVersion,
2456 module_instance_id: Option<ModuleInstanceId>,
2457 kind: String,
2458 is_new_db: bool,
2459) -> Result<(), anyhow::Error> {
2460 let key_module_instance_id = module_instance_id_or_global(module_instance_id);
2461
2462 if global_dbtx
2466 .get_value(&DatabaseVersionKey(key_module_instance_id))
2467 .await
2468 .is_none()
2469 {
2470 let current_version_in_module = if let Some(module_instance_id) = module_instance_id {
2479 remove_current_db_version_if_exists(
2480 &mut global_dbtx
2481 .to_ref_with_prefix_module_id(module_instance_id)
2482 .0
2483 .into_nc(),
2484 is_new_db,
2485 target_db_version,
2486 )
2487 .await
2488 } else {
2489 remove_current_db_version_if_exists(
2490 &mut global_dbtx.to_ref().into_nc(),
2491 is_new_db,
2492 target_db_version,
2493 )
2494 .await
2495 };
2496
2497 debug!(target: LOG_DB, ?kind, ?current_version_in_module, ?target_db_version, ?is_new_db, "Creating DatabaseVersionKey...");
2499 global_dbtx
2500 .insert_new_entry(
2501 &DatabaseVersionKey(key_module_instance_id),
2502 ¤t_version_in_module,
2503 )
2504 .await;
2505 }
2506
2507 Ok(())
2508}
2509
2510async fn remove_current_db_version_if_exists(
2515 version_dbtx: &mut DatabaseTransaction<'_>,
2516 is_new_db: bool,
2517 target_db_version: DatabaseVersion,
2518) -> DatabaseVersion {
2519 let current_version_in_module = version_dbtx.remove_entry(&DatabaseVersionKeyV0).await;
2523 match current_version_in_module {
2524 Some(database_version) => database_version,
2525 None if is_new_db => target_db_version,
2526 None => DatabaseVersion(0),
2527 }
2528}
2529
2530fn module_instance_id_or_global(module_instance_id: Option<ModuleInstanceId>) -> ModuleInstanceId {
2533 module_instance_id.map_or_else(
2535 || MODULE_GLOBAL_PREFIX.into(),
2536 |module_instance_id| module_instance_id,
2537 )
2538}
2539#[allow(unused_imports)]
2540mod test_utils {
2541 use std::collections::BTreeMap;
2542 use std::time::Duration;
2543
2544 use fedimint_core::db::DbMigrationFnContext;
2545 use futures::future::ready;
2546 use futures::{Future, FutureExt, StreamExt};
2547 use rand::Rng;
2548 use tokio::join;
2549
2550 use super::{
2551 Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey, DatabaseVersionKeyV0,
2552 DbMigrationFn, apply_migrations,
2553 };
2554 use crate::core::ModuleKind;
2555 use crate::db::mem_impl::MemDatabase;
2556 use crate::db::{
2557 IDatabaseTransactionOps, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
2558 };
2559 use crate::encoding::{Decodable, Encodable};
2560 use crate::module::registry::ModuleDecoderRegistry;
2561
2562 pub async fn future_returns_shortly<F: Future>(fut: F) -> Option<F::Output> {
2563 crate::runtime::timeout(Duration::from_millis(10), fut)
2564 .await
2565 .ok()
2566 }
2567
2568 #[repr(u8)]
2569 #[derive(Clone)]
2570 pub enum TestDbKeyPrefix {
2571 Test = 0x42,
2572 AltTest = 0x43,
2573 PercentTestKey = 0x25,
2574 }
2575
2576 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
2577 pub(super) struct TestKey(pub u64);
2578
2579 #[derive(Debug, Encodable, Decodable)]
2580 struct DbPrefixTestPrefix;
2581
2582 impl_db_record!(
2583 key = TestKey,
2584 value = TestVal,
2585 db_prefix = TestDbKeyPrefix::Test,
2586 notify_on_modify = true,
2587 );
2588 impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);
2589
2590 #[derive(Debug, Encodable, Decodable)]
2591 struct TestKeyV0(u64, u64);
2592
2593 #[derive(Debug, Encodable, Decodable)]
2594 struct DbPrefixTestPrefixV0;
2595
2596 impl_db_record!(
2597 key = TestKeyV0,
2598 value = TestVal,
2599 db_prefix = TestDbKeyPrefix::Test,
2600 );
2601 impl_db_lookup!(key = TestKeyV0, query_prefix = DbPrefixTestPrefixV0);
2602
2603 #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Encodable, Decodable)]
2604 struct AltTestKey(u64);
2605
2606 #[derive(Debug, Encodable, Decodable)]
2607 struct AltDbPrefixTestPrefix;
2608
2609 impl_db_record!(
2610 key = AltTestKey,
2611 value = TestVal,
2612 db_prefix = TestDbKeyPrefix::AltTest,
2613 );
2614 impl_db_lookup!(key = AltTestKey, query_prefix = AltDbPrefixTestPrefix);
2615
2616 #[derive(Debug, Encodable, Decodable)]
2617 struct PercentTestKey(u64);
2618
2619 #[derive(Debug, Encodable, Decodable)]
2620 struct PercentPrefixTestPrefix;
2621
2622 impl_db_record!(
2623 key = PercentTestKey,
2624 value = TestVal,
2625 db_prefix = TestDbKeyPrefix::PercentTestKey,
2626 );
2627
2628 impl_db_lookup!(key = PercentTestKey, query_prefix = PercentPrefixTestPrefix);
2629 #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
2630 pub(super) struct TestVal(pub u64);
2631
2632 const TEST_MODULE_PREFIX: u16 = 1;
2633 const ALT_MODULE_PREFIX: u16 = 2;
2634
2635 pub async fn verify_insert_elements(db: Database) {
2636 let mut dbtx = db.begin_transaction().await;
2637 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2638 assert!(dbtx.insert_entry(&TestKey(2), &TestVal(3)).await.is_none());
2639 dbtx.commit_tx().await;
2640
2641 let mut dbtx = db.begin_transaction().await;
2643 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2644 assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(3)));
2645 dbtx.commit_tx().await;
2646
2647 let mut dbtx = db.begin_transaction().await;
2649 assert_eq!(
2650 dbtx.insert_entry(&TestKey(1), &TestVal(4)).await,
2651 Some(TestVal(2))
2652 );
2653 assert_eq!(
2654 dbtx.insert_entry(&TestKey(2), &TestVal(5)).await,
2655 Some(TestVal(3))
2656 );
2657 dbtx.commit_tx().await;
2658
2659 let mut dbtx = db.begin_transaction().await;
2660 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(4)));
2661 assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(5)));
2662 dbtx.commit_tx().await;
2663 }
2664
2665 pub async fn verify_remove_nonexisting(db: Database) {
2666 let mut dbtx = db.begin_transaction().await;
2667 assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2668 let removed = dbtx.remove_entry(&TestKey(1)).await;
2669 assert!(removed.is_none());
2670
2671 dbtx.commit_tx().await;
2673 }
2674
2675 pub async fn verify_remove_existing(db: Database) {
2676 let mut dbtx = db.begin_transaction().await;
2677
2678 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2679
2680 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2681
2682 let removed = dbtx.remove_entry(&TestKey(1)).await;
2683 assert_eq!(removed, Some(TestVal(2)));
2684 assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2685
2686 dbtx.commit_tx().await;
2688 }
2689
2690 pub async fn verify_read_own_writes(db: Database) {
2691 let mut dbtx = db.begin_transaction().await;
2692
2693 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2694
2695 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2696
2697 dbtx.commit_tx().await;
2699 }
2700
2701 pub async fn verify_prevent_dirty_reads(db: Database) {
2702 let mut dbtx = db.begin_transaction().await;
2703
2704 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2705
2706 let mut dbtx2 = db.begin_transaction().await;
2708 assert_eq!(dbtx2.get_value(&TestKey(1)).await, None);
2709
2710 dbtx.commit_tx().await;
2712 }
2713
2714 pub async fn verify_find_by_range(db: Database) {
2715 let mut dbtx = db.begin_transaction().await;
2716 dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2717 dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2718 dbtx.insert_entry(&TestKey(56), &TestVal(7777)).await;
2719
2720 dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2721 dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2722
2723 {
2724 let mut module_dbtx = dbtx.to_ref_with_prefix_module_id(2).0;
2725 module_dbtx
2726 .insert_entry(&TestKey(300), &TestVal(3000))
2727 .await;
2728 }
2729
2730 dbtx.commit_tx().await;
2731
2732 let mut dbtx = db.begin_transaction_nc().await;
2734
2735 let returned_keys = dbtx
2736 .find_by_range(TestKey(55)..TestKey(56))
2737 .await
2738 .collect::<Vec<_>>()
2739 .await;
2740
2741 let expected = vec![(TestKey(55), TestVal(9999))];
2742
2743 assert_eq!(returned_keys, expected);
2744
2745 let returned_keys = dbtx
2746 .find_by_range(TestKey(54)..TestKey(56))
2747 .await
2748 .collect::<Vec<_>>()
2749 .await;
2750
2751 let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2752 assert_eq!(returned_keys, expected);
2753
2754 let returned_keys = dbtx
2755 .find_by_range(TestKey(54)..TestKey(57))
2756 .await
2757 .collect::<Vec<_>>()
2758 .await;
2759
2760 let expected = vec![
2761 (TestKey(54), TestVal(8888)),
2762 (TestKey(55), TestVal(9999)),
2763 (TestKey(56), TestVal(7777)),
2764 ];
2765 assert_eq!(returned_keys, expected);
2766
2767 let mut module_dbtx = dbtx.with_prefix_module_id(2).0;
2768 let test_range = module_dbtx
2769 .find_by_range(TestKey(300)..TestKey(301))
2770 .await
2771 .collect::<Vec<_>>()
2772 .await;
2773 assert!(test_range.len() == 1);
2774 }
2775
2776 pub async fn verify_find_by_prefix(db: Database) {
2777 let mut dbtx = db.begin_transaction().await;
2778 dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2779 dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2780
2781 dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2782 dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2783 dbtx.commit_tx().await;
2784
2785 let mut dbtx = db.begin_transaction().await;
2787
2788 let returned_keys = dbtx
2789 .find_by_prefix(&DbPrefixTestPrefix)
2790 .await
2791 .collect::<Vec<_>>()
2792 .await;
2793
2794 let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2795 assert_eq!(returned_keys, expected);
2796
2797 let reversed = dbtx
2798 .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2799 .await
2800 .collect::<Vec<_>>()
2801 .await;
2802 let mut reversed_expected = expected;
2803 reversed_expected.reverse();
2804 assert_eq!(reversed, reversed_expected);
2805
2806 let returned_keys = dbtx
2807 .find_by_prefix(&AltDbPrefixTestPrefix)
2808 .await
2809 .collect::<Vec<_>>()
2810 .await;
2811
2812 let expected = vec![
2813 (AltTestKey(54), TestVal(6666)),
2814 (AltTestKey(55), TestVal(7777)),
2815 ];
2816 assert_eq!(returned_keys, expected);
2817
2818 let reversed = dbtx
2819 .find_by_prefix_sorted_descending(&AltDbPrefixTestPrefix)
2820 .await
2821 .collect::<Vec<_>>()
2822 .await;
2823 let mut reversed_expected = expected;
2824 reversed_expected.reverse();
2825 assert_eq!(reversed, reversed_expected);
2826 }
2827
2828 pub async fn verify_commit(db: Database) {
2829 let mut dbtx = db.begin_transaction().await;
2830
2831 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2832 dbtx.commit_tx().await;
2833
2834 let mut dbtx2 = db.begin_transaction().await;
2836 assert_eq!(dbtx2.get_value(&TestKey(1)).await, Some(TestVal(2)));
2837 }
2838
2839 pub async fn verify_rollback_to_savepoint(db: Database) {
2840 let mut dbtx_rollback = db.begin_transaction().await;
2841
2842 dbtx_rollback
2843 .insert_entry(&TestKey(20), &TestVal(2000))
2844 .await;
2845
2846 dbtx_rollback
2847 .set_tx_savepoint()
2848 .await
2849 .expect("Error setting transaction savepoint");
2850
2851 dbtx_rollback
2852 .insert_entry(&TestKey(21), &TestVal(2001))
2853 .await;
2854
2855 assert_eq!(
2856 dbtx_rollback.get_value(&TestKey(20)).await,
2857 Some(TestVal(2000))
2858 );
2859 assert_eq!(
2860 dbtx_rollback.get_value(&TestKey(21)).await,
2861 Some(TestVal(2001))
2862 );
2863
2864 dbtx_rollback
2865 .rollback_tx_to_savepoint()
2866 .await
2867 .expect("Error setting transaction savepoint");
2868
2869 assert_eq!(
2870 dbtx_rollback.get_value(&TestKey(20)).await,
2871 Some(TestVal(2000))
2872 );
2873
2874 assert_eq!(dbtx_rollback.get_value(&TestKey(21)).await, None);
2875
2876 dbtx_rollback.commit_tx().await;
2878 }
2879
2880 pub async fn verify_prevent_nonrepeatable_reads(db: Database) {
2881 let mut dbtx = db.begin_transaction().await;
2882 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2883
2884 let mut dbtx2 = db.begin_transaction().await;
2885
2886 dbtx2.insert_entry(&TestKey(100), &TestVal(101)).await;
2887
2888 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2889
2890 dbtx2.commit_tx().await;
2891
2892 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2895
2896 let expected_keys = 0;
2897 let returned_keys = dbtx
2898 .find_by_prefix(&DbPrefixTestPrefix)
2899 .await
2900 .fold(0, |returned_keys, (key, value)| async move {
2901 if key == TestKey(100) {
2902 assert!(value.eq(&TestVal(101)));
2903 }
2904 returned_keys + 1
2905 })
2906 .await;
2907
2908 assert_eq!(returned_keys, expected_keys);
2909 }
2910
2911 pub async fn verify_snapshot_isolation(db: Database) {
2912 async fn random_yield() {
2913 let times = if rand::thread_rng().gen_bool(0.5) {
2914 0
2915 } else {
2916 10
2917 };
2918 for _ in 0..times {
2919 tokio::task::yield_now().await;
2920 }
2921 }
2922
2923 for i in 0..1000 {
2925 let base_key = i * 2;
2926 let tx_accepted_key = base_key;
2927 let spent_input_key = base_key + 1;
2928
2929 join!(
2930 async {
2931 random_yield().await;
2932 let mut dbtx = db.begin_transaction().await;
2933
2934 random_yield().await;
2935 let a = dbtx.get_value(&TestKey(tx_accepted_key)).await;
2936 random_yield().await;
2937 let s = match i % 5 {
2940 0 => dbtx.get_value(&TestKey(spent_input_key)).await,
2941 1 => dbtx.remove_entry(&TestKey(spent_input_key)).await,
2942 2 => {
2943 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(200))
2944 .await
2945 }
2946 3 => {
2947 dbtx.find_by_prefix(&DbPrefixTestPrefix)
2948 .await
2949 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2950 .map(|(_k, v)| v)
2951 .next()
2952 .await
2953 }
2954 4 => {
2955 dbtx.find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2956 .await
2957 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2958 .map(|(_k, v)| v)
2959 .next()
2960 .await
2961 }
2962 _ => {
2963 panic!("woot?");
2964 }
2965 };
2966
2967 match (a, s) {
2968 (None, None) | (Some(_), Some(_)) => {}
2969 (None, Some(_)) => panic!("none some?! {i}"),
2970 (Some(_), None) => panic!("some none?! {i}"),
2971 }
2972 },
2973 async {
2974 random_yield().await;
2975
2976 let mut dbtx = db.begin_transaction().await;
2977 random_yield().await;
2978 assert_eq!(dbtx.get_value(&TestKey(tx_accepted_key)).await, None);
2979
2980 random_yield().await;
2981 assert_eq!(
2982 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(100))
2983 .await,
2984 None
2985 );
2986
2987 random_yield().await;
2988 assert_eq!(
2989 dbtx.insert_entry(&TestKey(tx_accepted_key), &TestVal(100))
2990 .await,
2991 None
2992 );
2993 random_yield().await;
2994 dbtx.commit_tx().await;
2995 }
2996 );
2997 }
2998 }
2999
3000 pub async fn verify_phantom_entry(db: Database) {
3001 let mut dbtx = db.begin_transaction().await;
3002
3003 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3004
3005 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3006
3007 dbtx.commit_tx().await;
3008
3009 let mut dbtx = db.begin_transaction().await;
3010 let expected_keys = 2;
3011 let returned_keys = dbtx
3012 .find_by_prefix(&DbPrefixTestPrefix)
3013 .await
3014 .fold(0, |returned_keys, (key, value)| async move {
3015 match key {
3016 TestKey(100) => {
3017 assert!(value.eq(&TestVal(101)));
3018 }
3019 TestKey(101) => {
3020 assert!(value.eq(&TestVal(102)));
3021 }
3022 _ => {}
3023 }
3024 returned_keys + 1
3025 })
3026 .await;
3027
3028 assert_eq!(returned_keys, expected_keys);
3029
3030 let mut dbtx2 = db.begin_transaction().await;
3031
3032 dbtx2.insert_entry(&TestKey(102), &TestVal(103)).await;
3033
3034 dbtx2.commit_tx().await;
3035
3036 let returned_keys = dbtx
3037 .find_by_prefix(&DbPrefixTestPrefix)
3038 .await
3039 .fold(0, |returned_keys, (key, value)| async move {
3040 match key {
3041 TestKey(100) => {
3042 assert!(value.eq(&TestVal(101)));
3043 }
3044 TestKey(101) => {
3045 assert!(value.eq(&TestVal(102)));
3046 }
3047 _ => {}
3048 }
3049 returned_keys + 1
3050 })
3051 .await;
3052
3053 assert_eq!(returned_keys, expected_keys);
3054 }
3055
3056 pub async fn expect_write_conflict(db: Database) {
3057 let mut dbtx = db.begin_transaction().await;
3058 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3059 dbtx.commit_tx().await;
3060
3061 let mut dbtx2 = db.begin_transaction().await;
3062 let mut dbtx3 = db.begin_transaction().await;
3063
3064 dbtx2.insert_entry(&TestKey(100), &TestVal(102)).await;
3065
3066 dbtx3.insert_entry(&TestKey(100), &TestVal(103)).await;
3070
3071 dbtx2.commit_tx().await;
3072 dbtx3.commit_tx_result().await.expect_err("Expecting an error to be returned because this transaction is in a write-write conflict with dbtx");
3073 }
3074
3075 pub async fn verify_string_prefix(db: Database) {
3076 let mut dbtx = db.begin_transaction().await;
3077 dbtx.insert_entry(&PercentTestKey(100), &TestVal(101)).await;
3078
3079 assert_eq!(
3080 dbtx.get_value(&PercentTestKey(100)).await,
3081 Some(TestVal(101))
3082 );
3083
3084 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3085
3086 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3087
3088 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3089
3090 dbtx.insert_entry(&TestKey(101), &TestVal(100)).await;
3093
3094 let expected_keys = 4;
3095 let returned_keys = dbtx
3096 .find_by_prefix(&PercentPrefixTestPrefix)
3097 .await
3098 .fold(0, |returned_keys, (key, value)| async move {
3099 if matches!(key, PercentTestKey(101)) {
3100 assert!(value.eq(&TestVal(100)));
3101 }
3102 returned_keys + 1
3103 })
3104 .await;
3105
3106 assert_eq!(returned_keys, expected_keys);
3107 }
3108
3109 pub async fn verify_remove_by_prefix(db: Database) {
3110 let mut dbtx = db.begin_transaction().await;
3111
3112 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3113
3114 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3115
3116 dbtx.commit_tx().await;
3117
3118 let mut remove_dbtx = db.begin_transaction().await;
3119 remove_dbtx.remove_by_prefix(&DbPrefixTestPrefix).await;
3120 remove_dbtx.commit_tx().await;
3121
3122 let mut dbtx = db.begin_transaction().await;
3123 let expected_keys = 0;
3124 let returned_keys = dbtx
3125 .find_by_prefix(&DbPrefixTestPrefix)
3126 .await
3127 .fold(0, |returned_keys, (key, value)| async move {
3128 match key {
3129 TestKey(100) => {
3130 assert!(value.eq(&TestVal(101)));
3131 }
3132 TestKey(101) => {
3133 assert!(value.eq(&TestVal(102)));
3134 }
3135 _ => {}
3136 }
3137 returned_keys + 1
3138 })
3139 .await;
3140
3141 assert_eq!(returned_keys, expected_keys);
3142 }
3143
3144 pub async fn verify_module_db(db: Database, module_db: Database) {
3145 let mut dbtx = db.begin_transaction().await;
3146
3147 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3148
3149 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3150
3151 dbtx.commit_tx().await;
3152
3153 let mut module_dbtx = module_db.begin_transaction().await;
3155 assert_eq!(module_dbtx.get_value(&TestKey(100)).await, None);
3156
3157 assert_eq!(module_dbtx.get_value(&TestKey(101)).await, None);
3158
3159 let mut dbtx = db.begin_transaction().await;
3161 assert_eq!(dbtx.get_value(&TestKey(100)).await, Some(TestVal(101)));
3162
3163 assert_eq!(dbtx.get_value(&TestKey(101)).await, Some(TestVal(102)));
3164
3165 let mut module_dbtx = module_db.begin_transaction().await;
3166
3167 module_dbtx.insert_entry(&TestKey(100), &TestVal(103)).await;
3168
3169 module_dbtx.insert_entry(&TestKey(101), &TestVal(104)).await;
3170
3171 module_dbtx.commit_tx().await;
3172
3173 let expected_keys = 2;
3174 let mut dbtx = db.begin_transaction().await;
3175 let returned_keys = dbtx
3176 .find_by_prefix(&DbPrefixTestPrefix)
3177 .await
3178 .fold(0, |returned_keys, (key, value)| async move {
3179 match key {
3180 TestKey(100) => {
3181 assert!(value.eq(&TestVal(101)));
3182 }
3183 TestKey(101) => {
3184 assert!(value.eq(&TestVal(102)));
3185 }
3186 _ => {}
3187 }
3188 returned_keys + 1
3189 })
3190 .await;
3191
3192 assert_eq!(returned_keys, expected_keys);
3193
3194 let removed = dbtx.remove_entry(&TestKey(100)).await;
3195 assert_eq!(removed, Some(TestVal(101)));
3196 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
3197
3198 let mut module_dbtx = module_db.begin_transaction().await;
3199 assert_eq!(
3200 module_dbtx.get_value(&TestKey(100)).await,
3201 Some(TestVal(103))
3202 );
3203 }
3204
3205 pub async fn verify_module_prefix(db: Database) {
3206 let mut test_dbtx = db.begin_transaction().await;
3207 {
3208 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3209
3210 test_module_dbtx
3211 .insert_entry(&TestKey(100), &TestVal(101))
3212 .await;
3213
3214 test_module_dbtx
3215 .insert_entry(&TestKey(101), &TestVal(102))
3216 .await;
3217 }
3218
3219 test_dbtx.commit_tx().await;
3220
3221 let mut alt_dbtx = db.begin_transaction().await;
3222 {
3223 let mut alt_module_dbtx = alt_dbtx.to_ref_with_prefix_module_id(ALT_MODULE_PREFIX).0;
3224
3225 alt_module_dbtx
3226 .insert_entry(&TestKey(100), &TestVal(103))
3227 .await;
3228
3229 alt_module_dbtx
3230 .insert_entry(&TestKey(101), &TestVal(104))
3231 .await;
3232 }
3233
3234 alt_dbtx.commit_tx().await;
3235
3236 let mut test_dbtx = db.begin_transaction().await;
3238 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3239 assert_eq!(
3240 test_module_dbtx.get_value(&TestKey(100)).await,
3241 Some(TestVal(101))
3242 );
3243
3244 assert_eq!(
3245 test_module_dbtx.get_value(&TestKey(101)).await,
3246 Some(TestVal(102))
3247 );
3248
3249 let expected_keys = 2;
3250 let returned_keys = test_module_dbtx
3251 .find_by_prefix(&DbPrefixTestPrefix)
3252 .await
3253 .fold(0, |returned_keys, (key, value)| async move {
3254 match key {
3255 TestKey(100) => {
3256 assert!(value.eq(&TestVal(101)));
3257 }
3258 TestKey(101) => {
3259 assert!(value.eq(&TestVal(102)));
3260 }
3261 _ => {}
3262 }
3263 returned_keys + 1
3264 })
3265 .await;
3266
3267 assert_eq!(returned_keys, expected_keys);
3268
3269 let removed = test_module_dbtx.remove_entry(&TestKey(100)).await;
3270 assert_eq!(removed, Some(TestVal(101)));
3271 assert_eq!(test_module_dbtx.get_value(&TestKey(100)).await, None);
3272
3273 let mut test_dbtx = db.begin_transaction().await;
3276 assert_eq!(test_dbtx.get_value(&TestKey(101)).await, None);
3277
3278 test_dbtx.commit_tx().await;
3279 }
3280
3281 #[cfg(test)]
3282 #[tokio::test]
3283 pub async fn verify_test_migration() {
3284 let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
3286 let expected_test_keys_size: usize = 100;
3287 let mut dbtx = db.begin_transaction().await;
3288 for i in 0..expected_test_keys_size {
3289 dbtx.insert_new_entry(&TestKeyV0(i as u64, (i + 1) as u64), &TestVal(i as u64))
3290 .await;
3291 }
3292
3293 dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
3295 .await;
3296 dbtx.commit_tx().await;
3297
3298 let mut migrations: BTreeMap<DatabaseVersion, DbMigrationFn<()>> = BTreeMap::new();
3299
3300 migrations.insert(
3301 DatabaseVersion(0),
3302 Box::new(|ctx| migrate_test_db_version_0(ctx).boxed()),
3303 );
3304
3305 apply_migrations(&db, (), "TestModule".to_string(), migrations, None, None)
3306 .await
3307 .expect("Error applying migrations for TestModule");
3308
3309 let mut dbtx = db.begin_transaction().await;
3311
3312 assert!(
3315 dbtx.get_value(&DatabaseVersionKey(MODULE_GLOBAL_PREFIX.into()))
3316 .await
3317 .is_some()
3318 );
3319
3320 let test_keys = dbtx
3322 .find_by_prefix(&DbPrefixTestPrefix)
3323 .await
3324 .collect::<Vec<_>>()
3325 .await;
3326 let test_keys_size = test_keys.len();
3327 assert_eq!(test_keys_size, expected_test_keys_size);
3328 for (key, val) in test_keys {
3329 assert_eq!(key.0, val.0 + 1);
3330 }
3331 }
3332
3333 #[allow(dead_code)]
3334 async fn migrate_test_db_version_0(
3335 mut ctx: DbMigrationFnContext<'_, ()>,
3336 ) -> Result<(), anyhow::Error> {
3337 let mut dbtx = ctx.dbtx();
3338 let example_keys_v0 = dbtx
3339 .find_by_prefix(&DbPrefixTestPrefixV0)
3340 .await
3341 .collect::<Vec<_>>()
3342 .await;
3343 dbtx.remove_by_prefix(&DbPrefixTestPrefixV0).await;
3344 for (key, val) in example_keys_v0 {
3345 let key_v2 = TestKey(key.1);
3346 dbtx.insert_new_entry(&key_v2, &val).await;
3347 }
3348 Ok(())
3349 }
3350
3351 #[cfg(test)]
3352 #[tokio::test]
3353 async fn test_autocommit() {
3354 use std::marker::PhantomData;
3355 use std::ops::Range;
3356 use std::path::Path;
3357
3358 use anyhow::anyhow;
3359 use async_trait::async_trait;
3360
3361 use crate::ModuleDecoderRegistry;
3362 use crate::db::{
3363 AutocommitError, BaseDatabaseTransaction, IDatabaseTransaction,
3364 IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase,
3365 IRawDatabaseTransaction,
3366 };
3367
3368 #[derive(Debug)]
3369 struct FakeDatabase;
3370
3371 #[async_trait]
3372 impl IRawDatabase for FakeDatabase {
3373 type Transaction<'a> = FakeTransaction<'a>;
3374 async fn begin_transaction(&self) -> FakeTransaction {
3375 FakeTransaction(PhantomData)
3376 }
3377
3378 fn checkpoint(&self, _backup_path: &Path) -> anyhow::Result<()> {
3379 Ok(())
3380 }
3381 }
3382
3383 #[derive(Debug)]
3384 struct FakeTransaction<'a>(PhantomData<&'a ()>);
3385
3386 #[async_trait]
3387 impl IDatabaseTransactionOpsCore for FakeTransaction<'_> {
3388 async fn raw_insert_bytes(
3389 &mut self,
3390 _key: &[u8],
3391 _value: &[u8],
3392 ) -> anyhow::Result<Option<Vec<u8>>> {
3393 unimplemented!()
3394 }
3395
3396 async fn raw_get_bytes(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
3397 unimplemented!()
3398 }
3399
3400 async fn raw_remove_entry(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
3401 unimplemented!()
3402 }
3403
3404 async fn raw_find_by_range(
3405 &mut self,
3406 _key_range: Range<&[u8]>,
3407 ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3408 unimplemented!()
3409 }
3410
3411 async fn raw_find_by_prefix(
3412 &mut self,
3413 _key_prefix: &[u8],
3414 ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3415 unimplemented!()
3416 }
3417
3418 async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
3419 unimplemented!()
3420 }
3421
3422 async fn raw_find_by_prefix_sorted_descending(
3423 &mut self,
3424 _key_prefix: &[u8],
3425 ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3426 unimplemented!()
3427 }
3428 }
3429
3430 #[async_trait]
3431 impl IDatabaseTransactionOps for FakeTransaction<'_> {
3432 async fn rollback_tx_to_savepoint(&mut self) -> anyhow::Result<()> {
3433 unimplemented!()
3434 }
3435
3436 async fn set_tx_savepoint(&mut self) -> anyhow::Result<()> {
3437 unimplemented!()
3438 }
3439 }
3440
3441 #[async_trait]
3442 impl IRawDatabaseTransaction for FakeTransaction<'_> {
3443 async fn commit_tx(self) -> anyhow::Result<()> {
3444 Err(anyhow!("Can't commit!"))
3445 }
3446 }
3447
3448 let db = Database::new(FakeDatabase, ModuleDecoderRegistry::default());
3449 let err = db
3450 .autocommit::<_, _, ()>(|_dbtx, _| Box::pin(async { Ok(()) }), Some(5))
3451 .await
3452 .unwrap_err();
3453
3454 match err {
3455 AutocommitError::CommitFailed {
3456 attempts: failed_attempts,
3457 ..
3458 } => {
3459 assert_eq!(failed_attempts, 5);
3460 }
3461 AutocommitError::ClosureError { .. } => panic!("Closure did not return error"),
3462 }
3463 }
3464}
3465
3466pub async fn find_by_prefix_sorted_descending<'r, 'inner, KP>(
3467 tx: &'r mut (dyn IDatabaseTransaction + 'inner),
3468 decoders: ModuleDecoderRegistry,
3469 key_prefix: &KP,
3470) -> impl Stream<
3471 Item = (
3472 KP::Record,
3473 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
3474 ),
3475>
3476+ 'r
3477+ use<'r, KP>
3478where
3479 'inner: 'r,
3480 KP: DatabaseLookup,
3481 KP::Record: DatabaseKey,
3482{
3483 debug!(target: LOG_DB, "find by prefix sorted descending");
3484 let prefix_bytes = key_prefix.to_bytes();
3485 tx.raw_find_by_prefix_sorted_descending(&prefix_bytes)
3486 .await
3487 .expect("Error doing prefix search in database")
3488 .map(move |(key_bytes, value_bytes)| {
3489 let key = decode_key_expect(&key_bytes, &decoders);
3490 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
3491 (key, value)
3492 })
3493}
3494
3495pub async fn verify_module_db_integrity_dbtx(
3496 dbtx: &mut DatabaseTransaction<'_>,
3497 module_id: ModuleInstanceId,
3498 module_kind: ModuleKind,
3499 prefixes: &BTreeSet<u8>,
3500) {
3501 let module_db_prefix = module_instance_id_to_byte_prefix(module_id);
3502 if module_id < 250 {
3503 assert_eq!(module_db_prefix.len(), 2);
3504 }
3505 let mut records = dbtx
3506 .raw_find_by_prefix(&module_db_prefix)
3507 .await
3508 .expect("DB fail");
3509 while let Some((k, v)) = records.next().await {
3510 assert!(
3511 prefixes.contains(&k[module_db_prefix.len()]),
3512 "Unexpected module {module_kind} {module_id} db record found: {}: {}",
3513 k.as_hex(),
3514 v.as_hex()
3515 );
3516 }
3517}
3518
3519#[cfg(test)]
3520mod tests;