1use std::any;
106use std::collections::{BTreeMap, BTreeSet};
107use std::error::Error;
108use std::fmt::{self, Debug};
109use std::marker::{self, PhantomData};
110use std::ops::{self, DerefMut, Range};
111use std::path::Path;
112use std::pin::Pin;
113use std::sync::Arc;
114use std::time::Duration;
115
116use bitcoin::hex::DisplayHex as _;
117use fedimint_core::util::BoxFuture;
118use fedimint_logging::LOG_DB;
119use fedimint_util_error::FmtCompact as _;
120use futures::{Stream, StreamExt};
121use macro_rules_attribute::apply;
122use rand::Rng;
123use serde::Serialize;
124use strum_macros::EnumIter;
125use thiserror::Error;
126use tracing::{debug, info, instrument, trace, warn};
127
128use crate::core::{ModuleInstanceId, ModuleKind};
129use crate::encoding::{Decodable, Encodable};
130use crate::fmt_utils::AbbreviateHexBytes;
131use crate::task::{MaybeSend, MaybeSync};
132use crate::{async_trait_maybe_send, maybe_add_send, maybe_add_send_sync, timing};
133
134pub mod mem_impl;
135pub mod notifications;
136
137pub use test_utils::*;
138
139use self::notifications::{Notifications, NotifyQueue};
140use crate::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
141
/// First byte of every module key prefix built by
/// `module_instance_id_to_byte_prefix`; the encoded module instance id follows
/// it.
pub const MODULE_GLOBAL_PREFIX: u8 = 0xff;
143
/// Shorthand for a `Result` whose error type is `DatabaseError`.
pub type DatabaseResult<T> = std::result::Result<T, DatabaseError>;
146
/// A value that can be rendered as the raw bytes of a database key or key
/// prefix.
pub trait DatabaseKeyPrefix: Debug {
    /// Returns the byte representation used to address the database.
    fn to_bytes(&self) -> Vec<u8>;
}
150
/// Ties a key type to the value type stored under it.
pub trait DatabaseRecord: DatabaseKeyPrefix {
    /// One-byte prefix associated with this record type.
    const DB_PREFIX: u8;
    /// Defaults to `false`.
    // NOTE(review): presumably opts the record into change notifications —
    // confirm against the implementors, nothing in view reads this constant.
    const NOTIFY_ON_MODIFY: bool = false;
    /// Key type of the record.
    type Key: DatabaseKey + Debug;
    /// Value type of the record.
    type Value: DatabaseValue + Debug;
}
159
/// A key prefix usable for lookups; the typed prefix queries yield
/// `(Self::Record, <Self::Record as DatabaseRecord>::Value)` pairs.
pub trait DatabaseLookup: DatabaseKeyPrefix {
    /// Record type produced when searching by this prefix.
    type Record: DatabaseRecord;
}
165
/// Every encodable, decodable record is a lookup for itself.
impl<Record> DatabaseLookup for Record
where
    Record: DatabaseRecord + Debug + Decodable + Encodable,
{
    type Record = Record;
}
173
/// A key that can be decoded back from its raw bytes.
pub trait DatabaseKey: Sized {
    /// Defaults to `false`.
    // NOTE(review): presumably mirrors `DatabaseRecord::NOTIFY_ON_MODIFY` —
    // confirm, nothing in view reads this constant.
    const NOTIFY_ON_MODIFY: bool = false;
    /// Decodes a key from `data`, using `modules` as the decoder registry.
    ///
    /// # Errors
    ///
    /// Returns a `DecodingError` when `data` cannot be decoded.
    fn from_bytes(
        data: &[u8],
        modules: &ModuleDecoderRegistry,
    ) -> std::result::Result<Self, DecodingError>;
}
190
/// Marker trait required of keys passed to `Database::wait_key_check` and
/// `Database::wait_key_exists`.
pub trait DatabaseKeyWithNotify {}
193
/// A value that can be converted to and from raw database bytes.
pub trait DatabaseValue: Sized + Debug {
    /// Decodes a value from `data`, using `modules` as the decoder registry.
    ///
    /// # Errors
    ///
    /// Returns a `DecodingError` when `data` cannot be decoded.
    fn from_bytes(
        data: &[u8],
        modules: &ModuleDecoderRegistry,
    ) -> std::result::Result<Self, DecodingError>;
    /// Returns the byte representation that is written to the database.
    fn to_bytes(&self) -> Vec<u8>;
}
202
/// Pinned, boxed stream of raw `(key bytes, value bytes)` pairs returned by the
/// `raw_find_*` operations.
// NOTE(review): `maybe_add_send!` presumably adds a `Send` bound where the
// target supports it — confirm against the macro definition.
pub type PrefixStream<'a> = Pin<Box<maybe_add_send!(dyn Stream<Item = (Vec<u8>, Vec<u8>)> + 'a)>>;
204
/// Zero-sized marker carrying an implied lifetime bound: the type
/// `&'small &'big ()` is only well-formed when `'big: 'small`. It appears in
/// the closure signature of `Database::autocommit`.
pub type PhantomBound<'big, 'small> = PhantomData<&'small &'big ()>;
209
#[derive(Debug, Error)]
/// Error returned by `Database::autocommit`.
pub enum AutocommitError<E> {
    /// Committing failed and the configured attempt limit was reached.
    #[error("Commit Failed: {last_error}")]
    CommitFailed {
        /// Number of attempts made, including the failing one.
        attempts: usize,
        /// Error returned by the last commit attempt.
        last_error: DatabaseError,
    },
    /// The user closure returned an error; `autocommit` returns immediately in
    /// that case without committing.
    #[error("Closure error: {error}")]
    ClosureError {
        /// Attempt during which the closure failed (1-based).
        attempts: usize,
        /// Error returned by the closure.
        error: E,
    },
}
235
/// Extension for results of `Database::autocommit` that strips the
/// `AutocommitError` wrapper.
pub trait AutocommitResultExt<T, E> {
    /// Converts the result into `Result<T, E>`, keeping only closure errors.
    ///
    /// The implementation in this file panics on a commit failure.
    fn unwrap_autocommit(self) -> std::result::Result<T, E>;
}
242
243impl<T, E> AutocommitResultExt<T, E> for std::result::Result<T, AutocommitError<E>> {
244 fn unwrap_autocommit(self) -> std::result::Result<T, E> {
245 match self {
246 Ok(value) => Ok(value),
247 Err(AutocommitError::CommitFailed { .. }) => {
248 panic!("`unwrap_autocommit` called on a autocommit result with finite retries");
249 }
250 Err(AutocommitError::ClosureError { error, .. }) => Err(error),
251 }
252 }
253}
254
#[apply(async_trait_maybe_send!)]
/// Interface of a raw storage backend that `Database::new` wraps into a
/// `Database`.
pub trait IRawDatabase: Debug + MaybeSend + MaybeSync + 'static {
    /// Transaction type handed out by this backend.
    type Transaction<'a>: IRawDatabaseTransaction + Debug;

    /// Starts a new transaction borrowing from this backend.
    async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a>;

    /// Checkpoints the database to `backup_path`.
    // NOTE(review): the exact on-disk result is backend-defined — confirm
    // against the implementors.
    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()>;
}
274
#[apply(async_trait_maybe_send!)]
/// A boxed raw database forwards every call to the boxed value.
impl<T> IRawDatabase for Box<T>
where
    T: IRawDatabase,
{
    type Transaction<'a> = <T as IRawDatabase>::Transaction<'a>;

    async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a> {
        // `**self` reaches through `&Box<T>` to the inner `T`.
        (**self).begin_transaction().await
    }

    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
        (**self).checkpoint(backup_path)
    }
}
290
/// Convenience extension implemented for every `IRawDatabase`.
pub trait IRawDatabaseExt: IRawDatabase + Sized {
    /// Wraps the raw database into a `Database` with a default (empty) module
    /// decoder registry.
    fn into_database(self) -> Database {
        Database::new(self, ModuleRegistry::default())
    }
}
300
/// Blanket impl: every raw database gets `into_database` for free.
impl<T> IRawDatabaseExt for T where T: IRawDatabase {}
302
/// Builds a `Database` from any raw database, using a default module decoder
/// registry (same result as `IRawDatabaseExt::into_database`).
impl<T> From<T> for Database
where
    T: IRawDatabase,
{
    fn from(raw: T) -> Self {
        Self::new(raw, ModuleRegistry::default())
    }
}
311
#[apply(async_trait_maybe_send!)]
/// Object-safe database interface stored inside `Database` as
/// `Arc<dyn IDatabase>`.
pub trait IDatabase: Debug + MaybeSend + MaybeSync + 'static {
    /// Starts a new transaction borrowing from this database.
    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a>;
    /// Registers interest in `key`; awaited by `Database::wait_key_check`
    /// between polls of the key.
    // NOTE(review): the exact wake-up semantics live in `Notifications` —
    // confirm there.
    async fn register(&self, key: &[u8]);
    /// Signals a change of `key`.
    async fn notify(&self, key: &[u8]);

    /// Whether this handle is global, i.e. not isolated to a module prefix.
    fn is_global(&self) -> bool;

    /// Checkpoints the database to `backup_path`.
    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()>;
}
330
#[apply(async_trait_maybe_send!)]
/// An `Arc` around a database (including `Arc<dyn IDatabase>`) forwards every
/// call to the shared value.
impl<T> IDatabase for Arc<T>
where
    T: IDatabase + ?Sized,
{
    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
        // `**self` reaches through `&Arc<T>` to the inner `T`.
        (**self).begin_transaction().await
    }
    async fn register(&self, key: &[u8]) {
        (**self).register(key).await;
    }
    async fn notify(&self, key: &[u8]) {
        (**self).notify(key).await;
    }

    fn is_global(&self) -> bool {
        (**self).is_global()
    }

    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
        (**self).checkpoint(backup_path)
    }
}
354
/// Bottom layer of a `Database`: a raw backend plus the notification state
/// shared with every transaction it creates.
struct BaseDatabase<RawDatabase> {
    /// Shared with each `BaseDatabaseTransaction` so commits can submit their
    /// queued keys.
    notifications: Arc<Notifications>,
    /// The wrapped raw storage backend.
    raw: RawDatabase,
}
362
363impl<RawDatabase> fmt::Debug for BaseDatabase<RawDatabase> {
364 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
365 f.write_str("BaseDatabase")
366 }
367}
368
#[apply(async_trait_maybe_send!)]
/// `IDatabase` on top of a raw backend: transactions are wrapped in
/// `BaseDatabaseTransaction` and notifications go to the shared
/// `Notifications`.
impl<RawDatabase: IRawDatabase + MaybeSend + 'static> IDatabase for BaseDatabase<RawDatabase> {
    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
        // The transaction gets its own handle to the notification state.
        Box::new(BaseDatabaseTransaction::new(
            self.raw.begin_transaction().await,
            self.notifications.clone(),
        ))
    }
    async fn register(&self, key: &[u8]) {
        self.notifications.register(key).await;
    }
    async fn notify(&self, key: &[u8]) {
        self.notifications.notify(key);
    }

    // The base layer has no prefix, so it is always global.
    fn is_global(&self) -> bool {
        true
    }

    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
        self.raw.checkpoint(backup_path)
    }
}
392
#[derive(Clone, Debug)]
/// Cheaply clonable database handle: a shared `IDatabase` implementation plus
/// the module decoders used to decode keys and values.
pub struct Database {
    /// Shared, type-erased database implementation.
    inner: Arc<dyn IDatabase + 'static>,
    /// Decoder registry handed to every transaction started from this handle.
    module_decoders: ModuleDecoderRegistry,
}
403
impl Database {
    /// Number of strong references to the shared inner database.
    pub fn strong_count(&self) -> usize {
        Arc::strong_count(&self.inner)
    }

