1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4#![allow(clippy::needless_lifetimes)]
5
6pub mod envs;
7
8use std::fmt;
9use std::ops::Range;
10use std::path::Path;
11use std::str::FromStr;
12
13use anyhow::{Context as _, bail};
14use async_trait::async_trait;
15use fedimint_core::db::{
16 DatabaseError, DatabaseResult, IDatabaseTransactionOps, IDatabaseTransactionOpsCore,
17 IRawDatabase, IRawDatabaseTransaction, PrefixStream,
18};
19use fedimint_core::task::block_in_place;
20use fedimint_db_locked::{Locked, LockedBuilder};
21use futures::stream;
22pub use rocksdb;
23use rocksdb::{
24 DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
25};
26use tracing::debug;
27
28use crate::envs::{FM_ROCKSDB_BLOCK_CACHE_SIZE_ENV, FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV};
29
30fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> + use<I>
33where
34 I: Iterator + Send + 'i,
35 I::Item: Send,
36{
37 stream::unfold(iter, |mut iter| async {
38 fedimint_core::runtime::block_in_place(|| {
39 let item = iter.next();
40 item.map(|item| (item, iter))
41 })
42 })
43}
44
/// Read-write database handle wrapping a RocksDB `OptimisticTransactionDB`.
#[derive(Debug)]
pub struct RocksDb(rocksdb::OptimisticTransactionDB);

/// Read-write transaction on a [`RocksDb`], wrapping a RocksDB transaction
/// over the underlying `OptimisticTransactionDB`.
pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
49
50#[bon::bon]
51impl RocksDb {
52 #[builder(start_fn = build)]
54 #[builder(finish_fn = open_blocking)]
55 pub fn open_blocking(
56 #[builder(start_fn)] db_path: impl AsRef<Path>,
57 relaxed_consistency: Option<bool>,
60 ) -> anyhow::Result<Locked<RocksDb>> {
61 let db_path = db_path.as_ref();
62
63 block_in_place(|| {
64 std::fs::create_dir_all(
65 db_path
66 .parent()
67 .ok_or_else(|| anyhow::anyhow!("db path must have a base dir"))?,
68 )?;
69 LockedBuilder::new(db_path)?.with_db(|| {
70 Self::open_blocking_unlocked(db_path, relaxed_consistency.unwrap_or_default())
71 })
72 })
73 }
74}
75
76impl<I1, S> RocksDbOpenBlockingBuilder<I1, S>
77where
78 S: rocks_db_open_blocking_builder::State,
79 I1: std::convert::AsRef<std::path::Path>,
80{
81 #[allow(clippy::unused_async)]
83 pub async fn open(self) -> anyhow::Result<Locked<RocksDb>> {
84 block_in_place(|| self.open_blocking())
85 }
86}
87
88impl RocksDb {
89 fn open_blocking_unlocked(
90 db_path: &Path,
91 relaxed_consistency: bool,
92 ) -> anyhow::Result<RocksDb> {
93 let mut opts = get_default_options()?;
94 if relaxed_consistency {
95 opts.set_wal_recovery_mode(DBRecoveryMode::TolerateCorruptedTailRecords);
97 } else {
98 opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency);
101 }
102 let db: rocksdb::OptimisticTransactionDB =
103 rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, db_path)?;
104 Ok(RocksDb(db))
105 }
106
107 pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
108 &self.0
109 }
110}
111
/// Returns `true` if `num` is a power of two. Zero is not a power of two.
fn is_power_of_two(num: usize) -> bool {
    usize::is_power_of_two(num)
}
116
117impl fmt::Debug for RocksDbReadOnlyTransaction<'_> {
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 f.write_str("RocksDbTransaction")
120 }
121}
122
123impl fmt::Debug for RocksDbTransaction<'_> {
124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125 f.write_str("RocksDbTransaction")
126 }
127}
128
/// Table-driven check of `is_power_of_two` on small values and around 2048.
#[test]
fn is_power_of_two_sanity() {
    let cases: [(usize, bool); 8] = [
        (0, false),
        (1, true),
        (2, true),
        (3, false),
        (4, true),
        (5, false),
        (2 << 10, true),
        ((2 << 10) + 1, false),
    ];
    for (num, expected) in cases {
        assert_eq!(is_power_of_two(num), expected, "is_power_of_two({num})");
    }
}
140
/// Default write buffer size in bytes (2 MiB), passed to
/// `Options::set_write_buffer_size` by `get_default_options` when
/// `FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV` is not set.
const DEFAULT_WRITE_BUFFER_SIZE: usize = 2 * 1024 * 1024;

