1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4#![allow(clippy::needless_lifetimes)]
5
6pub mod envs;
7
8use std::fmt;
9use std::ops::Range;
10use std::path::Path;
11use std::str::FromStr;
12
13use anyhow::{Context as _, bail};
14use async_trait::async_trait;
15use fedimint_core::db::{
16 DatabaseError, DatabaseResult, IDatabaseTransactionOps, IDatabaseTransactionOpsCore,
17 IRawDatabase, IRawDatabaseTransaction, PrefixStream,
18};
19use fedimint_core::task::block_in_place;
20use fedimint_db_locked::{Locked, LockedBuilder};
21use futures::stream;
22pub use rocksdb;
23use rocksdb::{
24 DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
25};
26use tracing::debug;
27
28use crate::envs::FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV;
29
30fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> + use<I>
33where
34 I: Iterator + Send + 'i,
35 I::Item: Send,
36{
37 stream::unfold(iter, |mut iter| async {
38 fedimint_core::runtime::block_in_place(|| {
39 let item = iter.next();
40 item.map(|item| (item, iter))
41 })
42 })
43}
44
/// Read-write database handle: a newtype around a
/// `rocksdb::OptimisticTransactionDB`.
///
/// Implements `IRawDatabase` in this file; the wrapped handle is reachable
/// through `RocksDb::inner` and the `From` conversions.
#[derive(Debug)]
pub struct RocksDb(rocksdb::OptimisticTransactionDB);
47
/// A transaction on a [`RocksDb`], wrapping a `rocksdb::Transaction` that
/// borrows the underlying `OptimisticTransactionDB` for `'a`.
pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
49
50#[bon::bon]
51impl RocksDb {
52 #[builder(start_fn = build)]
54 #[builder(finish_fn = open_blocking)]
55 pub fn open_blocking(
56 #[builder(start_fn)] db_path: impl AsRef<Path>,
57 relaxed_consistency: Option<bool>,
60 ) -> anyhow::Result<Locked<RocksDb>> {
61 let db_path = db_path.as_ref();
62
63 block_in_place(|| {
64 std::fs::create_dir_all(
65 db_path
66 .parent()
67 .ok_or_else(|| anyhow::anyhow!("db path must have a base dir"))?,
68 )?;
69 LockedBuilder::new(db_path)?.with_db(|| {
70 Self::open_blocking_unlocked(db_path, relaxed_consistency.unwrap_or_default())
71 })
72 })
73 }
74}
75
76impl<I1, S> RocksDbOpenBlockingBuilder<I1, S>
77where
78 S: rocks_db_open_blocking_builder::State,
79 I1: std::convert::AsRef<std::path::Path>,
80{
81 #[allow(clippy::unused_async)]
83 pub async fn open(self) -> anyhow::Result<Locked<RocksDb>> {
84 block_in_place(|| self.open_blocking())
85 }
86}
87
88impl RocksDb {
89 fn open_blocking_unlocked(
90 db_path: &Path,
91 relaxed_consistency: bool,
92 ) -> anyhow::Result<RocksDb> {
93 let mut opts = get_default_options()?;
94 if relaxed_consistency {
95 opts.set_wal_recovery_mode(DBRecoveryMode::TolerateCorruptedTailRecords);
97 } else {
98 opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency);
101 }
102 let db: rocksdb::OptimisticTransactionDB =
103 rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, db_path)?;
104 Ok(RocksDb(db))
105 }
106
107 pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
108 &self.0
109 }
110}
111
/// Returns `true` when `num` is a power of two (`1`, `2`, `4`, ...).
///
/// Zero is not considered a power of two.
fn is_power_of_two(num: usize) -> bool {
    usize::is_power_of_two(num)
}
116
117impl fmt::Debug for RocksDbReadOnlyTransaction<'_> {
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 f.write_str("RocksDbTransaction")
120 }
121}
122
123impl fmt::Debug for RocksDbTransaction<'_> {
124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125 f.write_str("RocksDbTransaction")
126 }
127}
128
/// Spot-checks `is_power_of_two` on small values and around `2 << 10`.
#[test]
fn is_power_of_two_sanity() {
    for power in [1, 2, 4, 2 << 10] {
        assert!(is_power_of_two(power));
    }
    for non_power in [0, 3, 5, (2 << 10) + 1] {
        assert!(!is_power_of_two(non_power));
    }
}
140
141fn get_default_options() -> anyhow::Result<rocksdb::Options> {
142 let mut opts = rocksdb::Options::default();
143 if let Ok(var) = std::env::var(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV) {
144 debug!(var, "Using custom write buffer size");
145 let size: usize = FromStr::from_str(&var)
146 .with_context(|| format!("Could not parse {FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV}"))?;
147 if !is_power_of_two(size) {
148 bail!("{} is not a power of 2", FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV);
149 }
150 opts.set_write_buffer_size(size);
151 }
152 opts.create_if_missing(true);
153 Ok(opts)
154}
155
/// Read-only database handle: a newtype around a plain `rocksdb::DB`,
/// opened through `RocksDbReadOnly::open_read_only` or
/// `RocksDbReadOnly::open_read_only_blocking`.
#[derive(Debug)]
pub struct RocksDbReadOnly(rocksdb::DB);
158
/// "Transaction" over a [`RocksDbReadOnly`]: just a shared borrow of the
/// underlying `rocksdb::DB`. The mutating trait methods implemented for it in
/// this file panic.
pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
160
161impl RocksDbReadOnly {
162 #[allow(clippy::unused_async)]
163 pub async fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
164 let db_path = db_path.as_ref();
165 block_in_place(|| Self::open_read_only_blocking(db_path))
166 }
167
168 pub fn open_read_only_blocking(db_path: &Path) -> anyhow::Result<RocksDbReadOnly> {
169 let opts = get_default_options()?;
170 let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
172 Ok(RocksDbReadOnly(db))
173 }
174}
175
176impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
177 fn from(db: OptimisticTransactionDB) -> Self {
178 RocksDb(db)
179 }
180}
181
182impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
183 fn from(db: RocksDb) -> Self {
184 db.0
185 }
186}
187
/// Computes the smallest byte string of the same length that sorts after
/// every key starting with `prefix`.
///
/// The prefix is treated as a big-endian number and incremented by one:
/// trailing `0xFF` bytes wrap to `0x00` and the carry moves left. Returns
/// `None` when no such successor exists, i.e. when `prefix` is empty or
/// consists solely of `0xFF` bytes.
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    // The right-most byte that can absorb the carry without overflowing.
    let pivot = prefix.iter().rposition(|&byte| byte != u8::MAX)?;

    let mut successor = Vec::with_capacity(prefix.len());
    successor.extend_from_slice(&prefix[..pivot]);
    successor.push(prefix[pivot] + 1);
    // Everything right of the pivot was 0xFF and wraps around to zero.
    successor.resize(prefix.len(), 0);
    Some(successor)
}
211
212#[async_trait]
213impl IRawDatabase for RocksDb {
214 type Transaction<'a> = RocksDbTransaction<'a>;
215 async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
216 let mut optimistic_options = OptimisticTransactionOptions::default();
217 optimistic_options.set_snapshot(true);
218
219 let mut write_options = WriteOptions::default();
220 write_options.set_sync(true);
222
223 let mut rocksdb_tx =
224 RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options));
225 rocksdb_tx
226 .set_tx_savepoint()
227 .await
228 .expect("setting tx savepoint failed");
229
230 rocksdb_tx
231 }
232
233 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
234 let checkpoint =
235 rocksdb::checkpoint::Checkpoint::new(&self.0).map_err(DatabaseError::backend)?;
236 checkpoint
237 .create_checkpoint(backup_path)
238 .map_err(DatabaseError::backend)?;
239 Ok(())
240 }
241}
242
243#[async_trait]
244impl IRawDatabase for RocksDbReadOnly {
245 type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
246 async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
247 RocksDbReadOnlyTransaction(&self.0)
248 }
249
250 fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
251 let checkpoint =
252 rocksdb::checkpoint::Checkpoint::new(&self.0).map_err(DatabaseError::backend)?;
253 checkpoint
254 .create_checkpoint(backup_path)
255 .map_err(DatabaseError::backend)?;
256 Ok(())
257 }
258}
259
260#[async_trait]
261impl IDatabaseTransactionOpsCore for RocksDbTransaction<'_> {
262 async fn raw_insert_bytes(
263 &mut self,
264 key: &[u8],
265 value: &[u8],
266 ) -> DatabaseResult<Option<Vec<u8>>> {
267 fedimint_core::runtime::block_in_place(|| {
268 let val = self.0.snapshot().get(key).unwrap();
269 self.0.put(key, value).map_err(DatabaseError::backend)?;
270 Ok(val)
271 })
272 }
273
274 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
275 fedimint_core::runtime::block_in_place(|| {
276 self.0.snapshot().get(key).map_err(DatabaseError::backend)
277 })
278 }
279
280 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
281 fedimint_core::runtime::block_in_place(|| {
282 let val = self.0.snapshot().get(key).unwrap();
283 self.0.delete(key).map_err(DatabaseError::backend)?;
284 Ok(val)
285 })
286 }
287
288 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
289 Ok(fedimint_core::runtime::block_in_place(|| {
290 let prefix = key_prefix.to_vec();
291 let mut options = rocksdb::ReadOptions::default();
292 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
293 let iter = self.0.snapshot().iterator_opt(
294 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
295 options,
296 );
297 let rocksdb_iter = iter.map_while(move |res| {
298 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
299 key_bytes
300 .starts_with(&prefix)
301 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
302 });
303 Box::pin(convert_to_async_stream(rocksdb_iter))
304 }))
305 }
306
307 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
308 Ok(fedimint_core::runtime::block_in_place(|| {
309 let range = Range {
310 start: range.start.to_vec(),
311 end: range.end.to_vec(),
312 };
313 let mut options = rocksdb::ReadOptions::default();
314 options.set_iterate_range(range.clone());
315 let iter = self.0.snapshot().iterator_opt(
316 rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
317 options,
318 );
319 let rocksdb_iter = iter.map_while(move |res| {
320 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
321 (key_bytes.as_ref() < range.end.as_slice())
322 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
323 });
324 Box::pin(convert_to_async_stream(rocksdb_iter))
325 }))
326 }
327
328 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
329 fedimint_core::runtime::block_in_place(|| {
330 let mut options = rocksdb::ReadOptions::default();
332 options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
333 let iter = self
334 .0
335 .snapshot()
336 .iterator_opt(
337 rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
338 options,
339 )
340 .map_while(|res| {
341 res.map(|(key_bytes, _)| {
342 key_bytes
343 .starts_with(key_prefix)
344 .then_some(key_bytes.to_vec())
345 })
346 .transpose()
347 });
348
349 for item in iter {
350 let key = item.map_err(DatabaseError::backend)?;
351 self.0.delete(key).map_err(DatabaseError::backend)?;
352 }
353
354 Ok(())
355 })
356 }
357
358 async fn raw_find_by_prefix_sorted_descending(
359 &mut self,
360 key_prefix: &[u8],
361 ) -> DatabaseResult<PrefixStream<'_>> {
362 let prefix = key_prefix.to_vec();
363 let next_prefix = next_prefix(&prefix);
364 let iterator_mode = if let Some(next_prefix) = &next_prefix {
365 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
366 } else {
367 rocksdb::IteratorMode::End
368 };
369 Ok(fedimint_core::runtime::block_in_place(|| {
370 let mut options = rocksdb::ReadOptions::default();
371 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
372 let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
373 let rocksdb_iter = iter.map_while(move |res| {
374 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
375 key_bytes
376 .starts_with(&prefix)
377 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
378 });
379 Box::pin(convert_to_async_stream(rocksdb_iter))
380 }))
381 }
382}
383
384#[async_trait]
385impl IDatabaseTransactionOps for RocksDbTransaction<'_> {
386 async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()> {
387 fedimint_core::runtime::block_in_place(|| {
388 self.0
389 .rollback_to_savepoint()
390 .map_err(DatabaseError::backend)
391 })
392 }
393
394 async fn set_tx_savepoint(&mut self) -> DatabaseResult<()> {
395 fedimint_core::runtime::block_in_place(|| self.0.set_savepoint());
396
397 Ok(())
398 }
399}
400
401#[async_trait]
402impl IRawDatabaseTransaction for RocksDbTransaction<'_> {
403 async fn commit_tx(self) -> DatabaseResult<()> {
404 fedimint_core::runtime::block_in_place(|| {
405 match self.0.commit() {
406 Ok(()) => Ok(()),
407 Err(err) => {
408 match err.kind() {
413 rocksdb::ErrorKind::Busy
414 | rocksdb::ErrorKind::TryAgain
415 | rocksdb::ErrorKind::MergeInProgress
416 | rocksdb::ErrorKind::TimedOut => Err(DatabaseError::WriteConflict),
417 _ => Err(DatabaseError::backend(err)),
418 }
419 }
420 }
421 })
422 }
423}
424
425#[async_trait]
426impl IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'_> {
427 async fn raw_insert_bytes(
428 &mut self,
429 _key: &[u8],
430 _value: &[u8],
431 ) -> DatabaseResult<Option<Vec<u8>>> {
432 panic!("Cannot insert into a read only transaction");
433 }
434
435 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
436 fedimint_core::runtime::block_in_place(|| {
437 self.0.snapshot().get(key).map_err(DatabaseError::backend)
438 })
439 }
440
441 async fn raw_remove_entry(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
442 panic!("Cannot remove from a read only transaction");
443 }
444
445 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
446 Ok(fedimint_core::runtime::block_in_place(|| {
447 let range = Range {
448 start: range.start.to_vec(),
449 end: range.end.to_vec(),
450 };
451 let mut options = rocksdb::ReadOptions::default();
452 options.set_iterate_range(range.clone());
453 let iter = self.0.snapshot().iterator_opt(
454 rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
455 options,
456 );
457 let rocksdb_iter = iter.map_while(move |res| {
458 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
459 (key_bytes.as_ref() < range.end.as_slice())
460 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
461 });
462 Box::pin(convert_to_async_stream(rocksdb_iter))
463 }))
464 }
465
466 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
467 Ok(fedimint_core::runtime::block_in_place(|| {
468 let prefix = key_prefix.to_vec();
469 let mut options = rocksdb::ReadOptions::default();
470 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
471 let iter = self.0.snapshot().iterator_opt(
472 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
473 options,
474 );
475 let rocksdb_iter = iter.map_while(move |res| {
476 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
477 key_bytes
478 .starts_with(&prefix)
479 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
480 });
481 Box::pin(convert_to_async_stream(rocksdb_iter))
482 }))
483 }
484
485 async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> DatabaseResult<()> {
486 panic!("Cannot remove from a read only transaction");
487 }
488
489 async fn raw_find_by_prefix_sorted_descending(
490 &mut self,
491 key_prefix: &[u8],
492 ) -> DatabaseResult<PrefixStream<'_>> {
493 let prefix = key_prefix.to_vec();
494 let next_prefix = next_prefix(&prefix);
495 let iterator_mode = if let Some(next_prefix) = &next_prefix {
496 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
497 } else {
498 rocksdb::IteratorMode::End
499 };
500 Ok(fedimint_core::runtime::block_in_place(|| {
501 let mut options = rocksdb::ReadOptions::default();
502 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
503 let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
504 let rocksdb_iter = iter.map_while(move |res| {
505 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
506 key_bytes
507 .starts_with(&prefix)
508 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
509 });
510 Box::pin(stream::iter(rocksdb_iter))
511 }))
512 }
513}
514
/// Savepoints are not available on a read-only transaction: both methods
/// panic unconditionally.
#[async_trait]
impl IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'_> {
    /// Always panics: there is nothing to roll back.
    async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()> {
        panic!("Cannot rollback a read only transaction");
    }

    /// Always panics: savepoints cannot be set here.
    async fn set_tx_savepoint(&mut self) -> DatabaseResult<()> {
        panic!("Cannot set a savepoint in a read only transaction");
    }
}
525
/// Committing is not available on a read-only transaction.
#[async_trait]
impl IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'_> {
    /// Always panics: a read-only transaction has nothing to commit.
    async fn commit_tx(self) -> DatabaseResult<()> {
        panic!("Cannot commit a read only transaction");
    }
}
532
// Tests for the RocksDB backend. Most of them run the shared `verify_*`
// checks from `fedimint_core::db` against a RocksDb-backed `Database`.
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    /// Opens a RocksDb-backed `Database` inside a fresh temporary directory
    /// whose name starts with `temp_path`, using a default decoder registry.
    fn open_temp_db(temp_path: &str) -> Database {
        // NOTE(review): the `TempDir` guard is dropped when this function
        // returns, while the database stays open - presumably the directory
        // is removed at that point; confirm this is intended.
        let path = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();

        Database::new(
            RocksDb::build(path.as_ref()).open_blocking().unwrap(),
            ModuleDecoderRegistry::default(),
        )
    }

    /// Runs the shared `verify_insert_elements` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        fedimint_core::db::verify_insert_elements(open_temp_db("fcb-rocksdb-test-insert-elements"))
            .await;
    }

    /// Runs the shared `verify_remove_nonexisting` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        fedimint_core::db::verify_remove_nonexisting(open_temp_db(
            "fcb-rocksdb-test-remove-nonexisting",
        ))
        .await;
    }

    /// Runs the shared `verify_remove_existing` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        fedimint_core::db::verify_remove_existing(open_temp_db("fcb-rocksdb-test-remove-existing"))
            .await;
    }

    /// Runs the shared `verify_read_own_writes` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        fedimint_core::db::verify_read_own_writes(open_temp_db("fcb-rocksdb-test-read-own-writes"))
            .await;
    }

    /// Runs the shared `verify_prevent_dirty_reads` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        fedimint_core::db::verify_prevent_dirty_reads(open_temp_db(
            "fcb-rocksdb-test-prevent-dirty-reads",
        ))
        .await;
    }

    /// Runs the shared `verify_find_by_range` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        fedimint_core::db::verify_find_by_range(open_temp_db("fcb-rocksdb-test-find-by-range"))
            .await;
    }

    /// Runs the shared `verify_find_by_prefix` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        fedimint_core::db::verify_find_by_prefix(open_temp_db("fcb-rocksdb-test-find-by-prefix"))
            .await;
    }

    /// Runs the shared `verify_commit` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        fedimint_core::db::verify_commit(open_temp_db("fcb-rocksdb-test-commit")).await;
    }

    /// Runs the shared `verify_prevent_nonrepeatable_reads` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        fedimint_core::db::verify_prevent_nonrepeatable_reads(open_temp_db(
            "fcb-rocksdb-test-prevent-nonrepeatable-reads",
        ))
        .await;
    }

    /// Runs the shared `verify_snapshot_isolation` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        fedimint_core::db::verify_snapshot_isolation(open_temp_db(
            "fcb-rocksdb-test-snapshot-isolation",
        ))
        .await;
    }

    /// Runs the shared `verify_rollback_to_savepoint` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_rollback_to_savepoint() {
        fedimint_core::db::verify_rollback_to_savepoint(open_temp_db(
            "fcb-rocksdb-test-rollback-to-savepoint",
        ))
        .await;
    }

    /// Runs the shared `verify_phantom_entry` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        fedimint_core::db::verify_phantom_entry(open_temp_db("fcb-rocksdb-test-phantom-entry"))
            .await;
    }

    /// Runs the shared `expect_write_conflict` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        fedimint_core::db::expect_write_conflict(open_temp_db("fcb-rocksdb-test-write-conflict"))
            .await;
    }

    /// Spawns 10 tasks that each perform 10 `autocommit` writes to the same
    /// key and asserts that every one of them eventually succeeds.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_concurrent_transaction_conflict_with_autocommit() {
        use std::sync::Arc;

        let db = Arc::new(open_temp_db("fcb-rocksdb-test-concurrent-conflict"));

        let mut handles = Vec::new();

        for i in 0u64..10 {
            let db_clone = Arc::clone(&db);
            let handle =
                fedimint_core::runtime::spawn("rocksdb-transient-error-test", async move {
                    for j in 0u64..10 {
                        let result = db_clone
                            .autocommit::<_, _, anyhow::Error>(
                                |dbtx, _| {
                                    // Only the low byte matters; the value is
                                    // just payload for the contended key.
                                    #[allow(clippy::cast_possible_truncation)]
                                    let val = (i * 100 + j) as u8;
                                    Box::pin(async move {
                                        // Every task writes the same key so
                                        // the transactions contend.
                                        dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![val]))
                                            .await;
                                        Ok(())
                                    })
                                },
                                // NOTE(review): presumably "no limit on retry
                                // attempts" - confirm against `autocommit`.
                                None,
                            )
                            .await;

                        assert!(
                            result.is_ok(),
                            "Transaction should succeed after retries, got: {result:?}",
                        );
                    }
                });
            handles.push(handle);
        }

        // Surface any panic raised inside the spawned tasks.
        for handle in handles {
            handle.await.expect("Task should not panic");
        }
    }

    /// Runs the shared `verify_remove_by_prefix` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        fedimint_core::db::verify_remove_by_prefix(open_temp_db(
            "fcb-rocksdb-test-remove-by-prefix",
        ))
        .await;
    }

    /// Runs the shared `verify_module_prefix` check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        fedimint_core::db::verify_module_prefix(open_temp_db("fcb-rocksdb-test-module-prefix"))
            .await;
    }

    /// Runs the shared `verify_module_db` check with a plain database and a
    /// second database scoped to module instance id `1`.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-module-db-prefix")
            .tempdir()
            .unwrap();

        let module_db = Database::new(
            RocksDb::build(path.as_ref()).open_blocking().unwrap(),
            ModuleDecoderRegistry::default(),
        );

        fedimint_core::db::verify_module_db(
            open_temp_db("fcb-rocksdb-test-module-db"),
            module_db.with_prefix_module_id(module_instance_id).0,
        )
        .await;
    }

    /// Checks `next_prefix` on plain increments, carries across `255` bytes,
    /// and all-`255` inputs that have no successor.
    #[test]
    fn test_next_prefix() {
        assert_eq!(next_prefix(&[1, 2, 3]).unwrap(), vec![1, 2, 4]);
        assert_eq!(next_prefix(&[1, 2, 254]).unwrap(), vec![1, 2, 255]);
        assert_eq!(next_prefix(&[1, 2, 255]).unwrap(), vec![1, 3, 0]);
        assert_eq!(next_prefix(&[1, 255, 255]).unwrap(), vec![2, 0, 0]);
        assert!(next_prefix(&[255, 255, 255]).is_none());
        assert_eq!(next_prefix(&[0]).unwrap(), vec![1]);
        assert_eq!(next_prefix(&[254]).unwrap(), vec![255]);
        assert!(next_prefix(&[255]).is_none());
    }

    /// Key prefixes used by the test records below.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        // Highest possible prefix byte; `next_prefix(&[255])` is `None`.
        MaxTest = 255,
    }

    /// Test key stored under `TestDbKeyPrefix::Test`.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    /// Value type paired with [`TestKey`].
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    /// Query prefix used to look up [`TestKey`] records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    /// Test key stored under `TestDbKeyPrefix::MaxTest`.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    /// Value type paired with [`TestKey2`].
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    /// Query prefix used to look up [`TestKey2`] records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    /// Inserts records under both test prefixes and checks that
    /// `find_by_prefix_sorted_descending` returns them in descending key
    /// order, first through a read-write transaction and then, after
    /// committing, through a read-only database opened on the same path.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();
        {
            // Scoped so the read-write database is dropped before the
            // read-only one is opened on the same path.
            let db = Database::new(
                RocksDb::build(&path).open().await.unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;
            dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![3]))
                .await;
            dbtx.insert_entry(&TestKey(vec![254]), &TestVal(vec![1]))
                .await;
            dbtx.insert_entry(&TestKey(vec![255]), &TestVal(vec![2]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![0]), &TestVal2(vec![3]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![254]), &TestVal2(vec![1]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![255]), &TestVal2(vec![2]))
                .await;
            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(
                query,
                vec![
                    (TestKey(vec![255]), TestVal(vec![2])),
                    (TestKey(vec![254]), TestVal(vec![1])),
                    (TestKey(vec![0]), TestVal(vec![3]))
                ]
            );
            // The `MaxTest` records use prefix value 255, the highest byte.
            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(
                query,
                vec![
                    (TestKey2(vec![255]), TestVal2(vec![2])),
                    (TestKey2(vec![254]), TestVal2(vec![1])),
                    (TestKey2(vec![0]), TestVal2(vec![3]))
                ]
            );
            dbtx.commit_tx().await;
        }
        // Repeat both queries through the read-only implementation.
        let db_readonly = RocksDbReadOnly::open_read_only(path).await.unwrap();
        let db_readonly = Database::new(db_readonly, ModuleRegistry::default());
        let mut dbtx = db_readonly.begin_transaction_nc().await;
        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            query,
            vec![
                (TestKey(vec![255]), TestVal(vec![2])),
                (TestKey(vec![254]), TestVal(vec![1])),
                (TestKey(vec![0]), TestVal(vec![3]))
            ]
        );
        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            query,
            vec![
                (TestKey2(vec![255]), TestVal2(vec![2])),
                (TestKey2(vec![254]), TestVal2(vec![1])),
                (TestKey2(vec![0]), TestVal2(vec![3]))
            ]
        );
    }
}