    /// Consumes the handle and returns the shared inner database.
    pub fn into_inner(self) -> Arc<dyn IDatabase + 'static> {
        self.inner
    }
}
413
414impl Database {
415 pub fn new(raw: impl IRawDatabase + 'static, module_decoders: ModuleDecoderRegistry) -> Self {
420 let inner = BaseDatabase {
421 raw,
422 notifications: Arc::new(Notifications::new()),
423 };
424 Self::new_from_arc(
425 Arc::new(inner) as Arc<dyn IDatabase + 'static>,
426 module_decoders,
427 )
428 }
429
430 pub fn new_from_arc(
432 inner: Arc<dyn IDatabase + 'static>,
433 module_decoders: ModuleDecoderRegistry,
434 ) -> Self {
435 Self {
436 inner,
437 module_decoders,
438 }
439 }
440
441 pub fn with_prefix(&self, prefix: Vec<u8>) -> Self {
443 Self {
444 inner: Arc::new(PrefixDatabase {
445 inner: self.inner.clone(),
446 global_dbtx_access_token: None,
447 prefix,
448 }),
449 module_decoders: self.module_decoders.clone(),
450 }
451 }
452
453 pub fn with_prefix_module_id(
457 &self,
458 module_instance_id: ModuleInstanceId,
459 ) -> (Self, GlobalDBTxAccessToken) {
460 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
461 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
462 (
463 Self {
464 inner: Arc::new(PrefixDatabase {
465 inner: self.inner.clone(),
466 global_dbtx_access_token: Some(global_dbtx_access_token),
467 prefix,
468 }),
469 module_decoders: self.module_decoders.clone(),
470 },
471 global_dbtx_access_token,
472 )
473 }
474
475 pub fn with_decoders(&self, module_decoders: ModuleDecoderRegistry) -> Self {
476 Self {
477 inner: self.inner.clone(),
478 module_decoders,
479 }
480 }
481
482 pub fn is_global(&self) -> bool {
484 self.inner.is_global()
485 }
486
487 pub fn ensure_global(&self) -> DatabaseResult<()> {
489 if !self.is_global() {
490 return Err(DatabaseError::Other(anyhow::anyhow!(
491 "Database instance not global"
492 )));
493 }
494
495 Ok(())
496 }
497
498 pub fn ensure_isolated(&self) -> DatabaseResult<()> {
500 if self.is_global() {
501 return Err(DatabaseError::Other(anyhow::anyhow!(
502 "Database instance not isolated"
503 )));
504 }
505
506 Ok(())
507 }
508
509 pub async fn begin_transaction<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, Committable>
511 where
512 's: 'tx,
513 {
514 DatabaseTransaction::<Committable>::new(
515 self.inner.begin_transaction().await,
516 self.module_decoders.clone(),
517 )
518 }
519
520 pub async fn begin_transaction_nc<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, NonCommittable>
522 where
523 's: 'tx,
524 {
525 self.begin_transaction().await.into_nc()
526 }
527
528 pub fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
529 self.inner.checkpoint(backup_path)
530 }
531
532 pub async fn autocommit<'s, 'dbtx, F, T, E>(
560 &'s self,
561 tx_fn: F,
562 max_attempts: Option<usize>,
563 ) -> std::result::Result<T, AutocommitError<E>>
564 where
565 's: 'dbtx,
566 for<'r, 'o> F: Fn(
567 &'r mut DatabaseTransaction<'o>,
568 PhantomBound<'dbtx, 'o>,
569 ) -> BoxFuture<'r, std::result::Result<T, E>>,
570 {
571 assert_ne!(max_attempts, Some(0));
572 let mut curr_attempts: usize = 0;
573
574 loop {
575 curr_attempts = curr_attempts
580 .checked_add(1)
581 .expect("db autocommit attempt counter overflowed");
582
583 let mut dbtx = self.begin_transaction().await;
584
585 let tx_fn_res = tx_fn(&mut dbtx.to_ref_nc(), PhantomData).await;
586 let val = match tx_fn_res {
587 Ok(val) => val,
588 Err(err) => {
589 dbtx.ignore_uncommitted();
590 return Err(AutocommitError::ClosureError {
591 attempts: curr_attempts,
592 error: err,
593 });
594 }
595 };
596
597 let _timing = timing::TimeReporter::new("autocommit - commit_tx");
598
599 match dbtx.commit_tx_result().await {
600 Ok(()) => {
601 return Ok(val);
602 }
603 Err(err) => {
604 if max_attempts.is_some_and(|max_att| max_att <= curr_attempts) {
605 warn!(
606 target: LOG_DB,
607 curr_attempts,
608 err = %err.fmt_compact(),
609 "Database commit failed in an autocommit block - terminating"
610 );
611 return Err(AutocommitError::CommitFailed {
612 attempts: curr_attempts,
613 last_error: err,
614 });
615 }
616
617 let delay = (2u64.pow(curr_attempts.min(7) as u32) * 10).min(1000);
618 let delay = rand::thread_rng().gen_range(delay..(2 * delay));
619 warn!(
620 target: LOG_DB,
621 curr_attempts,
622 err = %err.fmt_compact(),
623 delay_ms = %delay,
624 "Database commit failed in an autocommit block - retrying"
625 );
626 crate::runtime::sleep(Duration::from_millis(delay)).await;
627 }
628 }
629 }
630 }
631
632 pub async fn wait_key_check<'a, K, T>(
637 &'a self,
638 key: &K,
639 checker: impl Fn(Option<K::Value>) -> Option<T>,
640 ) -> (T, DatabaseTransaction<'a, Committable>)
641 where
642 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
643 {
644 let key_bytes = key.to_bytes();
645 loop {
646 let notify = self.inner.register(&key_bytes);
648
649 let mut tx = self.inner.begin_transaction().await;
651
652 let maybe_value_bytes = tx
653 .raw_get_bytes(&key_bytes)
654 .await
655 .expect("Unrecoverable error when reading from database")
656 .map(|value_bytes| {
657 decode_value_expect(&value_bytes, &self.module_decoders, &key_bytes)
658 });
659
660 if let Some(value) = checker(maybe_value_bytes) {
661 return (
662 value,
663 DatabaseTransaction::new(tx, self.module_decoders.clone()),
664 );
665 }
666
667 notify.await;
669 }
672 }
673
674 pub async fn wait_key_exists<K>(&self, key: &K) -> K::Value
676 where
677 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
678 {
679 self.wait_key_check(key, std::convert::identity).await.0
680 }
681}
682
683fn module_instance_id_to_byte_prefix(module_instance_id: u16) -> Vec<u8> {
684 let mut bytes = vec![MODULE_GLOBAL_PREFIX];
685 bytes.append(&mut module_instance_id.consensus_encode_to_vec());
686 bytes
687}
688
#[derive(Clone, Debug)]
/// Database wrapper that places every key under a fixed prefix of the inner
/// database.
struct PrefixDatabase<Inner>
where
    Inner: Debug,
{
    /// Bytes prepended to every key.
    prefix: Vec<u8>,
    /// When set, the wrapper reports itself as non-global and its transactions
    /// require this token in `global_dbtx`.
    global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
    /// The wrapped database.
    inner: Inner,
}
700
701impl<Inner> PrefixDatabase<Inner>
702where
703 Inner: Debug,
704{
705 fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
709 let mut full_key = self.prefix.clone();
710 full_key.extend_from_slice(key);
711 full_key
712 }
713}
714
715#[apply(async_trait_maybe_send!)]
716impl<Inner> IDatabase for PrefixDatabase<Inner>
717where
718 Inner: Debug + MaybeSend + MaybeSync + 'static + IDatabase,
719{
720 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
721 Box::new(PrefixDatabaseTransaction {
722 inner: self.inner.begin_transaction().await,
723 global_dbtx_access_token: self.global_dbtx_access_token,
724 prefix: self.prefix.clone(),
725 })
726 }
727 async fn register(&self, key: &[u8]) {
728 self.inner.register(&self.get_full_key(key)).await;
729 }
730
731 async fn notify(&self, key: &[u8]) {
732 self.inner.notify(&self.get_full_key(key)).await;
733 }
734
735 fn is_global(&self) -> bool {
736 if self.global_dbtx_access_token.is_some() {
737 false
738 } else {
739 self.inner.is_global()
740 }
741 }
742
743 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
744 self.inner.checkpoint(backup_path)
745 }
746}
747
#[derive(Debug)]
/// Transaction wrapper that prepends `prefix` to every key it passes to the
/// inner transaction and strips it from keys in returned streams.
struct PrefixDatabaseTransaction<Inner> {
    /// The wrapped transaction.
    inner: Inner,
    /// Token that `global_dbtx` callers must present, if any.
    global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
    /// Bytes prepended to every key.
    prefix: Vec<u8>,
}
758
759impl<Inner> PrefixDatabaseTransaction<Inner> {
760 fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
764 let mut full_key = self.prefix.clone();
765 full_key.extend_from_slice(key);
766 full_key
767 }
768
769 fn get_full_range(&self, range: Range<&[u8]>) -> Range<Vec<u8>> {
770 Range {
771 start: self.get_full_key(range.start),
772 end: self.get_full_key(range.end),
773 }
774 }
775
776 fn adapt_prefix_stream(stream: PrefixStream<'_>, prefix_len: usize) -> PrefixStream<'_> {
777 Box::pin(stream.map(move |(k, v)| (k[prefix_len..].to_owned(), v)))
778 }
779}
780
781#[apply(async_trait_maybe_send!)]
782impl<Inner> IDatabaseTransaction for PrefixDatabaseTransaction<Inner>
783where
784 Inner: IDatabaseTransaction,
785{
786 async fn commit_tx(&mut self) -> DatabaseResult<()> {
787 self.inner.commit_tx().await
788 }
789
790 fn is_global(&self) -> bool {
791 if self.global_dbtx_access_token.is_some() {
792 false
793 } else {
794 self.inner.is_global()
795 }
796 }
797
798 fn global_dbtx(
799 &mut self,
800 access_token: GlobalDBTxAccessToken,
801 ) -> &mut dyn IDatabaseTransaction {
802 if let Some(self_global_dbtx_access_token) = self.global_dbtx_access_token {
803 assert_eq!(
804 access_token, self_global_dbtx_access_token,
805 "Invalid access key used to access global_dbtx"
806 );
807 &mut self.inner
808 } else {
809 self.inner.global_dbtx(access_token)
810 }
811 }
812}
813
814#[apply(async_trait_maybe_send!)]
815impl<Inner> IDatabaseTransactionOpsCore for PrefixDatabaseTransaction<Inner>
816where
817 Inner: IDatabaseTransactionOpsCore,
818{
819 async fn raw_insert_bytes(
820 &mut self,
821 key: &[u8],
822 value: &[u8],
823 ) -> DatabaseResult<Option<Vec<u8>>> {
824 let key = self.get_full_key(key);
825 self.inner.raw_insert_bytes(&key, value).await
826 }
827
828 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
829 let key = self.get_full_key(key);
830 self.inner.raw_get_bytes(&key).await
831 }
832
833 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
834 let key = self.get_full_key(key);
835 self.inner.raw_remove_entry(&key).await
836 }
837
838 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
839 let key = self.get_full_key(key_prefix);
840 let stream = self.inner.raw_find_by_prefix(&key).await?;
841 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
842 }
843
844 async fn raw_find_by_prefix_sorted_descending(
845 &mut self,
846 key_prefix: &[u8],
847 ) -> DatabaseResult<PrefixStream<'_>> {
848 let key = self.get_full_key(key_prefix);
849 let stream = self
850 .inner
851 .raw_find_by_prefix_sorted_descending(&key)
852 .await?;
853 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
854 }
855
856 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
857 let range = self.get_full_range(range);
858 let stream = self
859 .inner
860 .raw_find_by_range(Range {
861 start: &range.start,
862 end: &range.end,
863 })
864 .await?;
865 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
866 }
867
868 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
869 let key = self.get_full_key(key_prefix);
870 self.inner.raw_remove_by_prefix(&key).await
871 }
872}
873
/// Marker impl: the prefixed transaction supports the full operation set
/// whenever the inner transaction does.
impl<Inner> IDatabaseTransactionOps for PrefixDatabaseTransaction<Inner> where
    Inner: IDatabaseTransactionOps
{
}
878
#[apply(async_trait_maybe_send!)]
/// Untyped, byte-level operations available inside a database transaction.
pub trait IDatabaseTransactionOpsCore: MaybeSend {
    /// Stores `value` under `key`; the typed layer treats the returned bytes as
    /// the previous value, if any.
    async fn raw_insert_bytes(
        &mut self,
        key: &[u8],
        value: &[u8],
    ) -> DatabaseResult<Option<Vec<u8>>>;

    /// Reads the value stored under `key`, if any.
    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>>;

    /// Removes the entry under `key`; the typed layer treats the returned bytes
    /// as the removed value, if any.
    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>>;

    /// Streams the entries whose key starts with `key_prefix`.
    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>>;

    /// Same as `raw_find_by_prefix`, in descending order.
    // NOTE(review): ordering is by key presumably — confirm against the
    // backend implementations.
    async fn raw_find_by_prefix_sorted_descending(
        &mut self,
        key_prefix: &[u8],
    ) -> DatabaseResult<PrefixStream<'_>>;

    /// Streams the entries whose key lies in `range`.
    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>>;

