1use std::any;
106use std::collections::{BTreeMap, BTreeSet};
107use std::error::Error;
108use std::fmt::{self, Debug};
109use std::marker::{self, PhantomData};
110use std::ops::{self, DerefMut, Range};
111use std::path::Path;
112use std::pin::Pin;
113use std::sync::Arc;
114use std::time::Duration;
115
116use bitcoin::hex::DisplayHex as _;
117use fedimint_core::util::BoxFuture;
118use fedimint_logging::LOG_DB;
119use fedimint_util_error::FmtCompact as _;
120use futures::{Stream, StreamExt};
121use macro_rules_attribute::apply;
122use rand::Rng;
123use serde::Serialize;
124use strum_macros::EnumIter;
125use thiserror::Error;
126use tracing::{debug, error, info, instrument, trace, warn};
127
128use crate::core::{ModuleInstanceId, ModuleKind};
129use crate::encoding::{Decodable, Encodable};
130use crate::fmt_utils::AbbreviateHexBytes;
131use crate::task::{MaybeSend, MaybeSync};
132use crate::{async_trait_maybe_send, maybe_add_send, maybe_add_send_sync, timing};
133
134pub mod mem_impl;
135pub mod notifications;
136
137pub use test_utils::*;
138
139use self::notifications::{Notifications, NotifyQueue};
140use crate::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
141
142pub const MODULE_GLOBAL_PREFIX: u8 = 0xff;
143
144pub type DatabaseResult<T> = std::result::Result<T, DatabaseError>;
146
/// A value that can be serialized into the raw bytes of a database key, or of
/// a prefix shared by several keys.
///
/// Implementors must also be [`Debug`].
pub trait DatabaseKeyPrefix
where
    Self: Debug,
{
    /// Returns the byte representation handed to the raw database operations.
    fn to_bytes(&self) -> Vec<u8>;
}
150
151pub trait DatabaseRecord: DatabaseKeyPrefix {
154 const DB_PREFIX: u8;
155 const NOTIFY_ON_MODIFY: bool = false;
156 type Key: DatabaseKey + Debug;
157 type Value: DatabaseValue + Debug;
158}
159
160pub trait DatabaseLookup: DatabaseKeyPrefix {
163 type Record: DatabaseRecord;
164}
165
166impl<Record> DatabaseLookup for Record
168where
169 Record: DatabaseRecord + Debug + Decodable + Encodable,
170{
171 type Record = Record;
172}
173
174pub trait DatabaseKey: Sized {
177 const NOTIFY_ON_MODIFY: bool = false;
185 fn from_bytes(
186 data: &[u8],
187 modules: &ModuleDecoderRegistry,
188 ) -> std::result::Result<Self, DecodingError>;
189}
190
191pub trait DatabaseKeyWithNotify {}
193
194pub trait DatabaseValue: Sized + Debug {
196 fn from_bytes(
197 data: &[u8],
198 modules: &ModuleDecoderRegistry,
199 ) -> std::result::Result<Self, DecodingError>;
200 fn to_bytes(&self) -> Vec<u8>;
201}
202
203pub type PrefixStream<'a> = Pin<Box<maybe_add_send!(dyn Stream<Item = (Vec<u8>, Vec<u8>)> + 'a)>>;
204
205pub type PhantomBound<'big, 'small> = PhantomData<&'small &'big ()>;
209
210#[derive(Debug, Error)]
212pub enum AutocommitError<E> {
213 #[error("Commit Failed: {last_error}")]
215 CommitFailed {
216 attempts: usize,
218 last_error: DatabaseError,
220 },
221 #[error("Closure error: {error}")]
224 ClosureError {
225 attempts: usize,
231 error: E,
233 },
234}
235
236pub trait AutocommitResultExt<T, E> {
237 fn unwrap_autocommit(self) -> std::result::Result<T, E>;
241}
242
243impl<T, E> AutocommitResultExt<T, E> for std::result::Result<T, AutocommitError<E>> {
244 fn unwrap_autocommit(self) -> std::result::Result<T, E> {
245 match self {
246 Ok(value) => Ok(value),
247 Err(AutocommitError::CommitFailed { .. }) => {
248 panic!("`unwrap_autocommit` called on a autocommit result with finite retries");
249 }
250 Err(AutocommitError::ClosureError { error, .. }) => Err(error),
251 }
252 }
253}
254
255#[apply(async_trait_maybe_send!)]
264pub trait IRawDatabase: Debug + MaybeSend + MaybeSync + 'static {
265 type Transaction<'a>: IRawDatabaseTransaction + Debug;
267
268 async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a>;
270
271 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()>;
273}
274
275#[apply(async_trait_maybe_send!)]
276impl<T> IRawDatabase for Box<T>
277where
278 T: IRawDatabase,
279{
280 type Transaction<'a> = <T as IRawDatabase>::Transaction<'a>;
281
282 async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a> {
283 (**self).begin_transaction().await
284 }
285
286 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
287 (**self).checkpoint(backup_path)
288 }
289}
290
291pub trait IRawDatabaseExt: IRawDatabase + Sized {
293 fn into_database(self) -> Database {
297 Database::new(self, ModuleRegistry::default())
298 }
299}
300
301impl<T> IRawDatabaseExt for T where T: IRawDatabase {}
302
303impl<T> From<T> for Database
304where
305 T: IRawDatabase,
306{
307 fn from(raw: T) -> Self {
308 Self::new(raw, ModuleRegistry::default())
309 }
310}
311
312#[apply(async_trait_maybe_send!)]
315pub trait IDatabase: Debug + MaybeSend + MaybeSync + 'static {
316 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a>;
318 async fn register(&self, key: &[u8]);
320 async fn notify(&self, key: &[u8]);
322
323 fn is_global(&self) -> bool;
326
327 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()>;
329}
330
331#[apply(async_trait_maybe_send!)]
332impl<T> IDatabase for Arc<T>
333where
334 T: IDatabase + ?Sized,
335{
336 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
337 (**self).begin_transaction().await
338 }
339 async fn register(&self, key: &[u8]) {
340 (**self).register(key).await;
341 }
342 async fn notify(&self, key: &[u8]) {
343 (**self).notify(key).await;
344 }
345
346 fn is_global(&self) -> bool {
347 (**self).is_global()
348 }
349
350 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
351 (**self).checkpoint(backup_path)
352 }
353}
354
355struct BaseDatabase<RawDatabase> {
359 notifications: Arc<Notifications>,
360 raw: RawDatabase,
361}
362
363impl<RawDatabase> fmt::Debug for BaseDatabase<RawDatabase> {
364 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
365 f.write_str("BaseDatabase")
366 }
367}
368
369#[apply(async_trait_maybe_send!)]
370impl<RawDatabase: IRawDatabase + MaybeSend + 'static> IDatabase for BaseDatabase<RawDatabase> {
371 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
372 Box::new(BaseDatabaseTransaction::new(
373 self.raw.begin_transaction().await,
374 self.notifications.clone(),
375 ))
376 }
377 async fn register(&self, key: &[u8]) {
378 self.notifications.register(key).await;
379 }
380 async fn notify(&self, key: &[u8]) {
381 self.notifications.notify(key);
382 }
383
384 fn is_global(&self) -> bool {
385 true
386 }
387
388 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
389 self.raw.checkpoint(backup_path)
390 }
391}
392
393#[derive(Clone, Debug)]
399pub struct Database {
400 inner: Arc<dyn IDatabase + 'static>,
401 module_decoders: ModuleDecoderRegistry,
402}
403
404impl Database {
405 pub fn strong_count(&self) -> usize {
406 Arc::strong_count(&self.inner)
407 }
408
409 pub fn into_inner(self) -> Arc<dyn IDatabase + 'static> {
410 self.inner
411 }
412}
413
414impl Database {
415 pub fn new(raw: impl IRawDatabase + 'static, module_decoders: ModuleDecoderRegistry) -> Self {
420 let inner = BaseDatabase {
421 raw,
422 notifications: Arc::new(Notifications::new()),
423 };
424 Self::new_from_arc(
425 Arc::new(inner) as Arc<dyn IDatabase + 'static>,
426 module_decoders,
427 )
428 }
429
430 pub fn new_from_arc(
432 inner: Arc<dyn IDatabase + 'static>,
433 module_decoders: ModuleDecoderRegistry,
434 ) -> Self {
435 Self {
436 inner,
437 module_decoders,
438 }
439 }
440
441 pub fn with_prefix(&self, prefix: Vec<u8>) -> Self {
443 Self {
444 inner: Arc::new(PrefixDatabase {
445 inner: self.inner.clone(),
446 global_dbtx_access_token: None,
447 prefix,
448 }),
449 module_decoders: self.module_decoders.clone(),
450 }
451 }
452
453 pub fn with_prefix_module_id(
457 &self,
458 module_instance_id: ModuleInstanceId,
459 ) -> (Self, GlobalDBTxAccessToken) {
460 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
461 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
462 (
463 Self {
464 inner: Arc::new(PrefixDatabase {
465 inner: self.inner.clone(),
466 global_dbtx_access_token: Some(global_dbtx_access_token),
467 prefix,
468 }),
469 module_decoders: self.module_decoders.clone(),
470 },
471 global_dbtx_access_token,
472 )
473 }
474
475 pub fn with_decoders(&self, module_decoders: ModuleDecoderRegistry) -> Self {
476 Self {
477 inner: self.inner.clone(),
478 module_decoders,
479 }
480 }
481
482 pub fn is_global(&self) -> bool {
484 self.inner.is_global()
485 }
486
487 pub fn ensure_global(&self) -> DatabaseResult<()> {
489 if !self.is_global() {
490 return Err(DatabaseError::Other(anyhow::anyhow!(
491 "Database instance not global"
492 )));
493 }
494
495 Ok(())
496 }
497
498 pub fn ensure_isolated(&self) -> DatabaseResult<()> {
500 if self.is_global() {
501 return Err(DatabaseError::Other(anyhow::anyhow!(
502 "Database instance not isolated"
503 )));
504 }
505
506 Ok(())
507 }
508
509 pub async fn begin_transaction<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, Committable>
511 where
512 's: 'tx,
513 {
514 DatabaseTransaction::<Committable>::new(
515 self.inner.begin_transaction().await,
516 self.module_decoders.clone(),
517 )
518 }
519
520 pub async fn begin_transaction_nc<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, NonCommittable>
522 where
523 's: 'tx,
524 {
525 self.begin_transaction().await.into_nc()
526 }
527
528 pub fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
529 self.inner.checkpoint(backup_path)
530 }
531
532 pub async fn autocommit<'s, 'dbtx, F, T, E>(
560 &'s self,
561 tx_fn: F,
562 max_attempts: Option<usize>,
563 ) -> std::result::Result<T, AutocommitError<E>>
564 where
565 's: 'dbtx,
566 for<'r, 'o> F: Fn(
567 &'r mut DatabaseTransaction<'o>,
568 PhantomBound<'dbtx, 'o>,
569 ) -> BoxFuture<'r, std::result::Result<T, E>>,
570 {
571 assert_ne!(max_attempts, Some(0));
572 let mut curr_attempts: usize = 0;
573
574 loop {
575 curr_attempts = curr_attempts
580 .checked_add(1)
581 .expect("db autocommit attempt counter overflowed");
582
583 let mut dbtx = self.begin_transaction().await;
584
585 let tx_fn_res = tx_fn(&mut dbtx.to_ref_nc(), PhantomData).await;
586 let val = match tx_fn_res {
587 Ok(val) => val,
588 Err(err) => {
589 dbtx.ignore_uncommitted();
590 return Err(AutocommitError::ClosureError {
591 attempts: curr_attempts,
592 error: err,
593 });
594 }
595 };
596
597 let _timing = timing::TimeReporter::new("autocommit - commit_tx");
598
599 match dbtx.commit_tx_result().await {
600 Ok(()) => {
601 return Ok(val);
602 }
603 Err(err) => {
604 if max_attempts.is_some_and(|max_att| max_att <= curr_attempts) {
605 warn!(
606 target: LOG_DB,
607 curr_attempts,
608 err = %err.fmt_compact(),
609 "Database commit failed in an autocommit block - terminating"
610 );
611 return Err(AutocommitError::CommitFailed {
612 attempts: curr_attempts,
613 last_error: err,
614 });
615 }
616
617 let delay = (2u64.pow(curr_attempts.min(7) as u32) * 10).min(1000);
618 let delay = rand::thread_rng().gen_range(delay..(2 * delay));
619 warn!(
620 target: LOG_DB,
621 curr_attempts,
622 err = %err.fmt_compact(),
623 delay_ms = %delay,
624 "Database commit failed in an autocommit block - retrying"
625 );
626 crate::runtime::sleep(Duration::from_millis(delay)).await;
627 }
628 }
629 }
630 }
631
632 pub async fn wait_key_check<'a, K, T>(
637 &'a self,
638 key: &K,
639 checker: impl Fn(Option<K::Value>) -> Option<T>,
640 ) -> (T, DatabaseTransaction<'a, Committable>)
641 where
642 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
643 {
644 let key_bytes = key.to_bytes();
645 loop {
646 let notify = self.inner.register(&key_bytes);
648
649 let mut tx = self.inner.begin_transaction().await;
651
652 let maybe_value_bytes = tx
653 .raw_get_bytes(&key_bytes)
654 .await
655 .expect("Unrecoverable error when reading from database")
656 .map(|value_bytes| {
657 decode_value_expect(&value_bytes, &self.module_decoders, &key_bytes)
658 });
659
660 if let Some(value) = checker(maybe_value_bytes) {
661 return (
662 value,
663 DatabaseTransaction::new(tx, self.module_decoders.clone()),
664 );
665 }
666
667 notify.await;
669 }
672 }
673
674 pub async fn wait_key_exists<K>(&self, key: &K) -> K::Value
676 where
677 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
678 {
679 self.wait_key_check(key, std::convert::identity).await.0
680 }
681}
682
683fn module_instance_id_to_byte_prefix(module_instance_id: u16) -> Vec<u8> {
684 let mut bytes = vec![MODULE_GLOBAL_PREFIX];
685 bytes.append(&mut module_instance_id.consensus_encode_to_vec());
686 bytes
687}
688
689#[derive(Clone, Debug)]
692struct PrefixDatabase<Inner>
693where
694 Inner: Debug,
695{
696 prefix: Vec<u8>,
697 global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
698 inner: Inner,
699}
700
701impl<Inner> PrefixDatabase<Inner>
702where
703 Inner: Debug,
704{
705 fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
709 let mut full_key = self.prefix.clone();
710 full_key.extend_from_slice(key);
711 full_key
712 }
713}
714
715#[apply(async_trait_maybe_send!)]
716impl<Inner> IDatabase for PrefixDatabase<Inner>
717where
718 Inner: Debug + MaybeSend + MaybeSync + 'static + IDatabase,
719{
720 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
721 Box::new(PrefixDatabaseTransaction {
722 inner: self.inner.begin_transaction().await,
723 global_dbtx_access_token: self.global_dbtx_access_token,
724 prefix: self.prefix.clone(),
725 })
726 }
727 async fn register(&self, key: &[u8]) {
728 self.inner.register(&self.get_full_key(key)).await;
729 }
730
731 async fn notify(&self, key: &[u8]) {
732 self.inner.notify(&self.get_full_key(key)).await;
733 }
734
735 fn is_global(&self) -> bool {
736 if self.global_dbtx_access_token.is_some() {
737 false
738 } else {
739 self.inner.is_global()
740 }
741 }
742
743 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
744 self.inner.checkpoint(backup_path)
745 }
746}
747
748#[derive(Debug)]
753struct PrefixDatabaseTransaction<Inner> {
754 inner: Inner,
755 global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
756 prefix: Vec<u8>,
757}
758
759impl<Inner> PrefixDatabaseTransaction<Inner> {
760 fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
764 let mut full_key = self.prefix.clone();
765 full_key.extend_from_slice(key);
766 full_key
767 }
768
769 fn get_full_range(&self, range: Range<&[u8]>) -> Range<Vec<u8>> {
770 Range {
771 start: self.get_full_key(range.start),
772 end: self.get_full_key(range.end),
773 }
774 }
775
776 fn adapt_prefix_stream(stream: PrefixStream<'_>, prefix_len: usize) -> PrefixStream<'_> {
777 Box::pin(stream.map(move |(k, v)| (k[prefix_len..].to_owned(), v)))
778 }
779}
780
781#[apply(async_trait_maybe_send!)]
782impl<Inner> IDatabaseTransaction for PrefixDatabaseTransaction<Inner>
783where
784 Inner: IDatabaseTransaction,
785{
786 async fn commit_tx(&mut self) -> DatabaseResult<()> {
787 self.inner.commit_tx().await
788 }
789
790 fn is_global(&self) -> bool {
791 if self.global_dbtx_access_token.is_some() {
792 false
793 } else {
794 self.inner.is_global()
795 }
796 }
797
798 fn global_dbtx(
799 &mut self,
800 access_token: GlobalDBTxAccessToken,
801 ) -> &mut dyn IDatabaseTransaction {
802 if let Some(self_global_dbtx_access_token) = self.global_dbtx_access_token {
803 assert_eq!(
804 access_token, self_global_dbtx_access_token,
805 "Invalid access key used to access global_dbtx"
806 );
807 &mut self.inner
808 } else {
809 self.inner.global_dbtx(access_token)
810 }
811 }
812}
813
814#[apply(async_trait_maybe_send!)]
815impl<Inner> IDatabaseTransactionOpsCore for PrefixDatabaseTransaction<Inner>
816where
817 Inner: IDatabaseTransactionOpsCore,
818{
819 async fn raw_insert_bytes(
820 &mut self,
821 key: &[u8],
822 value: &[u8],
823 ) -> DatabaseResult<Option<Vec<u8>>> {
824 let key = self.get_full_key(key);
825 self.inner.raw_insert_bytes(&key, value).await
826 }
827
828 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
829 let key = self.get_full_key(key);
830 self.inner.raw_get_bytes(&key).await
831 }
832
833 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
834 let key = self.get_full_key(key);
835 self.inner.raw_remove_entry(&key).await
836 }
837
838 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
839 let key = self.get_full_key(key_prefix);
840 let stream = self.inner.raw_find_by_prefix(&key).await?;
841 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
842 }
843
844 async fn raw_find_by_prefix_sorted_descending(
845 &mut self,
846 key_prefix: &[u8],
847 ) -> DatabaseResult<PrefixStream<'_>> {
848 let key = self.get_full_key(key_prefix);
849 let stream = self
850 .inner
851 .raw_find_by_prefix_sorted_descending(&key)
852 .await?;
853 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
854 }
855
856 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
857 let range = self.get_full_range(range);
858 let stream = self
859 .inner
860 .raw_find_by_range(Range {
861 start: &range.start,
862 end: &range.end,
863 })
864 .await?;
865 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
866 }
867
868 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
869 let key = self.get_full_key(key_prefix);
870 self.inner.raw_remove_by_prefix(&key).await
871 }
872}
873
874impl<Inner> IDatabaseTransactionOps for PrefixDatabaseTransaction<Inner> where
875 Inner: IDatabaseTransactionOps
876{
877}
878
879#[apply(async_trait_maybe_send!)]
883pub trait IDatabaseTransactionOpsCore: MaybeSend {
884 async fn raw_insert_bytes(
886 &mut self,
887 key: &[u8],
888 value: &[u8],
889 ) -> DatabaseResult<Option<Vec<u8>>>;
890
891 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>>;
893
894 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>>;
896
897 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>>;
900
901 async fn raw_find_by_prefix_sorted_descending(
903 &mut self,
904 key_prefix: &[u8],
905 ) -> DatabaseResult<PrefixStream<'_>>;
906
907 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>>;
911
912 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()>;
914}
915
916#[apply(async_trait_maybe_send!)]
917impl<T> IDatabaseTransactionOpsCore for Box<T>
918where
919 T: IDatabaseTransactionOpsCore + ?Sized,
920{
921 async fn raw_insert_bytes(
922 &mut self,
923 key: &[u8],
924 value: &[u8],
925 ) -> DatabaseResult<Option<Vec<u8>>> {
926 (**self).raw_insert_bytes(key, value).await
927 }
928
929 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
930 (**self).raw_get_bytes(key).await
931 }
932
933 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
934 (**self).raw_remove_entry(key).await
935 }
936
937 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
938 (**self).raw_find_by_prefix(key_prefix).await
939 }
940
941 async fn raw_find_by_prefix_sorted_descending(
942 &mut self,
943 key_prefix: &[u8],
944 ) -> DatabaseResult<PrefixStream<'_>> {
945 (**self)
946 .raw_find_by_prefix_sorted_descending(key_prefix)
947 .await
948 }
949
950 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
951 (**self).raw_find_by_range(range).await
952 }
953
954 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
955 (**self).raw_remove_by_prefix(key_prefix).await
956 }
957}
958
959#[apply(async_trait_maybe_send!)]
960impl<T> IDatabaseTransactionOpsCore for &mut T
961where
962 T: IDatabaseTransactionOpsCore + ?Sized,
963{
964 async fn raw_insert_bytes(
965 &mut self,
966 key: &[u8],
967 value: &[u8],
968 ) -> DatabaseResult<Option<Vec<u8>>> {
969 (**self).raw_insert_bytes(key, value).await
970 }
971
972 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
973 (**self).raw_get_bytes(key).await
974 }
975
976 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
977 (**self).raw_remove_entry(key).await
978 }
979
980 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
981 (**self).raw_find_by_prefix(key_prefix).await
982 }
983
984 async fn raw_find_by_prefix_sorted_descending(
985 &mut self,
986 key_prefix: &[u8],
987 ) -> DatabaseResult<PrefixStream<'_>> {
988 (**self)
989 .raw_find_by_prefix_sorted_descending(key_prefix)
990 .await
991 }
992
993 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
994 (**self).raw_find_by_range(range).await
995 }
996
997 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
998 (**self).raw_remove_by_prefix(key_prefix).await
999 }
1000}
1001
1002pub trait IDatabaseTransactionOps: IDatabaseTransactionOpsCore + MaybeSend {}
1008
1009impl<T> IDatabaseTransactionOps for Box<T> where T: IDatabaseTransactionOps + ?Sized {}
1010
1011impl<T> IDatabaseTransactionOps for &mut T where T: IDatabaseTransactionOps + ?Sized {}
1012
1013#[apply(async_trait_maybe_send!)]
1019pub trait IDatabaseTransactionOpsCoreTyped<'a> {
1020 async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1021 where
1022 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1023
1024 async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1025 where
1026 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1027 K::Value: MaybeSend + MaybeSync;
1028
1029 async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1030 where
1031 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1032 K::Value: MaybeSend + MaybeSync;
1033
1034 async fn find_by_range<K>(
1035 &mut self,
1036 key_range: Range<K>,
1037 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1038 where
1039 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1040 K::Value: MaybeSend + MaybeSync;
1041
1042 async fn find_by_prefix<KP>(
1043 &mut self,
1044 key_prefix: &KP,
1045 ) -> Pin<
1046 Box<
1047 maybe_add_send!(
1048 dyn Stream<
1049 Item = (
1050 KP::Record,
1051 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1052 ),
1053 > + '_
1054 ),
1055 >,
1056 >
1057 where
1058 KP: DatabaseLookup + MaybeSend + MaybeSync,
1059 KP::Record: DatabaseKey;
1060
1061 async fn find_by_prefix_sorted_descending<KP>(
1062 &mut self,
1063 key_prefix: &KP,
1064 ) -> Pin<
1065 Box<
1066 maybe_add_send!(
1067 dyn Stream<
1068 Item = (
1069 KP::Record,
1070 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1071 ),
1072 > + '_
1073 ),
1074 >,
1075 >
1076 where
1077 KP: DatabaseLookup + MaybeSend + MaybeSync,
1078 KP::Record: DatabaseKey;
1079
1080 async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1081 where
1082 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1083
1084 async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1085 where
1086 KP: DatabaseLookup + MaybeSend + MaybeSync;
1087}
1088
1089#[apply(async_trait_maybe_send!)]
1092impl<T> IDatabaseTransactionOpsCoreTyped<'_> for T
1093where
1094 T: IDatabaseTransactionOpsCore + WithDecoders,
1095{
1096 async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1097 where
1098 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1099 {
1100 let key_bytes = key.to_bytes();
1101 let raw = self
1102 .raw_get_bytes(&key_bytes)
1103 .await
1104 .expect("Unrecoverable error occurred while reading and entry from the database");
1105 raw.map(|value_bytes| {
1106 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1107 })
1108 }
1109
1110 async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1111 where
1112 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1113 K::Value: MaybeSend + MaybeSync,
1114 {
1115 let key_bytes = key.to_bytes();
1116 self.raw_insert_bytes(&key_bytes, &value.to_bytes())
1117 .await
1118 .expect("Unrecoverable error occurred while inserting entry into the database")
1119 .map(|value_bytes| {
1120 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1121 })
1122 }
1123
1124 async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1125 where
1126 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1127 K::Value: MaybeSend + MaybeSync,
1128 {
1129 if let Some(prev) = self.insert_entry(key, value).await {
1130 panic!(
1131 "Database overwriting element when expecting insertion of new entry. Key: {key:?} Prev Value: {prev:?}"
1132 );
1133 }
1134 }
1135
1136 async fn find_by_range<K>(
1137 &mut self,
1138 key_range: Range<K>,
1139 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1140 where
1141 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1142 K::Value: MaybeSend + MaybeSync,
1143 {
1144 let decoders = self.decoders().clone();
1145 Box::pin(
1146 self.raw_find_by_range(Range {
1147 start: &key_range.start.to_bytes(),
1148 end: &key_range.end.to_bytes(),
1149 })
1150 .await
1151 .expect("Unrecoverable error occurred while listing entries from the database")
1152 .map(move |(key_bytes, value_bytes)| {
1153 let key = decode_key_expect(&key_bytes, &decoders);
1154 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1155 (key, value)
1156 }),
1157 )
1158 }
1159
1160 async fn find_by_prefix<KP>(
1161 &mut self,
1162 key_prefix: &KP,
1163 ) -> Pin<
1164 Box<
1165 maybe_add_send!(
1166 dyn Stream<
1167 Item = (
1168 KP::Record,
1169 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1170 ),
1171 > + '_
1172 ),
1173 >,
1174 >
1175 where
1176 KP: DatabaseLookup + MaybeSend + MaybeSync,
1177 KP::Record: DatabaseKey,
1178 {
1179 let decoders = self.decoders().clone();
1180 Box::pin(
1181 self.raw_find_by_prefix(&key_prefix.to_bytes())
1182 .await
1183 .expect("Unrecoverable error occurred while listing entries from the database")
1184 .map(move |(key_bytes, value_bytes)| {
1185 let key = decode_key_expect(&key_bytes, &decoders);
1186 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1187 (key, value)
1188 }),
1189 )
1190 }
1191
1192 async fn find_by_prefix_sorted_descending<KP>(
1193 &mut self,
1194 key_prefix: &KP,
1195 ) -> Pin<
1196 Box<
1197 maybe_add_send!(
1198 dyn Stream<
1199 Item = (
1200 KP::Record,
1201 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1202 ),
1203 > + '_
1204 ),
1205 >,
1206 >
1207 where
1208 KP: DatabaseLookup + MaybeSend + MaybeSync,
1209 KP::Record: DatabaseKey,
1210 {
1211 let decoders = self.decoders().clone();
1212 Box::pin(
1213 self.raw_find_by_prefix_sorted_descending(&key_prefix.to_bytes())
1214 .await
1215 .expect("Unrecoverable error occurred while listing entries from the database")
1216 .map(move |(key_bytes, value_bytes)| {
1217 let key = decode_key_expect(&key_bytes, &decoders);
1218 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1219 (key, value)
1220 }),
1221 )
1222 }
1223 async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1224 where
1225 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1226 {
1227 let key_bytes = key.to_bytes();
1228 self.raw_remove_entry(&key_bytes)
1229 .await
1230 .expect("Unrecoverable error occurred while inserting removing entry from the database")
1231 .map(|value_bytes| {
1232 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1233 })
1234 }
1235 async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1236 where
1237 KP: DatabaseLookup + MaybeSend + MaybeSync,
1238 {
1239 self.raw_remove_by_prefix(&key_prefix.to_bytes())
1240 .await
1241 .expect("Unrecoverable error when removing entries from the database");
1242 }
1243}
1244
1245pub trait WithDecoders {
1248 fn decoders(&self) -> &ModuleDecoderRegistry;
1249}
1250
1251#[apply(async_trait_maybe_send!)]
1253pub trait IRawDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
1254 async fn commit_tx(self) -> DatabaseResult<()>;
1255}
1256
1257#[apply(async_trait_maybe_send!)]
1261pub trait IDatabaseTransaction: MaybeSend + IDatabaseTransactionOps + fmt::Debug {
1262 async fn commit_tx(&mut self) -> DatabaseResult<()>;
1264
1265 fn is_global(&self) -> bool;
1267
1268 #[doc(hidden)]
1273 fn global_dbtx(&mut self, access_token: GlobalDBTxAccessToken)
1274 -> &mut dyn IDatabaseTransaction;
1275}
1276
1277#[apply(async_trait_maybe_send!)]
1278impl<T> IDatabaseTransaction for Box<T>
1279where
1280 T: IDatabaseTransaction + ?Sized,
1281{
1282 async fn commit_tx(&mut self) -> DatabaseResult<()> {
1283 (**self).commit_tx().await
1284 }
1285
1286 fn is_global(&self) -> bool {
1287 (**self).is_global()
1288 }
1289
1290 fn global_dbtx(
1291 &mut self,
1292 access_token: GlobalDBTxAccessToken,
1293 ) -> &mut dyn IDatabaseTransaction {
1294 (**self).global_dbtx(access_token)
1295 }
1296}
1297
1298#[apply(async_trait_maybe_send!)]
1299impl<'a, T> IDatabaseTransaction for &'a mut T
1300where
1301 T: IDatabaseTransaction + ?Sized,
1302{
1303 async fn commit_tx(&mut self) -> DatabaseResult<()> {
1304 (**self).commit_tx().await
1305 }
1306
1307 fn is_global(&self) -> bool {
1308 (**self).is_global()
1309 }
1310
1311 fn global_dbtx(&mut self, access_key: GlobalDBTxAccessToken) -> &mut dyn IDatabaseTransaction {
1312 (**self).global_dbtx(access_key)
1313 }
1314}
1315
1316struct BaseDatabaseTransaction<Tx> {
1319 raw: Option<Tx>,
1321 notify_queue: Option<NotifyQueue>,
1322 notifications: Arc<Notifications>,
1323}
1324
1325impl<Tx> fmt::Debug for BaseDatabaseTransaction<Tx>
1326where
1327 Tx: fmt::Debug,
1328{
1329 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1330 f.write_fmt(format_args!(
1331 "BaseDatabaseTransaction{{ raw={:?} }}",
1332 self.raw
1333 ))
1334 }
1335}
1336impl<Tx> BaseDatabaseTransaction<Tx>
1337where
1338 Tx: IRawDatabaseTransaction,
1339{
1340 fn new(dbtx: Tx, notifications: Arc<Notifications>) -> Self {
1341 Self {
1342 raw: Some(dbtx),
1343 notifications,
1344 notify_queue: Some(NotifyQueue::new()),
1345 }
1346 }
1347
1348 fn add_notification_key(&mut self, key: &[u8]) -> DatabaseResult<()> {
1349 self.notify_queue
1350 .as_mut()
1351 .ok_or(DatabaseError::TransactionConsumed)?
1352 .add(key);
1353 Ok(())
1354 }
1355}
1356
1357#[apply(async_trait_maybe_send!)]
1358impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOpsCore for BaseDatabaseTransaction<Tx> {
1359 async fn raw_insert_bytes(
1360 &mut self,
1361 key: &[u8],
1362 value: &[u8],
1363 ) -> DatabaseResult<Option<Vec<u8>>> {
1364 self.add_notification_key(key)?;
1365 self.raw
1366 .as_mut()
1367 .ok_or(DatabaseError::TransactionConsumed)?
1368 .raw_insert_bytes(key, value)
1369 .await
1370 }
1371
1372 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1373 self.raw
1374 .as_mut()
1375 .ok_or(DatabaseError::TransactionConsumed)?
1376 .raw_get_bytes(key)
1377 .await
1378 }
1379
1380 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1381 self.add_notification_key(key)?;
1382 self.raw
1383 .as_mut()
1384 .ok_or(DatabaseError::TransactionConsumed)?
1385 .raw_remove_entry(key)
1386 .await
1387 }
1388
1389 async fn raw_find_by_range(
1390 &mut self,
1391 key_range: Range<&[u8]>,
1392 ) -> DatabaseResult<PrefixStream<'_>> {
1393 self.raw
1394 .as_mut()
1395 .ok_or(DatabaseError::TransactionConsumed)?
1396 .raw_find_by_range(key_range)
1397 .await
1398 }
1399
1400 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
1401 self.raw
1402 .as_mut()
1403 .ok_or(DatabaseError::TransactionConsumed)?
1404 .raw_find_by_prefix(key_prefix)
1405 .await
1406 }
1407
1408 async fn raw_find_by_prefix_sorted_descending(
1409 &mut self,
1410 key_prefix: &[u8],
1411 ) -> DatabaseResult<PrefixStream<'_>> {
1412 self.raw
1413 .as_mut()
1414 .ok_or(DatabaseError::TransactionConsumed)?
1415 .raw_find_by_prefix_sorted_descending(key_prefix)
1416 .await
1417 }
1418
1419 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
1420 self.raw
1421 .as_mut()
1422 .ok_or(DatabaseError::TransactionConsumed)?
1423 .raw_remove_by_prefix(key_prefix)
1424 .await
1425 }
1426}
1427
// Marker impl: the body is empty, so nothing from `IDatabaseTransactionOps`
// is overridden for `BaseDatabaseTransaction`.
impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOps for BaseDatabaseTransaction<Tx> {}
1429
1430#[apply(async_trait_maybe_send!)]
1431impl<Tx: IRawDatabaseTransaction + fmt::Debug> IDatabaseTransaction
1432 for BaseDatabaseTransaction<Tx>
1433{
1434 async fn commit_tx(&mut self) -> DatabaseResult<()> {
1435 self.raw
1436 .take()
1437 .ok_or(DatabaseError::TransactionConsumed)?
1438 .commit_tx()
1439 .await?;
1440 self.notifications.submit_queue(
1441 &self
1442 .notify_queue
1443 .take()
1444 .expect("commit must be called only once"),
1445 );
1446 Ok(())
1447 }
1448
1449 fn is_global(&self) -> bool {
1450 true
1451 }
1452
1453 fn global_dbtx(
1454 &mut self,
1455 _access_token: GlobalDBTxAccessToken,
1456 ) -> &mut dyn IDatabaseTransaction {
1457 panic!("Illegal to call global_dbtx on BaseDatabaseTransaction");
1458 }
1459}
1460
/// Bookkeeping used to report transactions that wrote something but were
/// dropped without being committed (see the `Drop` impl).
#[derive(Clone)]
struct CommitTracker {
    /// Set to `true` by `commit_tx` / `commit_tx_result`.
    is_committed: bool,
    /// Set to `true` by `raw_insert_bytes` and `raw_remove_by_prefix` on
    /// `DatabaseTransaction`.
    has_writes: bool,
    /// When `true`, an uncommitted drop is logged at `trace` instead of `warn`.
    /// Toggled by `ignore_uncommitted` / `warn_uncommitted`.
    ignore_uncommitted: bool,
}
1472
1473impl Drop for CommitTracker {
1474 fn drop(&mut self) {
1475 if self.has_writes && !self.is_committed {
1476 if self.ignore_uncommitted {
1477 trace!(
1478 target: LOG_DB,
1479 "DatabaseTransaction has writes and has not called commit, but that's expected."
1480 );
1481 } else {
1482 warn!(
1483 target: LOG_DB,
1484 location = ?backtrace::Backtrace::new(),
1485 "DatabaseTransaction has writes and has not called commit."
1486 );
1487 }
1488 }
1489 }
1490}
1491
/// Either an owned `T` or a mutable borrow of one, usable uniformly through
/// `Deref` / `DerefMut`.
enum MaybeRef<'a, T> {
    /// The value is stored inline.
    Owned(T),
    /// The value lives elsewhere and is mutably borrowed for `'a`.
    Borrowed(&'a mut T),
}

