1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4#![allow(clippy::needless_lifetimes)]
5
6pub mod envs;
7
8use std::fmt;
9use std::ops::Range;
10use std::path::Path;
11use std::str::FromStr;
12
13use anyhow::{Context as _, bail};
14use async_trait::async_trait;
15use fedimint_core::db::{
16 DatabaseError, DatabaseResult, IDatabaseTransactionOps, IDatabaseTransactionOpsCore,
17 IRawDatabase, IRawDatabaseTransaction, PrefixStream,
18};
19use fedimint_core::task::block_in_place;
20use fedimint_db_locked::{Locked, LockedBuilder};
21use futures::stream;
22pub use rocksdb;
23use rocksdb::{
24 DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
25};
26use tracing::debug;
27
28use crate::envs::FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV;
29
30fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> + use<I>
33where
34 I: Iterator + Send + 'i,
35 I::Item: Send,
36{
37 stream::unfold(iter, |mut iter| async {
38 fedimint_core::runtime::block_in_place(|| {
39 let item = iter.next();
40 item.map(|item| (item, iter))
41 })
42 })
43}
44
/// Newtype around a [`rocksdb::OptimisticTransactionDB`] handle.
///
/// Implements `IRawDatabase`; transactions are handed out as
/// [`RocksDbTransaction`].
#[derive(Debug)]
pub struct RocksDb(rocksdb::OptimisticTransactionDB);
47
/// Newtype around a [`rocksdb::Transaction`] on a
/// [`rocksdb::OptimisticTransactionDB`], borrowed for `'a`.
pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
49
50#[bon::bon]
51impl RocksDb {
52 #[builder(start_fn = build)]
54 #[builder(finish_fn = open_blocking)]
55 pub fn open_blocking(
56 #[builder(start_fn)] db_path: impl AsRef<Path>,
57 relaxed_consistency: Option<bool>,
60 ) -> anyhow::Result<Locked<RocksDb>> {
61 let db_path = db_path.as_ref();
62
63 block_in_place(|| {
64 std::fs::create_dir_all(
65 db_path
66 .parent()
67 .ok_or_else(|| anyhow::anyhow!("db path must have a base dir"))?,
68 )?;
69 LockedBuilder::new(db_path)?.with_db(|| {
70 Self::open_blocking_unlocked(db_path, relaxed_consistency.unwrap_or_default())
71 })
72 })
73 }
74}
75
76impl<I1, S> RocksDbOpenBlockingBuilder<I1, S>
77where
78 S: rocks_db_open_blocking_builder::State,
79 I1: std::convert::AsRef<std::path::Path>,
80{
81 #[allow(clippy::unused_async)]
83 pub async fn open(self) -> anyhow::Result<Locked<RocksDb>> {
84 block_in_place(|| self.open_blocking())
85 }
86}
87
88impl RocksDb {
89 fn open_blocking_unlocked(
90 db_path: &Path,
91 relaxed_consistency: bool,
92 ) -> anyhow::Result<RocksDb> {
93 let mut opts = get_default_options()?;
94 if relaxed_consistency {
95 opts.set_wal_recovery_mode(DBRecoveryMode::TolerateCorruptedTailRecords);
97 } else {
98 opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency);
101 }
102 let db: rocksdb::OptimisticTransactionDB =
103 rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, db_path)?;
104 Ok(RocksDb(db))
105 }
106
107 pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
108 &self.0
109 }
110}
111
/// Returns `true` when `num` is a power of two; `0` is not one.
fn is_power_of_two(num: usize) -> bool {
    // A power of two has exactly one bit set.
    num.count_ones() == 1
}
116
117impl fmt::Debug for RocksDbReadOnlyTransaction<'_> {
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 f.write_str("RocksDbTransaction")
120 }
121}
122
123impl fmt::Debug for RocksDbTransaction<'_> {
124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125 f.write_str("RocksDbTransaction")
126 }
127}
128
/// Checks `is_power_of_two` against a small table of known answers.
#[test]
fn is_power_of_two_sanity() {
    let cases: [(usize, bool); 8] = [
        (0, false),
        (1, true),
        (2, true),
        (3, false),
        (4, true),
        (5, false),
        (2 << 10, true),
        ((2 << 10) + 1, false),
    ];

    for (value, expected) in cases {
        assert_eq!(is_power_of_two(value), expected, "is_power_of_two({value})");
    }
}
140
141fn get_default_options() -> anyhow::Result<rocksdb::Options> {
142 let mut opts = rocksdb::Options::default();
143 if let Ok(var) = std::env::var(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV) {
144 debug!(var, "Using custom write buffer size");
145 let size: usize = FromStr::from_str(&var)
146 .with_context(|| format!("Could not parse {FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV}"))?;
147 if !is_power_of_two(size) {
148 bail!("{} is not a power of 2", FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV);
149 }
150 opts.set_write_buffer_size(size);
151 }
152 opts.create_if_missing(true);
153 Ok(opts)
154}
155
/// Newtype around a plain [`rocksdb::DB`] handle; this file constructs it via
/// `rocksdb::DB::open_for_read_only`.
#[derive(Debug)]
pub struct RocksDbReadOnly(rocksdb::DB);
158
/// "Transaction" over a [`RocksDbReadOnly`]: just a shared borrow of the
/// underlying [`rocksdb::DB`]. Its mutating operations panic.
pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
160
161impl RocksDbReadOnly {
162 #[allow(clippy::unused_async)]
163 pub async fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
164 let db_path = db_path.as_ref();
165 block_in_place(|| Self::open_read_only_blocking(db_path))
166 }
167
168 pub fn open_read_only_blocking(db_path: &Path) -> anyhow::Result<RocksDbReadOnly> {
169 let opts = get_default_options()?;
170 let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
172 Ok(RocksDbReadOnly(db))
173 }
174}
175
176impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
177 fn from(db: OptimisticTransactionDB) -> Self {
178 RocksDb(db)
179 }
180}
181
182impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
183 fn from(db: RocksDb) -> Self {
184 db.0
185 }
186}
187
/// Computes the byte string that follows `prefix` when `prefix` is treated as
/// a big-endian number and incremented by one, keeping the same length.
///
/// The rightmost byte that is not `0xFF` is incremented and every byte after
/// it is reset to `0`. Returns `None` when there is no such byte, i.e. when
/// `prefix` is empty or consists only of `0xFF` bytes.
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    // Index of the last byte that can be incremented without overflowing.
    let pivot = prefix.iter().rposition(|&byte| byte != u8::MAX)?;

    let mut successor = prefix.to_vec();
    successor[pivot] += 1;
    // Everything after the pivot was `0xFF` and wraps around to zero.
    successor[pivot + 1..].fill(0);
    Some(successor)
}
211
212#[async_trait]
213impl IRawDatabase for RocksDb {
214 type Transaction<'a> = RocksDbTransaction<'a>;
215 async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
216 let mut optimistic_options = OptimisticTransactionOptions::default();
217 optimistic_options.set_snapshot(true);
218
219 let mut write_options = WriteOptions::default();
220 write_options.set_sync(true);
222
223 RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options))
224 }
225
226 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
227 let checkpoint =
228 rocksdb::checkpoint::Checkpoint::new(&self.0).map_err(DatabaseError::backend)?;
229 checkpoint
230 .create_checkpoint(backup_path)
231 .map_err(DatabaseError::backend)?;
232 Ok(())
233 }
234}
235
236#[async_trait]
237impl IRawDatabase for RocksDbReadOnly {
238 type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
239 async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
240 RocksDbReadOnlyTransaction(&self.0)
241 }
242
243 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
244 let checkpoint =
245 rocksdb::checkpoint::Checkpoint::new(&self.0).map_err(DatabaseError::backend)?;
246 checkpoint
247 .create_checkpoint(backup_path)
248 .map_err(DatabaseError::backend)?;
249 Ok(())
250 }
251}
252
253#[async_trait]
254impl IDatabaseTransactionOpsCore for RocksDbTransaction<'_> {
255 async fn raw_insert_bytes(
256 &mut self,
257 key: &[u8],
258 value: &[u8],
259 ) -> DatabaseResult<Option<Vec<u8>>> {
260 fedimint_core::runtime::block_in_place(|| {
261 let val = self.0.snapshot().get(key).unwrap();
262 self.0.put(key, value).map_err(DatabaseError::backend)?;
263 Ok(val)
264 })
265 }
266
267 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
268 fedimint_core::runtime::block_in_place(|| {
269 self.0.snapshot().get(key).map_err(DatabaseError::backend)
270 })
271 }
272
273 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
274 fedimint_core::runtime::block_in_place(|| {
275 let val = self.0.snapshot().get(key).unwrap();
276 self.0.delete(key).map_err(DatabaseError::backend)?;
277 Ok(val)
278 })
279 }
280
281 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
282 Ok(fedimint_core::runtime::block_in_place(|| {
283 let prefix = key_prefix.to_vec();
284 let mut options = rocksdb::ReadOptions::default();
285 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
286 let iter = self.0.snapshot().iterator_opt(
287 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
288 options,
289 );
290 let rocksdb_iter = iter.map_while(move |res| {
291 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
292 key_bytes
293 .starts_with(&prefix)
294 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
295 });
296 Box::pin(convert_to_async_stream(rocksdb_iter))
297 }))
298 }
299
300 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
301 Ok(fedimint_core::runtime::block_in_place(|| {
302 let range = Range {
303 start: range.start.to_vec(),
304 end: range.end.to_vec(),
305 };
306 let mut options = rocksdb::ReadOptions::default();
307 options.set_iterate_range(range.clone());
308 let iter = self.0.snapshot().iterator_opt(
309 rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
310 options,
311 );
312 let rocksdb_iter = iter.map_while(move |res| {
313 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
314 (key_bytes.as_ref() < range.end.as_slice())
315 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
316 });
317 Box::pin(convert_to_async_stream(rocksdb_iter))
318 }))
319 }
320
321 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
322 fedimint_core::runtime::block_in_place(|| {
323 let mut options = rocksdb::ReadOptions::default();
325 options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
326 let iter = self
327 .0
328 .snapshot()
329 .iterator_opt(
330 rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
331 options,
332 )
333 .map_while(|res| {
334 res.map(|(key_bytes, _)| {
335 key_bytes
336 .starts_with(key_prefix)
337 .then_some(key_bytes.to_vec())
338 })
339 .transpose()
340 });
341
342 for item in iter {
343 let key = item.map_err(DatabaseError::backend)?;
344 self.0.delete(key).map_err(DatabaseError::backend)?;
345 }
346
347 Ok(())
348 })
349 }
350
351 async fn raw_find_by_prefix_sorted_descending(
352 &mut self,
353 key_prefix: &[u8],
354 ) -> DatabaseResult<PrefixStream<'_>> {
355 let prefix = key_prefix.to_vec();
356 let next_prefix = next_prefix(&prefix);
357 let iterator_mode = if let Some(next_prefix) = &next_prefix {
358 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
359 } else {
360 rocksdb::IteratorMode::End
361 };
362 Ok(fedimint_core::runtime::block_in_place(|| {
363 let mut options = rocksdb::ReadOptions::default();
364 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
365 let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
366 let rocksdb_iter = iter.map_while(move |res| {
367 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
368 key_bytes
369 .starts_with(&prefix)
370 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
371 });
372 Box::pin(convert_to_async_stream(rocksdb_iter))
373 }))
374 }
375}
376
// Empty impl: no items of `IDatabaseTransactionOps` are defined here.
impl IDatabaseTransactionOps for RocksDbTransaction<'_> {}
378
379#[async_trait]
380impl IRawDatabaseTransaction for RocksDbTransaction<'_> {
381 async fn commit_tx(self) -> DatabaseResult<()> {
382 fedimint_core::runtime::block_in_place(|| {
383 match self.0.commit() {
384 Ok(()) => Ok(()),
385 Err(err) => {
386 match err.kind() {
391 rocksdb::ErrorKind::Busy
392 | rocksdb::ErrorKind::TryAgain
393 | rocksdb::ErrorKind::MergeInProgress
394 | rocksdb::ErrorKind::TimedOut => Err(DatabaseError::WriteConflict),
395 _ => Err(DatabaseError::backend(err)),
396 }
397 }
398 }
399 })
400 }
401}
402
403#[async_trait]
404impl IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'_> {
405 async fn raw_insert_bytes(
406 &mut self,
407 _key: &[u8],
408 _value: &[u8],
409 ) -> DatabaseResult<Option<Vec<u8>>> {
410 panic!("Cannot insert into a read only transaction");
411 }
412
413 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
414 fedimint_core::runtime::block_in_place(|| {
415 self.0.snapshot().get(key).map_err(DatabaseError::backend)
416 })
417 }
418
419 async fn raw_remove_entry(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
420 panic!("Cannot remove from a read only transaction");
421 }
422
423 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
424 Ok(fedimint_core::runtime::block_in_place(|| {
425 let range = Range {
426 start: range.start.to_vec(),
427 end: range.end.to_vec(),
428 };
429 let mut options = rocksdb::ReadOptions::default();
430 options.set_iterate_range(range.clone());
431 let iter = self.0.snapshot().iterator_opt(
432 rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
433 options,
434 );
435 let rocksdb_iter = iter.map_while(move |res| {
436 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
437 (key_bytes.as_ref() < range.end.as_slice())
438 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
439 });
440 Box::pin(convert_to_async_stream(rocksdb_iter))
441 }))
442 }
443
444 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
445 Ok(fedimint_core::runtime::block_in_place(|| {
446 let prefix = key_prefix.to_vec();
447 let mut options = rocksdb::ReadOptions::default();
448 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
449 let iter = self.0.snapshot().iterator_opt(
450 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
451 options,
452 );
453 let rocksdb_iter = iter.map_while(move |res| {
454 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
455 key_bytes
456 .starts_with(&prefix)
457 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
458 });
459 Box::pin(convert_to_async_stream(rocksdb_iter))
460 }))
461 }
462
463 async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> DatabaseResult<()> {
464 panic!("Cannot remove from a read only transaction");
465 }
466
467 async fn raw_find_by_prefix_sorted_descending(
468 &mut self,
469 key_prefix: &[u8],
470 ) -> DatabaseResult<PrefixStream<'_>> {
471 let prefix = key_prefix.to_vec();
472 let next_prefix = next_prefix(&prefix);
473 let iterator_mode = if let Some(next_prefix) = &next_prefix {
474 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
475 } else {
476 rocksdb::IteratorMode::End
477 };
478 Ok(fedimint_core::runtime::block_in_place(|| {
479 let mut options = rocksdb::ReadOptions::default();
480 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
481 let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
482 let rocksdb_iter = iter.map_while(move |res| {
483 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
484 key_bytes
485 .starts_with(&prefix)
486 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
487 });
488 Box::pin(stream::iter(rocksdb_iter))
489 }))
490 }
491}
492
// Empty impl: no items of `IDatabaseTransactionOps` are defined here.
impl IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'_> {}
494
#[async_trait]
impl IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'_> {
    /// Committing is not supported on a read-only transaction.
    ///
    /// # Panics
    ///
    /// Always, with "Cannot commit a read only transaction".
    async fn commit_tx(self) -> DatabaseResult<()> {
        panic!("Cannot commit a read only transaction");
    }
}
501
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    /// Creates a temporary directory whose name starts with `temp_path` and
    /// opens a [`Database`] backed by a [`RocksDb`] at that directory.
    fn open_temp_db(temp_path: &str) -> Database {
        // NOTE(review): `temp_dir` is dropped when this function returns —
        // presumably removing the directory while the database is still
        // open; confirm this is intended.
        let temp_dir = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();
        let raw_db = RocksDb::build(temp_dir.as_ref()).open_blocking().unwrap();

        Database::new(raw_db, ModuleDecoderRegistry::default())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        let db = open_temp_db("fcb-rocksdb-test-insert-elements");
        fedimint_core::db::verify_insert_elements(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        let db = open_temp_db("fcb-rocksdb-test-remove-nonexisting");
        fedimint_core::db::verify_remove_nonexisting(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        let db = open_temp_db("fcb-rocksdb-test-remove-existing");
        fedimint_core::db::verify_remove_existing(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        let db = open_temp_db("fcb-rocksdb-test-read-own-writes");
        fedimint_core::db::verify_read_own_writes(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-dirty-reads");
        fedimint_core::db::verify_prevent_dirty_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-range");
        fedimint_core::db::verify_find_by_range(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-prefix");
        fedimint_core::db::verify_find_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        let db = open_temp_db("fcb-rocksdb-test-commit");
        fedimint_core::db::verify_commit(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-nonrepeatable-reads");
        fedimint_core::db::verify_prevent_nonrepeatable_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        let db = open_temp_db("fcb-rocksdb-test-snapshot-isolation");
        fedimint_core::db::verify_snapshot_isolation(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        let db = open_temp_db("fcb-rocksdb-test-phantom-entry");
        fedimint_core::db::verify_phantom_entry(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        let db = open_temp_db("fcb-rocksdb-test-write-conflict");
        fedimint_core::db::expect_write_conflict(db).await;
    }

    /// Ten tasks each run ten `autocommit` calls that all write the same
    /// key; every call is expected to return `Ok`.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_concurrent_transaction_conflict_with_autocommit() {
        use std::sync::Arc;

        let db = Arc::new(open_temp_db("fcb-rocksdb-test-concurrent-conflict"));

        // All tasks are spawned while collecting, before any is awaited.
        let handles: Vec<_> = (0u64..10)
            .map(|i| {
                let db_clone = Arc::clone(&db);
                fedimint_core::runtime::spawn("rocksdb-transient-error-test", async move {
                    for j in 0u64..10 {
                        // NOTE(review): the trailing `None` is presumably the
                        // retry limit (unbounded) — confirm against
                        // `Database::autocommit`.
                        let result = db_clone
                            .autocommit::<_, _, anyhow::Error>(
                                |dbtx, _| {
                                    #[allow(clippy::cast_possible_truncation)]
                                    let val = (i * 100 + j) as u8;
                                    Box::pin(async move {
                                        dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![val]))
                                            .await;
                                        Ok(())
                                    })
                                },
                                None,
                            )
                            .await;

                        assert!(
                            result.is_ok(),
                            "Transaction should succeed after retries, got: {result:?}",
                        );
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.await.expect("Task should not panic");
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-remove-by-prefix");
        fedimint_core::db::verify_remove_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        let db = open_temp_db("fcb-rocksdb-test-module-prefix");
        fedimint_core::db::verify_module_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-module-db-prefix")
            .tempdir()
            .unwrap();

        let module_raw_db = RocksDb::build(path.as_ref()).open_blocking().unwrap();
        let module_db = Database::new(module_raw_db, ModuleDecoderRegistry::default());

        let db = open_temp_db("fcb-rocksdb-test-module-db");
        let prefixed_module_db = module_db.with_prefix_module_id(module_instance_id).0;
        fedimint_core::db::verify_module_db(db, prefixed_module_db).await;
    }

    #[test]
    fn test_next_prefix() {
        // Plain increment of the last byte.
        assert_eq!(next_prefix(&[1, 2, 3]), Some(vec![1, 2, 4]));
        assert_eq!(next_prefix(&[1, 2, 254]), Some(vec![1, 2, 255]));
        // Carry into earlier bytes.
        assert_eq!(next_prefix(&[1, 2, 255]), Some(vec![1, 3, 0]));
        assert_eq!(next_prefix(&[1, 255, 255]), Some(vec![2, 0, 0]));
        // No successor of the same length exists.
        assert_eq!(next_prefix(&[255, 255, 255]), None);
        // Single-byte prefixes.
        assert_eq!(next_prefix(&[0]), Some(vec![1]));
        assert_eq!(next_prefix(&[254]), Some(vec![255]));
        assert_eq!(next_prefix(&[255]), None);
    }

    /// Key prefixes used by the test records below.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        MaxTest = 255,
    }

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    /// The `TestKey` entries written by `test_retrieve_descending_order`,
    /// listed in descending key order.
    fn expected_test_entries() -> Vec<(TestKey, TestVal)> {
        vec![
            (TestKey(vec![255]), TestVal(vec![2])),
            (TestKey(vec![254]), TestVal(vec![1])),
            (TestKey(vec![0]), TestVal(vec![3])),
        ]
    }

    /// The `TestKey2` entries written by `test_retrieve_descending_order`,
    /// listed in descending key order.
    fn expected_test2_entries() -> Vec<(TestKey2, TestVal2)> {
        vec![
            (TestKey2(vec![255]), TestVal2(vec![2])),
            (TestKey2(vec![254]), TestVal2(vec![1])),
            (TestKey2(vec![0]), TestVal2(vec![3])),
        ]
    }

    /// Writes three entries under each test prefix and checks that the
    /// descending prefix query returns them highest key first, both inside
    /// the writing transaction and through a read-only database afterwards.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();

        // Scoped so the read-write database is dropped before the read-only
        // one is opened.
        {
            let db = Database::new(
                RocksDb::build(&path).open().await.unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;

            for (key, val) in [(0, 3), (254, 1), (255, 2)] {
                dbtx.insert_entry(&TestKey(vec![key]), &TestVal(vec![val]))
                    .await;
            }
            for (key, val) in [(0, 3), (254, 1), (255, 2)] {
                dbtx.insert_entry(&TestKey2(vec![key]), &TestVal2(vec![val]))
                    .await;
            }

            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(query, expected_test_entries());

            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(query, expected_test2_entries());

            dbtx.commit_tx().await;
        }

        let db_readonly = RocksDbReadOnly::open_read_only(path).await.unwrap();
        let db_readonly = Database::new(db_readonly, ModuleRegistry::default());
        let mut dbtx = db_readonly.begin_transaction_nc().await;

        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(query, expected_test_entries());

        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(query, expected_test2_entries());
    }
}