    /// Removes every entry whose key starts with `key_prefix`.
    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()>;
}
915
#[apply(async_trait_maybe_send!)]
/// A boxed transaction (including `Box<dyn ...>`) forwards every raw operation
/// to the boxed value.
impl<T> IDatabaseTransactionOpsCore for Box<T>
where
    T: IDatabaseTransactionOpsCore + ?Sized,
{
    async fn raw_insert_bytes(
        &mut self,
        key: &[u8],
        value: &[u8],
    ) -> DatabaseResult<Option<Vec<u8>>> {
        // `**self` reaches through `&mut Box<T>` to the inner `T`.
        (**self).raw_insert_bytes(key, value).await
    }

    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
        (**self).raw_get_bytes(key).await
    }

    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
        (**self).raw_remove_entry(key).await
    }

    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
        (**self).raw_find_by_prefix(key_prefix).await
    }

    async fn raw_find_by_prefix_sorted_descending(
        &mut self,
        key_prefix: &[u8],
    ) -> DatabaseResult<PrefixStream<'_>> {
        (**self)
            .raw_find_by_prefix_sorted_descending(key_prefix)
            .await
    }

    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
        (**self).raw_find_by_range(range).await
    }

    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
        (**self).raw_remove_by_prefix(key_prefix).await
    }
}
958
#[apply(async_trait_maybe_send!)]
/// A mutable reference to a transaction forwards every raw operation to the
/// referenced value.
impl<T> IDatabaseTransactionOpsCore for &mut T
where
    T: IDatabaseTransactionOpsCore + ?Sized,
{
    async fn raw_insert_bytes(
        &mut self,
        key: &[u8],
        value: &[u8],
    ) -> DatabaseResult<Option<Vec<u8>>> {
        // `**self` reaches through `&mut &mut T` to the referenced `T`.
        (**self).raw_insert_bytes(key, value).await
    }

    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
        (**self).raw_get_bytes(key).await
    }

    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
        (**self).raw_remove_entry(key).await
    }

    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
        (**self).raw_find_by_prefix(key_prefix).await
    }

    async fn raw_find_by_prefix_sorted_descending(
        &mut self,
        key_prefix: &[u8],
    ) -> DatabaseResult<PrefixStream<'_>> {
        (**self)
            .raw_find_by_prefix_sorted_descending(key_prefix)
            .await
    }

    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
        (**self).raw_find_by_range(range).await
    }

    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
        (**self).raw_remove_by_prefix(key_prefix).await
    }
}
1001
/// Full set of raw transaction operations. In the visible code it adds no
/// methods beyond `IDatabaseTransactionOpsCore`.
pub trait IDatabaseTransactionOps: IDatabaseTransactionOpsCore + MaybeSend {}
1008
/// Marker impl for boxed transactions (including trait objects).
impl<T> IDatabaseTransactionOps for Box<T> where T: IDatabaseTransactionOps + ?Sized {}
1010
/// Marker impl for mutable references to transactions.
impl<T> IDatabaseTransactionOps for &mut T where T: IDatabaseTransactionOps + ?Sized {}
1012
#[apply(async_trait_maybe_send!)]
/// Typed counterpart of `IDatabaseTransactionOpsCore`: works with
/// `DatabaseRecord` keys and their values instead of raw bytes.
///
/// The blanket implementation in this file panics on database and decoding
/// errors instead of returning them.
pub trait IDatabaseTransactionOpsCoreTyped<'a> {
    /// Returns the value stored under `key`, if any.
    async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;

    /// Stores `value` under `key` and returns the previous value, if any.
    async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync;

    /// Stores `value` under `key`, which is expected not to exist yet; the
    /// blanket implementation panics if a previous value was present.
    async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync;

    /// Streams the decoded entries whose key lies in `key_range`.
    async fn find_by_range<K>(
        &mut self,
        key_range: Range<K>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync;

    /// Streams the decoded entries whose key starts with `key_prefix`.
    async fn find_by_prefix<KP>(
        &mut self,
        key_prefix: &KP,
    ) -> Pin<
        Box<
            maybe_add_send!(
                dyn Stream<
                        Item = (
                            KP::Record,
                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
                        ),
                    > + '_
            ),
        >,
    >
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync,
        KP::Record: DatabaseKey;

    /// Same as `find_by_prefix`, using the descending raw query.
    async fn find_by_prefix_sorted_descending<KP>(
        &mut self,
        key_prefix: &KP,
    ) -> Pin<
        Box<
            maybe_add_send!(
                dyn Stream<
                        Item = (
                            KP::Record,
                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
                        ),
                    > + '_
            ),
        >,
    >
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync,
        KP::Record: DatabaseKey;

    /// Removes the entry under `key` and returns its value, if any.
    async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;

    /// Removes every entry whose key starts with `key_prefix`.
    async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync;
}
1088
1089#[apply(async_trait_maybe_send!)]
1092impl<T> IDatabaseTransactionOpsCoreTyped<'_> for T
1093where
1094 T: IDatabaseTransactionOpsCore + WithDecoders,
1095{
1096 async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1097 where
1098 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1099 {
1100 let key_bytes = key.to_bytes();
1101 let raw = self
1102 .raw_get_bytes(&key_bytes)
1103 .await
1104 .expect("Unrecoverable error occurred while reading and entry from the database");
1105 raw.map(|value_bytes| {
1106 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1107 })
1108 }
1109
1110 async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1111 where
1112 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1113 K::Value: MaybeSend + MaybeSync,
1114 {
1115 let key_bytes = key.to_bytes();
1116 self.raw_insert_bytes(&key_bytes, &value.to_bytes())
1117 .await
1118 .expect("Unrecoverable error occurred while inserting entry into the database")
1119 .map(|value_bytes| {
1120 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1121 })
1122 }
1123
1124 async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1125 where
1126 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1127 K::Value: MaybeSend + MaybeSync,
1128 {
1129 if let Some(prev) = self.insert_entry(key, value).await {
1130 panic!(
1131 "Database overwriting element when expecting insertion of new entry. Key: {key:?} Prev Value: {prev:?}"
1132 );
1133 }
1134 }
1135
1136 async fn find_by_range<K>(
1137 &mut self,
1138 key_range: Range<K>,
1139 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1140 where
1141 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1142 K::Value: MaybeSend + MaybeSync,
1143 {
1144 let decoders = self.decoders().clone();
1145 Box::pin(
1146 self.raw_find_by_range(Range {
1147 start: &key_range.start.to_bytes(),
1148 end: &key_range.end.to_bytes(),
1149 })
1150 .await
1151 .expect("Unrecoverable error occurred while listing entries from the database")
1152 .map(move |(key_bytes, value_bytes)| {
1153 let key = decode_key_expect(&key_bytes, &decoders);
1154 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1155 (key, value)
1156 }),
1157 )
1158 }
1159
1160 async fn find_by_prefix<KP>(
1161 &mut self,
1162 key_prefix: &KP,
1163 ) -> Pin<
1164 Box<
1165 maybe_add_send!(
1166 dyn Stream<
1167 Item = (
1168 KP::Record,
1169 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1170 ),
1171 > + '_
1172 ),
1173 >,
1174 >
1175 where
1176 KP: DatabaseLookup + MaybeSend + MaybeSync,
1177 KP::Record: DatabaseKey,
1178 {
1179 let decoders = self.decoders().clone();
1180 Box::pin(
1181 self.raw_find_by_prefix(&key_prefix.to_bytes())
1182 .await
1183 .expect("Unrecoverable error occurred while listing entries from the database")
1184 .map(move |(key_bytes, value_bytes)| {
1185 let key = decode_key_expect(&key_bytes, &decoders);
1186 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1187 (key, value)
1188 }),
1189 )
1190 }
1191
1192 async fn find_by_prefix_sorted_descending<KP>(
1193 &mut self,
1194 key_prefix: &KP,
1195 ) -> Pin<
1196 Box<
1197 maybe_add_send!(
1198 dyn Stream<
1199 Item = (
1200 KP::Record,
1201 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1202 ),
1203 > + '_
1204 ),
1205 >,
1206 >
1207 where
1208 KP: DatabaseLookup + MaybeSend + MaybeSync,
1209 KP::Record: DatabaseKey,
1210 {
1211 let decoders = self.decoders().clone();
1212 Box::pin(
1213 self.raw_find_by_prefix_sorted_descending(&key_prefix.to_bytes())
1214 .await
1215 .expect("Unrecoverable error occurred while listing entries from the database")
1216 .map(move |(key_bytes, value_bytes)| {
1217 let key = decode_key_expect(&key_bytes, &decoders);
1218 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1219 (key, value)
1220 }),
1221 )
1222 }
1223 async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1224 where
1225 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1226 {
1227 let key_bytes = key.to_bytes();
1228 self.raw_remove_entry(&key_bytes)
1229 .await
1230 .expect("Unrecoverable error occurred while inserting removing entry from the database")
1231 .map(|value_bytes| {
1232 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1233 })
1234 }
1235 async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1236 where
1237 KP: DatabaseLookup + MaybeSend + MaybeSync,
1238 {
1239 self.raw_remove_by_prefix(&key_prefix.to_bytes())
1240 .await
1241 .expect("Unrecoverable error when removing entries from the database");
1242 }
1243}
1244
/// Access to the module decoders; the typed transaction operations use them to
/// decode keys and values.
pub trait WithDecoders {
    /// Returns the module decoder registry.
    fn decoders(&self) -> &ModuleDecoderRegistry;
}
1250
#[apply(async_trait_maybe_send!)]
/// Transaction type of a raw backend (`IRawDatabase::Transaction`).
pub trait IRawDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
    /// Commits the transaction, consuming it.
    async fn commit_tx(self) -> DatabaseResult<()>;
}
1256
#[apply(async_trait_maybe_send!)]
/// Object-safe transaction interface. Unlike `IRawDatabaseTransaction`,
/// committing takes `&mut self`.
pub trait IDatabaseTransaction: MaybeSend + IDatabaseTransactionOps + fmt::Debug {
    /// Commits the transaction.
    async fn commit_tx(&mut self) -> DatabaseResult<()>;

    /// Whether this transaction is global (not isolated to a module prefix).
    fn is_global(&self) -> bool;

    #[doc(hidden)]
    /// Returns the transaction below an isolating prefix layer. Prefixed
    /// transactions check `access_token` against their own token; the base
    /// transaction in this file panics when asked.
    fn global_dbtx(&mut self, access_token: GlobalDBTxAccessToken)
    -> &mut dyn IDatabaseTransaction;
}
1276
#[apply(async_trait_maybe_send!)]
/// A boxed transaction (including `Box<dyn IDatabaseTransaction>`) forwards
/// every call to the boxed value.
impl<T> IDatabaseTransaction for Box<T>
where
    T: IDatabaseTransaction + ?Sized,
{
    async fn commit_tx(&mut self) -> DatabaseResult<()> {
        // `**self` reaches through `&mut Box<T>` to the inner `T`.
        (**self).commit_tx().await
    }

    fn is_global(&self) -> bool {
        (**self).is_global()
    }

    fn global_dbtx(
        &mut self,
        access_token: GlobalDBTxAccessToken,
    ) -> &mut dyn IDatabaseTransaction {
        (**self).global_dbtx(access_token)
    }
}
1297
1298#[apply(async_trait_maybe_send!)]
1299impl<'a, T> IDatabaseTransaction for &'a mut T
1300where
1301 T: IDatabaseTransaction + ?Sized,
1302{
1303 async fn commit_tx(&mut self) -> DatabaseResult<()> {
1304 (**self).commit_tx().await
1305 }
1306
1307 fn is_global(&self) -> bool {
1308 (**self).is_global()
1309 }
1310
1311 fn global_dbtx(&mut self, access_key: GlobalDBTxAccessToken) -> &mut dyn IDatabaseTransaction {
1312 (**self).global_dbtx(access_key)
1313 }
1314}
1315
/// Transaction of the base layer: a raw transaction plus the keys to notify
/// about once it commits.
struct BaseDatabaseTransaction<Tx> {
    /// The raw transaction; taken (set to `None`) by `commit_tx`.
    raw: Option<Tx>,
    /// Keys touched by inserts and single-entry removals; taken and submitted
    /// after a successful commit.
    notify_queue: Option<NotifyQueue>,
    /// Notification state shared with the owning database.
    notifications: Arc<Notifications>,
}
1324
1325impl<Tx> fmt::Debug for BaseDatabaseTransaction<Tx>
1326where
1327 Tx: fmt::Debug,
1328{
1329 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1330 f.write_fmt(format_args!(
1331 "BaseDatabaseTransaction{{ raw={:?} }}",
1332 self.raw
1333 ))
1334 }
1335}
1336impl<Tx> BaseDatabaseTransaction<Tx>
1337where
1338 Tx: IRawDatabaseTransaction,
1339{
1340 fn new(dbtx: Tx, notifications: Arc<Notifications>) -> Self {
1341 Self {
1342 raw: Some(dbtx),
1343 notifications,
1344 notify_queue: Some(NotifyQueue::new()),
1345 }
1346 }
1347
1348 fn add_notification_key(&mut self, key: &[u8]) -> DatabaseResult<()> {
1349 self.notify_queue
1350 .as_mut()
1351 .ok_or(DatabaseError::TransactionConsumed)?
1352 .add(key);
1353 Ok(())
1354 }
1355}
1356
1357#[apply(async_trait_maybe_send!)]
1358impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOpsCore for BaseDatabaseTransaction<Tx> {
1359 async fn raw_insert_bytes(
1360 &mut self,
1361 key: &[u8],
1362 value: &[u8],
1363 ) -> DatabaseResult<Option<Vec<u8>>> {
1364 self.add_notification_key(key)?;
1365 self.raw
1366 .as_mut()
1367 .ok_or(DatabaseError::TransactionConsumed)?
1368 .raw_insert_bytes(key, value)
1369 .await
1370 }
1371
1372 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1373 self.raw
1374 .as_mut()
1375 .ok_or(DatabaseError::TransactionConsumed)?
1376 .raw_get_bytes(key)
1377 .await
1378 }
1379
1380 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1381 self.add_notification_key(key)?;
1382 self.raw
1383 .as_mut()
1384 .ok_or(DatabaseError::TransactionConsumed)?
1385 .raw_remove_entry(key)
1386 .await
1387 }
1388
1389 async fn raw_find_by_range(
1390 &mut self,
1391 key_range: Range<&[u8]>,
1392 ) -> DatabaseResult<PrefixStream<'_>> {
1393 self.raw
1394 .as_mut()
1395 .ok_or(DatabaseError::TransactionConsumed)?
1396 .raw_find_by_range(key_range)
1397 .await
1398 }
1399
1400 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
1401 self.raw
1402 .as_mut()
1403 .ok_or(DatabaseError::TransactionConsumed)?
1404 .raw_find_by_prefix(key_prefix)
1405 .await
1406 }
1407
1408 async fn raw_find_by_prefix_sorted_descending(
1409 &mut self,
1410 key_prefix: &[u8],
1411 ) -> DatabaseResult<PrefixStream<'_>> {
1412 self.raw
1413 .as_mut()
1414 .ok_or(DatabaseError::TransactionConsumed)?
1415 .raw_find_by_prefix_sorted_descending(key_prefix)
1416 .await
1417 }
1418
1419 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
1420 self.raw
1421 .as_mut()
1422 .ok_or(DatabaseError::TransactionConsumed)?
1423 .raw_remove_by_prefix(key_prefix)
1424 .await
1425 }
1426}
1427
// Marker impl: `BaseDatabaseTransaction` opts into `IDatabaseTransactionOps`
// without overriding anything (the impl body is empty).
impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOps for BaseDatabaseTransaction<Tx> {}
1429
1430#[apply(async_trait_maybe_send!)]
1431impl<Tx: IRawDatabaseTransaction + fmt::Debug> IDatabaseTransaction
1432 for BaseDatabaseTransaction<Tx>
1433{
1434 async fn commit_tx(&mut self) -> DatabaseResult<()> {
1435 self.raw
1436 .take()
1437 .ok_or(DatabaseError::TransactionConsumed)?
1438 .commit_tx()
1439 .await?;
1440 self.notifications.submit_queue(
1441 &self
1442 .notify_queue
1443 .take()
1444 .expect("commit must be called only once"),
1445 );
1446 Ok(())
1447 }
1448
1449 fn is_global(&self) -> bool {
1450 true
1451 }
1452
1453 fn global_dbtx(
1454 &mut self,
1455 _access_token: GlobalDBTxAccessToken,
1456 ) -> &mut dyn IDatabaseTransaction {
1457 panic!("Illegal to call global_dbtx on BaseDatabaseTransaction");
1458 }
1459}
1460
/// Bookkeeping used to detect transactions that wrote data but were dropped
/// without being committed (see the `Drop` impl).
#[derive(Clone)]
struct CommitTracker {
    /// Set to `true` once a commit has been attempted on the transaction.
    is_committed: bool,
    /// Set to `true` when a write operation was issued through the transaction.
    has_writes: bool,
    /// When `true`, dropping with uncommitted writes only logs at trace level
    /// instead of emitting a warning.
    ignore_uncommitted: bool,
}
1472
1473impl Drop for CommitTracker {
1474 fn drop(&mut self) {
1475 if self.has_writes && !self.is_committed {
1476 if self.ignore_uncommitted {
1477 trace!(
1478 target: LOG_DB,
1479 "DatabaseTransaction has writes and has not called commit, but that's expected."
1480 );
1481 } else {
1482 warn!(
1483 target: LOG_DB,
1484 location = ?backtrace::Backtrace::new(),
1485 "DatabaseTransaction has writes and has not called commit."
1486 );
1487 }
1488 }
1489 }
1490}
1491
/// Either an owned `T` or a mutable borrow of a `T` that lives elsewhere.
///
/// Both variants dereference to the same `T`, so users do not need to care
/// which one they hold.
enum MaybeRef<'a, T> {
    /// The value is stored inline.
    Owned(T),
    /// The value is borrowed mutably from another owner.
    Borrowed(&'a mut T),
}