impl<T> ops::Deref for MaybeRef<'_, T> {
    type Target = T;

    /// Returns a shared reference to the value, regardless of variant.
    fn deref(&self) -> &Self::Target {
        match self {
            Self::Owned(value) => value,
            Self::Borrowed(borrowed) => &**borrowed,
        }
    }
}

impl<T> ops::DerefMut for MaybeRef<'_, T> {
    /// Returns a mutable reference to the value, regardless of variant.
    fn deref_mut(&mut self) -> &mut Self::Target {
        match self {
            Self::Owned(value) => value,
            Self::Borrowed(borrowed) => &mut **borrowed,
        }
    }
}
1516
/// Capability marker for a [`DatabaseTransaction`] that may be committed.
///
/// `DatabaseTransaction::new` produces this flavor, and `commit_tx` /
/// `commit_tx_result` are only defined for it.
pub struct Committable;

/// Capability marker for a [`DatabaseTransaction`] without commit methods.
///
/// This is the default `Cap` parameter of `DatabaseTransaction`.
pub struct NonCommittable;
1526
/// A database transaction handle, parameterized by a capability marker
/// (`Committable` or `NonCommittable`).
pub struct DatabaseTransaction<'tx, Cap = NonCommittable> {
    /// The underlying transaction all raw operations are forwarded to.
    tx: Box<dyn IDatabaseTransaction + 'tx>,
    /// Decoder registry returned by `WithDecoders::decoders`.
    decoders: ModuleDecoderRegistry,
    /// Commit bookkeeping; owned by the transaction built with `new`, borrowed
    /// by the handles returned from `to_ref*` and `global_dbtx`.
    commit_tracker: MaybeRef<'tx, CommitTracker>,
    /// Closures registered through `on_commit`; `commit_tx_result` runs them
    /// after a successful commit.
    on_commit_hooks: MaybeRef<'tx, Vec<Box<maybe_add_send!(dyn FnOnce())>>>,
    /// Zero-sized marker carrying the `Cap` type parameter.
    capability: marker::PhantomData<Cap>,
}
1537
1538impl<Cap> fmt::Debug for DatabaseTransaction<'_, Cap> {
1539 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1540 f.write_fmt(format_args!(
1541 "DatabaseTransaction {{ tx: {:?}, decoders={:?} }}",
1542 self.tx, self.decoders
1543 ))
1544 }
1545}
1546
impl<Cap> WithDecoders for DatabaseTransaction<'_, Cap> {
    /// Returns the decoder registry stored in this transaction.
    fn decoders(&self) -> &ModuleDecoderRegistry {
        &self.decoders
    }
}
1552
1553#[instrument(target = LOG_DB, level = "trace", skip_all, fields(value_type = std::any::type_name::<V>()), err)]
1554fn decode_value<V: DatabaseValue>(
1555 value_bytes: &[u8],
1556 decoders: &ModuleDecoderRegistry,
1557) -> std::result::Result<V, DecodingError> {
1558 trace!(
1559 bytes = %AbbreviateHexBytes(value_bytes),
1560 "decoding value",
1561 );
1562 V::from_bytes(value_bytes, decoders)
1563}
1564
1565#[track_caller]
1566fn decode_value_expect<V: DatabaseValue>(
1567 value_bytes: &[u8],
1568 decoders: &ModuleDecoderRegistry,
1569 key_bytes: &[u8],
1570) -> V {
1571 decode_value(value_bytes, decoders).unwrap_or_else(|err| {
1572 panic!(
1573 "Unrecoverable decoding DatabaseValue as {}; err={}, key_bytes={}, val_bytes={}",
1574 any::type_name::<V>(),
1575 err,
1576 AbbreviateHexBytes(key_bytes),
1577 AbbreviateHexBytes(value_bytes),
1578 )
1579 })
1580}
1581
1582#[track_caller]
1583fn decode_key_expect<K: DatabaseKey>(key_bytes: &[u8], decoders: &ModuleDecoderRegistry) -> K {
1584 trace!(
1585 bytes = %AbbreviateHexBytes(key_bytes),
1586 "decoding key",
1587 );
1588 K::from_bytes(key_bytes, decoders).unwrap_or_else(|err| {
1589 panic!(
1590 "Unrecoverable decoding DatabaseKey as {}; err={}; bytes={}",
1591 any::type_name::<K>(),
1592 err,
1593 AbbreviateHexBytes(key_bytes)
1594 )
1595 })
1596}
1597
1598impl<'tx, Cap> DatabaseTransaction<'tx, Cap> {
1599 pub fn into_nc(self) -> DatabaseTransaction<'tx, NonCommittable> {
1601 DatabaseTransaction {
1602 tx: self.tx,
1603 decoders: self.decoders,
1604 commit_tracker: self.commit_tracker,
1605 on_commit_hooks: self.on_commit_hooks,
1606 capability: PhantomData::<NonCommittable>,
1607 }
1608 }
1609
1610 pub fn to_ref_nc<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, NonCommittable>
1612 where
1613 's: 'a,
1614 {
1615 self.to_ref().into_nc()
1616 }
1617
1618 pub fn with_prefix<'a: 'tx>(self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1620 where
1621 'tx: 'a,
1622 {
1623 DatabaseTransaction {
1624 tx: Box::new(PrefixDatabaseTransaction {
1625 inner: self.tx,
1626 global_dbtx_access_token: None,
1627 prefix,
1628 }),
1629 decoders: self.decoders,
1630 commit_tracker: self.commit_tracker,
1631 on_commit_hooks: self.on_commit_hooks,
1632 capability: self.capability,
1633 }
1634 }
1635
1636 pub fn with_prefix_module_id<'a: 'tx>(
1640 self,
1641 module_instance_id: ModuleInstanceId,
1642 ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1643 where
1644 'tx: 'a,
1645 {
1646 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1647 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1648 (
1649 DatabaseTransaction {
1650 tx: Box::new(PrefixDatabaseTransaction {
1651 inner: self.tx,
1652 global_dbtx_access_token: Some(global_dbtx_access_token),
1653 prefix,
1654 }),
1655 decoders: self.decoders,
1656 commit_tracker: self.commit_tracker,
1657 on_commit_hooks: self.on_commit_hooks,
1658 capability: self.capability,
1659 },
1660 global_dbtx_access_token,
1661 )
1662 }
1663
1664 pub fn to_ref<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, Cap>
1666 where
1667 's: 'a,
1668 {
1669 let decoders = self.decoders.clone();
1670
1671 DatabaseTransaction {
1672 tx: Box::new(&mut self.tx),
1673 decoders,
1674 commit_tracker: match self.commit_tracker {
1675 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1676 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1677 },
1678 on_commit_hooks: match self.on_commit_hooks {
1679 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1680 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1681 },
1682 capability: self.capability,
1683 }
1684 }
1685
1686 pub fn to_ref_with_prefix<'a>(&'a mut self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1688 where
1689 'tx: 'a,
1690 {
1691 DatabaseTransaction {
1692 tx: Box::new(PrefixDatabaseTransaction {
1693 inner: &mut self.tx,
1694 global_dbtx_access_token: None,
1695 prefix,
1696 }),
1697 decoders: self.decoders.clone(),
1698 commit_tracker: match self.commit_tracker {
1699 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1700 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1701 },
1702 on_commit_hooks: match self.on_commit_hooks {
1703 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1704 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1705 },
1706 capability: self.capability,
1707 }
1708 }
1709
1710 pub fn to_ref_with_prefix_module_id<'a>(
1711 &'a mut self,
1712 module_instance_id: ModuleInstanceId,
1713 ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1714 where
1715 'tx: 'a,
1716 {
1717 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1718 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1719 (
1720 DatabaseTransaction {
1721 tx: Box::new(PrefixDatabaseTransaction {
1722 inner: &mut self.tx,
1723 global_dbtx_access_token: Some(global_dbtx_access_token),
1724 prefix,
1725 }),
1726 decoders: self.decoders.clone(),
1727 commit_tracker: match self.commit_tracker {
1728 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1729 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1730 },
1731 on_commit_hooks: match self.on_commit_hooks {
1732 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1733 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1734 },
1735 capability: self.capability,
1736 },
1737 global_dbtx_access_token,
1738 )
1739 }
1740
1741 pub fn is_global(&self) -> bool {
1743 self.tx.is_global()
1744 }
1745
1746 pub fn ensure_global(&self) -> DatabaseResult<()> {
1748 if !self.is_global() {
1749 return Err(DatabaseError::Other(anyhow::anyhow!(
1750 "Database instance not global"
1751 )));
1752 }
1753
1754 Ok(())
1755 }
1756
1757 pub fn ensure_isolated(&self) -> DatabaseResult<()> {
1759 if self.is_global() {
1760 return Err(DatabaseError::Other(anyhow::anyhow!(
1761 "Database instance not isolated"
1762 )));
1763 }
1764
1765 Ok(())
1766 }
1767
1768 pub fn ignore_uncommitted(&mut self) -> &mut Self {
1770 self.commit_tracker.ignore_uncommitted = true;
1771 self
1772 }
1773
1774 pub fn warn_uncommitted(&mut self) -> &mut Self {
1776 self.commit_tracker.ignore_uncommitted = false;
1777 self
1778 }
1779
1780 #[instrument(target = LOG_DB, level = "trace", skip_all)]
1782 pub fn on_commit(&mut self, f: maybe_add_send!(impl FnOnce() + 'static)) {
1783 self.on_commit_hooks.push(Box::new(f));
1784 }
1785
1786 pub fn global_dbtx<'a>(
1787 &'a mut self,
1788 access_token: GlobalDBTxAccessToken,
1789 ) -> DatabaseTransaction<'a, Cap>
1790 where
1791 'tx: 'a,
1792 {
1793 let decoders = self.decoders.clone();
1794
1795 DatabaseTransaction {
1796 tx: Box::new(self.tx.global_dbtx(access_token)),
1797 decoders,
1798 commit_tracker: match self.commit_tracker {
1799 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1800 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1801 },
1802 on_commit_hooks: match self.on_commit_hooks {
1803 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1804 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1805 },
1806 capability: self.capability,
1807 }
1808 }
1809}
1810
/// Token stored in a module-prefixed transaction and required by
/// `global_dbtx`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct GlobalDBTxAccessToken(u32);

impl GlobalDBTxAccessToken {
    /// Derives the token from `prefix`: the sum of its bytes plus 513.
    fn from_prefix(prefix: &[u8]) -> Self {
        let byte_sum: u32 = prefix.iter().copied().map(u32::from).sum();
        Self(byte_sum + 513)
    }
}
1830
1831impl<'tx> DatabaseTransaction<'tx, Committable> {
1832 pub fn new(dbtx: Box<dyn IDatabaseTransaction + 'tx>, decoders: ModuleDecoderRegistry) -> Self {
1833 Self {
1834 tx: dbtx,
1835 decoders,
1836 commit_tracker: MaybeRef::Owned(CommitTracker {
1837 is_committed: false,
1838 has_writes: false,
1839 ignore_uncommitted: false,
1840 }),
1841 on_commit_hooks: MaybeRef::Owned(vec![]),
1842 capability: PhantomData,
1843 }
1844 }
1845
1846 pub async fn commit_tx_result(mut self) -> DatabaseResult<()> {
1847 self.commit_tracker.is_committed = true;
1848 let commit_result = self.tx.commit_tx().await;
1849
1850 if commit_result.is_ok() {
1852 for hook in self.on_commit_hooks.deref_mut().drain(..) {
1853 hook();
1854 }
1855 }
1856
1857 commit_result
1858 }
1859
1860 pub async fn commit_tx(mut self) {
1861 self.commit_tracker.is_committed = true;
1862 self.commit_tx_result()
1863 .await
1864 .expect("Unrecoverable error occurred while committing to the database.");
1865 }
1866}
1867
1868#[apply(async_trait_maybe_send!)]
1869impl<Cap> IDatabaseTransactionOpsCore for DatabaseTransaction<'_, Cap>
1870where
1871 Cap: Send,
1872{
1873 async fn raw_insert_bytes(
1874 &mut self,
1875 key: &[u8],
1876 value: &[u8],
1877 ) -> DatabaseResult<Option<Vec<u8>>> {
1878 self.commit_tracker.has_writes = true;
1879 self.tx.raw_insert_bytes(key, value).await
1880 }
1881
1882 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1883 self.tx.raw_get_bytes(key).await
1884 }
1885
1886 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1887 self.tx.raw_remove_entry(key).await
1888 }
1889
1890 async fn raw_find_by_range(
1891 &mut self,
1892 key_range: Range<&[u8]>,
1893 ) -> DatabaseResult<PrefixStream<'_>> {
1894 self.tx.raw_find_by_range(key_range).await
1895 }
1896
1897 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
1898 self.tx.raw_find_by_prefix(key_prefix).await
1899 }
1900
1901 async fn raw_find_by_prefix_sorted_descending(
1902 &mut self,
1903 key_prefix: &[u8],
1904 ) -> DatabaseResult<PrefixStream<'_>> {
1905 self.tx
1906 .raw_find_by_prefix_sorted_descending(key_prefix)
1907 .await
1908 }
1909
1910 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
1911 self.commit_tracker.has_writes = true;
1912 self.tx.raw_remove_by_prefix(key_prefix).await
1913 }
1914}
// Marker impl: provided only for the `Committable` flavor, with an empty body.
impl IDatabaseTransactionOps for DatabaseTransaction<'_, Committable> {}
1916
1917impl<T> DatabaseKeyPrefix for T
1918where
1919 T: DatabaseLookup + crate::encoding::Encodable + Debug,
1920{
1921 fn to_bytes(&self) -> Vec<u8> {
1922 let mut data = vec![<Self as DatabaseLookup>::Record::DB_PREFIX];
1923 data.append(&mut self.consensus_encode_to_vec());
1924 data
1925 }
1926}
1927
1928impl<T> DatabaseKey for T
1929where
1930 T: DatabaseRecord + crate::encoding::Decodable + Sized,
1933{
1934 const NOTIFY_ON_MODIFY: bool = <T as DatabaseRecord>::NOTIFY_ON_MODIFY;
1935 fn from_bytes(
1936 data: &[u8],
1937 modules: &ModuleDecoderRegistry,
1938 ) -> std::result::Result<Self, DecodingError> {
1939 if data.is_empty() {
1940 return Err(DecodingError::wrong_length(1, 0));
1942 }
1943
1944 if data[0] != Self::DB_PREFIX {
1945 return Err(DecodingError::wrong_prefix(Self::DB_PREFIX, data[0]));
1946 }
1947
1948 <Self as crate::encoding::Decodable>::consensus_decode_whole(&data[1..], modules)
1949 .map_err(|decode_error| DecodingError::Other(decode_error.0))
1950 }
1951}
1952
1953impl<T> DatabaseValue for T
1954where
1955 T: Debug + Encodable + Decodable,
1956{
1957 fn from_bytes(
1958 data: &[u8],
1959 modules: &ModuleDecoderRegistry,
1960 ) -> std::result::Result<Self, DecodingError> {
1961 T::consensus_decode_whole(data, modules).map_err(|e| DecodingError::Other(e.0))
1962 }
1963
1964 fn to_bytes(&self) -> Vec<u8> {
1965 self.consensus_encode_to_vec()
1966 }
1967}
1968
/// Implements `DatabaseRecord` for a key type.
///
/// Arguments (in this order): `key = <type>`, `value = <type>`,
/// `db_prefix = <expr>` (cast with `as u8`), and optionally
/// `notify_on_modify = true | false`. A trailing comma is accepted.
///
/// With `notify_on_modify = true` the key type additionally gets an (empty)
/// `DatabaseKeyWithNotify` impl.
#[macro_export]
macro_rules! impl_db_record {
    (key = $key:ty, value = $val:ty, db_prefix = $db_prefix:expr_2021 $(, notify_on_modify = $notify:tt)? $(,)?) => {
        impl $crate::db::DatabaseRecord for $key {
            const DB_PREFIX: u8 = $db_prefix as u8;
            // Only emitted when `notify_on_modify` was passed; otherwise the
            // trait's default applies.
            $(const NOTIFY_ON_MODIFY: bool = $notify;)?
            type Key = Self;
            type Value = $val;
        }
        // Dispatch to the internal arms below on the literal `true` / `false`.
        $(
            impl_db_record! {
                @impl_notify_marker key = $key, notify_on_modify = $notify
            }
        )?
    };
    // Internal: `notify_on_modify = true` adds the marker trait.
    (@impl_notify_marker key = $key:ty, notify_on_modify = true) => {
        impl $crate::db::DatabaseKeyWithNotify for $key {}
    };
    // Internal: `notify_on_modify = false` expands to nothing.
    (@impl_notify_marker key = $key:ty, notify_on_modify = false) => {};
}
2051
/// Implements `DatabaseLookup` with `Record = $key` for each listed
/// `query_prefix` type.
///
/// Any number of `query_prefix = <type>` arguments (including none) may follow
/// `key = <type>`; a trailing comma is accepted.
#[macro_export]
macro_rules! impl_db_lookup{
    (key = $key:ty $(, query_prefix = $query_prefix:ty)* $(,)?) => {
        $(
            impl $crate::db::DatabaseLookup for $query_prefix {
                type Record = $key;
            }
        )*
    };
}
2062
/// Older, field-less key for the stored database version.
///
/// `remove_current_db_version_if_exists` removes this entry and its value is
/// carried over into a [`DatabaseVersionKey`] entry.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct DatabaseVersionKeyV0;

