// fedimint_rocksdb/lib.rs
1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4#![allow(clippy::needless_lifetimes)]
5
6pub mod envs;
7
8use std::fmt;
9use std::ops::Range;
10use std::path::Path;
11use std::str::FromStr;
12
13use anyhow::{Context as _, bail};
14use async_trait::async_trait;
15use fedimint_core::db::{
16    DatabaseError, DatabaseResult, IDatabaseTransactionOps, IDatabaseTransactionOpsCore,
17    IRawDatabase, IRawDatabaseTransaction, PrefixStream,
18};
19use fedimint_core::task::block_in_place;
20use fedimint_db_locked::{Locked, LockedBuilder};
21use futures::stream;
22pub use rocksdb;
23use rocksdb::{
24    DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
25};
26use tracing::debug;
27
28use crate::envs::{FM_ROCKSDB_BLOCK_CACHE_SIZE_ENV, FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV};
29
30// turn an `iter` into a `Stream` where every `next` is ran inside
31// `block_in_place` to offload the blocking calls
32fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> + use<I>
33where
34    I: Iterator + Send + 'i,
35    I::Item: Send,
36{
37    stream::unfold(iter, |mut iter| async {
38        fedimint_core::runtime::block_in_place(|| {
39            let item = iter.next();
40            item.map(|item| (item, iter))
41        })
42    })
43}
44
/// A database backed by RocksDB's [`rocksdb::OptimisticTransactionDB`].
#[derive(Debug)]
pub struct RocksDb(rocksdb::OptimisticTransactionDB);

/// A pending optimistic transaction on a [`RocksDb`] database.
pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
49
#[bon::bon]
impl RocksDb {
    /// Open the database using blocking IO
    ///
    /// Entry point is `RocksDb::build(path)`, finished with
    /// `.open_blocking()` (or the async `.open()` defined on the generated
    /// builder below).
    #[builder(start_fn = build)]
    #[builder(finish_fn = open_blocking)]
    pub fn open_blocking(
        #[builder(start_fn)] db_path: impl AsRef<Path>,
        /// Relaxed consistency allows opening the database
        /// even if the wal got corrupted.
        relaxed_consistency: Option<bool>,
    ) -> anyhow::Result<Locked<RocksDb>> {
        let db_path = db_path.as_ref();

        block_in_place(|| {
            // Make sure the parent directory exists before locking/opening.
            std::fs::create_dir_all(
                db_path
                    .parent()
                    .ok_or_else(|| anyhow::anyhow!("db path must have a base dir"))?,
            )?;
            // Acquire the exclusive lock first, then open the db under it.
            LockedBuilder::new(db_path)?.with_db(|| {
                Self::open_blocking_unlocked(db_path, relaxed_consistency.unwrap_or_default())
            })
        })
    }
}
75
// Extension on the `bon`-generated builder so the database can also be
// opened from async contexts.
impl<I1, S> RocksDbOpenBlockingBuilder<I1, S>
where
    S: rocks_db_open_blocking_builder::State,
    I1: std::convert::AsRef<std::path::Path>,
{
    /// Open the database
    ///
    /// Async wrapper around the blocking `open_blocking` finisher; the
    /// blocking work is offloaded via `block_in_place`.
    #[allow(clippy::unused_async)]
    pub async fn open(self) -> anyhow::Result<Locked<RocksDb>> {
        block_in_place(|| self.open_blocking())
    }
}
87
impl RocksDb {
    /// Open the database at `db_path` without taking the advisory lock.
    ///
    /// `relaxed_consistency` controls whether a corrupted WAL tail is
    /// tolerated on open.
    fn open_blocking_unlocked(
        db_path: &Path,
        relaxed_consistency: bool,
    ) -> anyhow::Result<RocksDb> {
        let mut opts = get_default_options()?;
        if relaxed_consistency {
            // https://github.com/fedimint/fedimint/issues/8072
            opts.set_wal_recovery_mode(DBRecoveryMode::TolerateCorruptedTailRecords);
        } else {
            // Since we turned synchronous writes on we should never encounter a
            // corrupted WAL and should rather fail in this case
            opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency);
        }
        let db: rocksdb::OptimisticTransactionDB =
            rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, db_path)?;
        Ok(RocksDb(db))
    }

    /// Borrow the underlying RocksDB handle.
    pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
        &self.0
    }
}
111
// TODO: Remove this and inline it in the places where it's used.
/// Returns true iff `num` is a power of two (zero is not).
fn is_power_of_two(num: usize) -> bool {
    // A power of two has exactly one bit set.
    num.count_ones() == 1
}
116
117impl fmt::Debug for RocksDbReadOnlyTransaction<'_> {
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        f.write_str("RocksDbTransaction")
120    }
121}
122
123impl fmt::Debug for RocksDbTransaction<'_> {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        f.write_str("RocksDbTransaction")
126    }
127}
128
#[test]
fn is_power_of_two_sanity() {
    // Table-driven sanity check around the interesting boundaries.
    let powers: [usize; 4] = [1, 2, 4, 2 << 10];
    let non_powers: [usize; 4] = [0, 3, 5, (2 << 10) + 1];
    for value in powers {
        assert!(is_power_of_two(value));
    }
    for value in non_powers {
        assert!(!is_power_of_two(value));
    }
}
140
/// Default write buffer size: 2 MiB (`RocksDB` default is 64 MiB).
/// Overridable via `FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV`; must be a power of two.
const DEFAULT_WRITE_BUFFER_SIZE: usize = 2 * 1024 * 1024;