impl<T> ops::Deref for MaybeRef<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        match self {
            Self::Owned(owned) => owned,
            Self::Borrowed(borrowed) => &**borrowed,
        }
    }
}

impl<T> ops::DerefMut for MaybeRef<'_, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        match self {
            Self::Owned(owned) => owned,
            Self::Borrowed(borrowed) => &mut **borrowed,
        }
    }
}
1516
/// Capability marker for a [`DatabaseTransaction`] that may be committed.
///
/// Only `DatabaseTransaction<'_, Committable>` exposes `commit_tx` and
/// `commit_tx_result`.
pub struct Committable;
1521
/// Capability marker for a [`DatabaseTransaction`] that cannot be committed.
///
/// This is the default capability of [`DatabaseTransaction`] and what
/// `into_nc` / `to_ref_nc` produce.
pub struct NonCommittable;
1526
/// A database transaction handle, parameterized by a capability marker
/// (`Committable` or `NonCommittable`, the latter being the default).
pub struct DatabaseTransaction<'tx, Cap = NonCommittable> {
    /// The underlying type-erased transaction all operations are forwarded to.
    tx: Box<dyn IDatabaseTransaction + 'tx>,
    /// Decoder registry handed to key/value decoding.
    decoders: ModuleDecoderRegistry,
    /// Commit/write bookkeeping; owned by the root transaction and borrowed by
    /// transactions derived from it via the `to_ref*` methods.
    commit_tracker: MaybeRef<'tx, CommitTracker>,
    /// Callbacks to run after a successful commit; shared with derived
    /// transactions the same way as `commit_tracker`.
    on_commit_hooks: MaybeRef<'tx, Vec<Box<maybe_add_send!(dyn FnOnce())>>>,
    /// Zero-sized marker carrying the capability type parameter.
    capability: marker::PhantomData<Cap>,
}
1537
1538impl<Cap> fmt::Debug for DatabaseTransaction<'_, Cap> {
1539 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1540 f.write_fmt(format_args!(
1541 "DatabaseTransaction {{ tx: {:?}, decoders={:?} }}",
1542 self.tx, self.decoders
1543 ))
1544 }
1545}
1546
impl<Cap> WithDecoders for DatabaseTransaction<'_, Cap> {
    /// Returns the decoder registry stored in this transaction.
    fn decoders(&self) -> &ModuleDecoderRegistry {
        &self.decoders
    }
}
1552
/// Decodes `value_bytes` into a `V` using [`DatabaseValue::from_bytes`].
///
/// The (abbreviated) raw bytes are logged at trace level before decoding, and
/// the tracing span records the target value type name.
#[instrument(target = LOG_DB, level = "trace", skip_all, fields(value_type = std::any::type_name::<V>()), err)]
fn decode_value<V: DatabaseValue>(
    value_bytes: &[u8],
    decoders: &ModuleDecoderRegistry,
) -> std::result::Result<V, DecodingError> {
    trace!(
        bytes = %AbbreviateHexBytes(value_bytes),
        "decoding value",
    );
    V::from_bytes(value_bytes, decoders)
}
1564
1565#[track_caller]
1566fn decode_value_expect<V: DatabaseValue>(
1567 value_bytes: &[u8],
1568 decoders: &ModuleDecoderRegistry,
1569 key_bytes: &[u8],
1570) -> V {
1571 decode_value(value_bytes, decoders).unwrap_or_else(|err| {
1572 panic!(
1573 "Unrecoverable decoding DatabaseValue as {}; err={}, key_bytes={}, val_bytes={}",
1574 any::type_name::<V>(),
1575 err,
1576 AbbreviateHexBytes(key_bytes),
1577 AbbreviateHexBytes(value_bytes),
1578 )
1579 })
1580}
1581
1582#[track_caller]
1583fn decode_key_expect<K: DatabaseKey>(key_bytes: &[u8], decoders: &ModuleDecoderRegistry) -> K {
1584 trace!(
1585 bytes = %AbbreviateHexBytes(key_bytes),
1586 "decoding key",
1587 );
1588 K::from_bytes(key_bytes, decoders).unwrap_or_else(|err| {
1589 panic!(
1590 "Unrecoverable decoding DatabaseKey as {}; err={}; bytes={}",
1591 any::type_name::<K>(),
1592 err,
1593 AbbreviateHexBytes(key_bytes)
1594 )
1595 })
1596}
1597
1598impl<'tx, Cap> DatabaseTransaction<'tx, Cap> {
1599 pub fn into_nc(self) -> DatabaseTransaction<'tx, NonCommittable> {
1601 DatabaseTransaction {
1602 tx: self.tx,
1603 decoders: self.decoders,
1604 commit_tracker: self.commit_tracker,
1605 on_commit_hooks: self.on_commit_hooks,
1606 capability: PhantomData::<NonCommittable>,
1607 }
1608 }
1609
1610 pub fn to_ref_nc<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, NonCommittable>
1612 where
1613 's: 'a,
1614 {
1615 self.to_ref().into_nc()
1616 }
1617
1618 pub fn with_prefix<'a: 'tx>(self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1620 where
1621 'tx: 'a,
1622 {
1623 DatabaseTransaction {
1624 tx: Box::new(PrefixDatabaseTransaction {
1625 inner: self.tx,
1626 global_dbtx_access_token: None,
1627 prefix,
1628 }),
1629 decoders: self.decoders,
1630 commit_tracker: self.commit_tracker,
1631 on_commit_hooks: self.on_commit_hooks,
1632 capability: self.capability,
1633 }
1634 }
1635
1636 pub fn with_prefix_module_id<'a: 'tx>(
1640 self,
1641 module_instance_id: ModuleInstanceId,
1642 ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1643 where
1644 'tx: 'a,
1645 {
1646 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1647 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1648 (
1649 DatabaseTransaction {
1650 tx: Box::new(PrefixDatabaseTransaction {
1651 inner: self.tx,
1652 global_dbtx_access_token: Some(global_dbtx_access_token),
1653 prefix,
1654 }),
1655 decoders: self.decoders,
1656 commit_tracker: self.commit_tracker,
1657 on_commit_hooks: self.on_commit_hooks,
1658 capability: self.capability,
1659 },
1660 global_dbtx_access_token,
1661 )
1662 }
1663
1664 pub fn to_ref<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, Cap>
1666 where
1667 's: 'a,
1668 {
1669 let decoders = self.decoders.clone();
1670
1671 DatabaseTransaction {
1672 tx: Box::new(&mut self.tx),
1673 decoders,
1674 commit_tracker: match self.commit_tracker {
1675 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1676 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1677 },
1678 on_commit_hooks: match self.on_commit_hooks {
1679 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1680 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1681 },
1682 capability: self.capability,
1683 }
1684 }
1685
1686 pub fn to_ref_with_prefix<'a>(&'a mut self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1688 where
1689 'tx: 'a,
1690 {
1691 DatabaseTransaction {
1692 tx: Box::new(PrefixDatabaseTransaction {
1693 inner: &mut self.tx,
1694 global_dbtx_access_token: None,
1695 prefix,
1696 }),
1697 decoders: self.decoders.clone(),
1698 commit_tracker: match self.commit_tracker {
1699 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1700 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1701 },
1702 on_commit_hooks: match self.on_commit_hooks {
1703 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1704 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1705 },
1706 capability: self.capability,
1707 }
1708 }
1709
1710 pub fn to_ref_with_prefix_module_id<'a>(
1711 &'a mut self,
1712 module_instance_id: ModuleInstanceId,
1713 ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1714 where
1715 'tx: 'a,
1716 {
1717 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1718 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1719 (
1720 DatabaseTransaction {
1721 tx: Box::new(PrefixDatabaseTransaction {
1722 inner: &mut self.tx,
1723 global_dbtx_access_token: Some(global_dbtx_access_token),
1724 prefix,
1725 }),
1726 decoders: self.decoders.clone(),
1727 commit_tracker: match self.commit_tracker {
1728 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1729 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1730 },
1731 on_commit_hooks: match self.on_commit_hooks {
1732 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1733 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1734 },
1735 capability: self.capability,
1736 },
1737 global_dbtx_access_token,
1738 )
1739 }
1740
1741 pub fn is_global(&self) -> bool {
1743 self.tx.is_global()
1744 }
1745
1746 pub fn ensure_global(&self) -> DatabaseResult<()> {
1748 if !self.is_global() {
1749 return Err(DatabaseError::Other(anyhow::anyhow!(
1750 "Database instance not global"
1751 )));
1752 }
1753
1754 Ok(())
1755 }
1756
1757 pub fn ensure_isolated(&self) -> DatabaseResult<()> {
1759 if self.is_global() {
1760 return Err(DatabaseError::Other(anyhow::anyhow!(
1761 "Database instance not isolated"
1762 )));
1763 }
1764
1765 Ok(())
1766 }
1767
1768 pub fn ignore_uncommitted(&mut self) -> &mut Self {
1770 self.commit_tracker.ignore_uncommitted = true;
1771 self
1772 }
1773
1774 pub fn warn_uncommitted(&mut self) -> &mut Self {
1776 self.commit_tracker.ignore_uncommitted = false;
1777 self
1778 }
1779
1780 #[instrument(target = LOG_DB, level = "trace", skip_all)]
1782 pub fn on_commit(&mut self, f: maybe_add_send!(impl FnOnce() + 'static)) {
1783 self.on_commit_hooks.push(Box::new(f));
1784 }
1785
1786 pub fn global_dbtx<'a>(
1787 &'a mut self,
1788 access_token: GlobalDBTxAccessToken,
1789 ) -> DatabaseTransaction<'a, Cap>
1790 where
1791 'tx: 'a,
1792 {
1793 let decoders = self.decoders.clone();
1794
1795 DatabaseTransaction {
1796 tx: Box::new(self.tx.global_dbtx(access_token)),
1797 decoders,
1798 commit_tracker: match self.commit_tracker {
1799 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1800 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1801 },
1802 on_commit_hooks: match self.on_commit_hooks {
1803 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1804 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1805 },
1806 capability: self.capability,
1807 }
1808 }
1809}
1810
/// Token that must be presented to obtain the global transaction from a
/// module-prefixed one.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct GlobalDBTxAccessToken(u32);

impl GlobalDBTxAccessToken {
    /// Derives the token for a given key prefix: the sum of the prefix bytes
    /// plus a fixed offset of 513.
    ///
    /// NOTE(review): this is a trivially computable value, so it presumably
    /// guards against accidental misuse rather than acting as a security
    /// boundary — confirm.
    fn from_prefix(prefix: &[u8]) -> Self {
        let byte_sum: u32 = prefix.iter().copied().map(u32::from).sum();
        Self(byte_sum + 513)
    }
}
1830
1831impl<'tx> DatabaseTransaction<'tx, Committable> {
1832 pub fn new(dbtx: Box<dyn IDatabaseTransaction + 'tx>, decoders: ModuleDecoderRegistry) -> Self {
1833 Self {
1834 tx: dbtx,
1835 decoders,
1836 commit_tracker: MaybeRef::Owned(CommitTracker {
1837 is_committed: false,
1838 has_writes: false,
1839 ignore_uncommitted: false,
1840 }),
1841 on_commit_hooks: MaybeRef::Owned(vec![]),
1842 capability: PhantomData,
1843 }
1844 }
1845
1846 pub async fn commit_tx_result(mut self) -> DatabaseResult<()> {
1847 self.commit_tracker.is_committed = true;
1848 let commit_result = self.tx.commit_tx().await;
1849
1850 if commit_result.is_ok() {
1852 for hook in self.on_commit_hooks.deref_mut().drain(..) {
1853 hook();
1854 }
1855 }
1856
1857 commit_result
1858 }
1859
1860 pub async fn commit_tx(mut self) {
1861 self.commit_tracker.is_committed = true;
1862 self.commit_tx_result()
1863 .await
1864 .expect("Unrecoverable error occurred while committing to the database.");
1865 }
1866}
1867
1868#[apply(async_trait_maybe_send!)]
1869impl<Cap> IDatabaseTransactionOpsCore for DatabaseTransaction<'_, Cap>
1870where
1871 Cap: Send,
1872{
1873 async fn raw_insert_bytes(
1874 &mut self,
1875 key: &[u8],
1876 value: &[u8],
1877 ) -> DatabaseResult<Option<Vec<u8>>> {
1878 self.commit_tracker.has_writes = true;
1879 self.tx.raw_insert_bytes(key, value).await
1880 }
1881
1882 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1883 self.tx.raw_get_bytes(key).await
1884 }
1885
1886 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1887 self.tx.raw_remove_entry(key).await
1888 }
1889
1890 async fn raw_find_by_range(
1891 &mut self,
1892 key_range: Range<&[u8]>,
1893 ) -> DatabaseResult<PrefixStream<'_>> {
1894 self.tx.raw_find_by_range(key_range).await
1895 }
1896
1897 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
1898 self.tx.raw_find_by_prefix(key_prefix).await
1899 }
1900
1901 async fn raw_find_by_prefix_sorted_descending(
1902 &mut self,
1903 key_prefix: &[u8],
1904 ) -> DatabaseResult<PrefixStream<'_>> {
1905 self.tx
1906 .raw_find_by_prefix_sorted_descending(key_prefix)
1907 .await
1908 }
1909
1910 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
1911 self.commit_tracker.has_writes = true;
1912 self.tx.raw_remove_by_prefix(key_prefix).await
1913 }
1914}
// Marker impl: only the `Committable` flavor of `DatabaseTransaction` opts
// into `IDatabaseTransactionOps` here (the impl body is empty).
impl IDatabaseTransactionOps for DatabaseTransaction<'_, Committable> {}
1916
1917impl<T> DatabaseKeyPrefix for T
1918where
1919 T: DatabaseLookup + crate::encoding::Encodable + Debug,
1920{
1921 fn to_bytes(&self) -> Vec<u8> {
1922 let mut data = vec![<Self as DatabaseLookup>::Record::DB_PREFIX];
1923 data.append(&mut self.consensus_encode_to_vec());
1924 data
1925 }
1926}
1927
1928impl<T> DatabaseKey for T
1929where
1930 T: DatabaseRecord + crate::encoding::Decodable + Sized,
1933{
1934 const NOTIFY_ON_MODIFY: bool = <T as DatabaseRecord>::NOTIFY_ON_MODIFY;
1935 fn from_bytes(
1936 data: &[u8],
1937 modules: &ModuleDecoderRegistry,
1938 ) -> std::result::Result<Self, DecodingError> {
1939 if data.is_empty() {
1940 return Err(DecodingError::wrong_length(1, 0));
1942 }
1943
1944 if data[0] != Self::DB_PREFIX {
1945 return Err(DecodingError::wrong_prefix(Self::DB_PREFIX, data[0]));
1946 }
1947
1948 <Self as crate::encoding::Decodable>::consensus_decode_whole(&data[1..], modules)
1949 .map_err(|decode_error| DecodingError::Other(decode_error.0))
1950 }
1951}
1952
// Blanket impl: any `Debug + Encodable + Decodable` type can be stored as a
// database value using its consensus encoding.
impl<T> DatabaseValue for T
where
    T: Debug + Encodable + Decodable,
{
    /// Consensus-decodes the whole of `data` into `Self`; decoding failures
    /// are wrapped in `DecodingError::Other`.
    fn from_bytes(
        data: &[u8],
        modules: &ModuleDecoderRegistry,
    ) -> std::result::Result<Self, DecodingError> {
        T::consensus_decode_whole(data, modules).map_err(|e| DecodingError::Other(e.0))
    }

    /// Returns the consensus encoding of `self`.
    fn to_bytes(&self) -> Vec<u8> {
        self.consensus_encode_to_vec()
    }
}
1968
/// Implements `DatabaseRecord` for a key type.
///
/// Arguments (in this order, trailing comma allowed):
/// - `key = <type>`: the key type; it becomes its own `DatabaseRecord::Key`.
/// - `value = <type>`: the associated `DatabaseRecord::Value`.
/// - `db_prefix = <expr>`: cast with `as u8` to form `DB_PREFIX`.
/// - `notify_on_modify = true|false` (optional): sets `NOTIFY_ON_MODIFY`; when
///   `true`, `DatabaseKeyWithNotify` is implemented for the key as well.
#[macro_export]
macro_rules! impl_db_record {
    (key = $key:ty, value = $val:ty, db_prefix = $db_prefix:expr_2021 $(, notify_on_modify = $notify:tt)? $(,)?) => {
        impl $crate::db::DatabaseRecord for $key {
            const DB_PREFIX: u8 = $db_prefix as u8;
            $(const NOTIFY_ON_MODIFY: bool = $notify;)?
            type Key = Self;
            type Value = $val;
        }
        // Only expanded when `notify_on_modify` was given: dispatch to the
        // internal arms below based on its literal value.
        $(
            impl_db_record! {
                @impl_notify_marker key = $key, notify_on_modify = $notify
            }
        )?
    };
    // Internal: notifications enabled, so add the marker trait.
    (@impl_notify_marker key = $key:ty, notify_on_modify = true) => {
        impl $crate::db::DatabaseKeyWithNotify for $key {}
    };
    // Internal: notifications disabled, nothing to add.
    (@impl_notify_marker key = $key:ty, notify_on_modify = false) => {};
}
2051
/// Implements `DatabaseLookup` for zero or more query-prefix types, each with
/// `Record = $key`.
///
/// Usage: `impl_db_lookup!(key = Key, query_prefix = PrefixA, query_prefix = PrefixB)`.
#[macro_export]
macro_rules! impl_db_lookup{
    (key = $key:ty $(, query_prefix = $query_prefix:ty)* $(,)?) => {
        $(
            impl $crate::db::DatabaseLookup for $query_prefix {
                type Record = $key;
            }
        )*
    };
}
2062
/// Field-less key for a stored [`DatabaseVersion`].
///
/// The migration code in this file only ever removes entries under this key
/// (see `remove_current_db_version_if_exists`) and writes versions under
/// [`DatabaseVersionKey`] instead.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct DatabaseVersionKeyV0;
2066
/// Key under which the [`DatabaseVersion`] of a module instance is stored.
///
/// When no module instance applies, the migration code uses
/// `MODULE_GLOBAL_PREFIX` converted into a `ModuleInstanceId`.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct DatabaseVersionKey(pub ModuleInstanceId);
2069
/// Version number of a database schema, wrapping a `u64`.
#[derive(Debug, Encodable, Decodable, Serialize, Clone, PartialOrd, Ord, PartialEq, Eq, Copy)]
pub struct DatabaseVersion(pub u64);
2072
// `DatabaseVersionKeyV0` -> `DatabaseVersion`, stored under the
// `DbKeyPrefix::DatabaseVersion` prefix byte.
impl_db_record!(
    key = DatabaseVersionKeyV0,
    value = DatabaseVersion,
    db_prefix = DbKeyPrefix::DatabaseVersion
);
2078
// `DatabaseVersionKey` -> `DatabaseVersion`; shares the
// `DbKeyPrefix::DatabaseVersion` prefix byte with `DatabaseVersionKeyV0`.
impl_db_record!(
    key = DatabaseVersionKey,
    value = DatabaseVersion,
    db_prefix = DbKeyPrefix::DatabaseVersion
);
2084
impl std::fmt::Display for DatabaseVersion {
    /// Formats the version as its plain inner number.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}
2090
impl DatabaseVersion {
    /// Returns the next version (inner value plus one); `self` is left
    /// untouched.
    pub fn increment(&self) -> Self {
        Self(self.0 + 1)
    }
}
2096
impl std::fmt::Display for DbKeyPrefix {
    /// Displays the prefix using its `Debug` representation (the variant
    /// name).
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{self:?}")
    }
}
2102
/// Key prefix bytes declared by this module; the discriminant is the byte
/// used as `DB_PREFIX` by the records that reference it.
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
    /// Prefix of the database version records.
    DatabaseVersion = 0x50,
    /// Not referenced in this part of the file — presumably the prefix of
    /// client backup records; verify against its users.
    ClientBackup = 0x51,
}
2109
/// Errors produced when decoding database keys or values from raw bytes.
#[derive(Debug, Error)]
pub enum DecodingError {
    /// The first byte of a key did not match the expected `DB_PREFIX`.
    #[error("Key had a wrong prefix, expected {expected} but got {found}")]
    WrongPrefix { expected: u8, found: u8 },
    /// The raw key had an unexpected length (e.g. it was empty).
    #[error("Key had a wrong length, expected {expected} but got {found}")]
    WrongLength { expected: usize, found: usize },
    /// Any other decoding failure, carried as an `anyhow::Error`.
    #[error("Other decoding error: {0:#}")]
    Other(anyhow::Error),
}
2119
2120impl DecodingError {
2121 pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2122 Self::Other(anyhow::Error::from(error))
2123 }
2124
2125 pub fn wrong_prefix(expected: u8, found: u8) -> Self {
2126 Self::WrongPrefix { expected, found }
2127 }
2128
2129 pub fn wrong_length(expected: usize, found: usize) -> Self {
2130 Self::WrongLength { expected, found }
2131 }
2132}
2133
/// Errors returned by database operations.
#[derive(Debug, Error)]
pub enum DatabaseError {
    /// Reported by the error message as a write-write conflict.
    #[error("Write-write conflict detected")]
    WriteConflict,

