1use std::any;
106use std::collections::{BTreeMap, BTreeSet};
107use std::error::Error;
108use std::fmt::{self, Debug};
109use std::marker::{self, PhantomData};
110use std::ops::{self, DerefMut, Range};
111use std::path::Path;
112use std::pin::Pin;
113use std::sync::Arc;
114use std::time::Duration;
115
116use anyhow::{Context, Result, bail};
117use bitcoin::hex::DisplayHex as _;
118use fedimint_core::util::BoxFuture;
119use fedimint_logging::LOG_DB;
120use futures::{Stream, StreamExt};
121use macro_rules_attribute::apply;
122use rand::Rng;
123use serde::Serialize;
124use strum_macros::EnumIter;
125use thiserror::Error;
126use tracing::{debug, error, info, instrument, trace, warn};
127
128use crate::core::{ModuleInstanceId, ModuleKind};
129use crate::encoding::{Decodable, Encodable};
130use crate::fmt_utils::AbbreviateHexBytes;
131use crate::task::{MaybeSend, MaybeSync};
132use crate::util::FmtCompactAnyhow as _;
133use crate::{async_trait_maybe_send, maybe_add_send, maybe_add_send_sync, timing};
134
135pub mod mem_impl;
136pub mod notifications;
137
138pub use test_utils::*;
139
140use self::notifications::{Notifications, NotifyQueue};
141use crate::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
142
143pub const MODULE_GLOBAL_PREFIX: u8 = 0xff;
144
145pub trait DatabaseKeyPrefix: Debug {
146 fn to_bytes(&self) -> Vec<u8>;
147}
148
149pub trait DatabaseRecord: DatabaseKeyPrefix {
152 const DB_PREFIX: u8;
153 const NOTIFY_ON_MODIFY: bool = false;
154 type Key: DatabaseKey + Debug;
155 type Value: DatabaseValue + Debug;
156}
157
158pub trait DatabaseLookup: DatabaseKeyPrefix {
161 type Record: DatabaseRecord;
162}
163
164impl<Record> DatabaseLookup for Record
166where
167 Record: DatabaseRecord + Debug + Decodable + Encodable,
168{
169 type Record = Record;
170}
171
172pub trait DatabaseKey: Sized {
175 const NOTIFY_ON_MODIFY: bool = false;
183 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
184}
185
186pub trait DatabaseKeyWithNotify {}
188
189pub trait DatabaseValue: Sized + Debug {
191 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
192 fn to_bytes(&self) -> Vec<u8>;
193}
194
195pub type PrefixStream<'a> = Pin<Box<maybe_add_send!(dyn Stream<Item = (Vec<u8>, Vec<u8>)> + 'a)>>;
196
197pub type PhantomBound<'big, 'small> = PhantomData<&'small &'big ()>;
201
202#[derive(Debug, Error)]
204pub enum AutocommitError<E> {
205 #[error("Commit Failed: {last_error}")]
207 CommitFailed {
208 attempts: usize,
210 last_error: anyhow::Error,
212 },
213 #[error("Closure error: {error}")]
216 ClosureError {
217 attempts: usize,
223 error: E,
225 },
226}
227
228#[apply(async_trait_maybe_send!)]
237pub trait IRawDatabase: Debug + MaybeSend + MaybeSync + 'static {
238 type Transaction<'a>: IRawDatabaseTransaction + Debug;
240
241 async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a>;
243
244 fn checkpoint(&self, backup_path: &Path) -> Result<()>;
246}
247
248#[apply(async_trait_maybe_send!)]
249impl<T> IRawDatabase for Box<T>
250where
251 T: IRawDatabase,
252{
253 type Transaction<'a> = <T as IRawDatabase>::Transaction<'a>;
254
255 async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a> {
256 (**self).begin_transaction().await
257 }
258
259 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
260 (**self).checkpoint(backup_path)
261 }
262}
263
264pub trait IRawDatabaseExt: IRawDatabase + Sized {
266 fn into_database(self) -> Database {
270 Database::new(self, ModuleRegistry::default())
271 }
272}
273
274impl<T> IRawDatabaseExt for T where T: IRawDatabase {}
275
276impl<T> From<T> for Database
277where
278 T: IRawDatabase,
279{
280 fn from(raw: T) -> Self {
281 Self::new(raw, ModuleRegistry::default())
282 }
283}
284
285#[apply(async_trait_maybe_send!)]
288pub trait IDatabase: Debug + MaybeSend + MaybeSync + 'static {
289 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a>;
291 async fn register(&self, key: &[u8]);
293 async fn notify(&self, key: &[u8]);
295
296 fn is_global(&self) -> bool;
299
300 fn checkpoint(&self, backup_path: &Path) -> Result<()>;
302}
303
304#[apply(async_trait_maybe_send!)]
305impl<T> IDatabase for Arc<T>
306where
307 T: IDatabase + ?Sized,
308{
309 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
310 (**self).begin_transaction().await
311 }
312 async fn register(&self, key: &[u8]) {
313 (**self).register(key).await;
314 }
315 async fn notify(&self, key: &[u8]) {
316 (**self).notify(key).await;
317 }
318
319 fn is_global(&self) -> bool {
320 (**self).is_global()
321 }
322
323 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
324 (**self).checkpoint(backup_path)
325 }
326}
327
328struct BaseDatabase<RawDatabase> {
332 notifications: Arc<Notifications>,
333 raw: RawDatabase,
334}
335
336impl<RawDatabase> fmt::Debug for BaseDatabase<RawDatabase> {
337 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338 f.write_str("BaseDatabase")
339 }
340}
341
342#[apply(async_trait_maybe_send!)]
343impl<RawDatabase: IRawDatabase + MaybeSend + 'static> IDatabase for BaseDatabase<RawDatabase> {
344 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
345 Box::new(BaseDatabaseTransaction::new(
346 self.raw.begin_transaction().await,
347 self.notifications.clone(),
348 ))
349 }
350 async fn register(&self, key: &[u8]) {
351 self.notifications.register(key).await;
352 }
353 async fn notify(&self, key: &[u8]) {
354 self.notifications.notify(key);
355 }
356
357 fn is_global(&self) -> bool {
358 true
359 }
360
361 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
362 self.raw.checkpoint(backup_path)
363 }
364}
365
366#[derive(Clone, Debug)]
372pub struct Database {
373 inner: Arc<dyn IDatabase + 'static>,
374 module_decoders: ModuleDecoderRegistry,
375}
376
377impl Database {
378 pub fn strong_count(&self) -> usize {
379 Arc::strong_count(&self.inner)
380 }
381
382 pub fn into_inner(self) -> Arc<dyn IDatabase + 'static> {
383 self.inner
384 }
385}
386
387impl Database {
388 pub fn new(raw: impl IRawDatabase + 'static, module_decoders: ModuleDecoderRegistry) -> Self {
393 let inner = BaseDatabase {
394 raw,
395 notifications: Arc::new(Notifications::new()),
396 };
397 Self::new_from_arc(
398 Arc::new(inner) as Arc<dyn IDatabase + 'static>,
399 module_decoders,
400 )
401 }
402
403 pub fn new_from_arc(
405 inner: Arc<dyn IDatabase + 'static>,
406 module_decoders: ModuleDecoderRegistry,
407 ) -> Self {
408 Self {
409 inner,
410 module_decoders,
411 }
412 }
413
414 pub fn with_prefix(&self, prefix: Vec<u8>) -> Self {
416 Self {
417 inner: Arc::new(PrefixDatabase {
418 inner: self.inner.clone(),
419 global_dbtx_access_token: None,
420 prefix,
421 }),
422 module_decoders: self.module_decoders.clone(),
423 }
424 }
425
426 pub fn with_prefix_module_id(
430 &self,
431 module_instance_id: ModuleInstanceId,
432 ) -> (Self, GlobalDBTxAccessToken) {
433 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
434 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
435 (
436 Self {
437 inner: Arc::new(PrefixDatabase {
438 inner: self.inner.clone(),
439 global_dbtx_access_token: Some(global_dbtx_access_token),
440 prefix,
441 }),
442 module_decoders: self.module_decoders.clone(),
443 },
444 global_dbtx_access_token,
445 )
446 }
447
448 pub fn with_decoders(&self, module_decoders: ModuleDecoderRegistry) -> Self {
449 Self {
450 inner: self.inner.clone(),
451 module_decoders,
452 }
453 }
454
455 pub fn is_global(&self) -> bool {
457 self.inner.is_global()
458 }
459
460 pub fn ensure_global(&self) -> Result<()> {
462 if !self.is_global() {
463 bail!("Database instance not global");
464 }
465
466 Ok(())
467 }
468
469 pub fn ensure_isolated(&self) -> Result<()> {
471 if self.is_global() {
472 bail!("Database instance not isolated");
473 }
474
475 Ok(())
476 }
477
478 pub async fn begin_transaction<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, Committable>
480 where
481 's: 'tx,
482 {
483 DatabaseTransaction::<Committable>::new(
484 self.inner.begin_transaction().await,
485 self.module_decoders.clone(),
486 )
487 }
488
489 pub async fn begin_transaction_nc<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, NonCommittable>
491 where
492 's: 'tx,
493 {
494 self.begin_transaction().await.into_nc()
495 }
496
497 pub fn checkpoint(&self, backup_path: &Path) -> Result<()> {
498 self.inner.checkpoint(backup_path)
499 }
500
501 pub async fn autocommit<'s, 'dbtx, F, T, E>(
529 &'s self,
530 tx_fn: F,
531 max_attempts: Option<usize>,
532 ) -> Result<T, AutocommitError<E>>
533 where
534 's: 'dbtx,
535 for<'r, 'o> F: Fn(
536 &'r mut DatabaseTransaction<'o>,
537 PhantomBound<'dbtx, 'o>,
538 ) -> BoxFuture<'r, Result<T, E>>,
539 {
540 assert_ne!(max_attempts, Some(0));
541 let mut curr_attempts: usize = 0;
542
543 loop {
544 curr_attempts = curr_attempts
549 .checked_add(1)
550 .expect("db autocommit attempt counter overflowed");
551
552 let mut dbtx = self.begin_transaction().await;
553
554 let tx_fn_res = tx_fn(&mut dbtx.to_ref_nc(), PhantomData).await;
555 let val = match tx_fn_res {
556 Ok(val) => val,
557 Err(err) => {
558 dbtx.ignore_uncommitted();
559 return Err(AutocommitError::ClosureError {
560 attempts: curr_attempts,
561 error: err,
562 });
563 }
564 };
565
566 let _timing = timing::TimeReporter::new("autocommit - commit_tx");
567
568 match dbtx.commit_tx_result().await {
569 Ok(()) => {
570 return Ok(val);
571 }
572 Err(err) => {
573 if max_attempts.is_some_and(|max_att| max_att <= curr_attempts) {
574 warn!(
575 target: LOG_DB,
576 curr_attempts,
577 ?err,
578 "Database commit failed in an autocommit block - terminating"
579 );
580 return Err(AutocommitError::CommitFailed {
581 attempts: curr_attempts,
582 last_error: err,
583 });
584 }
585
586 let delay = (2u64.pow(curr_attempts.min(7) as u32) * 10).min(1000);
587 let delay = rand::thread_rng().gen_range(delay..(2 * delay));
588 warn!(
589 target: LOG_DB,
590 curr_attempts,
591 err = %err.fmt_compact_anyhow(),
592 delay_ms = %delay,
593 "Database commit failed in an autocommit block - retrying"
594 );
595 crate::runtime::sleep(Duration::from_millis(delay)).await;
596 }
597 }
598 }
599 }
600
601 pub async fn wait_key_check<'a, K, T>(
606 &'a self,
607 key: &K,
608 checker: impl Fn(Option<K::Value>) -> Option<T>,
609 ) -> (T, DatabaseTransaction<'a, Committable>)
610 where
611 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
612 {
613 let key_bytes = key.to_bytes();
614 loop {
615 let notify = self.inner.register(&key_bytes);
617
618 let mut tx = self.inner.begin_transaction().await;
620
621 let maybe_value_bytes = tx
622 .raw_get_bytes(&key_bytes)
623 .await
624 .expect("Unrecoverable error when reading from database")
625 .map(|value_bytes| {
626 decode_value_expect(&value_bytes, &self.module_decoders, &key_bytes)
627 });
628
629 if let Some(value) = checker(maybe_value_bytes) {
630 return (
631 value,
632 DatabaseTransaction::new(tx, self.module_decoders.clone()),
633 );
634 }
635
636 notify.await;
638 }
641 }
642
643 pub async fn wait_key_exists<K>(&self, key: &K) -> K::Value
645 where
646 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
647 {
648 self.wait_key_check(key, std::convert::identity).await.0
649 }
650}
651
652fn module_instance_id_to_byte_prefix(module_instance_id: u16) -> Vec<u8> {
653 let mut bytes = vec![MODULE_GLOBAL_PREFIX];
654 bytes.append(&mut module_instance_id.consensus_encode_to_vec());
655 bytes
656}
657
658#[derive(Clone, Debug)]
661struct PrefixDatabase<Inner>
662where
663 Inner: Debug,
664{
665 prefix: Vec<u8>,
666 global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
667 inner: Inner,
668}
669
670impl<Inner> PrefixDatabase<Inner>
671where
672 Inner: Debug,
673{
674 fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
678 let mut full_key = self.prefix.clone();
679 full_key.extend_from_slice(key);
680 full_key
681 }
682}
683
684#[apply(async_trait_maybe_send!)]
685impl<Inner> IDatabase for PrefixDatabase<Inner>
686where
687 Inner: Debug + MaybeSend + MaybeSync + 'static + IDatabase,
688{
689 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
690 Box::new(PrefixDatabaseTransaction {
691 inner: self.inner.begin_transaction().await,
692 global_dbtx_access_token: self.global_dbtx_access_token,
693 prefix: self.prefix.clone(),
694 })
695 }
696 async fn register(&self, key: &[u8]) {
697 self.inner.register(&self.get_full_key(key)).await;
698 }
699
700 async fn notify(&self, key: &[u8]) {
701 self.inner.notify(&self.get_full_key(key)).await;
702 }
703
704 fn is_global(&self) -> bool {
705 if self.global_dbtx_access_token.is_some() {
706 false
707 } else {
708 self.inner.is_global()
709 }
710 }
711
712 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
713 self.inner.checkpoint(backup_path)
714 }
715}
716
717#[derive(Debug)]
722struct PrefixDatabaseTransaction<Inner> {
723 inner: Inner,
724 global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
725 prefix: Vec<u8>,
726}
727
728impl<Inner> PrefixDatabaseTransaction<Inner> {
729 fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
733 let mut full_key = self.prefix.clone();
734 full_key.extend_from_slice(key);
735 full_key
736 }
737
738 fn get_full_range(&self, range: Range<&[u8]>) -> Range<Vec<u8>> {
739 Range {
740 start: self.get_full_key(range.start),
741 end: self.get_full_key(range.end),
742 }
743 }
744
745 fn adapt_prefix_stream(stream: PrefixStream<'_>, prefix_len: usize) -> PrefixStream<'_> {
746 Box::pin(stream.map(move |(k, v)| (k[prefix_len..].to_owned(), v)))
747 }
748}
749
750#[apply(async_trait_maybe_send!)]
751impl<Inner> IDatabaseTransaction for PrefixDatabaseTransaction<Inner>
752where
753 Inner: IDatabaseTransaction,
754{
755 async fn commit_tx(&mut self) -> Result<()> {
756 self.inner.commit_tx().await
757 }
758
759 fn is_global(&self) -> bool {
760 if self.global_dbtx_access_token.is_some() {
761 false
762 } else {
763 self.inner.is_global()
764 }
765 }
766
767 fn global_dbtx(
768 &mut self,
769 access_token: GlobalDBTxAccessToken,
770 ) -> &mut dyn IDatabaseTransaction {
771 if let Some(self_global_dbtx_access_token) = self.global_dbtx_access_token {
772 assert_eq!(
773 access_token, self_global_dbtx_access_token,
774 "Invalid access key used to access global_dbtx"
775 );
776 &mut self.inner
777 } else {
778 self.inner.global_dbtx(access_token)
779 }
780 }
781}
782
783#[apply(async_trait_maybe_send!)]
784impl<Inner> IDatabaseTransactionOpsCore for PrefixDatabaseTransaction<Inner>
785where
786 Inner: IDatabaseTransactionOpsCore,
787{
788 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
789 let key = self.get_full_key(key);
790 self.inner.raw_insert_bytes(&key, value).await
791 }
792
793 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
794 let key = self.get_full_key(key);
795 self.inner.raw_get_bytes(&key).await
796 }
797
798 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
799 let key = self.get_full_key(key);
800 self.inner.raw_remove_entry(&key).await
801 }
802
803 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
804 let key = self.get_full_key(key_prefix);
805 let stream = self.inner.raw_find_by_prefix(&key).await?;
806 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
807 }
808
809 async fn raw_find_by_prefix_sorted_descending(
810 &mut self,
811 key_prefix: &[u8],
812 ) -> Result<PrefixStream<'_>> {
813 let key = self.get_full_key(key_prefix);
814 let stream = self
815 .inner
816 .raw_find_by_prefix_sorted_descending(&key)
817 .await?;
818 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
819 }
820
821 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
822 let range = self.get_full_range(range);
823 let stream = self
824 .inner
825 .raw_find_by_range(Range {
826 start: &range.start,
827 end: &range.end,
828 })
829 .await?;
830 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
831 }
832
833 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
834 let key = self.get_full_key(key_prefix);
835 self.inner.raw_remove_by_prefix(&key).await
836 }
837}
838
839#[apply(async_trait_maybe_send!)]
840impl<Inner> IDatabaseTransactionOps for PrefixDatabaseTransaction<Inner>
841where
842 Inner: IDatabaseTransactionOps,
843{
844 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
845 self.inner.rollback_tx_to_savepoint().await
846 }
847
848 async fn set_tx_savepoint(&mut self) -> Result<()> {
849 self.set_tx_savepoint().await
850 }
851}
852
853#[apply(async_trait_maybe_send!)]
857pub trait IDatabaseTransactionOpsCore: MaybeSend {
858 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>>;
860
861 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
863
864 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
866
867 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>>;
870
871 async fn raw_find_by_prefix_sorted_descending(
873 &mut self,
874 key_prefix: &[u8],
875 ) -> Result<PrefixStream<'_>>;
876
877 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>>;
881
882 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()>;
884}
885
886#[apply(async_trait_maybe_send!)]
887impl<T> IDatabaseTransactionOpsCore for Box<T>
888where
889 T: IDatabaseTransactionOpsCore + ?Sized,
890{
891 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
892 (**self).raw_insert_bytes(key, value).await
893 }
894
895 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
896 (**self).raw_get_bytes(key).await
897 }
898
899 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
900 (**self).raw_remove_entry(key).await
901 }
902
903 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
904 (**self).raw_find_by_prefix(key_prefix).await
905 }
906
907 async fn raw_find_by_prefix_sorted_descending(
908 &mut self,
909 key_prefix: &[u8],
910 ) -> Result<PrefixStream<'_>> {
911 (**self)
912 .raw_find_by_prefix_sorted_descending(key_prefix)
913 .await
914 }
915
916 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
917 (**self).raw_find_by_range(range).await
918 }
919
920 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
921 (**self).raw_remove_by_prefix(key_prefix).await
922 }
923}
924
925#[apply(async_trait_maybe_send!)]
926impl<T> IDatabaseTransactionOpsCore for &mut T
927where
928 T: IDatabaseTransactionOpsCore + ?Sized,
929{
930 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
931 (**self).raw_insert_bytes(key, value).await
932 }
933
934 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
935 (**self).raw_get_bytes(key).await
936 }
937
938 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
939 (**self).raw_remove_entry(key).await
940 }
941
942 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
943 (**self).raw_find_by_prefix(key_prefix).await
944 }
945
946 async fn raw_find_by_prefix_sorted_descending(
947 &mut self,
948 key_prefix: &[u8],
949 ) -> Result<PrefixStream<'_>> {
950 (**self)
951 .raw_find_by_prefix_sorted_descending(key_prefix)
952 .await
953 }
954
955 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
956 (**self).raw_find_by_range(range).await
957 }
958
959 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
960 (**self).raw_remove_by_prefix(key_prefix).await
961 }
962}
963
964#[apply(async_trait_maybe_send!)]
970pub trait IDatabaseTransactionOps: IDatabaseTransactionOpsCore + MaybeSend {
971 async fn set_tx_savepoint(&mut self) -> Result<()>;
980
981 async fn rollback_tx_to_savepoint(&mut self) -> Result<()>;
982}
983
984#[apply(async_trait_maybe_send!)]
985impl<T> IDatabaseTransactionOps for Box<T>
986where
987 T: IDatabaseTransactionOps + ?Sized,
988{
989 async fn set_tx_savepoint(&mut self) -> Result<()> {
990 (**self).set_tx_savepoint().await
991 }
992
993 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
994 (**self).rollback_tx_to_savepoint().await
995 }
996}
997
998#[apply(async_trait_maybe_send!)]
999impl<T> IDatabaseTransactionOps for &mut T
1000where
1001 T: IDatabaseTransactionOps + ?Sized,
1002{
1003 async fn set_tx_savepoint(&mut self) -> Result<()> {
1004 (**self).set_tx_savepoint().await
1005 }
1006
1007 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1008 (**self).rollback_tx_to_savepoint().await
1009 }
1010}
1011
1012#[apply(async_trait_maybe_send!)]
1018pub trait IDatabaseTransactionOpsCoreTyped<'a> {
1019 async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1020 where
1021 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1022
1023 async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1024 where
1025 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1026 K::Value: MaybeSend + MaybeSync;
1027
1028 async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1029 where
1030 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1031 K::Value: MaybeSend + MaybeSync;
1032
1033 async fn find_by_range<K>(
1034 &mut self,
1035 key_range: Range<K>,
1036 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1037 where
1038 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1039 K::Value: MaybeSend + MaybeSync;
1040
1041 async fn find_by_prefix<KP>(
1042 &mut self,
1043 key_prefix: &KP,
1044 ) -> Pin<
1045 Box<
1046 maybe_add_send!(
1047 dyn Stream<
1048 Item = (
1049 KP::Record,
1050 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1051 ),
1052 > + '_
1053 ),
1054 >,
1055 >
1056 where
1057 KP: DatabaseLookup + MaybeSend + MaybeSync,
1058 KP::Record: DatabaseKey;
1059
1060 async fn find_by_prefix_sorted_descending<KP>(
1061 &mut self,
1062 key_prefix: &KP,
1063 ) -> Pin<
1064 Box<
1065 maybe_add_send!(
1066 dyn Stream<
1067 Item = (
1068 KP::Record,
1069 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1070 ),
1071 > + '_
1072 ),
1073 >,
1074 >
1075 where
1076 KP: DatabaseLookup + MaybeSend + MaybeSync,
1077 KP::Record: DatabaseKey;
1078
1079 async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1080 where
1081 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1082
1083 async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1084 where
1085 KP: DatabaseLookup + MaybeSend + MaybeSync;
1086}
1087
1088#[apply(async_trait_maybe_send!)]
1091impl<'a, T> IDatabaseTransactionOpsCoreTyped<'a> for T
1092where
1093 T: IDatabaseTransactionOpsCore + WithDecoders,
1094{
1095 async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1096 where
1097 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1098 {
1099 let key_bytes = key.to_bytes();
1100 let raw = self
1101 .raw_get_bytes(&key_bytes)
1102 .await
1103 .expect("Unrecoverable error occurred while reading and entry from the database");
1104 raw.map(|value_bytes| {
1105 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1106 })
1107 }
1108
1109 async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1110 where
1111 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1112 K::Value: MaybeSend + MaybeSync,
1113 {
1114 let key_bytes = key.to_bytes();
1115 self.raw_insert_bytes(&key_bytes, &value.to_bytes())
1116 .await
1117 .expect("Unrecoverable error occurred while inserting entry into the database")
1118 .map(|value_bytes| {
1119 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1120 })
1121 }
1122
1123 async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1124 where
1125 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1126 K::Value: MaybeSend + MaybeSync,
1127 {
1128 if let Some(prev) = self.insert_entry(key, value).await {
1129 panic!(
1130 "Database overwriting element when expecting insertion of new entry. Key: {key:?} Prev Value: {prev:?}"
1131 );
1132 }
1133 }
1134
1135 async fn find_by_range<K>(
1136 &mut self,
1137 key_range: Range<K>,
1138 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1139 where
1140 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1141 K::Value: MaybeSend + MaybeSync,
1142 {
1143 let decoders = self.decoders().clone();
1144 Box::pin(
1145 self.raw_find_by_range(Range {
1146 start: &key_range.start.to_bytes(),
1147 end: &key_range.end.to_bytes(),
1148 })
1149 .await
1150 .expect("Unrecoverable error occurred while listing entries from the database")
1151 .map(move |(key_bytes, value_bytes)| {
1152 let key = decode_key_expect(&key_bytes, &decoders);
1153 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1154 (key, value)
1155 }),
1156 )
1157 }
1158
1159 async fn find_by_prefix<KP>(
1160 &mut self,
1161 key_prefix: &KP,
1162 ) -> Pin<
1163 Box<
1164 maybe_add_send!(
1165 dyn Stream<
1166 Item = (
1167 KP::Record,
1168 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1169 ),
1170 > + '_
1171 ),
1172 >,
1173 >
1174 where
1175 KP: DatabaseLookup + MaybeSend + MaybeSync,
1176 KP::Record: DatabaseKey,
1177 {
1178 let decoders = self.decoders().clone();
1179 Box::pin(
1180 self.raw_find_by_prefix(&key_prefix.to_bytes())
1181 .await
1182 .expect("Unrecoverable error occurred while listing entries from the database")
1183 .map(move |(key_bytes, value_bytes)| {
1184 let key = decode_key_expect(&key_bytes, &decoders);
1185 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1186 (key, value)
1187 }),
1188 )
1189 }
1190
1191 async fn find_by_prefix_sorted_descending<KP>(
1192 &mut self,
1193 key_prefix: &KP,
1194 ) -> Pin<
1195 Box<
1196 maybe_add_send!(
1197 dyn Stream<
1198 Item = (
1199 KP::Record,
1200 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1201 ),
1202 > + '_
1203 ),
1204 >,
1205 >
1206 where
1207 KP: DatabaseLookup + MaybeSend + MaybeSync,
1208 KP::Record: DatabaseKey,
1209 {
1210 let decoders = self.decoders().clone();
1211 Box::pin(
1212 self.raw_find_by_prefix_sorted_descending(&key_prefix.to_bytes())
1213 .await
1214 .expect("Unrecoverable error occurred while listing entries from the database")
1215 .map(move |(key_bytes, value_bytes)| {
1216 let key = decode_key_expect(&key_bytes, &decoders);
1217 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1218 (key, value)
1219 }),
1220 )
1221 }
1222 async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1223 where
1224 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1225 {
1226 let key_bytes = key.to_bytes();
1227 self.raw_remove_entry(&key_bytes)
1228 .await
1229 .expect("Unrecoverable error occurred while inserting removing entry from the database")
1230 .map(|value_bytes| {
1231 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1232 })
1233 }
1234 async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1235 where
1236 KP: DatabaseLookup + MaybeSend + MaybeSync,
1237 {
1238 self.raw_remove_by_prefix(&key_prefix.to_bytes())
1239 .await
1240 .expect("Unrecoverable error when removing entries from the database");
1241 }
1242}
1243
1244pub trait WithDecoders {
1247 fn decoders(&self) -> &ModuleDecoderRegistry;
1248}
1249
1250#[apply(async_trait_maybe_send!)]
1252pub trait IRawDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
1253 async fn commit_tx(self) -> Result<()>;
1254}
1255
1256#[apply(async_trait_maybe_send!)]
1260pub trait IDatabaseTransaction: MaybeSend + IDatabaseTransactionOps + fmt::Debug {
1261 async fn commit_tx(&mut self) -> Result<()>;
1263
1264 fn is_global(&self) -> bool;
1266
1267 #[doc(hidden)]
1272 fn global_dbtx(&mut self, access_token: GlobalDBTxAccessToken)
1273 -> &mut dyn IDatabaseTransaction;
1274}
1275
1276#[apply(async_trait_maybe_send!)]
1277impl<T> IDatabaseTransaction for Box<T>
1278where
1279 T: IDatabaseTransaction + ?Sized,
1280{
1281 async fn commit_tx(&mut self) -> Result<()> {
1282 (**self).commit_tx().await
1283 }
1284
1285 fn is_global(&self) -> bool {
1286 (**self).is_global()
1287 }
1288
1289 fn global_dbtx(
1290 &mut self,
1291 access_token: GlobalDBTxAccessToken,
1292 ) -> &mut dyn IDatabaseTransaction {
1293 (**self).global_dbtx(access_token)
1294 }
1295}
1296
1297#[apply(async_trait_maybe_send!)]
1298impl<'a, T> IDatabaseTransaction for &'a mut T
1299where
1300 T: IDatabaseTransaction + ?Sized,
1301{
1302 async fn commit_tx(&mut self) -> Result<()> {
1303 (**self).commit_tx().await
1304 }
1305
1306 fn is_global(&self) -> bool {
1307 (**self).is_global()
1308 }
1309
1310 fn global_dbtx(&mut self, access_key: GlobalDBTxAccessToken) -> &mut dyn IDatabaseTransaction {
1311 (**self).global_dbtx(access_key)
1312 }
1313}
1314
1315struct BaseDatabaseTransaction<Tx> {
1318 raw: Option<Tx>,
1320 notify_queue: Option<NotifyQueue>,
1321 notifications: Arc<Notifications>,
1322}
1323
1324impl<Tx> fmt::Debug for BaseDatabaseTransaction<Tx>
1325where
1326 Tx: fmt::Debug,
1327{
1328 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1329 f.write_fmt(format_args!(
1330 "BaseDatabaseTransaction{{ raw={:?} }}",
1331 self.raw
1332 ))
1333 }
1334}
1335impl<Tx> BaseDatabaseTransaction<Tx>
1336where
1337 Tx: IRawDatabaseTransaction,
1338{
1339 fn new(dbtx: Tx, notifications: Arc<Notifications>) -> Self {
1340 Self {
1341 raw: Some(dbtx),
1342 notifications,
1343 notify_queue: Some(NotifyQueue::new()),
1344 }
1345 }
1346
1347 fn add_notification_key(&mut self, key: &[u8]) -> Result<()> {
1348 self.notify_queue
1349 .as_mut()
1350 .context("can not call add_notification_key after commit")?
1351 .add(&key);
1352 Ok(())
1353 }
1354}
1355
1356#[apply(async_trait_maybe_send!)]
1357impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOpsCore for BaseDatabaseTransaction<Tx> {
1358 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1359 self.add_notification_key(key)?;
1360 self.raw
1361 .as_mut()
1362 .context("Cannot insert into already consumed transaction")?
1363 .raw_insert_bytes(key, value)
1364 .await
1365 }
1366
1367 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1368 self.raw
1369 .as_mut()
1370 .context("Cannot retrieve from already consumed transaction")?
1371 .raw_get_bytes(key)
1372 .await
1373 }
1374
1375 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1376 self.add_notification_key(key)?;
1377 self.raw
1378 .as_mut()
1379 .context("Cannot remove from already consumed transaction")?
1380 .raw_remove_entry(key)
1381 .await
1382 }
1383
1384 async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1385 self.raw
1386 .as_mut()
1387 .context("Cannot retrieve from already consumed transaction")?
1388 .raw_find_by_range(key_range)
1389 .await
1390 }
1391
1392 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1393 self.raw
1394 .as_mut()
1395 .context("Cannot retrieve from already consumed transaction")?
1396 .raw_find_by_prefix(key_prefix)
1397 .await
1398 }
1399
1400 async fn raw_find_by_prefix_sorted_descending(
1401 &mut self,
1402 key_prefix: &[u8],
1403 ) -> Result<PrefixStream<'_>> {
1404 self.raw
1405 .as_mut()
1406 .context("Cannot retrieve from already consumed transaction")?
1407 .raw_find_by_prefix_sorted_descending(key_prefix)
1408 .await
1409 }
1410
1411 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1412 self.raw
1413 .as_mut()
1414 .context("Cannot remove from already consumed transaction")?
1415 .raw_remove_by_prefix(key_prefix)
1416 .await
1417 }
1418}
1419
1420#[apply(async_trait_maybe_send!)]
1421impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOps for BaseDatabaseTransaction<Tx> {
1422 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1423 self.raw
1424 .as_mut()
1425 .context("Cannot rollback to a savepoint on an already consumed transaction")?
1426 .rollback_tx_to_savepoint()
1427 .await?;
1428 Ok(())
1429 }
1430
1431 async fn set_tx_savepoint(&mut self) -> Result<()> {
1432 self.raw
1433 .as_mut()
1434 .context("Cannot set a tx savepoint on an already consumed transaction")?
1435 .set_tx_savepoint()
1436 .await?;
1437 Ok(())
1438 }
1439}
1440
1441#[apply(async_trait_maybe_send!)]
1442impl<Tx: IRawDatabaseTransaction + fmt::Debug> IDatabaseTransaction
1443 for BaseDatabaseTransaction<Tx>
1444{
1445 async fn commit_tx(&mut self) -> Result<()> {
1446 self.raw
1447 .take()
1448 .context("Cannot commit an already committed transaction")?
1449 .commit_tx()
1450 .await?;
1451 self.notifications.submit_queue(
1452 &self
1453 .notify_queue
1454 .take()
1455 .expect("commit must be called only once"),
1456 );
1457 Ok(())
1458 }
1459
1460 fn is_global(&self) -> bool {
1461 true
1462 }
1463
1464 fn global_dbtx(
1465 &mut self,
1466 _access_token: GlobalDBTxAccessToken,
1467 ) -> &mut dyn IDatabaseTransaction {
1468 panic!("Illegal to call global_dbtx on BaseDatabaseTransaction");
1469 }
1470}
1471
1472#[derive(Clone)]
1475struct CommitTracker {
1476 is_committed: bool,
1478 has_writes: bool,
1480 ignore_uncommitted: bool,
1482}
1483
1484impl Drop for CommitTracker {
1485 fn drop(&mut self) {
1486 if self.has_writes && !self.is_committed {
1487 if self.ignore_uncommitted {
1488 trace!(
1489 target: LOG_DB,
1490 "DatabaseTransaction has writes and has not called commit, but that's expected."
1491 );
1492 } else {
1493 warn!(
1494 target: LOG_DB,
1495 location = ?backtrace::Backtrace::new(),
1496 "DatabaseTransaction has writes and has not called commit."
1497 );
1498 }
1499 }
1500 }
1501}
1502
1503enum MaybeRef<'a, T> {
1504 Owned(T),
1505 Borrowed(&'a mut T),
1506}
1507
1508impl<'a, T> ops::Deref for MaybeRef<'a, T> {
1509 type Target = T;
1510
1511 fn deref(&self) -> &Self::Target {
1512 match self {
1513 MaybeRef::Owned(o) => o,
1514 MaybeRef::Borrowed(r) => r,
1515 }
1516 }
1517}
1518
1519impl<'a, T> ops::DerefMut for MaybeRef<'a, T> {
1520 fn deref_mut(&mut self) -> &mut Self::Target {
1521 match self {
1522 MaybeRef::Owned(o) => o,
1523 MaybeRef::Borrowed(r) => r,
1524 }
1525 }
1526}
1527
1528pub struct Committable;
1532
1533pub struct NonCommittable;
1537
1538pub struct DatabaseTransaction<'tx, Cap = NonCommittable> {
1542 tx: Box<dyn IDatabaseTransaction + 'tx>,
1543 decoders: ModuleDecoderRegistry,
1544 commit_tracker: MaybeRef<'tx, CommitTracker>,
1545 on_commit_hooks: MaybeRef<'tx, Vec<Box<maybe_add_send!(dyn FnOnce())>>>,
1546 capability: marker::PhantomData<Cap>,
1547}
1548
1549impl<'tx, Cap> fmt::Debug for DatabaseTransaction<'tx, Cap> {
1550 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1551 f.write_fmt(format_args!(
1552 "DatabaseTransaction {{ tx: {:?}, decoders={:?} }}",
1553 self.tx, self.decoders
1554 ))
1555 }
1556}
1557
1558impl<'tx, Cap> WithDecoders for DatabaseTransaction<'tx, Cap> {
1559 fn decoders(&self) -> &ModuleDecoderRegistry {
1560 &self.decoders
1561 }
1562}
1563
1564#[instrument(target = LOG_DB, level = "trace", skip_all, fields(value_type = std::any::type_name::<V>()), err)]
1565fn decode_value<V: DatabaseValue>(
1566 value_bytes: &[u8],
1567 decoders: &ModuleDecoderRegistry,
1568) -> Result<V, DecodingError> {
1569 trace!(
1570 bytes = %AbbreviateHexBytes(value_bytes),
1571 "decoding value",
1572 );
1573 V::from_bytes(value_bytes, decoders)
1574}
1575
1576fn decode_value_expect<V: DatabaseValue>(
1577 value_bytes: &[u8],
1578 decoders: &ModuleDecoderRegistry,
1579 key_bytes: &[u8],
1580) -> V {
1581 decode_value(value_bytes, decoders).unwrap_or_else(|err| {
1582 panic!(
1583 "Unrecoverable decoding DatabaseValue as {}; err={}, key_bytes={}, val_bytes={}",
1584 any::type_name::<V>(),
1585 err,
1586 AbbreviateHexBytes(key_bytes),
1587 AbbreviateHexBytes(value_bytes),
1588 )
1589 })
1590}
1591
1592fn decode_key_expect<K: DatabaseKey>(key_bytes: &[u8], decoders: &ModuleDecoderRegistry) -> K {
1593 trace!(
1594 bytes = %AbbreviateHexBytes(key_bytes),
1595 "decoding key",
1596 );
1597 K::from_bytes(key_bytes, decoders).unwrap_or_else(|err| {
1598 panic!(
1599 "Unrecoverable decoding DatabaseKey as {}; err={}; bytes={}",
1600 any::type_name::<K>(),
1601 err,
1602 AbbreviateHexBytes(key_bytes)
1603 )
1604 })
1605}
1606
1607impl<'tx, Cap> DatabaseTransaction<'tx, Cap> {
1608 pub fn into_nc(self) -> DatabaseTransaction<'tx, NonCommittable> {
1610 DatabaseTransaction {
1611 tx: self.tx,
1612 decoders: self.decoders,
1613 commit_tracker: self.commit_tracker,
1614 on_commit_hooks: self.on_commit_hooks,
1615 capability: PhantomData::<NonCommittable>,
1616 }
1617 }
1618
1619 pub fn to_ref_nc<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, NonCommittable>
1621 where
1622 's: 'a,
1623 {
1624 self.to_ref().into_nc()
1625 }
1626
1627 pub fn with_prefix<'a: 'tx>(self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1629 where
1630 'tx: 'a,
1631 {
1632 DatabaseTransaction {
1633 tx: Box::new(PrefixDatabaseTransaction {
1634 inner: self.tx,
1635 global_dbtx_access_token: None,
1636 prefix,
1637 }),
1638 decoders: self.decoders,
1639 commit_tracker: self.commit_tracker,
1640 on_commit_hooks: self.on_commit_hooks,
1641 capability: self.capability,
1642 }
1643 }
1644
1645 pub fn with_prefix_module_id<'a: 'tx>(
1649 self,
1650 module_instance_id: ModuleInstanceId,
1651 ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1652 where
1653 'tx: 'a,
1654 {
1655 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1656 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1657 (
1658 DatabaseTransaction {
1659 tx: Box::new(PrefixDatabaseTransaction {
1660 inner: self.tx,
1661 global_dbtx_access_token: Some(global_dbtx_access_token),
1662 prefix,
1663 }),
1664 decoders: self.decoders,
1665 commit_tracker: self.commit_tracker,
1666 on_commit_hooks: self.on_commit_hooks,
1667 capability: self.capability,
1668 },
1669 global_dbtx_access_token,
1670 )
1671 }
1672
1673 pub fn to_ref<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, Cap>
1675 where
1676 's: 'a,
1677 {
1678 let decoders = self.decoders.clone();
1679
1680 DatabaseTransaction {
1681 tx: Box::new(&mut self.tx),
1682 decoders,
1683 commit_tracker: match self.commit_tracker {
1684 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1685 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1686 },
1687 on_commit_hooks: match self.on_commit_hooks {
1688 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1689 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1690 },
1691 capability: self.capability,
1692 }
1693 }
1694
1695 pub fn to_ref_with_prefix<'a>(&'a mut self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1697 where
1698 'tx: 'a,
1699 {
1700 DatabaseTransaction {
1701 tx: Box::new(PrefixDatabaseTransaction {
1702 inner: &mut self.tx,
1703 global_dbtx_access_token: None,
1704 prefix,
1705 }),
1706 decoders: self.decoders.clone(),
1707 commit_tracker: match self.commit_tracker {
1708 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1709 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1710 },
1711 on_commit_hooks: match self.on_commit_hooks {
1712 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1713 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1714 },
1715 capability: self.capability,
1716 }
1717 }
1718
1719 pub fn to_ref_with_prefix_module_id<'a>(
1720 &'a mut self,
1721 module_instance_id: ModuleInstanceId,
1722 ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1723 where
1724 'tx: 'a,
1725 {
1726 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1727 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1728 (
1729 DatabaseTransaction {
1730 tx: Box::new(PrefixDatabaseTransaction {
1731 inner: &mut self.tx,
1732 global_dbtx_access_token: Some(global_dbtx_access_token),
1733 prefix,
1734 }),
1735 decoders: self.decoders.clone(),
1736 commit_tracker: match self.commit_tracker {
1737 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1738 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1739 },
1740 on_commit_hooks: match self.on_commit_hooks {
1741 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1742 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1743 },
1744 capability: self.capability,
1745 },
1746 global_dbtx_access_token,
1747 )
1748 }
1749
1750 pub fn is_global(&self) -> bool {
1752 self.tx.is_global()
1753 }
1754
1755 pub fn ensure_global(&self) -> Result<()> {
1757 if !self.is_global() {
1758 bail!("Database instance not global");
1759 }
1760
1761 Ok(())
1762 }
1763
1764 pub fn ensure_isolated(&self) -> Result<()> {
1766 if self.is_global() {
1767 bail!("Database instance not isolated");
1768 }
1769
1770 Ok(())
1771 }
1772
1773 pub fn ignore_uncommitted(&mut self) -> &mut Self {
1775 self.commit_tracker.ignore_uncommitted = true;
1776 self
1777 }
1778
1779 pub fn warn_uncommitted(&mut self) -> &mut Self {
1781 self.commit_tracker.ignore_uncommitted = false;
1782 self
1783 }
1784
1785 #[instrument(target = LOG_DB, level = "trace", skip_all)]
1787 pub fn on_commit(&mut self, f: maybe_add_send!(impl FnOnce() + 'static)) {
1788 self.on_commit_hooks.push(Box::new(f));
1789 }
1790
1791 pub fn global_dbtx<'a>(
1792 &'a mut self,
1793 access_token: GlobalDBTxAccessToken,
1794 ) -> DatabaseTransaction<'a, Cap>
1795 where
1796 'tx: 'a,
1797 {
1798 let decoders = self.decoders.clone();
1799
1800 DatabaseTransaction {
1801 tx: Box::new(self.tx.global_dbtx(access_token)),
1802 decoders,
1803 commit_tracker: match self.commit_tracker {
1804 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1805 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1806 },
1807 on_commit_hooks: match self.on_commit_hooks {
1808 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1809 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1810 },
1811 capability: self.capability,
1812 }
1813 }
1814}
1815
1816#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1818pub struct GlobalDBTxAccessToken(u32);
1819
1820impl GlobalDBTxAccessToken {
1821 fn from_prefix(prefix: &[u8]) -> Self {
1832 Self(prefix.iter().fold(0, |acc, b| acc + u32::from(*b)) + 513)
1833 }
1834}
1835
1836impl<'tx> DatabaseTransaction<'tx, Committable> {
1837 pub fn new(dbtx: Box<dyn IDatabaseTransaction + 'tx>, decoders: ModuleDecoderRegistry) -> Self {
1838 Self {
1839 tx: dbtx,
1840 decoders,
1841 commit_tracker: MaybeRef::Owned(CommitTracker {
1842 is_committed: false,
1843 has_writes: false,
1844 ignore_uncommitted: false,
1845 }),
1846 on_commit_hooks: MaybeRef::Owned(vec![]),
1847 capability: PhantomData,
1848 }
1849 }
1850
1851 pub async fn commit_tx_result(mut self) -> Result<()> {
1852 self.commit_tracker.is_committed = true;
1853 let commit_result = self.tx.commit_tx().await;
1854
1855 if commit_result.is_ok() {
1857 for hook in self.on_commit_hooks.deref_mut().drain(..) {
1858 hook();
1859 }
1860 }
1861
1862 commit_result
1863 }
1864
1865 pub async fn commit_tx(mut self) {
1866 self.commit_tracker.is_committed = true;
1867 self.commit_tx_result()
1868 .await
1869 .expect("Unrecoverable error occurred while committing to the database.");
1870 }
1871}
1872
1873#[apply(async_trait_maybe_send!)]
1874impl<'a, Cap> IDatabaseTransactionOpsCore for DatabaseTransaction<'a, Cap>
1875where
1876 Cap: Send,
1877{
1878 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1879 self.commit_tracker.has_writes = true;
1880 self.tx.raw_insert_bytes(key, value).await
1881 }
1882
1883 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1884 self.tx.raw_get_bytes(key).await
1885 }
1886
1887 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1888 self.tx.raw_remove_entry(key).await
1889 }
1890
1891 async fn raw_find_by_range(&mut self, key_range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
1892 self.tx.raw_find_by_range(key_range).await
1893 }
1894
1895 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1896 self.tx.raw_find_by_prefix(key_prefix).await
1897 }
1898
1899 async fn raw_find_by_prefix_sorted_descending(
1900 &mut self,
1901 key_prefix: &[u8],
1902 ) -> Result<PrefixStream<'_>> {
1903 self.tx
1904 .raw_find_by_prefix_sorted_descending(key_prefix)
1905 .await
1906 }
1907
1908 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1909 self.commit_tracker.has_writes = true;
1910 self.tx.raw_remove_by_prefix(key_prefix).await
1911 }
1912}
1913#[apply(async_trait_maybe_send!)]
1914impl<'a> IDatabaseTransactionOps for DatabaseTransaction<'a, Committable> {
1915 async fn set_tx_savepoint(&mut self) -> Result<()> {
1916 self.tx.set_tx_savepoint().await
1917 }
1918
1919 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1920 self.tx.rollback_tx_to_savepoint().await
1921 }
1922}
1923
1924impl<T> DatabaseKeyPrefix for T
1925where
1926 T: DatabaseLookup + crate::encoding::Encodable + Debug,
1927{
1928 fn to_bytes(&self) -> Vec<u8> {
1929 let mut data = vec![<Self as DatabaseLookup>::Record::DB_PREFIX];
1930 data.append(&mut self.consensus_encode_to_vec());
1931 data
1932 }
1933}
1934
1935impl<T> DatabaseKey for T
1936where
1937 T: DatabaseRecord + crate::encoding::Decodable + Sized,
1940{
1941 const NOTIFY_ON_MODIFY: bool = <T as DatabaseRecord>::NOTIFY_ON_MODIFY;
1942 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1943 if data.is_empty() {
1944 return Err(DecodingError::wrong_length(1, 0));
1946 }
1947
1948 if data[0] != Self::DB_PREFIX {
1949 return Err(DecodingError::wrong_prefix(Self::DB_PREFIX, data[0]));
1950 }
1951
1952 <Self as crate::encoding::Decodable>::consensus_decode_whole(&data[1..], modules)
1953 .map_err(|decode_error| DecodingError::Other(decode_error.0))
1954 }
1955}
1956
1957impl<T> DatabaseValue for T
1958where
1959 T: Debug + Encodable + Decodable,
1960{
1961 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1962 T::consensus_decode_whole(data, modules).map_err(|e| DecodingError::Other(e.0))
1963 }
1964
1965 fn to_bytes(&self) -> Vec<u8> {
1966 self.consensus_encode_to_vec()
1967 }
1968}
1969
1970#[macro_export]
2031macro_rules! impl_db_record {
2032 (key = $key:ty, value = $val:ty, db_prefix = $db_prefix:expr_2021 $(, notify_on_modify = $notify:tt)? $(,)?) => {
2033 impl $crate::db::DatabaseRecord for $key {
2034 const DB_PREFIX: u8 = $db_prefix as u8;
2035 $(const NOTIFY_ON_MODIFY: bool = $notify;)?
2036 type Key = Self;
2037 type Value = $val;
2038 }
2039 $(
2040 impl_db_record! {
2041 @impl_notify_marker key = $key, notify_on_modify = $notify
2042 }
2043 )?
2044 };
2045 (@impl_notify_marker key = $key:ty, notify_on_modify = true) => {
2047 impl $crate::db::DatabaseKeyWithNotify for $key {}
2048 };
2049 (@impl_notify_marker key = $key:ty, notify_on_modify = false) => {};
2051}
2052
2053#[macro_export]
2054macro_rules! impl_db_lookup{
2055 (key = $key:ty $(, query_prefix = $query_prefix:ty)* $(,)?) => {
2056 $(
2057 impl $crate::db::DatabaseLookup for $query_prefix {
2058 type Record = $key;
2059 }
2060 )*
2061 };
2062}
2063
2064#[derive(Debug, Encodable, Decodable, Serialize)]
2066pub struct DatabaseVersionKeyV0;
2067
2068#[derive(Debug, Encodable, Decodable, Serialize)]
2069pub struct DatabaseVersionKey(pub ModuleInstanceId);
2070
2071#[derive(Debug, Encodable, Decodable, Serialize, Clone, PartialOrd, Ord, PartialEq, Eq, Copy)]
2072pub struct DatabaseVersion(pub u64);
2073
2074impl_db_record!(
2075 key = DatabaseVersionKeyV0,
2076 value = DatabaseVersion,
2077 db_prefix = DbKeyPrefix::DatabaseVersion
2078);
2079
2080impl_db_record!(
2081 key = DatabaseVersionKey,
2082 value = DatabaseVersion,
2083 db_prefix = DbKeyPrefix::DatabaseVersion
2084);
2085
2086impl std::fmt::Display for DatabaseVersion {
2087 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2088 write!(f, "{}", self.0)
2089 }
2090}
2091
2092impl DatabaseVersion {
2093 pub fn increment(&self) -> Self {
2094 Self(self.0 + 1)
2095 }
2096}
2097
2098impl std::fmt::Display for DbKeyPrefix {
2099 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2100 write!(f, "{self:?}")
2101 }
2102}
2103
2104#[repr(u8)]
2105#[derive(Clone, EnumIter, Debug)]
2106pub enum DbKeyPrefix {
2107 DatabaseVersion = 0x50,
2108 ClientBackup = 0x51,
2109}
2110
2111#[derive(Debug, Error)]
2112pub enum DecodingError {
2113 #[error("Key had a wrong prefix, expected {expected} but got {found}")]
2114 WrongPrefix { expected: u8, found: u8 },
2115 #[error("Key had a wrong length, expected {expected} but got {found}")]
2116 WrongLength { expected: usize, found: usize },
2117 #[error("Other decoding error: {0:#}")]
2118 Other(anyhow::Error),
2119}
2120
2121impl DecodingError {
2122 pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2123 Self::Other(anyhow::Error::from(error))
2124 }
2125
2126 pub fn wrong_prefix(expected: u8, found: u8) -> Self {
2127 Self::WrongPrefix { expected, found }
2128 }
2129
2130 pub fn wrong_length(expected: usize, found: usize) -> Self {
2131 Self::WrongLength { expected, found }
2132 }
2133}
2134
2135#[macro_export]
2136macro_rules! push_db_pair_items {
2137 ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
2138 let db_items =
2139 $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
2140 .await
2141 .map(|(key, val)| {
2142 (
2143 $crate::encoding::Encodable::consensus_encode_to_hex(&key),
2144 val,
2145 )
2146 })
2147 .collect::<BTreeMap<String, $value_type>>()
2148 .await;
2149
2150 $map.insert($key_literal.to_string(), Box::new(db_items));
2151 };
2152}
2153
2154#[macro_export]
2155macro_rules! push_db_key_items {
2156 ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $map:ident, $key_literal:literal) => {
2157 let db_items =
2158 $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
2159 .await
2160 .map(|(key, _)| key)
2161 .collect::<Vec<$key_type>>()
2162 .await;
2163
2164 $map.insert($key_literal.to_string(), Box::new(db_items));
2165 };
2166}
2167
2168pub struct DbMigrationFnContext<'tx, C> {
2182 dbtx: DatabaseTransaction<'tx>,
2183 module_instance_id: Option<ModuleInstanceId>,
2184 ctx: C,
2185 __please_use_constructor: (),
2186}
2187
2188impl<'tx, C> DbMigrationFnContext<'tx, C> {
2189 pub fn new(
2190 dbtx: DatabaseTransaction<'tx>,
2191 module_instance_id: Option<ModuleInstanceId>,
2192 ctx: C,
2193 ) -> Self {
2194 dbtx.ensure_global().expect("Must pass global dbtx");
2195 Self {
2196 dbtx,
2197 module_instance_id,
2198 ctx,
2199 __please_use_constructor: (),
2201 }
2202 }
2203
2204 pub fn map<R>(self, f: impl FnOnce(C) -> R) -> DbMigrationFnContext<'tx, R> {
2205 DbMigrationFnContext::new(self.dbtx, self.module_instance_id, f(self.ctx))
2206 }
2207
2208 #[doc(hidden)]
2210 pub fn split_dbtx_ctx<'s>(&'s mut self) -> (&'s mut DatabaseTransaction<'tx>, &'s C) {
2211 let Self { dbtx, ctx, .. } = self;
2212
2213 (dbtx, ctx)
2214 }
2215
2216 pub fn dbtx(&mut self) -> DatabaseTransaction {
2217 if let Some(module_instance_id) = self.module_instance_id {
2218 self.dbtx.to_ref_with_prefix_module_id(module_instance_id).0
2219 } else {
2220 self.dbtx.to_ref_nc()
2221 }
2222 }
2223
2224 #[doc(hidden)]
2226 pub fn module_instance_id(&self) -> Option<ModuleInstanceId> {
2227 self.module_instance_id
2228 }
2229}
2230
2231pub type GeneralDbMigrationFn = DbMigrationFn<()>;
2233pub type GeneralDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, ()>;
2234
2235pub type ClientCoreDbMigrationFn = DbMigrationFn<()>;
2240pub type ClientCoreDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, ()>;
2241
2242pub type DbMigrationFn<C> = Box<
2253 maybe_add_send_sync!(
2254 dyn for<'tx> Fn(
2255 DbMigrationFnContext<'tx, C>,
2256 ) -> Pin<
2257 Box<maybe_add_send!(dyn futures::Future<Output = anyhow::Result<()>> + 'tx)>,
2258 >
2259 ),
2260>;
2261
2262pub fn get_current_database_version<F>(
2266 migrations: &BTreeMap<DatabaseVersion, F>,
2267) -> DatabaseVersion {
2268 let versions = migrations.keys().copied().collect::<Vec<_>>();
2269
2270 if !versions
2273 .windows(2)
2274 .all(|window| window[0].increment() == window[1])
2275 {
2276 panic!("Database Migrations are not defined contiguously");
2277 }
2278
2279 versions
2280 .last()
2281 .map_or(DatabaseVersion(0), DatabaseVersion::increment)
2282}
2283
2284pub async fn apply_migrations<C>(
2285 db: &Database,
2286 ctx: C,
2287 kind: String,
2288 migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
2289 module_instance_id: Option<ModuleInstanceId>,
2290 external_prefixes_above: Option<u8>,
2293) -> Result<(), anyhow::Error>
2294where
2295 C: Clone,
2296{
2297 let mut dbtx = db.begin_transaction().await;
2298 apply_migrations_dbtx(
2299 &mut dbtx.to_ref_nc(),
2300 ctx,
2301 kind,
2302 migrations,
2303 module_instance_id,
2304 external_prefixes_above,
2305 )
2306 .await?;
2307
2308 dbtx.commit_tx_result().await
2309}
2310pub async fn apply_migrations_dbtx<C>(
2322 global_dbtx: &mut DatabaseTransaction<'_>,
2323 ctx: C,
2324 kind: String,
2325 migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
2326 module_instance_id: Option<ModuleInstanceId>,
2327 external_prefixes_above: Option<u8>,
2330) -> Result<(), anyhow::Error>
2331where
2332 C: Clone,
2333{
2334 let is_new_db = global_dbtx
2337 .raw_find_by_prefix(&[])
2338 .await?
2339 .filter(|(key, _v)| {
2340 std::future::ready(
2341 external_prefixes_above.is_none_or(|external_prefixes_above| {
2342 !key.is_empty() && key[0] < external_prefixes_above
2343 }),
2344 )
2345 })
2346 .next()
2347 .await
2348 .is_none();
2349
2350 let target_db_version = get_current_database_version(&migrations);
2351
2352 create_database_version_dbtx(
2354 global_dbtx,
2355 target_db_version,
2356 module_instance_id,
2357 kind.clone(),
2358 is_new_db,
2359 )
2360 .await?;
2361
2362 let module_instance_id_key = module_instance_id_or_global(module_instance_id);
2363
2364 let disk_version = global_dbtx
2365 .get_value(&DatabaseVersionKey(module_instance_id_key))
2366 .await;
2367
2368 let db_version = if let Some(disk_version) = disk_version {
2369 let mut current_db_version = disk_version;
2370
2371 if current_db_version > target_db_version {
2372 return Err(anyhow::anyhow!(format!(
2373 "On disk database version {current_db_version} for module {kind} was higher than the code database version {target_db_version}."
2374 )));
2375 }
2376
2377 while current_db_version < target_db_version {
2378 if let Some(migration) = migrations.get(¤t_db_version) {
2379 info!(target: LOG_DB, ?kind, ?current_db_version, ?target_db_version, "Migrating module...");
2380 migration(DbMigrationFnContext::new(
2381 global_dbtx.to_ref_nc(),
2382 module_instance_id,
2383 ctx.clone(),
2384 ))
2385 .await?;
2386 } else {
2387 warn!(target: LOG_DB, ?current_db_version, "Missing server db migration");
2388 }
2389
2390 current_db_version = current_db_version.increment();
2391
2392 global_dbtx
2393 .insert_entry(
2394 &DatabaseVersionKey(module_instance_id_key),
2395 ¤t_db_version,
2396 )
2397 .await;
2398 }
2399
2400 current_db_version
2401 } else {
2402 target_db_version
2403 };
2404
2405 debug!(target: LOG_DB, ?kind, ?db_version, "DB Version");
2406 Ok(())
2407}
2408
2409pub async fn create_database_version(
2410 db: &Database,
2411 target_db_version: DatabaseVersion,
2412 module_instance_id: Option<ModuleInstanceId>,
2413 kind: String,
2414 is_new_db: bool,
2415) -> Result<(), anyhow::Error> {
2416 let mut dbtx = db.begin_transaction().await;
2417
2418 create_database_version_dbtx(
2419 &mut dbtx.to_ref_nc(),
2420 target_db_version,
2421 module_instance_id,
2422 kind,
2423 is_new_db,
2424 )
2425 .await?;
2426
2427 dbtx.commit_tx_result().await?;
2428 Ok(())
2429}
2430
2431pub async fn create_database_version_dbtx(
2435 global_dbtx: &mut DatabaseTransaction<'_>,
2436 target_db_version: DatabaseVersion,
2437 module_instance_id: Option<ModuleInstanceId>,
2438 kind: String,
2439 is_new_db: bool,
2440) -> Result<(), anyhow::Error> {
2441 let key_module_instance_id = module_instance_id_or_global(module_instance_id);
2442
2443 if global_dbtx
2447 .get_value(&DatabaseVersionKey(key_module_instance_id))
2448 .await
2449 .is_none()
2450 {
2451 let current_version_in_module = if let Some(module_instance_id) = module_instance_id {
2460 remove_current_db_version_if_exists(
2461 &mut global_dbtx
2462 .to_ref_with_prefix_module_id(module_instance_id)
2463 .0
2464 .into_nc(),
2465 is_new_db,
2466 target_db_version,
2467 )
2468 .await
2469 } else {
2470 remove_current_db_version_if_exists(
2471 &mut global_dbtx.to_ref().into_nc(),
2472 is_new_db,
2473 target_db_version,
2474 )
2475 .await
2476 };
2477
2478 debug!(target: LOG_DB, ?kind, ?current_version_in_module, ?target_db_version, ?is_new_db, "Creating DatabaseVersionKey...");
2480 global_dbtx
2481 .insert_new_entry(
2482 &DatabaseVersionKey(key_module_instance_id),
2483 ¤t_version_in_module,
2484 )
2485 .await;
2486 }
2487
2488 Ok(())
2489}
2490
2491async fn remove_current_db_version_if_exists(
2496 version_dbtx: &mut DatabaseTransaction<'_>,
2497 is_new_db: bool,
2498 target_db_version: DatabaseVersion,
2499) -> DatabaseVersion {
2500 let current_version_in_module = version_dbtx.remove_entry(&DatabaseVersionKeyV0).await;
2504 match current_version_in_module {
2505 Some(database_version) => database_version,
2506 None if is_new_db => target_db_version,
2507 None => DatabaseVersion(0),
2508 }
2509}
2510
2511fn module_instance_id_or_global(module_instance_id: Option<ModuleInstanceId>) -> ModuleInstanceId {
2514 module_instance_id.map_or_else(
2516 || MODULE_GLOBAL_PREFIX.into(),
2517 |module_instance_id| module_instance_id,
2518 )
2519}
2520#[allow(unused_imports)]
2521mod test_utils {
2522 use std::collections::BTreeMap;
2523 use std::time::Duration;
2524
2525 use fedimint_core::db::DbMigrationFnContext;
2526 use futures::future::ready;
2527 use futures::{Future, FutureExt, StreamExt};
2528 use rand::Rng;
2529 use tokio::join;
2530
2531 use super::{
2532 Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey, DatabaseVersionKeyV0,
2533 DbMigrationFn, apply_migrations,
2534 };
2535 use crate::core::ModuleKind;
2536 use crate::db::mem_impl::MemDatabase;
2537 use crate::db::{
2538 IDatabaseTransactionOps, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
2539 };
2540 use crate::encoding::{Decodable, Encodable};
2541 use crate::module::registry::ModuleDecoderRegistry;
2542
2543 pub async fn future_returns_shortly<F: Future>(fut: F) -> Option<F::Output> {
2544 crate::runtime::timeout(Duration::from_millis(10), fut)
2545 .await
2546 .ok()
2547 }
2548
2549 #[repr(u8)]
2550 #[derive(Clone)]
2551 pub enum TestDbKeyPrefix {
2552 Test = 0x42,
2553 AltTest = 0x43,
2554 PercentTestKey = 0x25,
2555 }
2556
2557 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
2558 pub(super) struct TestKey(pub u64);
2559
2560 #[derive(Debug, Encodable, Decodable)]
2561 struct DbPrefixTestPrefix;
2562
2563 impl_db_record!(
2564 key = TestKey,
2565 value = TestVal,
2566 db_prefix = TestDbKeyPrefix::Test,
2567 notify_on_modify = true,
2568 );
2569 impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);
2570
2571 #[derive(Debug, Encodable, Decodable)]
2572 struct TestKeyV0(u64, u64);
2573
2574 #[derive(Debug, Encodable, Decodable)]
2575 struct DbPrefixTestPrefixV0;
2576
2577 impl_db_record!(
2578 key = TestKeyV0,
2579 value = TestVal,
2580 db_prefix = TestDbKeyPrefix::Test,
2581 );
2582 impl_db_lookup!(key = TestKeyV0, query_prefix = DbPrefixTestPrefixV0);
2583
2584 #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Encodable, Decodable)]
2585 struct AltTestKey(u64);
2586
2587 #[derive(Debug, Encodable, Decodable)]
2588 struct AltDbPrefixTestPrefix;
2589
2590 impl_db_record!(
2591 key = AltTestKey,
2592 value = TestVal,
2593 db_prefix = TestDbKeyPrefix::AltTest,
2594 );
2595 impl_db_lookup!(key = AltTestKey, query_prefix = AltDbPrefixTestPrefix);
2596
2597 #[derive(Debug, Encodable, Decodable)]
2598 struct PercentTestKey(u64);
2599
2600 #[derive(Debug, Encodable, Decodable)]
2601 struct PercentPrefixTestPrefix;
2602
2603 impl_db_record!(
2604 key = PercentTestKey,
2605 value = TestVal,
2606 db_prefix = TestDbKeyPrefix::PercentTestKey,
2607 );
2608
2609 impl_db_lookup!(key = PercentTestKey, query_prefix = PercentPrefixTestPrefix);
2610 #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
2611 pub(super) struct TestVal(pub u64);
2612
2613 const TEST_MODULE_PREFIX: u16 = 1;
2614 const ALT_MODULE_PREFIX: u16 = 2;
2615
2616 pub async fn verify_insert_elements(db: Database) {
2617 let mut dbtx = db.begin_transaction().await;
2618 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2619 assert!(dbtx.insert_entry(&TestKey(2), &TestVal(3)).await.is_none());
2620 dbtx.commit_tx().await;
2621
2622 let mut dbtx = db.begin_transaction().await;
2624 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2625 assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(3)));
2626 dbtx.commit_tx().await;
2627
2628 let mut dbtx = db.begin_transaction().await;
2630 assert_eq!(
2631 dbtx.insert_entry(&TestKey(1), &TestVal(4)).await,
2632 Some(TestVal(2))
2633 );
2634 assert_eq!(
2635 dbtx.insert_entry(&TestKey(2), &TestVal(5)).await,
2636 Some(TestVal(3))
2637 );
2638 dbtx.commit_tx().await;
2639
2640 let mut dbtx = db.begin_transaction().await;
2641 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(4)));
2642 assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(5)));
2643 dbtx.commit_tx().await;
2644 }
2645
2646 pub async fn verify_remove_nonexisting(db: Database) {
2647 let mut dbtx = db.begin_transaction().await;
2648 assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2649 let removed = dbtx.remove_entry(&TestKey(1)).await;
2650 assert!(removed.is_none());
2651
2652 dbtx.commit_tx().await;
2654 }
2655
2656 pub async fn verify_remove_existing(db: Database) {
2657 let mut dbtx = db.begin_transaction().await;
2658
2659 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2660
2661 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2662
2663 let removed = dbtx.remove_entry(&TestKey(1)).await;
2664 assert_eq!(removed, Some(TestVal(2)));
2665 assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2666
2667 dbtx.commit_tx().await;
2669 }
2670
2671 pub async fn verify_read_own_writes(db: Database) {
2672 let mut dbtx = db.begin_transaction().await;
2673
2674 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2675
2676 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2677
2678 dbtx.commit_tx().await;
2680 }
2681
2682 pub async fn verify_prevent_dirty_reads(db: Database) {
2683 let mut dbtx = db.begin_transaction().await;
2684
2685 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2686
2687 let mut dbtx2 = db.begin_transaction().await;
2689 assert_eq!(dbtx2.get_value(&TestKey(1)).await, None);
2690
2691 dbtx.commit_tx().await;
2693 }
2694
2695 pub async fn verify_find_by_range(db: Database) {
2696 let mut dbtx = db.begin_transaction().await;
2697 dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2698 dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2699 dbtx.insert_entry(&TestKey(56), &TestVal(7777)).await;
2700
2701 dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2702 dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2703
2704 {
2705 let mut module_dbtx = dbtx.to_ref_with_prefix_module_id(2).0;
2706 module_dbtx
2707 .insert_entry(&TestKey(300), &TestVal(3000))
2708 .await;
2709 }
2710
2711 dbtx.commit_tx().await;
2712
2713 let mut dbtx = db.begin_transaction_nc().await;
2715
2716 let returned_keys = dbtx
2717 .find_by_range(TestKey(55)..TestKey(56))
2718 .await
2719 .collect::<Vec<_>>()
2720 .await;
2721
2722 let expected = vec![(TestKey(55), TestVal(9999))];
2723
2724 assert_eq!(returned_keys, expected);
2725
2726 let returned_keys = dbtx
2727 .find_by_range(TestKey(54)..TestKey(56))
2728 .await
2729 .collect::<Vec<_>>()
2730 .await;
2731
2732 let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2733 assert_eq!(returned_keys, expected);
2734
2735 let returned_keys = dbtx
2736 .find_by_range(TestKey(54)..TestKey(57))
2737 .await
2738 .collect::<Vec<_>>()
2739 .await;
2740
2741 let expected = vec![
2742 (TestKey(54), TestVal(8888)),
2743 (TestKey(55), TestVal(9999)),
2744 (TestKey(56), TestVal(7777)),
2745 ];
2746 assert_eq!(returned_keys, expected);
2747
2748 let mut module_dbtx = dbtx.with_prefix_module_id(2).0;
2749 let test_range = module_dbtx
2750 .find_by_range(TestKey(300)..TestKey(301))
2751 .await
2752 .collect::<Vec<_>>()
2753 .await;
2754 assert!(test_range.len() == 1);
2755 }
2756
2757 pub async fn verify_find_by_prefix(db: Database) {
2758 let mut dbtx = db.begin_transaction().await;
2759 dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2760 dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2761
2762 dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2763 dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2764 dbtx.commit_tx().await;
2765
2766 let mut dbtx = db.begin_transaction().await;
2768
2769 let returned_keys = dbtx
2770 .find_by_prefix(&DbPrefixTestPrefix)
2771 .await
2772 .collect::<Vec<_>>()
2773 .await;
2774
2775 let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2776 assert_eq!(returned_keys, expected);
2777
2778 let reversed = dbtx
2779 .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2780 .await
2781 .collect::<Vec<_>>()
2782 .await;
2783 let mut reversed_expected = expected;
2784 reversed_expected.reverse();
2785 assert_eq!(reversed, reversed_expected);
2786
2787 let returned_keys = dbtx
2788 .find_by_prefix(&AltDbPrefixTestPrefix)
2789 .await
2790 .collect::<Vec<_>>()
2791 .await;
2792
2793 let expected = vec![
2794 (AltTestKey(54), TestVal(6666)),
2795 (AltTestKey(55), TestVal(7777)),
2796 ];
2797 assert_eq!(returned_keys, expected);
2798
2799 let reversed = dbtx
2800 .find_by_prefix_sorted_descending(&AltDbPrefixTestPrefix)
2801 .await
2802 .collect::<Vec<_>>()
2803 .await;
2804 let mut reversed_expected = expected;
2805 reversed_expected.reverse();
2806 assert_eq!(reversed, reversed_expected);
2807 }
2808
2809 pub async fn verify_commit(db: Database) {
2810 let mut dbtx = db.begin_transaction().await;
2811
2812 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2813 dbtx.commit_tx().await;
2814
2815 let mut dbtx2 = db.begin_transaction().await;
2817 assert_eq!(dbtx2.get_value(&TestKey(1)).await, Some(TestVal(2)));
2818 }
2819
2820 pub async fn verify_rollback_to_savepoint(db: Database) {
2821 let mut dbtx_rollback = db.begin_transaction().await;
2822
2823 dbtx_rollback
2824 .insert_entry(&TestKey(20), &TestVal(2000))
2825 .await;
2826
2827 dbtx_rollback
2828 .set_tx_savepoint()
2829 .await
2830 .expect("Error setting transaction savepoint");
2831
2832 dbtx_rollback
2833 .insert_entry(&TestKey(21), &TestVal(2001))
2834 .await;
2835
2836 assert_eq!(
2837 dbtx_rollback.get_value(&TestKey(20)).await,
2838 Some(TestVal(2000))
2839 );
2840 assert_eq!(
2841 dbtx_rollback.get_value(&TestKey(21)).await,
2842 Some(TestVal(2001))
2843 );
2844
2845 dbtx_rollback
2846 .rollback_tx_to_savepoint()
2847 .await
2848 .expect("Error setting transaction savepoint");
2849
2850 assert_eq!(
2851 dbtx_rollback.get_value(&TestKey(20)).await,
2852 Some(TestVal(2000))
2853 );
2854
2855 assert_eq!(dbtx_rollback.get_value(&TestKey(21)).await, None);
2856
2857 dbtx_rollback.commit_tx().await;
2859 }
2860
2861 pub async fn verify_prevent_nonrepeatable_reads(db: Database) {
2862 let mut dbtx = db.begin_transaction().await;
2863 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2864
2865 let mut dbtx2 = db.begin_transaction().await;
2866
2867 dbtx2.insert_entry(&TestKey(100), &TestVal(101)).await;
2868
2869 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2870
2871 dbtx2.commit_tx().await;
2872
2873 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2876
2877 let expected_keys = 0;
2878 let returned_keys = dbtx
2879 .find_by_prefix(&DbPrefixTestPrefix)
2880 .await
2881 .fold(0, |returned_keys, (key, value)| async move {
2882 if key == TestKey(100) {
2883 assert!(value.eq(&TestVal(101)));
2884 }
2885 returned_keys + 1
2886 })
2887 .await;
2888
2889 assert_eq!(returned_keys, expected_keys);
2890 }
2891
2892 pub async fn verify_snapshot_isolation(db: Database) {
2893 async fn random_yield() {
2894 let times = if rand::thread_rng().gen_bool(0.5) {
2895 0
2896 } else {
2897 10
2898 };
2899 for _ in 0..times {
2900 tokio::task::yield_now().await;
2901 }
2902 }
2903
2904 for i in 0..1000 {
2906 let base_key = i * 2;
2907 let tx_accepted_key = base_key;
2908 let spent_input_key = base_key + 1;
2909
2910 join!(
2911 async {
2912 random_yield().await;
2913 let mut dbtx = db.begin_transaction().await;
2914
2915 random_yield().await;
2916 let a = dbtx.get_value(&TestKey(tx_accepted_key)).await;
2917 random_yield().await;
2918 let s = match i % 5 {
2921 0 => dbtx.get_value(&TestKey(spent_input_key)).await,
2922 1 => dbtx.remove_entry(&TestKey(spent_input_key)).await,
2923 2 => {
2924 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(200))
2925 .await
2926 }
2927 3 => {
2928 dbtx.find_by_prefix(&DbPrefixTestPrefix)
2929 .await
2930 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2931 .map(|(_k, v)| v)
2932 .next()
2933 .await
2934 }
2935 4 => {
2936 dbtx.find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2937 .await
2938 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2939 .map(|(_k, v)| v)
2940 .next()
2941 .await
2942 }
2943 _ => {
2944 panic!("woot?");
2945 }
2946 };
2947
2948 match (a, s) {
2949 (None, None) | (Some(_), Some(_)) => {}
2950 (None, Some(_)) => panic!("none some?! {i}"),
2951 (Some(_), None) => panic!("some none?! {i}"),
2952 }
2953 },
2954 async {
2955 random_yield().await;
2956
2957 let mut dbtx = db.begin_transaction().await;
2958 random_yield().await;
2959 assert_eq!(dbtx.get_value(&TestKey(tx_accepted_key)).await, None);
2960
2961 random_yield().await;
2962 assert_eq!(
2963 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(100))
2964 .await,
2965 None
2966 );
2967
2968 random_yield().await;
2969 assert_eq!(
2970 dbtx.insert_entry(&TestKey(tx_accepted_key), &TestVal(100))
2971 .await,
2972 None
2973 );
2974 random_yield().await;
2975 dbtx.commit_tx().await;
2976 }
2977 );
2978 }
2979 }
2980
2981 pub async fn verify_phantom_entry(db: Database) {
2982 let mut dbtx = db.begin_transaction().await;
2983
2984 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2985
2986 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
2987
2988 dbtx.commit_tx().await;
2989
2990 let mut dbtx = db.begin_transaction().await;
2991 let expected_keys = 2;
2992 let returned_keys = dbtx
2993 .find_by_prefix(&DbPrefixTestPrefix)
2994 .await
2995 .fold(0, |returned_keys, (key, value)| async move {
2996 match key {
2997 TestKey(100) => {
2998 assert!(value.eq(&TestVal(101)));
2999 }
3000 TestKey(101) => {
3001 assert!(value.eq(&TestVal(102)));
3002 }
3003 _ => {}
3004 };
3005 returned_keys + 1
3006 })
3007 .await;
3008
3009 assert_eq!(returned_keys, expected_keys);
3010
3011 let mut dbtx2 = db.begin_transaction().await;
3012
3013 dbtx2.insert_entry(&TestKey(102), &TestVal(103)).await;
3014
3015 dbtx2.commit_tx().await;
3016
3017 let returned_keys = dbtx
3018 .find_by_prefix(&DbPrefixTestPrefix)
3019 .await
3020 .fold(0, |returned_keys, (key, value)| async move {
3021 match key {
3022 TestKey(100) => {
3023 assert!(value.eq(&TestVal(101)));
3024 }
3025 TestKey(101) => {
3026 assert!(value.eq(&TestVal(102)));
3027 }
3028 _ => {}
3029 };
3030 returned_keys + 1
3031 })
3032 .await;
3033
3034 assert_eq!(returned_keys, expected_keys);
3035 }
3036
3037 pub async fn expect_write_conflict(db: Database) {
3038 let mut dbtx = db.begin_transaction().await;
3039 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3040 dbtx.commit_tx().await;
3041
3042 let mut dbtx2 = db.begin_transaction().await;
3043 let mut dbtx3 = db.begin_transaction().await;
3044
3045 dbtx2.insert_entry(&TestKey(100), &TestVal(102)).await;
3046
3047 dbtx3.insert_entry(&TestKey(100), &TestVal(103)).await;
3051
3052 dbtx2.commit_tx().await;
3053 dbtx3.commit_tx_result().await.expect_err("Expecting an error to be returned because this transaction is in a write-write conflict with dbtx");
3054 }
3055
3056 pub async fn verify_string_prefix(db: Database) {
3057 let mut dbtx = db.begin_transaction().await;
3058 dbtx.insert_entry(&PercentTestKey(100), &TestVal(101)).await;
3059
3060 assert_eq!(
3061 dbtx.get_value(&PercentTestKey(100)).await,
3062 Some(TestVal(101))
3063 );
3064
3065 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3066
3067 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3068
3069 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3070
3071 dbtx.insert_entry(&TestKey(101), &TestVal(100)).await;
3074
3075 let expected_keys = 4;
3076 let returned_keys = dbtx
3077 .find_by_prefix(&PercentPrefixTestPrefix)
3078 .await
3079 .fold(0, |returned_keys, (key, value)| async move {
3080 if matches!(key, PercentTestKey(101)) {
3081 assert!(value.eq(&TestVal(100)));
3082 }
3083 returned_keys + 1
3084 })
3085 .await;
3086
3087 assert_eq!(returned_keys, expected_keys);
3088 }
3089
3090 pub async fn verify_remove_by_prefix(db: Database) {
3091 let mut dbtx = db.begin_transaction().await;
3092
3093 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3094
3095 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3096
3097 dbtx.commit_tx().await;
3098
3099 let mut remove_dbtx = db.begin_transaction().await;
3100 remove_dbtx.remove_by_prefix(&DbPrefixTestPrefix).await;
3101 remove_dbtx.commit_tx().await;
3102
3103 let mut dbtx = db.begin_transaction().await;
3104 let expected_keys = 0;
3105 let returned_keys = dbtx
3106 .find_by_prefix(&DbPrefixTestPrefix)
3107 .await
3108 .fold(0, |returned_keys, (key, value)| async move {
3109 match key {
3110 TestKey(100) => {
3111 assert!(value.eq(&TestVal(101)));
3112 }
3113 TestKey(101) => {
3114 assert!(value.eq(&TestVal(102)));
3115 }
3116 _ => {}
3117 };
3118 returned_keys + 1
3119 })
3120 .await;
3121
3122 assert_eq!(returned_keys, expected_keys);
3123 }
3124
3125 pub async fn verify_module_db(db: Database, module_db: Database) {
3126 let mut dbtx = db.begin_transaction().await;
3127
3128 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3129
3130 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3131
3132 dbtx.commit_tx().await;
3133
3134 let mut module_dbtx = module_db.begin_transaction().await;
3136 assert_eq!(module_dbtx.get_value(&TestKey(100)).await, None);
3137
3138 assert_eq!(module_dbtx.get_value(&TestKey(101)).await, None);
3139
3140 let mut dbtx = db.begin_transaction().await;
3142 assert_eq!(dbtx.get_value(&TestKey(100)).await, Some(TestVal(101)));
3143
3144 assert_eq!(dbtx.get_value(&TestKey(101)).await, Some(TestVal(102)));
3145
3146 let mut module_dbtx = module_db.begin_transaction().await;
3147
3148 module_dbtx.insert_entry(&TestKey(100), &TestVal(103)).await;
3149
3150 module_dbtx.insert_entry(&TestKey(101), &TestVal(104)).await;
3151
3152 module_dbtx.commit_tx().await;
3153
3154 let expected_keys = 2;
3155 let mut dbtx = db.begin_transaction().await;
3156 let returned_keys = dbtx
3157 .find_by_prefix(&DbPrefixTestPrefix)
3158 .await
3159 .fold(0, |returned_keys, (key, value)| async move {
3160 match key {
3161 TestKey(100) => {
3162 assert!(value.eq(&TestVal(101)));
3163 }
3164 TestKey(101) => {
3165 assert!(value.eq(&TestVal(102)));
3166 }
3167 _ => {}
3168 };
3169 returned_keys + 1
3170 })
3171 .await;
3172
3173 assert_eq!(returned_keys, expected_keys);
3174
3175 let removed = dbtx.remove_entry(&TestKey(100)).await;
3176 assert_eq!(removed, Some(TestVal(101)));
3177 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
3178
3179 let mut module_dbtx = module_db.begin_transaction().await;
3180 assert_eq!(
3181 module_dbtx.get_value(&TestKey(100)).await,
3182 Some(TestVal(103))
3183 );
3184 }
3185
3186 pub async fn verify_module_prefix(db: Database) {
3187 let mut test_dbtx = db.begin_transaction().await;
3188 {
3189 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3190
3191 test_module_dbtx
3192 .insert_entry(&TestKey(100), &TestVal(101))
3193 .await;
3194
3195 test_module_dbtx
3196 .insert_entry(&TestKey(101), &TestVal(102))
3197 .await;
3198 }
3199
3200 test_dbtx.commit_tx().await;
3201
3202 let mut alt_dbtx = db.begin_transaction().await;
3203 {
3204 let mut alt_module_dbtx = alt_dbtx.to_ref_with_prefix_module_id(ALT_MODULE_PREFIX).0;
3205
3206 alt_module_dbtx
3207 .insert_entry(&TestKey(100), &TestVal(103))
3208 .await;
3209
3210 alt_module_dbtx
3211 .insert_entry(&TestKey(101), &TestVal(104))
3212 .await;
3213 }
3214
3215 alt_dbtx.commit_tx().await;
3216
3217 let mut test_dbtx = db.begin_transaction().await;
3219 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3220 assert_eq!(
3221 test_module_dbtx.get_value(&TestKey(100)).await,
3222 Some(TestVal(101))
3223 );
3224
3225 assert_eq!(
3226 test_module_dbtx.get_value(&TestKey(101)).await,
3227 Some(TestVal(102))
3228 );
3229
3230 let expected_keys = 2;
3231 let returned_keys = test_module_dbtx
3232 .find_by_prefix(&DbPrefixTestPrefix)
3233 .await
3234 .fold(0, |returned_keys, (key, value)| async move {
3235 match key {
3236 TestKey(100) => {
3237 assert!(value.eq(&TestVal(101)));
3238 }
3239 TestKey(101) => {
3240 assert!(value.eq(&TestVal(102)));
3241 }
3242 _ => {}
3243 };
3244 returned_keys + 1
3245 })
3246 .await;
3247
3248 assert_eq!(returned_keys, expected_keys);
3249
3250 let removed = test_module_dbtx.remove_entry(&TestKey(100)).await;
3251 assert_eq!(removed, Some(TestVal(101)));
3252 assert_eq!(test_module_dbtx.get_value(&TestKey(100)).await, None);
3253
3254 let mut test_dbtx = db.begin_transaction().await;
3257 assert_eq!(test_dbtx.get_value(&TestKey(101)).await, None);
3258
3259 test_dbtx.commit_tx().await;
3260 }
3261
3262 #[cfg(test)]
3263 #[tokio::test]
3264 pub async fn verify_test_migration() {
3265 let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
3267 let expected_test_keys_size: usize = 100;
3268 let mut dbtx = db.begin_transaction().await;
3269 for i in 0..expected_test_keys_size {
3270 dbtx.insert_new_entry(&TestKeyV0(i as u64, (i + 1) as u64), &TestVal(i as u64))
3271 .await;
3272 }
3273
3274 dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
3276 .await;
3277 dbtx.commit_tx().await;
3278
3279 let mut migrations: BTreeMap<DatabaseVersion, DbMigrationFn<()>> = BTreeMap::new();
3280
3281 migrations.insert(
3282 DatabaseVersion(0),
3283 Box::new(|ctx| migrate_test_db_version_0(ctx).boxed()),
3284 );
3285
3286 apply_migrations(&db, (), "TestModule".to_string(), migrations, None, None)
3287 .await
3288 .expect("Error applying migrations for TestModule");
3289
3290 let mut dbtx = db.begin_transaction().await;
3292
3293 assert!(
3296 dbtx.get_value(&DatabaseVersionKey(MODULE_GLOBAL_PREFIX.into()))
3297 .await
3298 .is_some()
3299 );
3300
3301 let test_keys = dbtx
3303 .find_by_prefix(&DbPrefixTestPrefix)
3304 .await
3305 .collect::<Vec<_>>()
3306 .await;
3307 let test_keys_size = test_keys.len();
3308 assert_eq!(test_keys_size, expected_test_keys_size);
3309 for (key, val) in test_keys {
3310 assert_eq!(key.0, val.0 + 1);
3311 }
3312 }
3313
3314 #[allow(dead_code)]
3315 async fn migrate_test_db_version_0(
3316 mut ctx: DbMigrationFnContext<'_, ()>,
3317 ) -> Result<(), anyhow::Error> {
3318 let mut dbtx = ctx.dbtx();
3319 let example_keys_v0 = dbtx
3320 .find_by_prefix(&DbPrefixTestPrefixV0)
3321 .await
3322 .collect::<Vec<_>>()
3323 .await;
3324 dbtx.remove_by_prefix(&DbPrefixTestPrefixV0).await;
3325 for (key, val) in example_keys_v0 {
3326 let key_v2 = TestKey(key.1);
3327 dbtx.insert_new_entry(&key_v2, &val).await;
3328 }
3329 Ok(())
3330 }
3331
3332 #[cfg(test)]
3333 #[tokio::test]
3334 async fn test_autocommit() {
3335 use std::marker::PhantomData;
3336 use std::ops::Range;
3337 use std::path::Path;
3338
3339 use anyhow::anyhow;
3340 use async_trait::async_trait;
3341
3342 use crate::ModuleDecoderRegistry;
3343 use crate::db::{
3344 AutocommitError, BaseDatabaseTransaction, IDatabaseTransaction,
3345 IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase,
3346 IRawDatabaseTransaction,
3347 };
3348
3349 #[derive(Debug)]
3350 struct FakeDatabase;
3351
3352 #[async_trait]
3353 impl IRawDatabase for FakeDatabase {
3354 type Transaction<'a> = FakeTransaction<'a>;
3355 async fn begin_transaction(&self) -> FakeTransaction {
3356 FakeTransaction(PhantomData)
3357 }
3358
3359 fn checkpoint(&self, _backup_path: &Path) -> anyhow::Result<()> {
3360 Ok(())
3361 }
3362 }
3363
3364 #[derive(Debug)]
3365 struct FakeTransaction<'a>(PhantomData<&'a ()>);
3366
3367 #[async_trait]
3368 impl<'a> IDatabaseTransactionOpsCore for FakeTransaction<'a> {
3369 async fn raw_insert_bytes(
3370 &mut self,
3371 _key: &[u8],
3372 _value: &[u8],
3373 ) -> anyhow::Result<Option<Vec<u8>>> {
3374 unimplemented!()
3375 }
3376
3377 async fn raw_get_bytes(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
3378 unimplemented!()
3379 }
3380
3381 async fn raw_remove_entry(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
3382 unimplemented!()
3383 }
3384
3385 async fn raw_find_by_range(
3386 &mut self,
3387 _key_range: Range<&[u8]>,
3388 ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3389 unimplemented!()
3390 }
3391
3392 async fn raw_find_by_prefix(
3393 &mut self,
3394 _key_prefix: &[u8],
3395 ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3396 unimplemented!()
3397 }
3398
3399 async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
3400 unimplemented!()
3401 }
3402
3403 async fn raw_find_by_prefix_sorted_descending(
3404 &mut self,
3405 _key_prefix: &[u8],
3406 ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
3407 unimplemented!()
3408 }
3409 }
3410
3411 #[async_trait]
3412 impl<'a> IDatabaseTransactionOps for FakeTransaction<'a> {
3413 async fn rollback_tx_to_savepoint(&mut self) -> anyhow::Result<()> {
3414 unimplemented!()
3415 }
3416
3417 async fn set_tx_savepoint(&mut self) -> anyhow::Result<()> {
3418 unimplemented!()
3419 }
3420 }
3421
3422 #[async_trait]
3423 impl<'a> IRawDatabaseTransaction for FakeTransaction<'a> {
3424 async fn commit_tx(self) -> anyhow::Result<()> {
3425 Err(anyhow!("Can't commit!"))
3426 }
3427 }
3428
3429 let db = Database::new(FakeDatabase, ModuleDecoderRegistry::default());
3430 let err = db
3431 .autocommit::<_, _, ()>(|_dbtx, _| Box::pin(async { Ok(()) }), Some(5))
3432 .await
3433 .unwrap_err();
3434
3435 match err {
3436 AutocommitError::CommitFailed {
3437 attempts: failed_attempts,
3438 ..
3439 } => {
3440 assert_eq!(failed_attempts, 5);
3441 }
3442 AutocommitError::ClosureError { .. } => panic!("Closure did not return error"),
3443 }
3444 }
3445}
3446
3447pub async fn find_by_prefix_sorted_descending<'r, 'inner, KP>(
3448 tx: &'r mut (dyn IDatabaseTransaction + 'inner),
3449 decoders: ModuleDecoderRegistry,
3450 key_prefix: &KP,
3451) -> impl Stream<
3452 Item = (
3453 KP::Record,
3454 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
3455 ),
3456>
3457+ 'r
3458+ use<'r, KP>
3459where
3460 'inner: 'r,
3461 KP: DatabaseLookup,
3462 KP::Record: DatabaseKey,
3463{
3464 debug!(target: LOG_DB, "find by prefix sorted descending");
3465 let prefix_bytes = key_prefix.to_bytes();
3466 tx.raw_find_by_prefix_sorted_descending(&prefix_bytes)
3467 .await
3468 .expect("Error doing prefix search in database")
3469 .map(move |(key_bytes, value_bytes)| {
3470 let key = decode_key_expect(&key_bytes, &decoders);
3471 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
3472 (key, value)
3473 })
3474}
3475
3476pub async fn verify_module_db_integrity_dbtx(
3477 dbtx: &mut DatabaseTransaction<'_>,
3478 module_id: ModuleInstanceId,
3479 module_kind: ModuleKind,
3480 prefixes: &BTreeSet<u8>,
3481) {
3482 let module_db_prefix = module_instance_id_to_byte_prefix(module_id);
3483 if module_id < 250 {
3484 assert_eq!(module_db_prefix.len(), 2);
3485 }
3486 let mut records = dbtx
3487 .raw_find_by_prefix(&module_db_prefix)
3488 .await
3489 .expect("DB fail");
3490 while let Some((k, v)) = records.next().await {
3491 assert!(
3492 prefixes.contains(&k[module_db_prefix.len()]),
3493 "Unexpected module {module_kind} {module_id} db record found: {}: {}",
3494 k.as_hex(),
3495 v.as_hex()
3496 );
3497 }
3498}
3499
3500#[cfg(test)]
3501mod tests;