/// Key for the stored database version of one module instance.
///
/// When no module instance id is given, the migration code uses
/// `MODULE_GLOBAL_PREFIX` converted to a `ModuleInstanceId` here.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct DatabaseVersionKey(pub ModuleInstanceId);

/// A database schema version number.
#[derive(Debug, Encodable, Decodable, Serialize, Clone, PartialOrd, Ord, PartialEq, Eq, Copy)]
pub struct DatabaseVersion(pub u64);
2072
// Both version keys are registered under the same `DatabaseVersion` prefix
// byte; they differ only in what follows the prefix.
impl_db_record!(
    key = DatabaseVersionKeyV0,
    value = DatabaseVersion,
    db_prefix = DbKeyPrefix::DatabaseVersion
);

impl_db_record!(
    key = DatabaseVersionKey,
    value = DatabaseVersion,
    db_prefix = DbKeyPrefix::DatabaseVersion
);
2084
2085impl std::fmt::Display for DatabaseVersion {
2086 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2087 write!(f, "{}", self.0)
2088 }
2089}
2090
2091impl DatabaseVersion {
2092 pub fn increment(&self) -> Self {
2093 Self(self.0 + 1)
2094 }
2095}
2096
2097impl std::fmt::Display for DbKeyPrefix {
2098 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2099 write!(f, "{self:?}")
2100 }
2101}
2102
/// One-byte key prefixes declared in this module.
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
    /// Prefix shared by `DatabaseVersionKeyV0` and `DatabaseVersionKey`.
    DatabaseVersion = 0x50,
    ClientBackup = 0x51,
}
2109
/// Errors produced while decoding database keys and values.
#[derive(Debug, Error)]
pub enum DecodingError {
    /// The first key byte did not match the expected prefix.
    #[error("Key had a wrong prefix, expected {expected} but got {found}")]
    WrongPrefix { expected: u8, found: u8 },
    /// The input had an unexpected length (e.g. an empty key).
    #[error("Key had a wrong length, expected {expected} but got {found}")]
    WrongLength { expected: usize, found: usize },
    /// Any other failure, e.g. from the consensus decoder.
    #[error("Other decoding error: {0:#}")]
    Other(anyhow::Error),
}
2119
2120impl DecodingError {
2121 pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2122 Self::Other(anyhow::Error::from(error))
2123 }
2124
2125 pub fn wrong_prefix(expected: u8, found: u8) -> Self {
2126 Self::WrongPrefix { expected, found }
2127 }
2128
2129 pub fn wrong_length(expected: usize, found: usize) -> Self {
2130 Self::WrongLength { expected, found }
2131 }
2132}
2133
/// Errors returned by database operations in this module.
#[derive(Debug, Error)]
pub enum DatabaseError {
    #[error("Write-write conflict detected")]
    WriteConflict,

