1use std::any;
106use std::collections::{BTreeMap, BTreeSet};
107use std::error::Error;
108use std::fmt::{self, Debug};
109use std::marker::{self, PhantomData};
110use std::ops::{self, DerefMut, Range};
111use std::path::Path;
112use std::pin::Pin;
113use std::sync::Arc;
114use std::time::Duration;
115
116use anyhow::{Context, Result, bail};
117use bitcoin::hex::DisplayHex as _;
118use fedimint_core::util::BoxFuture;
119use fedimint_logging::LOG_DB;
120use futures::{Stream, StreamExt};
121use macro_rules_attribute::apply;
122use rand::Rng;
123use serde::Serialize;
124use strum_macros::EnumIter;
125use thiserror::Error;
126use tracing::{debug, error, info, instrument, trace, warn};
127
128use crate::core::{ModuleInstanceId, ModuleKind};
129use crate::encoding::{Decodable, Encodable};
130use crate::fmt_utils::AbbreviateHexBytes;
131use crate::task::{MaybeSend, MaybeSync};
132use crate::util::FmtCompactAnyhow as _;
133use crate::{async_trait_maybe_send, maybe_add_send, timing};
134
135pub mod mem_impl;
136pub mod notifications;
137
138pub use test_utils::*;
139
140use self::notifications::{Notifications, NotifyQueue};
141use crate::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
142
143pub const MODULE_GLOBAL_PREFIX: u8 = 0xff;
144
145pub trait DatabaseKeyPrefix: Debug {
146 fn to_bytes(&self) -> Vec<u8>;
147}
148
149pub trait DatabaseRecord: DatabaseKeyPrefix {
152 const DB_PREFIX: u8;
153 const NOTIFY_ON_MODIFY: bool = false;
154 type Key: DatabaseKey + Debug;
155 type Value: DatabaseValue + Debug;
156}
157
158pub trait DatabaseLookup: DatabaseKeyPrefix {
161 type Record: DatabaseRecord;
162}
163
164impl<Record> DatabaseLookup for Record
166where
167 Record: DatabaseRecord + Debug + Decodable + Encodable,
168{
169 type Record = Record;
170}
171
172pub trait DatabaseKey: Sized {
175 const NOTIFY_ON_MODIFY: bool = false;
183 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
184}
185
186pub trait DatabaseKeyWithNotify {}
188
189pub trait DatabaseValue: Sized + Debug {
191 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
192 fn to_bytes(&self) -> Vec<u8>;
193}
194
195pub type PrefixStream<'a> = Pin<Box<maybe_add_send!(dyn Stream<Item = (Vec<u8>, Vec<u8>)> + 'a)>>;
196
197pub type PhantomBound<'big, 'small> = PhantomData<&'small &'big ()>;
201
202#[derive(Debug, Error)]
204pub enum AutocommitError<E> {
205 #[error("Commit Failed: {last_error}")]
207 CommitFailed {
208 attempts: usize,
210 last_error: anyhow::Error,
212 },
213 #[error("Closure error: {error}")]
216 ClosureError {
217 attempts: usize,
223 error: E,
225 },
226}
227
228#[apply(async_trait_maybe_send!)]
237pub trait IRawDatabase: Debug + MaybeSend + MaybeSync + 'static {
238 type Transaction<'a>: IRawDatabaseTransaction + Debug;
240
241 async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a>;
243
244 fn checkpoint(&self, backup_path: &Path) -> Result<()>;
246}
247
248#[apply(async_trait_maybe_send!)]
249impl<T> IRawDatabase for Box<T>
250where
251 T: IRawDatabase,
252{
253 type Transaction<'a> = <T as IRawDatabase>::Transaction<'a>;
254
255 async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a> {
256 (**self).begin_transaction().await
257 }
258
259 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
260 (**self).checkpoint(backup_path)
261 }
262}
263
264pub trait IRawDatabaseExt: IRawDatabase + Sized {
266 fn into_database(self) -> Database {
270 Database::new(self, ModuleRegistry::default())
271 }
272}
273
274impl<T> IRawDatabaseExt for T where T: IRawDatabase {}
275
276impl<T> From<T> for Database
277where
278 T: IRawDatabase,
279{
280 fn from(raw: T) -> Self {
281 Self::new(raw, ModuleRegistry::default())
282 }
283}
284
285#[apply(async_trait_maybe_send!)]
288pub trait IDatabase: Debug + MaybeSend + MaybeSync + 'static {
289 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a>;
291 async fn register(&self, key: &[u8]);
293 async fn notify(&self, key: &[u8]);
295
296 fn is_global(&self) -> bool;
299
300 fn checkpoint(&self, backup_path: &Path) -> Result<()>;
302}
303
304#[apply(async_trait_maybe_send!)]
305impl<T> IDatabase for Arc<T>
306where
307 T: IDatabase + ?Sized,
308{
309 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
310 (**self).begin_transaction().await
311 }
312 async fn register(&self, key: &[u8]) {
313 (**self).register(key).await;
314 }
315 async fn notify(&self, key: &[u8]) {
316 (**self).notify(key).await;
317 }
318
319 fn is_global(&self) -> bool {
320 (**self).is_global()
321 }
322
323 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
324 (**self).checkpoint(backup_path)
325 }
326}
327
328struct BaseDatabase<RawDatabase> {
332 notifications: Arc<Notifications>,
333 raw: RawDatabase,
334}
335
336impl<RawDatabase> fmt::Debug for BaseDatabase<RawDatabase> {
337 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338 f.write_str("BaseDatabase")
339 }
340}
341
342#[apply(async_trait_maybe_send!)]
343impl<RawDatabase: IRawDatabase + MaybeSend + 'static> IDatabase for BaseDatabase<RawDatabase> {
344 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
345 Box::new(BaseDatabaseTransaction::new(
346 self.raw.begin_transaction().await,
347 self.notifications.clone(),
348 ))
349 }
350 async fn register(&self, key: &[u8]) {
351 self.notifications.register(key).await;
352 }
353 async fn notify(&self, key: &[u8]) {
354 self.notifications.notify(key);
355 }
356
357 fn is_global(&self) -> bool {
358 true
359 }
360
361 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
362 self.raw.checkpoint(backup_path)
363 }
364}
365
366#[derive(Clone, Debug)]
372pub struct Database {
373 inner: Arc<dyn IDatabase + 'static>,
374 module_decoders: ModuleDecoderRegistry,
375}
376
377impl Database {
378 pub fn strong_count(&self) -> usize {
379 Arc::strong_count(&self.inner)
380 }
381
382 pub fn into_inner(self) -> Arc<dyn IDatabase + 'static> {
383 self.inner
384 }
385}
386
387impl Database {
388 pub fn new(raw: impl IRawDatabase + 'static, module_decoders: ModuleDecoderRegistry) -> Self {
393 let inner = BaseDatabase {
394 raw,
395 notifications: Arc::new(Notifications::new()),
396 };
397 Self::new_from_arc(
398 Arc::new(inner) as Arc<dyn IDatabase + 'static>,
399 module_decoders,
400 )
401 }
402
403 pub fn new_from_arc(
405 inner: Arc<dyn IDatabase + 'static>,
406 module_decoders: ModuleDecoderRegistry,
407 ) -> Self {
408 Self {
409 inner,
410 module_decoders,
411 }
412 }
413
414 pub fn with_prefix(&self, prefix: Vec<u8>) -> Self {
416 Self {
417 inner: Arc::new(PrefixDatabase {
418 inner: self.inner.clone(),
419 global_dbtx_access_token: None,
420 prefix,
421 }),
422 module_decoders: self.module_decoders.clone(),
423 }
424 }
425
426 pub fn with_prefix_module_id(
430 &self,
431 module_instance_id: ModuleInstanceId,
432 ) -> (Self, GlobalDBTxAccessToken) {
433 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
434 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
435 (
436 Self {
437 inner: Arc::new(PrefixDatabase {
438 inner: self.inner.clone(),
439 global_dbtx_access_token: Some(global_dbtx_access_token),
440 prefix,
441 }),
442 module_decoders: self.module_decoders.clone(),
443 },
444 global_dbtx_access_token,
445 )
446 }
447
448 pub fn with_decoders(&self, module_decoders: ModuleDecoderRegistry) -> Self {
449 Self {
450 inner: self.inner.clone(),
451 module_decoders,
452 }
453 }
454
455 pub fn is_global(&self) -> bool {
457 self.inner.is_global()
458 }
459
460 pub fn ensure_global(&self) -> Result<()> {
462 if !self.is_global() {
463 bail!("Database instance not global");
464 }
465
466 Ok(())
467 }
468
469 pub fn ensure_isolated(&self) -> Result<()> {
471 if self.is_global() {
472 bail!("Database instance not isolated");
473 }
474
475 Ok(())
476 }
477
478 pub async fn begin_transaction<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, Committable>
480 where
481 's: 'tx,
482 {
483 DatabaseTransaction::<Committable>::new(
484 self.inner.begin_transaction().await,
485 self.module_decoders.clone(),
486 )
487 }
488
489 pub async fn begin_transaction_nc<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, NonCommittable>
491 where
492 's: 'tx,
493 {
494 self.begin_transaction().await.into_nc()
495 }
496
497 pub fn checkpoint(&self, backup_path: &Path) -> Result<()> {
498 self.inner.checkpoint(backup_path)
499 }
500
501 pub async fn autocommit<'s, 'dbtx, F, T, E>(
529 &'s self,
530 tx_fn: F,
531 max_attempts: Option<usize>,
532 ) -> Result<T, AutocommitError<E>>
533 where
534 's: 'dbtx,
535 for<'r, 'o> F: Fn(
536 &'r mut DatabaseTransaction<'o>,
537 PhantomBound<'dbtx, 'o>,
538 ) -> BoxFuture<'r, Result<T, E>>,
539 {
540 assert_ne!(max_attempts, Some(0));
541 let mut curr_attempts: usize = 0;
542
543 loop {
544 curr_attempts = curr_attempts
549 .checked_add(1)
550 .expect("db autocommit attempt counter overflowed");
551
552 let mut dbtx = self.begin_transaction().await;
553
554 let tx_fn_res = tx_fn(&mut dbtx.to_ref_nc(), PhantomData).await;
555 let val = match tx_fn_res {
556 Ok(val) => val,
557 Err(err) => {
558 dbtx.ignore_uncommitted();
559 return Err(AutocommitError::ClosureError {
560 attempts: curr_attempts,
561 error: err,
562 });
563 }
564 };
565
566 let _timing = timing::TimeReporter::new("autocommit - commit_tx");
567
568 match dbtx.commit_tx_result().await {
569 Ok(()) => {
570 return Ok(val);
571 }
572 Err(err) => {
573 if max_attempts.is_some_and(|max_att| max_att <= curr_attempts) {
574 warn!(
575 target: LOG_DB,
576 curr_attempts,
577 ?err,
578 "Database commit failed in an autocommit block - terminating"
579 );
580 return Err(AutocommitError::CommitFailed {
581 attempts: curr_attempts,
582 last_error: err,
583 });
584 }
585
586 let delay = (2u64.pow(curr_attempts.min(7) as u32) * 10).min(1000);
587 let delay = rand::thread_rng().gen_range(delay..(2 * delay));
588 warn!(
589 target: LOG_DB,
590 curr_attempts,
591 err = %err.fmt_compact_anyhow(),
592 delay_ms = %delay,
593 "Database commit failed in an autocommit block - retrying"
594 );
595 crate::runtime::sleep(Duration::from_millis(delay)).await;
596 }
597 }
598 }
599 }
600
601 pub async fn wait_key_check<'a, K, T>(
606 &'a self,
607 key: &K,
608 checker: impl Fn(Option<K::Value>) -> Option<T>,
609 ) -> (T, DatabaseTransaction<'a, Committable>)
610 where
611 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
612 {
613 let key_bytes = key.to_bytes();
614 loop {
615 let notify = self.inner.register(&key_bytes);
617
618 let mut tx = self.inner.begin_transaction().await;
620
621 let maybe_value_bytes = tx
622 .raw_get_bytes(&key_bytes)
623 .await
624 .expect("Unrecoverable error when reading from database")
625 .map(|value_bytes| {
626 decode_value_expect(&value_bytes, &self.module_decoders, &key_bytes)
627 });
628
629 if let Some(value) = checker(maybe_value_bytes) {
630 return (
631 value,
632 DatabaseTransaction::new(tx, self.module_decoders.clone()),
633 );
634 }
635
636 notify.await;
638 }
641 }
642
643 pub async fn wait_key_exists<K>(&self, key: &K) -> K::Value
645 where
646 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
647 {
648 self.wait_key_check(key, std::convert::identity).await.0
649 }
650}
651
652fn module_instance_id_to_byte_prefix(module_instance_id: u16) -> Vec<u8> {
653 let mut bytes = vec![MODULE_GLOBAL_PREFIX];
654 bytes.append(&mut module_instance_id.consensus_encode_to_vec());
655 bytes
656}
657
658#[derive(Clone, Debug)]
661struct PrefixDatabase<Inner>
662where
663 Inner: Debug,
664{
665 prefix: Vec<u8>,
666 global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
667 inner: Inner,
668}
669
670impl<Inner> PrefixDatabase<Inner>
671where
672 Inner: Debug,
673{
674 fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
678 let mut full_key = self.prefix.clone();
679 full_key.extend_from_slice(key);
680 full_key
681 }
682}
683
684#[apply(async_trait_maybe_send!)]
685impl<Inner> IDatabase for PrefixDatabase<Inner>
686where
687 Inner: Debug + MaybeSend + MaybeSync + 'static + IDatabase,
688{
689 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
690 Box::new(PrefixDatabaseTransaction {
691 inner: self.inner.begin_transaction().await,
692 global_dbtx_access_token: self.global_dbtx_access_token,
693 prefix: self.prefix.clone(),
694 })
695 }
696 async fn register(&self, key: &[u8]) {
697 self.inner.register(&self.get_full_key(key)).await;
698 }
699
700 async fn notify(&self, key: &[u8]) {
701 self.inner.notify(&self.get_full_key(key)).await;
702 }
703
704 fn is_global(&self) -> bool {
705 if self.global_dbtx_access_token.is_some() {
706 false
707 } else {
708 self.inner.is_global()
709 }
710 }
711
712 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
713 self.inner.checkpoint(backup_path)
714 }
715}
716
717#[derive(Debug)]
722struct PrefixDatabaseTransaction<Inner> {
723 inner: Inner,
724 global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
725 prefix: Vec<u8>,
726}
727
728impl<Inner> PrefixDatabaseTransaction<Inner> {
729 fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
733 let mut full_key = self.prefix.clone();
734 full_key.extend_from_slice(key);
735 full_key
736 }
737
738 fn get_full_range(&self, range: Range<&[u8]>) -> Range<Vec<u8>> {
739 Range {
740 start: self.get_full_key(range.start),
741 end: self.get_full_key(range.end),
742 }
743 }
744
745 fn adapt_prefix_stream(stream: PrefixStream<'_>, prefix_len: usize) -> PrefixStream<'_> {
746 Box::pin(stream.map(move |(k, v)| (k[prefix_len..].to_owned(), v)))
747 }
748}
749
750#[apply(async_trait_maybe_send!)]
751impl<Inner> IDatabaseTransaction for PrefixDatabaseTransaction<Inner>
752where
753 Inner: IDatabaseTransaction,
754{
755 async fn commit_tx(&mut self) -> Result<()> {
756 self.inner.commit_tx().await
757 }
758
759 fn is_global(&self) -> bool {
760 if self.global_dbtx_access_token.is_some() {
761 false
762 } else {
763 self.inner.is_global()
764 }
765 }
766
767 fn global_dbtx(
768 &mut self,
769 access_token: GlobalDBTxAccessToken,
770 ) -> &mut dyn IDatabaseTransaction {
771 if let Some(self_global_dbtx_access_token) = self.global_dbtx_access_token {
772 assert_eq!(
773 access_token, self_global_dbtx_access_token,
774 "Invalid access key used to access global_dbtx"
775 );
776 &mut self.inner
777 } else {
778 self.inner.global_dbtx(access_token)
779 }
780 }
781}
782
783#[apply(async_trait_maybe_send!)]
784impl<Inner> IDatabaseTransactionOpsCore for PrefixDatabaseTransaction<Inner>
785where
786 Inner: IDatabaseTransactionOpsCore,
787{
788 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
789 let key = self.get_full_key(key);
790 self.inner.raw_insert_bytes(&key, value).await
791 }
792
793 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
794 let key = self.get_full_key(key);
795 self.inner.raw_get_bytes(&key).await
796 }
797
798 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
799 let key = self.get_full_key(key);
800 self.inner.raw_remove_entry(&key).await
801 }
802
803 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
804 let key = self.get_full_key(key_prefix);
805 let stream = self.inner.raw_find_by_prefix(&key).await?;
806 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
807 }
808
809 async fn raw_find_by_prefix_sorted_descending(
810 &mut self,
811 key_prefix: &[u8],
812 ) -> Result<PrefixStream<'_>> {
813 let key = self.get_full_key(key_prefix);
814 let stream = self
815 .inner
816 .raw_find_by_prefix_sorted_descending(&key)
817 .await?;
818 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
819 }
820
821 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
822 let range = self.get_full_range(range);
823 let stream = self
824 .inner
825 .raw_find_by_range(Range {
826 start: &range.start,
827 end: &range.end,
828 })
829 .await?;
830 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
831 }
832
833 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
834 let key = self.get_full_key(key_prefix);
835 self.inner.raw_remove_by_prefix(&key).await
836 }
837}
838
839#[apply(async_trait_maybe_send!)]
840impl<Inner> IDatabaseTransactionOps for PrefixDatabaseTransaction<Inner>
841where
842 Inner: IDatabaseTransactionOps,
843{
844 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
845 self.inner.rollback_tx_to_savepoint().await
846 }
847
848 async fn set_tx_savepoint(&mut self) -> Result<()> {
849 self.set_tx_savepoint().await
850 }
851}
852
853#[apply(async_trait_maybe_send!)]
857pub trait IDatabaseTransactionOpsCore: MaybeSend {
858 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>>;
860
861 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
863
864 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
866
867 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>>;
870
871 async fn raw_find_by_prefix_sorted_descending(
873 &mut self,
874 key_prefix: &[u8],
875 ) -> Result<PrefixStream<'_>>;
876
877 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>>;
881
882 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()>;
884}
885
886#[apply(async_trait_maybe_send!)]
887impl<T> IDatabaseTransactionOpsCore for Box<T>
888where
889 T: IDatabaseTransactionOpsCore + ?Sized,
890{
891 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
892 (**self).raw_insert_bytes(key, value).await
893 }
894
895 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
896 (**self).raw_get_bytes(key).await
897 }
898
899 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
900 (**self).raw_remove_entry(key).await
901 }
902
903 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
904 (**self).raw_find_by_prefix(key_prefix).await
905 }
906
907 async fn raw_find_by_prefix_sorted_descending(
908 &mut self,
909 key_prefix: &[u8],
910 ) -> Result<PrefixStream<'_>> {
911 (**self)
912 .raw_find_by_prefix_sorted_descending(key_prefix)
913 .await
914 }
915
916 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
917 (**self).raw_find_by_range(range).await
918 }
919
920 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
921 (**self).raw_remove_by_prefix(key_prefix).await
922 }
923}
924
925#[apply(async_trait_maybe_send!)]
926impl<T> IDatabaseTransactionOpsCore for &mut T
927where
928 T: IDatabaseTransactionOpsCore + ?Sized,
929{
930 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
931 (**self).raw_insert_bytes(key, value).await
932 }
933
934 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
935 (**self).raw_get_bytes(key).await
936 }
937
938 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
939 (**self).raw_remove_entry(key).await
940 }
941
942 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
943 (**self).raw_find_by_prefix(key_prefix).await
944 }
945
946 async fn raw_find_by_prefix_sorted_descending(
947 &mut self,
948 key_prefix: &[u8],
949 ) -> Result<PrefixStream<'_>> {
950 (**self)
951 .raw_find_by_prefix_sorted_descending(key_prefix)
952 .await
953 }
954
955 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
956 (**self).raw_find_by_range(range).await
957 }
958
959 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
960 (**self).raw_remove_by_prefix(key_prefix).await
961 }
962}
963
964#[apply(async_trait_maybe_send!)]
970pub trait IDatabaseTransactionOps: IDatabaseTransactionOpsCore + MaybeSend {
971 async fn set_tx_savepoint(&mut self) -> Result<()>;
980
981 async fn rollback_tx_to_savepoint(&mut self) -> Result<()>;
982}
983
984#[apply(async_trait_maybe_send!)]
985impl<T> IDatabaseTransactionOps for Box<T>
986where
987 T: IDatabaseTransactionOps + ?Sized,
988{
989 async fn set_tx_savepoint(&mut self) -> Result<()> {
990 (**self).set_tx_savepoint().await
991 }
992
993 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
994 (**self).rollback_tx_to_savepoint().await
995 }
996}
997
998#[apply(async_trait_maybe_send!)]
999impl<T> IDatabaseTransactionOps for &mut T
1000where
1001 T: IDatabaseTransactionOps + ?Sized,
1002{
1003 async fn set_tx_savepoint(&mut self) -> Result<()> {
1004 (**self).set_tx_savepoint().await
1005 }
1006
1007 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1008 (**self).rollback_tx_to_savepoint().await
1009 }
1010}
1011
1012#[apply(async_trait_maybe_send!)]
1018pub trait IDatabaseTransactionOpsCoreTyped<'a> {
1019 async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1020 where
1021 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1022
1023 async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1024 where
1025 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1026 K::Value: MaybeSend + MaybeSync;
1027
1028 async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1029 where
1030 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1031 K::Value: MaybeSend + MaybeSync;
1032
1033 async fn find_by_range<K>(
1034 &mut self,
1035 key_range: Range<K>,
1036 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1037 where
1038 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1039 K::Value: MaybeSend + MaybeSync;
1040
1041 async fn find_by_prefix<KP>(
1042 &mut self,
1043 key_prefix: &KP,
1044 ) -> Pin<
1045 Box<
1046 maybe_add_send!(
1047 dyn Stream<
1048 Item = (
1049 KP::Record,
1050 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1051 ),
1052 > + '_
1053 ),
1054 >,
1055 >
1056 where
1057 KP: DatabaseLookup + MaybeSend + MaybeSync,
1058 KP::Record: DatabaseKey;
1059
1060 async fn find_by_prefix_sorted_descending<KP>(
1061 &mut self,
1062 key_prefix: &KP,
1063 ) -> Pin<
1064 Box<
1065 maybe_add_send!(
1066 dyn Stream<
1067 Item = (
1068 KP::Record,
1069 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1070 ),
1071 > + '_
1072 ),
1073 >,
1074 >
1075 where
1076 KP: DatabaseLookup + MaybeSend + MaybeSync,
1077 KP::Record: DatabaseKey;
1078
1079 async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1080 where
1081 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1082
1083 async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1084 where
1085 KP: DatabaseLookup + MaybeSend + MaybeSync;
1086}
1087
1088#[apply(async_trait_maybe_send!)]
1091impl<'a, T> IDatabaseTransactionOpsCoreTyped<'a> for T
1092where
1093 T: IDatabaseTransactionOpsCore + WithDecoders,
1094{
1095 async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1096 where
1097 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1098 {
1099 let key_bytes = key.to_bytes();
1100 let raw = self
1101 .raw_get_bytes(&key_bytes)
1102 .await
1103 .expect("Unrecoverable error occurred while reading and entry from the database");
1104 raw.map(|value_bytes| {
1105 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1106 })
1107 }
1108
1109 async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1110 where
1111 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1112 K::Value: MaybeSend + MaybeSync,
1113 {
1114 let key_bytes = key.to_bytes();
1115 self.raw_insert_bytes(&key_bytes, &value.to_bytes())
1116 .await
1117 .expect("Unrecoverable error occurred while inserting entry into the database")
1118 .map(|value_bytes| {
1119 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1120 })
1121 }
1122
1123 async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1124 where
1125 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1126 K::Value: MaybeSend + MaybeSync,
1127 {
1128 if let Some(prev) = self.insert_entry(key, value).await {
1129 panic!(
1130 "Database overwriting element when expecting insertion of new entry. Key: {key:?} Prev Value: {prev:?}"
1131 );
1132 }
1133 }
1134
1135 async fn find_by_range<K>(
1136 &mut self,
1137 key_range: Range<K>,
1138 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1139 where
1140 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1141 K::Value: MaybeSend + MaybeSync,
1142 {
1143 let decoders = self.decoders().clone();
1144 Box::pin(
1145 self.raw_find_by_range(Range {
1146 start: &key_range.start.to_bytes(),
1147 end: &key_range.end.to_bytes(),
1148 })
1149 .await
1150 .expect("Unrecoverable error occurred while listing entries from the database")
1151 .map(move |(key_bytes, value_bytes)| {
1152 let key = decode_key_expect(&key_bytes, &decoders);
1153 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1154 (key, value)
1155 }),
1156 )
1157 }
1158
1159 async fn find_by_prefix<KP>(
1160 &mut self,
1161 key_prefix: &KP,
1162 ) -> Pin<
1163 Box<
1164 maybe_add_send!(
1165 dyn Stream<
1166 Item = (
1167 KP::Record,
1168 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1169 ),
1170 > + '_
1171 ),
1172 >,
1173 >
1174 where
1175 KP: DatabaseLookup + MaybeSend + MaybeSync,
1176 KP::Record: DatabaseKey,
1177 {
1178 let decoders = self.decoders().clone();
1179 Box::pin(
1180 self.raw_find_by_prefix(&key_prefix.to_bytes())
1181 .await
1182 .expect("Unrecoverable error occurred while listing entries from the database")
1183 .map(move |(key_bytes, value_bytes)| {
1184 let key = decode_key_expect(&key_bytes, &decoders);
1185 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1186 (key, value)
1187 }),
1188 )
1189 }
1190
1191 async fn find_by_prefix_sorted_descending<KP>(
1192 &mut self,
1193 key_prefix: &KP,
1194 ) -> Pin<
1195 Box<
1196 maybe_add_send!(
1197 dyn Stream<
1198 Item = (
1199 KP::Record,
1200 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1201 ),
1202 > + '_
1203 ),
1204 >,
1205 >
1206 where
1207 KP: DatabaseLookup + MaybeSend + MaybeSync,
1208 KP::Record: DatabaseKey,
1209 {
1210 let decoders = self.decoders().clone();
1211 Box::pin(
1212 self.raw_find_by_prefix_sorted_descending(&key_prefix.to_bytes())
1213 .await
1214 .expect("Unrecoverable error occurred while listing entries from the database")
1215 .map(move |(key_bytes, value_bytes)| {
1216 let key = decode_key_expect(&key_bytes, &decoders);
1217 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1218 (key, value)
1219 }),
1220 )
1221 }
1222 async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1223 where
1224 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1225 {
1226 let key_bytes = key.to_bytes();
1227 self.raw_remove_entry(&key_bytes)
1228 .await
1229 .expect("Unrecoverable error occurred while inserting removing entry from the database")
1230 .map(|value_bytes| {
1231 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1232 })
1233 }
1234 async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1235 where
1236 KP: DatabaseLookup + MaybeSend + MaybeSync,
1237 {
1238 self.raw_remove_by_prefix(&key_prefix.to_bytes())
1239 .await
1240 .expect("Unrecoverable error when removing entries from the database");
1241 }
1242}
1243
1244pub trait WithDecoders {
1247 fn decoders(&self) -> &ModuleDecoderRegistry;
1248}
1249
1250#[apply(async_trait_maybe_send!)]
1252pub trait IRawDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
1253 async fn commit_tx(self) -> Result<()>;
1254}
1255
1256#[apply(async_trait_maybe_send!)]
1260pub trait IDatabaseTransaction: MaybeSend + IDatabaseTransactionOps + fmt::Debug {
1261 async fn commit_tx(&mut self) -> Result<()>;
1263
1264 fn is_global(&self) -> bool;
1266
1267 #[doc(hidden)]
1272 fn global_dbtx(&mut self, access_token: GlobalDBTxAccessToken)
1273 -> &mut dyn IDatabaseTransaction;
1274}
1275
1276#[apply(async_trait_maybe_send!)]
1277impl<T> IDatabaseTransaction for Box<T>
1278where
1279 T: IDatabaseTransaction + ?Sized,
1280{
1281 async fn commit_tx(&mut self) -> Result<()> {
1282 (**self).commit_tx().await
1283 }
1284
1285 fn is_global(&self) -> bool {
1286 (**self).is_global()
1287 }
1288
1289 fn global_dbtx(
1290 &mut self,
1291 access_token: GlobalDBTxAccessToken,
1292 ) -> &mut dyn IDatabaseTransaction {
1293 (**self).global_dbtx(access_token)
1294 }
1295}
1296
1297#[apply(async_trait_maybe_send!)]
1298impl<'a, T> IDatabaseTransaction for &'a mut T
1299where
1300 T: IDatabaseTransaction + ?Sized,
1301{
1302 async fn commit_tx(&mut self) -> Result<()> {
1303 (**self).commit_tx().await
1304 }
1305
1306 fn is_global(&self) -> bool {
1307 (**self).is_global()
1308 }
1309
1310 fn global_dbtx(&mut self, access_key: GlobalDBTxAccessToken) -> &mut dyn IDatabaseTransaction {
1311 (**self).global_dbtx(access_key)
1312 }
1313}
1314
1315struct BaseDatabaseTransaction<Tx> {
1318 raw: Option<Tx>,
1320 notify_queue: Option<NotifyQueue>,
1321 notifications: Arc<Notifications>,
1322}
1323
1324impl<Tx> fmt::Debug for BaseDatabaseTransaction<Tx>
1325where
1326 Tx: fmt::Debug,
1327{
1328 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1329 f.write_fmt(format_args!(
1330 "BaseDatabaseTransaction{{ raw={:?} }}",
1331 self.raw
1332 ))
1333 }
1334}
1335impl<Tx> BaseDatabaseTransaction<Tx>
1336where
1337 Tx: IRawDatabaseTransaction,
1338{
1339 fn new(dbtx: Tx, notifications: Arc<Notifications>) -> Self {
1340 Self {
1341 raw: Some(dbtx),
1342 notifications,
1343 notify_queue: Some(NotifyQueue::new()),
1344 }
1345 }
1346
1347 fn add_notification_key(&mut self, key: &[u8]) -> Result<()> {
1348 self.notify_queue
1349 .as_mut()
1350 .context("can not call add_notification_key after commit")?
1351 .add(&key);
1352 Ok(())
1353 }
1354}
1355
1356#[apply(async_trait_maybe_send!)]
1357impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOpsCore for BaseDatabaseTransaction<Tx> {
1358 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1359 self.add_notification_key(key)?;
1360 self.raw
1361 .as_mut()
1362 .context("Cannot insert into already consumed transaction")?
1363 .raw_insert_bytes(key, value)
1364 .await
1365 }
1366
1367 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1368 self.raw
1369 .as_mut()
1370 .context("Cannot retrieve from already consumed transaction")?
1371 .raw_get_bytes(key)
1372 .await
1373 }
1374
1375 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1376 self.add_notification_key(key)?;
1377 self.raw
1378 .as_mut()
1379 .context("Cannot remove from already consumed transaction")?
1380 .raw_remove_entry(key)
1381 .await
1382 }
1383
1384 async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1385 self.raw
1386 .as_mut()
1387 .context("Cannot retrieve from already consumed transaction")?
1388 .raw_find_by_range(key_range)
1389 .await
1390 }
1391
1392 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1393 self.raw
1394 .as_mut()
1395 .context("Cannot retrieve from already consumed transaction")?
1396 .raw_find_by_prefix(key_prefix)
1397 .await
1398 }
1399
1400 async fn raw_find_by_prefix_sorted_descending(
1401 &mut self,
1402 key_prefix: &[u8],
1403 ) -> Result<PrefixStream<'_>> {
1404 self.raw
1405 .as_mut()
1406 .context("Cannot retrieve from already consumed transaction")?
1407 .raw_find_by_prefix_sorted_descending(key_prefix)
1408 .await
1409 }
1410
1411 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1412 self.raw
1413 .as_mut()
1414 .context("Cannot remove from already consumed transaction")?
1415 .raw_remove_by_prefix(key_prefix)
1416 .await
1417 }
1418}
1419
1420#[apply(async_trait_maybe_send!)]
1421impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOps for BaseDatabaseTransaction<Tx> {
1422 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1423 self.raw
1424 .as_mut()
1425 .context("Cannot rollback to a savepoint on an already consumed transaction")?
1426 .rollback_tx_to_savepoint()
1427 .await?;
1428 Ok(())
1429 }
1430
1431 async fn set_tx_savepoint(&mut self) -> Result<()> {
1432 self.raw
1433 .as_mut()
1434 .context("Cannot set a tx savepoint on an already consumed transaction")?
1435 .set_tx_savepoint()
1436 .await?;
1437 Ok(())
1438 }
1439}
1440
1441#[apply(async_trait_maybe_send!)]
1442impl<Tx: IRawDatabaseTransaction + fmt::Debug> IDatabaseTransaction
1443 for BaseDatabaseTransaction<Tx>
1444{
1445 async fn commit_tx(&mut self) -> Result<()> {
1446 self.raw
1447 .take()
1448 .context("Cannot commit an already committed transaction")?
1449 .commit_tx()
1450 .await?;
1451 self.notifications.submit_queue(
1452 &self
1453 .notify_queue
1454 .take()
1455 .expect("commit must be called only once"),
1456 );
1457 Ok(())
1458 }
1459
1460 fn is_global(&self) -> bool {
1461 true
1462 }
1463
1464 fn global_dbtx(
1465 &mut self,
1466 _access_token: GlobalDBTxAccessToken,
1467 ) -> &mut dyn IDatabaseTransaction {
1468 panic!("Illegal to call global_dbtx on BaseDatabaseTransaction");
1469 }
1470}
1471
1472#[derive(Clone)]
1475struct CommitTracker {
1476 is_committed: bool,
1478 has_writes: bool,
1480 ignore_uncommitted: bool,
1482}
1483
1484impl Drop for CommitTracker {
1485 fn drop(&mut self) {
1486 if self.has_writes && !self.is_committed {
1487 if self.ignore_uncommitted {
1488 trace!(
1489 target: LOG_DB,
1490 "DatabaseTransaction has writes and has not called commit, but that's expected."
1491 );
1492 } else {
1493 warn!(
1494 target: LOG_DB,
1495 location = ?backtrace::Backtrace::new(),
1496 "DatabaseTransaction has writes and has not called commit."
1497 );
1498 }
1499 }
1500 }
1501}
1502
1503enum MaybeRef<'a, T> {
1504 Owned(T),
1505 Borrowed(&'a mut T),
1506}
1507
1508impl<'a, T> ops::Deref for MaybeRef<'a, T> {
1509 type Target = T;
1510
1511 fn deref(&self) -> &Self::Target {
1512 match self {
1513 MaybeRef::Owned(o) => o,
1514 MaybeRef::Borrowed(r) => r,
1515 }
1516 }
1517}
1518
1519impl<'a, T> ops::DerefMut for MaybeRef<'a, T> {
1520 fn deref_mut(&mut self) -> &mut Self::Target {
1521 match self {
1522 MaybeRef::Owned(o) => o,
1523 MaybeRef::Borrowed(r) => r,
1524 }
1525 }
1526}
1527
1528pub struct Committable;
1532
1533pub struct NonCommittable;
1537
1538pub struct DatabaseTransaction<'tx, Cap = NonCommittable> {
1542 tx: Box<dyn IDatabaseTransaction + 'tx>,
1543 decoders: ModuleDecoderRegistry,
1544 commit_tracker: MaybeRef<'tx, CommitTracker>,
1545 on_commit_hooks: MaybeRef<'tx, Vec<Box<maybe_add_send!(dyn FnOnce())>>>,
1546 capability: marker::PhantomData<Cap>,
1547}
1548
1549impl<'tx, Cap> fmt::Debug for DatabaseTransaction<'tx, Cap> {
1550 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1551 f.write_fmt(format_args!(
1552 "DatabaseTransaction {{ tx: {:?}, decoders={:?} }}",
1553 self.tx, self.decoders
1554 ))
1555 }
1556}
1557
1558impl<'tx, Cap> WithDecoders for DatabaseTransaction<'tx, Cap> {
1559 fn decoders(&self) -> &ModuleDecoderRegistry {
1560 &self.decoders
1561 }
1562}
1563
1564#[instrument(target = LOG_DB, level = "trace", skip_all, fields(value_type = std::any::type_name::<V>()), err)]
1565fn decode_value<V: DatabaseValue>(
1566 value_bytes: &[u8],
1567 decoders: &ModuleDecoderRegistry,
1568) -> Result<V, DecodingError> {
1569 trace!(
1570 bytes = %AbbreviateHexBytes(value_bytes),
1571 "decoding value",
1572 );
1573 V::from_bytes(value_bytes, decoders)
1574}
1575
1576fn decode_value_expect<V: DatabaseValue>(
1577 value_bytes: &[u8],
1578 decoders: &ModuleDecoderRegistry,
1579 key_bytes: &[u8],
1580) -> V {
1581 decode_value(value_bytes, decoders).unwrap_or_else(|err| {
1582 panic!(
1583 "Unrecoverable decoding DatabaseValue as {}; err={}, key_bytes={}, val_bytes={}",
1584 any::type_name::<V>(),
1585 err,
1586 AbbreviateHexBytes(key_bytes),
1587 AbbreviateHexBytes(value_bytes),
1588 )
1589 })
1590}
1591
1592fn decode_key_expect<K: DatabaseKey>(key_bytes: &[u8], decoders: &ModuleDecoderRegistry) -> K {
1593 trace!(
1594 bytes = %AbbreviateHexBytes(key_bytes),
1595 "decoding key",
1596 );
1597 K::from_bytes(key_bytes, decoders).unwrap_or_else(|err| {
1598 panic!(
1599 "Unrecoverable decoding DatabaseKey as {}; err={}; bytes={}",
1600 any::type_name::<K>(),
1601 err,
1602 AbbreviateHexBytes(key_bytes)
1603 )
1604 })
1605}
1606
1607impl<'tx, Cap> DatabaseTransaction<'tx, Cap> {
1608 pub fn into_nc(self) -> DatabaseTransaction<'tx, NonCommittable> {
1610 DatabaseTransaction {
1611 tx: self.tx,
1612 decoders: self.decoders,
1613 commit_tracker: self.commit_tracker,
1614 on_commit_hooks: self.on_commit_hooks,
1615 capability: PhantomData::<NonCommittable>,
1616 }
1617 }
1618
1619 pub fn to_ref_nc<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, NonCommittable>
1621 where
1622 's: 'a,
1623 {
1624 self.to_ref().into_nc()
1625 }
1626
1627 pub fn with_prefix<'a: 'tx>(self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1629 where
1630 'tx: 'a,
1631 {
1632 DatabaseTransaction {
1633 tx: Box::new(PrefixDatabaseTransaction {
1634 inner: self.tx,
1635 global_dbtx_access_token: None,
1636 prefix,
1637 }),
1638 decoders: self.decoders,
1639 commit_tracker: self.commit_tracker,
1640 on_commit_hooks: self.on_commit_hooks,
1641 capability: self.capability,
1642 }
1643 }
1644
1645 pub fn with_prefix_module_id<'a: 'tx>(
1649 self,
1650 module_instance_id: ModuleInstanceId,
1651 ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1652 where
1653 'tx: 'a,
1654 {
1655 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1656 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1657 (
1658 DatabaseTransaction {
1659 tx: Box::new(PrefixDatabaseTransaction {
1660 inner: self.tx,
1661 global_dbtx_access_token: Some(global_dbtx_access_token),
1662 prefix,
1663 }),
1664 decoders: self.decoders,
1665 commit_tracker: self.commit_tracker,
1666 on_commit_hooks: self.on_commit_hooks,
1667 capability: self.capability,
1668 },
1669 global_dbtx_access_token,
1670 )
1671 }
1672
1673 pub fn to_ref<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, Cap>
1675 where
1676 's: 'a,
1677 {
1678 let decoders = self.decoders.clone();
1679
1680 DatabaseTransaction {
1681 tx: Box::new(&mut self.tx),
1682 decoders,
1683 commit_tracker: match self.commit_tracker {
1684 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1685 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1686 },
1687 on_commit_hooks: match self.on_commit_hooks {
1688 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1689 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1690 },
1691 capability: self.capability,
1692 }
1693 }
1694
1695 pub fn to_ref_with_prefix<'a>(&'a mut self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1697 where
1698 'tx: 'a,
1699 {
1700 DatabaseTransaction {
1701 tx: Box::new(PrefixDatabaseTransaction {
1702 inner: &mut self.tx,
1703 global_dbtx_access_token: None,
1704 prefix,
1705 }),
1706 decoders: self.decoders.clone(),
1707 commit_tracker: match self.commit_tracker {
1708 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1709 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1710 },
1711 on_commit_hooks: match self.on_commit_hooks {
1712 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1713 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1714 },
1715 capability: self.capability,
1716 }
1717 }
1718
1719 pub fn to_ref_with_prefix_module_id<'a>(
1720 &'a mut self,
1721 module_instance_id: ModuleInstanceId,
1722 ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1723 where
1724 'tx: 'a,
1725 {
1726 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1727 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1728 (
1729 DatabaseTransaction {
1730 tx: Box::new(PrefixDatabaseTransaction {
1731 inner: &mut self.tx,
1732 global_dbtx_access_token: Some(global_dbtx_access_token),
1733 prefix,
1734 }),
1735 decoders: self.decoders.clone(),
1736 commit_tracker: match self.commit_tracker {
1737 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1738 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1739 },
1740 on_commit_hooks: match self.on_commit_hooks {
1741 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1742 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1743 },
1744 capability: self.capability,
1745 },
1746 global_dbtx_access_token,
1747 )
1748 }
1749
1750 pub fn is_global(&self) -> bool {
1752 self.tx.is_global()
1753 }
1754
1755 pub fn ensure_global(&self) -> Result<()> {
1757 if !self.is_global() {
1758 bail!("Database instance not global");
1759 }
1760
1761 Ok(())
1762 }
1763
1764 pub fn ensure_isolated(&self) -> Result<()> {
1766 if self.is_global() {
1767 bail!("Database instance not isolated");
1768 }
1769
1770 Ok(())
1771 }
1772
1773 pub fn ignore_uncommitted(&mut self) -> &mut Self {
1775 self.commit_tracker.ignore_uncommitted = true;
1776 self
1777 }
1778
1779 pub fn warn_uncommitted(&mut self) -> &mut Self {
1781 self.commit_tracker.ignore_uncommitted = false;
1782 self
1783 }
1784
1785 #[instrument(target = LOG_DB, level = "trace", skip_all)]
1787 pub fn on_commit(&mut self, f: maybe_add_send!(impl FnOnce() + 'static)) {
1788 self.on_commit_hooks.push(Box::new(f));
1789 }
1790
1791 pub fn global_dbtx<'a>(
1792 &'a mut self,
1793 access_token: GlobalDBTxAccessToken,
1794 ) -> DatabaseTransaction<'a, Cap>
1795 where
1796 'tx: 'a,
1797 {
1798 let decoders = self.decoders.clone();
1799
1800 DatabaseTransaction {
1801 tx: Box::new(self.tx.global_dbtx(access_token)),
1802 decoders,
1803 commit_tracker: match self.commit_tracker {
1804 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1805 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1806 },
1807 on_commit_hooks: match self.on_commit_hooks {
1808 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1809 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1810 },
1811 capability: self.capability,
1812 }
1813 }
1814}
1815
1816#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1818pub struct GlobalDBTxAccessToken(u32);
1819
1820impl GlobalDBTxAccessToken {
1821 fn from_prefix(prefix: &[u8]) -> Self {
1832 Self(prefix.iter().fold(0, |acc, b| acc + u32::from(*b)) + 513)
1833 }
1834}
1835
1836impl<'tx> DatabaseTransaction<'tx, Committable> {
1837 pub fn new(dbtx: Box<dyn IDatabaseTransaction + 'tx>, decoders: ModuleDecoderRegistry) -> Self {
1838 Self {
1839 tx: dbtx,
1840 decoders,
1841 commit_tracker: MaybeRef::Owned(CommitTracker {
1842 is_committed: false,
1843 has_writes: false,
1844 ignore_uncommitted: false,
1845 }),
1846 on_commit_hooks: MaybeRef::Owned(vec![]),
1847 capability: PhantomData,
1848 }
1849 }
1850
1851 pub async fn commit_tx_result(mut self) -> Result<()> {
1852 self.commit_tracker.is_committed = true;
1853 let commit_result = self.tx.commit_tx().await;
1854
1855 if commit_result.is_ok() {
1857 for hook in self.on_commit_hooks.deref_mut().drain(..) {
1858 hook();
1859 }
1860 }
1861
1862 commit_result
1863 }
1864
1865 pub async fn commit_tx(mut self) {
1866 self.commit_tracker.is_committed = true;
1867 self.commit_tx_result()
1868 .await
1869 .expect("Unrecoverable error occurred while committing to the database.");
1870 }
1871}
1872
1873#[apply(async_trait_maybe_send!)]
1874impl<'a, Cap> IDatabaseTransactionOpsCore for DatabaseTransaction<'a, Cap>
1875where
1876 Cap: Send,
1877{
1878 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1879 self.commit_tracker.has_writes = true;
1880 self.tx.raw_insert_bytes(key, value).await
1881 }
1882
1883 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1884 self.tx.raw_get_bytes(key).await
1885 }
1886
1887 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1888 self.tx.raw_remove_entry(key).await
1889 }
1890
1891 async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1892 self.tx.raw_find_by_range(key_range).await
1893 }
1894
1895 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1896 self.tx.raw_find_by_prefix(key_prefix).await
1897 }
1898
1899 async fn raw_find_by_prefix_sorted_descending(
1900 &mut self,
1901 key_prefix: &[u8],
1902 ) -> Result<PrefixStream<'_>> {
1903 self.tx
1904 .raw_find_by_prefix_sorted_descending(key_prefix)
1905 .await
1906 }
1907
1908 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1909 self.commit_tracker.has_writes = true;
1910 self.tx.raw_remove_by_prefix(key_prefix).await
1911 }
1912}
1913#[apply(async_trait_maybe_send!)]
1914impl<'a> IDatabaseTransactionOps for DatabaseTransaction<'a, Committable> {
1915 async fn set_tx_savepoint(&mut self) -> Result<()> {
1916 self.tx.set_tx_savepoint().await
1917 }
1918
1919 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1920 self.tx.rollback_tx_to_savepoint().await
1921 }
1922}
1923
1924impl<T> DatabaseKeyPrefix for T
1925where
1926 T: DatabaseLookup + crate::encoding::Encodable + Debug,
1927{
1928 fn to_bytes(&self) -> Vec<u8> {
1929 let mut data = vec![<Self as DatabaseLookup>::Record::DB_PREFIX];
1930 data.append(&mut self.consensus_encode_to_vec());
1931 data
1932 }
1933}
1934
1935impl<T> DatabaseKey for T
1936where
1937 T: DatabaseRecord + crate::encoding::Decodable + Sized,
1940{
1941 const NOTIFY_ON_MODIFY: bool = <T as DatabaseRecord>::NOTIFY_ON_MODIFY;
1942 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1943 if data.is_empty() {
1944 return Err(DecodingError::wrong_length(1, 0));
1946 }
1947
1948 if data[0] != Self::DB_PREFIX {
1949 return Err(DecodingError::wrong_prefix(Self::DB_PREFIX, data[0]));
1950 }
1951
1952 <Self as crate::encoding::Decodable>::consensus_decode_whole(&data[1..], modules)
1953 .map_err(|decode_error| DecodingError::Other(decode_error.0))
1954 }
1955}
1956
1957impl<T> DatabaseValue for T
1958where
1959 T: Debug + Encodable + Decodable,
1960{
1961 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1962 T::consensus_decode_whole(data, modules).map_err(|e| DecodingError::Other(e.0))
1963 }
1964
1965 fn to_bytes(&self) -> Vec<u8> {
1966 self.consensus_encode_to_vec()
1967 }
1968}
1969
1970#[macro_export]
2031macro_rules! impl_db_record {
2032 (key = $key:ty, value = $val:ty, db_prefix = $db_prefix:expr_2021 $(, notify_on_modify = $notify:tt)? $(,)?) => {
2033 impl $crate::db::DatabaseRecord for $key {
2034 const DB_PREFIX: u8 = $db_prefix as u8;
2035 $(const NOTIFY_ON_MODIFY: bool = $notify;)?
2036 type Key = Self;
2037 type Value = $val;
2038 }
2039 $(
2040 impl_db_record! {
2041 @impl_notify_marker key = $key, notify_on_modify = $notify
2042 }
2043 )?
2044 };
2045 (@impl_notify_marker key = $key:ty, notify_on_modify = true) => {
2047 impl $crate::db::DatabaseKeyWithNotify for $key {}
2048 };
2049 (@impl_notify_marker key = $key:ty, notify_on_modify = false) => {};
2051}
2052
2053#[macro_export]
2054macro_rules! impl_db_lookup{
2055 (key = $key:ty $(, query_prefix = $query_prefix:ty)* $(,)?) => {
2056 $(
2057 impl $crate::db::DatabaseLookup for $query_prefix {
2058 type Record = $key;
2059 }
2060 )*
2061 };
2062}
2063
2064#[derive(Debug, Encodable, Decodable, Serialize)]
2066pub struct DatabaseVersionKeyV0;
2067
2068#[derive(Debug, Encodable, Decodable, Serialize)]
2069pub struct DatabaseVersionKey(pub ModuleInstanceId);
2070
2071#[derive(Debug, Encodable, Decodable, Serialize, Clone, PartialOrd, Ord, PartialEq, Eq, Copy)]
2072pub struct DatabaseVersion(pub u64);
2073
2074impl_db_record!(
2075 key = DatabaseVersionKeyV0,
2076 value = DatabaseVersion,
2077 db_prefix = DbKeyPrefix::DatabaseVersion
2078);
2079
2080impl_db_record!(
2081 key = DatabaseVersionKey,
2082 value = DatabaseVersion,
2083 db_prefix = DbKeyPrefix::DatabaseVersion
2084);
2085
2086impl std::fmt::Display for DatabaseVersion {
2087 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2088 write!(f, "{}", self.0)
2089 }
2090}
2091
2092impl DatabaseVersion {
2093 pub fn increment(&self) -> Self {
2094 Self(self.0 + 1)
2095 }
2096}
2097
2098impl std::fmt::Display for DbKeyPrefix {
2099 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2100 write!(f, "{self:?}")
2101 }
2102}
2103
2104#[repr(u8)]
2105#[derive(Clone, EnumIter, Debug)]
2106pub enum DbKeyPrefix {
2107 DatabaseVersion = 0x50,
2108 ClientBackup = 0x51,
2109}
2110
2111#[derive(Debug, Error)]
2112pub enum DecodingError {
2113 #[error("Key had a wrong prefix, expected {expected} but got {found}")]
2114 WrongPrefix { expected: u8, found: u8 },
2115 #[error("Key had a wrong length, expected {expected} but got {found}")]
2116 WrongLength { expected: usize, found: usize },
2117 #[error("Other decoding error: {0:#}")]
2118 Other(anyhow::Error),
2119}
2120
2121impl DecodingError {
2122 pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2123 Self::Other(anyhow::Error::from(error))
2124 }
2125
2126 pub fn wrong_prefix(expected: u8, found: u8) -> Self {
2127 Self::WrongPrefix { expected, found }
2128 }
2129
2130 pub fn wrong_length(expected: usize, found: usize) -> Self {
2131 Self::WrongLength { expected, found }
2132 }
2133}
2134
2135#[macro_export]
2136macro_rules! push_db_pair_items {
2137 ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
2138 let db_items =
2139 $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
2140 .await
2141 .map(|(key, val)| {
2142 (
2143 $crate::encoding::Encodable::consensus_encode_to_hex(&key),
2144 val,
2145 )
2146 })
2147 .collect::<BTreeMap<String, $value_type>>()
2148 .await;
2149
2150 $map.insert($key_literal.to_string(), Box::new(db_items));
2151 };
2152}
2153
2154#[macro_export]
2155macro_rules! push_db_key_items {
2156 ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $map:ident, $key_literal:literal) => {
2157 let db_items =
2158 $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
2159 .await
2160 .map(|(key, _)| key)
2161 .collect::<Vec<$key_type>>()
2162 .await;
2163
2164 $map.insert($key_literal.to_string(), Box::new(db_items));
2165 };
2166}
2167
2168pub type CoreMigrationFn = for<'tx> fn(
2171 MigrationContext<'tx>,
2172) -> Pin<
2173 Box<maybe_add_send!(dyn futures::Future<Output = anyhow::Result<()>> + 'tx)>,
2174>;
2175
2176pub fn get_current_database_version<F>(
2180 migrations: &BTreeMap<DatabaseVersion, F>,
2181) -> DatabaseVersion {
2182 let versions = migrations.keys().copied().collect::<Vec<_>>();
2183
2184 if !versions
2187 .windows(2)
2188 .all(|window| window[0].increment() == window[1])
2189 {
2190 panic!("Database Migrations are not defined contiguously");
2191 }
2192
2193 versions
2194 .last()
2195 .map_or(DatabaseVersion(0), DatabaseVersion::increment)
2196}
2197
2198pub async fn apply_migrations_server(
2200 db: &Database,
2201 kind: String,
2202 migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2203) -> Result<(), anyhow::Error> {
2204 let mut global_dbtx = db.begin_transaction().await;
2205 global_dbtx.ensure_global()?;
2206 apply_migrations_server_dbtx(&mut global_dbtx.to_ref_nc(), kind, migrations).await?;
2207 global_dbtx.commit_tx_result().await
2208}
2209
2210pub async fn apply_migrations_server_dbtx(
2212 global_dbtx: &mut DatabaseTransaction<'_>,
2213 kind: String,
2214 migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2215) -> Result<(), anyhow::Error> {
2216 global_dbtx.ensure_global()?;
2217 apply_migrations_dbtx(global_dbtx, kind, migrations, None, None).await
2218}
2219
2220pub async fn apply_migrations(
2221 db: &Database,
2222 kind: String,
2223 migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2224 module_instance_id: Option<ModuleInstanceId>,
2225 external_prefixes_above: Option<u8>,
2228) -> Result<(), anyhow::Error> {
2229 let mut dbtx = db.begin_transaction().await;
2230 apply_migrations_dbtx(
2231 &mut dbtx.to_ref_nc(),
2232 kind,
2233 migrations,
2234 module_instance_id,
2235 external_prefixes_above,
2236 )
2237 .await?;
2238
2239 dbtx.commit_tx_result().await
2240}
2241pub async fn apply_migrations_dbtx(
2253 global_dbtx: &mut DatabaseTransaction<'_>,
2254 kind: String,
2255 migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
2256 module_instance_id: Option<ModuleInstanceId>,
2257 external_prefixes_above: Option<u8>,
2260) -> Result<(), anyhow::Error> {
2261 let is_new_db = global_dbtx
2264 .raw_find_by_prefix(&[])
2265 .await?
2266 .filter(|(key, _v)| {
2267 std::future::ready(
2268 external_prefixes_above.is_none_or(|external_prefixes_above| {
2269 !key.is_empty() && key[0] < external_prefixes_above
2270 }),
2271 )
2272 })
2273 .next()
2274 .await
2275 .is_none();
2276
2277 let target_db_version = get_current_database_version(&migrations);
2278
2279 create_database_version_dbtx(
2281 global_dbtx,
2282 target_db_version,
2283 module_instance_id,
2284 kind.clone(),
2285 is_new_db,
2286 )
2287 .await?;
2288
2289 let module_instance_id_key = module_instance_id_or_global(module_instance_id);
2290
2291 let disk_version = global_dbtx
2292 .get_value(&DatabaseVersionKey(module_instance_id_key))
2293 .await;
2294
2295 let db_version = if let Some(disk_version) = disk_version {
2296 let mut current_db_version = disk_version;
2297
2298 if current_db_version > target_db_version {
2299 return Err(anyhow::anyhow!(format!(
2300 "On disk database version {current_db_version} for module {kind} was higher than the code database version {target_db_version}."
2301 )));
2302 }
2303
2304 while current_db_version < target_db_version {
2305 if let Some(migration) = migrations.get(¤t_db_version) {
2306 info!(target: LOG_DB, ?kind, ?current_db_version, ?target_db_version, "Migrating module...");
2307 migration(MigrationContext {
2308 dbtx: global_dbtx.to_ref_nc(),
2309 module_instance_id,
2310 })
2311 .await?;
2312 } else {
2313 warn!(target: LOG_DB, ?current_db_version, "Missing server db migration");
2314 }
2315
2316 current_db_version = current_db_version.increment();
2317 global_dbtx
2318 .insert_entry(
2319 &DatabaseVersionKey(module_instance_id_key),
2320 ¤t_db_version,
2321 )
2322 .await;
2323 }
2324
2325 current_db_version
2326 } else {
2327 target_db_version
2328 };
2329
2330 debug!(target: LOG_DB, ?kind, ?db_version, "DB Version");
2331 Ok(())
2332}
2333
2334pub async fn create_database_version(
2335 db: &Database,
2336 target_db_version: DatabaseVersion,
2337 module_instance_id: Option<ModuleInstanceId>,
2338 kind: String,
2339 is_new_db: bool,
2340) -> Result<(), anyhow::Error> {
2341 let mut dbtx = db.begin_transaction().await;
2342
2343 create_database_version_dbtx(
2344 &mut dbtx.to_ref_nc(),
2345 target_db_version,
2346 module_instance_id,
2347 kind,
2348 is_new_db,
2349 )
2350 .await?;
2351
2352 dbtx.commit_tx_result().await?;
2353 Ok(())
2354}
2355
2356pub async fn create_database_version_dbtx(
2360 global_dbtx: &mut DatabaseTransaction<'_>,
2361 target_db_version: DatabaseVersion,
2362 module_instance_id: Option<ModuleInstanceId>,
2363 kind: String,
2364 is_new_db: bool,
2365) -> Result<(), anyhow::Error> {
2366 let key_module_instance_id = module_instance_id_or_global(module_instance_id);
2367
2368 if global_dbtx
2372 .get_value(&DatabaseVersionKey(key_module_instance_id))
2373 .await
2374 .is_none()
2375 {
2376 let current_version_in_module = if let Some(module_instance_id) = module_instance_id {
2385 remove_current_db_version_if_exists(
2386 &mut global_dbtx
2387 .to_ref_with_prefix_module_id(module_instance_id)
2388 .0
2389 .into_nc(),
2390 is_new_db,
2391 target_db_version,
2392 )
2393 .await
2394 } else {
2395 remove_current_db_version_if_exists(
2396 &mut global_dbtx.to_ref().into_nc(),
2397 is_new_db,
2398 target_db_version,
2399 )
2400 .await
2401 };
2402
2403 debug!(target: LOG_DB, ?kind, ?current_version_in_module, ?target_db_version, ?is_new_db, "Creating DatabaseVersionKey...");
2405 global_dbtx
2406 .insert_new_entry(
2407 &DatabaseVersionKey(key_module_instance_id),
2408 ¤t_version_in_module,
2409 )
2410 .await;
2411 }
2412
2413 Ok(())
2414}
2415
2416async fn remove_current_db_version_if_exists(
2421 version_dbtx: &mut DatabaseTransaction<'_>,
2422 is_new_db: bool,
2423 target_db_version: DatabaseVersion,
2424) -> DatabaseVersion {
2425 let current_version_in_module = version_dbtx.remove_entry(&DatabaseVersionKeyV0).await;
2429 match current_version_in_module {
2430 Some(database_version) => database_version,
2431 None if is_new_db => target_db_version,
2432 None => DatabaseVersion(0),
2433 }
2434}
2435
2436fn module_instance_id_or_global(module_instance_id: Option<ModuleInstanceId>) -> ModuleInstanceId {
2439 module_instance_id.map_or_else(
2441 || MODULE_GLOBAL_PREFIX.into(),
2442 |module_instance_id| module_instance_id,
2443 )
2444}
2445
2446pub struct MigrationContext<'tx> {
2447 dbtx: DatabaseTransaction<'tx>,
2448 module_instance_id: Option<ModuleInstanceId>,
2449}
2450
2451impl<'tx> MigrationContext<'tx> {
2452 pub fn dbtx(&mut self) -> DatabaseTransaction {
2453 if let Some(module_instance_id) = self.module_instance_id {
2454 self.dbtx.to_ref_with_prefix_module_id(module_instance_id).0
2455 } else {
2456 self.dbtx.to_ref_nc()
2457 }
2458 }
2459
2460 pub fn module_instance_id(&self) -> Option<ModuleInstanceId> {
2461 self.module_instance_id
2462 }
2463
2464 #[doc(hidden)]
2465 pub fn __global_dbtx(&mut self) -> &mut DatabaseTransaction<'tx> {
2466 &mut self.dbtx
2467 }
2468}
2469
2470#[allow(unused_imports)]
2471mod test_utils {
2472 use std::collections::BTreeMap;
2473 use std::time::Duration;
2474
2475 use fedimint_core::db::MigrationContext;
2476 use futures::future::ready;
2477 use futures::{Future, FutureExt, StreamExt};
2478 use rand::Rng;
2479 use tokio::join;
2480
2481 use super::{
2482 CoreMigrationFn, Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey,
2483 DatabaseVersionKeyV0, apply_migrations,
2484 };
2485 use crate::core::ModuleKind;
2486 use crate::db::mem_impl::MemDatabase;
2487 use crate::db::{
2488 IDatabaseTransactionOps, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
2489 };
2490 use crate::encoding::{Decodable, Encodable};
2491 use crate::module::registry::ModuleDecoderRegistry;
2492
2493 pub async fn future_returns_shortly<F: Future>(fut: F) -> Option<F::Output> {
2494 crate::runtime::timeout(Duration::from_millis(10), fut)
2495 .await
2496 .ok()
2497 }
2498
2499 #[repr(u8)]
2500 #[derive(Clone)]
2501 pub enum TestDbKeyPrefix {
2502 Test = 0x42,
2503 AltTest = 0x43,
2504 PercentTestKey = 0x25,
2505 }
2506
2507 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
2508 pub(super) struct TestKey(pub u64);
2509
2510 #[derive(Debug, Encodable, Decodable)]
2511 struct DbPrefixTestPrefix;
2512
2513 impl_db_record!(
2514 key = TestKey,
2515 value = TestVal,
2516 db_prefix = TestDbKeyPrefix::Test,
2517 notify_on_modify = true,
2518 );
2519 impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);
2520
2521 #[derive(Debug, Encodable, Decodable)]
2522 struct TestKeyV0(u64, u64);
2523
2524 #[derive(Debug, Encodable, Decodable)]
2525 struct DbPrefixTestPrefixV0;
2526
2527 impl_db_record!(
2528 key = TestKeyV0,
2529 value = TestVal,
2530 db_prefix = TestDbKeyPrefix::Test,
2531 );
2532 impl_db_lookup!(key = TestKeyV0, query_prefix = DbPrefixTestPrefixV0);
2533
2534 #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Encodable, Decodable)]
2535 struct AltTestKey(u64);
2536
2537 #[derive(Debug, Encodable, Decodable)]
2538 struct AltDbPrefixTestPrefix;
2539
2540 impl_db_record!(
2541 key = AltTestKey,
2542 value = TestVal,
2543 db_prefix = TestDbKeyPrefix::AltTest,
2544 );
2545 impl_db_lookup!(key = AltTestKey, query_prefix = AltDbPrefixTestPrefix);
2546
2547 #[derive(Debug, Encodable, Decodable)]
2548 struct PercentTestKey(u64);
2549
2550 #[derive(Debug, Encodable, Decodable)]
2551 struct PercentPrefixTestPrefix;
2552
2553 impl_db_record!(
2554 key = PercentTestKey,
2555 value = TestVal,
2556 db_prefix = TestDbKeyPrefix::PercentTestKey,
2557 );
2558
2559 impl_db_lookup!(key = PercentTestKey, query_prefix = PercentPrefixTestPrefix);
2560 #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
2561 pub(super) struct TestVal(pub u64);
2562
2563 const TEST_MODULE_PREFIX: u16 = 1;
2564 const ALT_MODULE_PREFIX: u16 = 2;
2565
2566 pub async fn verify_insert_elements(db: Database) {
2567 let mut dbtx = db.begin_transaction().await;
2568 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2569 assert!(dbtx.insert_entry(&TestKey(2), &TestVal(3)).await.is_none());
2570 dbtx.commit_tx().await;
2571
2572 let mut dbtx = db.begin_transaction().await;
2574 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2575 assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(3)));
2576 dbtx.commit_tx().await;
2577
2578 let mut dbtx = db.begin_transaction().await;
2580 assert_eq!(
2581 dbtx.insert_entry(&TestKey(1), &TestVal(4)).await,
2582 Some(TestVal(2))
2583 );
2584 assert_eq!(
2585 dbtx.insert_entry(&TestKey(2), &TestVal(5)).await,
2586 Some(TestVal(3))
2587 );
2588 dbtx.commit_tx().await;
2589
2590 let mut dbtx = db.begin_transaction().await;
2591 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(4)));
2592 assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(5)));
2593 dbtx.commit_tx().await;
2594 }
2595
2596 pub async fn verify_remove_nonexisting(db: Database) {
2597 let mut dbtx = db.begin_transaction().await;
2598 assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2599 let removed = dbtx.remove_entry(&TestKey(1)).await;
2600 assert!(removed.is_none());
2601
2602 dbtx.commit_tx().await;
2604 }
2605
2606 pub async fn verify_remove_existing(db: Database) {
2607 let mut dbtx = db.begin_transaction().await;
2608
2609 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2610
2611 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2612
2613 let removed = dbtx.remove_entry(&TestKey(1)).await;
2614 assert_eq!(removed, Some(TestVal(2)));
2615 assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2616
2617 dbtx.commit_tx().await;
2619 }
2620
2621 pub async fn verify_read_own_writes(db: Database) {
2622 let mut dbtx = db.begin_transaction().await;
2623
2624 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2625
2626 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2627
2628 dbtx.commit_tx().await;
2630 }
2631
2632 pub async fn verify_prevent_dirty_reads(db: Database) {
2633 let mut dbtx = db.begin_transaction().await;
2634
2635 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2636
2637 let mut dbtx2 = db.begin_transaction().await;
2639 assert_eq!(dbtx2.get_value(&TestKey(1)).await, None);
2640
2641 dbtx.commit_tx().await;
2643 }
2644
2645 pub async fn verify_find_by_range(db: Database) {
2646 let mut dbtx = db.begin_transaction().await;
2647 dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2648 dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2649 dbtx.insert_entry(&TestKey(56), &TestVal(7777)).await;
2650
2651 dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2652 dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2653
2654 {
2655 let mut module_dbtx = dbtx.to_ref_with_prefix_module_id(2).0;
2656 module_dbtx
2657 .insert_entry(&TestKey(300), &TestVal(3000))
2658 .await;
2659 }
2660
2661 dbtx.commit_tx().await;
2662
2663 let mut dbtx = db.begin_transaction_nc().await;
2665
2666 let returned_keys = dbtx
2667 .find_by_range(TestKey(55)..TestKey(56))
2668 .await
2669 .collect::<Vec<_>>()
2670 .await;
2671
2672 let expected = vec![(TestKey(55), TestVal(9999))];
2673
2674 assert_eq!(returned_keys, expected);
2675
2676 let returned_keys = dbtx
2677 .find_by_range(TestKey(54)..TestKey(56))
2678 .await
2679 .collect::<Vec<_>>()
2680 .await;
2681
2682 let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2683 assert_eq!(returned_keys, expected);
2684
2685 let returned_keys = dbtx
2686 .find_by_range(TestKey(54)..TestKey(57))
2687 .await
2688 .collect::<Vec<_>>()
2689 .await;
2690
2691 let expected = vec![
2692 (TestKey(54), TestVal(8888)),
2693 (TestKey(55), TestVal(9999)),
2694 (TestKey(56), TestVal(7777)),
2695 ];
2696 assert_eq!(returned_keys, expected);
2697
2698 let mut module_dbtx = dbtx.with_prefix_module_id(2).0;
2699 let test_range = module_dbtx
2700 .find_by_range(TestKey(300)..TestKey(301))
2701 .await
2702 .collect::<Vec<_>>()
2703 .await;
2704 assert!(test_range.len() == 1);
2705 }
2706
2707 pub async fn verify_find_by_prefix(db: Database) {
2708 let mut dbtx = db.begin_transaction().await;
2709 dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2710 dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2711
2712 dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2713 dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2714 dbtx.commit_tx().await;
2715
2716 let mut dbtx = db.begin_transaction().await;
2718
2719 let returned_keys = dbtx
2720 .find_by_prefix(&DbPrefixTestPrefix)
2721 .await
2722 .collect::<Vec<_>>()
2723 .await;
2724
2725 let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2726 assert_eq!(returned_keys, expected);
2727
2728 let reversed = dbtx
2729 .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2730 .await
2731 .collect::<Vec<_>>()
2732 .await;
2733 let mut reversed_expected = expected;
2734 reversed_expected.reverse();
2735 assert_eq!(reversed, reversed_expected);
2736
2737 let returned_keys = dbtx
2738 .find_by_prefix(&AltDbPrefixTestPrefix)
2739 .await
2740 .collect::<Vec<_>>()
2741 .await;
2742
2743 let expected = vec![
2744 (AltTestKey(54), TestVal(6666)),
2745 (AltTestKey(55), TestVal(7777)),
2746 ];
2747 assert_eq!(returned_keys, expected);
2748
2749 let reversed = dbtx
2750 .find_by_prefix_sorted_descending(&AltDbPrefixTestPrefix)
2751 .await
2752 .collect::<Vec<_>>()
2753 .await;
2754 let mut reversed_expected = expected;
2755 reversed_expected.reverse();
2756 assert_eq!(reversed, reversed_expected);
2757 }
2758
2759 pub async fn verify_commit(db: Database) {
2760 let mut dbtx = db.begin_transaction().await;
2761
2762 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2763 dbtx.commit_tx().await;
2764
2765 let mut dbtx2 = db.begin_transaction().await;
2767 assert_eq!(dbtx2.get_value(&TestKey(1)).await, Some(TestVal(2)));
2768 }
2769
2770 pub async fn verify_rollback_to_savepoint(db: Database) {
2771 let mut dbtx_rollback = db.begin_transaction().await;
2772
2773 dbtx_rollback
2774 .insert_entry(&TestKey(20), &TestVal(2000))
2775 .await;
2776
2777 dbtx_rollback
2778 .set_tx_savepoint()
2779 .await
2780 .expect("Error setting transaction savepoint");
2781
2782 dbtx_rollback
2783 .insert_entry(&TestKey(21), &TestVal(2001))
2784 .await;
2785
2786 assert_eq!(
2787 dbtx_rollback.get_value(&TestKey(20)).await,
2788 Some(TestVal(2000))
2789 );
2790 assert_eq!(
2791 dbtx_rollback.get_value(&TestKey(21)).await,
2792 Some(TestVal(2001))
2793 );
2794
2795 dbtx_rollback
2796 .rollback_tx_to_savepoint()
2797 .await
2798 .expect("Error setting transaction savepoint");
2799
2800 assert_eq!(
2801 dbtx_rollback.get_value(&TestKey(20)).await,
2802 Some(TestVal(2000))
2803 );
2804
2805 assert_eq!(dbtx_rollback.get_value(&TestKey(21)).await, None);
2806
2807 dbtx_rollback.commit_tx().await;
2809 }
2810
2811 pub async fn verify_prevent_nonrepeatable_reads(db: Database) {
2812 let mut dbtx = db.begin_transaction().await;
2813 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2814
2815 let mut dbtx2 = db.begin_transaction().await;
2816
2817 dbtx2.insert_entry(&TestKey(100), &TestVal(101)).await;
2818
2819 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2820
2821 dbtx2.commit_tx().await;
2822
2823 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2826
2827 let expected_keys = 0;
2828 let returned_keys = dbtx
2829 .find_by_prefix(&DbPrefixTestPrefix)
2830 .await
2831 .fold(0, |returned_keys, (key, value)| async move {
2832 if key == TestKey(100) {
2833 assert!(value.eq(&TestVal(101)));
2834 }
2835 returned_keys + 1
2836 })
2837 .await;
2838
2839 assert_eq!(returned_keys, expected_keys);
2840 }
2841
2842 pub async fn verify_snapshot_isolation(db: Database) {
2843 async fn random_yield() {
2844 let times = if rand::thread_rng().gen_bool(0.5) {
2845 0
2846 } else {
2847 10
2848 };
2849 for _ in 0..times {
2850 tokio::task::yield_now().await;
2851 }
2852 }
2853
2854 for i in 0..1000 {
2856 let base_key = i * 2;
2857 let tx_accepted_key = base_key;
2858 let spent_input_key = base_key + 1;
2859
2860 join!(
2861 async {
2862 random_yield().await;
2863 let mut dbtx = db.begin_transaction().await;
2864
2865 random_yield().await;
2866 let a = dbtx.get_value(&TestKey(tx_accepted_key)).await;
2867 random_yield().await;
2868 let s = match i % 5 {
2871 0 => dbtx.get_value(&TestKey(spent_input_key)).await,
2872 1 => dbtx.remove_entry(&TestKey(spent_input_key)).await,
2873 2 => {
2874 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(200))
2875 .await
2876 }
2877 3 => {
2878 dbtx.find_by_prefix(&DbPrefixTestPrefix)
2879 .await
2880 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2881 .map(|(_k, v)| v)
2882 .next()
2883 .await
2884 }
2885 4 => {
2886 dbtx.find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2887 .await
2888 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2889 .map(|(_k, v)| v)
2890 .next()
2891 .await
2892 }
2893 _ => {
2894 panic!("woot?");
2895 }
2896 };
2897
2898 match (a, s) {
2899 (None, None) | (Some(_), Some(_)) => {}
2900 (None, Some(_)) => panic!("none some?! {i}"),
2901 (Some(_), None) => panic!("some none?! {i}"),
2902 }
2903 },
2904 async {
2905 random_yield().await;
2906
2907 let mut dbtx = db.begin_transaction().await;
2908 random_yield().await;
2909 assert_eq!(dbtx.get_value(&TestKey(tx_accepted_key)).await, None);
2910
2911 random_yield().await;
2912 assert_eq!(
2913 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(100))
2914 .await,
2915 None
2916 );
2917
2918 random_yield().await;
2919 assert_eq!(
2920 dbtx.insert_entry(&TestKey(tx_accepted_key), &TestVal(100))
2921 .await,
2922 None
2923 );
2924 random_yield().await;
2925 dbtx.commit_tx().await;
2926 }
2927 );
2928 }
2929 }
2930
2931 pub async fn verify_phantom_entry(db: Database) {
2932 let mut dbtx = db.begin_transaction().await;
2933
2934 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2935
2936 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
2937
2938 dbtx.commit_tx().await;
2939
2940 let mut dbtx = db.begin_transaction().await;
2941 let expected_keys = 2;
2942 let returned_keys = dbtx
2943 .find_by_prefix(&DbPrefixTestPrefix)
2944 .await
2945 .fold(0, |returned_keys, (key, value)| async move {
2946 match key {
2947 TestKey(100) => {
2948 assert!(value.eq(&TestVal(101)));
2949 }
2950 TestKey(101) => {
2951 assert!(value.eq(&TestVal(102)));
2952 }
2953 _ => {}
2954 };
2955 returned_keys + 1
2956 })
2957 .await;
2958
2959 assert_eq!(returned_keys, expected_keys);
2960
2961 let mut dbtx2 = db.begin_transaction().await;
2962
2963 dbtx2.insert_entry(&TestKey(102), &TestVal(103)).await;
2964
2965 dbtx2.commit_tx().await;
2966
2967 let returned_keys = dbtx
2968 .find_by_prefix(&DbPrefixTestPrefix)
2969 .await
2970 .fold(0, |returned_keys, (key, value)| async move {
2971 match key {
2972 TestKey(100) => {
2973 assert!(value.eq(&TestVal(101)));
2974 }
2975 TestKey(101) => {
2976 assert!(value.eq(&TestVal(102)));
2977 }
2978 _ => {}
2979 };
2980 returned_keys + 1
2981 })
2982 .await;
2983
2984 assert_eq!(returned_keys, expected_keys);
2985 }
2986
2987 pub async fn expect_write_conflict(db: Database) {
2988 let mut dbtx = db.begin_transaction().await;
2989 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2990 dbtx.commit_tx().await;
2991
2992 let mut dbtx2 = db.begin_transaction().await;
2993 let mut dbtx3 = db.begin_transaction().await;
2994
2995 dbtx2.insert_entry(&TestKey(100), &TestVal(102)).await;
2996
2997 dbtx3.insert_entry(&TestKey(100), &TestVal(103)).await;
3001
3002 dbtx2.commit_tx().await;
3003 dbtx3.commit_tx_result().await.expect_err("Expecting an error to be returned because this transaction is in a write-write conflict with dbtx");
3004 }
3005
3006 pub async fn verify_string_prefix(db: Database) {
3007 let mut dbtx = db.begin_transaction().await;
3008 dbtx.insert_entry(&PercentTestKey(100), &TestVal(101)).await;
3009
3010 assert_eq!(
3011 dbtx.get_value(&PercentTestKey(100)).await,
3012 Some(TestVal(101))
3013 );
3014
3015 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3016
3017 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3018
3019 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3020
3021 dbtx.insert_entry(&TestKey(101), &TestVal(100)).await;
3024
3025 let expected_keys = 4;
3026 let returned_keys = dbtx
3027 .find_by_prefix(&PercentPrefixTestPrefix)
3028 .await
3029 .fold(0, |returned_keys, (key, value)| async move {
3030 if matches!(key, PercentTestKey(101)) {
3031 assert!(value.eq(&TestVal(100)));
3032 }
3033 returned_keys + 1
3034 })
3035 .await;
3036
3037 assert_eq!(returned_keys, expected_keys);
3038 }
3039
3040 pub async fn verify_remove_by_prefix(db: Database) {
3041 let mut dbtx = db.begin_transaction().await;
3042
3043 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3044
3045 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3046
3047 dbtx.commit_tx().await;
3048
3049 let mut remove_dbtx = db.begin_transaction().await;
3050 remove_dbtx.remove_by_prefix(&DbPrefixTestPrefix).await;
3051 remove_dbtx.commit_tx().await;
3052
3053 let mut dbtx = db.begin_transaction().await;
3054 let expected_keys = 0;
3055 let returned_keys = dbtx
3056 .find_by_prefix(&DbPrefixTestPrefix)
3057 .await
3058 .fold(0, |returned_keys, (key, value)| async move {
3059 match key {
3060 TestKey(100) => {
3061 assert!(value.eq(&TestVal(101)));
3062 }
3063 TestKey(101) => {
3064 assert!(value.eq(&TestVal(102)));
3065 }
3066 _ => {}
3067 };
3068 returned_keys + 1
3069 })
3070 .await;
3071
3072 assert_eq!(returned_keys, expected_keys);
3073 }
3074
3075 pub async fn verify_module_db(db: Database, module_db: Database) {
3076 let mut dbtx = db.begin_transaction().await;
3077
3078 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3079
3080 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3081
3082 dbtx.commit_tx().await;
3083
3084 let mut module_dbtx = module_db.begin_transaction().await;
3086 assert_eq!(module_dbtx.get_value(&TestKey(100)).await, None);
3087
3088 assert_eq!(module_dbtx.get_value(&TestKey(101)).await, None);
3089
3090 let mut dbtx = db.begin_transaction().await;
3092 assert_eq!(dbtx.get_value(&TestKey(100)).await, Some(TestVal(101)));
3093
3094 assert_eq!(dbtx.get_value(&TestKey(101)).await, Some(TestVal(102)));
3095
3096 let mut module_dbtx = module_db.begin_transaction().await;
3097
3098 module_dbtx.insert_entry(&TestKey(100), &TestVal(103)).await;
3099
3100 module_dbtx.insert_entry(&TestKey(101), &TestVal(104)).await;
3101
3102 module_dbtx.commit_tx().await;
3103
3104 let expected_keys = 2;
3105 let mut dbtx = db.begin_transaction().await;
3106 let returned_keys = dbtx
3107 .find_by_prefix(&DbPrefixTestPrefix)
3108 .await
3109 .fold(0, |returned_keys, (key, value)| async move {
3110 match key {
3111 TestKey(100) => {
3112 assert!(value.eq(&TestVal(101)));
3113 }
3114 TestKey(101) => {
3115 assert!(value.eq(&TestVal(102)));
3116 }
3117 _ => {}
3118 };
3119 returned_keys + 1
3120 })
3121 .await;
3122
3123 assert_eq!(returned_keys, expected_keys);
3124
3125 let removed = dbtx.remove_entry(&TestKey(100)).await;
3126 assert_eq!(removed, Some(TestVal(101)));
3127 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
3128
3129 let mut module_dbtx = module_db.begin_transaction().await;
3130 assert_eq!(
3131 module_dbtx.get_value(&TestKey(100)).await,
3132 Some(TestVal(103))
3133 );
3134 }
3135
3136 pub async fn verify_module_prefix(db: Database) {
3137 let mut test_dbtx = db.begin_transaction().await;
3138 {
3139 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3140
3141 test_module_dbtx
3142 .insert_entry(&TestKey(100), &TestVal(101))
3143 .await;
3144
3145 test_module_dbtx
3146 .insert_entry(&TestKey(101), &TestVal(102))
3147 .await;
3148 }
3149
3150 test_dbtx.commit_tx().await;
3151
3152 let mut alt_dbtx = db.begin_transaction().await;
3153 {
3154 let mut alt_module_dbtx = alt_dbtx.to_ref_with_prefix_module_id(ALT_MODULE_PREFIX).0;
3155
3156 alt_module_dbtx
3157 .insert_entry(&TestKey(100), &TestVal(103))
3158 .await;
3159
3160 alt_module_dbtx
3161 .insert_entry(&TestKey(101), &TestVal(104))
3162 .await;
3163 }
3164
3165 alt_dbtx.commit_tx().await;
3166
3167 let mut test_dbtx = db.begin_transaction().await;
3169 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3170 assert_eq!(
3171 test_module_dbtx.get_value(&TestKey(100)).await,
3172 Some(TestVal(101))
3173 );
3174
3175 assert_eq!(
3176 test_module_dbtx.get_value(&TestKey(101)).await,
3177 Some(TestVal(102))
3178 );
3179
3180 let expected_keys = 2;
3181 let returned_keys = test_module_dbtx
3182 .find_by_prefix(&DbPrefixTestPrefix)
3183 .await
3184 .fold(0, |returned_keys, (key, value)| async move {
3185 match key {
3186 TestKey(100) => {
3187 assert!(value.eq(&TestVal(101)));
3188 }
3189 TestKey(101) => {
3190 assert!(value.eq(&TestVal(102)));
3191 }
3192 _ => {}
3193 };
3194 returned_keys + 1
3195 })
3196 .await;
3197
3198 assert_eq!(returned_keys, expected_keys);
3199
3200 let removed = test_module_dbtx.remove_entry(&TestKey(100)).await;
3201 assert_eq!(removed, Some(TestVal(101)));
3202 assert_eq!(test_module_dbtx.get_value(&TestKey(100)).await, None);
3203
3204 let mut test_dbtx = db.begin_transaction().await;
3207 assert_eq!(test_dbtx.get_value(&TestKey(101)).await, None);
3208
3209 test_dbtx.commit_tx().await;
3210 }
3211
3212 #[cfg(test)]
3213 #[tokio::test]
3214 pub async fn verify_test_migration() {
3215 let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
3217 let expected_test_keys_size: usize = 100;
3218 let mut dbtx = db.begin_transaction().await;
3219 for i in 0..expected_test_keys_size {
3220 dbtx.insert_new_entry(&TestKeyV0(i as u64, (i + 1) as u64), &TestVal(i as u64))
3221 .await;
3222 }
3223
3224 dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
3226 .await;
3227 dbtx.commit_tx().await;
3228
3229 let mut migrations: BTreeMap<DatabaseVersion, CoreMigrationFn> = BTreeMap::new();
3230
3231 migrations.insert(DatabaseVersion(0), |ctx| {
3232 migrate_test_db_version_0(ctx).boxed()
3233 });
3234
3235 apply_migrations(&db, "TestModule".to_string(), migrations, None, None)
3236 .await
3237 .expect("Error applying migrations for TestModule");
3238
3239 let mut dbtx = db.begin_transaction().await;
3241
3242 assert!(
3245 dbtx.get_value(&DatabaseVersionKey(MODULE_GLOBAL_PREFIX.into()))
3246 .await
3247 .is_some()
3248 );
3249
3250 let test_keys = dbtx
3252 .find_by_prefix(&DbPrefixTestPrefix)
3253 .await
3254 .collect::<Vec<_>>()
3255 .await;
3256 let test_keys_size = test_keys.len();
3257 assert_eq!(test_keys_size, expected_test_keys_size);
3258 for (key, val) in test_keys {
3259 assert_eq!(key.0, val.0 + 1);
3260 }
3261 }
3262
3263 #[allow(dead_code)]
3264 async fn migrate_test_db_version_0(mut ctx: MigrationContext<'_>) -> Result<(), anyhow::Error> {
3265 let mut dbtx = ctx.dbtx();
3266 let example_keys_v0 = dbtx
3267 .find_by_prefix(&DbPrefixTestPrefixV0)
3268 .await
3269 .collect::<Vec<_>>()
3270 .await;
3271 dbtx.remove_by_prefix(&DbPrefixTestPrefixV0).await;
3272 for (key, val) in example_keys_v0 {
3273 let key_v2 = TestKey(key.1);
3274 dbtx.insert_new_entry(&key_v2, &val).await;
3275 }
3276 Ok(())
3277 }
3278
3279 #[cfg(test)]
3280 #[tokio::test]
3281 async fn test_autocommit() {
3282 use std::marker::PhantomData;
3283 use std::ops::Range;
3284 use std::path::Path;
3285
3286 use anyhow::anyhow;
3287 use async_trait::async_trait;
3288
3289 use crate::ModuleDecoderRegistry;
3290 use crate::db::{
3291 AutocommitError, BaseDatabaseTransaction, IDatabaseTransaction,
3292 IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase,
3293 IRawDatabaseTransaction,
3294 };
3295
3296 #[derive(Debug)]
3297 struct FakeDatabase;
3298
3299 #[async_trait]
3300 impl IRawDatabase for FakeDatabase {
3301 type Transaction<'a> = FakeTransaction<'a>;
3302 async fn begin_transaction(&self) -> FakeTransaction {
3303 FakeTransaction(PhantomData)
3304 }
3305
3306 fn checkpoint(&self, _backup_path: &Path) -> anyhow::Result<()> {
3307 Ok(())
3308 }
3309 }
3310
3311 #[derive(Debug)]
3312 struct FakeTransaction<'a>(PhantomData<&'a ()>);
3313
3314 #[async_trait]
3315 impl<'a> IDatabaseTransactionOpsCore for FakeTransaction<'a> {
3316 async fn raw_insert_bytes(
3317 &mut self,
3318 _key: &[u8],
3319 _value: &[u8],
3320 ) -> anyhow::Result<Option<Vec<u8>>> {
3321 unimplemented!()
3322 }
3323
3324 async fn raw_get_bytes(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
3325 unimplemented!()
3326 }
3327
3328 async fn raw_remove_entry(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
3329 unimplemented!()
3330 }
3331
3332 async fn raw_find_by_range(
3333 &mut self,
3334 _key_range: Range<&[u8]>,
3335 ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3336 unimplemented!()
3337 }
3338
3339 async fn raw_find_by_prefix(
3340 &mut self,
3341 _key_prefix: &[u8],
3342 ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3343 unimplemented!()
3344 }
3345
3346 async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
3347 unimplemented!()
3348 }
3349
3350 async fn raw_find_by_prefix_sorted_descending(
3351 &mut self,
3352 _key_prefix: &[u8],
3353 ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3354 unimplemented!()
3355 }
3356 }
3357
3358 #[async_trait]
3359 impl<'a> IDatabaseTransactionOps for FakeTransaction<'a> {
3360 async fn rollback_tx_to_savepoint(&mut self) -> anyhow::Result<()> {
3361 unimplemented!()
3362 }
3363
3364 async fn set_tx_savepoint(&mut self) -> anyhow::Result<()> {
3365 unimplemented!()
3366 }
3367 }
3368
3369 #[async_trait]
3370 impl<'a> IRawDatabaseTransaction for FakeTransaction<'a> {
3371 async fn commit_tx(self) -> anyhow::Result<()> {
3372 Err(anyhow!("Can't commit!"))
3373 }
3374 }
3375
3376 let db = Database::new(FakeDatabase, ModuleDecoderRegistry::default());
3377 let err = db
3378 .autocommit::<_, _, ()>(|_dbtx, _| Box::pin(async { Ok(()) }), Some(5))
3379 .await
3380 .unwrap_err();
3381
3382 match err {
3383 AutocommitError::CommitFailed {
3384 attempts: failed_attempts,
3385 ..
3386 } => {
3387 assert_eq!(failed_attempts, 5);
3388 }
3389 AutocommitError::ClosureError { .. } => panic!("Closure did not return error"),
3390 }
3391 }
3392}
3393
3394pub async fn find_by_prefix_sorted_descending<'r, 'inner, KP>(
3395 tx: &'r mut (dyn IDatabaseTransaction + 'inner),
3396 decoders: ModuleDecoderRegistry,
3397 key_prefix: &KP,
3398) -> impl Stream<
3399 Item = (
3400 KP::Record,
3401 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
3402 ),
3403>
3404+ 'r
3405+ use<'r, KP>
3406where
3407 'inner: 'r,
3408 KP: DatabaseLookup,
3409 KP::Record: DatabaseKey,
3410{
3411 debug!(target: LOG_DB, "find by prefix sorted descending");
3412 let prefix_bytes = key_prefix.to_bytes();
3413 tx.raw_find_by_prefix_sorted_descending(&prefix_bytes)
3414 .await
3415 .expect("Error doing prefix search in database")
3416 .map(move |(key_bytes, value_bytes)| {
3417 let key = decode_key_expect(&key_bytes, &decoders);
3418 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
3419 (key, value)
3420 })
3421}
3422
3423pub async fn verify_module_db_integrity_dbtx(
3424 dbtx: &mut DatabaseTransaction<'_>,
3425 module_id: ModuleInstanceId,
3426 module_kind: ModuleKind,
3427 prefixes: &BTreeSet<u8>,
3428) {
3429 let module_db_prefix = module_instance_id_to_byte_prefix(module_id);
3430 if module_id < 250 {
3431 assert_eq!(module_db_prefix.len(), 2);
3432 }
3433 let mut records = dbtx
3434 .raw_find_by_prefix(&module_db_prefix)
3435 .await
3436 .expect("DB fail");
3437 while let Some((k, v)) = records.next().await {
3438 assert!(
3439 prefixes.contains(&k[module_db_prefix.len()]),
3440 "Unexpected module {module_kind} {module_id} db record found: {}: {}",
3441 k.as_hex(),
3442 v.as_hex()
3443 );
3444 }
3445}
3446
3447#[cfg(test)]
3448mod tests;