/// Default block cache size: 2 MiB (`RocksDB` default is 8 MiB).
/// Index/filter blocks are placed in this cache too
/// (`set_cache_index_and_filter_blocks`), so everything is bounded.
/// We only need correctness, not throughput, so we keep this minimal.
/// Overridable via `FM_ROCKSDB_BLOCK_CACHE_SIZE_ENV`; must be a power of two.
const DEFAULT_BLOCK_CACHE_SIZE: usize = 2 * 1024 * 1024;

/// Default max open files: 256 (`RocksDB` default is unlimited which
/// consumes memory for each open file handle and associated metadata)
const DEFAULT_MAX_OPEN_FILES: i32 = 256;
153
154fn parse_env_size(env_name: &str) -> anyhow::Result<Option<usize>> {
155    let Ok(var) = std::env::var(env_name) else {
156        return Ok(None);
157    };
158    let size: usize =
159        FromStr::from_str(&var).with_context(|| format!("Could not parse {env_name}"))?;
160    if !is_power_of_two(size) {
161        bail!("{env_name} is not a power of 2");
162    }
163    Ok(Some(size))
164}
165
166fn get_default_options() -> anyhow::Result<rocksdb::Options> {
167    let mut opts = rocksdb::Options::default();
168
169    let write_buffer_size =
170        parse_env_size(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV)?.unwrap_or(DEFAULT_WRITE_BUFFER_SIZE);
171    opts.set_write_buffer_size(write_buffer_size);
172
173    // Keep at most 2 write buffers (1 active + 1 flushing)
174    opts.set_max_write_buffer_number(2);
175
176    let block_cache_size =
177        parse_env_size(FM_ROCKSDB_BLOCK_CACHE_SIZE_ENV)?.unwrap_or(DEFAULT_BLOCK_CACHE_SIZE);
178    let cache = rocksdb::Cache::new_lru_cache(block_cache_size);
179    let mut block_opts = rocksdb::BlockBasedOptions::default();
180    block_opts.set_block_cache(&cache);
181    // Put index and filter blocks into the block cache so they are
182    // bounded by the same memory budget instead of growing unbounded.
183    block_opts.set_cache_index_and_filter_blocks(true);
184    opts.set_block_based_table_factory(&block_opts);
185
186    opts.set_max_open_files(DEFAULT_MAX_OPEN_FILES);
187
188    debug!(
189        write_buffer_size,
190        block_cache_size,
191        max_open_files = DEFAULT_MAX_OPEN_FILES,
192        "RocksDB memory options"
193    );
194
195    opts.create_if_missing(true);
196    Ok(opts)
197}
198
/// Read-only handle to a RocksDB database.
#[derive(Debug)]
pub struct RocksDbReadOnly(rocksdb::DB);