    /// The transaction (or its notification queue) was already taken, e.g. by
    /// an earlier commit.
    #[error("Transaction already consumed")]
    TransactionConsumed,

    /// An error reported by the storage backend; convertible via `From`.
    #[error("Database backend error: {0}")]
    DatabaseBackend(#[from] Box<dyn Error + Send + Sync>),

    /// Any other error.
    #[error("Database error: {0:#}")]
    Other(anyhow::Error),
}
2155
2156impl DatabaseError {
2157 pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2159 Self::Other(anyhow::Error::from(error))
2160 }
2161
2162 pub fn backend<E: Error + Send + Sync + 'static>(error: E) -> Self {
2164 Self::DatabaseBackend(Box::new(error))
2165 }
2166}
2167
2168impl From<anyhow::Error> for DatabaseError {
2169 fn from(error: anyhow::Error) -> Self {
2170 Self::Other(error)
2171 }
2172}
2173
/// Collects all entries found under `$prefix_type` into a
/// `BTreeMap<String, $value_type>` (keys hex-encoded via
/// `consensus_encode_to_hex`) and inserts the boxed map into `$map` under
/// `$key_literal`.
///
/// The expansion uses `.await`, so it must be invoked in an async context, and
/// it refers to `BTreeMap` unqualified, so that name must be in scope at the
/// call site. `$key_type` is accepted but not used by the expansion.
// NOTE(review): the `.map`/`.collect` calls presumably need a stream
// extension trait (e.g. `StreamExt`) in scope at the call site — confirm.
#[macro_export]
macro_rules! push_db_pair_items {
    ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
        let db_items =
            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
                .await
                .map(|(key, val)| {
                    (
                        $crate::encoding::Encodable::consensus_encode_to_hex(&key),
                        val,
                    )
                })
                .collect::<BTreeMap<String, $value_type>>()
                .await;

        $map.insert($key_literal.to_string(), Box::new(db_items));
    };
}
2192
/// Collects the keys of all entries found under `$prefix_type` into a
/// `Vec<$key_type>` (values are discarded) and inserts the boxed vector into
/// `$map` under `$key_literal`.
///
/// The expansion uses `.await`, so it must be invoked in an async context.
#[macro_export]
macro_rules! push_db_key_items {
    ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $map:ident, $key_literal:literal) => {
        let db_items =
            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
                .await
                .map(|(key, _)| key)
                .collect::<Vec<$key_type>>()
                .await;

        $map.insert($key_literal.to_string(), Box::new(db_items));
    };
}
2206
/// Everything a single migration function receives: a database transaction,
/// the module instance being migrated (if any) and a caller-defined context.
pub struct DbMigrationFnContext<'tx, C> {
    /// Global transaction; `new` panics if it is not global.
    dbtx: DatabaseTransaction<'tx>,
    /// `Some` for a module migration; `dbtx()` then returns a module-prefixed
    /// view instead of the global one.
    module_instance_id: Option<ModuleInstanceId>,
    /// Caller-defined extra context.
    ctx: C,
    // Private unit field; going by its name, it is there so the struct has to
    // be built through `new`.
    __please_use_constructor: (),
}
2226
2227impl<'tx, C> DbMigrationFnContext<'tx, C> {
2228 pub fn new(
2229 dbtx: DatabaseTransaction<'tx>,
2230 module_instance_id: Option<ModuleInstanceId>,
2231 ctx: C,
2232 ) -> Self {
2233 dbtx.ensure_global().expect("Must pass global dbtx");
2234 Self {
2235 dbtx,
2236 module_instance_id,
2237 ctx,
2238 __please_use_constructor: (),
2240 }
2241 }
2242
2243 pub fn map<R>(self, f: impl FnOnce(C) -> R) -> DbMigrationFnContext<'tx, R> {
2244 DbMigrationFnContext::new(self.dbtx, self.module_instance_id, f(self.ctx))
2245 }
2246
2247 #[doc(hidden)]
2249 pub fn split_dbtx_ctx<'s>(&'s mut self) -> (&'s mut DatabaseTransaction<'tx>, &'s C) {
2250 let Self { dbtx, ctx, .. } = self;
2251
2252 (dbtx, ctx)
2253 }
2254
2255 pub fn dbtx(&'_ mut self) -> DatabaseTransaction<'_> {
2256 if let Some(module_instance_id) = self.module_instance_id {
2257 self.dbtx.to_ref_with_prefix_module_id(module_instance_id).0
2258 } else {
2259 self.dbtx.to_ref_nc()
2260 }
2261 }
2262
2263 #[doc(hidden)]
2265 pub fn module_instance_id(&self) -> Option<ModuleInstanceId> {
2266 self.module_instance_id
2267 }
2268}
2269
/// Migration function whose caller-defined context is `()`.
pub type GeneralDbMigrationFn = DbMigrationFn<()>;
/// Migration context whose caller-defined context is `()`.
pub type GeneralDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, ()>;

