1use std::any;
106use std::collections::{BTreeMap, BTreeSet};
107use std::error::Error;
108use std::fmt::{self, Debug};
109use std::marker::{self, PhantomData};
110use std::ops::{self, DerefMut, Range};
111use std::path::Path;
112use std::pin::Pin;
113use std::sync::Arc;
114use std::time::Duration;
115
116use anyhow::{Context, Result, bail};
117use bitcoin::hex::DisplayHex as _;
118use fedimint_core::util::BoxFuture;
119use fedimint_logging::LOG_DB;
120use futures::{Stream, StreamExt};
121use macro_rules_attribute::apply;
122use rand::Rng;
123use serde::Serialize;
124use strum_macros::EnumIter;
125use thiserror::Error;
126use tracing::{debug, error, info, instrument, trace, warn};
127
128use crate::core::{ModuleInstanceId, ModuleKind};
129use crate::encoding::{Decodable, Encodable};
130use crate::fmt_utils::AbbreviateHexBytes;
131use crate::task::{MaybeSend, MaybeSync};
132use crate::util::FmtCompactAnyhow as _;
133use crate::{async_trait_maybe_send, maybe_add_send, maybe_add_send_sync, timing};
134
135pub mod mem_impl;
136pub mod notifications;
137
138pub use test_utils::*;
139
140use self::notifications::{Notifications, NotifyQueue};
141use crate::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
142
143pub const MODULE_GLOBAL_PREFIX: u8 = 0xff;
144
145pub trait DatabaseKeyPrefix: Debug {
146 fn to_bytes(&self) -> Vec<u8>;
147}
148
149pub trait DatabaseRecord: DatabaseKeyPrefix {
152 const DB_PREFIX: u8;
153 const NOTIFY_ON_MODIFY: bool = false;
154 type Key: DatabaseKey + Debug;
155 type Value: DatabaseValue + Debug;
156}
157
158pub trait DatabaseLookup: DatabaseKeyPrefix {
161 type Record: DatabaseRecord;
162}
163
164impl<Record> DatabaseLookup for Record
166where
167 Record: DatabaseRecord + Debug + Decodable + Encodable,
168{
169 type Record = Record;
170}
171
172pub trait DatabaseKey: Sized {
175 const NOTIFY_ON_MODIFY: bool = false;
183 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
184}
185
186pub trait DatabaseKeyWithNotify {}
188
189pub trait DatabaseValue: Sized + Debug {
191 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
192 fn to_bytes(&self) -> Vec<u8>;
193}
194
195pub type PrefixStream<'a> = Pin<Box<maybe_add_send!(dyn Stream<Item = (Vec<u8>, Vec<u8>)> + 'a)>>;
196
197pub type PhantomBound<'big, 'small> = PhantomData<&'small &'big ()>;
201
202#[derive(Debug, Error)]
204pub enum AutocommitError<E> {
205 #[error("Commit Failed: {last_error}")]
207 CommitFailed {
208 attempts: usize,
210 last_error: anyhow::Error,
212 },
213 #[error("Closure error: {error}")]
216 ClosureError {
217 attempts: usize,
223 error: E,
225 },
226}
227
228pub trait AutocommitResultExt<T, E> {
229 fn unwrap_autocommit(self) -> Result<T, E>;
233}
234
235impl<T, E> AutocommitResultExt<T, E> for Result<T, AutocommitError<E>> {
236 fn unwrap_autocommit(self) -> Result<T, E> {
237 match self {
238 Ok(value) => Ok(value),
239 Err(AutocommitError::CommitFailed { .. }) => {
240 panic!("`unwrap_autocommit` called on a autocommit result with finite retries");
241 }
242 Err(AutocommitError::ClosureError { error, .. }) => Err(error),
243 }
244 }
245}
246
247#[apply(async_trait_maybe_send!)]
256pub trait IRawDatabase: Debug + MaybeSend + MaybeSync + 'static {
257 type Transaction<'a>: IRawDatabaseTransaction + Debug;
259
260 async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a>;
262
263 fn checkpoint(&self, backup_path: &Path) -> Result<()>;
265}
266
267#[apply(async_trait_maybe_send!)]
268impl<T> IRawDatabase for Box<T>
269where
270 T: IRawDatabase,
271{
272 type Transaction<'a> = <T as IRawDatabase>::Transaction<'a>;
273
274 async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a> {
275 (**self).begin_transaction().await
276 }
277
278 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
279 (**self).checkpoint(backup_path)
280 }
281}
282
283pub trait IRawDatabaseExt: IRawDatabase + Sized {
285 fn into_database(self) -> Database {
289 Database::new(self, ModuleRegistry::default())
290 }
291}
292
293impl<T> IRawDatabaseExt for T where T: IRawDatabase {}
294
295impl<T> From<T> for Database
296where
297 T: IRawDatabase,
298{
299 fn from(raw: T) -> Self {
300 Self::new(raw, ModuleRegistry::default())
301 }
302}
303
304#[apply(async_trait_maybe_send!)]
307pub trait IDatabase: Debug + MaybeSend + MaybeSync + 'static {
308 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a>;
310 async fn register(&self, key: &[u8]);
312 async fn notify(&self, key: &[u8]);
314
315 fn is_global(&self) -> bool;
318
319 fn checkpoint(&self, backup_path: &Path) -> Result<()>;
321}
322
323#[apply(async_trait_maybe_send!)]
324impl<T> IDatabase for Arc<T>
325where
326 T: IDatabase + ?Sized,
327{
328 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
329 (**self).begin_transaction().await
330 }
331 async fn register(&self, key: &[u8]) {
332 (**self).register(key).await;
333 }
334 async fn notify(&self, key: &[u8]) {
335 (**self).notify(key).await;
336 }
337
338 fn is_global(&self) -> bool {
339 (**self).is_global()
340 }
341
342 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
343 (**self).checkpoint(backup_path)
344 }
345}
346
347struct BaseDatabase<RawDatabase> {
351 notifications: Arc<Notifications>,
352 raw: RawDatabase,
353}
354
355impl<RawDatabase> fmt::Debug for BaseDatabase<RawDatabase> {
356 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
357 f.write_str("BaseDatabase")
358 }
359}
360
361#[apply(async_trait_maybe_send!)]
362impl<RawDatabase: IRawDatabase + MaybeSend + 'static> IDatabase for BaseDatabase<RawDatabase> {
363 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
364 Box::new(BaseDatabaseTransaction::new(
365 self.raw.begin_transaction().await,
366 self.notifications.clone(),
367 ))
368 }
369 async fn register(&self, key: &[u8]) {
370 self.notifications.register(key).await;
371 }
372 async fn notify(&self, key: &[u8]) {
373 self.notifications.notify(key);
374 }
375
376 fn is_global(&self) -> bool {
377 true
378 }
379
380 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
381 self.raw.checkpoint(backup_path)
382 }
383}
384
385#[derive(Clone, Debug)]
391pub struct Database {
392 inner: Arc<dyn IDatabase + 'static>,
393 module_decoders: ModuleDecoderRegistry,
394}
395
396impl Database {
397 pub fn strong_count(&self) -> usize {
398 Arc::strong_count(&self.inner)
399 }
400
401 pub fn into_inner(self) -> Arc<dyn IDatabase + 'static> {
402 self.inner
403 }
404}
405
406impl Database {
407 pub fn new(raw: impl IRawDatabase + 'static, module_decoders: ModuleDecoderRegistry) -> Self {
412 let inner = BaseDatabase {
413 raw,
414 notifications: Arc::new(Notifications::new()),
415 };
416 Self::new_from_arc(
417 Arc::new(inner) as Arc<dyn IDatabase + 'static>,
418 module_decoders,
419 )
420 }
421
422 pub fn new_from_arc(
424 inner: Arc<dyn IDatabase + 'static>,
425 module_decoders: ModuleDecoderRegistry,
426 ) -> Self {
427 Self {
428 inner,
429 module_decoders,
430 }
431 }
432
433 pub fn with_prefix(&self, prefix: Vec<u8>) -> Self {
435 Self {
436 inner: Arc::new(PrefixDatabase {
437 inner: self.inner.clone(),
438 global_dbtx_access_token: None,
439 prefix,
440 }),
441 module_decoders: self.module_decoders.clone(),
442 }
443 }
444
445 pub fn with_prefix_module_id(
449 &self,
450 module_instance_id: ModuleInstanceId,
451 ) -> (Self, GlobalDBTxAccessToken) {
452 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
453 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
454 (
455 Self {
456 inner: Arc::new(PrefixDatabase {
457 inner: self.inner.clone(),
458 global_dbtx_access_token: Some(global_dbtx_access_token),
459 prefix,
460 }),
461 module_decoders: self.module_decoders.clone(),
462 },
463 global_dbtx_access_token,
464 )
465 }
466
467 pub fn with_decoders(&self, module_decoders: ModuleDecoderRegistry) -> Self {
468 Self {
469 inner: self.inner.clone(),
470 module_decoders,
471 }
472 }
473
474 pub fn is_global(&self) -> bool {
476 self.inner.is_global()
477 }
478
479 pub fn ensure_global(&self) -> Result<()> {
481 if !self.is_global() {
482 bail!("Database instance not global");
483 }
484
485 Ok(())
486 }
487
488 pub fn ensure_isolated(&self) -> Result<()> {
490 if self.is_global() {
491 bail!("Database instance not isolated");
492 }
493
494 Ok(())
495 }
496
497 pub async fn begin_transaction<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, Committable>
499 where
500 's: 'tx,
501 {
502 DatabaseTransaction::<Committable>::new(
503 self.inner.begin_transaction().await,
504 self.module_decoders.clone(),
505 )
506 }
507
508 pub async fn begin_transaction_nc<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, NonCommittable>
510 where
511 's: 'tx,
512 {
513 self.begin_transaction().await.into_nc()
514 }
515
516 pub fn checkpoint(&self, backup_path: &Path) -> Result<()> {
517 self.inner.checkpoint(backup_path)
518 }
519
520 pub async fn autocommit<'s, 'dbtx, F, T, E>(
548 &'s self,
549 tx_fn: F,
550 max_attempts: Option<usize>,
551 ) -> Result<T, AutocommitError<E>>
552 where
553 's: 'dbtx,
554 for<'r, 'o> F: Fn(
555 &'r mut DatabaseTransaction<'o>,
556 PhantomBound<'dbtx, 'o>,
557 ) -> BoxFuture<'r, Result<T, E>>,
558 {
559 assert_ne!(max_attempts, Some(0));
560 let mut curr_attempts: usize = 0;
561
562 loop {
563 curr_attempts = curr_attempts
568 .checked_add(1)
569 .expect("db autocommit attempt counter overflowed");
570
571 let mut dbtx = self.begin_transaction().await;
572
573 let tx_fn_res = tx_fn(&mut dbtx.to_ref_nc(), PhantomData).await;
574 let val = match tx_fn_res {
575 Ok(val) => val,
576 Err(err) => {
577 dbtx.ignore_uncommitted();
578 return Err(AutocommitError::ClosureError {
579 attempts: curr_attempts,
580 error: err,
581 });
582 }
583 };
584
585 let _timing = timing::TimeReporter::new("autocommit - commit_tx");
586
587 match dbtx.commit_tx_result().await {
588 Ok(()) => {
589 return Ok(val);
590 }
591 Err(err) => {
592 if max_attempts.is_some_and(|max_att| max_att <= curr_attempts) {
593 warn!(
594 target: LOG_DB,
595 curr_attempts,
596 err = %err.fmt_compact_anyhow(),
597 "Database commit failed in an autocommit block - terminating"
598 );
599 return Err(AutocommitError::CommitFailed {
600 attempts: curr_attempts,
601 last_error: err,
602 });
603 }
604
605 let delay = (2u64.pow(curr_attempts.min(7) as u32) * 10).min(1000);
606 let delay = rand::thread_rng().gen_range(delay..(2 * delay));
607 warn!(
608 target: LOG_DB,
609 curr_attempts,
610 err = %err.fmt_compact_anyhow(),
611 delay_ms = %delay,
612 "Database commit failed in an autocommit block - retrying"
613 );
614 crate::runtime::sleep(Duration::from_millis(delay)).await;
615 }
616 }
617 }
618 }
619
620 pub async fn wait_key_check<'a, K, T>(
625 &'a self,
626 key: &K,
627 checker: impl Fn(Option<K::Value>) -> Option<T>,
628 ) -> (T, DatabaseTransaction<'a, Committable>)
629 where
630 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
631 {
632 let key_bytes = key.to_bytes();
633 loop {
634 let notify = self.inner.register(&key_bytes);
636
637 let mut tx = self.inner.begin_transaction().await;
639
640 let maybe_value_bytes = tx
641 .raw_get_bytes(&key_bytes)
642 .await
643 .expect("Unrecoverable error when reading from database")
644 .map(|value_bytes| {
645 decode_value_expect(&value_bytes, &self.module_decoders, &key_bytes)
646 });
647
648 if let Some(value) = checker(maybe_value_bytes) {
649 return (
650 value,
651 DatabaseTransaction::new(tx, self.module_decoders.clone()),
652 );
653 }
654
655 notify.await;
657 }
660 }
661
662 pub async fn wait_key_exists<K>(&self, key: &K) -> K::Value
664 where
665 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
666 {
667 self.wait_key_check(key, std::convert::identity).await.0
668 }
669}
670
671fn module_instance_id_to_byte_prefix(module_instance_id: u16) -> Vec<u8> {
672 let mut bytes = vec![MODULE_GLOBAL_PREFIX];
673 bytes.append(&mut module_instance_id.consensus_encode_to_vec());
674 bytes
675}
676
677#[derive(Clone, Debug)]
680struct PrefixDatabase<Inner>
681where
682 Inner: Debug,
683{
684 prefix: Vec<u8>,
685 global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
686 inner: Inner,
687}
688
689impl<Inner> PrefixDatabase<Inner>
690where
691 Inner: Debug,
692{
693 fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
697 let mut full_key = self.prefix.clone();
698 full_key.extend_from_slice(key);
699 full_key
700 }
701}
702
703#[apply(async_trait_maybe_send!)]
704impl<Inner> IDatabase for PrefixDatabase<Inner>
705where
706 Inner: Debug + MaybeSend + MaybeSync + 'static + IDatabase,
707{
708 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
709 Box::new(PrefixDatabaseTransaction {
710 inner: self.inner.begin_transaction().await,
711 global_dbtx_access_token: self.global_dbtx_access_token,
712 prefix: self.prefix.clone(),
713 })
714 }
715 async fn register(&self, key: &[u8]) {
716 self.inner.register(&self.get_full_key(key)).await;
717 }
718
719 async fn notify(&self, key: &[u8]) {
720 self.inner.notify(&self.get_full_key(key)).await;
721 }
722
723 fn is_global(&self) -> bool {
724 if self.global_dbtx_access_token.is_some() {
725 false
726 } else {
727 self.inner.is_global()
728 }
729 }
730
731 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
732 self.inner.checkpoint(backup_path)
733 }
734}
735
736#[derive(Debug)]
741struct PrefixDatabaseTransaction<Inner> {
742 inner: Inner,
743 global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
744 prefix: Vec<u8>,
745}
746
747impl<Inner> PrefixDatabaseTransaction<Inner> {
748 fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
752 let mut full_key = self.prefix.clone();
753 full_key.extend_from_slice(key);
754 full_key
755 }
756
757 fn get_full_range(&self, range: Range<&[u8]>) -> Range<Vec<u8>> {
758 Range {
759 start: self.get_full_key(range.start),
760 end: self.get_full_key(range.end),
761 }
762 }
763
764 fn adapt_prefix_stream(stream: PrefixStream<'_>, prefix_len: usize) -> PrefixStream<'_> {
765 Box::pin(stream.map(move |(k, v)| (k[prefix_len..].to_owned(), v)))
766 }
767}
768
769#[apply(async_trait_maybe_send!)]
770impl<Inner> IDatabaseTransaction for PrefixDatabaseTransaction<Inner>
771where
772 Inner: IDatabaseTransaction,
773{
774 async fn commit_tx(&mut self) -> Result<()> {
775 self.inner.commit_tx().await
776 }
777
778 fn is_global(&self) -> bool {
779 if self.global_dbtx_access_token.is_some() {
780 false
781 } else {
782 self.inner.is_global()
783 }
784 }
785
786 fn global_dbtx(
787 &mut self,
788 access_token: GlobalDBTxAccessToken,
789 ) -> &mut dyn IDatabaseTransaction {
790 if let Some(self_global_dbtx_access_token) = self.global_dbtx_access_token {
791 assert_eq!(
792 access_token, self_global_dbtx_access_token,
793 "Invalid access key used to access global_dbtx"
794 );
795 &mut self.inner
796 } else {
797 self.inner.global_dbtx(access_token)
798 }
799 }
800}
801
802#[apply(async_trait_maybe_send!)]
803impl<Inner> IDatabaseTransactionOpsCore for PrefixDatabaseTransaction<Inner>
804where
805 Inner: IDatabaseTransactionOpsCore,
806{
807 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
808 let key = self.get_full_key(key);
809 self.inner.raw_insert_bytes(&key, value).await
810 }
811
812 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
813 let key = self.get_full_key(key);
814 self.inner.raw_get_bytes(&key).await
815 }
816
817 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
818 let key = self.get_full_key(key);
819 self.inner.raw_remove_entry(&key).await
820 }
821
822 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
823 let key = self.get_full_key(key_prefix);
824 let stream = self.inner.raw_find_by_prefix(&key).await?;
825 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
826 }
827
828 async fn raw_find_by_prefix_sorted_descending(
829 &mut self,
830 key_prefix: &[u8],
831 ) -> Result<PrefixStream<'_>> {
832 let key = self.get_full_key(key_prefix);
833 let stream = self
834 .inner
835 .raw_find_by_prefix_sorted_descending(&key)
836 .await?;
837 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
838 }
839
840 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
841 let range = self.get_full_range(range);
842 let stream = self
843 .inner
844 .raw_find_by_range(Range {
845 start: &range.start,
846 end: &range.end,
847 })
848 .await?;
849 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
850 }
851
852 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
853 let key = self.get_full_key(key_prefix);
854 self.inner.raw_remove_by_prefix(&key).await
855 }
856}
857
858#[apply(async_trait_maybe_send!)]
859impl<Inner> IDatabaseTransactionOps for PrefixDatabaseTransaction<Inner>
860where
861 Inner: IDatabaseTransactionOps,
862{
863 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
864 self.inner.rollback_tx_to_savepoint().await
865 }
866
867 async fn set_tx_savepoint(&mut self) -> Result<()> {
868 self.set_tx_savepoint().await
869 }
870}
871
872#[apply(async_trait_maybe_send!)]
876pub trait IDatabaseTransactionOpsCore: MaybeSend {
877 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>>;
879
880 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
882
883 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
885
886 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>>;
889
890 async fn raw_find_by_prefix_sorted_descending(
892 &mut self,
893 key_prefix: &[u8],
894 ) -> Result<PrefixStream<'_>>;
895
896 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>>;
900
901 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()>;
903}
904
905#[apply(async_trait_maybe_send!)]
906impl<T> IDatabaseTransactionOpsCore for Box<T>
907where
908 T: IDatabaseTransactionOpsCore + ?Sized,
909{
910 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
911 (**self).raw_insert_bytes(key, value).await
912 }
913
914 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
915 (**self).raw_get_bytes(key).await
916 }
917
918 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
919 (**self).raw_remove_entry(key).await
920 }
921
922 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
923 (**self).raw_find_by_prefix(key_prefix).await
924 }
925
926 async fn raw_find_by_prefix_sorted_descending(
927 &mut self,
928 key_prefix: &[u8],
929 ) -> Result<PrefixStream<'_>> {
930 (**self)
931 .raw_find_by_prefix_sorted_descending(key_prefix)
932 .await
933 }
934
935 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
936 (**self).raw_find_by_range(range).await
937 }
938
939 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
940 (**self).raw_remove_by_prefix(key_prefix).await
941 }
942}
943
944#[apply(async_trait_maybe_send!)]
945impl<T> IDatabaseTransactionOpsCore for &mut T
946where
947 T: IDatabaseTransactionOpsCore + ?Sized,
948{
949 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
950 (**self).raw_insert_bytes(key, value).await
951 }
952
953 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
954 (**self).raw_get_bytes(key).await
955 }
956
957 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
958 (**self).raw_remove_entry(key).await
959 }
960
961 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
962 (**self).raw_find_by_prefix(key_prefix).await
963 }
964
965 async fn raw_find_by_prefix_sorted_descending(
966 &mut self,
967 key_prefix: &[u8],
968 ) -> Result<PrefixStream<'_>> {
969 (**self)
970 .raw_find_by_prefix_sorted_descending(key_prefix)
971 .await
972 }
973
974 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
975 (**self).raw_find_by_range(range).await
976 }
977
978 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
979 (**self).raw_remove_by_prefix(key_prefix).await
980 }
981}
982
983#[apply(async_trait_maybe_send!)]
989pub trait IDatabaseTransactionOps: IDatabaseTransactionOpsCore + MaybeSend {
990 async fn set_tx_savepoint(&mut self) -> Result<()>;
999
1000 async fn rollback_tx_to_savepoint(&mut self) -> Result<()>;
1001}
1002
1003#[apply(async_trait_maybe_send!)]
1004impl<T> IDatabaseTransactionOps for Box<T>
1005where
1006 T: IDatabaseTransactionOps + ?Sized,
1007{
1008 async fn set_tx_savepoint(&mut self) -> Result<()> {
1009 (**self).set_tx_savepoint().await
1010 }
1011
1012 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1013 (**self).rollback_tx_to_savepoint().await
1014 }
1015}
1016
1017#[apply(async_trait_maybe_send!)]
1018impl<T> IDatabaseTransactionOps for &mut T
1019where
1020 T: IDatabaseTransactionOps + ?Sized,
1021{
1022 async fn set_tx_savepoint(&mut self) -> Result<()> {
1023 (**self).set_tx_savepoint().await
1024 }
1025
1026 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1027 (**self).rollback_tx_to_savepoint().await
1028 }
1029}
1030
1031#[apply(async_trait_maybe_send!)]
1037pub trait IDatabaseTransactionOpsCoreTyped<'a> {
1038 async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1039 where
1040 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1041
1042 async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1043 where
1044 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1045 K::Value: MaybeSend + MaybeSync;
1046
1047 async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1048 where
1049 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1050 K::Value: MaybeSend + MaybeSync;
1051
1052 async fn find_by_range<K>(
1053 &mut self,
1054 key_range: Range<K>,
1055 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1056 where
1057 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1058 K::Value: MaybeSend + MaybeSync;
1059
1060 async fn find_by_prefix<KP>(
1061 &mut self,
1062 key_prefix: &KP,
1063 ) -> Pin<
1064 Box<
1065 maybe_add_send!(
1066 dyn Stream<
1067 Item = (
1068 KP::Record,
1069 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1070 ),
1071 > + '_
1072 ),
1073 >,
1074 >
1075 where
1076 KP: DatabaseLookup + MaybeSend + MaybeSync,
1077 KP::Record: DatabaseKey;
1078
1079 async fn find_by_prefix_sorted_descending<KP>(
1080 &mut self,
1081 key_prefix: &KP,
1082 ) -> Pin<
1083 Box<
1084 maybe_add_send!(
1085 dyn Stream<
1086 Item = (
1087 KP::Record,
1088 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1089 ),
1090 > + '_
1091 ),
1092 >,
1093 >
1094 where
1095 KP: DatabaseLookup + MaybeSend + MaybeSync,
1096 KP::Record: DatabaseKey;
1097
1098 async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1099 where
1100 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1101
1102 async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1103 where
1104 KP: DatabaseLookup + MaybeSend + MaybeSync;
1105}
1106
1107#[apply(async_trait_maybe_send!)]
1110impl<T> IDatabaseTransactionOpsCoreTyped<'_> for T
1111where
1112 T: IDatabaseTransactionOpsCore + WithDecoders,
1113{
1114 async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1115 where
1116 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1117 {
1118 let key_bytes = key.to_bytes();
1119 let raw = self
1120 .raw_get_bytes(&key_bytes)
1121 .await
1122 .expect("Unrecoverable error occurred while reading and entry from the database");
1123 raw.map(|value_bytes| {
1124 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1125 })
1126 }
1127
1128 async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1129 where
1130 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1131 K::Value: MaybeSend + MaybeSync,
1132 {
1133 let key_bytes = key.to_bytes();
1134 self.raw_insert_bytes(&key_bytes, &value.to_bytes())
1135 .await
1136 .expect("Unrecoverable error occurred while inserting entry into the database")
1137 .map(|value_bytes| {
1138 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1139 })
1140 }
1141
1142 async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1143 where
1144 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1145 K::Value: MaybeSend + MaybeSync,
1146 {
1147 if let Some(prev) = self.insert_entry(key, value).await {
1148 panic!(
1149 "Database overwriting element when expecting insertion of new entry. Key: {key:?} Prev Value: {prev:?}"
1150 );
1151 }
1152 }
1153
1154 async fn find_by_range<K>(
1155 &mut self,
1156 key_range: Range<K>,
1157 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1158 where
1159 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1160 K::Value: MaybeSend + MaybeSync,
1161 {
1162 let decoders = self.decoders().clone();
1163 Box::pin(
1164 self.raw_find_by_range(Range {
1165 start: &key_range.start.to_bytes(),
1166 end: &key_range.end.to_bytes(),
1167 })
1168 .await
1169 .expect("Unrecoverable error occurred while listing entries from the database")
1170 .map(move |(key_bytes, value_bytes)| {
1171 let key = decode_key_expect(&key_bytes, &decoders);
1172 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1173 (key, value)
1174 }),
1175 )
1176 }
1177
1178 async fn find_by_prefix<KP>(
1179 &mut self,
1180 key_prefix: &KP,
1181 ) -> Pin<
1182 Box<
1183 maybe_add_send!(
1184 dyn Stream<
1185 Item = (
1186 KP::Record,
1187 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1188 ),
1189 > + '_
1190 ),
1191 >,
1192 >
1193 where
1194 KP: DatabaseLookup + MaybeSend + MaybeSync,
1195 KP::Record: DatabaseKey,
1196 {
1197 let decoders = self.decoders().clone();
1198 Box::pin(
1199 self.raw_find_by_prefix(&key_prefix.to_bytes())
1200 .await
1201 .expect("Unrecoverable error occurred while listing entries from the database")
1202 .map(move |(key_bytes, value_bytes)| {
1203 let key = decode_key_expect(&key_bytes, &decoders);
1204 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1205 (key, value)
1206 }),
1207 )
1208 }
1209
1210 async fn find_by_prefix_sorted_descending<KP>(
1211 &mut self,
1212 key_prefix: &KP,
1213 ) -> Pin<
1214 Box<
1215 maybe_add_send!(
1216 dyn Stream<
1217 Item = (
1218 KP::Record,
1219 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1220 ),
1221 > + '_
1222 ),
1223 >,
1224 >
1225 where
1226 KP: DatabaseLookup + MaybeSend + MaybeSync,
1227 KP::Record: DatabaseKey,
1228 {
1229 let decoders = self.decoders().clone();
1230 Box::pin(
1231 self.raw_find_by_prefix_sorted_descending(&key_prefix.to_bytes())
1232 .await
1233 .expect("Unrecoverable error occurred while listing entries from the database")
1234 .map(move |(key_bytes, value_bytes)| {
1235 let key = decode_key_expect(&key_bytes, &decoders);
1236 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1237 (key, value)
1238 }),
1239 )
1240 }
1241 async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1242 where
1243 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1244 {
1245 let key_bytes = key.to_bytes();
1246 self.raw_remove_entry(&key_bytes)
1247 .await
1248 .expect("Unrecoverable error occurred while inserting removing entry from the database")
1249 .map(|value_bytes| {
1250 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1251 })
1252 }
1253 async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1254 where
1255 KP: DatabaseLookup + MaybeSend + MaybeSync,
1256 {
1257 self.raw_remove_by_prefix(&key_prefix.to_bytes())
1258 .await
1259 .expect("Unrecoverable error when removing entries from the database");
1260 }
1261}
1262
1263pub trait WithDecoders {
1266 fn decoders(&self) -> &ModuleDecoderRegistry;
1267}
1268
1269#[apply(async_trait_maybe_send!)]
1271pub trait IRawDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
1272 async fn commit_tx(self) -> Result<()>;
1273}
1274
1275#[apply(async_trait_maybe_send!)]
1279pub trait IDatabaseTransaction: MaybeSend + IDatabaseTransactionOps + fmt::Debug {
1280 async fn commit_tx(&mut self) -> Result<()>;
1282
1283 fn is_global(&self) -> bool;
1285
1286 #[doc(hidden)]
1291 fn global_dbtx(&mut self, access_token: GlobalDBTxAccessToken)
1292 -> &mut dyn IDatabaseTransaction;
1293}
1294
1295#[apply(async_trait_maybe_send!)]
1296impl<T> IDatabaseTransaction for Box<T>
1297where
1298 T: IDatabaseTransaction + ?Sized,
1299{
1300 async fn commit_tx(&mut self) -> Result<()> {
1301 (**self).commit_tx().await
1302 }
1303
1304 fn is_global(&self) -> bool {
1305 (**self).is_global()
1306 }
1307
1308 fn global_dbtx(
1309 &mut self,
1310 access_token: GlobalDBTxAccessToken,
1311 ) -> &mut dyn IDatabaseTransaction {
1312 (**self).global_dbtx(access_token)
1313 }
1314}
1315
1316#[apply(async_trait_maybe_send!)]
1317impl<'a, T> IDatabaseTransaction for &'a mut T
1318where
1319 T: IDatabaseTransaction + ?Sized,
1320{
1321 async fn commit_tx(&mut self) -> Result<()> {
1322 (**self).commit_tx().await
1323 }
1324
1325 fn is_global(&self) -> bool {
1326 (**self).is_global()
1327 }
1328
1329 fn global_dbtx(&mut self, access_key: GlobalDBTxAccessToken) -> &mut dyn IDatabaseTransaction {
1330 (**self).global_dbtx(access_key)
1331 }
1332}
1333
1334struct BaseDatabaseTransaction<Tx> {
1337 raw: Option<Tx>,
1339 notify_queue: Option<NotifyQueue>,
1340 notifications: Arc<Notifications>,
1341}
1342
1343impl<Tx> fmt::Debug for BaseDatabaseTransaction<Tx>
1344where
1345 Tx: fmt::Debug,
1346{
1347 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1348 f.write_fmt(format_args!(
1349 "BaseDatabaseTransaction{{ raw={:?} }}",
1350 self.raw
1351 ))
1352 }
1353}
1354impl<Tx> BaseDatabaseTransaction<Tx>
1355where
1356 Tx: IRawDatabaseTransaction,
1357{
1358 fn new(dbtx: Tx, notifications: Arc<Notifications>) -> Self {
1359 Self {
1360 raw: Some(dbtx),
1361 notifications,
1362 notify_queue: Some(NotifyQueue::new()),
1363 }
1364 }
1365
1366 fn add_notification_key(&mut self, key: &[u8]) -> Result<()> {
1367 self.notify_queue
1368 .as_mut()
1369 .context("can not call add_notification_key after commit")?
1370 .add(&key);
1371 Ok(())
1372 }
1373}
1374
1375#[apply(async_trait_maybe_send!)]
1376impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOpsCore for BaseDatabaseTransaction<Tx> {
1377 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1378 self.add_notification_key(key)?;
1379 self.raw
1380 .as_mut()
1381 .context("Cannot insert into already consumed transaction")?
1382 .raw_insert_bytes(key, value)
1383 .await
1384 }
1385
1386 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1387 self.raw
1388 .as_mut()
1389 .context("Cannot retrieve from already consumed transaction")?
1390 .raw_get_bytes(key)
1391 .await
1392 }
1393
1394 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1395 self.add_notification_key(key)?;
1396 self.raw
1397 .as_mut()
1398 .context("Cannot remove from already consumed transaction")?
1399 .raw_remove_entry(key)
1400 .await
1401 }
1402
1403 async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1404 self.raw
1405 .as_mut()
1406 .context("Cannot retrieve from already consumed transaction")?
1407 .raw_find_by_range(key_range)
1408 .await
1409 }
1410
1411 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1412 self.raw
1413 .as_mut()
1414 .context("Cannot retrieve from already consumed transaction")?
1415 .raw_find_by_prefix(key_prefix)
1416 .await
1417 }
1418
1419 async fn raw_find_by_prefix_sorted_descending(
1420 &mut self,
1421 key_prefix: &[u8],
1422 ) -> Result<PrefixStream<'_>> {
1423 self.raw
1424 .as_mut()
1425 .context("Cannot retrieve from already consumed transaction")?
1426 .raw_find_by_prefix_sorted_descending(key_prefix)
1427 .await
1428 }
1429
1430 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1431 self.raw
1432 .as_mut()
1433 .context("Cannot remove from already consumed transaction")?
1434 .raw_remove_by_prefix(key_prefix)
1435 .await
1436 }
1437}
1438
1439#[apply(async_trait_maybe_send!)]
1440impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOps for BaseDatabaseTransaction<Tx> {
1441 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1442 self.raw
1443 .as_mut()
1444 .context("Cannot rollback to a savepoint on an already consumed transaction")?
1445 .rollback_tx_to_savepoint()
1446 .await?;
1447 Ok(())
1448 }
1449
1450 async fn set_tx_savepoint(&mut self) -> Result<()> {
1451 self.raw
1452 .as_mut()
1453 .context("Cannot set a tx savepoint on an already consumed transaction")?
1454 .set_tx_savepoint()
1455 .await?;
1456 Ok(())
1457 }
1458}
1459
1460#[apply(async_trait_maybe_send!)]
1461impl<Tx: IRawDatabaseTransaction + fmt::Debug> IDatabaseTransaction
1462 for BaseDatabaseTransaction<Tx>
1463{
1464 async fn commit_tx(&mut self) -> Result<()> {
1465 self.raw
1466 .take()
1467 .context("Cannot commit an already committed transaction")?
1468 .commit_tx()
1469 .await?;
1470 self.notifications.submit_queue(
1471 &self
1472 .notify_queue
1473 .take()
1474 .expect("commit must be called only once"),
1475 );
1476 Ok(())
1477 }
1478
1479 fn is_global(&self) -> bool {
1480 true
1481 }
1482
1483 fn global_dbtx(
1484 &mut self,
1485 _access_token: GlobalDBTxAccessToken,
1486 ) -> &mut dyn IDatabaseTransaction {
1487 panic!("Illegal to call global_dbtx on BaseDatabaseTransaction");
1488 }
1489}
1490
1491#[derive(Clone)]
1494struct CommitTracker {
1495 is_committed: bool,
1497 has_writes: bool,
1499 ignore_uncommitted: bool,
1501}
1502
1503impl Drop for CommitTracker {
1504 fn drop(&mut self) {
1505 if self.has_writes && !self.is_committed {
1506 if self.ignore_uncommitted {
1507 trace!(
1508 target: LOG_DB,
1509 "DatabaseTransaction has writes and has not called commit, but that's expected."
1510 );
1511 } else {
1512 warn!(
1513 target: LOG_DB,
1514 location = ?backtrace::Backtrace::new(),
1515 "DatabaseTransaction has writes and has not called commit."
1516 );
1517 }
1518 }
1519 }
1520}
1521
1522enum MaybeRef<'a, T> {
1523 Owned(T),
1524 Borrowed(&'a mut T),
1525}
1526
1527impl<T> ops::Deref for MaybeRef<'_, T> {
1528 type Target = T;
1529
1530 fn deref(&self) -> &Self::Target {
1531 match self {
1532 MaybeRef::Owned(o) => o,
1533 MaybeRef::Borrowed(r) => r,
1534 }
1535 }
1536}
1537
1538impl<T> ops::DerefMut for MaybeRef<'_, T> {
1539 fn deref_mut(&mut self) -> &mut Self::Target {
1540 match self {
1541 MaybeRef::Owned(o) => o,
1542 MaybeRef::Borrowed(r) => r,
1543 }
1544 }
1545}
1546
1547pub struct Committable;
1551
1552pub struct NonCommittable;
1556
1557pub struct DatabaseTransaction<'tx, Cap = NonCommittable> {
1561 tx: Box<dyn IDatabaseTransaction + 'tx>,
1562 decoders: ModuleDecoderRegistry,
1563 commit_tracker: MaybeRef<'tx, CommitTracker>,
1564 on_commit_hooks: MaybeRef<'tx, Vec<Box<maybe_add_send!(dyn FnOnce())>>>,
1565 capability: marker::PhantomData<Cap>,
1566}
1567
1568impl<Cap> fmt::Debug for DatabaseTransaction<'_, Cap> {
1569 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1570 f.write_fmt(format_args!(
1571 "DatabaseTransaction {{ tx: {:?}, decoders={:?} }}",
1572 self.tx, self.decoders
1573 ))
1574 }
1575}
1576
1577impl<Cap> WithDecoders for DatabaseTransaction<'_, Cap> {
1578 fn decoders(&self) -> &ModuleDecoderRegistry {
1579 &self.decoders
1580 }
1581}
1582
1583#[instrument(target = LOG_DB, level = "trace", skip_all, fields(value_type = std::any::type_name::<V>()), err)]
1584fn decode_value<V: DatabaseValue>(
1585 value_bytes: &[u8],
1586 decoders: &ModuleDecoderRegistry,
1587) -> Result<V, DecodingError> {
1588 trace!(
1589 bytes = %AbbreviateHexBytes(value_bytes),
1590 "decoding value",
1591 );
1592 V::from_bytes(value_bytes, decoders)
1593}
1594
1595#[track_caller]
1596fn decode_value_expect<V: DatabaseValue>(
1597 value_bytes: &[u8],
1598 decoders: &ModuleDecoderRegistry,
1599 key_bytes: &[u8],
1600) -> V {
1601 decode_value(value_bytes, decoders).unwrap_or_else(|err| {
1602 panic!(
1603 "Unrecoverable decoding DatabaseValue as {}; err={}, key_bytes={}, val_bytes={}",
1604 any::type_name::<V>(),
1605 err,
1606 AbbreviateHexBytes(key_bytes),
1607 AbbreviateHexBytes(value_bytes),
1608 )
1609 })
1610}
1611
1612#[track_caller]
1613fn decode_key_expect<K: DatabaseKey>(key_bytes: &[u8], decoders: &ModuleDecoderRegistry) -> K {
1614 trace!(
1615 bytes = %AbbreviateHexBytes(key_bytes),
1616 "decoding key",
1617 );
1618 K::from_bytes(key_bytes, decoders).unwrap_or_else(|err| {
1619 panic!(
1620 "Unrecoverable decoding DatabaseKey as {}; err={}; bytes={}",
1621 any::type_name::<K>(),
1622 err,
1623 AbbreviateHexBytes(key_bytes)
1624 )
1625 })
1626}
1627
1628impl<'tx, Cap> DatabaseTransaction<'tx, Cap> {
1629 pub fn into_nc(self) -> DatabaseTransaction<'tx, NonCommittable> {
1631 DatabaseTransaction {
1632 tx: self.tx,
1633 decoders: self.decoders,
1634 commit_tracker: self.commit_tracker,
1635 on_commit_hooks: self.on_commit_hooks,
1636 capability: PhantomData::<NonCommittable>,
1637 }
1638 }
1639
1640 pub fn to_ref_nc<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, NonCommittable>
1642 where
1643 's: 'a,
1644 {
1645 self.to_ref().into_nc()
1646 }
1647
1648 pub fn with_prefix<'a: 'tx>(self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1650 where
1651 'tx: 'a,
1652 {
1653 DatabaseTransaction {
1654 tx: Box::new(PrefixDatabaseTransaction {
1655 inner: self.tx,
1656 global_dbtx_access_token: None,
1657 prefix,
1658 }),
1659 decoders: self.decoders,
1660 commit_tracker: self.commit_tracker,
1661 on_commit_hooks: self.on_commit_hooks,
1662 capability: self.capability,
1663 }
1664 }
1665
1666 pub fn with_prefix_module_id<'a: 'tx>(
1670 self,
1671 module_instance_id: ModuleInstanceId,
1672 ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1673 where
1674 'tx: 'a,
1675 {
1676 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1677 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1678 (
1679 DatabaseTransaction {
1680 tx: Box::new(PrefixDatabaseTransaction {
1681 inner: self.tx,
1682 global_dbtx_access_token: Some(global_dbtx_access_token),
1683 prefix,
1684 }),
1685 decoders: self.decoders,
1686 commit_tracker: self.commit_tracker,
1687 on_commit_hooks: self.on_commit_hooks,
1688 capability: self.capability,
1689 },
1690 global_dbtx_access_token,
1691 )
1692 }
1693
1694 pub fn to_ref<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, Cap>
1696 where
1697 's: 'a,
1698 {
1699 let decoders = self.decoders.clone();
1700
1701 DatabaseTransaction {
1702 tx: Box::new(&mut self.tx),
1703 decoders,
1704 commit_tracker: match self.commit_tracker {
1705 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1706 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1707 },
1708 on_commit_hooks: match self.on_commit_hooks {
1709 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1710 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1711 },
1712 capability: self.capability,
1713 }
1714 }
1715
1716 pub fn to_ref_with_prefix<'a>(&'a mut self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1718 where
1719 'tx: 'a,
1720 {
1721 DatabaseTransaction {
1722 tx: Box::new(PrefixDatabaseTransaction {
1723 inner: &mut self.tx,
1724 global_dbtx_access_token: None,
1725 prefix,
1726 }),
1727 decoders: self.decoders.clone(),
1728 commit_tracker: match self.commit_tracker {
1729 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1730 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1731 },
1732 on_commit_hooks: match self.on_commit_hooks {
1733 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1734 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1735 },
1736 capability: self.capability,
1737 }
1738 }
1739
1740 pub fn to_ref_with_prefix_module_id<'a>(
1741 &'a mut self,
1742 module_instance_id: ModuleInstanceId,
1743 ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1744 where
1745 'tx: 'a,
1746 {
1747 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1748 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1749 (
1750 DatabaseTransaction {
1751 tx: Box::new(PrefixDatabaseTransaction {
1752 inner: &mut self.tx,
1753 global_dbtx_access_token: Some(global_dbtx_access_token),
1754 prefix,
1755 }),
1756 decoders: self.decoders.clone(),
1757 commit_tracker: match self.commit_tracker {
1758 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1759 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1760 },
1761 on_commit_hooks: match self.on_commit_hooks {
1762 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1763 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1764 },
1765 capability: self.capability,
1766 },
1767 global_dbtx_access_token,
1768 )
1769 }
1770
1771 pub fn is_global(&self) -> bool {
1773 self.tx.is_global()
1774 }
1775
1776 pub fn ensure_global(&self) -> Result<()> {
1778 if !self.is_global() {
1779 bail!("Database instance not global");
1780 }
1781
1782 Ok(())
1783 }
1784
1785 pub fn ensure_isolated(&self) -> Result<()> {
1787 if self.is_global() {
1788 bail!("Database instance not isolated");
1789 }
1790
1791 Ok(())
1792 }
1793
1794 pub fn ignore_uncommitted(&mut self) -> &mut Self {
1796 self.commit_tracker.ignore_uncommitted = true;
1797 self
1798 }
1799
1800 pub fn warn_uncommitted(&mut self) -> &mut Self {
1802 self.commit_tracker.ignore_uncommitted = false;
1803 self
1804 }
1805
1806 #[instrument(target = LOG_DB, level = "trace", skip_all)]
1808 pub fn on_commit(&mut self, f: maybe_add_send!(impl FnOnce() + 'static)) {
1809 self.on_commit_hooks.push(Box::new(f));
1810 }
1811
1812 pub fn global_dbtx<'a>(
1813 &'a mut self,
1814 access_token: GlobalDBTxAccessToken,
1815 ) -> DatabaseTransaction<'a, Cap>
1816 where
1817 'tx: 'a,
1818 {
1819 let decoders = self.decoders.clone();
1820
1821 DatabaseTransaction {
1822 tx: Box::new(self.tx.global_dbtx(access_token)),
1823 decoders,
1824 commit_tracker: match self.commit_tracker {
1825 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1826 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1827 },
1828 on_commit_hooks: match self.on_commit_hooks {
1829 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1830 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1831 },
1832 capability: self.capability,
1833 }
1834 }
1835}
1836
1837#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1839pub struct GlobalDBTxAccessToken(u32);
1840
1841impl GlobalDBTxAccessToken {
1842 fn from_prefix(prefix: &[u8]) -> Self {
1853 Self(prefix.iter().fold(0, |acc, b| acc + u32::from(*b)) + 513)
1854 }
1855}
1856
1857impl<'tx> DatabaseTransaction<'tx, Committable> {
1858 pub fn new(dbtx: Box<dyn IDatabaseTransaction + 'tx>, decoders: ModuleDecoderRegistry) -> Self {
1859 Self {
1860 tx: dbtx,
1861 decoders,
1862 commit_tracker: MaybeRef::Owned(CommitTracker {
1863 is_committed: false,
1864 has_writes: false,
1865 ignore_uncommitted: false,
1866 }),
1867 on_commit_hooks: MaybeRef::Owned(vec![]),
1868 capability: PhantomData,
1869 }
1870 }
1871
1872 pub async fn commit_tx_result(mut self) -> Result<()> {
1873 self.commit_tracker.is_committed = true;
1874 let commit_result = self.tx.commit_tx().await;
1875
1876 if commit_result.is_ok() {
1878 for hook in self.on_commit_hooks.deref_mut().drain(..) {
1879 hook();
1880 }
1881 }
1882
1883 commit_result
1884 }
1885
1886 pub async fn commit_tx(mut self) {
1887 self.commit_tracker.is_committed = true;
1888 self.commit_tx_result()
1889 .await
1890 .expect("Unrecoverable error occurred while committing to the database.");
1891 }
1892}
1893
1894#[apply(async_trait_maybe_send!)]
1895impl<Cap> IDatabaseTransactionOpsCore for DatabaseTransaction<'_, Cap>
1896where
1897 Cap: Send,
1898{
1899 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1900 self.commit_tracker.has_writes = true;
1901 self.tx.raw_insert_bytes(key, value).await
1902 }
1903
1904 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1905 self.tx.raw_get_bytes(key).await
1906 }
1907
1908 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1909 self.tx.raw_remove_entry(key).await
1910 }
1911
1912 async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1913 self.tx.raw_find_by_range(key_range).await
1914 }
1915
1916 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1917 self.tx.raw_find_by_prefix(key_prefix).await
1918 }
1919
1920 async fn raw_find_by_prefix_sorted_descending(
1921 &mut self,
1922 key_prefix: &[u8],
1923 ) -> Result<PrefixStream<'_>> {
1924 self.tx
1925 .raw_find_by_prefix_sorted_descending(key_prefix)
1926 .await
1927 }
1928
1929 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1930 self.commit_tracker.has_writes = true;
1931 self.tx.raw_remove_by_prefix(key_prefix).await
1932 }
1933}
1934#[apply(async_trait_maybe_send!)]
1935impl IDatabaseTransactionOps for DatabaseTransaction<'_, Committable> {
1936 async fn set_tx_savepoint(&mut self) -> Result<()> {
1937 self.tx.set_tx_savepoint().await
1938 }
1939
1940 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1941 self.tx.rollback_tx_to_savepoint().await
1942 }
1943}
1944
1945impl<T> DatabaseKeyPrefix for T
1946where
1947 T: DatabaseLookup + crate::encoding::Encodable + Debug,
1948{
1949 fn to_bytes(&self) -> Vec<u8> {
1950 let mut data = vec![<Self as DatabaseLookup>::Record::DB_PREFIX];
1951 data.append(&mut self.consensus_encode_to_vec());
1952 data
1953 }
1954}
1955
1956impl<T> DatabaseKey for T
1957where
1958 T: DatabaseRecord + crate::encoding::Decodable + Sized,
1961{
1962 const NOTIFY_ON_MODIFY: bool = <T as DatabaseRecord>::NOTIFY_ON_MODIFY;
1963 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1964 if data.is_empty() {
1965 return Err(DecodingError::wrong_length(1, 0));
1967 }
1968
1969 if data[0] != Self::DB_PREFIX {
1970 return Err(DecodingError::wrong_prefix(Self::DB_PREFIX, data[0]));
1971 }
1972
1973 <Self as crate::encoding::Decodable>::consensus_decode_whole(&data[1..], modules)
1974 .map_err(|decode_error| DecodingError::Other(decode_error.0))
1975 }
1976}
1977
1978impl<T> DatabaseValue for T
1979where
1980 T: Debug + Encodable + Decodable,
1981{
1982 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1983 T::consensus_decode_whole(data, modules).map_err(|e| DecodingError::Other(e.0))
1984 }
1985
1986 fn to_bytes(&self) -> Vec<u8> {
1987 self.consensus_encode_to_vec()
1988 }
1989}
1990
1991#[macro_export]
2052macro_rules! impl_db_record {
2053 (key = $key:ty, value = $val:ty, db_prefix = $db_prefix:expr_2021 $(, notify_on_modify = $notify:tt)? $(,)?) => {
2054 impl $crate::db::DatabaseRecord for $key {
2055 const DB_PREFIX: u8 = $db_prefix as u8;
2056 $(const NOTIFY_ON_MODIFY: bool = $notify;)?
2057 type Key = Self;
2058 type Value = $val;
2059 }
2060 $(
2061 impl_db_record! {
2062 @impl_notify_marker key = $key, notify_on_modify = $notify
2063 }
2064 )?
2065 };
2066 (@impl_notify_marker key = $key:ty, notify_on_modify = true) => {
2068 impl $crate::db::DatabaseKeyWithNotify for $key {}
2069 };
2070 (@impl_notify_marker key = $key:ty, notify_on_modify = false) => {};
2072}
2073
2074#[macro_export]
2075macro_rules! impl_db_lookup{
2076 (key = $key:ty $(, query_prefix = $query_prefix:ty)* $(,)?) => {
2077 $(
2078 impl $crate::db::DatabaseLookup for $query_prefix {
2079 type Record = $key;
2080 }
2081 )*
2082 };
2083}
2084
2085#[derive(Debug, Encodable, Decodable, Serialize)]
2087pub struct DatabaseVersionKeyV0;
2088
2089#[derive(Debug, Encodable, Decodable, Serialize)]
2090pub struct DatabaseVersionKey(pub ModuleInstanceId);
2091
2092#[derive(Debug, Encodable, Decodable, Serialize, Clone, PartialOrd, Ord, PartialEq, Eq, Copy)]
2093pub struct DatabaseVersion(pub u64);
2094
2095impl_db_record!(
2096 key = DatabaseVersionKeyV0,
2097 value = DatabaseVersion,
2098 db_prefix = DbKeyPrefix::DatabaseVersion
2099);
2100
2101impl_db_record!(
2102 key = DatabaseVersionKey,
2103 value = DatabaseVersion,
2104 db_prefix = DbKeyPrefix::DatabaseVersion
2105);
2106
2107impl std::fmt::Display for DatabaseVersion {
2108 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2109 write!(f, "{}", self.0)
2110 }
2111}
2112
2113impl DatabaseVersion {
2114 pub fn increment(&self) -> Self {
2115 Self(self.0 + 1)
2116 }
2117}
2118
2119impl std::fmt::Display for DbKeyPrefix {
2120 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2121 write!(f, "{self:?}")
2122 }
2123}
2124
2125#[repr(u8)]
2126#[derive(Clone, EnumIter, Debug)]
2127pub enum DbKeyPrefix {
2128 DatabaseVersion = 0x50,
2129 ClientBackup = 0x51,
2130}
2131
2132#[derive(Debug, Error)]
2133pub enum DecodingError {
2134 #[error("Key had a wrong prefix, expected {expected} but got {found}")]
2135 WrongPrefix { expected: u8, found: u8 },
2136 #[error("Key had a wrong length, expected {expected} but got {found}")]
2137 WrongLength { expected: usize, found: usize },
2138 #[error("Other decoding error: {0:#}")]
2139 Other(anyhow::Error),
2140}
2141
2142impl DecodingError {
2143 pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2144 Self::Other(anyhow::Error::from(error))
2145 }
2146
2147 pub fn wrong_prefix(expected: u8, found: u8) -> Self {
2148 Self::WrongPrefix { expected, found }
2149 }
2150
2151 pub fn wrong_length(expected: usize, found: usize) -> Self {
2152 Self::WrongLength { expected, found }
2153 }
2154}
2155
2156#[macro_export]
2157macro_rules! push_db_pair_items {
2158 ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
2159 let db_items =
2160 $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
2161 .await
2162 .map(|(key, val)| {
2163 (
2164 $crate::encoding::Encodable::consensus_encode_to_hex(&key),
2165 val,
2166 )
2167 })
2168 .collect::<BTreeMap<String, $value_type>>()
2169 .await;
2170
2171 $map.insert($key_literal.to_string(), Box::new(db_items));
2172 };
2173}
2174
2175#[macro_export]
2176macro_rules! push_db_key_items {
2177 ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $map:ident, $key_literal:literal) => {
2178 let db_items =
2179 $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
2180 .await
2181 .map(|(key, _)| key)
2182 .collect::<Vec<$key_type>>()
2183 .await;
2184
2185 $map.insert($key_literal.to_string(), Box::new(db_items));
2186 };
2187}
2188
2189pub struct DbMigrationFnContext<'tx, C> {
2203 dbtx: DatabaseTransaction<'tx>,
2204 module_instance_id: Option<ModuleInstanceId>,
2205 ctx: C,
2206 __please_use_constructor: (),
2207}
2208
2209impl<'tx, C> DbMigrationFnContext<'tx, C> {
2210 pub fn new(
2211 dbtx: DatabaseTransaction<'tx>,
2212 module_instance_id: Option<ModuleInstanceId>,
2213 ctx: C,
2214 ) -> Self {
2215 dbtx.ensure_global().expect("Must pass global dbtx");
2216 Self {
2217 dbtx,
2218 module_instance_id,
2219 ctx,
2220 __please_use_constructor: (),
2222 }
2223 }
2224
2225 pub fn map<R>(self, f: impl FnOnce(C) -> R) -> DbMigrationFnContext<'tx, R> {
2226 DbMigrationFnContext::new(self.dbtx, self.module_instance_id, f(self.ctx))
2227 }
2228
2229 #[doc(hidden)]
2231 pub fn split_dbtx_ctx<'s>(&'s mut self) -> (&'s mut DatabaseTransaction<'tx>, &'s C) {
2232 let Self { dbtx, ctx, .. } = self;
2233
2234 (dbtx, ctx)
2235 }
2236
2237 pub fn dbtx(&'_ mut self) -> DatabaseTransaction<'_> {
2238 if let Some(module_instance_id) = self.module_instance_id {
2239 self.dbtx.to_ref_with_prefix_module_id(module_instance_id).0
2240 } else {
2241 self.dbtx.to_ref_nc()
2242 }
2243 }
2244
2245 #[doc(hidden)]
2247 pub fn module_instance_id(&self) -> Option<ModuleInstanceId> {
2248 self.module_instance_id
2249 }
2250}
2251
2252pub type GeneralDbMigrationFn = DbMigrationFn<()>;
2254pub type GeneralDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, ()>;
2255
2256pub type ClientCoreDbMigrationFn = DbMigrationFn<()>;
2261pub type ClientCoreDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, ()>;
2262
2263pub type DbMigrationFn<C> = Box<
2274 maybe_add_send_sync!(
2275 dyn for<'tx> Fn(
2276 DbMigrationFnContext<'tx, C>,
2277 ) -> Pin<
2278 Box<maybe_add_send!(dyn futures::Future<Output = anyhow::Result<()>> + 'tx)>,
2279 >
2280 ),
2281>;
2282
2283pub fn get_current_database_version<F>(
2287 migrations: &BTreeMap<DatabaseVersion, F>,
2288) -> DatabaseVersion {
2289 let versions = migrations.keys().copied().collect::<Vec<_>>();
2290
2291 if !versions
2294 .windows(2)
2295 .all(|window| window[0].increment() == window[1])
2296 {
2297 panic!("Database Migrations are not defined contiguously");
2298 }
2299
2300 versions
2301 .last()
2302 .map_or(DatabaseVersion(0), DatabaseVersion::increment)
2303}
2304
2305pub async fn apply_migrations<C>(
2306 db: &Database,
2307 ctx: C,
2308 kind: String,
2309 migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
2310 module_instance_id: Option<ModuleInstanceId>,
2311 external_prefixes_above: Option<u8>,
2314) -> Result<(), anyhow::Error>
2315where
2316 C: Clone,
2317{
2318 let mut dbtx = db.begin_transaction().await;
2319 apply_migrations_dbtx(
2320 &mut dbtx.to_ref_nc(),
2321 ctx,
2322 kind,
2323 migrations,
2324 module_instance_id,
2325 external_prefixes_above,
2326 )
2327 .await?;
2328
2329 dbtx.commit_tx_result().await
2330}
2331pub async fn apply_migrations_dbtx<C>(
2343 global_dbtx: &mut DatabaseTransaction<'_>,
2344 ctx: C,
2345 kind: String,
2346 migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
2347 module_instance_id: Option<ModuleInstanceId>,
2348 external_prefixes_above: Option<u8>,
2351) -> Result<(), anyhow::Error>
2352where
2353 C: Clone,
2354{
2355 let is_new_db = global_dbtx
2358 .raw_find_by_prefix(&[])
2359 .await?
2360 .filter(|(key, _v)| {
2361 std::future::ready(
2362 external_prefixes_above.is_none_or(|external_prefixes_above| {
2363 !key.is_empty() && key[0] < external_prefixes_above
2364 }),
2365 )
2366 })
2367 .next()
2368 .await
2369 .is_none();
2370
2371 let target_db_version = get_current_database_version(&migrations);
2372
2373 create_database_version_dbtx(
2375 global_dbtx,
2376 target_db_version,
2377 module_instance_id,
2378 kind.clone(),
2379 is_new_db,
2380 )
2381 .await?;
2382
2383 let module_instance_id_key = module_instance_id_or_global(module_instance_id);
2384
2385 let disk_version = global_dbtx
2386 .get_value(&DatabaseVersionKey(module_instance_id_key))
2387 .await;
2388
2389 let db_version = if let Some(disk_version) = disk_version {
2390 let mut current_db_version = disk_version;
2391
2392 if current_db_version > target_db_version {
2393 return Err(anyhow::anyhow!(format!(
2394 "On disk database version {current_db_version} for module {kind} was higher than the code database version {target_db_version}."
2395 )));
2396 }
2397
2398 while current_db_version < target_db_version {
2399 if let Some(migration) = migrations.get(¤t_db_version) {
2400 info!(target: LOG_DB, ?kind, ?current_db_version, ?target_db_version, "Migrating module...");
2401 migration(DbMigrationFnContext::new(
2402 global_dbtx.to_ref_nc(),
2403 module_instance_id,
2404 ctx.clone(),
2405 ))
2406 .await?;
2407 } else {
2408 warn!(target: LOG_DB, ?current_db_version, "Missing server db migration");
2409 }
2410
2411 current_db_version = current_db_version.increment();
2412
2413 global_dbtx
2414 .insert_entry(
2415 &DatabaseVersionKey(module_instance_id_key),
2416 ¤t_db_version,
2417 )
2418 .await;
2419 }
2420
2421 current_db_version
2422 } else {
2423 target_db_version
2424 };
2425
2426 debug!(target: LOG_DB, ?kind, ?db_version, "DB Version");
2427 Ok(())
2428}
2429
2430pub async fn create_database_version(
2431 db: &Database,
2432 target_db_version: DatabaseVersion,
2433 module_instance_id: Option<ModuleInstanceId>,
2434 kind: String,
2435 is_new_db: bool,
2436) -> Result<(), anyhow::Error> {
2437 let mut dbtx = db.begin_transaction().await;
2438
2439 create_database_version_dbtx(
2440 &mut dbtx.to_ref_nc(),
2441 target_db_version,
2442 module_instance_id,
2443 kind,
2444 is_new_db,
2445 )
2446 .await?;
2447
2448 dbtx.commit_tx_result().await?;
2449 Ok(())
2450}
2451
2452pub async fn create_database_version_dbtx(
2456 global_dbtx: &mut DatabaseTransaction<'_>,
2457 target_db_version: DatabaseVersion,
2458 module_instance_id: Option<ModuleInstanceId>,
2459 kind: String,
2460 is_new_db: bool,
2461) -> Result<(), anyhow::Error> {
2462 let key_module_instance_id = module_instance_id_or_global(module_instance_id);
2463
2464 if global_dbtx
2468 .get_value(&DatabaseVersionKey(key_module_instance_id))
2469 .await
2470 .is_none()
2471 {
2472 let current_version_in_module = if let Some(module_instance_id) = module_instance_id {
2481 remove_current_db_version_if_exists(
2482 &mut global_dbtx
2483 .to_ref_with_prefix_module_id(module_instance_id)
2484 .0
2485 .into_nc(),
2486 is_new_db,
2487 target_db_version,
2488 )
2489 .await
2490 } else {
2491 remove_current_db_version_if_exists(
2492 &mut global_dbtx.to_ref().into_nc(),
2493 is_new_db,
2494 target_db_version,
2495 )
2496 .await
2497 };
2498
2499 debug!(target: LOG_DB, ?kind, ?current_version_in_module, ?target_db_version, ?is_new_db, "Creating DatabaseVersionKey...");
2501 global_dbtx
2502 .insert_new_entry(
2503 &DatabaseVersionKey(key_module_instance_id),
2504 ¤t_version_in_module,
2505 )
2506 .await;
2507 }
2508
2509 Ok(())
2510}
2511
2512async fn remove_current_db_version_if_exists(
2517 version_dbtx: &mut DatabaseTransaction<'_>,
2518 is_new_db: bool,
2519 target_db_version: DatabaseVersion,
2520) -> DatabaseVersion {
2521 let current_version_in_module = version_dbtx.remove_entry(&DatabaseVersionKeyV0).await;
2525 match current_version_in_module {
2526 Some(database_version) => database_version,
2527 None if is_new_db => target_db_version,
2528 None => DatabaseVersion(0),
2529 }
2530}
2531
2532fn module_instance_id_or_global(module_instance_id: Option<ModuleInstanceId>) -> ModuleInstanceId {
2535 module_instance_id.map_or_else(
2537 || MODULE_GLOBAL_PREFIX.into(),
2538 |module_instance_id| module_instance_id,
2539 )
2540}
2541#[allow(unused_imports)]
2542mod test_utils {
2543 use std::collections::BTreeMap;
2544 use std::time::Duration;
2545
2546 use fedimint_core::db::DbMigrationFnContext;
2547 use futures::future::ready;
2548 use futures::{Future, FutureExt, StreamExt};
2549 use rand::Rng;
2550 use tokio::join;
2551
2552 use super::{
2553 Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey, DatabaseVersionKeyV0,
2554 DbMigrationFn, apply_migrations,
2555 };
2556 use crate::core::ModuleKind;
2557 use crate::db::mem_impl::MemDatabase;
2558 use crate::db::{
2559 IDatabaseTransactionOps, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
2560 };
2561 use crate::encoding::{Decodable, Encodable};
2562 use crate::module::registry::ModuleDecoderRegistry;
2563
2564 pub async fn future_returns_shortly<F: Future>(fut: F) -> Option<F::Output> {
2565 crate::runtime::timeout(Duration::from_millis(10), fut)
2566 .await
2567 .ok()
2568 }
2569
2570 #[repr(u8)]
2571 #[derive(Clone)]
2572 pub enum TestDbKeyPrefix {
2573 Test = 0x42,
2574 AltTest = 0x43,
2575 PercentTestKey = 0x25,
2576 }
2577
2578 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
2579 pub(super) struct TestKey(pub u64);
2580
2581 #[derive(Debug, Encodable, Decodable)]
2582 struct DbPrefixTestPrefix;
2583
2584 impl_db_record!(
2585 key = TestKey,
2586 value = TestVal,
2587 db_prefix = TestDbKeyPrefix::Test,
2588 notify_on_modify = true,
2589 );
2590 impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);
2591
2592 #[derive(Debug, Encodable, Decodable)]
2593 struct TestKeyV0(u64, u64);
2594
2595 #[derive(Debug, Encodable, Decodable)]
2596 struct DbPrefixTestPrefixV0;
2597
2598 impl_db_record!(
2599 key = TestKeyV0,
2600 value = TestVal,
2601 db_prefix = TestDbKeyPrefix::Test,
2602 );
2603 impl_db_lookup!(key = TestKeyV0, query_prefix = DbPrefixTestPrefixV0);
2604
2605 #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Encodable, Decodable)]
2606 struct AltTestKey(u64);
2607
2608 #[derive(Debug, Encodable, Decodable)]
2609 struct AltDbPrefixTestPrefix;
2610
2611 impl_db_record!(
2612 key = AltTestKey,
2613 value = TestVal,
2614 db_prefix = TestDbKeyPrefix::AltTest,
2615 );
2616 impl_db_lookup!(key = AltTestKey, query_prefix = AltDbPrefixTestPrefix);
2617
2618 #[derive(Debug, Encodable, Decodable)]
2619 struct PercentTestKey(u64);
2620
2621 #[derive(Debug, Encodable, Decodable)]
2622 struct PercentPrefixTestPrefix;
2623
2624 impl_db_record!(
2625 key = PercentTestKey,
2626 value = TestVal,
2627 db_prefix = TestDbKeyPrefix::PercentTestKey,
2628 );
2629
2630 impl_db_lookup!(key = PercentTestKey, query_prefix = PercentPrefixTestPrefix);
2631 #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
2632 pub(super) struct TestVal(pub u64);
2633
2634 const TEST_MODULE_PREFIX: u16 = 1;
2635 const ALT_MODULE_PREFIX: u16 = 2;
2636
2637 pub async fn verify_insert_elements(db: Database) {
2638 let mut dbtx = db.begin_transaction().await;
2639 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2640 assert!(dbtx.insert_entry(&TestKey(2), &TestVal(3)).await.is_none());
2641 dbtx.commit_tx().await;
2642
2643 let mut dbtx = db.begin_transaction().await;
2645 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2646 assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(3)));
2647 dbtx.commit_tx().await;
2648
2649 let mut dbtx = db.begin_transaction().await;
2651 assert_eq!(
2652 dbtx.insert_entry(&TestKey(1), &TestVal(4)).await,
2653 Some(TestVal(2))
2654 );
2655 assert_eq!(
2656 dbtx.insert_entry(&TestKey(2), &TestVal(5)).await,
2657 Some(TestVal(3))
2658 );
2659 dbtx.commit_tx().await;
2660
2661 let mut dbtx = db.begin_transaction().await;
2662 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(4)));
2663 assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(5)));
2664 dbtx.commit_tx().await;
2665 }
2666
2667 pub async fn verify_remove_nonexisting(db: Database) {
2668 let mut dbtx = db.begin_transaction().await;
2669 assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2670 let removed = dbtx.remove_entry(&TestKey(1)).await;
2671 assert!(removed.is_none());
2672
2673 dbtx.commit_tx().await;
2675 }
2676
2677 pub async fn verify_remove_existing(db: Database) {
2678 let mut dbtx = db.begin_transaction().await;
2679
2680 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2681
2682 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2683
2684 let removed = dbtx.remove_entry(&TestKey(1)).await;
2685 assert_eq!(removed, Some(TestVal(2)));
2686 assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2687
2688 dbtx.commit_tx().await;
2690 }
2691
2692 pub async fn verify_read_own_writes(db: Database) {
2693 let mut dbtx = db.begin_transaction().await;
2694
2695 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2696
2697 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2698
2699 dbtx.commit_tx().await;
2701 }
2702
2703 pub async fn verify_prevent_dirty_reads(db: Database) {
2704 let mut dbtx = db.begin_transaction().await;
2705
2706 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2707
2708 let mut dbtx2 = db.begin_transaction().await;
2710 assert_eq!(dbtx2.get_value(&TestKey(1)).await, None);
2711
2712 dbtx.commit_tx().await;
2714 }
2715
2716 pub async fn verify_find_by_range(db: Database) {
2717 let mut dbtx = db.begin_transaction().await;
2718 dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2719 dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2720 dbtx.insert_entry(&TestKey(56), &TestVal(7777)).await;
2721
2722 dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2723 dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2724
2725 {
2726 let mut module_dbtx = dbtx.to_ref_with_prefix_module_id(2).0;
2727 module_dbtx
2728 .insert_entry(&TestKey(300), &TestVal(3000))
2729 .await;
2730 }
2731
2732 dbtx.commit_tx().await;
2733
2734 let mut dbtx = db.begin_transaction_nc().await;
2736
2737 let returned_keys = dbtx
2738 .find_by_range(TestKey(55)..TestKey(56))
2739 .await
2740 .collect::<Vec<_>>()
2741 .await;
2742
2743 let expected = vec![(TestKey(55), TestVal(9999))];
2744
2745 assert_eq!(returned_keys, expected);
2746
2747 let returned_keys = dbtx
2748 .find_by_range(TestKey(54)..TestKey(56))
2749 .await
2750 .collect::<Vec<_>>()
2751 .await;
2752
2753 let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2754 assert_eq!(returned_keys, expected);
2755
2756 let returned_keys = dbtx
2757 .find_by_range(TestKey(54)..TestKey(57))
2758 .await
2759 .collect::<Vec<_>>()
2760 .await;
2761
2762 let expected = vec![
2763 (TestKey(54), TestVal(8888)),
2764 (TestKey(55), TestVal(9999)),
2765 (TestKey(56), TestVal(7777)),
2766 ];
2767 assert_eq!(returned_keys, expected);
2768
2769 let mut module_dbtx = dbtx.with_prefix_module_id(2).0;
2770 let test_range = module_dbtx
2771 .find_by_range(TestKey(300)..TestKey(301))
2772 .await
2773 .collect::<Vec<_>>()
2774 .await;
2775 assert!(test_range.len() == 1);
2776 }
2777
2778 pub async fn verify_find_by_prefix(db: Database) {
2779 let mut dbtx = db.begin_transaction().await;
2780 dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2781 dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2782
2783 dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2784 dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2785 dbtx.commit_tx().await;
2786
2787 let mut dbtx = db.begin_transaction().await;
2789
2790 let returned_keys = dbtx
2791 .find_by_prefix(&DbPrefixTestPrefix)
2792 .await
2793 .collect::<Vec<_>>()
2794 .await;
2795
2796 let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2797 assert_eq!(returned_keys, expected);
2798
2799 let reversed = dbtx
2800 .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2801 .await
2802 .collect::<Vec<_>>()
2803 .await;
2804 let mut reversed_expected = expected;
2805 reversed_expected.reverse();
2806 assert_eq!(reversed, reversed_expected);
2807
2808 let returned_keys = dbtx
2809 .find_by_prefix(&AltDbPrefixTestPrefix)
2810 .await
2811 .collect::<Vec<_>>()
2812 .await;
2813
2814 let expected = vec![
2815 (AltTestKey(54), TestVal(6666)),
2816 (AltTestKey(55), TestVal(7777)),
2817 ];
2818 assert_eq!(returned_keys, expected);
2819
2820 let reversed = dbtx
2821 .find_by_prefix_sorted_descending(&AltDbPrefixTestPrefix)
2822 .await
2823 .collect::<Vec<_>>()
2824 .await;
2825 let mut reversed_expected = expected;
2826 reversed_expected.reverse();
2827 assert_eq!(reversed, reversed_expected);
2828 }
2829
2830 pub async fn verify_commit(db: Database) {
2831 let mut dbtx = db.begin_transaction().await;
2832
2833 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2834 dbtx.commit_tx().await;
2835
2836 let mut dbtx2 = db.begin_transaction().await;
2838 assert_eq!(dbtx2.get_value(&TestKey(1)).await, Some(TestVal(2)));
2839 }
2840
2841 pub async fn verify_rollback_to_savepoint(db: Database) {
2842 let mut dbtx_rollback = db.begin_transaction().await;
2843
2844 dbtx_rollback
2845 .insert_entry(&TestKey(20), &TestVal(2000))
2846 .await;
2847
2848 dbtx_rollback
2849 .set_tx_savepoint()
2850 .await
2851 .expect("Error setting transaction savepoint");
2852
2853 dbtx_rollback
2854 .insert_entry(&TestKey(21), &TestVal(2001))
2855 .await;
2856
2857 assert_eq!(
2858 dbtx_rollback.get_value(&TestKey(20)).await,
2859 Some(TestVal(2000))
2860 );
2861 assert_eq!(
2862 dbtx_rollback.get_value(&TestKey(21)).await,
2863 Some(TestVal(2001))
2864 );
2865
2866 dbtx_rollback
2867 .rollback_tx_to_savepoint()
2868 .await
2869 .expect("Error setting transaction savepoint");
2870
2871 assert_eq!(
2872 dbtx_rollback.get_value(&TestKey(20)).await,
2873 Some(TestVal(2000))
2874 );
2875
2876 assert_eq!(dbtx_rollback.get_value(&TestKey(21)).await, None);
2877
2878 dbtx_rollback.commit_tx().await;
2880 }
2881
2882 pub async fn verify_prevent_nonrepeatable_reads(db: Database) {
2883 let mut dbtx = db.begin_transaction().await;
2884 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2885
2886 let mut dbtx2 = db.begin_transaction().await;
2887
2888 dbtx2.insert_entry(&TestKey(100), &TestVal(101)).await;
2889
2890 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2891
2892 dbtx2.commit_tx().await;
2893
2894 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2897
2898 let expected_keys = 0;
2899 let returned_keys = dbtx
2900 .find_by_prefix(&DbPrefixTestPrefix)
2901 .await
2902 .fold(0, |returned_keys, (key, value)| async move {
2903 if key == TestKey(100) {
2904 assert!(value.eq(&TestVal(101)));
2905 }
2906 returned_keys + 1
2907 })
2908 .await;
2909
2910 assert_eq!(returned_keys, expected_keys);
2911 }
2912
2913 pub async fn verify_snapshot_isolation(db: Database) {
2914 async fn random_yield() {
2915 let times = if rand::thread_rng().gen_bool(0.5) {
2916 0
2917 } else {
2918 10
2919 };
2920 for _ in 0..times {
2921 tokio::task::yield_now().await;
2922 }
2923 }
2924
2925 for i in 0..1000 {
2927 let base_key = i * 2;
2928 let tx_accepted_key = base_key;
2929 let spent_input_key = base_key + 1;
2930
2931 join!(
2932 async {
2933 random_yield().await;
2934 let mut dbtx = db.begin_transaction().await;
2935
2936 random_yield().await;
2937 let a = dbtx.get_value(&TestKey(tx_accepted_key)).await;
2938 random_yield().await;
2939 let s = match i % 5 {
2942 0 => dbtx.get_value(&TestKey(spent_input_key)).await,
2943 1 => dbtx.remove_entry(&TestKey(spent_input_key)).await,
2944 2 => {
2945 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(200))
2946 .await
2947 }
2948 3 => {
2949 dbtx.find_by_prefix(&DbPrefixTestPrefix)
2950 .await
2951 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2952 .map(|(_k, v)| v)
2953 .next()
2954 .await
2955 }
2956 4 => {
2957 dbtx.find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2958 .await
2959 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2960 .map(|(_k, v)| v)
2961 .next()
2962 .await
2963 }
2964 _ => {
2965 panic!("woot?");
2966 }
2967 };
2968
2969 match (a, s) {
2970 (None, None) | (Some(_), Some(_)) => {}
2971 (None, Some(_)) => panic!("none some?! {i}"),
2972 (Some(_), None) => panic!("some none?! {i}"),
2973 }
2974 },
2975 async {
2976 random_yield().await;
2977
2978 let mut dbtx = db.begin_transaction().await;
2979 random_yield().await;
2980 assert_eq!(dbtx.get_value(&TestKey(tx_accepted_key)).await, None);
2981
2982 random_yield().await;
2983 assert_eq!(
2984 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(100))
2985 .await,
2986 None
2987 );
2988
2989 random_yield().await;
2990 assert_eq!(
2991 dbtx.insert_entry(&TestKey(tx_accepted_key), &TestVal(100))
2992 .await,
2993 None
2994 );
2995 random_yield().await;
2996 dbtx.commit_tx().await;
2997 }
2998 );
2999 }
3000 }
3001
3002 pub async fn verify_phantom_entry(db: Database) {
3003 let mut dbtx = db.begin_transaction().await;
3004
3005 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3006
3007 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3008
3009 dbtx.commit_tx().await;
3010
3011 let mut dbtx = db.begin_transaction().await;
3012 let expected_keys = 2;
3013 let returned_keys = dbtx
3014 .find_by_prefix(&DbPrefixTestPrefix)
3015 .await
3016 .fold(0, |returned_keys, (key, value)| async move {
3017 match key {
3018 TestKey(100) => {
3019 assert!(value.eq(&TestVal(101)));
3020 }
3021 TestKey(101) => {
3022 assert!(value.eq(&TestVal(102)));
3023 }
3024 _ => {}
3025 }
3026 returned_keys + 1
3027 })
3028 .await;
3029
3030 assert_eq!(returned_keys, expected_keys);
3031
3032 let mut dbtx2 = db.begin_transaction().await;
3033
3034 dbtx2.insert_entry(&TestKey(102), &TestVal(103)).await;
3035
3036 dbtx2.commit_tx().await;
3037
3038 let returned_keys = dbtx
3039 .find_by_prefix(&DbPrefixTestPrefix)
3040 .await
3041 .fold(0, |returned_keys, (key, value)| async move {
3042 match key {
3043 TestKey(100) => {
3044 assert!(value.eq(&TestVal(101)));
3045 }
3046 TestKey(101) => {
3047 assert!(value.eq(&TestVal(102)));
3048 }
3049 _ => {}
3050 }
3051 returned_keys + 1
3052 })
3053 .await;
3054
3055 assert_eq!(returned_keys, expected_keys);
3056 }
3057
3058 pub async fn expect_write_conflict(db: Database) {
3059 let mut dbtx = db.begin_transaction().await;
3060 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3061 dbtx.commit_tx().await;
3062
3063 let mut dbtx2 = db.begin_transaction().await;
3064 let mut dbtx3 = db.begin_transaction().await;
3065
3066 dbtx2.insert_entry(&TestKey(100), &TestVal(102)).await;
3067
3068 dbtx3.insert_entry(&TestKey(100), &TestVal(103)).await;
3072
3073 dbtx2.commit_tx().await;
3074 dbtx3.commit_tx_result().await.expect_err("Expecting an error to be returned because this transaction is in a write-write conflict with dbtx");
3075 }
3076
3077 pub async fn verify_string_prefix(db: Database) {
3078 let mut dbtx = db.begin_transaction().await;
3079 dbtx.insert_entry(&PercentTestKey(100), &TestVal(101)).await;
3080
3081 assert_eq!(
3082 dbtx.get_value(&PercentTestKey(100)).await,
3083 Some(TestVal(101))
3084 );
3085
3086 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3087
3088 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3089
3090 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3091
3092 dbtx.insert_entry(&TestKey(101), &TestVal(100)).await;
3095
3096 let expected_keys = 4;
3097 let returned_keys = dbtx
3098 .find_by_prefix(&PercentPrefixTestPrefix)
3099 .await
3100 .fold(0, |returned_keys, (key, value)| async move {
3101 if matches!(key, PercentTestKey(101)) {
3102 assert!(value.eq(&TestVal(100)));
3103 }
3104 returned_keys + 1
3105 })
3106 .await;
3107
3108 assert_eq!(returned_keys, expected_keys);
3109 }
3110
3111 pub async fn verify_remove_by_prefix(db: Database) {
3112 let mut dbtx = db.begin_transaction().await;
3113
3114 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3115
3116 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3117
3118 dbtx.commit_tx().await;
3119
3120 let mut remove_dbtx = db.begin_transaction().await;
3121 remove_dbtx.remove_by_prefix(&DbPrefixTestPrefix).await;
3122 remove_dbtx.commit_tx().await;
3123
3124 let mut dbtx = db.begin_transaction().await;
3125 let expected_keys = 0;
3126 let returned_keys = dbtx
3127 .find_by_prefix(&DbPrefixTestPrefix)
3128 .await
3129 .fold(0, |returned_keys, (key, value)| async move {
3130 match key {
3131 TestKey(100) => {
3132 assert!(value.eq(&TestVal(101)));
3133 }
3134 TestKey(101) => {
3135 assert!(value.eq(&TestVal(102)));
3136 }
3137 _ => {}
3138 }
3139 returned_keys + 1
3140 })
3141 .await;
3142
3143 assert_eq!(returned_keys, expected_keys);
3144 }
3145
3146 pub async fn verify_module_db(db: Database, module_db: Database) {
3147 let mut dbtx = db.begin_transaction().await;
3148
3149 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3150
3151 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3152
3153 dbtx.commit_tx().await;
3154
3155 let mut module_dbtx = module_db.begin_transaction().await;
3157 assert_eq!(module_dbtx.get_value(&TestKey(100)).await, None);
3158
3159 assert_eq!(module_dbtx.get_value(&TestKey(101)).await, None);
3160
3161 let mut dbtx = db.begin_transaction().await;
3163 assert_eq!(dbtx.get_value(&TestKey(100)).await, Some(TestVal(101)));
3164
3165 assert_eq!(dbtx.get_value(&TestKey(101)).await, Some(TestVal(102)));
3166
3167 let mut module_dbtx = module_db.begin_transaction().await;
3168
3169 module_dbtx.insert_entry(&TestKey(100), &TestVal(103)).await;
3170
3171 module_dbtx.insert_entry(&TestKey(101), &TestVal(104)).await;
3172
3173 module_dbtx.commit_tx().await;
3174
3175 let expected_keys = 2;
3176 let mut dbtx = db.begin_transaction().await;
3177 let returned_keys = dbtx
3178 .find_by_prefix(&DbPrefixTestPrefix)
3179 .await
3180 .fold(0, |returned_keys, (key, value)| async move {
3181 match key {
3182 TestKey(100) => {
3183 assert!(value.eq(&TestVal(101)));
3184 }
3185 TestKey(101) => {
3186 assert!(value.eq(&TestVal(102)));
3187 }
3188 _ => {}
3189 }
3190 returned_keys + 1
3191 })
3192 .await;
3193
3194 assert_eq!(returned_keys, expected_keys);
3195
3196 let removed = dbtx.remove_entry(&TestKey(100)).await;
3197 assert_eq!(removed, Some(TestVal(101)));
3198 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
3199
3200 let mut module_dbtx = module_db.begin_transaction().await;
3201 assert_eq!(
3202 module_dbtx.get_value(&TestKey(100)).await,
3203 Some(TestVal(103))
3204 );
3205 }
3206
3207 pub async fn verify_module_prefix(db: Database) {
3208 let mut test_dbtx = db.begin_transaction().await;
3209 {
3210 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3211
3212 test_module_dbtx
3213 .insert_entry(&TestKey(100), &TestVal(101))
3214 .await;
3215
3216 test_module_dbtx
3217 .insert_entry(&TestKey(101), &TestVal(102))
3218 .await;
3219 }
3220
3221 test_dbtx.commit_tx().await;
3222
3223 let mut alt_dbtx = db.begin_transaction().await;
3224 {
3225 let mut alt_module_dbtx = alt_dbtx.to_ref_with_prefix_module_id(ALT_MODULE_PREFIX).0;
3226
3227 alt_module_dbtx
3228 .insert_entry(&TestKey(100), &TestVal(103))
3229 .await;
3230
3231 alt_module_dbtx
3232 .insert_entry(&TestKey(101), &TestVal(104))
3233 .await;
3234 }
3235
3236 alt_dbtx.commit_tx().await;
3237
3238 let mut test_dbtx = db.begin_transaction().await;
3240 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3241 assert_eq!(
3242 test_module_dbtx.get_value(&TestKey(100)).await,
3243 Some(TestVal(101))
3244 );
3245
3246 assert_eq!(
3247 test_module_dbtx.get_value(&TestKey(101)).await,
3248 Some(TestVal(102))
3249 );
3250
3251 let expected_keys = 2;
3252 let returned_keys = test_module_dbtx
3253 .find_by_prefix(&DbPrefixTestPrefix)
3254 .await
3255 .fold(0, |returned_keys, (key, value)| async move {
3256 match key {
3257 TestKey(100) => {
3258 assert!(value.eq(&TestVal(101)));
3259 }
3260 TestKey(101) => {
3261 assert!(value.eq(&TestVal(102)));
3262 }
3263 _ => {}
3264 }
3265 returned_keys + 1
3266 })
3267 .await;
3268
3269 assert_eq!(returned_keys, expected_keys);
3270
3271 let removed = test_module_dbtx.remove_entry(&TestKey(100)).await;
3272 assert_eq!(removed, Some(TestVal(101)));
3273 assert_eq!(test_module_dbtx.get_value(&TestKey(100)).await, None);
3274
3275 let mut test_dbtx = db.begin_transaction().await;
3278 assert_eq!(test_dbtx.get_value(&TestKey(101)).await, None);
3279
3280 test_dbtx.commit_tx().await;
3281 }
3282
3283 #[cfg(test)]
3284 #[tokio::test]
3285 pub async fn verify_test_migration() {
3286 let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
3288 let expected_test_keys_size: usize = 100;
3289 let mut dbtx = db.begin_transaction().await;
3290 for i in 0..expected_test_keys_size {
3291 dbtx.insert_new_entry(&TestKeyV0(i as u64, (i + 1) as u64), &TestVal(i as u64))
3292 .await;
3293 }
3294
3295 dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
3297 .await;
3298 dbtx.commit_tx().await;
3299
3300 let mut migrations: BTreeMap<DatabaseVersion, DbMigrationFn<()>> = BTreeMap::new();
3301
3302 migrations.insert(
3303 DatabaseVersion(0),
3304 Box::new(|ctx| migrate_test_db_version_0(ctx).boxed()),
3305 );
3306
3307 apply_migrations(&db, (), "TestModule".to_string(), migrations, None, None)
3308 .await
3309 .expect("Error applying migrations for TestModule");
3310
3311 let mut dbtx = db.begin_transaction().await;
3313
3314 assert!(
3317 dbtx.get_value(&DatabaseVersionKey(MODULE_GLOBAL_PREFIX.into()))
3318 .await
3319 .is_some()
3320 );
3321
3322 let test_keys = dbtx
3324 .find_by_prefix(&DbPrefixTestPrefix)
3325 .await
3326 .collect::<Vec<_>>()
3327 .await;
3328 let test_keys_size = test_keys.len();
3329 assert_eq!(test_keys_size, expected_test_keys_size);
3330 for (key, val) in test_keys {
3331 assert_eq!(key.0, val.0 + 1);
3332 }
3333 }
3334
3335 #[allow(dead_code)]
3336 async fn migrate_test_db_version_0(
3337 mut ctx: DbMigrationFnContext<'_, ()>,
3338 ) -> Result<(), anyhow::Error> {
3339 let mut dbtx = ctx.dbtx();
3340 let example_keys_v0 = dbtx
3341 .find_by_prefix(&DbPrefixTestPrefixV0)
3342 .await
3343 .collect::<Vec<_>>()
3344 .await;
3345 dbtx.remove_by_prefix(&DbPrefixTestPrefixV0).await;
3346 for (key, val) in example_keys_v0 {
3347 let key_v2 = TestKey(key.1);
3348 dbtx.insert_new_entry(&key_v2, &val).await;
3349 }
3350 Ok(())
3351 }
3352
3353 #[cfg(test)]
3354 #[tokio::test]
3355 async fn test_autocommit() {
3356 use std::marker::PhantomData;
3357 use std::ops::Range;
3358 use std::path::Path;
3359
3360 use anyhow::anyhow;
3361 use async_trait::async_trait;
3362
3363 use crate::ModuleDecoderRegistry;
3364 use crate::db::{
3365 AutocommitError, BaseDatabaseTransaction, IDatabaseTransaction,
3366 IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase,
3367 IRawDatabaseTransaction,
3368 };
3369
3370 #[derive(Debug)]
3371 struct FakeDatabase;
3372
3373 #[async_trait]
3374 impl IRawDatabase for FakeDatabase {
3375 type Transaction<'a> = FakeTransaction<'a>;
3376 async fn begin_transaction(&self) -> FakeTransaction {
3377 FakeTransaction(PhantomData)
3378 }
3379
3380 fn checkpoint(&self, _backup_path: &Path) -> anyhow::Result<()> {
3381 Ok(())
3382 }
3383 }
3384
3385 #[derive(Debug)]
3386 struct FakeTransaction<'a>(PhantomData<&'a ()>);
3387
3388 #[async_trait]
3389 impl IDatabaseTransactionOpsCore for FakeTransaction<'_> {
3390 async fn raw_insert_bytes(
3391 &mut self,
3392 _key: &[u8],
3393 _value: &[u8],
3394 ) -> anyhow::Result<Option<Vec<u8>>> {
3395 unimplemented!()
3396 }
3397
3398 async fn raw_get_bytes(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
3399 unimplemented!()
3400 }
3401
3402 async fn raw_remove_entry(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
3403 unimplemented!()
3404 }
3405
3406 async fn raw_find_by_range(
3407 &mut self,
3408 _key_range: Range<&[u8]>,
3409 ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3410 unimplemented!()
3411 }
3412
3413 async fn raw_find_by_prefix(
3414 &mut self,
3415 _key_prefix: &[u8],
3416 ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3417 unimplemented!()
3418 }
3419
3420 async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
3421 unimplemented!()
3422 }
3423
3424 async fn raw_find_by_prefix_sorted_descending(
3425 &mut self,
3426 _key_prefix: &[u8],
3427 ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3428 unimplemented!()
3429 }
3430 }
3431
3432 #[async_trait]
3433 impl IDatabaseTransactionOps for FakeTransaction<'_> {
3434 async fn rollback_tx_to_savepoint(&mut self) -> anyhow::Result<()> {
3435 unimplemented!()
3436 }
3437
3438 async fn set_tx_savepoint(&mut self) -> anyhow::Result<()> {
3439 unimplemented!()
3440 }
3441 }
3442
3443 #[async_trait]
3444 impl IRawDatabaseTransaction for FakeTransaction<'_> {
3445 async fn commit_tx(self) -> anyhow::Result<()> {
3446 Err(anyhow!("Can't commit!"))
3447 }
3448 }
3449
3450 let db = Database::new(FakeDatabase, ModuleDecoderRegistry::default());
3451 let err = db
3452 .autocommit::<_, _, ()>(|_dbtx, _| Box::pin(async { Ok(()) }), Some(5))
3453 .await
3454 .unwrap_err();
3455
3456 match err {
3457 AutocommitError::CommitFailed {
3458 attempts: failed_attempts,
3459 ..
3460 } => {
3461 assert_eq!(failed_attempts, 5);
3462 }
3463 AutocommitError::ClosureError { .. } => panic!("Closure did not return error"),
3464 }
3465 }
3466}
3467
3468pub async fn find_by_prefix_sorted_descending<'r, 'inner, KP>(
3469 tx: &'r mut (dyn IDatabaseTransaction + 'inner),
3470 decoders: ModuleDecoderRegistry,
3471 key_prefix: &KP,
3472) -> impl Stream<
3473 Item = (
3474 KP::Record,
3475 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
3476 ),
3477>
3478+ 'r
3479+ use<'r, KP>
3480where
3481 'inner: 'r,
3482 KP: DatabaseLookup,
3483 KP::Record: DatabaseKey,
3484{
3485 debug!(target: LOG_DB, "find by prefix sorted descending");
3486 let prefix_bytes = key_prefix.to_bytes();
3487 tx.raw_find_by_prefix_sorted_descending(&prefix_bytes)
3488 .await
3489 .expect("Error doing prefix search in database")
3490 .map(move |(key_bytes, value_bytes)| {
3491 let key = decode_key_expect(&key_bytes, &decoders);
3492 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
3493 (key, value)
3494 })
3495}
3496
3497pub async fn verify_module_db_integrity_dbtx(
3498 dbtx: &mut DatabaseTransaction<'_>,
3499 module_id: ModuleInstanceId,
3500 module_kind: ModuleKind,
3501 prefixes: &BTreeSet<u8>,
3502) {
3503 let module_db_prefix = module_instance_id_to_byte_prefix(module_id);
3504 if module_id < 250 {
3505 assert_eq!(module_db_prefix.len(), 2);
3506 }
3507 let mut records = dbtx
3508 .raw_find_by_prefix(&module_db_prefix)
3509 .await
3510 .expect("DB fail");
3511 while let Some((k, v)) = records.next().await {
3512 assert!(
3513 prefixes.contains(&k[module_db_prefix.len()]),
3514 "Unexpected module {module_kind} {module_id} db record found: {}: {}",
3515 k.as_hex(),
3516 v.as_hex()
3517 );
3518 }
3519}
3520
3521#[cfg(test)]
3522mod tests;