/// Default LRU block cache size in bytes (2 MiB), used by
/// `get_default_options` when `FM_ROCKSDB_BLOCK_CACHE_SIZE_ENV` is not set.
const DEFAULT_BLOCK_CACHE_SIZE: usize = 2 * 1024 * 1024;

/// Value passed to `Options::set_max_open_files` by `get_default_options`.
const DEFAULT_MAX_OPEN_FILES: i32 = 256;
153
154fn parse_env_size(env_name: &str) -> anyhow::Result<Option<usize>> {
155 let Ok(var) = std::env::var(env_name) else {
156 return Ok(None);
157 };
158 let size: usize =
159 FromStr::from_str(&var).with_context(|| format!("Could not parse {env_name}"))?;
160 if !is_power_of_two(size) {
161 bail!("{env_name} is not a power of 2");
162 }
163 Ok(Some(size))
164}
165
166fn get_default_options() -> anyhow::Result<rocksdb::Options> {
167 let mut opts = rocksdb::Options::default();
168
169 let write_buffer_size =
170 parse_env_size(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV)?.unwrap_or(DEFAULT_WRITE_BUFFER_SIZE);
171 opts.set_write_buffer_size(write_buffer_size);
172
173 opts.set_max_write_buffer_number(2);
175
176 let block_cache_size =
177 parse_env_size(FM_ROCKSDB_BLOCK_CACHE_SIZE_ENV)?.unwrap_or(DEFAULT_BLOCK_CACHE_SIZE);
178 let cache = rocksdb::Cache::new_lru_cache(block_cache_size);
179 let mut block_opts = rocksdb::BlockBasedOptions::default();
180 block_opts.set_block_cache(&cache);
181 block_opts.set_cache_index_and_filter_blocks(true);
184 opts.set_block_based_table_factory(&block_opts);
185
186 opts.set_max_open_files(DEFAULT_MAX_OPEN_FILES);
187
188 debug!(
189 write_buffer_size,
190 block_cache_size,
191 max_open_files = DEFAULT_MAX_OPEN_FILES,
192 "RocksDB memory options"
193 );
194
195 opts.create_if_missing(true);
196 Ok(opts)
197}
198
/// Read-only database handle wrapping a plain `rocksdb::DB`
/// (opened with `rocksdb::DB::open_for_read_only`).
#[derive(Debug)]
pub struct RocksDbReadOnly(rocksdb::DB);