/// Same underlying type as [`GeneralDbMigrationFn`], under a client-core name.
pub type ClientCoreDbMigrationFn = DbMigrationFn<()>;
/// Same underlying type as [`GeneralDbMigrationFnContext`], under a
/// client-core name.
pub type ClientCoreDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, ()>;

/// A boxed migration callback: takes a [`DbMigrationFnContext`] for any `'tx`
/// and returns a pinned, boxed future (valid for `'tx`) resolving to
/// `anyhow::Result<()>`.
// NOTE(review): `maybe_add_send_sync!` / `maybe_add_send!` presumably add
// `Send`/`Sync` bounds depending on the target — confirm at their definition.
pub type DbMigrationFn<C> = Box<
    maybe_add_send_sync!(
        dyn for<'tx> Fn(
            DbMigrationFnContext<'tx, C>,
        ) -> Pin<
            Box<maybe_add_send!(dyn futures::Future<Output = anyhow::Result<()>> + 'tx)>,
        >
    ),
>;
2300
2301pub fn get_current_database_version<F>(
2305 migrations: &BTreeMap<DatabaseVersion, F>,
2306) -> DatabaseVersion {
2307 let versions = migrations.keys().copied().collect::<Vec<_>>();
2308
2309 if !versions
2312 .windows(2)
2313 .all(|window| window[0].increment() == window[1])
2314 {
2315 panic!("Database Migrations are not defined contiguously");
2316 }
2317
2318 versions
2319 .last()
2320 .map_or(DatabaseVersion(0), DatabaseVersion::increment)
2321}
2322
2323pub async fn apply_migrations<C>(
2324 db: &Database,
2325 ctx: C,
2326 kind: String,
2327 migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
2328 module_instance_id: Option<ModuleInstanceId>,
2329 external_prefixes_above: Option<u8>,
2332) -> std::result::Result<(), anyhow::Error>
2333where
2334 C: Clone,
2335{
2336 let mut dbtx = db.begin_transaction().await;
2337 apply_migrations_dbtx(
2338 &mut dbtx.to_ref_nc(),
2339 ctx,
2340 kind,
2341 migrations,
2342 module_instance_id,
2343 external_prefixes_above,
2344 )
2345 .await?;
2346
2347 dbtx.commit_tx_result()
2348 .await
2349 .map_err(|e| anyhow::Error::msg(e.to_string()))
2350}
2351pub async fn apply_migrations_dbtx<C>(
2363 global_dbtx: &mut DatabaseTransaction<'_>,
2364 ctx: C,
2365 kind: String,
2366 migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
2367 module_instance_id: Option<ModuleInstanceId>,
2368 external_prefixes_above: Option<u8>,
2371) -> std::result::Result<(), anyhow::Error>
2372where
2373 C: Clone,
2374{
2375 let is_new_db = global_dbtx
2378 .raw_find_by_prefix(&[])
2379 .await?
2380 .filter(|(key, _v)| {
2381 std::future::ready(
2382 external_prefixes_above.is_none_or(|external_prefixes_above| {
2383 !key.is_empty() && key[0] < external_prefixes_above
2384 }),
2385 )
2386 })
2387 .next()
2388 .await
2389 .is_none();
2390
2391 let target_db_version = get_current_database_version(&migrations);
2392
2393 create_database_version_dbtx(
2395 global_dbtx,
2396 target_db_version,
2397 module_instance_id,
2398 kind.clone(),
2399 is_new_db,
2400 )
2401 .await?;
2402
2403 let module_instance_id_key = module_instance_id_or_global(module_instance_id);
2404
2405 let disk_version = global_dbtx
2406 .get_value(&DatabaseVersionKey(module_instance_id_key))
2407 .await;
2408
2409 let db_version = if let Some(disk_version) = disk_version {
2410 let mut current_db_version = disk_version;
2411
2412 if current_db_version > target_db_version {
2413 return Err(anyhow::anyhow!(format!(
2414 "On disk database version {current_db_version} for module {kind} was higher than the code database version {target_db_version}."
2415 )));
2416 }
2417
2418 while current_db_version < target_db_version {
2419 if let Some(migration) = migrations.get(¤t_db_version) {
2420 info!(target: LOG_DB, ?kind, ?current_db_version, ?target_db_version, "Migrating module...");
2421 migration(DbMigrationFnContext::new(
2422 global_dbtx.to_ref_nc(),
2423 module_instance_id,
2424 ctx.clone(),
2425 ))
2426 .await?;
2427 } else {
2428 warn!(target: LOG_DB, ?current_db_version, "Missing server db migration");
2429 }
2430
2431 current_db_version = current_db_version.increment();
2432
2433 global_dbtx
2434 .insert_entry(
2435 &DatabaseVersionKey(module_instance_id_key),
2436 ¤t_db_version,
2437 )
2438 .await;
2439 }
2440
2441 current_db_version
2442 } else {
2443 target_db_version
2444 };
2445
2446 debug!(target: LOG_DB, ?kind, ?db_version, "DB Version");
2447 Ok(())
2448}
2449
2450pub async fn create_database_version(
2451 db: &Database,
2452 target_db_version: DatabaseVersion,
2453 module_instance_id: Option<ModuleInstanceId>,
2454 kind: String,
2455 is_new_db: bool,
2456) -> std::result::Result<(), anyhow::Error> {
2457 let mut dbtx = db.begin_transaction().await;
2458
2459 create_database_version_dbtx(
2460 &mut dbtx.to_ref_nc(),
2461 target_db_version,
2462 module_instance_id,
2463 kind,
2464 is_new_db,
2465 )
2466 .await?;
2467
2468 dbtx.commit_tx_result().await?;
2469 Ok(())
2470}
2471
2472pub async fn create_database_version_dbtx(
2476 global_dbtx: &mut DatabaseTransaction<'_>,
2477 target_db_version: DatabaseVersion,
2478 module_instance_id: Option<ModuleInstanceId>,
2479 kind: String,
2480 is_new_db: bool,
2481) -> std::result::Result<(), anyhow::Error> {
2482 let key_module_instance_id = module_instance_id_or_global(module_instance_id);
2483
2484 if global_dbtx
2488 .get_value(&DatabaseVersionKey(key_module_instance_id))
2489 .await
2490 .is_none()
2491 {
2492 let current_version_in_module = if let Some(module_instance_id) = module_instance_id {
2501 remove_current_db_version_if_exists(
2502 &mut global_dbtx
2503 .to_ref_with_prefix_module_id(module_instance_id)
2504 .0
2505 .into_nc(),
2506 is_new_db,
2507 target_db_version,
2508 )
2509 .await
2510 } else {
2511 remove_current_db_version_if_exists(
2512 &mut global_dbtx.to_ref().into_nc(),
2513 is_new_db,
2514 target_db_version,
2515 )
2516 .await
2517 };
2518
2519 debug!(target: LOG_DB, ?kind, ?current_version_in_module, ?target_db_version, ?is_new_db, "Creating DatabaseVersionKey...");
2521 global_dbtx
2522 .insert_new_entry(
2523 &DatabaseVersionKey(key_module_instance_id),
2524 ¤t_version_in_module,
2525 )
2526 .await;
2527 }
2528
2529 Ok(())
2530}
2531
2532async fn remove_current_db_version_if_exists(
2537 version_dbtx: &mut DatabaseTransaction<'_>,
2538 is_new_db: bool,
2539 target_db_version: DatabaseVersion,
2540) -> DatabaseVersion {
2541 let current_version_in_module = version_dbtx.remove_entry(&DatabaseVersionKeyV0).await;
2545 match current_version_in_module {
2546 Some(database_version) => database_version,
2547 None if is_new_db => target_db_version,
2548 None => DatabaseVersion(0),
2549 }
2550}
2551
2552fn module_instance_id_or_global(module_instance_id: Option<ModuleInstanceId>) -> ModuleInstanceId {
2555 module_instance_id.map_or_else(
2557 || MODULE_GLOBAL_PREFIX.into(),
2558 |module_instance_id| module_instance_id,
2559 )
2560}
2561#[allow(unused_imports)]
2562mod test_utils {
2563 use std::collections::BTreeMap;
2564 use std::time::Duration;
2565
2566 use fedimint_core::db::DbMigrationFnContext;
2567 use futures::future::ready;
2568 use futures::{Future, FutureExt, StreamExt};
2569 use rand::Rng;
2570 use tokio::join;
2571
2572 use super::{
2573 Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey, DatabaseVersionKeyV0,
2574 DbMigrationFn, apply_migrations,
2575 };
2576 use crate::core::ModuleKind;
2577 use crate::db::mem_impl::MemDatabase;
2578 use crate::db::{
2579 IDatabaseTransactionOps, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
2580 };
2581 use crate::encoding::{Decodable, Encodable};
2582 use crate::module::registry::ModuleDecoderRegistry;
2583
2584 pub async fn future_returns_shortly<F: Future>(fut: F) -> Option<F::Output> {
2585 crate::runtime::timeout(Duration::from_millis(10), fut)
2586 .await
2587 .ok()
2588 }
2589
    /// Key prefixes used by the test records below.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        /// Used by `TestKey` and `TestKeyV0`.
        Test = 0x42,
        /// Used by `AltTestKey`.
        AltTest = 0x43,
        /// Used by `PercentTestKey`; 0x25 is the ASCII code of `%`.
        PercentTestKey = 0x25,
    }
