1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4#![allow(clippy::needless_lifetimes)]
5
6pub mod envs;
7
8use std::fmt;
9use std::ops::Range;
10use std::path::Path;
11use std::str::FromStr;
12
13use anyhow::{Context as _, bail};
14use async_trait::async_trait;
15use fedimint_core::db::{
16 DatabaseError, DatabaseResult, IDatabaseTransactionOps, IDatabaseTransactionOpsCore,
17 IRawDatabase, IRawDatabaseTransaction, PrefixStream,
18};
19use fedimint_core::task::block_in_place;
20use fedimint_db_locked::{Locked, LockedBuilder};
21use futures::stream;
22pub use rocksdb;
23use rocksdb::{
24 DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
25};
26use tracing::debug;
27
28use crate::envs::{FM_ROCKSDB_BLOCK_CACHE_SIZE_ENV, FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV};
29
30fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> + use<I>
33where
34 I: Iterator + Send + 'i,
35 I::Item: Send,
36{
37 stream::unfold(iter, |mut iter| async {
38 fedimint_core::runtime::block_in_place(|| {
39 let item = iter.next();
40 item.map(|item| (item, iter))
41 })
42 })
43}
44
/// Read-write database handle: a newtype around
/// [`rocksdb::OptimisticTransactionDB`].
#[derive(Debug)]
pub struct RocksDb(rocksdb::OptimisticTransactionDB);
47
/// A transaction on a [`RocksDb`]: a newtype around a [`rocksdb::Transaction`]
/// that borrows the underlying [`rocksdb::OptimisticTransactionDB`] for `'a`.
pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
49
50#[bon::bon]
51impl RocksDb {
52 #[builder(start_fn = build)]
54 #[builder(finish_fn = open_blocking)]
55 pub fn open_blocking(
56 #[builder(start_fn)] db_path: impl AsRef<Path>,
57 ) -> anyhow::Result<Locked<RocksDb>> {
58 let db_path = db_path.as_ref();
59
60 block_in_place(|| {
61 std::fs::create_dir_all(
62 db_path
63 .parent()
64 .ok_or_else(|| anyhow::anyhow!("db path must have a base dir"))?,
65 )?;
66 LockedBuilder::new(db_path)?.with_db(|| Self::open_blocking_unlocked(db_path))
67 })
68 }
69}
70
impl<I1, S> RocksDbOpenBlockingBuilder<I1, S>
where
    S: rocks_db_open_blocking_builder::State,
    I1: std::convert::AsRef<std::path::Path>,
{
    /// Async-flavoured finisher for the builder: runs the blocking
    /// `open_blocking` finisher inside `block_in_place` and returns its
    /// result unchanged.
    // The body never awaits, which is why `clippy::unused_async` is silenced;
    // the `async` signature lets callers write `.open().await`.
    #[allow(clippy::unused_async)]
    pub async fn open(self) -> anyhow::Result<Locked<RocksDb>> {
        block_in_place(|| self.open_blocking())
    }
}
82
83impl RocksDb {
84 fn open_blocking_unlocked(db_path: &Path) -> anyhow::Result<RocksDb> {
85 let mut opts = get_default_options()?;
86 opts.set_wal_recovery_mode(DBRecoveryMode::TolerateCorruptedTailRecords);
94 let db: rocksdb::OptimisticTransactionDB =
95 rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, db_path)?;
96 Ok(RocksDb(db))
97 }
98
99 pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
100 &self.0
101 }
102}
103
/// Returns `true` when `num` is an exact power of two (`1`, `2`, `4`, ...).
///
/// Zero is not a power of two.
fn is_power_of_two(num: usize) -> bool {
    // A power of two has exactly one bit set.
    num.count_ones() == 1
}
108
109impl fmt::Debug for RocksDbReadOnlyTransaction<'_> {
110 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111 f.write_str("RocksDbTransaction")
112 }
113}
114
115impl fmt::Debug for RocksDbTransaction<'_> {
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 f.write_str("RocksDbTransaction")
118 }
119}
120
/// Spot-checks [`is_power_of_two`] on small values and around `2 << 10`.
#[test]
fn is_power_of_two_sanity() {
    for power in [1, 2, 4, 2 << 10] {
        assert!(is_power_of_two(power));
    }
    for non_power in [0, 3, 5, (2 << 10) + 1] {
        assert!(!is_power_of_two(non_power));
    }
}
132
/// Write buffer size in bytes (2 MiB) used when
/// `FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV` is not set.
const DEFAULT_WRITE_BUFFER_SIZE: usize = 2 * 1024 * 1024;