/// A "transaction" over [`RocksDbReadOnly`]; only read operations are
/// supported, write operations panic.
pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
203
204impl RocksDbReadOnly {
205    #[allow(clippy::unused_async)]
206    pub async fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
207        let db_path = db_path.as_ref();
208        block_in_place(|| Self::open_read_only_blocking(db_path))
209    }
210
211    pub fn open_read_only_blocking(db_path: &Path) -> anyhow::Result<RocksDbReadOnly> {
212        let opts = get_default_options()?;
213        // Note: rocksdb is OK if one process has write access, and other read-access
214        let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
215        Ok(RocksDbReadOnly(db))
216    }
217}
218
219impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
220    fn from(db: OptimisticTransactionDB) -> Self {
221        RocksDb(db)
222    }
223}
224
225impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
226    fn from(db: RocksDb) -> Self {
227        db.0
228    }
229}
230
// When finding by prefix iterating in Reverse order, we need to start from
// "prefix+1" instead of "prefix", using lexicographic ordering. See the tests
// below.
// Will return None if there is no next prefix (i.e prefix is already the last
// possible/max one)
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    // The successor is obtained by incrementing the right-most byte that is
    // not already 0xff and zeroing every byte after it. If all bytes are 0xff
    // (or the prefix is empty) there is no successor.
    let bump_at = prefix.iter().rposition(|&byte| byte != u8::MAX)?;
    let mut next = prefix.to_vec();
    next[bump_at] += 1;
    for byte in &mut next[bump_at + 1..] {
        *byte = 0;
    }
    Some(next)
}
254
#[async_trait]
impl IRawDatabase for RocksDb {
    type Transaction<'a> = RocksDbTransaction<'a>;

    /// Start a snapshot-backed optimistic transaction with synchronous
    /// (durable) writes.
    async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
        let mut optimistic_options = OptimisticTransactionOptions::default();
        optimistic_options.set_snapshot(true);

        let mut write_options = WriteOptions::default();
        // Make sure we never lose data on unclean shutdown
        write_options.set_sync(true);

        RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options))
    }

    /// Create a RocksDB checkpoint of the database at `backup_path`.
    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
        let checkpoint =
            rocksdb::checkpoint::Checkpoint::new(&self.0).map_err(DatabaseError::backend)?;
        checkpoint
            .create_checkpoint(backup_path)
            .map_err(DatabaseError::backend)?;
        Ok(())
    }
}
278
#[async_trait]
impl IRawDatabase for RocksDbReadOnly {
    type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;