2597
    /// Main test key; registered with `notify_on_modify = true`.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub u64);

    /// Query prefix matching all `TestKey` records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);
2611
    /// Two-field test key stored under the same prefix as `TestKey`.
    #[derive(Debug, Encodable, Decodable)]
    struct TestKeyV0(u64, u64);

    /// Query prefix matching all `TestKeyV0` records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixV0;

    impl_db_record!(
        key = TestKeyV0,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
    );
    impl_db_lookup!(key = TestKeyV0, query_prefix = DbPrefixTestPrefixV0);
2624
    /// Test key stored under a different prefix (`AltTest`) than `TestKey`.
    #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Encodable, Decodable)]
    struct AltTestKey(u64);

    /// Query prefix matching all `AltTestKey` records.
    #[derive(Debug, Encodable, Decodable)]
    struct AltDbPrefixTestPrefix;

    impl_db_record!(
        key = AltTestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::AltTest,
    );
    impl_db_lookup!(key = AltTestKey, query_prefix = AltDbPrefixTestPrefix);
2637
    /// Test key stored under the `PercentTestKey` prefix (0x25).
    #[derive(Debug, Encodable, Decodable)]
    struct PercentTestKey(u64);

    /// Query prefix matching all `PercentTestKey` records.
    #[derive(Debug, Encodable, Decodable)]
    struct PercentPrefixTestPrefix;

    impl_db_record!(
        key = PercentTestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::PercentTestKey,
    );

    impl_db_lookup!(key = PercentTestKey, query_prefix = PercentPrefixTestPrefix);
    /// Value type shared by all test records.
    #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
    pub(super) struct TestVal(pub u64);

    // Module instance ids for tests that need module-prefixed transactions.
    const TEST_MODULE_PREFIX: u16 = 1;
    const ALT_MODULE_PREFIX: u16 = 2;