    /// The transaction was already consumed, so it can no longer be used.
    #[error("Transaction already consumed")]
    TransactionConsumed,

    /// An error coming from the database backend, boxed as a trait object.
    #[error("Database backend error: {0}")]
    DatabaseBackend(#[from] Box<dyn Error + Send + Sync>),

    /// Any other database error, carried as an `anyhow::Error`.
    #[error("Database error: {0:#}")]
    Other(anyhow::Error),
}
2155
2156impl DatabaseError {
2157 pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2159 Self::Other(anyhow::Error::from(error))
2160 }
2161
2162 pub fn backend<E: Error + Send + Sync + 'static>(error: E) -> Self {
2164 Self::DatabaseBackend(Box::new(error))
2165 }
2166}
2167
impl From<anyhow::Error> for DatabaseError {
    /// Converts an `anyhow::Error` into [`DatabaseError::Other`], which lets
    /// `?` be used on `anyhow` results in functions returning `DatabaseError`.
    fn from(error: anyhow::Error) -> Self {
        Self::Other(error)
    }
}
2173
/// Collects all key/value pairs found under `$prefix_type` and inserts them
/// into `$map` under the name `$key_literal`.
///
/// Keys are converted to their hex consensus encoding and the pairs are
/// collected into a `BTreeMap<String, $value_type>`, which is boxed before
/// insertion. `$key_type` is accepted but not used by the expansion.
///
/// Must be expanded in an async context; the call site needs `BTreeMap` in
/// scope, and presumably `futures::StreamExt` for `.map`/`.collect` on the
/// returned stream (verify against callers).
#[macro_export]
macro_rules! push_db_pair_items {
    ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
        let db_items =
            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
                .await
                .map(|(key, val)| {
                    (
                        $crate::encoding::Encodable::consensus_encode_to_hex(&key),
                        val,
                    )
                })
                .collect::<BTreeMap<String, $value_type>>()
                .await;

        $map.insert($key_literal.to_string(), Box::new(db_items));
    };
}
2192
/// Collects all keys found under `$prefix_type` (values are discarded) into a
/// `Vec<$key_type>` and inserts it, boxed, into `$map` under the name
/// `$key_literal`.
///
/// Must be expanded in an async context; presumably requires
/// `futures::StreamExt` in scope at the call site for `.map`/`.collect`
/// (verify against callers).
#[macro_export]
macro_rules! push_db_key_items {
    ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $map:ident, $key_literal:literal) => {
        let db_items =
            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
                .await
                .map(|(key, _)| key)
                .collect::<Vec<$key_type>>()
                .await;

        $map.insert($key_literal.to_string(), Box::new(db_items));
    };
}
2206
/// Argument passed to a database migration function: a transaction, the
/// optional module instance being migrated, and a caller-defined context `C`.
pub struct DbMigrationFnContext<'tx, C> {
    /// Global (non-committable) transaction; checked to be global in `new`.
    dbtx: DatabaseTransaction<'tx>,
    /// Module instance the migration applies to, or `None` when no module
    /// prefix should be applied by `dbtx()`.
    module_instance_id: Option<ModuleInstanceId>,
    /// Caller-defined migration context.
    ctx: C,
    /// Private unit field that forces construction through `new`.
    __please_use_constructor: (),
}
2226
2227impl<'tx, C> DbMigrationFnContext<'tx, C> {
2228 pub fn new(
2229 dbtx: DatabaseTransaction<'tx>,
2230 module_instance_id: Option<ModuleInstanceId>,
2231 ctx: C,
2232 ) -> Self {
2233 dbtx.ensure_global().expect("Must pass global dbtx");
2234 Self {
2235 dbtx,
2236 module_instance_id,
2237 ctx,
2238 __please_use_constructor: (),
2240 }
2241 }
2242
2243 pub fn map<R>(self, f: impl FnOnce(C) -> R) -> DbMigrationFnContext<'tx, R> {
2244 DbMigrationFnContext::new(self.dbtx, self.module_instance_id, f(self.ctx))
2245 }
2246
2247 #[doc(hidden)]
2249 pub fn split_dbtx_ctx<'s>(&'s mut self) -> (&'s mut DatabaseTransaction<'tx>, &'s C) {
2250 let Self { dbtx, ctx, .. } = self;
2251
2252 (dbtx, ctx)
2253 }
2254
2255 pub fn dbtx(&'_ mut self) -> DatabaseTransaction<'_> {
2256 if let Some(module_instance_id) = self.module_instance_id {
2257 self.dbtx.to_ref_with_prefix_module_id(module_instance_id).0
2258 } else {
2259 self.dbtx.to_ref_nc()
2260 }
2261 }
2262
2263 #[doc(hidden)]
2265 pub fn module_instance_id(&self) -> Option<ModuleInstanceId> {
2266 self.module_instance_id
2267 }
2268}
2269
/// Migration function that takes no extra context (`C = ()`).
pub type GeneralDbMigrationFn = DbMigrationFn<()>;
/// Migration context carrying no extra data (`C = ()`).
pub type GeneralDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, ()>;