    /// Read-only "transactions" are just a borrow of the underlying handle.
    async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
        RocksDbReadOnlyTransaction(&self.0)
    }

    /// Create a RocksDB checkpoint of the database at `backup_path`.
    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
        let checkpoint =
            rocksdb::checkpoint::Checkpoint::new(&self.0).map_err(DatabaseError::backend)?;
        checkpoint
            .create_checkpoint(backup_path)
            .map_err(DatabaseError::backend)?;
        Ok(())
    }
}
295
296#[async_trait]
297impl IDatabaseTransactionOpsCore for RocksDbTransaction<'_> {
298    async fn raw_insert_bytes(
299        &mut self,
300        key: &[u8],
301        value: &[u8],
302    ) -> DatabaseResult<Option<Vec<u8>>> {
303        fedimint_core::runtime::block_in_place(|| {
304            let val = self.0.snapshot().get(key).unwrap();
305            self.0.put(key, value).map_err(DatabaseError::backend)?;
306            Ok(val)
307        })
308    }
309
310    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
311        fedimint_core::runtime::block_in_place(|| {
312            self.0.snapshot().get(key).map_err(DatabaseError::backend)
313        })
314    }
315
316    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
317        fedimint_core::runtime::block_in_place(|| {
318            let val = self.0.snapshot().get(key).unwrap();
319            self.0.delete(key).map_err(DatabaseError::backend)?;
320            Ok(val)
321        })
322    }
323
324    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
325        Ok(fedimint_core::runtime::block_in_place(|| {
326            let prefix = key_prefix.to_vec();
327            let mut options = rocksdb::ReadOptions::default();
328            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
329            let iter = self.0.snapshot().iterator_opt(
330                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
331                options,
332            );
333            let rocksdb_iter = iter.map_while(move |res| {
334                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
335                key_bytes
336                    .starts_with(&prefix)
337                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
338            });
339            Box::pin(convert_to_async_stream(rocksdb_iter))
340        }))
341    }
342
343    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
344        Ok(fedimint_core::runtime::block_in_place(|| {
345            let range = Range {
346                start: range.start.to_vec(),
347                end: range.end.to_vec(),
348            };
349            let mut options = rocksdb::ReadOptions::default();
350            options.set_iterate_range(range.clone());
351            let iter = self.0.snapshot().iterator_opt(
352                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
353                options,
354            );
355            let rocksdb_iter = iter.map_while(move |res| {
356                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
357                (key_bytes.as_ref() < range.end.as_slice())
358                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
359            });
360            Box::pin(convert_to_async_stream(rocksdb_iter))
361        }))
362    }
363
364    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
365        fedimint_core::runtime::block_in_place(|| {
366            // Note: delete_range is not supported in Transactions :/
367            let mut options = rocksdb::ReadOptions::default();
368            options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
369            let iter = self
370                .0
371                .snapshot()
372                .iterator_opt(
373                    rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
374                    options,
375                )
376                .map_while(|res| {
377                    res.map(|(key_bytes, _)| {
378                        key_bytes
379                            .starts_with(key_prefix)
380                            .then_some(key_bytes.to_vec())
381                    })
382                    .transpose()
383                });
384
385            for item in iter {
386                let key = item.map_err(DatabaseError::backend)?;
387                self.0.delete(key).map_err(DatabaseError::backend)?;
388            }
389
390            Ok(())
391        })
392    }
393
394    async fn raw_find_by_prefix_sorted_descending(
395        &mut self,
396        key_prefix: &[u8],
397    ) -> DatabaseResult<PrefixStream<'_>> {
398        let prefix = key_prefix.to_vec();
399        let next_prefix = next_prefix(&prefix);
400        let iterator_mode = if let Some(next_prefix) = &next_prefix {
401            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
402        } else {
403            rocksdb::IteratorMode::End
404        };
405        Ok(fedimint_core::runtime::block_in_place(|| {
406            let mut options = rocksdb::ReadOptions::default();
407            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
408            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
409            let rocksdb_iter = iter.map_while(move |res| {
410                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
411                key_bytes
412                    .starts_with(&prefix)
413                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
414            });
415            Box::pin(convert_to_async_stream(rocksdb_iter))
416        }))
417    }
418}
419
// Marker impl; no methods beyond what the trait itself provides are needed.
impl IDatabaseTransactionOps for RocksDbTransaction<'_> {}
421
#[async_trait]
impl IRawDatabaseTransaction for RocksDbTransaction<'_> {
    /// Commit the optimistic transaction, mapping retriable conflict errors
    /// to [`DatabaseError::WriteConflict`] so callers can retry.
    async fn commit_tx(self) -> DatabaseResult<()> {
        fedimint_core::runtime::block_in_place(|| {
            match self.0.commit() {
                Ok(()) => Ok(()),
                Err(err) => {
                    // RocksDB's OptimisticTransactionDB can return Busy/TryAgain errors
                    // when concurrent transactions conflict on the same keys.
                    // These are retriable - return WriteConflict so autocommit retries.
                    // See: https://github.com/fedimint/fedimint/issues/8077
                    match err.kind() {
                        rocksdb::ErrorKind::Busy
                        | rocksdb::ErrorKind::TryAgain
                        | rocksdb::ErrorKind::MergeInProgress
                        | rocksdb::ErrorKind::TimedOut => Err(DatabaseError::WriteConflict),
                        _ => Err(DatabaseError::backend(err)),
                    }
                }
            }
        })
    }
}
445
446#[async_trait]
447impl IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'_> {
448    async fn raw_insert_bytes(
449        &mut self,
450        _key: &[u8],
451        _value: &[u8],
452    ) -> DatabaseResult<Option<Vec<u8>>> {
453        panic!("Cannot insert into a read only transaction");
454    }
455
456    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
457        fedimint_core::runtime::block_in_place(|| {
458            self.0.snapshot().get(key).map_err(DatabaseError::backend)
459        })
460    }
461
462    async fn raw_remove_entry(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
463        panic!("Cannot remove from a read only transaction");
464    }
465
466    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
467        Ok(fedimint_core::runtime::block_in_place(|| {
468            let range = Range {
469                start: range.start.to_vec(),
470                end: range.end.to_vec(),
471            };
472            let mut options = rocksdb::ReadOptions::default();
473            options.set_iterate_range(range.clone());
474            let iter = self.0.snapshot().iterator_opt(
475                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
476                options,
477            );
478            let rocksdb_iter = iter.map_while(move |res| {
479                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
480                (key_bytes.as_ref() < range.end.as_slice())
481                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
482            });
483            Box::pin(convert_to_async_stream(rocksdb_iter))
484        }))
485    }
486
487    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
488        Ok(fedimint_core::runtime::block_in_place(|| {
489            let prefix = key_prefix.to_vec();
490            let mut options = rocksdb::ReadOptions::default();
491            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
492            let iter = self.0.snapshot().iterator_opt(
493                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
494                options,
495            );
496            let rocksdb_iter = iter.map_while(move |res| {
497                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
498                key_bytes
499                    .starts_with(&prefix)
500                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
501            });
502            Box::pin(convert_to_async_stream(rocksdb_iter))
503        }))
504    }
505
506    async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> DatabaseResult<()> {
507        panic!("Cannot remove from a read only transaction");
508    }
509
510    async fn raw_find_by_prefix_sorted_descending(
511        &mut self,
512        key_prefix: &[u8],
513    ) -> DatabaseResult<PrefixStream<'_>> {
514        let prefix = key_prefix.to_vec();
515        let next_prefix = next_prefix(&prefix);
516        let iterator_mode = if let Some(next_prefix) = &next_prefix {
517            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
518        } else {
519            rocksdb::IteratorMode::End
520        };
521        Ok(fedimint_core::runtime::block_in_place(|| {
522            let mut options = rocksdb::ReadOptions::default();
523            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
524            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
525            let rocksdb_iter = iter.map_while(move |res| {
526                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
527                key_bytes
528                    .starts_with(&prefix)
529                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
530            });
531            Box::pin(stream::iter(rocksdb_iter))
532        }))
533    }
534}
535
// Marker impl; no methods beyond what the trait itself provides are needed.
impl IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'_> {}
537
#[async_trait]
impl IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'_> {
    /// Read-only transactions have nothing to commit; calling this is a bug.
    async fn commit_tx(self) -> DatabaseResult<()> {
        panic!("Cannot commit a read only transaction");
    }
}
544
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    // Open a fresh `Database` backed by a temporary directory, using the
    // blocking builder finisher.
    fn open_temp_db(temp_path: &str) -> Database {
        let path = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();

        Database::new(
            RocksDb::build(path.as_ref()).open_blocking().unwrap(),
            ModuleDecoderRegistry::default(),
        )
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        fedimint_core::db::verify_insert_elements(open_temp_db("fcb-rocksdb-test-insert-elements"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        fedimint_core::db::verify_remove_nonexisting(open_temp_db(
            "fcb-rocksdb-test-remove-nonexisting",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        fedimint_core::db::verify_remove_existing(open_temp_db("fcb-rocksdb-test-remove-existing"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        fedimint_core::db::verify_read_own_writes(open_temp_db("fcb-rocksdb-test-read-own-writes"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        fedimint_core::db::verify_prevent_dirty_reads(open_temp_db(
            "fcb-rocksdb-test-prevent-dirty-reads",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        fedimint_core::db::verify_find_by_range(open_temp_db("fcb-rocksdb-test-find-by-range"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        fedimint_core::db::verify_find_by_prefix(open_temp_db("fcb-rocksdb-test-find-by-prefix"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        fedimint_core::db::verify_commit(open_temp_db("fcb-rocksdb-test-commit")).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        fedimint_core::db::verify_prevent_nonrepeatable_reads(open_temp_db(
            "fcb-rocksdb-test-prevent-nonrepeatable-reads",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        fedimint_core::db::verify_snapshot_isolation(open_temp_db(
            "fcb-rocksdb-test-snapshot-isolation",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        fedimint_core::db::verify_phantom_entry(open_temp_db("fcb-rocksdb-test-phantom-entry"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        fedimint_core::db::expect_write_conflict(open_temp_db("fcb-rocksdb-test-write-conflict"))
            .await;
    }

    /// Test that concurrent transaction conflicts are handled gracefully
    /// with autocommit retry logic instead of panicking.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_concurrent_transaction_conflict_with_autocommit() {
        use std::sync::Arc;

        let db = Arc::new(open_temp_db("fcb-rocksdb-test-concurrent-conflict"));

        // Spawn multiple concurrent tasks that all write to the same key
        // This will trigger optimistic transaction conflicts
        let mut handles = Vec::new();

        for i in 0u64..10 {
            let db_clone = Arc::clone(&db);
            let handle =
                fedimint_core::runtime::spawn("rocksdb-transient-error-test", async move {
                    for j in 0u64..10 {
                        // Use autocommit which handles retriable errors with retry logic
                        let result = db_clone
                            .autocommit::<_, _, anyhow::Error>(
                                |dbtx, _| {
                                    #[allow(clippy::cast_possible_truncation)]
                                    let val = (i * 100 + j) as u8;
                                    Box::pin(async move {
                                        // All transactions write to the same key to force conflicts
                                        dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![val]))
                                            .await;
                                        Ok(())
                                    })
                                },
                                None, // unlimited retries
                            )
                            .await;

                        // Should succeed after retries, must NOT panic with "Resource busy"
                        assert!(
                            result.is_ok(),
                            "Transaction should succeed after retries, got: {result:?}",
                        );
                    }
                });
            handles.push(handle);
        }

        // Wait for all tasks - none should panic
        for handle in handles {
            handle.await.expect("Task should not panic");
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        fedimint_core::db::verify_remove_by_prefix(open_temp_db(
            "fcb-rocksdb-test-remove-by-prefix",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        fedimint_core::db::verify_module_prefix(open_temp_db("fcb-rocksdb-test-module-prefix"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-module-db-prefix")
            .tempdir()
            .unwrap();

        let module_db = Database::new(
            RocksDb::build(path.as_ref()).open_blocking().unwrap(),
            ModuleDecoderRegistry::default(),
        );

        fedimint_core::db::verify_module_db(
            open_temp_db("fcb-rocksdb-test-module-db"),
            module_db.with_prefix_module_id(module_instance_id).0,
        )
        .await;
    }

    #[test]
    fn test_next_prefix() {
        // Note: although we are testing the general case of a vector with N elements,
        // the prefixes currently use N = 1
        assert_eq!(next_prefix(&[1, 2, 3]).unwrap(), vec![1, 2, 4]);
        assert_eq!(next_prefix(&[1, 2, 254]).unwrap(), vec![1, 2, 255]);
        assert_eq!(next_prefix(&[1, 2, 255]).unwrap(), vec![1, 3, 0]);
        assert_eq!(next_prefix(&[1, 255, 255]).unwrap(), vec![2, 0, 0]);
        // this is a "max" prefix
        assert!(next_prefix(&[255, 255, 255]).is_none());
        // these are the common case
        assert_eq!(next_prefix(&[0]).unwrap(), vec![1]);
        assert_eq!(next_prefix(&[254]).unwrap(), vec![255]);
        assert!(next_prefix(&[255]).is_none()); // this is a "max" prefix
    }

    // Key prefixes used by the test records below; `MaxTest` deliberately
    // exercises the last possible (0xff) prefix.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        MaxTest = 255,
    }

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest, // max/last prefix
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();
        {
            let db = Database::new(
                RocksDb::build(&path).open().await.unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;
            dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![3]))
                .await;
            dbtx.insert_entry(&TestKey(vec![254]), &TestVal(vec![1]))
                .await;
            dbtx.insert_entry(&TestKey(vec![255]), &TestVal(vec![2]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![0]), &TestVal2(vec![3]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![254]), &TestVal2(vec![1]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![255]), &TestVal2(vec![2]))
                .await;
            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(
                query,
                vec![
                    (TestKey(vec![255]), TestVal(vec![2])),
                    (TestKey(vec![254]), TestVal(vec![1])),
                    (TestKey(vec![0]), TestVal(vec![3]))
                ]
            );
            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(
                query,
                vec![
                    (TestKey2(vec![255]), TestVal2(vec![2])),
                    (TestKey2(vec![254]), TestVal2(vec![1])),
                    (TestKey2(vec![0]), TestVal2(vec![3]))
                ]
            );
            dbtx.commit_tx().await;
        }
        // Test readonly implementation
        let db_readonly = RocksDbReadOnly::open_read_only(path).await.unwrap();
        let db_readonly = Database::new(db_readonly, ModuleRegistry::default());
        let mut dbtx = db_readonly.begin_transaction_nc().await;
        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            query,
            vec![
                (TestKey(vec![255]), TestVal(vec![2])),
                (TestKey(vec![254]), TestVal(vec![1])),
                (TestKey(vec![0]), TestVal(vec![3]))
            ]
        );
        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            query,
            vec![
                (TestKey2(vec![255]), TestVal2(vec![2])),
                (TestKey2(vec![254]), TestVal2(vec![1])),
                (TestKey2(vec![0]), TestVal2(vec![3]))
            ]
        );
    }
}