/// LRU block cache size in bytes (2 MiB) used when
/// `FM_ROCKSDB_BLOCK_CACHE_SIZE_ENV` is not set.
const DEFAULT_BLOCK_CACHE_SIZE: usize = 2 * 1024 * 1024;

/// Value passed to `set_max_open_files` when building the default options.
const DEFAULT_MAX_OPEN_FILES: i32 = 256;
145
146fn parse_env_size(env_name: &str) -> anyhow::Result<Option<usize>> {
147 let Ok(var) = std::env::var(env_name) else {
148 return Ok(None);
149 };
150 let size: usize =
151 FromStr::from_str(&var).with_context(|| format!("Could not parse {env_name}"))?;
152 if !is_power_of_two(size) {
153 bail!("{env_name} is not a power of 2");
154 }
155 Ok(Some(size))
156}
157
158fn get_default_options() -> anyhow::Result<rocksdb::Options> {
159 let mut opts = rocksdb::Options::default();
160
161 let write_buffer_size =
162 parse_env_size(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV)?.unwrap_or(DEFAULT_WRITE_BUFFER_SIZE);
163 opts.set_write_buffer_size(write_buffer_size);
164
165 opts.set_max_write_buffer_number(2);
167
168 let block_cache_size =
169 parse_env_size(FM_ROCKSDB_BLOCK_CACHE_SIZE_ENV)?.unwrap_or(DEFAULT_BLOCK_CACHE_SIZE);
170 let cache = rocksdb::Cache::new_lru_cache(block_cache_size);
171 let mut block_opts = rocksdb::BlockBasedOptions::default();
172 block_opts.set_block_cache(&cache);
173 block_opts.set_cache_index_and_filter_blocks(true);
176 opts.set_block_based_table_factory(&block_opts);
177
178 opts.set_max_open_files(DEFAULT_MAX_OPEN_FILES);
179
180 debug!(
181 write_buffer_size,
182 block_cache_size,
183 max_open_files = DEFAULT_MAX_OPEN_FILES,
184 "RocksDB memory options"
185 );
186
187 opts.create_if_missing(true);
188 Ok(opts)
189}
190
/// Read-only database handle: a newtype around [`rocksdb::DB`], which
/// `open_read_only_blocking` opens via `open_for_read_only`.
#[derive(Debug)]
pub struct RocksDbReadOnly(rocksdb::DB);
193
/// A "transaction" on a [`RocksDbReadOnly`]: it only borrows the underlying
/// [`rocksdb::DB`]. Its write operations and `commit_tx` panic.
pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
195
196impl RocksDbReadOnly {
197 #[allow(clippy::unused_async)]
198 pub async fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
199 let db_path = db_path.as_ref();
200 block_in_place(|| Self::open_read_only_blocking(db_path))
201 }
202
203 pub fn open_read_only_blocking(db_path: &Path) -> anyhow::Result<RocksDbReadOnly> {
204 let opts = get_default_options()?;
205 let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
207 Ok(RocksDbReadOnly(db))
208 }
209}
210
211impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
212 fn from(db: OptimisticTransactionDB) -> Self {
213 RocksDb(db)
214 }
215}
216
217impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
218 fn from(db: RocksDb) -> Self {
219 db.0
220 }
221}
222
/// Computes the smallest byte string of the same length that sorts after
/// every key starting with `prefix`.
///
/// The prefix is treated as a big-endian number and incremented by one:
/// trailing `0xFF` bytes wrap to `0` and the carry moves left. Returns `None`
/// when no such successor exists, i.e. `prefix` is empty or consists solely
/// of `0xFF` bytes.
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    // The right-most byte that can absorb the carry without overflowing.
    let pivot = prefix.iter().rposition(|&byte| byte != u8::MAX)?;

    let mut successor = prefix.to_vec();
    successor[pivot] += 1;
    // Everything to the right of the pivot was 0xFF and wraps around to zero.
    successor[pivot + 1..].fill(0);
    Some(successor)
}
246
247#[async_trait]
248impl IRawDatabase for RocksDb {
249 type Transaction<'a> = RocksDbTransaction<'a>;
250 async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
251 let mut optimistic_options = OptimisticTransactionOptions::default();
252 optimistic_options.set_snapshot(true);
253
254 let mut write_options = WriteOptions::default();
255 write_options.set_sync(true);
257
258 RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options))
259 }
260
261 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
262 let checkpoint =
263 rocksdb::checkpoint::Checkpoint::new(&self.0).map_err(DatabaseError::backend)?;
264 checkpoint
265 .create_checkpoint(backup_path)
266 .map_err(DatabaseError::backend)?;
267 Ok(())
268 }
269}
270
271#[async_trait]
272impl IRawDatabase for RocksDbReadOnly {
273 type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
274 async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
275 RocksDbReadOnlyTransaction(&self.0)
276 }
277
278 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
279 let checkpoint =
280 rocksdb::checkpoint::Checkpoint::new(&self.0).map_err(DatabaseError::backend)?;
281 checkpoint
282 .create_checkpoint(backup_path)
283 .map_err(DatabaseError::backend)?;
284 Ok(())
285 }
286}
287
288#[async_trait]
289impl IDatabaseTransactionOpsCore for RocksDbTransaction<'_> {
290 async fn raw_insert_bytes(
291 &mut self,
292 key: &[u8],
293 value: &[u8],
294 ) -> DatabaseResult<Option<Vec<u8>>> {
295 fedimint_core::runtime::block_in_place(|| {
296 let val = self.0.snapshot().get(key).unwrap();
297 self.0.put(key, value).map_err(DatabaseError::backend)?;
298 Ok(val)
299 })
300 }
301
302 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
303 fedimint_core::runtime::block_in_place(|| {
304 self.0.snapshot().get(key).map_err(DatabaseError::backend)
305 })
306 }
307
308 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
309 fedimint_core::runtime::block_in_place(|| {
310 let val = self.0.snapshot().get(key).unwrap();
311 self.0.delete(key).map_err(DatabaseError::backend)?;
312 Ok(val)
313 })
314 }
315
316 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
317 Ok(fedimint_core::runtime::block_in_place(|| {
318 let prefix = key_prefix.to_vec();
319 let mut options = rocksdb::ReadOptions::default();
320 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
321 let iter = self.0.snapshot().iterator_opt(
322 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
323 options,
324 );
325 let rocksdb_iter = iter.map_while(move |res| {
326 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
327 key_bytes
328 .starts_with(&prefix)
329 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
330 });
331 Box::pin(convert_to_async_stream(rocksdb_iter))
332 }))
333 }
334
335 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
336 Ok(fedimint_core::runtime::block_in_place(|| {
337 let range = Range {
338 start: range.start.to_vec(),
339 end: range.end.to_vec(),
340 };
341 let mut options = rocksdb::ReadOptions::default();
342 options.set_iterate_range(range.clone());
343 let iter = self.0.snapshot().iterator_opt(
344 rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
345 options,
346 );
347 let rocksdb_iter = iter.map_while(move |res| {
348 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
349 (key_bytes.as_ref() < range.end.as_slice())
350 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
351 });
352 Box::pin(convert_to_async_stream(rocksdb_iter))
353 }))
354 }
355
356 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
357 fedimint_core::runtime::block_in_place(|| {
358 let mut options = rocksdb::ReadOptions::default();
360 options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
361 let iter = self
362 .0
363 .snapshot()
364 .iterator_opt(
365 rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
366 options,
367 )
368 .map_while(|res| {
369 res.map(|(key_bytes, _)| {
370 key_bytes
371 .starts_with(key_prefix)
372 .then_some(key_bytes.to_vec())
373 })
374 .transpose()
375 });
376
377 for item in iter {
378 let key = item.map_err(DatabaseError::backend)?;
379 self.0.delete(key).map_err(DatabaseError::backend)?;
380 }
381
382 Ok(())
383 })
384 }
385
386 async fn raw_find_by_prefix_sorted_descending(
387 &mut self,
388 key_prefix: &[u8],
389 ) -> DatabaseResult<PrefixStream<'_>> {
390 let prefix = key_prefix.to_vec();
391 let next_prefix = next_prefix(&prefix);
392 let iterator_mode = if let Some(next_prefix) = &next_prefix {
393 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
394 } else {
395 rocksdb::IteratorMode::End
396 };
397 Ok(fedimint_core::runtime::block_in_place(|| {
398 let mut options = rocksdb::ReadOptions::default();
399 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
400 let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
401 let rocksdb_iter = iter.map_while(move |res| {
402 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
403 key_bytes
404 .starts_with(&prefix)
405 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
406 });
407 Box::pin(convert_to_async_stream(rocksdb_iter))
408 }))
409 }
410}
411
// Empty impl: no `IDatabaseTransactionOps` items are overridden for the
// read-write transaction.
impl IDatabaseTransactionOps for RocksDbTransaction<'_> {}
413
414#[async_trait]
415impl IRawDatabaseTransaction for RocksDbTransaction<'_> {
416 async fn commit_tx(self) -> DatabaseResult<()> {
417 fedimint_core::runtime::block_in_place(|| {
418 match self.0.commit() {
419 Ok(()) => Ok(()),
420 Err(err) => {
421 match err.kind() {
426 rocksdb::ErrorKind::Busy
427 | rocksdb::ErrorKind::TryAgain
428 | rocksdb::ErrorKind::MergeInProgress
429 | rocksdb::ErrorKind::TimedOut => Err(DatabaseError::WriteConflict),
430 _ => Err(DatabaseError::backend(err)),
431 }
432 }
433 }
434 })
435 }
436}
437
438#[async_trait]
439impl IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'_> {
440 async fn raw_insert_bytes(
441 &mut self,
442 _key: &[u8],
443 _value: &[u8],
444 ) -> DatabaseResult<Option<Vec<u8>>> {
445 panic!("Cannot insert into a read only transaction");
446 }
447
448 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
449 fedimint_core::runtime::block_in_place(|| {
450 self.0.snapshot().get(key).map_err(DatabaseError::backend)
451 })
452 }
453
454 async fn raw_remove_entry(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
455 panic!("Cannot remove from a read only transaction");
456 }
457
458 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
459 Ok(fedimint_core::runtime::block_in_place(|| {
460 let range = Range {
461 start: range.start.to_vec(),
462 end: range.end.to_vec(),
463 };
464 let mut options = rocksdb::ReadOptions::default();
465 options.set_iterate_range(range.clone());
466 let iter = self.0.snapshot().iterator_opt(
467 rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
468 options,
469 );
470 let rocksdb_iter = iter.map_while(move |res| {
471 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
472 (key_bytes.as_ref() < range.end.as_slice())
473 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
474 });
475 Box::pin(convert_to_async_stream(rocksdb_iter))
476 }))
477 }
478
479 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
480 Ok(fedimint_core::runtime::block_in_place(|| {
481 let prefix = key_prefix.to_vec();
482 let mut options = rocksdb::ReadOptions::default();
483 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
484 let iter = self.0.snapshot().iterator_opt(
485 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
486 options,
487 );
488 let rocksdb_iter = iter.map_while(move |res| {
489 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
490 key_bytes
491 .starts_with(&prefix)
492 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
493 });
494 Box::pin(convert_to_async_stream(rocksdb_iter))
495 }))
496 }
497
498 async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> DatabaseResult<()> {
499 panic!("Cannot remove from a read only transaction");
500 }
501
502 async fn raw_find_by_prefix_sorted_descending(
503 &mut self,
504 key_prefix: &[u8],
505 ) -> DatabaseResult<PrefixStream<'_>> {
506 let prefix = key_prefix.to_vec();
507 let next_prefix = next_prefix(&prefix);
508 let iterator_mode = if let Some(next_prefix) = &next_prefix {
509 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
510 } else {
511 rocksdb::IteratorMode::End
512 };
513 Ok(fedimint_core::runtime::block_in_place(|| {
514 let mut options = rocksdb::ReadOptions::default();
515 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
516 let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
517 let rocksdb_iter = iter.map_while(move |res| {
518 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
519 key_bytes
520 .starts_with(&prefix)
521 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
522 });
523 Box::pin(stream::iter(rocksdb_iter))
524 }))
525 }
526}
527
// Empty impl: no `IDatabaseTransactionOps` items are overridden for the
// read-only transaction.
impl IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'_> {}
529
#[async_trait]
impl IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'_> {
    /// Not supported on a read-only transaction.
    ///
    /// # Panics
    ///
    /// Always panics.
    async fn commit_tx(self) -> DatabaseResult<()> {
        panic!("Cannot commit a read only transaction");
    }
}
536
/// Tests for the RocksDB backend. Most of them open a fresh temporary
/// database and delegate to the shared `fedimint_core::db` verification
/// helpers.
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    /// Opens a read-write [`Database`] in a new temporary directory whose
    /// name starts with `temp_path`.
    fn open_temp_db(temp_path: &str) -> Database {
        let path = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();

        Database::new(
            RocksDb::build(path.as_ref()).open_blocking().unwrap(),
            ModuleDecoderRegistry::default(),
        )
    }

    /// Runs the shared `verify_insert_elements` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        fedimint_core::db::verify_insert_elements(open_temp_db("fcb-rocksdb-test-insert-elements"))
            .await;
    }

    /// Runs the shared `verify_remove_nonexisting` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        fedimint_core::db::verify_remove_nonexisting(open_temp_db(
            "fcb-rocksdb-test-remove-nonexisting",
        ))
        .await;
    }

    /// Runs the shared `verify_remove_existing` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        fedimint_core::db::verify_remove_existing(open_temp_db("fcb-rocksdb-test-remove-existing"))
            .await;
    }

    /// Runs the shared `verify_read_own_writes` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        fedimint_core::db::verify_read_own_writes(open_temp_db("fcb-rocksdb-test-read-own-writes"))
            .await;
    }

    /// Runs the shared `verify_prevent_dirty_reads` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        fedimint_core::db::verify_prevent_dirty_reads(open_temp_db(
            "fcb-rocksdb-test-prevent-dirty-reads",
        ))
        .await;
    }

    /// Runs the shared `verify_find_by_range` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        fedimint_core::db::verify_find_by_range(open_temp_db("fcb-rocksdb-test-find-by-range"))
            .await;
    }

    /// Runs the shared `verify_find_by_prefix` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        fedimint_core::db::verify_find_by_prefix(open_temp_db("fcb-rocksdb-test-find-by-prefix"))
            .await;
    }

    /// Runs the shared `verify_commit` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        fedimint_core::db::verify_commit(open_temp_db("fcb-rocksdb-test-commit")).await;
    }

    /// Runs the shared `verify_prevent_nonrepeatable_reads` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        fedimint_core::db::verify_prevent_nonrepeatable_reads(open_temp_db(
            "fcb-rocksdb-test-prevent-nonrepeatable-reads",
        ))
        .await;
    }

    /// Runs the shared `verify_snapshot_isolation` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        fedimint_core::db::verify_snapshot_isolation(open_temp_db(
            "fcb-rocksdb-test-snapshot-isolation",
        ))
        .await;
    }

    /// Runs the shared `verify_phantom_entry` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        fedimint_core::db::verify_phantom_entry(open_temp_db("fcb-rocksdb-test-phantom-entry"))
            .await;
    }

    /// Runs the shared `expect_write_conflict` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        fedimint_core::db::expect_write_conflict(open_temp_db("fcb-rocksdb-test-write-conflict"))
            .await;
    }

    /// Spawns 10 tasks that each perform 10 `autocommit` inserts into the
    /// same key and asserts that every one of them eventually succeeds.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_concurrent_transaction_conflict_with_autocommit() {
        use std::sync::Arc;

        let db = Arc::new(open_temp_db("fcb-rocksdb-test-concurrent-conflict"));

        let mut handles = Vec::new();

        for i in 0u64..10 {
            let db_clone = Arc::clone(&db);
            let handle =
                fedimint_core::runtime::spawn("rocksdb-transient-error-test", async move {
                    for j in 0u64..10 {
                        let result = db_clone
                            .autocommit::<_, _, anyhow::Error>(
                                |dbtx, _| {
                                    // Only the low byte matters here: every
                                    // task writes to the same key anyway.
                                    #[allow(clippy::cast_possible_truncation)]
                                    let val = (i * 100 + j) as u8;
                                    Box::pin(async move {
                                        dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![val]))
                                            .await;
                                        Ok(())
                                    })
                                },
                                // NOTE(review): presumably "no limit on the
                                // number of attempts" - confirm against
                                // `Database::autocommit`.
                                None,
                            )
                            .await;

                        assert!(
                            result.is_ok(),
                            "Transaction should succeed after retries, got: {result:?}",
                        );
                    }
                });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.expect("Task should not panic");
        }
    }

    /// Runs the shared `verify_remove_by_prefix` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        fedimint_core::db::verify_remove_by_prefix(open_temp_db(
            "fcb-rocksdb-test-remove-by-prefix",
        ))
        .await;
    }

    /// Runs the shared `verify_module_prefix` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        fedimint_core::db::verify_module_prefix(open_temp_db("fcb-rocksdb-test-module-prefix"))
            .await;
    }

    /// Runs the shared `verify_module_db` check with a plain database and a
    /// second database restricted to module instance `1`.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-module-db-prefix")
            .tempdir()
            .unwrap();

        let module_db = Database::new(
            RocksDb::build(path.as_ref()).open_blocking().unwrap(),
            ModuleDecoderRegistry::default(),
        );

        fedimint_core::db::verify_module_db(
            open_temp_db("fcb-rocksdb-test-module-db"),
            module_db.with_prefix_module_id(module_instance_id).0,
        )
        .await;
    }

    /// Checks [`next_prefix`] for plain increments, carries across `255`
    /// bytes, and prefixes made only of `255` bytes (no successor).
    #[test]
    fn test_next_prefix() {
        assert_eq!(next_prefix(&[1, 2, 3]).unwrap(), vec![1, 2, 4]);
        assert_eq!(next_prefix(&[1, 2, 254]).unwrap(), vec![1, 2, 255]);
        assert_eq!(next_prefix(&[1, 2, 255]).unwrap(), vec![1, 3, 0]);
        assert_eq!(next_prefix(&[1, 255, 255]).unwrap(), vec![2, 0, 0]);
        assert!(next_prefix(&[255, 255, 255]).is_none());
        assert_eq!(next_prefix(&[0]).unwrap(), vec![1]);
        assert_eq!(next_prefix(&[254]).unwrap(), vec![255]);
        assert!(next_prefix(&[255]).is_none());
    }

    /// Key prefixes used by the test records below. `MaxTest` is `255`, the
    /// prefix byte for which [`next_prefix`] has no successor.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        MaxTest = 255,
    }

    /// Test key stored under [`TestDbKeyPrefix::Test`].
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    /// Value type paired with [`TestKey`].
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    /// Query prefix for looking up [`TestKey`] records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    /// Test key stored under [`TestDbKeyPrefix::MaxTest`].
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    /// Value type paired with [`TestKey2`].
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    /// Query prefix for looking up [`TestKey2`] records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    /// Inserts three records under each test prefix and checks that
    /// `find_by_prefix_sorted_descending` returns them in descending key
    /// order, first through a read-write transaction and then, after commit,
    /// through a read-only database opened on the same path.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();
        // Scope the read-write database so it is dropped before the same
        // path is reopened read-only below.
        {
            let db = Database::new(
                RocksDb::build(&path).open().await.unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;
            dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![3]))
                .await;
            dbtx.insert_entry(&TestKey(vec![254]), &TestVal(vec![1]))
                .await;
            dbtx.insert_entry(&TestKey(vec![255]), &TestVal(vec![2]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![0]), &TestVal2(vec![3]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![254]), &TestVal2(vec![1]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![255]), &TestVal2(vec![2]))
                .await;
            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(
                query,
                vec![
                    (TestKey(vec![255]), TestVal(vec![2])),
                    (TestKey(vec![254]), TestVal(vec![1])),
                    (TestKey(vec![0]), TestVal(vec![3]))
                ]
            );
            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(
                query,
                vec![
                    (TestKey2(vec![255]), TestVal2(vec![2])),
                    (TestKey2(vec![254]), TestVal2(vec![1])),
                    (TestKey2(vec![0]), TestVal2(vec![3]))
                ]
            );
            dbtx.commit_tx().await;
        }
        let db_readonly = RocksDbReadOnly::open_read_only(path).await.unwrap();
        let db_readonly = Database::new(db_readonly, ModuleRegistry::default());
        let mut dbtx = db_readonly.begin_transaction_nc().await;
        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            query,
            vec![
                (TestKey(vec![255]), TestVal(vec![2])),
                (TestKey(vec![254]), TestVal(vec![1])),
                (TestKey(vec![0]), TestVal(vec![3]))
            ]
        );
        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            query,
            vec![
                (TestKey2(vec![255]), TestVal2(vec![2])),
                (TestKey2(vec![254]), TestVal2(vec![1])),
                (TestKey2(vec![0]), TestVal2(vec![3]))
            ]
        );
    }
}