/// Same underlying type as [`GeneralDbMigrationFn`]; named after the client
/// core by the alias.
pub type ClientCoreDbMigrationFn = DbMigrationFn<()>;
/// Same underlying type as [`GeneralDbMigrationFnContext`]; named after the
/// client core by the alias.
pub type ClientCoreDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, ()>;
2280
/// A boxed migration callback.
///
/// It receives a [`DbMigrationFnContext`] for any transaction lifetime `'tx`
/// and returns a pinned, boxed future resolving to `anyhow::Result<()>` that
/// may borrow for `'tx`. `Send`/`Sync` bounds are added through the
/// `maybe_add_send*` macros.
pub type DbMigrationFn<C> = Box<
    maybe_add_send_sync!(
        dyn for<'tx> Fn(
            DbMigrationFnContext<'tx, C>,
        ) -> Pin<
            Box<maybe_add_send!(dyn futures::Future<Output = anyhow::Result<()>> + 'tx)>,
        >
    ),
>;
2300
2301pub fn get_current_database_version<F>(
2305 migrations: &BTreeMap<DatabaseVersion, F>,
2306) -> DatabaseVersion {
2307 let versions = migrations.keys().copied().collect::<Vec<_>>();
2308
2309 if !versions
2312 .windows(2)
2313 .all(|window| window[0].increment() == window[1])
2314 {
2315 panic!("Database Migrations are not defined contiguously");
2316 }
2317
2318 versions
2319 .last()
2320 .map_or(DatabaseVersion(0), DatabaseVersion::increment)
2321}
2322
2323pub async fn apply_migrations<C>(
2324 db: &Database,
2325 ctx: C,
2326 kind: String,
2327 migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
2328 module_instance_id: Option<ModuleInstanceId>,
2329 external_prefixes_above: Option<u8>,
2332) -> std::result::Result<(), anyhow::Error>
2333where
2334 C: Clone,
2335{
2336 let mut dbtx = db.begin_transaction().await;
2337 apply_migrations_dbtx(
2338 &mut dbtx.to_ref_nc(),
2339 ctx,
2340 kind,
2341 migrations,
2342 module_instance_id,
2343 external_prefixes_above,
2344 )
2345 .await?;
2346
2347 dbtx.commit_tx_result()
2348 .await
2349 .map_err(|e| anyhow::Error::msg(e.to_string()))
2350}
2351pub async fn apply_migrations_dbtx<C>(
2363 global_dbtx: &mut DatabaseTransaction<'_>,
2364 ctx: C,
2365 kind: String,
2366 migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
2367 module_instance_id: Option<ModuleInstanceId>,
2368 external_prefixes_above: Option<u8>,
2371) -> std::result::Result<(), anyhow::Error>
2372where
2373 C: Clone,
2374{
2375 let is_new_db = global_dbtx
2378 .raw_find_by_prefix(&[])
2379 .await?
2380 .filter(|(key, _v)| {
2381 std::future::ready(
2382 external_prefixes_above.is_none_or(|external_prefixes_above| {
2383 !key.is_empty() && key[0] < external_prefixes_above
2384 }),
2385 )
2386 })
2387 .next()
2388 .await
2389 .is_none();
2390
2391 let target_db_version = get_current_database_version(&migrations);
2392
2393 create_database_version_dbtx(
2395 global_dbtx,
2396 target_db_version,
2397 module_instance_id,
2398 kind.clone(),
2399 is_new_db,
2400 )
2401 .await?;
2402
2403 let module_instance_id_key = module_instance_id_or_global(module_instance_id);
2404
2405 let disk_version = global_dbtx
2406 .get_value(&DatabaseVersionKey(module_instance_id_key))
2407 .await;
2408
2409 let db_version = if let Some(disk_version) = disk_version {
2410 let mut current_db_version = disk_version;
2411
2412 if current_db_version > target_db_version {
2413 return Err(anyhow::anyhow!(format!(
2414 "On disk database version {current_db_version} for module {kind} was higher than the code database version {target_db_version}."
2415 )));
2416 }
2417
2418 while current_db_version < target_db_version {
2419 if let Some(migration) = migrations.get(¤t_db_version) {
2420 info!(target: LOG_DB, ?kind, ?current_db_version, ?target_db_version, "Migrating module...");
2421 migration(DbMigrationFnContext::new(
2422 global_dbtx.to_ref_nc(),
2423 module_instance_id,
2424 ctx.clone(),
2425 ))
2426 .await?;
2427 } else {
2428 warn!(target: LOG_DB, ?current_db_version, "Missing server db migration");
2429 }
2430
2431 current_db_version = current_db_version.increment();
2432
2433 global_dbtx
2434 .insert_entry(
2435 &DatabaseVersionKey(module_instance_id_key),
2436 ¤t_db_version,
2437 )
2438 .await;
2439 }
2440
2441 current_db_version
2442 } else {
2443 target_db_version
2444 };
2445
2446 debug!(target: LOG_DB, ?kind, ?db_version, "DB Version");
2447 Ok(())
2448}
2449
2450pub async fn create_database_version(
2451 db: &Database,
2452 target_db_version: DatabaseVersion,
2453 module_instance_id: Option<ModuleInstanceId>,
2454 kind: String,
2455 is_new_db: bool,
2456) -> std::result::Result<(), anyhow::Error> {
2457 let mut dbtx = db.begin_transaction().await;
2458
2459 create_database_version_dbtx(
2460 &mut dbtx.to_ref_nc(),
2461 target_db_version,
2462 module_instance_id,
2463 kind,
2464 is_new_db,
2465 )
2466 .await?;
2467
2468 dbtx.commit_tx_result().await?;
2469 Ok(())
2470}
2471
2472pub async fn create_database_version_dbtx(
2476 global_dbtx: &mut DatabaseTransaction<'_>,
2477 target_db_version: DatabaseVersion,
2478 module_instance_id: Option<ModuleInstanceId>,
2479 kind: String,
2480 is_new_db: bool,
2481) -> std::result::Result<(), anyhow::Error> {
2482 let key_module_instance_id = module_instance_id_or_global(module_instance_id);
2483
2484 if global_dbtx
2488 .get_value(&DatabaseVersionKey(key_module_instance_id))
2489 .await
2490 .is_none()
2491 {
2492 let current_version_in_module = if let Some(module_instance_id) = module_instance_id {
2501 remove_current_db_version_if_exists(
2502 &mut global_dbtx
2503 .to_ref_with_prefix_module_id(module_instance_id)
2504 .0
2505 .into_nc(),
2506 is_new_db,
2507 target_db_version,
2508 )
2509 .await
2510 } else {
2511 remove_current_db_version_if_exists(
2512 &mut global_dbtx.to_ref().into_nc(),
2513 is_new_db,
2514 target_db_version,
2515 )
2516 .await
2517 };
2518
2519 debug!(target: LOG_DB, ?kind, ?current_version_in_module, ?target_db_version, ?is_new_db, "Creating DatabaseVersionKey...");
2521 global_dbtx
2522 .insert_new_entry(
2523 &DatabaseVersionKey(key_module_instance_id),
2524 ¤t_version_in_module,
2525 )
2526 .await;
2527 }
2528
2529 Ok(())
2530}
2531
2532async fn remove_current_db_version_if_exists(
2537 version_dbtx: &mut DatabaseTransaction<'_>,
2538 is_new_db: bool,
2539 target_db_version: DatabaseVersion,
2540) -> DatabaseVersion {
2541 let current_version_in_module = version_dbtx.remove_entry(&DatabaseVersionKeyV0).await;
2545 match current_version_in_module {
2546 Some(database_version) => database_version,
2547 None if is_new_db => target_db_version,
2548 None => DatabaseVersion(0),
2549 }
2550}
2551
/// Returns `module_instance_id` if present, otherwise `MODULE_GLOBAL_PREFIX`
/// converted into a `ModuleInstanceId`.
fn module_instance_id_or_global(module_instance_id: Option<ModuleInstanceId>) -> ModuleInstanceId {
    module_instance_id.unwrap_or_else(|| MODULE_GLOBAL_PREFIX.into())
}
2558#[allow(unused_imports)]
2559mod test_utils {
2560 use std::collections::BTreeMap;
2561 use std::time::Duration;
2562
2563 use fedimint_core::db::DbMigrationFnContext;
2564 use futures::future::ready;
2565 use futures::{Future, FutureExt, StreamExt};
2566 use rand::Rng;
2567 use tokio::join;
2568
2569 use super::{
2570 Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey, DatabaseVersionKeyV0,
2571 DbMigrationFn, apply_migrations,
2572 };
2573 use crate::core::ModuleKind;
2574 use crate::db::mem_impl::MemDatabase;
2575 use crate::db::{
2576 IDatabaseTransactionOps, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
2577 };
2578 use crate::encoding::{Decodable, Encodable};
2579 use crate::module::registry::ModuleDecoderRegistry;
2580
    /// Awaits `fut` with a 10 millisecond timeout.
    ///
    /// Returns `Some(output)` if the future completed in time and `None` if
    /// the timeout reported an error.
    pub async fn future_returns_shortly<F: Future>(fut: F) -> Option<F::Output> {
        crate::runtime::timeout(Duration::from_millis(10), fut)
            .await
            .ok()
    }
2586
    /// Key prefix bytes used by the test records declared below.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        /// Prefix of `TestKey` and `TestKeyV0`.
        Test = 0x42,
        /// Prefix of `AltTestKey`.
        AltTest = 0x43,
        /// Prefix of `PercentTestKey` (0x25 is the ASCII code of `%`).
        PercentTestKey = 0x25,
    }
2594
    /// Primary test key: a single `u64`, stored under `TestDbKeyPrefix::Test`.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub u64);

    /// Field-less query prefix that looks up `TestKey` records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    // `TestKey` -> `TestVal`, with modification notifications enabled.
    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);
2608
    /// Test key made of two `u64`s; it shares the `TestDbKeyPrefix::Test`
    /// prefix with `TestKey`.
    #[derive(Debug, Encodable, Decodable)]
    struct TestKeyV0(u64, u64);

    /// Field-less query prefix that looks up `TestKeyV0` records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixV0;

    // `TestKeyV0` -> `TestVal`, without notifications.
    impl_db_record!(
        key = TestKeyV0,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
    );
    impl_db_lookup!(key = TestKeyV0, query_prefix = DbPrefixTestPrefixV0);
2621
    /// Second test key type, stored under `TestDbKeyPrefix::AltTest` so it
    /// does not share a prefix with `TestKey`.
    #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Encodable, Decodable)]
    struct AltTestKey(u64);

    /// Field-less query prefix that looks up `AltTestKey` records.
    #[derive(Debug, Encodable, Decodable)]
    struct AltDbPrefixTestPrefix;

    // `AltTestKey` -> `TestVal`, without notifications.
    impl_db_record!(
        key = AltTestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::AltTest,
    );
    impl_db_lookup!(key = AltTestKey, query_prefix = AltDbPrefixTestPrefix);
2634
    /// Test key stored under `TestDbKeyPrefix::PercentTestKey` (byte 0x25).
    #[derive(Debug, Encodable, Decodable)]
    struct PercentTestKey(u64);

    /// Field-less query prefix that looks up `PercentTestKey` records.
    #[derive(Debug, Encodable, Decodable)]
    struct PercentPrefixTestPrefix;

    // `PercentTestKey` -> `TestVal`, without notifications.
    impl_db_record!(
        key = PercentTestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::PercentTestKey,
    );

    impl_db_lookup!(key = PercentTestKey, query_prefix = PercentPrefixTestPrefix);
    /// Value type shared by all test records: a single `u64`.
    #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
    pub(super) struct TestVal(pub u64);
2650
    // Module prefix values used by the tests. Their users are outside this
    // part of the file — presumably passed as module instance ids; verify.
    const TEST_MODULE_PREFIX: u16 = 1;
    const ALT_MODULE_PREFIX: u16 = 2;