2656
2657 pub async fn verify_insert_elements(db: Database) {
2658 let mut dbtx = db.begin_transaction().await;
2659 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2660 assert!(dbtx.insert_entry(&TestKey(2), &TestVal(3)).await.is_none());
2661 dbtx.commit_tx().await;
2662
2663 let mut dbtx = db.begin_transaction().await;
2665 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2666 assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(3)));
2667 dbtx.commit_tx().await;
2668
2669 let mut dbtx = db.begin_transaction().await;
2671 assert_eq!(
2672 dbtx.insert_entry(&TestKey(1), &TestVal(4)).await,
2673 Some(TestVal(2))
2674 );
2675 assert_eq!(
2676 dbtx.insert_entry(&TestKey(2), &TestVal(5)).await,
2677 Some(TestVal(3))
2678 );
2679 dbtx.commit_tx().await;
2680
2681 let mut dbtx = db.begin_transaction().await;
2682 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(4)));
2683 assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(5)));
2684 dbtx.commit_tx().await;
2685 }
2686
2687 pub async fn verify_remove_nonexisting(db: Database) {
2688 let mut dbtx = db.begin_transaction().await;
2689 assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2690 let removed = dbtx.remove_entry(&TestKey(1)).await;
2691 assert!(removed.is_none());
2692
2693 dbtx.commit_tx().await;
2695 }
2696
2697 pub async fn verify_remove_existing(db: Database) {
2698 let mut dbtx = db.begin_transaction().await;
2699
2700 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2701
2702 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2703
2704 let removed = dbtx.remove_entry(&TestKey(1)).await;
2705 assert_eq!(removed, Some(TestVal(2)));
2706 assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2707
2708 dbtx.commit_tx().await;
2710 }
2711
2712 pub async fn verify_read_own_writes(db: Database) {
2713 let mut dbtx = db.begin_transaction().await;
2714
2715 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2716
2717 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2718
2719 dbtx.commit_tx().await;
2721 }
2722
2723 pub async fn verify_prevent_dirty_reads(db: Database) {
2724 let mut dbtx = db.begin_transaction().await;
2725
2726 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2727
2728 let mut dbtx2 = db.begin_transaction().await;
2730 assert_eq!(dbtx2.get_value(&TestKey(1)).await, None);
2731
2732 dbtx.commit_tx().await;
2734 }
2735
2736 pub async fn verify_find_by_range(db: Database) {
2737 let mut dbtx = db.begin_transaction().await;
2738 dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2739 dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2740 dbtx.insert_entry(&TestKey(56), &TestVal(7777)).await;
2741
2742 dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2743 dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2744
2745 {
2746 let mut module_dbtx = dbtx.to_ref_with_prefix_module_id(2).0;
2747 module_dbtx
2748 .insert_entry(&TestKey(300), &TestVal(3000))
2749 .await;
2750 }
2751
2752 dbtx.commit_tx().await;
2753
2754 let mut dbtx = db.begin_transaction_nc().await;
2756
2757 let returned_keys = dbtx
2758 .find_by_range(TestKey(55)..TestKey(56))
2759 .await
2760 .collect::<Vec<_>>()
2761 .await;
2762
2763 let expected = vec![(TestKey(55), TestVal(9999))];
2764
2765 assert_eq!(returned_keys, expected);
2766
2767 let returned_keys = dbtx
2768 .find_by_range(TestKey(54)..TestKey(56))
2769 .await
2770 .collect::<Vec<_>>()
2771 .await;
2772
2773 let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2774 assert_eq!(returned_keys, expected);
2775
2776 let returned_keys = dbtx
2777 .find_by_range(TestKey(54)..TestKey(57))
2778 .await
2779 .collect::<Vec<_>>()
2780 .await;
2781
2782 let expected = vec![
2783 (TestKey(54), TestVal(8888)),
2784 (TestKey(55), TestVal(9999)),
2785 (TestKey(56), TestVal(7777)),
2786 ];
2787 assert_eq!(returned_keys, expected);
2788
2789 let mut module_dbtx = dbtx.with_prefix_module_id(2).0;
2790 let test_range = module_dbtx
2791 .find_by_range(TestKey(300)..TestKey(301))
2792 .await
2793 .collect::<Vec<_>>()
2794 .await;
2795 assert!(test_range.len() == 1);
2796 }
2797
2798 pub async fn verify_find_by_prefix(db: Database) {
2799 let mut dbtx = db.begin_transaction().await;
2800 dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2801 dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2802
2803 dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2804 dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2805 dbtx.commit_tx().await;
2806
2807 let mut dbtx = db.begin_transaction().await;
2809
2810 let returned_keys = dbtx
2811 .find_by_prefix(&DbPrefixTestPrefix)
2812 .await
2813 .collect::<Vec<_>>()
2814 .await;
2815
2816 let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2817 assert_eq!(returned_keys, expected);
2818
2819 let reversed = dbtx
2820 .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2821 .await
2822 .collect::<Vec<_>>()
2823 .await;
2824 let mut reversed_expected = expected;
2825 reversed_expected.reverse();
2826 assert_eq!(reversed, reversed_expected);
2827
2828 let returned_keys = dbtx
2829 .find_by_prefix(&AltDbPrefixTestPrefix)
2830 .await
2831 .collect::<Vec<_>>()
2832 .await;
2833
2834 let expected = vec![
2835 (AltTestKey(54), TestVal(6666)),
2836 (AltTestKey(55), TestVal(7777)),
2837 ];
2838 assert_eq!(returned_keys, expected);
2839
2840 let reversed = dbtx
2841 .find_by_prefix_sorted_descending(&AltDbPrefixTestPrefix)
2842 .await
2843 .collect::<Vec<_>>()
2844 .await;
2845 let mut reversed_expected = expected;
2846 reversed_expected.reverse();
2847 assert_eq!(reversed, reversed_expected);
2848 }
2849
2850 pub async fn verify_commit(db: Database) {
2851 let mut dbtx = db.begin_transaction().await;
2852
2853 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2854 dbtx.commit_tx().await;
2855
2856 let mut dbtx2 = db.begin_transaction().await;
2858 assert_eq!(dbtx2.get_value(&TestKey(1)).await, Some(TestVal(2)));
2859 }
2860
2861 pub async fn verify_prevent_nonrepeatable_reads(db: Database) {
2862 let mut dbtx = db.begin_transaction().await;
2863 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2864
2865 let mut dbtx2 = db.begin_transaction().await;
2866
2867 dbtx2.insert_entry(&TestKey(100), &TestVal(101)).await;
2868
2869 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2870
2871 dbtx2.commit_tx().await;
2872
2873 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2876
2877 let expected_keys = 0;
2878 let returned_keys = dbtx
2879 .find_by_prefix(&DbPrefixTestPrefix)
2880 .await
2881 .fold(0, |returned_keys, (key, value)| async move {
2882 if key == TestKey(100) {
2883 assert!(value.eq(&TestVal(101)));
2884 }
2885 returned_keys + 1
2886 })
2887 .await;
2888
2889 assert_eq!(returned_keys, expected_keys);
2890 }
2891
2892 pub async fn verify_snapshot_isolation(db: Database) {
2893 async fn random_yield() {
2894 let times = if rand::thread_rng().gen_bool(0.5) {
2895 0
2896 } else {
2897 10
2898 };
2899 for _ in 0..times {
2900 tokio::task::yield_now().await;
2901 }
2902 }
2903
2904 for i in 0..1000 {
2906 let base_key = i * 2;
2907 let tx_accepted_key = base_key;
2908 let spent_input_key = base_key + 1;
2909
2910 join!(
2911 async {
2912 random_yield().await;
2913 let mut dbtx = db.begin_transaction().await;
2914
2915 random_yield().await;
2916 let a = dbtx.get_value(&TestKey(tx_accepted_key)).await;
2917 random_yield().await;
2918 let s = match i % 5 {
2921 0 => dbtx.get_value(&TestKey(spent_input_key)).await,
2922 1 => dbtx.remove_entry(&TestKey(spent_input_key)).await,
2923 2 => {
2924 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(200))
2925 .await
2926 }
2927 3 => {
2928 dbtx.find_by_prefix(&DbPrefixTestPrefix)
2929 .await
2930 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2931 .map(|(_k, v)| v)
2932 .next()
2933 .await
2934 }
2935 4 => {
2936 dbtx.find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2937 .await
2938 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
2939 .map(|(_k, v)| v)
2940 .next()
2941 .await
2942 }
2943 _ => {
2944 panic!("woot?");
2945 }
2946 };
2947
2948 match (a, s) {
2949 (None, None) | (Some(_), Some(_)) => {}
2950 (None, Some(_)) => panic!("none some?! {i}"),
2951 (Some(_), None) => panic!("some none?! {i}"),
2952 }
2953 },
2954 async {
2955 random_yield().await;
2956
2957 let mut dbtx = db.begin_transaction().await;
2958 random_yield().await;
2959 assert_eq!(dbtx.get_value(&TestKey(tx_accepted_key)).await, None);
2960
2961 random_yield().await;
2962 assert_eq!(
2963 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(100))
2964 .await,
2965 None
2966 );
2967
2968 random_yield().await;
2969 assert_eq!(
2970 dbtx.insert_entry(&TestKey(tx_accepted_key), &TestVal(100))
2971 .await,
2972 None
2973 );
2974 random_yield().await;
2975 dbtx.commit_tx().await;
2976 }
2977 );
2978 }
2979 }
2980
2981 pub async fn verify_phantom_entry(db: Database) {
2982 let mut dbtx = db.begin_transaction().await;
2983
2984 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2985
2986 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
2987
2988 dbtx.commit_tx().await;
2989
2990 let mut dbtx = db.begin_transaction().await;
2991 let expected_keys = 2;
2992 let returned_keys = dbtx
2993 .find_by_prefix(&DbPrefixTestPrefix)
2994 .await
2995 .fold(0, |returned_keys, (key, value)| async move {
2996 match key {
2997 TestKey(100) => {
2998 assert!(value.eq(&TestVal(101)));
2999 }
3000 TestKey(101) => {
3001 assert!(value.eq(&TestVal(102)));
3002 }
3003 _ => {}
3004 }
3005 returned_keys + 1
3006 })
3007 .await;
3008
3009 assert_eq!(returned_keys, expected_keys);
3010
3011 let mut dbtx2 = db.begin_transaction().await;
3012
3013 dbtx2.insert_entry(&TestKey(102), &TestVal(103)).await;
3014
3015 dbtx2.commit_tx().await;
3016
3017 let returned_keys = dbtx
3018 .find_by_prefix(&DbPrefixTestPrefix)
3019 .await
3020 .fold(0, |returned_keys, (key, value)| async move {
3021 match key {
3022 TestKey(100) => {
3023 assert!(value.eq(&TestVal(101)));
3024 }
3025 TestKey(101) => {
3026 assert!(value.eq(&TestVal(102)));
3027 }
3028 _ => {}
3029 }
3030 returned_keys + 1
3031 })
3032 .await;
3033
3034 assert_eq!(returned_keys, expected_keys);
3035 }
3036
3037 pub async fn expect_write_conflict(db: Database) {
3038 let mut dbtx = db.begin_transaction().await;
3039 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3040 dbtx.commit_tx().await;
3041
3042 let mut dbtx2 = db.begin_transaction().await;
3043 let mut dbtx3 = db.begin_transaction().await;
3044
3045 dbtx2.insert_entry(&TestKey(100), &TestVal(102)).await;
3046
3047 dbtx3.insert_entry(&TestKey(100), &TestVal(103)).await;
3051
3052 dbtx2.commit_tx().await;
3053 dbtx3.commit_tx_result().await.expect_err("Expecting an error to be returned because this transaction is in a write-write conflict with dbtx");
3054 }
3055
3056 pub async fn verify_string_prefix(db: Database) {
3057 let mut dbtx = db.begin_transaction().await;
3058 dbtx.insert_entry(&PercentTestKey(100), &TestVal(101)).await;
3059
3060 assert_eq!(
3061 dbtx.get_value(&PercentTestKey(100)).await,
3062 Some(TestVal(101))
3063 );
3064
3065 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3066
3067 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3068
3069 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3070
3071 dbtx.insert_entry(&TestKey(101), &TestVal(100)).await;
3074
3075 let expected_keys = 4;
3076 let returned_keys = dbtx
3077 .find_by_prefix(&PercentPrefixTestPrefix)
3078 .await
3079 .fold(0, |returned_keys, (key, value)| async move {
3080 if matches!(key, PercentTestKey(101)) {
3081 assert!(value.eq(&TestVal(100)));
3082 }
3083 returned_keys + 1
3084 })
3085 .await;
3086
3087 assert_eq!(returned_keys, expected_keys);
3088 }
3089
3090 pub async fn verify_remove_by_prefix(db: Database) {
3091 let mut dbtx = db.begin_transaction().await;
3092
3093 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3094
3095 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3096
3097 dbtx.commit_tx().await;
3098
3099 let mut remove_dbtx = db.begin_transaction().await;
3100 remove_dbtx.remove_by_prefix(&DbPrefixTestPrefix).await;
3101 remove_dbtx.commit_tx().await;
3102
3103 let mut dbtx = db.begin_transaction().await;
3104 let expected_keys = 0;
3105 let returned_keys = dbtx
3106 .find_by_prefix(&DbPrefixTestPrefix)
3107 .await
3108 .fold(0, |returned_keys, (key, value)| async move {
3109 match key {
3110 TestKey(100) => {
3111 assert!(value.eq(&TestVal(101)));
3112 }
3113 TestKey(101) => {
3114 assert!(value.eq(&TestVal(102)));
3115 }
3116 _ => {}
3117 }
3118 returned_keys + 1
3119 })
3120 .await;
3121
3122 assert_eq!(returned_keys, expected_keys);
3123 }
3124
3125 pub async fn verify_module_db(db: Database, module_db: Database) {
3126 let mut dbtx = db.begin_transaction().await;
3127
3128 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3129
3130 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3131
3132 dbtx.commit_tx().await;
3133
3134 let mut module_dbtx = module_db.begin_transaction().await;
3136 assert_eq!(module_dbtx.get_value(&TestKey(100)).await, None);
3137
3138 assert_eq!(module_dbtx.get_value(&TestKey(101)).await, None);
3139
3140 let mut dbtx = db.begin_transaction().await;
3142 assert_eq!(dbtx.get_value(&TestKey(100)).await, Some(TestVal(101)));
3143
3144 assert_eq!(dbtx.get_value(&TestKey(101)).await, Some(TestVal(102)));
3145
3146 let mut module_dbtx = module_db.begin_transaction().await;
3147
3148 module_dbtx.insert_entry(&TestKey(100), &TestVal(103)).await;
3149
3150 module_dbtx.insert_entry(&TestKey(101), &TestVal(104)).await;
3151
3152 module_dbtx.commit_tx().await;
3153
3154 let expected_keys = 2;
3155 let mut dbtx = db.begin_transaction().await;
3156 let returned_keys = dbtx
3157 .find_by_prefix(&DbPrefixTestPrefix)
3158 .await
3159 .fold(0, |returned_keys, (key, value)| async move {
3160 match key {
3161 TestKey(100) => {
3162 assert!(value.eq(&TestVal(101)));
3163 }
3164 TestKey(101) => {
3165 assert!(value.eq(&TestVal(102)));
3166 }
3167 _ => {}
3168 }
3169 returned_keys + 1
3170 })
3171 .await;
3172
3173 assert_eq!(returned_keys, expected_keys);
3174
3175 let removed = dbtx.remove_entry(&TestKey(100)).await;
3176 assert_eq!(removed, Some(TestVal(101)));
3177 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
3178
3179 let mut module_dbtx = module_db.begin_transaction().await;
3180 assert_eq!(
3181 module_dbtx.get_value(&TestKey(100)).await,
3182 Some(TestVal(103))
3183 );
3184 }
3185
3186 pub async fn verify_module_prefix(db: Database) {
3187 let mut test_dbtx = db.begin_transaction().await;
3188 {
3189 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3190
3191 test_module_dbtx
3192 .insert_entry(&TestKey(100), &TestVal(101))
3193 .await;
3194
3195 test_module_dbtx
3196 .insert_entry(&TestKey(101), &TestVal(102))
3197 .await;
3198 }
3199
3200 test_dbtx.commit_tx().await;
3201
3202 let mut alt_dbtx = db.begin_transaction().await;
3203 {
3204 let mut alt_module_dbtx = alt_dbtx.to_ref_with_prefix_module_id(ALT_MODULE_PREFIX).0;
3205
3206 alt_module_dbtx
3207 .insert_entry(&TestKey(100), &TestVal(103))
3208 .await;
3209
3210 alt_module_dbtx
3211 .insert_entry(&TestKey(101), &TestVal(104))
3212 .await;
3213 }
3214
3215 alt_dbtx.commit_tx().await;
3216
3217 let mut test_dbtx = db.begin_transaction().await;
3219 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3220 assert_eq!(
3221 test_module_dbtx.get_value(&TestKey(100)).await,
3222 Some(TestVal(101))
3223 );
3224
3225 assert_eq!(
3226 test_module_dbtx.get_value(&TestKey(101)).await,
3227 Some(TestVal(102))
3228 );
3229
3230 let expected_keys = 2;
3231 let returned_keys = test_module_dbtx
3232 .find_by_prefix(&DbPrefixTestPrefix)
3233 .await
3234 .fold(0, |returned_keys, (key, value)| async move {
3235 match key {
3236 TestKey(100) => {
3237 assert!(value.eq(&TestVal(101)));
3238 }
3239 TestKey(101) => {
3240 assert!(value.eq(&TestVal(102)));
3241 }
3242 _ => {}
3243 }
3244 returned_keys + 1
3245 })
3246 .await;
3247
3248 assert_eq!(returned_keys, expected_keys);
3249
3250 let removed = test_module_dbtx.remove_entry(&TestKey(100)).await;
3251 assert_eq!(removed, Some(TestVal(101)));
3252 assert_eq!(test_module_dbtx.get_value(&TestKey(100)).await, None);
3253
3254 let mut test_dbtx = db.begin_transaction().await;
3257 assert_eq!(test_dbtx.get_value(&TestKey(101)).await, None);
3258
3259 test_dbtx.commit_tx().await;
3260 }
3261
3262 #[cfg(test)]
3263 #[tokio::test]
3264 pub async fn verify_test_migration() {
3265 let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
3267 let expected_test_keys_size: usize = 100;
3268 let mut dbtx = db.begin_transaction().await;
3269 for i in 0..expected_test_keys_size {
3270 dbtx.insert_new_entry(&TestKeyV0(i as u64, (i + 1) as u64), &TestVal(i as u64))
3271 .await;
3272 }
3273
3274 dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
3276 .await;
3277 dbtx.commit_tx().await;
3278
3279 let mut migrations: BTreeMap<DatabaseVersion, DbMigrationFn<()>> = BTreeMap::new();
3280
3281 migrations.insert(
3282 DatabaseVersion(0),
3283 Box::new(|ctx| migrate_test_db_version_0(ctx).boxed()),
3284 );
3285
3286 apply_migrations(&db, (), "TestModule".to_string(), migrations, None, None)
3287 .await
3288 .expect("Error applying migrations for TestModule");
3289
3290 let mut dbtx = db.begin_transaction().await;
3292
3293 assert!(
3296 dbtx.get_value(&DatabaseVersionKey(MODULE_GLOBAL_PREFIX.into()))
3297 .await
3298 .is_some()
3299 );
3300
3301 let test_keys = dbtx
3303 .find_by_prefix(&DbPrefixTestPrefix)
3304 .await
3305 .collect::<Vec<_>>()
3306 .await;
3307 let test_keys_size = test_keys.len();
3308 assert_eq!(test_keys_size, expected_test_keys_size);
3309 for (key, val) in test_keys {
3310 assert_eq!(key.0, val.0 + 1);
3311 }
3312 }
3313
3314 #[allow(dead_code)]
3315 async fn migrate_test_db_version_0(
3316 mut ctx: DbMigrationFnContext<'_, ()>,
3317 ) -> std::result::Result<(), anyhow::Error> {
3318 let mut dbtx = ctx.dbtx();
3319 let example_keys_v0 = dbtx
3320 .find_by_prefix(&DbPrefixTestPrefixV0)
3321 .await
3322 .collect::<Vec<_>>()
3323 .await;
3324 dbtx.remove_by_prefix(&DbPrefixTestPrefixV0).await;
3325 for (key, val) in example_keys_v0 {
3326 let key_v2 = TestKey(key.1);
3327 dbtx.insert_new_entry(&key_v2, &val).await;
3328 }
3329 Ok(())
3330 }
3331
3332 #[cfg(test)]
3333 #[tokio::test]
3334 async fn test_autocommit() {
3335 use std::marker::PhantomData;
3336 use std::ops::Range;
3337 use std::path::Path;
3338
3339 use anyhow::anyhow;
3340 use async_trait::async_trait;
3341
3342 use crate::ModuleDecoderRegistry;
3343 use crate::db::{
3344 AutocommitError, BaseDatabaseTransaction, DatabaseError, DatabaseResult,
3345 IDatabaseTransaction, IDatabaseTransactionOps, IDatabaseTransactionOpsCore,
3346 IRawDatabase, IRawDatabaseTransaction,
3347 };
3348
3349 #[derive(Debug)]
3350 struct FakeDatabase;
3351
3352 #[async_trait]
3353 impl IRawDatabase for FakeDatabase {
3354 type Transaction<'a> = FakeTransaction<'a>;
3355 async fn begin_transaction(&self) -> FakeTransaction {
3356 FakeTransaction(PhantomData)
3357 }
3358
3359 fn checkpoint(&self, _backup_path: &Path) -> DatabaseResult<()> {
3360 Ok(())
3361 }
3362 }
3363
3364 #[derive(Debug)]
3365 struct FakeTransaction<'a>(PhantomData<&'a ()>);
3366
3367 #[async_trait]
3368 impl IDatabaseTransactionOpsCore for FakeTransaction<'_> {
3369 async fn raw_insert_bytes(
3370 &mut self,
3371 _key: &[u8],
3372 _value: &[u8],
3373 ) -> DatabaseResult<Option<Vec<u8>>> {
3374 unimplemented!()
3375 }
3376
3377 async fn raw_get_bytes(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
3378 unimplemented!()
3379 }
3380
3381 async fn raw_remove_entry(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
3382 unimplemented!()
3383 }
3384
3385 async fn raw_find_by_range(
3386 &mut self,
3387 _key_range: Range<&[u8]>,
3388 ) -> DatabaseResult<crate::db::PrefixStream<'_>> {
3389 unimplemented!()
3390 }
3391
3392 async fn raw_find_by_prefix(
3393 &mut self,
3394 _key_prefix: &[u8],
3395 ) -> DatabaseResult<crate::db::PrefixStream<'_>> {
3396 unimplemented!()
3397 }
3398
3399 async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> DatabaseResult<()> {
3400 unimplemented!()
3401 }
3402
3403 async fn raw_find_by_prefix_sorted_descending(
3404 &mut self,
3405 _key_prefix: &[u8],
3406 ) -> DatabaseResult<crate::db::PrefixStream<'_>> {
3407 unimplemented!()
3408 }
3409 }
3410
3411 impl IDatabaseTransactionOps for FakeTransaction<'_> {}
3412
3413 #[async_trait]
3414 impl IRawDatabaseTransaction for FakeTransaction<'_> {
3415 async fn commit_tx(self) -> DatabaseResult<()> {
3416 use crate::db::DatabaseError;
3417
3418 Err(DatabaseError::Other(anyhow::anyhow!("Can't commit!")))
3419 }
3420 }
3421
3422 let db = Database::new(FakeDatabase, ModuleDecoderRegistry::default());
3423 let err = db
3424 .autocommit::<_, _, ()>(|_dbtx, _| Box::pin(async { Ok(()) }), Some(5))
3425 .await
3426 .unwrap_err();
3427
3428 match err {
3429 AutocommitError::CommitFailed {
3430 attempts: failed_attempts,
3431 ..
3432 } => {
3433 assert_eq!(failed_attempts, 5);
3434 }
3435 AutocommitError::ClosureError { .. } => panic!("Closure did not return error"),
3436 }
3437 }
3438}
3439
3440pub async fn find_by_prefix_sorted_descending<'r, 'inner, KP>(
3441 tx: &'r mut (dyn IDatabaseTransaction + 'inner),
3442 decoders: ModuleDecoderRegistry,
3443 key_prefix: &KP,
3444) -> impl Stream<
3445 Item = (
3446 KP::Record,
3447 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
3448 ),
3449>
3450+ 'r
3451+ use<'r, KP>
3452where
3453 'inner: 'r,
3454 KP: DatabaseLookup,
3455 KP::Record: DatabaseKey,
3456{
3457 debug!(target: LOG_DB, "find by prefix sorted descending");
3458 let prefix_bytes = key_prefix.to_bytes();
3459 tx.raw_find_by_prefix_sorted_descending(&prefix_bytes)
3460 .await
3461 .expect("Error doing prefix search in database")
3462 .map(move |(key_bytes, value_bytes)| {
3463 let key = decode_key_expect(&key_bytes, &decoders);
3464 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
3465 (key, value)
3466 })
3467}
3468
3469pub async fn verify_module_db_integrity_dbtx(
3470 dbtx: &mut DatabaseTransaction<'_>,
3471 module_id: ModuleInstanceId,
3472 module_kind: ModuleKind,
3473 prefixes: &BTreeSet<u8>,
3474) {
3475 let module_db_prefix = module_instance_id_to_byte_prefix(module_id);
3476 if module_id < 250 {
3477 assert_eq!(module_db_prefix.len(), 2);
3478 }
3479 let mut records = dbtx
3480 .raw_find_by_prefix(&module_db_prefix)
3481 .await
3482 .expect("DB fail");
3483 while let Some((k, v)) = records.next().await {
3484 assert!(
3485 prefixes.contains(&k[module_db_prefix.len()]),
3486 "Unexpected module {module_kind} {module_id} db record found: {}: {}",
3487 k.as_hex(),
3488 v.as_hex()
3489 );
3490 }
3491}
3492
3493#[cfg(test)]
3494mod tests;