/// "Transaction" on a [`RocksDbReadOnly`]: just a borrow of the underlying
/// `rocksdb::DB`. Reads take a snapshot per call; write and commit
/// operations panic.
pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
203
204impl RocksDbReadOnly {
205 #[allow(clippy::unused_async)]
206 pub async fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
207 let db_path = db_path.as_ref();
208 block_in_place(|| Self::open_read_only_blocking(db_path))
209 }
210
211 pub fn open_read_only_blocking(db_path: &Path) -> anyhow::Result<RocksDbReadOnly> {
212 let opts = get_default_options()?;
213 let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
215 Ok(RocksDbReadOnly(db))
216 }
217}
218
219impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
220 fn from(db: OptimisticTransactionDB) -> Self {
221 RocksDb(db)
222 }
223}
224
225impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
226 fn from(db: RocksDb) -> Self {
227 db.0
228 }
229}
230
/// Returns `prefix` treated as a big-endian number and incremented by one,
/// keeping the same length.
///
/// Trailing `0xff` bytes roll over to `0x00` and carry into the previous
/// byte, e.g. `[1, 2, 255]` becomes `[1, 3, 0]`. Returns `None` when every
/// byte is `0xff`, including the empty prefix, because no same-length
/// successor exists.
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    let mut successor = prefix.to_vec();
    for byte in successor.iter_mut().rev() {
        let (incremented, carried) = byte.overflowing_add(1);
        *byte = incremented;
        if !carried {
            return Some(successor);
        }
    }
    None
}
254
255#[async_trait]
256impl IRawDatabase for RocksDb {
257 type Transaction<'a> = RocksDbTransaction<'a>;
258 async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
259 let mut optimistic_options = OptimisticTransactionOptions::default();
260 optimistic_options.set_snapshot(true);
261
262 let mut write_options = WriteOptions::default();
263 write_options.set_sync(true);
265
266 RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options))
267 }
268
269 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
270 let checkpoint =
271 rocksdb::checkpoint::Checkpoint::new(&self.0).map_err(DatabaseError::backend)?;
272 checkpoint
273 .create_checkpoint(backup_path)
274 .map_err(DatabaseError::backend)?;
275 Ok(())
276 }
277}
278
279#[async_trait]
280impl IRawDatabase for RocksDbReadOnly {
281 type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
282 async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
283 RocksDbReadOnlyTransaction(&self.0)
284 }
285
286 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
287 let checkpoint =
288 rocksdb::checkpoint::Checkpoint::new(&self.0).map_err(DatabaseError::backend)?;
289 checkpoint
290 .create_checkpoint(backup_path)
291 .map_err(DatabaseError::backend)?;
292 Ok(())
293 }
294}
295
296#[async_trait]
297impl IDatabaseTransactionOpsCore for RocksDbTransaction<'_> {
298 async fn raw_insert_bytes(
299 &mut self,
300 key: &[u8],
301 value: &[u8],
302 ) -> DatabaseResult<Option<Vec<u8>>> {
303 fedimint_core::runtime::block_in_place(|| {
304 let val = self.0.snapshot().get(key).unwrap();
305 self.0.put(key, value).map_err(DatabaseError::backend)?;
306 Ok(val)
307 })
308 }
309
310 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
311 fedimint_core::runtime::block_in_place(|| {
312 self.0.snapshot().get(key).map_err(DatabaseError::backend)
313 })
314 }
315
316 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
317 fedimint_core::runtime::block_in_place(|| {
318 let val = self.0.snapshot().get(key).unwrap();
319 self.0.delete(key).map_err(DatabaseError::backend)?;
320 Ok(val)
321 })
322 }
323
324 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
325 Ok(fedimint_core::runtime::block_in_place(|| {
326 let prefix = key_prefix.to_vec();
327 let mut options = rocksdb::ReadOptions::default();
328 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
329 let iter = self.0.snapshot().iterator_opt(
330 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
331 options,
332 );
333 let rocksdb_iter = iter.map_while(move |res| {
334 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
335 key_bytes
336 .starts_with(&prefix)
337 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
338 });
339 Box::pin(convert_to_async_stream(rocksdb_iter))
340 }))
341 }
342
343 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
344 Ok(fedimint_core::runtime::block_in_place(|| {
345 let range = Range {
346 start: range.start.to_vec(),
347 end: range.end.to_vec(),
348 };
349 let mut options = rocksdb::ReadOptions::default();
350 options.set_iterate_range(range.clone());
351 let iter = self.0.snapshot().iterator_opt(
352 rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
353 options,
354 );
355 let rocksdb_iter = iter.map_while(move |res| {
356 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
357 (key_bytes.as_ref() < range.end.as_slice())
358 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
359 });
360 Box::pin(convert_to_async_stream(rocksdb_iter))
361 }))
362 }
363
364 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
365 fedimint_core::runtime::block_in_place(|| {
366 let mut options = rocksdb::ReadOptions::default();
368 options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
369 let iter = self
370 .0
371 .snapshot()
372 .iterator_opt(
373 rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
374 options,
375 )
376 .map_while(|res| {
377 res.map(|(key_bytes, _)| {
378 key_bytes
379 .starts_with(key_prefix)
380 .then_some(key_bytes.to_vec())
381 })
382 .transpose()
383 });
384
385 for item in iter {
386 let key = item.map_err(DatabaseError::backend)?;
387 self.0.delete(key).map_err(DatabaseError::backend)?;
388 }
389
390 Ok(())
391 })
392 }
393
394 async fn raw_find_by_prefix_sorted_descending(
395 &mut self,
396 key_prefix: &[u8],
397 ) -> DatabaseResult<PrefixStream<'_>> {
398 let prefix = key_prefix.to_vec();
399 let next_prefix = next_prefix(&prefix);
400 let iterator_mode = if let Some(next_prefix) = &next_prefix {
401 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
402 } else {
403 rocksdb::IteratorMode::End
404 };
405 Ok(fedimint_core::runtime::block_in_place(|| {
406 let mut options = rocksdb::ReadOptions::default();
407 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
408 let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
409 let rocksdb_iter = iter.map_while(move |res| {
410 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
411 key_bytes
412 .starts_with(&prefix)
413 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
414 });
415 Box::pin(convert_to_async_stream(rocksdb_iter))
416 }))
417 }
418}
419
// Empty impl: relies solely on the trait's provided items.
impl IDatabaseTransactionOps for RocksDbTransaction<'_> {}
421
422#[async_trait]
423impl IRawDatabaseTransaction for RocksDbTransaction<'_> {
424 async fn commit_tx(self) -> DatabaseResult<()> {
425 fedimint_core::runtime::block_in_place(|| {
426 match self.0.commit() {
427 Ok(()) => Ok(()),
428 Err(err) => {
429 match err.kind() {
434 rocksdb::ErrorKind::Busy
435 | rocksdb::ErrorKind::TryAgain
436 | rocksdb::ErrorKind::MergeInProgress
437 | rocksdb::ErrorKind::TimedOut => Err(DatabaseError::WriteConflict),
438 _ => Err(DatabaseError::backend(err)),
439 }
440 }
441 }
442 })
443 }
444}
445
446#[async_trait]
447impl IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'_> {
448 async fn raw_insert_bytes(
449 &mut self,
450 _key: &[u8],
451 _value: &[u8],
452 ) -> DatabaseResult<Option<Vec<u8>>> {
453 panic!("Cannot insert into a read only transaction");
454 }
455
456 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
457 fedimint_core::runtime::block_in_place(|| {
458 self.0.snapshot().get(key).map_err(DatabaseError::backend)
459 })
460 }
461
462 async fn raw_remove_entry(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
463 panic!("Cannot remove from a read only transaction");
464 }
465
466 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
467 Ok(fedimint_core::runtime::block_in_place(|| {
468 let range = Range {
469 start: range.start.to_vec(),
470 end: range.end.to_vec(),
471 };
472 let mut options = rocksdb::ReadOptions::default();
473 options.set_iterate_range(range.clone());
474 let iter = self.0.snapshot().iterator_opt(
475 rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
476 options,
477 );
478 let rocksdb_iter = iter.map_while(move |res| {
479 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
480 (key_bytes.as_ref() < range.end.as_slice())
481 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
482 });
483 Box::pin(convert_to_async_stream(rocksdb_iter))
484 }))
485 }
486
487 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
488 Ok(fedimint_core::runtime::block_in_place(|| {
489 let prefix = key_prefix.to_vec();
490 let mut options = rocksdb::ReadOptions::default();
491 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
492 let iter = self.0.snapshot().iterator_opt(
493 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
494 options,
495 );
496 let rocksdb_iter = iter.map_while(move |res| {
497 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
498 key_bytes
499 .starts_with(&prefix)
500 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
501 });
502 Box::pin(convert_to_async_stream(rocksdb_iter))
503 }))
504 }
505
506 async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> DatabaseResult<()> {
507 panic!("Cannot remove from a read only transaction");
508 }
509
510 async fn raw_find_by_prefix_sorted_descending(
511 &mut self,
512 key_prefix: &[u8],
513 ) -> DatabaseResult<PrefixStream<'_>> {
514 let prefix = key_prefix.to_vec();
515 let next_prefix = next_prefix(&prefix);
516 let iterator_mode = if let Some(next_prefix) = &next_prefix {
517 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
518 } else {
519 rocksdb::IteratorMode::End
520 };
521 Ok(fedimint_core::runtime::block_in_place(|| {
522 let mut options = rocksdb::ReadOptions::default();
523 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
524 let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
525 let rocksdb_iter = iter.map_while(move |res| {
526 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
527 key_bytes
528 .starts_with(&prefix)
529 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
530 });
531 Box::pin(stream::iter(rocksdb_iter))
532 }))
533 }
534}
535
// Empty impl: relies solely on the trait's provided items.
impl IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'_> {}
537
#[async_trait]
impl IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'_> {
    /// Read-only transactions cannot be committed.
    ///
    /// # Panics
    ///
    /// Always, with "Cannot commit a read only transaction".
    async fn commit_tx(self) -> DatabaseResult<()> {
        panic!("Cannot commit a read only transaction");
    }
}
544
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    /// Opens a fresh [`RocksDb`] in a new temporary directory whose name
    /// starts with `temp_path`, wrapped in a [`Database`].
    ///
    /// The returned [`tempfile::TempDir`] owns the directory and deletes it
    /// when dropped. Callers must keep it alive, bound to a named variable
    /// rather than `_`, for as long as the database is in use. Otherwise the
    /// database files are removed out from under the open RocksDB instance.
    fn open_temp_db(temp_path: &str) -> (Database, tempfile::TempDir) {
        let path = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();

        let db = Database::new(
            RocksDb::build(path.as_ref()).open_blocking().unwrap(),
            ModuleDecoderRegistry::default(),
        );
        (db, path)
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-insert-elements");
        fedimint_core::db::verify_insert_elements(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-remove-nonexisting");
        fedimint_core::db::verify_remove_nonexisting(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-remove-existing");
        fedimint_core::db::verify_remove_existing(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-read-own-writes");
        fedimint_core::db::verify_read_own_writes(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-prevent-dirty-reads");
        fedimint_core::db::verify_prevent_dirty_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-find-by-range");
        fedimint_core::db::verify_find_by_range(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-find-by-prefix");
        fedimint_core::db::verify_find_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-commit");
        fedimint_core::db::verify_commit(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-prevent-nonrepeatable-reads");
        fedimint_core::db::verify_prevent_nonrepeatable_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-snapshot-isolation");
        fedimint_core::db::verify_snapshot_isolation(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-phantom-entry");
        fedimint_core::db::verify_phantom_entry(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-write-conflict");
        fedimint_core::db::expect_write_conflict(db).await;
    }

    /// Ten tasks each perform ten `autocommit` writes to the same key. Every
    /// one must end up `Ok`, as the assertion message expects conflicts to be
    /// absorbed by `autocommit`'s retries.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_concurrent_transaction_conflict_with_autocommit() {
        use std::sync::Arc;

        let (db, _dir) = open_temp_db("fcb-rocksdb-test-concurrent-conflict");
        let db = Arc::new(db);

        let mut handles = Vec::new();

        for i in 0u64..10 {
            let db_clone = Arc::clone(&db);
            let handle =
                fedimint_core::runtime::spawn("rocksdb-transient-error-test", async move {
                    for j in 0u64..10 {
                        let result = db_clone
                            .autocommit::<_, _, anyhow::Error>(
                                |dbtx, _| {
                                    // Truncation is fine: any byte value will do.
                                    #[allow(clippy::cast_possible_truncation)]
                                    let val = (i * 100 + j) as u8;
                                    Box::pin(async move {
                                        dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![val]))
                                            .await;
                                        Ok(())
                                    })
                                },
                                None,
                            )
                            .await;

                        assert!(
                            result.is_ok(),
                            "Transaction should succeed after retries, got: {result:?}",
                        );
                    }
                });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.expect("Task should not panic");
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-remove-by-prefix");
        fedimint_core::db::verify_remove_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-module-prefix");
        fedimint_core::db::verify_module_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-module-db-prefix")
            .tempdir()
            .unwrap();

        let module_db = Database::new(
            RocksDb::build(path.as_ref()).open_blocking().unwrap(),
            ModuleDecoderRegistry::default(),
        );

        let (db, _db_dir) = open_temp_db("fcb-rocksdb-test-module-db");
        fedimint_core::db::verify_module_db(
            db,
            module_db.with_prefix_module_id(module_instance_id).0,
        )
        .await;
    }

    #[test]
    fn test_next_prefix() {
        assert_eq!(next_prefix(&[1, 2, 3]).unwrap(), vec![1, 2, 4]);
        assert_eq!(next_prefix(&[1, 2, 254]).unwrap(), vec![1, 2, 255]);
        assert_eq!(next_prefix(&[1, 2, 255]).unwrap(), vec![1, 3, 0]);
        assert_eq!(next_prefix(&[1, 255, 255]).unwrap(), vec![2, 0, 0]);
        assert!(next_prefix(&[255, 255, 255]).is_none());
        assert_eq!(next_prefix(&[0]).unwrap(), vec![1]);
        assert_eq!(next_prefix(&[254]).unwrap(), vec![255]);
        assert!(next_prefix(&[255]).is_none());
    }

    /// Key prefixes for the descending-order test. `MaxTest = 255` exercises
    /// the case where `next_prefix` has no successor.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        MaxTest = 255,
    }

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    /// Descending prefix queries return keys in reverse order on both the
    /// read-write database and, after reopening, the read-only database.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();
        {
            let db = Database::new(
                RocksDb::build(&path).open().await.unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;
            dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![3]))
                .await;
            dbtx.insert_entry(&TestKey(vec![254]), &TestVal(vec![1]))
                .await;
            dbtx.insert_entry(&TestKey(vec![255]), &TestVal(vec![2]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![0]), &TestVal2(vec![3]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![254]), &TestVal2(vec![1]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![255]), &TestVal2(vec![2]))
                .await;
            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(
                query,
                vec![
                    (TestKey(vec![255]), TestVal(vec![2])),
                    (TestKey(vec![254]), TestVal(vec![1])),
                    (TestKey(vec![0]), TestVal(vec![3]))
                ]
            );
            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(
                query,
                vec![
                    (TestKey2(vec![255]), TestVal2(vec![2])),
                    (TestKey2(vec![254]), TestVal2(vec![1])),
                    (TestKey2(vec![0]), TestVal2(vec![3]))
                ]
            );
            dbtx.commit_tx().await;
        }
        let db_readonly = RocksDbReadOnly::open_read_only(path).await.unwrap();
        let db_readonly = Database::new(db_readonly, ModuleRegistry::default());
        let mut dbtx = db_readonly.begin_transaction_nc().await;
        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            query,
            vec![
                (TestKey(vec![255]), TestVal(vec![2])),
                (TestKey(vec![254]), TestVal(vec![1])),
                (TestKey(vec![0]), TestVal(vec![3]))
            ]
        );
        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            query,
            vec![
                (TestKey2(vec![255]), TestVal2(vec![2])),
                (TestKey2(vec![254]), TestVal2(vec![1])),
                (TestKey2(vec![0]), TestVal2(vec![3]))
            ]
        );
    }
}