2653
    /// Checks insert semantics: inserting a new key returns `None`, committed
    /// values can be read back, and overwriting a key returns the previous
    /// value and stores the new one.
    pub async fn verify_insert_elements(db: Database) {
        // Fresh keys: nothing to replace.
        let mut dbtx = db.begin_transaction().await;
        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
        assert!(dbtx.insert_entry(&TestKey(2), &TestVal(3)).await.is_none());
        dbtx.commit_tx().await;

        // The committed values are visible to a new transaction.
        let mut dbtx = db.begin_transaction().await;
        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
        assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(3)));
        dbtx.commit_tx().await;

        // Overwriting returns the previously stored value.
        let mut dbtx = db.begin_transaction().await;
        assert_eq!(
            dbtx.insert_entry(&TestKey(1), &TestVal(4)).await,
            Some(TestVal(2))
        );
        assert_eq!(
            dbtx.insert_entry(&TestKey(2), &TestVal(5)).await,
            Some(TestVal(3))
        );
        dbtx.commit_tx().await;

        // The overwritten values are what is stored now.
        let mut dbtx = db.begin_transaction().await;
        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(4)));
        assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(5)));
        dbtx.commit_tx().await;
    }
2683
    /// Checks that removing a key that was never inserted returns `None`.
    pub async fn verify_remove_nonexisting(db: Database) {
        let mut dbtx = db.begin_transaction().await;
        assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
        let removed = dbtx.remove_entry(&TestKey(1)).await;
        assert!(removed.is_none());

        // Finish the transaction explicitly instead of dropping it.
        dbtx.commit_tx().await;
    }
2693
    /// Checks that removing an existing key returns its value and that the
    /// key is gone afterwards, all within one transaction.
    pub async fn verify_remove_existing(db: Database) {
        let mut dbtx = db.begin_transaction().await;

        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());

        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));

        let removed = dbtx.remove_entry(&TestKey(1)).await;
        assert_eq!(removed, Some(TestVal(2)));
        assert_eq!(dbtx.get_value(&TestKey(1)).await, None);

        // Finish the transaction explicitly instead of dropping it.
        dbtx.commit_tx().await;
    }
2708
    /// Checks that a transaction can read a value it inserted itself before
    /// committing.
    pub async fn verify_read_own_writes(db: Database) {
        let mut dbtx = db.begin_transaction().await;

        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());

        assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));

        // Finish the transaction explicitly instead of dropping it.
        dbtx.commit_tx().await;
    }
2719
    /// Checks that a write made in one transaction is not visible to a second
    /// transaction while the first is still uncommitted.
    pub async fn verify_prevent_dirty_reads(db: Database) {
        let mut dbtx = db.begin_transaction().await;

        assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());

        // `dbtx` has not been committed yet, so `dbtx2` must not see the key.
        let mut dbtx2 = db.begin_transaction().await;
        assert_eq!(dbtx2.get_value(&TestKey(1)).await, None);

        // Finish the first transaction explicitly instead of dropping it.
        dbtx.commit_tx().await;
    }
