1use std::any;
106use std::collections::{BTreeMap, BTreeSet};
107use std::error::Error;
108use std::fmt::{self, Debug};
109use std::marker::{self, PhantomData};
110use std::ops::{self, DerefMut, Range};
111use std::path::Path;
112use std::pin::Pin;
113use std::sync::Arc;
114use std::time::Duration;
115
116use bitcoin::hex::DisplayHex as _;
117use fedimint_core::util::BoxFuture;
118use fedimint_logging::LOG_DB;
119use fedimint_util_error::FmtCompact as _;
120use futures::{Stream, StreamExt};
121use macro_rules_attribute::apply;
122use rand::Rng;
123use serde::Serialize;
124use strum_macros::EnumIter;
125use thiserror::Error;
126use tracing::{debug, error, info, instrument, trace, warn};
127
128use crate::core::{ModuleInstanceId, ModuleKind};
129use crate::encoding::{Decodable, Encodable};
130use crate::fmt_utils::AbbreviateHexBytes;
131use crate::task::{MaybeSend, MaybeSync};
132use crate::{async_trait_maybe_send, maybe_add_send, maybe_add_send_sync, timing};
133
134pub mod mem_impl;
135pub mod notifications;
136
137pub use test_utils::*;
138
139use self::notifications::{Notifications, NotifyQueue};
140use crate::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
141
142pub const MODULE_GLOBAL_PREFIX: u8 = 0xff;
143
144pub type DatabaseResult<T> = std::result::Result<T, DatabaseError>;
146
147pub trait DatabaseKeyPrefix: Debug {
148 fn to_bytes(&self) -> Vec<u8>;
149}
150
151pub trait DatabaseRecord: DatabaseKeyPrefix {
154 const DB_PREFIX: u8;
155 const NOTIFY_ON_MODIFY: bool = false;
156 type Key: DatabaseKey + Debug;
157 type Value: DatabaseValue + Debug;
158}
159
160pub trait DatabaseLookup: DatabaseKeyPrefix {
163 type Record: DatabaseRecord;
164}
165
166impl<Record> DatabaseLookup for Record
168where
169 Record: DatabaseRecord + Debug + Decodable + Encodable,
170{
171 type Record = Record;
172}
173
174pub trait DatabaseKey: Sized {
177 const NOTIFY_ON_MODIFY: bool = false;
185 fn from_bytes(
186 data: &[u8],
187 modules: &ModuleDecoderRegistry,
188 ) -> std::result::Result<Self, DecodingError>;
189}
190
191pub trait DatabaseKeyWithNotify {}
193
194pub trait DatabaseValue: Sized + Debug {
196 fn from_bytes(
197 data: &[u8],
198 modules: &ModuleDecoderRegistry,
199 ) -> std::result::Result<Self, DecodingError>;
200 fn to_bytes(&self) -> Vec<u8>;
201}
202
203pub type PrefixStream<'a> = Pin<Box<maybe_add_send!(dyn Stream<Item = (Vec<u8>, Vec<u8>)> + 'a)>>;
204
205pub type PhantomBound<'big, 'small> = PhantomData<&'small &'big ()>;
209
210#[derive(Debug, Error)]
212pub enum AutocommitError<E> {
213 #[error("Commit Failed: {last_error}")]
215 CommitFailed {
216 attempts: usize,
218 last_error: DatabaseError,
220 },
221 #[error("Closure error: {error}")]
224 ClosureError {
225 attempts: usize,
231 error: E,
233 },
234}
235
236pub trait AutocommitResultExt<T, E> {
237 fn unwrap_autocommit(self) -> std::result::Result<T, E>;
241}
242
243impl<T, E> AutocommitResultExt<T, E> for std::result::Result<T, AutocommitError<E>> {
244 fn unwrap_autocommit(self) -> std::result::Result<T, E> {
245 match self {
246 Ok(value) => Ok(value),
247 Err(AutocommitError::CommitFailed { .. }) => {
248 panic!("`unwrap_autocommit` called on a autocommit result with finite retries");
249 }
250 Err(AutocommitError::ClosureError { error, .. }) => Err(error),
251 }
252 }
253}
254
255#[apply(async_trait_maybe_send!)]
264pub trait IRawDatabase: Debug + MaybeSend + MaybeSync + 'static {
265 type Transaction<'a>: IRawDatabaseTransaction + Debug;
267
268 async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a>;
270
271 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()>;
273}
274
275#[apply(async_trait_maybe_send!)]
276impl<T> IRawDatabase for Box<T>
277where
278 T: IRawDatabase,
279{
280 type Transaction<'a> = <T as IRawDatabase>::Transaction<'a>;
281
282 async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a> {
283 (**self).begin_transaction().await
284 }
285
286 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
287 (**self).checkpoint(backup_path)
288 }
289}
290
291pub trait IRawDatabaseExt: IRawDatabase + Sized {
293 fn into_database(self) -> Database {
297 Database::new(self, ModuleRegistry::default())
298 }
299}
300
301impl<T> IRawDatabaseExt for T where T: IRawDatabase {}
302
303impl<T> From<T> for Database
304where
305 T: IRawDatabase,
306{
307 fn from(raw: T) -> Self {
308 Self::new(raw, ModuleRegistry::default())
309 }
310}
311
312#[apply(async_trait_maybe_send!)]
315pub trait IDatabase: Debug + MaybeSend + MaybeSync + 'static {
316 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a>;
318 async fn register(&self, key: &[u8]);
320 async fn notify(&self, key: &[u8]);
322
323 fn is_global(&self) -> bool;
326
327 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()>;
329}
330
331#[apply(async_trait_maybe_send!)]
332impl<T> IDatabase for Arc<T>
333where
334 T: IDatabase + ?Sized,
335{
336 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
337 (**self).begin_transaction().await
338 }
339 async fn register(&self, key: &[u8]) {
340 (**self).register(key).await;
341 }
342 async fn notify(&self, key: &[u8]) {
343 (**self).notify(key).await;
344 }
345
346 fn is_global(&self) -> bool {
347 (**self).is_global()
348 }
349
350 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
351 (**self).checkpoint(backup_path)
352 }
353}
354
355struct BaseDatabase<RawDatabase> {
359 notifications: Arc<Notifications>,
360 raw: RawDatabase,
361}
362
363impl<RawDatabase> fmt::Debug for BaseDatabase<RawDatabase> {
364 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
365 f.write_str("BaseDatabase")
366 }
367}
368
369#[apply(async_trait_maybe_send!)]
370impl<RawDatabase: IRawDatabase + MaybeSend + 'static> IDatabase for BaseDatabase<RawDatabase> {
371 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
372 Box::new(BaseDatabaseTransaction::new(
373 self.raw.begin_transaction().await,
374 self.notifications.clone(),
375 ))
376 }
377 async fn register(&self, key: &[u8]) {
378 self.notifications.register(key).await;
379 }
380 async fn notify(&self, key: &[u8]) {
381 self.notifications.notify(key);
382 }
383
384 fn is_global(&self) -> bool {
385 true
386 }
387
388 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
389 self.raw.checkpoint(backup_path)
390 }
391}
392
393#[derive(Clone, Debug)]
399pub struct Database {
400 inner: Arc<dyn IDatabase + 'static>,
401 module_decoders: ModuleDecoderRegistry,
402}
403
404impl Database {
405 pub fn strong_count(&self) -> usize {
406 Arc::strong_count(&self.inner)
407 }
408
409 pub fn into_inner(self) -> Arc<dyn IDatabase + 'static> {
410 self.inner
411 }
412}
413
414impl Database {
415 pub fn new(raw: impl IRawDatabase + 'static, module_decoders: ModuleDecoderRegistry) -> Self {
420 let inner = BaseDatabase {
421 raw,
422 notifications: Arc::new(Notifications::new()),
423 };
424 Self::new_from_arc(
425 Arc::new(inner) as Arc<dyn IDatabase + 'static>,
426 module_decoders,
427 )
428 }
429
430 pub fn new_from_arc(
432 inner: Arc<dyn IDatabase + 'static>,
433 module_decoders: ModuleDecoderRegistry,
434 ) -> Self {
435 Self {
436 inner,
437 module_decoders,
438 }
439 }
440
441 pub fn with_prefix(&self, prefix: Vec<u8>) -> Self {
443 Self {
444 inner: Arc::new(PrefixDatabase {
445 inner: self.inner.clone(),
446 global_dbtx_access_token: None,
447 prefix,
448 }),
449 module_decoders: self.module_decoders.clone(),
450 }
451 }
452
453 pub fn with_prefix_module_id(
457 &self,
458 module_instance_id: ModuleInstanceId,
459 ) -> (Self, GlobalDBTxAccessToken) {
460 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
461 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
462 (
463 Self {
464 inner: Arc::new(PrefixDatabase {
465 inner: self.inner.clone(),
466 global_dbtx_access_token: Some(global_dbtx_access_token),
467 prefix,
468 }),
469 module_decoders: self.module_decoders.clone(),
470 },
471 global_dbtx_access_token,
472 )
473 }
474
475 pub fn with_decoders(&self, module_decoders: ModuleDecoderRegistry) -> Self {
476 Self {
477 inner: self.inner.clone(),
478 module_decoders,
479 }
480 }
481
482 pub fn is_global(&self) -> bool {
484 self.inner.is_global()
485 }
486
487 pub fn ensure_global(&self) -> DatabaseResult<()> {
489 if !self.is_global() {
490 return Err(DatabaseError::Other(anyhow::anyhow!(
491 "Database instance not global"
492 )));
493 }
494
495 Ok(())
496 }
497
498 pub fn ensure_isolated(&self) -> DatabaseResult<()> {
500 if self.is_global() {
501 return Err(DatabaseError::Other(anyhow::anyhow!(
502 "Database instance not isolated"
503 )));
504 }
505
506 Ok(())
507 }
508
509 pub async fn begin_transaction<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, Committable>
511 where
512 's: 'tx,
513 {
514 DatabaseTransaction::<Committable>::new(
515 self.inner.begin_transaction().await,
516 self.module_decoders.clone(),
517 )
518 }
519
520 pub async fn begin_transaction_nc<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, NonCommittable>
522 where
523 's: 'tx,
524 {
525 self.begin_transaction().await.into_nc()
526 }
527
528 pub fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
529 self.inner.checkpoint(backup_path)
530 }
531
532 pub async fn autocommit<'s, 'dbtx, F, T, E>(
560 &'s self,
561 tx_fn: F,
562 max_attempts: Option<usize>,
563 ) -> std::result::Result<T, AutocommitError<E>>
564 where
565 's: 'dbtx,
566 for<'r, 'o> F: Fn(
567 &'r mut DatabaseTransaction<'o>,
568 PhantomBound<'dbtx, 'o>,
569 ) -> BoxFuture<'r, std::result::Result<T, E>>,
570 {
571 assert_ne!(max_attempts, Some(0));
572 let mut curr_attempts: usize = 0;
573
574 loop {
575 curr_attempts = curr_attempts
580 .checked_add(1)
581 .expect("db autocommit attempt counter overflowed");
582
583 let mut dbtx = self.begin_transaction().await;
584
585 let tx_fn_res = tx_fn(&mut dbtx.to_ref_nc(), PhantomData).await;
586 let val = match tx_fn_res {
587 Ok(val) => val,
588 Err(err) => {
589 dbtx.ignore_uncommitted();
590 return Err(AutocommitError::ClosureError {
591 attempts: curr_attempts,
592 error: err,
593 });
594 }
595 };
596
597 let _timing = timing::TimeReporter::new("autocommit - commit_tx");
598
599 match dbtx.commit_tx_result().await {
600 Ok(()) => {
601 return Ok(val);
602 }
603 Err(err) => {
604 if max_attempts.is_some_and(|max_att| max_att <= curr_attempts) {
605 warn!(
606 target: LOG_DB,
607 curr_attempts,
608 err = %err.fmt_compact(),
609 "Database commit failed in an autocommit block - terminating"
610 );
611 return Err(AutocommitError::CommitFailed {
612 attempts: curr_attempts,
613 last_error: err,
614 });
615 }
616
617 let delay = (2u64.pow(curr_attempts.min(7) as u32) * 10).min(1000);
618 let delay = rand::thread_rng().gen_range(delay..(2 * delay));
619 warn!(
620 target: LOG_DB,
621 curr_attempts,
622 err = %err.fmt_compact(),
623 delay_ms = %delay,
624 "Database commit failed in an autocommit block - retrying"
625 );
626 crate::runtime::sleep(Duration::from_millis(delay)).await;
627 }
628 }
629 }
630 }
631
632 pub async fn wait_key_check<'a, K, T>(
637 &'a self,
638 key: &K,
639 checker: impl Fn(Option<K::Value>) -> Option<T>,
640 ) -> (T, DatabaseTransaction<'a, Committable>)
641 where
642 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
643 {
644 let key_bytes = key.to_bytes();
645 loop {
646 let notify = self.inner.register(&key_bytes);
648
649 let mut tx = self.inner.begin_transaction().await;
651
652 let maybe_value_bytes = tx
653 .raw_get_bytes(&key_bytes)
654 .await
655 .expect("Unrecoverable error when reading from database")
656 .map(|value_bytes| {
657 decode_value_expect(&value_bytes, &self.module_decoders, &key_bytes)
658 });
659
660 if let Some(value) = checker(maybe_value_bytes) {
661 return (
662 value,
663 DatabaseTransaction::new(tx, self.module_decoders.clone()),
664 );
665 }
666
667 notify.await;
669 }
672 }
673
674 pub async fn wait_key_exists<K>(&self, key: &K) -> K::Value
676 where
677 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
678 {
679 self.wait_key_check(key, std::convert::identity).await.0
680 }
681}
682
683fn module_instance_id_to_byte_prefix(module_instance_id: u16) -> Vec<u8> {
684 let mut bytes = vec![MODULE_GLOBAL_PREFIX];
685 bytes.append(&mut module_instance_id.consensus_encode_to_vec());
686 bytes
687}
688
689#[derive(Clone, Debug)]
692struct PrefixDatabase<Inner>
693where
694 Inner: Debug,
695{
696 prefix: Vec<u8>,
697 global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
698 inner: Inner,
699}
700
701impl<Inner> PrefixDatabase<Inner>
702where
703 Inner: Debug,
704{
705 fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
709 let mut full_key = self.prefix.clone();
710 full_key.extend_from_slice(key);
711 full_key
712 }
713}
714
715#[apply(async_trait_maybe_send!)]
716impl<Inner> IDatabase for PrefixDatabase<Inner>
717where
718 Inner: Debug + MaybeSend + MaybeSync + 'static + IDatabase,
719{
720 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
721 Box::new(PrefixDatabaseTransaction {
722 inner: self.inner.begin_transaction().await,
723 global_dbtx_access_token: self.global_dbtx_access_token,
724 prefix: self.prefix.clone(),
725 })
726 }
727 async fn register(&self, key: &[u8]) {
728 self.inner.register(&self.get_full_key(key)).await;
729 }
730
731 async fn notify(&self, key: &[u8]) {
732 self.inner.notify(&self.get_full_key(key)).await;
733 }
734
735 fn is_global(&self) -> bool {
736 if self.global_dbtx_access_token.is_some() {
737 false
738 } else {
739 self.inner.is_global()
740 }
741 }
742
743 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
744 self.inner.checkpoint(backup_path)
745 }
746}
747
748#[derive(Debug)]
753struct PrefixDatabaseTransaction<Inner> {
754 inner: Inner,
755 global_dbtx_access_token: Option<GlobalDBTxAccessToken>,
756 prefix: Vec<u8>,
757}
758
759impl<Inner> PrefixDatabaseTransaction<Inner> {
760 fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
764 let mut full_key = self.prefix.clone();
765 full_key.extend_from_slice(key);
766 full_key
767 }
768
769 fn get_full_range(&self, range: Range<&[u8]>) -> Range<Vec<u8>> {
770 Range {
771 start: self.get_full_key(range.start),
772 end: self.get_full_key(range.end),
773 }
774 }
775
776 fn adapt_prefix_stream(stream: PrefixStream<'_>, prefix_len: usize) -> PrefixStream<'_> {
777 Box::pin(stream.map(move |(k, v)| (k[prefix_len..].to_owned(), v)))
778 }
779}
780
781#[apply(async_trait_maybe_send!)]
782impl<Inner> IDatabaseTransaction for PrefixDatabaseTransaction<Inner>
783where
784 Inner: IDatabaseTransaction,
785{
786 async fn commit_tx(&mut self) -> DatabaseResult<()> {
787 self.inner.commit_tx().await
788 }
789
790 fn is_global(&self) -> bool {
791 if self.global_dbtx_access_token.is_some() {
792 false
793 } else {
794 self.inner.is_global()
795 }
796 }
797
798 fn global_dbtx(
799 &mut self,
800 access_token: GlobalDBTxAccessToken,
801 ) -> &mut dyn IDatabaseTransaction {
802 if let Some(self_global_dbtx_access_token) = self.global_dbtx_access_token {
803 assert_eq!(
804 access_token, self_global_dbtx_access_token,
805 "Invalid access key used to access global_dbtx"
806 );
807 &mut self.inner
808 } else {
809 self.inner.global_dbtx(access_token)
810 }
811 }
812}
813
814#[apply(async_trait_maybe_send!)]
815impl<Inner> IDatabaseTransactionOpsCore for PrefixDatabaseTransaction<Inner>
816where
817 Inner: IDatabaseTransactionOpsCore,
818{
819 async fn raw_insert_bytes(
820 &mut self,
821 key: &[u8],
822 value: &[u8],
823 ) -> DatabaseResult<Option<Vec<u8>>> {
824 let key = self.get_full_key(key);
825 self.inner.raw_insert_bytes(&key, value).await
826 }
827
828 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
829 let key = self.get_full_key(key);
830 self.inner.raw_get_bytes(&key).await
831 }
832
833 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
834 let key = self.get_full_key(key);
835 self.inner.raw_remove_entry(&key).await
836 }
837
838 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
839 let key = self.get_full_key(key_prefix);
840 let stream = self.inner.raw_find_by_prefix(&key).await?;
841 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
842 }
843
844 async fn raw_find_by_prefix_sorted_descending(
845 &mut self,
846 key_prefix: &[u8],
847 ) -> DatabaseResult<PrefixStream<'_>> {
848 let key = self.get_full_key(key_prefix);
849 let stream = self
850 .inner
851 .raw_find_by_prefix_sorted_descending(&key)
852 .await?;
853 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
854 }
855
856 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
857 let range = self.get_full_range(range);
858 let stream = self
859 .inner
860 .raw_find_by_range(Range {
861 start: &range.start,
862 end: &range.end,
863 })
864 .await?;
865 Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
866 }
867
868 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
869 let key = self.get_full_key(key_prefix);
870 self.inner.raw_remove_by_prefix(&key).await
871 }
872}
873
874#[apply(async_trait_maybe_send!)]
875impl<Inner> IDatabaseTransactionOps for PrefixDatabaseTransaction<Inner>
876where
877 Inner: IDatabaseTransactionOps,
878{
879 async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()> {
880 self.inner.rollback_tx_to_savepoint().await
881 }
882
883 async fn set_tx_savepoint(&mut self) -> DatabaseResult<()> {
884 self.set_tx_savepoint().await
885 }
886}
887
888#[apply(async_trait_maybe_send!)]
892pub trait IDatabaseTransactionOpsCore: MaybeSend {
893 async fn raw_insert_bytes(
895 &mut self,
896 key: &[u8],
897 value: &[u8],
898 ) -> DatabaseResult<Option<Vec<u8>>>;
899
900 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>>;
902
903 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>>;
905
906 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>>;
909
910 async fn raw_find_by_prefix_sorted_descending(
912 &mut self,
913 key_prefix: &[u8],
914 ) -> DatabaseResult<PrefixStream<'_>>;
915
916 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>>;
920
921 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()>;
923}
924
925#[apply(async_trait_maybe_send!)]
926impl<T> IDatabaseTransactionOpsCore for Box<T>
927where
928 T: IDatabaseTransactionOpsCore + ?Sized,
929{
930 async fn raw_insert_bytes(
931 &mut self,
932 key: &[u8],
933 value: &[u8],
934 ) -> DatabaseResult<Option<Vec<u8>>> {
935 (**self).raw_insert_bytes(key, value).await
936 }
937
938 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
939 (**self).raw_get_bytes(key).await
940 }
941
942 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
943 (**self).raw_remove_entry(key).await
944 }
945
946 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
947 (**self).raw_find_by_prefix(key_prefix).await
948 }
949
950 async fn raw_find_by_prefix_sorted_descending(
951 &mut self,
952 key_prefix: &[u8],
953 ) -> DatabaseResult<PrefixStream<'_>> {
954 (**self)
955 .raw_find_by_prefix_sorted_descending(key_prefix)
956 .await
957 }
958
959 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
960 (**self).raw_find_by_range(range).await
961 }
962
963 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
964 (**self).raw_remove_by_prefix(key_prefix).await
965 }
966}
967
968#[apply(async_trait_maybe_send!)]
969impl<T> IDatabaseTransactionOpsCore for &mut T
970where
971 T: IDatabaseTransactionOpsCore + ?Sized,
972{
973 async fn raw_insert_bytes(
974 &mut self,
975 key: &[u8],
976 value: &[u8],
977 ) -> DatabaseResult<Option<Vec<u8>>> {
978 (**self).raw_insert_bytes(key, value).await
979 }
980
981 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
982 (**self).raw_get_bytes(key).await
983 }
984
985 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
986 (**self).raw_remove_entry(key).await
987 }
988
989 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
990 (**self).raw_find_by_prefix(key_prefix).await
991 }
992
993 async fn raw_find_by_prefix_sorted_descending(
994 &mut self,
995 key_prefix: &[u8],
996 ) -> DatabaseResult<PrefixStream<'_>> {
997 (**self)
998 .raw_find_by_prefix_sorted_descending(key_prefix)
999 .await
1000 }
1001
1002 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
1003 (**self).raw_find_by_range(range).await
1004 }
1005
1006 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
1007 (**self).raw_remove_by_prefix(key_prefix).await
1008 }
1009}
1010
1011#[apply(async_trait_maybe_send!)]
1017pub trait IDatabaseTransactionOps: IDatabaseTransactionOpsCore + MaybeSend {
1018 async fn set_tx_savepoint(&mut self) -> DatabaseResult<()>;
1027
1028 async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()>;
1029}
1030
1031#[apply(async_trait_maybe_send!)]
1032impl<T> IDatabaseTransactionOps for Box<T>
1033where
1034 T: IDatabaseTransactionOps + ?Sized,
1035{
1036 async fn set_tx_savepoint(&mut self) -> DatabaseResult<()> {
1037 (**self).set_tx_savepoint().await
1038 }
1039
1040 async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()> {
1041 (**self).rollback_tx_to_savepoint().await
1042 }
1043}
1044
1045#[apply(async_trait_maybe_send!)]
1046impl<T> IDatabaseTransactionOps for &mut T
1047where
1048 T: IDatabaseTransactionOps + ?Sized,
1049{
1050 async fn set_tx_savepoint(&mut self) -> DatabaseResult<()> {
1051 (**self).set_tx_savepoint().await
1052 }
1053
1054 async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()> {
1055 (**self).rollback_tx_to_savepoint().await
1056 }
1057}
1058
1059#[apply(async_trait_maybe_send!)]
1065pub trait IDatabaseTransactionOpsCoreTyped<'a> {
1066 async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1067 where
1068 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1069
1070 async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1071 where
1072 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1073 K::Value: MaybeSend + MaybeSync;
1074
1075 async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1076 where
1077 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1078 K::Value: MaybeSend + MaybeSync;
1079
1080 async fn find_by_range<K>(
1081 &mut self,
1082 key_range: Range<K>,
1083 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1084 where
1085 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1086 K::Value: MaybeSend + MaybeSync;
1087
1088 async fn find_by_prefix<KP>(
1089 &mut self,
1090 key_prefix: &KP,
1091 ) -> Pin<
1092 Box<
1093 maybe_add_send!(
1094 dyn Stream<
1095 Item = (
1096 KP::Record,
1097 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1098 ),
1099 > + '_
1100 ),
1101 >,
1102 >
1103 where
1104 KP: DatabaseLookup + MaybeSend + MaybeSync,
1105 KP::Record: DatabaseKey;
1106
1107 async fn find_by_prefix_sorted_descending<KP>(
1108 &mut self,
1109 key_prefix: &KP,
1110 ) -> Pin<
1111 Box<
1112 maybe_add_send!(
1113 dyn Stream<
1114 Item = (
1115 KP::Record,
1116 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1117 ),
1118 > + '_
1119 ),
1120 >,
1121 >
1122 where
1123 KP: DatabaseLookup + MaybeSend + MaybeSync,
1124 KP::Record: DatabaseKey;
1125
1126 async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1127 where
1128 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
1129
1130 async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1131 where
1132 KP: DatabaseLookup + MaybeSend + MaybeSync;
1133}
1134
1135#[apply(async_trait_maybe_send!)]
1138impl<T> IDatabaseTransactionOpsCoreTyped<'_> for T
1139where
1140 T: IDatabaseTransactionOpsCore + WithDecoders,
1141{
1142 async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
1143 where
1144 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1145 {
1146 let key_bytes = key.to_bytes();
1147 let raw = self
1148 .raw_get_bytes(&key_bytes)
1149 .await
1150 .expect("Unrecoverable error occurred while reading and entry from the database");
1151 raw.map(|value_bytes| {
1152 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1153 })
1154 }
1155
1156 async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
1157 where
1158 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1159 K::Value: MaybeSend + MaybeSync,
1160 {
1161 let key_bytes = key.to_bytes();
1162 self.raw_insert_bytes(&key_bytes, &value.to_bytes())
1163 .await
1164 .expect("Unrecoverable error occurred while inserting entry into the database")
1165 .map(|value_bytes| {
1166 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1167 })
1168 }
1169
1170 async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
1171 where
1172 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1173 K::Value: MaybeSend + MaybeSync,
1174 {
1175 if let Some(prev) = self.insert_entry(key, value).await {
1176 panic!(
1177 "Database overwriting element when expecting insertion of new entry. Key: {key:?} Prev Value: {prev:?}"
1178 );
1179 }
1180 }
1181
1182 async fn find_by_range<K>(
1183 &mut self,
1184 key_range: Range<K>,
1185 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (K, K::Value)> + '_)>>
1186 where
1187 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1188 K::Value: MaybeSend + MaybeSync,
1189 {
1190 let decoders = self.decoders().clone();
1191 Box::pin(
1192 self.raw_find_by_range(Range {
1193 start: &key_range.start.to_bytes(),
1194 end: &key_range.end.to_bytes(),
1195 })
1196 .await
1197 .expect("Unrecoverable error occurred while listing entries from the database")
1198 .map(move |(key_bytes, value_bytes)| {
1199 let key = decode_key_expect(&key_bytes, &decoders);
1200 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1201 (key, value)
1202 }),
1203 )
1204 }
1205
1206 async fn find_by_prefix<KP>(
1207 &mut self,
1208 key_prefix: &KP,
1209 ) -> Pin<
1210 Box<
1211 maybe_add_send!(
1212 dyn Stream<
1213 Item = (
1214 KP::Record,
1215 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1216 ),
1217 > + '_
1218 ),
1219 >,
1220 >
1221 where
1222 KP: DatabaseLookup + MaybeSend + MaybeSync,
1223 KP::Record: DatabaseKey,
1224 {
1225 let decoders = self.decoders().clone();
1226 Box::pin(
1227 self.raw_find_by_prefix(&key_prefix.to_bytes())
1228 .await
1229 .expect("Unrecoverable error occurred while listing entries from the database")
1230 .map(move |(key_bytes, value_bytes)| {
1231 let key = decode_key_expect(&key_bytes, &decoders);
1232 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1233 (key, value)
1234 }),
1235 )
1236 }
1237
1238 async fn find_by_prefix_sorted_descending<KP>(
1239 &mut self,
1240 key_prefix: &KP,
1241 ) -> Pin<
1242 Box<
1243 maybe_add_send!(
1244 dyn Stream<
1245 Item = (
1246 KP::Record,
1247 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
1248 ),
1249 > + '_
1250 ),
1251 >,
1252 >
1253 where
1254 KP: DatabaseLookup + MaybeSend + MaybeSync,
1255 KP::Record: DatabaseKey,
1256 {
1257 let decoders = self.decoders().clone();
1258 Box::pin(
1259 self.raw_find_by_prefix_sorted_descending(&key_prefix.to_bytes())
1260 .await
1261 .expect("Unrecoverable error occurred while listing entries from the database")
1262 .map(move |(key_bytes, value_bytes)| {
1263 let key = decode_key_expect(&key_bytes, &decoders);
1264 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1265 (key, value)
1266 }),
1267 )
1268 }
1269 async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1270 where
1271 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1272 {
1273 let key_bytes = key.to_bytes();
1274 self.raw_remove_entry(&key_bytes)
1275 .await
1276 .expect("Unrecoverable error occurred while inserting removing entry from the database")
1277 .map(|value_bytes| {
1278 decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1279 })
1280 }
1281 async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1282 where
1283 KP: DatabaseLookup + MaybeSend + MaybeSync,
1284 {
1285 self.raw_remove_by_prefix(&key_prefix.to_bytes())
1286 .await
1287 .expect("Unrecoverable error when removing entries from the database");
1288 }
1289}
1290
1291pub trait WithDecoders {
1294 fn decoders(&self) -> &ModuleDecoderRegistry;
1295}
1296
1297#[apply(async_trait_maybe_send!)]
1299pub trait IRawDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
1300 async fn commit_tx(self) -> DatabaseResult<()>;
1301}
1302
1303#[apply(async_trait_maybe_send!)]
1307pub trait IDatabaseTransaction: MaybeSend + IDatabaseTransactionOps + fmt::Debug {
1308 async fn commit_tx(&mut self) -> DatabaseResult<()>;
1310
1311 fn is_global(&self) -> bool;
1313
1314 #[doc(hidden)]
1319 fn global_dbtx(&mut self, access_token: GlobalDBTxAccessToken)
1320 -> &mut dyn IDatabaseTransaction;
1321}
1322
1323#[apply(async_trait_maybe_send!)]
1324impl<T> IDatabaseTransaction for Box<T>
1325where
1326 T: IDatabaseTransaction + ?Sized,
1327{
1328 async fn commit_tx(&mut self) -> DatabaseResult<()> {
1329 (**self).commit_tx().await
1330 }
1331
1332 fn is_global(&self) -> bool {
1333 (**self).is_global()
1334 }
1335
1336 fn global_dbtx(
1337 &mut self,
1338 access_token: GlobalDBTxAccessToken,
1339 ) -> &mut dyn IDatabaseTransaction {
1340 (**self).global_dbtx(access_token)
1341 }
1342}
1343
1344#[apply(async_trait_maybe_send!)]
1345impl<'a, T> IDatabaseTransaction for &'a mut T
1346where
1347 T: IDatabaseTransaction + ?Sized,
1348{
1349 async fn commit_tx(&mut self) -> DatabaseResult<()> {
1350 (**self).commit_tx().await
1351 }
1352
1353 fn is_global(&self) -> bool {
1354 (**self).is_global()
1355 }
1356
1357 fn global_dbtx(&mut self, access_key: GlobalDBTxAccessToken) -> &mut dyn IDatabaseTransaction {
1358 (**self).global_dbtx(access_key)
1359 }
1360}
1361
1362struct BaseDatabaseTransaction<Tx> {
1365 raw: Option<Tx>,
1367 notify_queue: Option<NotifyQueue>,
1368 notifications: Arc<Notifications>,
1369}
1370
1371impl<Tx> fmt::Debug for BaseDatabaseTransaction<Tx>
1372where
1373 Tx: fmt::Debug,
1374{
1375 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1376 f.write_fmt(format_args!(
1377 "BaseDatabaseTransaction{{ raw={:?} }}",
1378 self.raw
1379 ))
1380 }
1381}
1382impl<Tx> BaseDatabaseTransaction<Tx>
1383where
1384 Tx: IRawDatabaseTransaction,
1385{
1386 fn new(dbtx: Tx, notifications: Arc<Notifications>) -> Self {
1387 Self {
1388 raw: Some(dbtx),
1389 notifications,
1390 notify_queue: Some(NotifyQueue::new()),
1391 }
1392 }
1393
1394 fn add_notification_key(&mut self, key: &[u8]) -> DatabaseResult<()> {
1395 self.notify_queue
1396 .as_mut()
1397 .ok_or(DatabaseError::TransactionConsumed)?
1398 .add(key);
1399 Ok(())
1400 }
1401}
1402
1403#[apply(async_trait_maybe_send!)]
1404impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOpsCore for BaseDatabaseTransaction<Tx> {
1405 async fn raw_insert_bytes(
1406 &mut self,
1407 key: &[u8],
1408 value: &[u8],
1409 ) -> DatabaseResult<Option<Vec<u8>>> {
1410 self.add_notification_key(key)?;
1411 self.raw
1412 .as_mut()
1413 .ok_or(DatabaseError::TransactionConsumed)?
1414 .raw_insert_bytes(key, value)
1415 .await
1416 }
1417
1418 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1419 self.raw
1420 .as_mut()
1421 .ok_or(DatabaseError::TransactionConsumed)?
1422 .raw_get_bytes(key)
1423 .await
1424 }
1425
1426 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1427 self.add_notification_key(key)?;
1428 self.raw
1429 .as_mut()
1430 .ok_or(DatabaseError::TransactionConsumed)?
1431 .raw_remove_entry(key)
1432 .await
1433 }
1434
1435 async fn raw_find_by_range(
1436 &mut self,
1437 key_range: Range<&[u8]>,
1438 ) -> DatabaseResult<PrefixStream<'_>> {
1439 self.raw
1440 .as_mut()
1441 .ok_or(DatabaseError::TransactionConsumed)?
1442 .raw_find_by_range(key_range)
1443 .await
1444 }
1445
1446 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
1447 self.raw
1448 .as_mut()
1449 .ok_or(DatabaseError::TransactionConsumed)?
1450 .raw_find_by_prefix(key_prefix)
1451 .await
1452 }
1453
1454 async fn raw_find_by_prefix_sorted_descending(
1455 &mut self,
1456 key_prefix: &[u8],
1457 ) -> DatabaseResult<PrefixStream<'_>> {
1458 self.raw
1459 .as_mut()
1460 .ok_or(DatabaseError::TransactionConsumed)?
1461 .raw_find_by_prefix_sorted_descending(key_prefix)
1462 .await
1463 }
1464
1465 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
1466 self.raw
1467 .as_mut()
1468 .ok_or(DatabaseError::TransactionConsumed)?
1469 .raw_remove_by_prefix(key_prefix)
1470 .await
1471 }
1472}
1473
1474#[apply(async_trait_maybe_send!)]
1475impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOps for BaseDatabaseTransaction<Tx> {
1476 async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()> {
1477 self.raw
1478 .as_mut()
1479 .ok_or(DatabaseError::TransactionConsumed)?
1480 .rollback_tx_to_savepoint()
1481 .await?;
1482 Ok(())
1483 }
1484
1485 async fn set_tx_savepoint(&mut self) -> DatabaseResult<()> {
1486 self.raw
1487 .as_mut()
1488 .ok_or(DatabaseError::TransactionConsumed)?
1489 .set_tx_savepoint()
1490 .await?;
1491 Ok(())
1492 }
1493}
1494
1495#[apply(async_trait_maybe_send!)]
1496impl<Tx: IRawDatabaseTransaction + fmt::Debug> IDatabaseTransaction
1497 for BaseDatabaseTransaction<Tx>
1498{
1499 async fn commit_tx(&mut self) -> DatabaseResult<()> {
1500 self.raw
1501 .take()
1502 .ok_or(DatabaseError::TransactionConsumed)?
1503 .commit_tx()
1504 .await?;
1505 self.notifications.submit_queue(
1506 &self
1507 .notify_queue
1508 .take()
1509 .expect("commit must be called only once"),
1510 );
1511 Ok(())
1512 }
1513
1514 fn is_global(&self) -> bool {
1515 true
1516 }
1517
1518 fn global_dbtx(
1519 &mut self,
1520 _access_token: GlobalDBTxAccessToken,
1521 ) -> &mut dyn IDatabaseTransaction {
1522 panic!("Illegal to call global_dbtx on BaseDatabaseTransaction");
1523 }
1524}
1525
1526#[derive(Clone)]
1529struct CommitTracker {
1530 is_committed: bool,
1532 has_writes: bool,
1534 ignore_uncommitted: bool,
1536}
1537
1538impl Drop for CommitTracker {
1539 fn drop(&mut self) {
1540 if self.has_writes && !self.is_committed {
1541 if self.ignore_uncommitted {
1542 trace!(
1543 target: LOG_DB,
1544 "DatabaseTransaction has writes and has not called commit, but that's expected."
1545 );
1546 } else {
1547 warn!(
1548 target: LOG_DB,
1549 location = ?backtrace::Backtrace::new(),
1550 "DatabaseTransaction has writes and has not called commit."
1551 );
1552 }
1553 }
1554 }
1555}
1556
1557enum MaybeRef<'a, T> {
1558 Owned(T),
1559 Borrowed(&'a mut T),
1560}
1561
1562impl<T> ops::Deref for MaybeRef<'_, T> {
1563 type Target = T;
1564
1565 fn deref(&self) -> &Self::Target {
1566 match self {
1567 MaybeRef::Owned(o) => o,
1568 MaybeRef::Borrowed(r) => r,
1569 }
1570 }
1571}
1572
1573impl<T> ops::DerefMut for MaybeRef<'_, T> {
1574 fn deref_mut(&mut self) -> &mut Self::Target {
1575 match self {
1576 MaybeRef::Owned(o) => o,
1577 MaybeRef::Borrowed(r) => r,
1578 }
1579 }
1580}
1581
1582pub struct Committable;
1586
1587pub struct NonCommittable;
1591
1592pub struct DatabaseTransaction<'tx, Cap = NonCommittable> {
1596 tx: Box<dyn IDatabaseTransaction + 'tx>,
1597 decoders: ModuleDecoderRegistry,
1598 commit_tracker: MaybeRef<'tx, CommitTracker>,
1599 on_commit_hooks: MaybeRef<'tx, Vec<Box<maybe_add_send!(dyn FnOnce())>>>,
1600 capability: marker::PhantomData<Cap>,
1601}
1602
1603impl<Cap> fmt::Debug for DatabaseTransaction<'_, Cap> {
1604 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1605 f.write_fmt(format_args!(
1606 "DatabaseTransaction {{ tx: {:?}, decoders={:?} }}",
1607 self.tx, self.decoders
1608 ))
1609 }
1610}
1611
1612impl<Cap> WithDecoders for DatabaseTransaction<'_, Cap> {
1613 fn decoders(&self) -> &ModuleDecoderRegistry {
1614 &self.decoders
1615 }
1616}
1617
1618#[instrument(target = LOG_DB, level = "trace", skip_all, fields(value_type = std::any::type_name::<V>()), err)]
1619fn decode_value<V: DatabaseValue>(
1620 value_bytes: &[u8],
1621 decoders: &ModuleDecoderRegistry,
1622) -> std::result::Result<V, DecodingError> {
1623 trace!(
1624 bytes = %AbbreviateHexBytes(value_bytes),
1625 "decoding value",
1626 );
1627 V::from_bytes(value_bytes, decoders)
1628}
1629
1630#[track_caller]
1631fn decode_value_expect<V: DatabaseValue>(
1632 value_bytes: &[u8],
1633 decoders: &ModuleDecoderRegistry,
1634 key_bytes: &[u8],
1635) -> V {
1636 decode_value(value_bytes, decoders).unwrap_or_else(|err| {
1637 panic!(
1638 "Unrecoverable decoding DatabaseValue as {}; err={}, key_bytes={}, val_bytes={}",
1639 any::type_name::<V>(),
1640 err,
1641 AbbreviateHexBytes(key_bytes),
1642 AbbreviateHexBytes(value_bytes),
1643 )
1644 })
1645}
1646
1647#[track_caller]
1648fn decode_key_expect<K: DatabaseKey>(key_bytes: &[u8], decoders: &ModuleDecoderRegistry) -> K {
1649 trace!(
1650 bytes = %AbbreviateHexBytes(key_bytes),
1651 "decoding key",
1652 );
1653 K::from_bytes(key_bytes, decoders).unwrap_or_else(|err| {
1654 panic!(
1655 "Unrecoverable decoding DatabaseKey as {}; err={}; bytes={}",
1656 any::type_name::<K>(),
1657 err,
1658 AbbreviateHexBytes(key_bytes)
1659 )
1660 })
1661}
1662
1663impl<'tx, Cap> DatabaseTransaction<'tx, Cap> {
1664 pub fn into_nc(self) -> DatabaseTransaction<'tx, NonCommittable> {
1666 DatabaseTransaction {
1667 tx: self.tx,
1668 decoders: self.decoders,
1669 commit_tracker: self.commit_tracker,
1670 on_commit_hooks: self.on_commit_hooks,
1671 capability: PhantomData::<NonCommittable>,
1672 }
1673 }
1674
1675 pub fn to_ref_nc<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, NonCommittable>
1677 where
1678 's: 'a,
1679 {
1680 self.to_ref().into_nc()
1681 }
1682
1683 pub fn with_prefix<'a: 'tx>(self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1685 where
1686 'tx: 'a,
1687 {
1688 DatabaseTransaction {
1689 tx: Box::new(PrefixDatabaseTransaction {
1690 inner: self.tx,
1691 global_dbtx_access_token: None,
1692 prefix,
1693 }),
1694 decoders: self.decoders,
1695 commit_tracker: self.commit_tracker,
1696 on_commit_hooks: self.on_commit_hooks,
1697 capability: self.capability,
1698 }
1699 }
1700
1701 pub fn with_prefix_module_id<'a: 'tx>(
1705 self,
1706 module_instance_id: ModuleInstanceId,
1707 ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1708 where
1709 'tx: 'a,
1710 {
1711 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1712 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1713 (
1714 DatabaseTransaction {
1715 tx: Box::new(PrefixDatabaseTransaction {
1716 inner: self.tx,
1717 global_dbtx_access_token: Some(global_dbtx_access_token),
1718 prefix,
1719 }),
1720 decoders: self.decoders,
1721 commit_tracker: self.commit_tracker,
1722 on_commit_hooks: self.on_commit_hooks,
1723 capability: self.capability,
1724 },
1725 global_dbtx_access_token,
1726 )
1727 }
1728
1729 pub fn to_ref<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, Cap>
1731 where
1732 's: 'a,
1733 {
1734 let decoders = self.decoders.clone();
1735
1736 DatabaseTransaction {
1737 tx: Box::new(&mut self.tx),
1738 decoders,
1739 commit_tracker: match self.commit_tracker {
1740 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1741 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1742 },
1743 on_commit_hooks: match self.on_commit_hooks {
1744 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1745 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1746 },
1747 capability: self.capability,
1748 }
1749 }
1750
1751 pub fn to_ref_with_prefix<'a>(&'a mut self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1753 where
1754 'tx: 'a,
1755 {
1756 DatabaseTransaction {
1757 tx: Box::new(PrefixDatabaseTransaction {
1758 inner: &mut self.tx,
1759 global_dbtx_access_token: None,
1760 prefix,
1761 }),
1762 decoders: self.decoders.clone(),
1763 commit_tracker: match self.commit_tracker {
1764 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1765 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1766 },
1767 on_commit_hooks: match self.on_commit_hooks {
1768 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1769 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1770 },
1771 capability: self.capability,
1772 }
1773 }
1774
1775 pub fn to_ref_with_prefix_module_id<'a>(
1776 &'a mut self,
1777 module_instance_id: ModuleInstanceId,
1778 ) -> (DatabaseTransaction<'a, Cap>, GlobalDBTxAccessToken)
1779 where
1780 'tx: 'a,
1781 {
1782 let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1783 let global_dbtx_access_token = GlobalDBTxAccessToken::from_prefix(&prefix);
1784 (
1785 DatabaseTransaction {
1786 tx: Box::new(PrefixDatabaseTransaction {
1787 inner: &mut self.tx,
1788 global_dbtx_access_token: Some(global_dbtx_access_token),
1789 prefix,
1790 }),
1791 decoders: self.decoders.clone(),
1792 commit_tracker: match self.commit_tracker {
1793 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1794 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1795 },
1796 on_commit_hooks: match self.on_commit_hooks {
1797 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1798 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1799 },
1800 capability: self.capability,
1801 },
1802 global_dbtx_access_token,
1803 )
1804 }
1805
1806 pub fn is_global(&self) -> bool {
1808 self.tx.is_global()
1809 }
1810
1811 pub fn ensure_global(&self) -> DatabaseResult<()> {
1813 if !self.is_global() {
1814 return Err(DatabaseError::Other(anyhow::anyhow!(
1815 "Database instance not global"
1816 )));
1817 }
1818
1819 Ok(())
1820 }
1821
1822 pub fn ensure_isolated(&self) -> DatabaseResult<()> {
1824 if self.is_global() {
1825 return Err(DatabaseError::Other(anyhow::anyhow!(
1826 "Database instance not isolated"
1827 )));
1828 }
1829
1830 Ok(())
1831 }
1832
1833 pub fn ignore_uncommitted(&mut self) -> &mut Self {
1835 self.commit_tracker.ignore_uncommitted = true;
1836 self
1837 }
1838
1839 pub fn warn_uncommitted(&mut self) -> &mut Self {
1841 self.commit_tracker.ignore_uncommitted = false;
1842 self
1843 }
1844
1845 #[instrument(target = LOG_DB, level = "trace", skip_all)]
1847 pub fn on_commit(&mut self, f: maybe_add_send!(impl FnOnce() + 'static)) {
1848 self.on_commit_hooks.push(Box::new(f));
1849 }
1850
1851 pub fn global_dbtx<'a>(
1852 &'a mut self,
1853 access_token: GlobalDBTxAccessToken,
1854 ) -> DatabaseTransaction<'a, Cap>
1855 where
1856 'tx: 'a,
1857 {
1858 let decoders = self.decoders.clone();
1859
1860 DatabaseTransaction {
1861 tx: Box::new(self.tx.global_dbtx(access_token)),
1862 decoders,
1863 commit_tracker: match self.commit_tracker {
1864 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1865 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1866 },
1867 on_commit_hooks: match self.on_commit_hooks {
1868 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1869 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1870 },
1871 capability: self.capability,
1872 }
1873 }
1874}
1875
1876#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1878pub struct GlobalDBTxAccessToken(u32);
1879
1880impl GlobalDBTxAccessToken {
1881 fn from_prefix(prefix: &[u8]) -> Self {
1892 Self(prefix.iter().fold(0, |acc, b| acc + u32::from(*b)) + 513)
1893 }
1894}
1895
1896impl<'tx> DatabaseTransaction<'tx, Committable> {
1897 pub fn new(dbtx: Box<dyn IDatabaseTransaction + 'tx>, decoders: ModuleDecoderRegistry) -> Self {
1898 Self {
1899 tx: dbtx,
1900 decoders,
1901 commit_tracker: MaybeRef::Owned(CommitTracker {
1902 is_committed: false,
1903 has_writes: false,
1904 ignore_uncommitted: false,
1905 }),
1906 on_commit_hooks: MaybeRef::Owned(vec![]),
1907 capability: PhantomData,
1908 }
1909 }
1910
1911 pub async fn commit_tx_result(mut self) -> DatabaseResult<()> {
1912 self.commit_tracker.is_committed = true;
1913 let commit_result = self.tx.commit_tx().await;
1914
1915 if commit_result.is_ok() {
1917 for hook in self.on_commit_hooks.deref_mut().drain(..) {
1918 hook();
1919 }
1920 }
1921
1922 commit_result
1923 }
1924
1925 pub async fn commit_tx(mut self) {
1926 self.commit_tracker.is_committed = true;
1927 self.commit_tx_result()
1928 .await
1929 .expect("Unrecoverable error occurred while committing to the database.");
1930 }
1931}
1932
1933#[apply(async_trait_maybe_send!)]
1934impl<Cap> IDatabaseTransactionOpsCore for DatabaseTransaction<'_, Cap>
1935where
1936 Cap: Send,
1937{
1938 async fn raw_insert_bytes(
1939 &mut self,
1940 key: &[u8],
1941 value: &[u8],
1942 ) -> DatabaseResult<Option<Vec<u8>>> {
1943 self.commit_tracker.has_writes = true;
1944 self.tx.raw_insert_bytes(key, value).await
1945 }
1946
1947 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1948 self.tx.raw_get_bytes(key).await
1949 }
1950
1951 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
1952 self.tx.raw_remove_entry(key).await
1953 }
1954
1955 async fn raw_find_by_range(
1956 &mut self,
1957 key_range: Range<&[u8]>,
1958 ) -> DatabaseResult<PrefixStream<'_>> {
1959 self.tx.raw_find_by_range(key_range).await
1960 }
1961
1962 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
1963 self.tx.raw_find_by_prefix(key_prefix).await
1964 }
1965
1966 async fn raw_find_by_prefix_sorted_descending(
1967 &mut self,
1968 key_prefix: &[u8],
1969 ) -> DatabaseResult<PrefixStream<'_>> {
1970 self.tx
1971 .raw_find_by_prefix_sorted_descending(key_prefix)
1972 .await
1973 }
1974
1975 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
1976 self.commit_tracker.has_writes = true;
1977 self.tx.raw_remove_by_prefix(key_prefix).await
1978 }
1979}
1980#[apply(async_trait_maybe_send!)]
1981impl IDatabaseTransactionOps for DatabaseTransaction<'_, Committable> {
1982 async fn set_tx_savepoint(&mut self) -> DatabaseResult<()> {
1983 self.tx.set_tx_savepoint().await
1984 }
1985
1986 async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()> {
1987 self.tx.rollback_tx_to_savepoint().await
1988 }
1989}
1990
1991impl<T> DatabaseKeyPrefix for T
1992where
1993 T: DatabaseLookup + crate::encoding::Encodable + Debug,
1994{
1995 fn to_bytes(&self) -> Vec<u8> {
1996 let mut data = vec![<Self as DatabaseLookup>::Record::DB_PREFIX];
1997 data.append(&mut self.consensus_encode_to_vec());
1998 data
1999 }
2000}
2001
2002impl<T> DatabaseKey for T
2003where
2004 T: DatabaseRecord + crate::encoding::Decodable + Sized,
2007{
2008 const NOTIFY_ON_MODIFY: bool = <T as DatabaseRecord>::NOTIFY_ON_MODIFY;
2009 fn from_bytes(
2010 data: &[u8],
2011 modules: &ModuleDecoderRegistry,
2012 ) -> std::result::Result<Self, DecodingError> {
2013 if data.is_empty() {
2014 return Err(DecodingError::wrong_length(1, 0));
2016 }
2017
2018 if data[0] != Self::DB_PREFIX {
2019 return Err(DecodingError::wrong_prefix(Self::DB_PREFIX, data[0]));
2020 }
2021
2022 <Self as crate::encoding::Decodable>::consensus_decode_whole(&data[1..], modules)
2023 .map_err(|decode_error| DecodingError::Other(decode_error.0))
2024 }
2025}
2026
2027impl<T> DatabaseValue for T
2028where
2029 T: Debug + Encodable + Decodable,
2030{
2031 fn from_bytes(
2032 data: &[u8],
2033 modules: &ModuleDecoderRegistry,
2034 ) -> std::result::Result<Self, DecodingError> {
2035 T::consensus_decode_whole(data, modules).map_err(|e| DecodingError::Other(e.0))
2036 }
2037
2038 fn to_bytes(&self) -> Vec<u8> {
2039 self.consensus_encode_to_vec()
2040 }
2041}
2042
2043#[macro_export]
2104macro_rules! impl_db_record {
2105 (key = $key:ty, value = $val:ty, db_prefix = $db_prefix:expr_2021 $(, notify_on_modify = $notify:tt)? $(,)?) => {
2106 impl $crate::db::DatabaseRecord for $key {
2107 const DB_PREFIX: u8 = $db_prefix as u8;
2108 $(const NOTIFY_ON_MODIFY: bool = $notify;)?
2109 type Key = Self;
2110 type Value = $val;
2111 }
2112 $(
2113 impl_db_record! {
2114 @impl_notify_marker key = $key, notify_on_modify = $notify
2115 }
2116 )?
2117 };
2118 (@impl_notify_marker key = $key:ty, notify_on_modify = true) => {
2120 impl $crate::db::DatabaseKeyWithNotify for $key {}
2121 };
2122 (@impl_notify_marker key = $key:ty, notify_on_modify = false) => {};
2124}
2125
2126#[macro_export]
2127macro_rules! impl_db_lookup{
2128 (key = $key:ty $(, query_prefix = $query_prefix:ty)* $(,)?) => {
2129 $(
2130 impl $crate::db::DatabaseLookup for $query_prefix {
2131 type Record = $key;
2132 }
2133 )*
2134 };
2135}
2136
2137#[derive(Debug, Encodable, Decodable, Serialize)]
2139pub struct DatabaseVersionKeyV0;
2140
2141#[derive(Debug, Encodable, Decodable, Serialize)]
2142pub struct DatabaseVersionKey(pub ModuleInstanceId);
2143
2144#[derive(Debug, Encodable, Decodable, Serialize, Clone, PartialOrd, Ord, PartialEq, Eq, Copy)]
2145pub struct DatabaseVersion(pub u64);
2146
2147impl_db_record!(
2148 key = DatabaseVersionKeyV0,
2149 value = DatabaseVersion,
2150 db_prefix = DbKeyPrefix::DatabaseVersion
2151);
2152
2153impl_db_record!(
2154 key = DatabaseVersionKey,
2155 value = DatabaseVersion,
2156 db_prefix = DbKeyPrefix::DatabaseVersion
2157);
2158
2159impl std::fmt::Display for DatabaseVersion {
2160 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2161 write!(f, "{}", self.0)
2162 }
2163}
2164
2165impl DatabaseVersion {
2166 pub fn increment(&self) -> Self {
2167 Self(self.0 + 1)
2168 }
2169}
2170
2171impl std::fmt::Display for DbKeyPrefix {
2172 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2173 write!(f, "{self:?}")
2174 }
2175}
2176
2177#[repr(u8)]
2178#[derive(Clone, EnumIter, Debug)]
2179pub enum DbKeyPrefix {
2180 DatabaseVersion = 0x50,
2181 ClientBackup = 0x51,
2182}
2183
2184#[derive(Debug, Error)]
2185pub enum DecodingError {
2186 #[error("Key had a wrong prefix, expected {expected} but got {found}")]
2187 WrongPrefix { expected: u8, found: u8 },
2188 #[error("Key had a wrong length, expected {expected} but got {found}")]
2189 WrongLength { expected: usize, found: usize },
2190 #[error("Other decoding error: {0:#}")]
2191 Other(anyhow::Error),
2192}
2193
2194impl DecodingError {
2195 pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2196 Self::Other(anyhow::Error::from(error))
2197 }
2198
2199 pub fn wrong_prefix(expected: u8, found: u8) -> Self {
2200 Self::WrongPrefix { expected, found }
2201 }
2202
2203 pub fn wrong_length(expected: usize, found: usize) -> Self {
2204 Self::WrongLength { expected, found }
2205 }
2206}
2207
2208#[derive(Debug, Error)]
2210pub enum DatabaseError {
2211 #[error("Write-write conflict detected")]
2214 WriteConflict,
2215
2216 #[error("Transaction already consumed")]
2219 TransactionConsumed,
2220
2221 #[error("Database backend error: {0}")]
2223 DatabaseBackend(#[from] Box<dyn Error + Send + Sync>),
2224
2225 #[error("Database error: {0:#}")]
2227 Other(anyhow::Error),
2228}
2229
2230impl DatabaseError {
2231 pub fn other<E: Error + Send + Sync + 'static>(error: E) -> Self {
2233 Self::Other(anyhow::Error::from(error))
2234 }
2235
2236 pub fn backend<E: Error + Send + Sync + 'static>(error: E) -> Self {
2238 Self::DatabaseBackend(Box::new(error))
2239 }
2240}
2241
2242impl From<anyhow::Error> for DatabaseError {
2243 fn from(error: anyhow::Error) -> Self {
2244 Self::Other(error)
2245 }
2246}
2247
2248#[macro_export]
2249macro_rules! push_db_pair_items {
2250 ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
2251 let db_items =
2252 $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
2253 .await
2254 .map(|(key, val)| {
2255 (
2256 $crate::encoding::Encodable::consensus_encode_to_hex(&key),
2257 val,
2258 )
2259 })
2260 .collect::<BTreeMap<String, $value_type>>()
2261 .await;
2262
2263 $map.insert($key_literal.to_string(), Box::new(db_items));
2264 };
2265}
2266
2267#[macro_export]
2268macro_rules! push_db_key_items {
2269 ($dbtx:ident, $prefix_type:expr_2021, $key_type:ty, $map:ident, $key_literal:literal) => {
2270 let db_items =
2271 $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
2272 .await
2273 .map(|(key, _)| key)
2274 .collect::<Vec<$key_type>>()
2275 .await;
2276
2277 $map.insert($key_literal.to_string(), Box::new(db_items));
2278 };
2279}
2280
2281pub struct DbMigrationFnContext<'tx, C> {
2295 dbtx: DatabaseTransaction<'tx>,
2296 module_instance_id: Option<ModuleInstanceId>,
2297 ctx: C,
2298 __please_use_constructor: (),
2299}
2300
2301impl<'tx, C> DbMigrationFnContext<'tx, C> {
2302 pub fn new(
2303 dbtx: DatabaseTransaction<'tx>,
2304 module_instance_id: Option<ModuleInstanceId>,
2305 ctx: C,
2306 ) -> Self {
2307 dbtx.ensure_global().expect("Must pass global dbtx");
2308 Self {
2309 dbtx,
2310 module_instance_id,
2311 ctx,
2312 __please_use_constructor: (),
2314 }
2315 }
2316
2317 pub fn map<R>(self, f: impl FnOnce(C) -> R) -> DbMigrationFnContext<'tx, R> {
2318 DbMigrationFnContext::new(self.dbtx, self.module_instance_id, f(self.ctx))
2319 }
2320
2321 #[doc(hidden)]
2323 pub fn split_dbtx_ctx<'s>(&'s mut self) -> (&'s mut DatabaseTransaction<'tx>, &'s C) {
2324 let Self { dbtx, ctx, .. } = self;
2325
2326 (dbtx, ctx)
2327 }
2328
2329 pub fn dbtx(&'_ mut self) -> DatabaseTransaction<'_> {
2330 if let Some(module_instance_id) = self.module_instance_id {
2331 self.dbtx.to_ref_with_prefix_module_id(module_instance_id).0
2332 } else {
2333 self.dbtx.to_ref_nc()
2334 }
2335 }
2336
2337 #[doc(hidden)]
2339 pub fn module_instance_id(&self) -> Option<ModuleInstanceId> {
2340 self.module_instance_id
2341 }
2342}
2343
2344pub type GeneralDbMigrationFn = DbMigrationFn<()>;
2346pub type GeneralDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, ()>;
2347
2348pub type ClientCoreDbMigrationFn = DbMigrationFn<()>;
2353pub type ClientCoreDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, ()>;
2354
2355pub type DbMigrationFn<C> = Box<
2366 maybe_add_send_sync!(
2367 dyn for<'tx> Fn(
2368 DbMigrationFnContext<'tx, C>,
2369 ) -> Pin<
2370 Box<maybe_add_send!(dyn futures::Future<Output = anyhow::Result<()>> + 'tx)>,
2371 >
2372 ),
2373>;
2374
2375pub fn get_current_database_version<F>(
2379 migrations: &BTreeMap<DatabaseVersion, F>,
2380) -> DatabaseVersion {
2381 let versions = migrations.keys().copied().collect::<Vec<_>>();
2382
2383 if !versions
2386 .windows(2)
2387 .all(|window| window[0].increment() == window[1])
2388 {
2389 panic!("Database Migrations are not defined contiguously");
2390 }
2391
2392 versions
2393 .last()
2394 .map_or(DatabaseVersion(0), DatabaseVersion::increment)
2395}
2396
2397pub async fn apply_migrations<C>(
2398 db: &Database,
2399 ctx: C,
2400 kind: String,
2401 migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
2402 module_instance_id: Option<ModuleInstanceId>,
2403 external_prefixes_above: Option<u8>,
2406) -> std::result::Result<(), anyhow::Error>
2407where
2408 C: Clone,
2409{
2410 let mut dbtx = db.begin_transaction().await;
2411 apply_migrations_dbtx(
2412 &mut dbtx.to_ref_nc(),
2413 ctx,
2414 kind,
2415 migrations,
2416 module_instance_id,
2417 external_prefixes_above,
2418 )
2419 .await?;
2420
2421 dbtx.commit_tx_result()
2422 .await
2423 .map_err(|e| anyhow::Error::msg(e.to_string()))
2424}
2425pub async fn apply_migrations_dbtx<C>(
2437 global_dbtx: &mut DatabaseTransaction<'_>,
2438 ctx: C,
2439 kind: String,
2440 migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
2441 module_instance_id: Option<ModuleInstanceId>,
2442 external_prefixes_above: Option<u8>,
2445) -> std::result::Result<(), anyhow::Error>
2446where
2447 C: Clone,
2448{
2449 let is_new_db = global_dbtx
2452 .raw_find_by_prefix(&[])
2453 .await?
2454 .filter(|(key, _v)| {
2455 std::future::ready(
2456 external_prefixes_above.is_none_or(|external_prefixes_above| {
2457 !key.is_empty() && key[0] < external_prefixes_above
2458 }),
2459 )
2460 })
2461 .next()
2462 .await
2463 .is_none();
2464
2465 let target_db_version = get_current_database_version(&migrations);
2466
2467 create_database_version_dbtx(
2469 global_dbtx,
2470 target_db_version,
2471 module_instance_id,
2472 kind.clone(),
2473 is_new_db,
2474 )
2475 .await?;
2476
2477 let module_instance_id_key = module_instance_id_or_global(module_instance_id);
2478
2479 let disk_version = global_dbtx
2480 .get_value(&DatabaseVersionKey(module_instance_id_key))
2481 .await;
2482
2483 let db_version = if let Some(disk_version) = disk_version {
2484 let mut current_db_version = disk_version;
2485
2486 if current_db_version > target_db_version {
2487 return Err(anyhow::anyhow!(format!(
2488 "On disk database version {current_db_version} for module {kind} was higher than the code database version {target_db_version}."
2489 )));
2490 }
2491
2492 while current_db_version < target_db_version {
2493 if let Some(migration) = migrations.get(¤t_db_version) {
2494 info!(target: LOG_DB, ?kind, ?current_db_version, ?target_db_version, "Migrating module...");
2495 migration(DbMigrationFnContext::new(
2496 global_dbtx.to_ref_nc(),
2497 module_instance_id,
2498 ctx.clone(),
2499 ))
2500 .await?;
2501 } else {
2502 warn!(target: LOG_DB, ?current_db_version, "Missing server db migration");
2503 }
2504
2505 current_db_version = current_db_version.increment();
2506
2507 global_dbtx
2508 .insert_entry(
2509 &DatabaseVersionKey(module_instance_id_key),
2510 ¤t_db_version,
2511 )
2512 .await;
2513 }
2514
2515 current_db_version
2516 } else {
2517 target_db_version
2518 };
2519
2520 debug!(target: LOG_DB, ?kind, ?db_version, "DB Version");
2521 Ok(())
2522}
2523
2524pub async fn create_database_version(
2525 db: &Database,
2526 target_db_version: DatabaseVersion,
2527 module_instance_id: Option<ModuleInstanceId>,
2528 kind: String,
2529 is_new_db: bool,
2530) -> std::result::Result<(), anyhow::Error> {
2531 let mut dbtx = db.begin_transaction().await;
2532
2533 create_database_version_dbtx(
2534 &mut dbtx.to_ref_nc(),
2535 target_db_version,
2536 module_instance_id,
2537 kind,
2538 is_new_db,
2539 )
2540 .await?;
2541
2542 dbtx.commit_tx_result().await?;
2543 Ok(())
2544}
2545
2546pub async fn create_database_version_dbtx(
2550 global_dbtx: &mut DatabaseTransaction<'_>,
2551 target_db_version: DatabaseVersion,
2552 module_instance_id: Option<ModuleInstanceId>,
2553 kind: String,
2554 is_new_db: bool,
2555) -> std::result::Result<(), anyhow::Error> {
2556 let key_module_instance_id = module_instance_id_or_global(module_instance_id);
2557
2558 if global_dbtx
2562 .get_value(&DatabaseVersionKey(key_module_instance_id))
2563 .await
2564 .is_none()
2565 {
2566 let current_version_in_module = if let Some(module_instance_id) = module_instance_id {
2575 remove_current_db_version_if_exists(
2576 &mut global_dbtx
2577 .to_ref_with_prefix_module_id(module_instance_id)
2578 .0
2579 .into_nc(),
2580 is_new_db,
2581 target_db_version,
2582 )
2583 .await
2584 } else {
2585 remove_current_db_version_if_exists(
2586 &mut global_dbtx.to_ref().into_nc(),
2587 is_new_db,
2588 target_db_version,
2589 )
2590 .await
2591 };
2592
2593 debug!(target: LOG_DB, ?kind, ?current_version_in_module, ?target_db_version, ?is_new_db, "Creating DatabaseVersionKey...");
2595 global_dbtx
2596 .insert_new_entry(
2597 &DatabaseVersionKey(key_module_instance_id),
2598 ¤t_version_in_module,
2599 )
2600 .await;
2601 }
2602
2603 Ok(())
2604}
2605
2606async fn remove_current_db_version_if_exists(
2611 version_dbtx: &mut DatabaseTransaction<'_>,
2612 is_new_db: bool,
2613 target_db_version: DatabaseVersion,
2614) -> DatabaseVersion {
2615 let current_version_in_module = version_dbtx.remove_entry(&DatabaseVersionKeyV0).await;
2619 match current_version_in_module {
2620 Some(database_version) => database_version,
2621 None if is_new_db => target_db_version,
2622 None => DatabaseVersion(0),
2623 }
2624}
2625
2626fn module_instance_id_or_global(module_instance_id: Option<ModuleInstanceId>) -> ModuleInstanceId {
2629 module_instance_id.map_or_else(
2631 || MODULE_GLOBAL_PREFIX.into(),
2632 |module_instance_id| module_instance_id,
2633 )
2634}
2635#[allow(unused_imports)]
2636mod test_utils {
2637 use std::collections::BTreeMap;
2638 use std::time::Duration;
2639
2640 use fedimint_core::db::DbMigrationFnContext;
2641 use futures::future::ready;
2642 use futures::{Future, FutureExt, StreamExt};
2643 use rand::Rng;
2644 use tokio::join;
2645
2646 use super::{
2647 Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey, DatabaseVersionKeyV0,
2648 DbMigrationFn, apply_migrations,
2649 };
2650 use crate::core::ModuleKind;
2651 use crate::db::mem_impl::MemDatabase;
2652 use crate::db::{
2653 IDatabaseTransactionOps, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
2654 };
2655 use crate::encoding::{Decodable, Encodable};
2656 use crate::module::registry::ModuleDecoderRegistry;
2657
2658 pub async fn future_returns_shortly<F: Future>(fut: F) -> Option<F::Output> {
2659 crate::runtime::timeout(Duration::from_millis(10), fut)
2660 .await
2661 .ok()
2662 }
2663
2664 #[repr(u8)]
2665 #[derive(Clone)]
2666 pub enum TestDbKeyPrefix {
2667 Test = 0x42,
2668 AltTest = 0x43,
2669 PercentTestKey = 0x25,
2670 }
2671
2672 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
2673 pub(super) struct TestKey(pub u64);
2674
2675 #[derive(Debug, Encodable, Decodable)]
2676 struct DbPrefixTestPrefix;
2677
2678 impl_db_record!(
2679 key = TestKey,
2680 value = TestVal,
2681 db_prefix = TestDbKeyPrefix::Test,
2682 notify_on_modify = true,
2683 );
2684 impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);
2685
2686 #[derive(Debug, Encodable, Decodable)]
2687 struct TestKeyV0(u64, u64);
2688
2689 #[derive(Debug, Encodable, Decodable)]
2690 struct DbPrefixTestPrefixV0;
2691
2692 impl_db_record!(
2693 key = TestKeyV0,
2694 value = TestVal,
2695 db_prefix = TestDbKeyPrefix::Test,
2696 );
2697 impl_db_lookup!(key = TestKeyV0, query_prefix = DbPrefixTestPrefixV0);
2698
2699 #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Encodable, Decodable)]
2700 struct AltTestKey(u64);
2701
2702 #[derive(Debug, Encodable, Decodable)]
2703 struct AltDbPrefixTestPrefix;
2704
2705 impl_db_record!(
2706 key = AltTestKey,
2707 value = TestVal,
2708 db_prefix = TestDbKeyPrefix::AltTest,
2709 );
2710 impl_db_lookup!(key = AltTestKey, query_prefix = AltDbPrefixTestPrefix);
2711
2712 #[derive(Debug, Encodable, Decodable)]
2713 struct PercentTestKey(u64);
2714
2715 #[derive(Debug, Encodable, Decodable)]
2716 struct PercentPrefixTestPrefix;
2717
2718 impl_db_record!(
2719 key = PercentTestKey,
2720 value = TestVal,
2721 db_prefix = TestDbKeyPrefix::PercentTestKey,
2722 );
2723
2724 impl_db_lookup!(key = PercentTestKey, query_prefix = PercentPrefixTestPrefix);
2725 #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
2726 pub(super) struct TestVal(pub u64);
2727
2728 const TEST_MODULE_PREFIX: u16 = 1;
2729 const ALT_MODULE_PREFIX: u16 = 2;
2730
2731 pub async fn verify_insert_elements(db: Database) {
2732 let mut dbtx = db.begin_transaction().await;
2733 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2734 assert!(dbtx.insert_entry(&TestKey(2), &TestVal(3)).await.is_none());
2735 dbtx.commit_tx().await;
2736
2737 let mut dbtx = db.begin_transaction().await;
2739 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2740 assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(3)));
2741 dbtx.commit_tx().await;
2742
2743 let mut dbtx = db.begin_transaction().await;
2745 assert_eq!(
2746 dbtx.insert_entry(&TestKey(1), &TestVal(4)).await,
2747 Some(TestVal(2))
2748 );
2749 assert_eq!(
2750 dbtx.insert_entry(&TestKey(2), &TestVal(5)).await,
2751 Some(TestVal(3))
2752 );
2753 dbtx.commit_tx().await;
2754
2755 let mut dbtx = db.begin_transaction().await;
2756 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(4)));
2757 assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(5)));
2758 dbtx.commit_tx().await;
2759 }
2760
2761 pub async fn verify_remove_nonexisting(db: Database) {
2762 let mut dbtx = db.begin_transaction().await;
2763 assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2764 let removed = dbtx.remove_entry(&TestKey(1)).await;
2765 assert!(removed.is_none());
2766
2767 dbtx.commit_tx().await;
2769 }
2770
2771 pub async fn verify_remove_existing(db: Database) {
2772 let mut dbtx = db.begin_transaction().await;
2773
2774 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2775
2776 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2777
2778 let removed = dbtx.remove_entry(&TestKey(1)).await;
2779 assert_eq!(removed, Some(TestVal(2)));
2780 assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2781
2782 dbtx.commit_tx().await;
2784 }
2785
2786 pub async fn verify_read_own_writes(db: Database) {
2787 let mut dbtx = db.begin_transaction().await;
2788
2789 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2790
2791 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2792
2793 dbtx.commit_tx().await;
2795 }
2796
2797 pub async fn verify_prevent_dirty_reads(db: Database) {
2798 let mut dbtx = db.begin_transaction().await;
2799
2800 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2801
2802 let mut dbtx2 = db.begin_transaction().await;
2804 assert_eq!(dbtx2.get_value(&TestKey(1)).await, None);
2805
2806 dbtx.commit_tx().await;
2808 }
2809
2810 pub async fn verify_find_by_range(db: Database) {
2811 let mut dbtx = db.begin_transaction().await;
2812 dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2813 dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2814 dbtx.insert_entry(&TestKey(56), &TestVal(7777)).await;
2815
2816 dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2817 dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2818
2819 {
2820 let mut module_dbtx = dbtx.to_ref_with_prefix_module_id(2).0;
2821 module_dbtx
2822 .insert_entry(&TestKey(300), &TestVal(3000))
2823 .await;
2824 }
2825
2826 dbtx.commit_tx().await;
2827
2828 let mut dbtx = db.begin_transaction_nc().await;
2830
2831 let returned_keys = dbtx
2832 .find_by_range(TestKey(55)..TestKey(56))
2833 .await
2834 .collect::<Vec<_>>()
2835 .await;
2836
2837 let expected = vec![(TestKey(55), TestVal(9999))];
2838
2839 assert_eq!(returned_keys, expected);
2840
2841 let returned_keys = dbtx
2842 .find_by_range(TestKey(54)..TestKey(56))
2843 .await
2844 .collect::<Vec<_>>()
2845 .await;
2846
2847 let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2848 assert_eq!(returned_keys, expected);
2849
2850 let returned_keys = dbtx
2851 .find_by_range(TestKey(54)..TestKey(57))
2852 .await
2853 .collect::<Vec<_>>()
2854 .await;
2855
2856 let expected = vec![
2857 (TestKey(54), TestVal(8888)),
2858 (TestKey(55), TestVal(9999)),
2859 (TestKey(56), TestVal(7777)),
2860 ];
2861 assert_eq!(returned_keys, expected);
2862
2863 let mut module_dbtx = dbtx.with_prefix_module_id(2).0;
2864 let test_range = module_dbtx
2865 .find_by_range(TestKey(300)..TestKey(301))
2866 .await
2867 .collect::<Vec<_>>()
2868 .await;
2869 assert!(test_range.len() == 1);
2870 }
2871
2872 pub async fn verify_find_by_prefix(db: Database) {
2873 let mut dbtx = db.begin_transaction().await;
2874 dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2875 dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2876
2877 dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2878 dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2879 dbtx.commit_tx().await;
2880
2881 let mut dbtx = db.begin_transaction().await;
2883
2884 let returned_keys = dbtx
2885 .find_by_prefix(&DbPrefixTestPrefix)
2886 .await
2887 .collect::<Vec<_>>()
2888 .await;
2889
2890 let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2891 assert_eq!(returned_keys, expected);
2892
2893 let reversed = dbtx
2894 .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2895 .await
2896 .collect::<Vec<_>>()
2897 .await;
2898 let mut reversed_expected = expected;
2899 reversed_expected.reverse();
2900 assert_eq!(reversed, reversed_expected);
2901
2902 let returned_keys = dbtx
2903 .find_by_prefix(&AltDbPrefixTestPrefix)
2904 .await
2905 .collect::<Vec<_>>()
2906 .await;
2907
2908 let expected = vec![
2909 (AltTestKey(54), TestVal(6666)),
2910 (AltTestKey(55), TestVal(7777)),
2911 ];
2912 assert_eq!(returned_keys, expected);
2913
2914 let reversed = dbtx
2915 .find_by_prefix_sorted_descending(&AltDbPrefixTestPrefix)
2916 .await
2917 .collect::<Vec<_>>()
2918 .await;
2919 let mut reversed_expected = expected;
2920 reversed_expected.reverse();
2921 assert_eq!(reversed, reversed_expected);
2922 }
2923
2924 pub async fn verify_commit(db: Database) {
2925 let mut dbtx = db.begin_transaction().await;
2926
2927 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2928 dbtx.commit_tx().await;
2929
2930 let mut dbtx2 = db.begin_transaction().await;
2932 assert_eq!(dbtx2.get_value(&TestKey(1)).await, Some(TestVal(2)));
2933 }
2934
2935 pub async fn verify_rollback_to_savepoint(db: Database) {
2936 let mut dbtx_rollback = db.begin_transaction().await;
2937
2938 dbtx_rollback
2939 .insert_entry(&TestKey(20), &TestVal(2000))
2940 .await;
2941
2942 dbtx_rollback
2943 .set_tx_savepoint()
2944 .await
2945 .expect("Error setting transaction savepoint");
2946
2947 dbtx_rollback
2948 .insert_entry(&TestKey(21), &TestVal(2001))
2949 .await;
2950
2951 assert_eq!(
2952 dbtx_rollback.get_value(&TestKey(20)).await,
2953 Some(TestVal(2000))
2954 );
2955 assert_eq!(
2956 dbtx_rollback.get_value(&TestKey(21)).await,
2957 Some(TestVal(2001))
2958 );
2959
2960 dbtx_rollback
2961 .rollback_tx_to_savepoint()
2962 .await
2963 .expect("Error setting transaction savepoint");
2964
2965 assert_eq!(
2966 dbtx_rollback.get_value(&TestKey(20)).await,
2967 Some(TestVal(2000))
2968 );
2969
2970 assert_eq!(dbtx_rollback.get_value(&TestKey(21)).await, None);
2971
2972 dbtx_rollback.commit_tx().await;
2974 }
2975
2976 pub async fn verify_prevent_nonrepeatable_reads(db: Database) {
2977 let mut dbtx = db.begin_transaction().await;
2978 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2979
2980 let mut dbtx2 = db.begin_transaction().await;
2981
2982 dbtx2.insert_entry(&TestKey(100), &TestVal(101)).await;
2983
2984 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2985
2986 dbtx2.commit_tx().await;
2987
2988 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2991
2992 let expected_keys = 0;
2993 let returned_keys = dbtx
2994 .find_by_prefix(&DbPrefixTestPrefix)
2995 .await
2996 .fold(0, |returned_keys, (key, value)| async move {
2997 if key == TestKey(100) {
2998 assert!(value.eq(&TestVal(101)));
2999 }
3000 returned_keys + 1
3001 })
3002 .await;
3003
3004 assert_eq!(returned_keys, expected_keys);
3005 }
3006
3007 pub async fn verify_snapshot_isolation(db: Database) {
3008 async fn random_yield() {
3009 let times = if rand::thread_rng().gen_bool(0.5) {
3010 0
3011 } else {
3012 10
3013 };
3014 for _ in 0..times {
3015 tokio::task::yield_now().await;
3016 }
3017 }
3018
3019 for i in 0..1000 {
3021 let base_key = i * 2;
3022 let tx_accepted_key = base_key;
3023 let spent_input_key = base_key + 1;
3024
3025 join!(
3026 async {
3027 random_yield().await;
3028 let mut dbtx = db.begin_transaction().await;
3029
3030 random_yield().await;
3031 let a = dbtx.get_value(&TestKey(tx_accepted_key)).await;
3032 random_yield().await;
3033 let s = match i % 5 {
3036 0 => dbtx.get_value(&TestKey(spent_input_key)).await,
3037 1 => dbtx.remove_entry(&TestKey(spent_input_key)).await,
3038 2 => {
3039 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(200))
3040 .await
3041 }
3042 3 => {
3043 dbtx.find_by_prefix(&DbPrefixTestPrefix)
3044 .await
3045 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
3046 .map(|(_k, v)| v)
3047 .next()
3048 .await
3049 }
3050 4 => {
3051 dbtx.find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
3052 .await
3053 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
3054 .map(|(_k, v)| v)
3055 .next()
3056 .await
3057 }
3058 _ => {
3059 panic!("woot?");
3060 }
3061 };
3062
3063 match (a, s) {
3064 (None, None) | (Some(_), Some(_)) => {}
3065 (None, Some(_)) => panic!("none some?! {i}"),
3066 (Some(_), None) => panic!("some none?! {i}"),
3067 }
3068 },
3069 async {
3070 random_yield().await;
3071
3072 let mut dbtx = db.begin_transaction().await;
3073 random_yield().await;
3074 assert_eq!(dbtx.get_value(&TestKey(tx_accepted_key)).await, None);
3075
3076 random_yield().await;
3077 assert_eq!(
3078 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(100))
3079 .await,
3080 None
3081 );
3082
3083 random_yield().await;
3084 assert_eq!(
3085 dbtx.insert_entry(&TestKey(tx_accepted_key), &TestVal(100))
3086 .await,
3087 None
3088 );
3089 random_yield().await;
3090 dbtx.commit_tx().await;
3091 }
3092 );
3093 }
3094 }
3095
3096 pub async fn verify_phantom_entry(db: Database) {
3097 let mut dbtx = db.begin_transaction().await;
3098
3099 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3100
3101 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3102
3103 dbtx.commit_tx().await;
3104
3105 let mut dbtx = db.begin_transaction().await;
3106 let expected_keys = 2;
3107 let returned_keys = dbtx
3108 .find_by_prefix(&DbPrefixTestPrefix)
3109 .await
3110 .fold(0, |returned_keys, (key, value)| async move {
3111 match key {
3112 TestKey(100) => {
3113 assert!(value.eq(&TestVal(101)));
3114 }
3115 TestKey(101) => {
3116 assert!(value.eq(&TestVal(102)));
3117 }
3118 _ => {}
3119 }
3120 returned_keys + 1
3121 })
3122 .await;
3123
3124 assert_eq!(returned_keys, expected_keys);
3125
3126 let mut dbtx2 = db.begin_transaction().await;
3127
3128 dbtx2.insert_entry(&TestKey(102), &TestVal(103)).await;
3129
3130 dbtx2.commit_tx().await;
3131
3132 let returned_keys = dbtx
3133 .find_by_prefix(&DbPrefixTestPrefix)
3134 .await
3135 .fold(0, |returned_keys, (key, value)| async move {
3136 match key {
3137 TestKey(100) => {
3138 assert!(value.eq(&TestVal(101)));
3139 }
3140 TestKey(101) => {
3141 assert!(value.eq(&TestVal(102)));
3142 }
3143 _ => {}
3144 }
3145 returned_keys + 1
3146 })
3147 .await;
3148
3149 assert_eq!(returned_keys, expected_keys);
3150 }
3151
3152 pub async fn expect_write_conflict(db: Database) {
3153 let mut dbtx = db.begin_transaction().await;
3154 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3155 dbtx.commit_tx().await;
3156
3157 let mut dbtx2 = db.begin_transaction().await;
3158 let mut dbtx3 = db.begin_transaction().await;
3159
3160 dbtx2.insert_entry(&TestKey(100), &TestVal(102)).await;
3161
3162 dbtx3.insert_entry(&TestKey(100), &TestVal(103)).await;
3166
3167 dbtx2.commit_tx().await;
3168 dbtx3.commit_tx_result().await.expect_err("Expecting an error to be returned because this transaction is in a write-write conflict with dbtx");
3169 }
3170
3171 pub async fn verify_string_prefix(db: Database) {
3172 let mut dbtx = db.begin_transaction().await;
3173 dbtx.insert_entry(&PercentTestKey(100), &TestVal(101)).await;
3174
3175 assert_eq!(
3176 dbtx.get_value(&PercentTestKey(100)).await,
3177 Some(TestVal(101))
3178 );
3179
3180 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3181
3182 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3183
3184 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
3185
3186 dbtx.insert_entry(&TestKey(101), &TestVal(100)).await;
3189
3190 let expected_keys = 4;
3191 let returned_keys = dbtx
3192 .find_by_prefix(&PercentPrefixTestPrefix)
3193 .await
3194 .fold(0, |returned_keys, (key, value)| async move {
3195 if matches!(key, PercentTestKey(101)) {
3196 assert!(value.eq(&TestVal(100)));
3197 }
3198 returned_keys + 1
3199 })
3200 .await;
3201
3202 assert_eq!(returned_keys, expected_keys);
3203 }
3204
3205 pub async fn verify_remove_by_prefix(db: Database) {
3206 let mut dbtx = db.begin_transaction().await;
3207
3208 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3209
3210 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3211
3212 dbtx.commit_tx().await;
3213
3214 let mut remove_dbtx = db.begin_transaction().await;
3215 remove_dbtx.remove_by_prefix(&DbPrefixTestPrefix).await;
3216 remove_dbtx.commit_tx().await;
3217
3218 let mut dbtx = db.begin_transaction().await;
3219 let expected_keys = 0;
3220 let returned_keys = dbtx
3221 .find_by_prefix(&DbPrefixTestPrefix)
3222 .await
3223 .fold(0, |returned_keys, (key, value)| async move {
3224 match key {
3225 TestKey(100) => {
3226 assert!(value.eq(&TestVal(101)));
3227 }
3228 TestKey(101) => {
3229 assert!(value.eq(&TestVal(102)));
3230 }
3231 _ => {}
3232 }
3233 returned_keys + 1
3234 })
3235 .await;
3236
3237 assert_eq!(returned_keys, expected_keys);
3238 }
3239
3240 pub async fn verify_module_db(db: Database, module_db: Database) {
3241 let mut dbtx = db.begin_transaction().await;
3242
3243 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
3244
3245 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
3246
3247 dbtx.commit_tx().await;
3248
3249 let mut module_dbtx = module_db.begin_transaction().await;
3251 assert_eq!(module_dbtx.get_value(&TestKey(100)).await, None);
3252
3253 assert_eq!(module_dbtx.get_value(&TestKey(101)).await, None);
3254
3255 let mut dbtx = db.begin_transaction().await;
3257 assert_eq!(dbtx.get_value(&TestKey(100)).await, Some(TestVal(101)));
3258
3259 assert_eq!(dbtx.get_value(&TestKey(101)).await, Some(TestVal(102)));
3260
3261 let mut module_dbtx = module_db.begin_transaction().await;
3262
3263 module_dbtx.insert_entry(&TestKey(100), &TestVal(103)).await;
3264
3265 module_dbtx.insert_entry(&TestKey(101), &TestVal(104)).await;
3266
3267 module_dbtx.commit_tx().await;
3268
3269 let expected_keys = 2;
3270 let mut dbtx = db.begin_transaction().await;
3271 let returned_keys = dbtx
3272 .find_by_prefix(&DbPrefixTestPrefix)
3273 .await
3274 .fold(0, |returned_keys, (key, value)| async move {
3275 match key {
3276 TestKey(100) => {
3277 assert!(value.eq(&TestVal(101)));
3278 }
3279 TestKey(101) => {
3280 assert!(value.eq(&TestVal(102)));
3281 }
3282 _ => {}
3283 }
3284 returned_keys + 1
3285 })
3286 .await;
3287
3288 assert_eq!(returned_keys, expected_keys);
3289
3290 let removed = dbtx.remove_entry(&TestKey(100)).await;
3291 assert_eq!(removed, Some(TestVal(101)));
3292 assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
3293
3294 let mut module_dbtx = module_db.begin_transaction().await;
3295 assert_eq!(
3296 module_dbtx.get_value(&TestKey(100)).await,
3297 Some(TestVal(103))
3298 );
3299 }
3300
3301 pub async fn verify_module_prefix(db: Database) {
3302 let mut test_dbtx = db.begin_transaction().await;
3303 {
3304 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3305
3306 test_module_dbtx
3307 .insert_entry(&TestKey(100), &TestVal(101))
3308 .await;
3309
3310 test_module_dbtx
3311 .insert_entry(&TestKey(101), &TestVal(102))
3312 .await;
3313 }
3314
3315 test_dbtx.commit_tx().await;
3316
3317 let mut alt_dbtx = db.begin_transaction().await;
3318 {
3319 let mut alt_module_dbtx = alt_dbtx.to_ref_with_prefix_module_id(ALT_MODULE_PREFIX).0;
3320
3321 alt_module_dbtx
3322 .insert_entry(&TestKey(100), &TestVal(103))
3323 .await;
3324
3325 alt_module_dbtx
3326 .insert_entry(&TestKey(101), &TestVal(104))
3327 .await;
3328 }
3329
3330 alt_dbtx.commit_tx().await;
3331
3332 let mut test_dbtx = db.begin_transaction().await;
3334 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX).0;
3335 assert_eq!(
3336 test_module_dbtx.get_value(&TestKey(100)).await,
3337 Some(TestVal(101))
3338 );
3339
3340 assert_eq!(
3341 test_module_dbtx.get_value(&TestKey(101)).await,
3342 Some(TestVal(102))
3343 );
3344
3345 let expected_keys = 2;
3346 let returned_keys = test_module_dbtx
3347 .find_by_prefix(&DbPrefixTestPrefix)
3348 .await
3349 .fold(0, |returned_keys, (key, value)| async move {
3350 match key {
3351 TestKey(100) => {
3352 assert!(value.eq(&TestVal(101)));
3353 }
3354 TestKey(101) => {
3355 assert!(value.eq(&TestVal(102)));
3356 }
3357 _ => {}
3358 }
3359 returned_keys + 1
3360 })
3361 .await;
3362
3363 assert_eq!(returned_keys, expected_keys);
3364
3365 let removed = test_module_dbtx.remove_entry(&TestKey(100)).await;
3366 assert_eq!(removed, Some(TestVal(101)));
3367 assert_eq!(test_module_dbtx.get_value(&TestKey(100)).await, None);
3368
3369 let mut test_dbtx = db.begin_transaction().await;
3372 assert_eq!(test_dbtx.get_value(&TestKey(101)).await, None);
3373
3374 test_dbtx.commit_tx().await;
3375 }
3376
3377 #[cfg(test)]
3378 #[tokio::test]
3379 pub async fn verify_test_migration() {
3380 let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
3382 let expected_test_keys_size: usize = 100;
3383 let mut dbtx = db.begin_transaction().await;
3384 for i in 0..expected_test_keys_size {
3385 dbtx.insert_new_entry(&TestKeyV0(i as u64, (i + 1) as u64), &TestVal(i as u64))
3386 .await;
3387 }
3388
3389 dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
3391 .await;
3392 dbtx.commit_tx().await;
3393
3394 let mut migrations: BTreeMap<DatabaseVersion, DbMigrationFn<()>> = BTreeMap::new();
3395
3396 migrations.insert(
3397 DatabaseVersion(0),
3398 Box::new(|ctx| migrate_test_db_version_0(ctx).boxed()),
3399 );
3400
3401 apply_migrations(&db, (), "TestModule".to_string(), migrations, None, None)
3402 .await
3403 .expect("Error applying migrations for TestModule");
3404
3405 let mut dbtx = db.begin_transaction().await;
3407
3408 assert!(
3411 dbtx.get_value(&DatabaseVersionKey(MODULE_GLOBAL_PREFIX.into()))
3412 .await
3413 .is_some()
3414 );
3415
3416 let test_keys = dbtx
3418 .find_by_prefix(&DbPrefixTestPrefix)
3419 .await
3420 .collect::<Vec<_>>()
3421 .await;
3422 let test_keys_size = test_keys.len();
3423 assert_eq!(test_keys_size, expected_test_keys_size);
3424 for (key, val) in test_keys {
3425 assert_eq!(key.0, val.0 + 1);
3426 }
3427 }
3428
3429 #[allow(dead_code)]
3430 async fn migrate_test_db_version_0(
3431 mut ctx: DbMigrationFnContext<'_, ()>,
3432 ) -> std::result::Result<(), anyhow::Error> {
3433 let mut dbtx = ctx.dbtx();
3434 let example_keys_v0 = dbtx
3435 .find_by_prefix(&DbPrefixTestPrefixV0)
3436 .await
3437 .collect::<Vec<_>>()
3438 .await;
3439 dbtx.remove_by_prefix(&DbPrefixTestPrefixV0).await;
3440 for (key, val) in example_keys_v0 {
3441 let key_v2 = TestKey(key.1);
3442 dbtx.insert_new_entry(&key_v2, &val).await;
3443 }
3444 Ok(())
3445 }
3446
3447 #[cfg(test)]
3448 #[tokio::test]
3449 async fn test_autocommit() {
3450 use std::marker::PhantomData;
3451 use std::ops::Range;
3452 use std::path::Path;
3453
3454 use anyhow::anyhow;
3455 use async_trait::async_trait;
3456
3457 use crate::ModuleDecoderRegistry;
3458 use crate::db::{
3459 AutocommitError, BaseDatabaseTransaction, DatabaseError, DatabaseResult,
3460 IDatabaseTransaction, IDatabaseTransactionOps, IDatabaseTransactionOpsCore,
3461 IRawDatabase, IRawDatabaseTransaction,
3462 };
3463
3464 #[derive(Debug)]
3465 struct FakeDatabase;
3466
3467 #[async_trait]
3468 impl IRawDatabase for FakeDatabase {
3469 type Transaction<'a> = FakeTransaction<'a>;
3470 async fn begin_transaction(&self) -> FakeTransaction {
3471 FakeTransaction(PhantomData)
3472 }
3473
3474 fn checkpoint(&self, _backup_path: &Path) -> DatabaseResult<()> {
3475 Ok(())
3476 }
3477 }
3478
3479 #[derive(Debug)]
3480 struct FakeTransaction<'a>(PhantomData<&'a ()>);
3481
3482 #[async_trait]
3483 impl IDatabaseTransactionOpsCore for FakeTransaction<'_> {
3484 async fn raw_insert_bytes(
3485 &mut self,
3486 _key: &[u8],
3487 _value: &[u8],
3488 ) -> DatabaseResult<Option<Vec<u8>>> {
3489 unimplemented!()
3490 }
3491
3492 async fn raw_get_bytes(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
3493 unimplemented!()
3494 }
3495
3496 async fn raw_remove_entry(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
3497 unimplemented!()
3498 }
3499
3500 async fn raw_find_by_range(
3501 &mut self,
3502 _key_range: Range<&[u8]>,
3503 ) -> DatabaseResult<crate::db::PrefixStream<'_>> {
3504 unimplemented!()
3505 }
3506
3507 async fn raw_find_by_prefix(
3508 &mut self,
3509 _key_prefix: &[u8],
3510 ) -> DatabaseResult<crate::db::PrefixStream<'_>> {
3511 unimplemented!()
3512 }
3513
3514 async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> DatabaseResult<()> {
3515 unimplemented!()
3516 }
3517
3518 async fn raw_find_by_prefix_sorted_descending(
3519 &mut self,
3520 _key_prefix: &[u8],
3521 ) -> DatabaseResult<crate::db::PrefixStream<'_>> {
3522 unimplemented!()
3523 }
3524 }
3525
3526 #[async_trait]
3527 impl IDatabaseTransactionOps for FakeTransaction<'_> {
3528 async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()> {
3529 unimplemented!()
3530 }
3531
3532 async fn set_tx_savepoint(&mut self) -> DatabaseResult<()> {
3533 unimplemented!()
3534 }
3535 }
3536
3537 #[async_trait]
3538 impl IRawDatabaseTransaction for FakeTransaction<'_> {
3539 async fn commit_tx(self) -> DatabaseResult<()> {
3540 use crate::db::DatabaseError;
3541
3542 Err(DatabaseError::Other(anyhow::anyhow!("Can't commit!")))
3543 }
3544 }
3545
3546 let db = Database::new(FakeDatabase, ModuleDecoderRegistry::default());
3547 let err = db
3548 .autocommit::<_, _, ()>(|_dbtx, _| Box::pin(async { Ok(()) }), Some(5))
3549 .await
3550 .unwrap_err();
3551
3552 match err {
3553 AutocommitError::CommitFailed {
3554 attempts: failed_attempts,
3555 ..
3556 } => {
3557 assert_eq!(failed_attempts, 5);
3558 }
3559 AutocommitError::ClosureError { .. } => panic!("Closure did not return error"),
3560 }
3561 }
3562}
3563
3564pub async fn find_by_prefix_sorted_descending<'r, 'inner, KP>(
3565 tx: &'r mut (dyn IDatabaseTransaction + 'inner),
3566 decoders: ModuleDecoderRegistry,
3567 key_prefix: &KP,
3568) -> impl Stream<
3569 Item = (
3570 KP::Record,
3571 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
3572 ),
3573>
3574+ 'r
3575+ use<'r, KP>
3576where
3577 'inner: 'r,
3578 KP: DatabaseLookup,
3579 KP::Record: DatabaseKey,
3580{
3581 debug!(target: LOG_DB, "find by prefix sorted descending");
3582 let prefix_bytes = key_prefix.to_bytes();
3583 tx.raw_find_by_prefix_sorted_descending(&prefix_bytes)
3584 .await
3585 .expect("Error doing prefix search in database")
3586 .map(move |(key_bytes, value_bytes)| {
3587 let key = decode_key_expect(&key_bytes, &decoders);
3588 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
3589 (key, value)
3590 })
3591}
3592
3593pub async fn verify_module_db_integrity_dbtx(
3594 dbtx: &mut DatabaseTransaction<'_>,
3595 module_id: ModuleInstanceId,
3596 module_kind: ModuleKind,
3597 prefixes: &BTreeSet<u8>,
3598) {
3599 let module_db_prefix = module_instance_id_to_byte_prefix(module_id);
3600 if module_id < 250 {
3601 assert_eq!(module_db_prefix.len(), 2);
3602 }
3603 let mut records = dbtx
3604 .raw_find_by_prefix(&module_db_prefix)
3605 .await
3606 .expect("DB fail");
3607 while let Some((k, v)) = records.next().await {
3608 assert!(
3609 prefixes.contains(&k[module_db_prefix.len()]),
3610 "Unexpected module {module_kind} {module_id} db record found: {}: {}",
3611 k.as_hex(),
3612 v.as_hex()
3613 );
3614 }
3615}
3616
3617#[cfg(test)]
3618mod tests;