2732
2733 pub async fn verify_find_by_range(db: Database) {
2734 let mut dbtx = db.begin_transaction().await;
2735 dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2736 dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2737 dbtx.insert_entry(&TestKey(56), &TestVal(7777)).await;
2738
2739 dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2740 dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2741
2742 {
2743 let mut module_dbtx = dbtx.to_ref_with_prefix_module_id(2).0;
2744 module_dbtx
2745 .insert_entry(&TestKey(300), &TestVal(3000))
2746 .await;
2747 }
2748
2749 dbtx.commit_tx().await;
2750
2751 let mut dbtx = db.begin_transaction_nc().await;
2753
2754 let returned_keys = dbtx
2755 .find_by_range(TestKey(55)..TestKey(56))
2756 .await
2757 .collect::<Vec<_>>()
2758 .await;
2759
2760 let expected = vec![(TestKey(55), TestVal(9999))];
2761
2762 assert_eq!(returned_keys, expected);
2763
2764 let returned_keys = dbtx
2765 .find_by_range(TestKey(54)..TestKey(56))
2766 .await
2767 .collect::<Vec<_>>()
2768 .await;
2769
2770 let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2771 assert_eq!(returned_keys, expected);
2772
2773 let returned_keys = dbtx
2774 .find_by_range(TestKey(54)..TestKey(57))
2775 .await
2776 .collect::<Vec<_>>()
2777 .await;
2778
2779 let expected = vec![
2780 (TestKey(54), TestVal(8888)),
2781 (TestKey(55), TestVal(9999)),
2782 (TestKey(56), TestVal(7777)),
2783 ];
2784 assert_eq!(returned_keys, expected);
2785
2786 let mut module_dbtx = dbtx.with_prefix_module_id(2).0;
2787 let test_range = module_dbtx
2788 .find_by_range(TestKey(300)..TestKey(301))
2789 .await
2790 .collect::<Vec<_>>()
2791 .await;
2792 assert!(test_range.len() == 1);
2793 }
2794
2795 pub async fn verify_find_by_prefix(db: Database) {
2796 let mut dbtx = db.begin_transaction().await;
2797 dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2798 dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2799
2800 dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2801 dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2802 dbtx.commit_tx().await;
2803
2804 let mut dbtx = db.begin_transaction().await;
2806
2807 let returned_keys = dbtx
2808 .find_by_prefix(&DbPrefixTestPrefix)
2809 .await
2810 .collect::<Vec<_>>()
2811 .await;
2812
2813 let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2814 assert_eq!(returned_keys, expected);
2815
2816 let reversed = dbtx
2817 .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2818 .await
2819 .collect::<Vec<_>>()
2820 .await;
2821 let mut reversed_expected = expected;
2822 reversed_expected.reverse();
2823 assert_eq!(reversed, reversed_expected);
2824
2825 let returned_keys = dbtx
2826 .find_by_prefix(&AltDbPrefixTestPrefix)
2827 .await
2828 .collect::<Vec<_>>()
2829 .await;
2830
2831 let expected = vec![
2832 (AltTestKey(54), TestVal(6666)),
2833 (AltTestKey(55), TestVal(7777)),
2834 ];
2835 assert_eq!(returned_keys, expected);
2836
2837 let reversed = dbtx
2838 .find_by_prefix_sorted_descending(&AltDbPrefixTestPrefix)
2839 .await
2840 .collect::<Vec<_>>()
2841 .await;
2842 let mut reversed_expected = expected;
2843 reversed_expected.reverse();
2844 assert_eq!(reversed, reversed_expected);
2845 }
2846
2847 pub async fn verify_commit(db: Database) {
2848 let mut dbtx = db.begin_transaction().await;
2849
2850 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2851 dbtx.commit_tx().await;
2852
2853 let mut dbtx2 = db.begin_transaction().await;
2855 assert_eq!(dbtx2.get_value(&TestKey(1)).await, Some(TestVal(2)));
2856 }
2857
2858 pub async fn verify_prevent_nonrepeatable_reads(db: Database) {
2859 let mut dbtx = db.begin_transaction().await;
2860 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2861
2862 let mut dbtx2 = db.begin_transaction().await;
2863
2864 dbtx2.insert_entry(&TestKey(100), &TestVal(101)).await;
2865
2866 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2867
2868 dbtx2.commit_tx().await;
2869
2870 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2873
2874 let expected_keys = 0;
2875 let returned_keys = dbtx
2876 .find_by_prefix(&DbPrefixTestPrefix)
2877 .await
2878 .fold(0, |returned_keys, (key, value)| async move {
2879 if key == TestKey(100) {
2880 assert!(value.eq(&TestVal(101)));
2881 }
2882 returned_keys + 1
2883 })
2884 .await;
2885
2886 assert_eq!(returned_keys, expected_keys);
2887 }
2888
2889 pub async fn verify_snapshot_isolation(db: Database) {
2890 async fn random_yield() {
2891 let times = if rand::thread_rng().gen_bool(0.5) {
2892 0
2893 } else {
2894 10
2895 };
2896 for _ in 0..times {
2897 tokio::task::yield_now().await;
2898 }
2899 }
2900
2901 for i in 0..1000 {
2903 let base_key = i * 2;
2904 let tx_accepted_key = base_key;
2905 let spent_input_key = base_key + 1;
2906
2907 join!(
2908 async {
2909 random_yield().await;
2910 let mut dbtx = db.begin_transaction().await;
2911
2912 random_yield().await;
2913 let a = dbtx.get_value(&TestKey(tx_accepted_key)).await;
2914 random_yield().await;
2915 let s = match i % 5 {
2918 0 => dbtx.get_value(&TestKey(spent_input_key)).await,
2919 1 => dbtx.remove_entry(&TestKey(spent_input_key)).await,
2920 2 => {
2921 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(200))
2922 .await
2923 }
2924 3 => {
2925 dbtx.find_by_prefix(&DbPrefixTestPrefix)
2926 .await
2927 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2928 .map(|(_k, v)| v)
2929 .next()
2930 .await
2931 }
2932 4 => {
2933 dbtx.find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2934 .await
2935 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2936 .map(|(_k, v)| v)
2937 .next()
2938 .await
2939 }
2940 _ => {
2941 panic!("woot?");
2942 }
2943 };
2944
2945 match (a, s) {
2946 (None, None) | (Some(_), Some(_)) => {}
2947 (None, Some(_)) => panic!("none some?! {i}"),
2948 (Some(_), None) => panic!("some none?! {i}"),
2949 }
2950 },
2951 async {
2952 random_yield().await;
2953
2954 let mut dbtx = db.begin_transaction().await;
2955 random_yield().await;
2956 assert_eq!(dbtx.get_value(&TestKey(tx_accepted_key)).await, None);
2957
2958 random_yield().await;
2959 assert_eq!(
2960 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(100))
2961 .await,
2962 None
2963 );
2964
2965 random_yield().await;
2966 assert_eq!(
2967 dbtx.insert_entry(&TestKey(tx_accepted_key), &TestVal(100))
2968 .await,
2969 None
2970 );
2971 random_yield().await;
2972 dbtx.commit_tx().await;
2973 }
2974 );
2975 }
2976 }
2977
2978 pub async fn verify_phantom_entry(db: Database) {
2979 let mut dbtx = db.begin_transaction().await;
2980
2981 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2982
2983 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
2984
2985 dbtx.commit_tx().await;
2986
2987 let mut dbtx = db.begin_transaction().await;
2988 let expected_keys = 2;
2989 let returned_keys = dbtx
2990 .find_by_prefix(&DbPrefixTestPrefix)
2991 .await
2992 .fold(0, |returned_keys, (key, value)| async move {
2993 match key {
2994 TestKey(100) => {
2995 assert!(value.eq(&TestVal(101)));
2996 }
2997 TestKey(101) => {
2998 assert!(value.eq(&TestVal(102)));
2999 }
3000 _ => {}
3001 }
3002 returned_keys + 1
3003 })
3004 .await;
3005
3006 assert_eq!(returned_keys, expected_keys);
3007
3008 let mut dbtx2 = db.begin_transaction().await;
3009
3010 dbtx2.insert_entry(&TestKey(102), &TestVal(103)).await;
3011
3012 dbtx2.commit_tx().await;
3013
3014 let returned_keys = dbtx
3015 .find_by_prefix(&DbPrefixTestPrefix)
3016 .await
3017 .fold(0, |returned_keys, (key, value)| async move {
3018 match key {
3019 TestKey(100) => {
3020 assert!(value.eq(&TestVal(101)));
3021 }
3022 TestKey(101) => {
3023 assert!(value.eq(&TestVal(102)));
3024 }
3025 _ => {}
3026 }
3027 returned_keys + 1
3028 })
3029 .await;
3030
3031 assert_eq!(returned_keys, expected_keys);
3032 }
3033
3034 pub async fn expect_write_conflict(db: Database) {
3035 let mut dbtx = db.begin_transaction().await;
3036 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3037 dbtx.commit_tx().await;
3038
3039 let mut dbtx2 = db.begin_transaction().await;
3040 let mut dbtx3 = db.begin_transaction().await;
3041
3042 dbtx2.insert_entry(&TestKey(100), &TestVal(102)).await;
3043
3044 dbtx3.insert_entry(&TestKey(100), &TestVal(103)).await;
3048
3049 dbtx2.commit_tx().await;
3050 dbtx3.commit_tx_result().await.expect_err("Expecting an error to be returned because this transaction is in a write-write conflict with dbtx");
3051 }
3052
3053 pub async fn verify_string_prefix(db: Database) {
3054 let mut dbtx = db.begin_transaction().await;
3055 dbtx.insert_entry(&PercentTestKey(100), &TestVal(101)).await;
3056
3057 assert_eq!(
3058 dbtx.get_value(&PercentTestKey(100)).await,
3059 Some(TestVal(101))
3060 );
3061
3062 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3063
3064 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3065
3066 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3067
3068 dbtx.insert_entry(&TestKey(101), &TestVal(100)).await;
3071
3072 let expected_keys = 4;
3073 let returned_keys = dbtx
3074 .find_by_prefix(&PercentPrefixTestPrefix)
3075 .await
3076 .fold(0, |returned_keys, (key, value)| async move {
3077 if matches!(key, PercentTestKey(101)) {
3078 assert!(value.eq(&TestVal(100)));
3079 }
3080 returned_keys + 1
3081 })
3082 .await;
3083
3084 assert_eq!(returned_keys, expected_keys);
3085 }
3086
3087 pub async fn verify_remove_by_prefix(db: Database) {
3088 let mut dbtx = db.begin_transaction().await;
3089
3090 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3091
3092 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3093
3094 dbtx.commit_tx().await;
3095
3096 let mut remove_dbtx = db.begin_transaction().await;
3097 remove_dbtx.remove_by_prefix(&DbPrefixTestPrefix).await;
3098 remove_dbtx.commit_tx().await;
3099
3100 let mut dbtx = db.begin_transaction().await;
3101 let expected_keys = 0;
3102 let returned_keys = dbtx
3103 .find_by_prefix(&DbPrefixTestPrefix)
3104 .await
3105 .fold(0, |returned_keys, (key, value)| async move {
3106 match key {
3107 TestKey(100) => {
3108 assert!(value.eq(&TestVal(101)));
3109 }
3110 TestKey(101) => {
3111 assert!(value.eq(&TestVal(102)));
3112 }
3113 _ => {}
3114 }
3115 returned_keys + 1
3116 })
3117 .await;
3118
3119 assert_eq!(returned_keys, expected_keys);
3120 }
3121
3122 pub async fn verify_module_db(db: Database, module_db: Database) {
3123 let mut dbtx = db.begin_transaction().await;
3124
3125 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3126
3127 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3128
3129 dbtx.commit_tx().await;
3130
3131 let mut module_dbtx = module_db.begin_transaction().await;
3133 assert_eq!(module_dbtx.get_value(&TestKey(100)).await, None);
3134
3135 assert_eq!(module_dbtx.get_value(&TestKey(101)).await, None);
3136
3137 let mut dbtx = db.begin_transaction().await;
3139 assert_eq!(dbtx.get_value(&TestKey(100)).await, Some(TestVal(101)));
3140
3141 assert_eq!(dbtx.get_value(&TestKey(101)).await, Some(TestVal(102)));
3142
3143 let mut module_dbtx = module_db.begin_transaction().await;
3144
3145 module_dbtx.insert_entry(&TestKey(100), &TestVal(103)).await;
3146
3147 module_dbtx.insert_entry(&TestKey(101), &TestVal(104)).await;
3148
3149 module_dbtx.commit_tx().await;
3150
3151 let expected_keys = 2;
3152 let mut dbtx = db.begin_transaction().await;
3153 let returned_keys = dbtx
3154 .find_by_prefix(&DbPrefixTestPrefix)
3155 .await
3156 .fold(0, |returned_keys, (key, value)| async move {
3157 match key {
3158 TestKey(100) => {
3159 assert!(value.eq(&TestVal(101)));
3160 }
3161 TestKey(101) => {
3162 assert!(value.eq(&TestVal(102)));
3163 }
3164 _ => {}
3165 }
3166 returned_keys + 1
3167 })
3168 .await;
3169
3170 assert_eq!(returned_keys, expected_keys);
3171
3172 let removed = dbtx.remove_entry(&TestKey(100)).await;
3173 assert_eq!(removed, Some(TestVal(101)));
3174 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
3175
3176 let mut module_dbtx = module_db.begin_transaction().await;
3177 assert_eq!(
3178 module_dbtx.get_value(&TestKey(100)).await,
3179 Some(TestVal(103))
3180 );
3181 }
3182
3183 pub async fn verify_module_prefix(db: Database) {
3184 let mut test_dbtx = db.begin_transaction().await;
3185 {
3186 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3187
3188 test_module_dbtx
3189 .insert_entry(&TestKey(100), &TestVal(101))
3190 .await;
3191
3192 test_module_dbtx
3193 .insert_entry(&TestKey(101), &TestVal(102))
3194 .await;
3195 }
3196
3197 test_dbtx.commit_tx().await;
3198
3199 let mut alt_dbtx = db.begin_transaction().await;
3200 {
3201 let mut alt_module_dbtx = alt_dbtx.to_ref_with_prefix_module_id(ALT_MODULE_PREFIX).0;
3202
3203 alt_module_dbtx
3204 .insert_entry(&TestKey(100), &TestVal(103))
3205 .await;
3206
3207 alt_module_dbtx
3208 .insert_entry(&TestKey(101), &TestVal(104))
3209 .await;
3210 }
3211
3212 alt_dbtx.commit_tx().await;
3213
3214 let mut test_dbtx = db.begin_transaction().await;
3216 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3217 assert_eq!(
3218 test_module_dbtx.get_value(&TestKey(100)).await,
3219 Some(TestVal(101))
3220 );
3221
3222 assert_eq!(
3223 test_module_dbtx.get_value(&TestKey(101)).await,
3224 Some(TestVal(102))
3225 );
3226
3227 let expected_keys = 2;
3228 let returned_keys = test_module_dbtx
3229 .find_by_prefix(&DbPrefixTestPrefix)
3230 .await
3231 .fold(0, |returned_keys, (key, value)| async move {
3232 match key {
3233 TestKey(100) => {
3234 assert!(value.eq(&TestVal(101)));
3235 }
3236 TestKey(101) => {
3237 assert!(value.eq(&TestVal(102)));
3238 }
3239 _ => {}
3240 }
3241 returned_keys + 1
3242 })
3243 .await;
3244
3245 assert_eq!(returned_keys, expected_keys);
3246
3247 let removed = test_module_dbtx.remove_entry(&TestKey(100)).await;
3248 assert_eq!(removed, Some(TestVal(101)));
3249 assert_eq!(test_module_dbtx.get_value(&TestKey(100)).await, None);
3250
3251 let mut test_dbtx = db.begin_transaction().await;
3254 assert_eq!(test_dbtx.get_value(&TestKey(101)).await, None);
3255
3256 test_dbtx.commit_tx().await;
3257 }
3258
/// Runs a single registered migration against an in-memory database and
/// checks the resulting layout.
///
/// One hundred `TestKeyV0(i, i + 1)` entries with value `TestVal(i)` are
/// written together with a `DatabaseVersionKeyV0` entry at version 0. After
/// `apply_migrations` runs the version-0 migration, a `DatabaseVersionKey`
/// entry for `MODULE_GLOBAL_PREFIX` must exist and the `TestKey` prefix must
/// hold the same number of entries, each keyed by its value plus one.
#[cfg(test)]
#[tokio::test]
pub async fn verify_test_migration() {
    let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());

    let expected_test_keys_size: usize = 100;
    let mut seed_tx = db.begin_transaction().await;
    for index in 0..expected_test_keys_size {
        let legacy_key = TestKeyV0(index as u64, (index + 1) as u64);
        let legacy_value = TestVal(index as u64);
        seed_tx.insert_new_entry(&legacy_key, &legacy_value).await;
    }

    seed_tx
        .insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
        .await;
    seed_tx.commit_tx().await;

    let mut migrations: BTreeMap<DatabaseVersion, DbMigrationFn<()>> = BTreeMap::new();
    migrations.insert(
        DatabaseVersion(0),
        Box::new(|ctx| migrate_test_db_version_0(ctx).boxed()),
    );

    apply_migrations(&db, (), "TestModule".to_string(), migrations, None, None)
        .await
        .expect("Error applying migrations for TestModule");

    let mut verify_tx = db.begin_transaction().await;

    let stored_version = verify_tx
        .get_value(&DatabaseVersionKey(MODULE_GLOBAL_PREFIX.into()))
        .await;
    assert!(stored_version.is_some());

    let migrated_entries: Vec<_> = verify_tx
        .find_by_prefix(&DbPrefixTestPrefix)
        .await
        .collect()
        .await;
    assert_eq!(migrated_entries.len(), expected_test_keys_size);
    for (key, val) in migrated_entries {
        // The migration keeps the second component of the old key (i + 1).
        assert_eq!(key.0, val.0 + 1);
    }
}
3310
3311 #[allow(dead_code)]
3312 async fn migrate_test_db_version_0(
3313 mut ctx: DbMigrationFnContext<'_, ()>,
3314 ) -> std::result::Result<(), anyhow::Error> {
3315 let mut dbtx = ctx.dbtx();
3316 let example_keys_v0 = dbtx
3317 .find_by_prefix(&DbPrefixTestPrefixV0)
3318 .await
3319 .collect::<Vec<_>>()
3320 .await;
3321 dbtx.remove_by_prefix(&DbPrefixTestPrefixV0).await;
3322 for (key, val) in example_keys_v0 {
3323 let key_v2 = TestKey(key.1);
3324 dbtx.insert_new_entry(&key_v2, &val).await;
3325 }
3326 Ok(())
3327 }
3328
/// Checks that `Database::autocommit` stops after the requested number of
/// attempts when every commit fails.
///
/// A fake raw database is used whose transactions always fail in
/// `commit_tx`; all other raw operations are `unimplemented!()` because the
/// closure passed to `autocommit` never touches the transaction. With a
/// limit of five attempts the call must fail with
/// `AutocommitError::CommitFailed` reporting five attempts.
#[cfg(test)]
#[tokio::test]
async fn test_autocommit() {
    use std::marker::PhantomData;
    use std::ops::Range;
    use std::path::Path;

    use anyhow::anyhow;
    use async_trait::async_trait;

    use crate::ModuleDecoderRegistry;
    use crate::db::{
        AutocommitError, BaseDatabaseTransaction, DatabaseError, DatabaseResult,
        IDatabaseTransaction, IDatabaseTransactionOps, IDatabaseTransactionOpsCore,
        IRawDatabase, IRawDatabaseTransaction,
    };

    /// Raw database stub that only hands out [`FakeTransaction`]s.
    #[derive(Debug)]
    struct FakeDatabase;

    #[async_trait]
    impl IRawDatabase for FakeDatabase {
        type Transaction<'a> = FakeTransaction<'a>;
        async fn begin_transaction(&self) -> FakeTransaction {
            FakeTransaction(PhantomData)
        }

        fn checkpoint(&self, _backup_path: &Path) -> DatabaseResult<()> {
            Ok(())
        }
    }

    /// Transaction stub: every data operation is unimplemented and committing
    /// always fails.
    #[derive(Debug)]
    struct FakeTransaction<'a>(PhantomData<&'a ()>);

    #[async_trait]
    impl IDatabaseTransactionOpsCore for FakeTransaction<'_> {
        async fn raw_insert_bytes(
            &mut self,
            _key: &[u8],
            _value: &[u8],
        ) -> DatabaseResult<Option<Vec<u8>>> {
            unimplemented!()
        }

        async fn raw_get_bytes(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
            unimplemented!()
        }

        async fn raw_remove_entry(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
            unimplemented!()
        }

        async fn raw_find_by_range(
            &mut self,
            _key_range: Range<&[u8]>,
        ) -> DatabaseResult<crate::db::PrefixStream<'_>> {
            unimplemented!()
        }

        async fn raw_find_by_prefix(
            &mut self,
            _key_prefix: &[u8],
        ) -> DatabaseResult<crate::db::PrefixStream<'_>> {
            unimplemented!()
        }

        async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> DatabaseResult<()> {
            unimplemented!()
        }

        async fn raw_find_by_prefix_sorted_descending(
            &mut self,
            _key_prefix: &[u8],
        ) -> DatabaseResult<crate::db::PrefixStream<'_>> {
            unimplemented!()
        }
    }

    impl IDatabaseTransactionOps for FakeTransaction<'_> {}

    #[async_trait]
    impl IRawDatabaseTransaction for FakeTransaction<'_> {
        async fn commit_tx(self) -> DatabaseResult<()> {
            use crate::db::DatabaseError;

            // Every commit attempt fails, forcing `autocommit` to retry.
            Err(DatabaseError::Other(anyhow::anyhow!("Can't commit!")))
        }
    }

    let failing_db = Database::new(FakeDatabase, ModuleDecoderRegistry::default());
    let autocommit_outcome = failing_db
        .autocommit::<_, _, ()>(|_dbtx, _| Box::pin(async { Ok(()) }), Some(5))
        .await;
    let failure = autocommit_outcome.unwrap_err();

    // The closure itself succeeds, so only a commit failure is acceptable.
    let AutocommitError::CommitFailed { attempts, .. } = failure else {
        panic!("Closure did not return error");
    };
    assert_eq!(attempts, 5);
}
3435}
3436
3437pub async fn find_by_prefix_sorted_descending<'r, 'inner, KP>(
3438 tx: &'r mut (dyn IDatabaseTransaction + 'inner),
3439 decoders: ModuleDecoderRegistry,
3440 key_prefix: &KP,
3441) -> impl Stream<
3442 Item = (
3443 KP::Record,
3444 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
3445 ),
3446>
3447+ 'r
3448+ use<'r, KP>
3449where
3450 'inner: 'r,
3451 KP: DatabaseLookup,
3452 KP::Record: DatabaseKey,
3453{
3454 debug!(target: LOG_DB, "find by prefix sorted descending");
3455 let prefix_bytes = key_prefix.to_bytes();
3456 tx.raw_find_by_prefix_sorted_descending(&prefix_bytes)
3457 .await
3458 .expect("Error doing prefix search in database")
3459 .map(move |(key_bytes, value_bytes)| {
3460 let key = decode_key_expect(&key_bytes, &decoders);
3461 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
3462 (key, value)
3463 })
3464}
3465
3466pub async fn verify_module_db_integrity_dbtx(
3467 dbtx: &mut DatabaseTransaction<'_>,
3468 module_id: ModuleInstanceId,
3469 module_kind: ModuleKind,
3470 prefixes: &BTreeSet<u8>,
3471) {
3472 let module_db_prefix = module_instance_id_to_byte_prefix(module_id);
3473 if module_id < 250 {
3474 assert_eq!(module_db_prefix.len(), 2);
3475 }
3476 let mut records = dbtx
3477 .raw_find_by_prefix(&module_db_prefix)
3478 .await
3479 .expect("DB fail");
3480 while let Some((k, v)) = records.next().await {
3481 assert!(
3482 prefixes.contains(&k[module_db_prefix.len()]),
3483 "Unexpected module {module_kind} {module_id} db record found: {}: {}",
3484 k.as_hex(),
3485 v.as_hex()
3486 );
3487 }
3488}
3489
3490#[cfg(test)]
3491mod tests;