Skip to main content

fedimint_rocksdb/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4#![allow(clippy::needless_lifetimes)]
5
6pub mod envs;
7
8use std::fmt;
9use std::ops::Range;
10use std::path::Path;
11use std::str::FromStr;
12
13use anyhow::{Context as _, bail};
14use async_trait::async_trait;
15use fedimint_core::db::{
16    DatabaseError, DatabaseResult, IDatabaseTransactionOps, IDatabaseTransactionOpsCore,
17    IRawDatabase, IRawDatabaseTransaction, PrefixStream,
18};
19use fedimint_core::task::block_in_place;
20use fedimint_db_locked::{Locked, LockedBuilder};
21use futures::stream;
22pub use rocksdb;
23use rocksdb::{
24    DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
25};
26use tracing::debug;
27
28use crate::envs::{FM_ROCKSDB_BLOCK_CACHE_SIZE_ENV, FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV};
29
30// turn an `iter` into a `Stream` where every `next` is ran inside
31// `block_in_place` to offload the blocking calls
32fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> + use<I>
33where
34    I: Iterator + Send + 'i,
35    I::Item: Send,
36{
37    stream::unfold(iter, |mut iter| async {
38        fedimint_core::runtime::block_in_place(|| {
39            let item = iter.next();
40            item.map(|item| (item, iter))
41        })
42    })
43}
44
45#[derive(Debug)]
46pub struct RocksDb(rocksdb::OptimisticTransactionDB);
47
48pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
49
50#[bon::bon]
51impl RocksDb {
52    /// Open the database using blocking IO
53    #[builder(start_fn = build)]
54    #[builder(finish_fn = open_blocking)]
55    pub fn open_blocking(
56        #[builder(start_fn)] db_path: impl AsRef<Path>,
57    ) -> anyhow::Result<Locked<RocksDb>> {
58        let db_path = db_path.as_ref();
59
60        block_in_place(|| {
61            std::fs::create_dir_all(
62                db_path
63                    .parent()
64                    .ok_or_else(|| anyhow::anyhow!("db path must have a base dir"))?,
65            )?;
66            LockedBuilder::new(db_path)?.with_db(|| Self::open_blocking_unlocked(db_path))
67        })
68    }
69}
70
71impl<I1, S> RocksDbOpenBlockingBuilder<I1, S>
72where
73    S: rocks_db_open_blocking_builder::State,
74    I1: std::convert::AsRef<std::path::Path>,
75{
76    /// Open the database
77    #[allow(clippy::unused_async)]
78    pub async fn open(self) -> anyhow::Result<Locked<RocksDb>> {
79        block_in_place(|| self.open_blocking())
80    }
81}
82
83impl RocksDb {
84    fn open_blocking_unlocked(db_path: &Path) -> anyhow::Result<RocksDb> {
85        let mut opts = get_default_options()?;
86        // Synchronous writes (set_sync(true)) ensure completed writes are
87        // durable, but a SIGKILL mid-write can still leave a truncated WAL tail
88        // record. TolerateCorruptedTailRecords (RocksDB's own default) discards
89        // only incomplete tail records — no committed data is lost.
90        // AbsoluteConsistency was used previously but made the database
91        // permanently unrecoverable after any unclean shutdown.
92        // See: https://github.com/fedimint/fedimint/issues/8072
93        opts.set_wal_recovery_mode(DBRecoveryMode::TolerateCorruptedTailRecords);
94        let db: rocksdb::OptimisticTransactionDB =
95            rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, db_path)?;
96        Ok(RocksDb(db))
97    }
98
99    pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
100        &self.0
101    }
102}
103
// TODO: Remove this and inline it in the places where it's used.
/// Returns `true` when `num` is a power of two (so `0` yields `false`).
fn is_power_of_two(num: usize) -> bool {
    usize::is_power_of_two(num)
}
108
109impl fmt::Debug for RocksDbReadOnlyTransaction<'_> {
110    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111        f.write_str("RocksDbTransaction")
112    }
113}
114
115impl fmt::Debug for RocksDbTransaction<'_> {
116    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117        f.write_str("RocksDbTransaction")
118    }
119}
120
/// Spot-checks `is_power_of_two` on small values and around `2 << 10`.
#[test]
fn is_power_of_two_sanity() {
    for power in [1, 2, 4, 2 << 10] {
        assert!(is_power_of_two(power));
    }
    for not_a_power in [0, 3, 5, (2 << 10) + 1] {
        assert!(!is_power_of_two(not_a_power));
    }
}
132
/// Default write buffer size: 2 MiB (`RocksDB` default is 64 MiB)
const DEFAULT_WRITE_BUFFER_SIZE: usize = 2 * (1 << 20);

/// Default block cache size: 2 MiB (`RocksDB` default is 8 MiB).
/// Index/filter blocks are placed in this cache too
/// (`set_cache_index_and_filter_blocks`), so everything is bounded.
/// We only need correctness, not throughput, so we keep this minimal.
const DEFAULT_BLOCK_CACHE_SIZE: usize = 2 * (1 << 20);

/// Default max open files: 256 (`RocksDB` default is unlimited which
/// consumes memory for each open file handle and associated metadata)
const DEFAULT_MAX_OPEN_FILES: i32 = 1 << 8;
145
146fn parse_env_size(env_name: &str) -> anyhow::Result<Option<usize>> {
147    let Ok(var) = std::env::var(env_name) else {
148        return Ok(None);
149    };
150    let size: usize =
151        FromStr::from_str(&var).with_context(|| format!("Could not parse {env_name}"))?;
152    if !is_power_of_two(size) {
153        bail!("{env_name} is not a power of 2");
154    }
155    Ok(Some(size))
156}
157
158fn get_default_options() -> anyhow::Result<rocksdb::Options> {
159    let mut opts = rocksdb::Options::default();
160
161    let write_buffer_size =
162        parse_env_size(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV)?.unwrap_or(DEFAULT_WRITE_BUFFER_SIZE);
163    opts.set_write_buffer_size(write_buffer_size);
164
165    // Keep at most 2 write buffers (1 active + 1 flushing)
166    opts.set_max_write_buffer_number(2);
167
168    let block_cache_size =
169        parse_env_size(FM_ROCKSDB_BLOCK_CACHE_SIZE_ENV)?.unwrap_or(DEFAULT_BLOCK_CACHE_SIZE);
170    let cache = rocksdb::Cache::new_lru_cache(block_cache_size);
171    let mut block_opts = rocksdb::BlockBasedOptions::default();
172    block_opts.set_block_cache(&cache);
173    // Put index and filter blocks into the block cache so they are
174    // bounded by the same memory budget instead of growing unbounded.
175    block_opts.set_cache_index_and_filter_blocks(true);
176    opts.set_block_based_table_factory(&block_opts);
177
178    opts.set_max_open_files(DEFAULT_MAX_OPEN_FILES);
179
180    debug!(
181        write_buffer_size,
182        block_cache_size,
183        max_open_files = DEFAULT_MAX_OPEN_FILES,
184        "RocksDB memory options"
185    );
186
187    opts.create_if_missing(true);
188    Ok(opts)
189}
190
191#[derive(Debug)]
192pub struct RocksDbReadOnly(rocksdb::DB);
193
194pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
195
196impl RocksDbReadOnly {
197    #[allow(clippy::unused_async)]
198    pub async fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
199        let db_path = db_path.as_ref();
200        block_in_place(|| Self::open_read_only_blocking(db_path))
201    }
202
203    pub fn open_read_only_blocking(db_path: &Path) -> anyhow::Result<RocksDbReadOnly> {
204        let opts = get_default_options()?;
205        // Note: rocksdb is OK if one process has write access, and other read-access
206        let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
207        Ok(RocksDbReadOnly(db))
208    }
209}
210
211impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
212    fn from(db: OptimisticTransactionDB) -> Self {
213        RocksDb(db)
214    }
215}
216
217impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
218    fn from(db: RocksDb) -> Self {
219        db.0
220    }
221}
222
// Reverse iteration by prefix has to start from "prefix+1" rather than
// "prefix" (in lexicographic ordering). See the tests below.
// Returns None when no such successor exists, i.e. the prefix is empty or
// consists solely of 0xff bytes (it is already the last possible/max one).
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    // Right-most byte that can be incremented without overflowing
    let pivot = prefix.iter().rposition(|&byte| byte != u8::MAX)?;

    let mut successor = prefix.to_vec();
    successor[pivot] += 1;
    // Every byte after the pivot was 0xff and wraps around to zero
    successor[pivot + 1..].fill(0);
    Some(successor)
}
246
247#[async_trait]
248impl IRawDatabase for RocksDb {
249    type Transaction<'a> = RocksDbTransaction<'a>;
250    async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
251        let mut optimistic_options = OptimisticTransactionOptions::default();
252        optimistic_options.set_snapshot(true);
253
254        let mut write_options = WriteOptions::default();
255        // Make sure we never lose data on unclean shutdown
256        write_options.set_sync(true);
257
258        RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options))
259    }
260
261    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
262        let checkpoint =
263            rocksdb::checkpoint::Checkpoint::new(&self.0).map_err(DatabaseError::backend)?;
264        checkpoint
265            .create_checkpoint(backup_path)
266            .map_err(DatabaseError::backend)?;
267        Ok(())
268    }
269}
270
271#[async_trait]
272impl IRawDatabase for RocksDbReadOnly {
273    type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
274    async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
275        RocksDbReadOnlyTransaction(&self.0)
276    }
277
278    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
279        let checkpoint =
280            rocksdb::checkpoint::Checkpoint::new(&self.0).map_err(DatabaseError::backend)?;
281        checkpoint
282            .create_checkpoint(backup_path)
283            .map_err(DatabaseError::backend)?;
284        Ok(())
285    }
286}
287
288#[async_trait]
289impl IDatabaseTransactionOpsCore for RocksDbTransaction<'_> {
290    async fn raw_insert_bytes(
291        &mut self,
292        key: &[u8],
293        value: &[u8],
294    ) -> DatabaseResult<Option<Vec<u8>>> {
295        fedimint_core::runtime::block_in_place(|| {
296            let val = self.0.snapshot().get(key).unwrap();
297            self.0.put(key, value).map_err(DatabaseError::backend)?;
298            Ok(val)
299        })
300    }
301
302    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
303        fedimint_core::runtime::block_in_place(|| {
304            self.0.snapshot().get(key).map_err(DatabaseError::backend)
305        })
306    }
307
308    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
309        fedimint_core::runtime::block_in_place(|| {
310            let val = self.0.snapshot().get(key).unwrap();
311            self.0.delete(key).map_err(DatabaseError::backend)?;
312            Ok(val)
313        })
314    }
315
316    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
317        Ok(fedimint_core::runtime::block_in_place(|| {
318            let prefix = key_prefix.to_vec();
319            let mut options = rocksdb::ReadOptions::default();
320            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
321            let iter = self.0.snapshot().iterator_opt(
322                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
323                options,
324            );
325            let rocksdb_iter = iter.map_while(move |res| {
326                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
327                key_bytes
328                    .starts_with(&prefix)
329                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
330            });
331            Box::pin(convert_to_async_stream(rocksdb_iter))
332        }))
333    }
334
335    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
336        Ok(fedimint_core::runtime::block_in_place(|| {
337            let range = Range {
338                start: range.start.to_vec(),
339                end: range.end.to_vec(),
340            };
341            let mut options = rocksdb::ReadOptions::default();
342            options.set_iterate_range(range.clone());
343            let iter = self.0.snapshot().iterator_opt(
344                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
345                options,
346            );
347            let rocksdb_iter = iter.map_while(move |res| {
348                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
349                (key_bytes.as_ref() < range.end.as_slice())
350                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
351            });
352            Box::pin(convert_to_async_stream(rocksdb_iter))
353        }))
354    }
355
356    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
357        fedimint_core::runtime::block_in_place(|| {
358            // Note: delete_range is not supported in Transactions :/
359            let mut options = rocksdb::ReadOptions::default();
360            options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
361            let iter = self
362                .0
363                .snapshot()
364                .iterator_opt(
365                    rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
366                    options,
367                )
368                .map_while(|res| {
369                    res.map(|(key_bytes, _)| {
370                        key_bytes
371                            .starts_with(key_prefix)
372                            .then_some(key_bytes.to_vec())
373                    })
374                    .transpose()
375                });
376
377            for item in iter {
378                let key = item.map_err(DatabaseError::backend)?;
379                self.0.delete(key).map_err(DatabaseError::backend)?;
380            }
381
382            Ok(())
383        })
384    }
385
386    async fn raw_find_by_prefix_sorted_descending(
387        &mut self,
388        key_prefix: &[u8],
389    ) -> DatabaseResult<PrefixStream<'_>> {
390        let prefix = key_prefix.to_vec();
391        let next_prefix = next_prefix(&prefix);
392        let iterator_mode = if let Some(next_prefix) = &next_prefix {
393            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
394        } else {
395            rocksdb::IteratorMode::End
396        };
397        Ok(fedimint_core::runtime::block_in_place(|| {
398            let mut options = rocksdb::ReadOptions::default();
399            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
400            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
401            let rocksdb_iter = iter.map_while(move |res| {
402                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
403                key_bytes
404                    .starts_with(&prefix)
405                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
406            });
407            Box::pin(convert_to_async_stream(rocksdb_iter))
408        }))
409    }
410}
411
412impl IDatabaseTransactionOps for RocksDbTransaction<'_> {}
413
414#[async_trait]
415impl IRawDatabaseTransaction for RocksDbTransaction<'_> {
416    async fn commit_tx(self) -> DatabaseResult<()> {
417        fedimint_core::runtime::block_in_place(|| {
418            match self.0.commit() {
419                Ok(()) => Ok(()),
420                Err(err) => {
421                    // RocksDB's OptimisticTransactionDB can return Busy/TryAgain errors
422                    // when concurrent transactions conflict on the same keys.
423                    // These are retriable - return WriteConflict so autocommit retries.
424                    // See: https://github.com/fedimint/fedimint/issues/8077
425                    match err.kind() {
426                        rocksdb::ErrorKind::Busy
427                        | rocksdb::ErrorKind::TryAgain
428                        | rocksdb::ErrorKind::MergeInProgress
429                        | rocksdb::ErrorKind::TimedOut => Err(DatabaseError::WriteConflict),
430                        _ => Err(DatabaseError::backend(err)),
431                    }
432                }
433            }
434        })
435    }
436}
437
438#[async_trait]
439impl IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'_> {
440    async fn raw_insert_bytes(
441        &mut self,
442        _key: &[u8],
443        _value: &[u8],
444    ) -> DatabaseResult<Option<Vec<u8>>> {
445        panic!("Cannot insert into a read only transaction");
446    }
447
448    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
449        fedimint_core::runtime::block_in_place(|| {
450            self.0.snapshot().get(key).map_err(DatabaseError::backend)
451        })
452    }
453
454    async fn raw_remove_entry(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
455        panic!("Cannot remove from a read only transaction");
456    }
457
458    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
459        Ok(fedimint_core::runtime::block_in_place(|| {
460            let range = Range {
461                start: range.start.to_vec(),
462                end: range.end.to_vec(),
463            };
464            let mut options = rocksdb::ReadOptions::default();
465            options.set_iterate_range(range.clone());
466            let iter = self.0.snapshot().iterator_opt(
467                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
468                options,
469            );
470            let rocksdb_iter = iter.map_while(move |res| {
471                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
472                (key_bytes.as_ref() < range.end.as_slice())
473                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
474            });
475            Box::pin(convert_to_async_stream(rocksdb_iter))
476        }))
477    }
478
479    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
480        Ok(fedimint_core::runtime::block_in_place(|| {
481            let prefix = key_prefix.to_vec();
482            let mut options = rocksdb::ReadOptions::default();
483            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
484            let iter = self.0.snapshot().iterator_opt(
485                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
486                options,
487            );
488            let rocksdb_iter = iter.map_while(move |res| {
489                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
490                key_bytes
491                    .starts_with(&prefix)
492                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
493            });
494            Box::pin(convert_to_async_stream(rocksdb_iter))
495        }))
496    }
497
498    async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> DatabaseResult<()> {
499        panic!("Cannot remove from a read only transaction");
500    }
501
502    async fn raw_find_by_prefix_sorted_descending(
503        &mut self,
504        key_prefix: &[u8],
505    ) -> DatabaseResult<PrefixStream<'_>> {
506        let prefix = key_prefix.to_vec();
507        let next_prefix = next_prefix(&prefix);
508        let iterator_mode = if let Some(next_prefix) = &next_prefix {
509            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
510        } else {
511            rocksdb::IteratorMode::End
512        };
513        Ok(fedimint_core::runtime::block_in_place(|| {
514            let mut options = rocksdb::ReadOptions::default();
515            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
516            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
517            let rocksdb_iter = iter.map_while(move |res| {
518                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
519                key_bytes
520                    .starts_with(&prefix)
521                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
522            });
523            Box::pin(stream::iter(rocksdb_iter))
524        }))
525    }
526}
527
528impl IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'_> {}
529
530#[async_trait]
531impl IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'_> {
532    async fn commit_tx(self) -> DatabaseResult<()> {
533        panic!("Cannot commit a read only transaction");
534    }
535}
536
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    /// Opens a fresh database inside a temporary directory whose name starts
    /// with `temp_path`.
    fn open_temp_db(temp_path: &str) -> Database {
        let temp_dir = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();
        let raw_db = RocksDb::build(temp_dir.as_ref()).open_blocking().unwrap();

        Database::new(raw_db, ModuleDecoderRegistry::default())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        let db = open_temp_db("fcb-rocksdb-test-insert-elements");
        fedimint_core::db::verify_insert_elements(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        let db = open_temp_db("fcb-rocksdb-test-remove-nonexisting");
        fedimint_core::db::verify_remove_nonexisting(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        let db = open_temp_db("fcb-rocksdb-test-remove-existing");
        fedimint_core::db::verify_remove_existing(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        let db = open_temp_db("fcb-rocksdb-test-read-own-writes");
        fedimint_core::db::verify_read_own_writes(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-dirty-reads");
        fedimint_core::db::verify_prevent_dirty_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-range");
        fedimint_core::db::verify_find_by_range(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-prefix");
        fedimint_core::db::verify_find_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        let db = open_temp_db("fcb-rocksdb-test-commit");
        fedimint_core::db::verify_commit(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-nonrepeatable-reads");
        fedimint_core::db::verify_prevent_nonrepeatable_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        let db = open_temp_db("fcb-rocksdb-test-snapshot-isolation");
        fedimint_core::db::verify_snapshot_isolation(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        let db = open_temp_db("fcb-rocksdb-test-phantom-entry");
        fedimint_core::db::verify_phantom_entry(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        let db = open_temp_db("fcb-rocksdb-test-write-conflict");
        fedimint_core::db::expect_write_conflict(db).await;
    }

    /// Concurrent transactions that conflict must be resolved by the
    /// autocommit retry logic rather than ending in a panic.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_concurrent_transaction_conflict_with_autocommit() {
        use std::sync::Arc;

        let db = Arc::new(open_temp_db("fcb-rocksdb-test-concurrent-conflict"));

        // Start several tasks that all write to the same key, which
        // triggers optimistic transaction conflicts
        let handles: Vec<_> = (0u64..10)
            .map(|i| {
                let db_clone = Arc::clone(&db);
                fedimint_core::runtime::spawn("rocksdb-transient-error-test", async move {
                    for j in 0u64..10 {
                        // autocommit retries on retriable errors
                        let result = db_clone
                            .autocommit::<_, _, anyhow::Error>(
                                |dbtx, _| {
                                    #[allow(clippy::cast_possible_truncation)]
                                    let val = (i * 100 + j) as u8;
                                    Box::pin(async move {
                                        // Same key everywhere to force conflicts
                                        dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![val]))
                                            .await;
                                        Ok(())
                                    })
                                },
                                None, // unlimited retries
                            )
                            .await;

                        // Must succeed after retries and NOT panic with "Resource busy"
                        assert!(
                            result.is_ok(),
                            "Transaction should succeed after retries, got: {result:?}",
                        );
                    }
                })
            })
            .collect();

        // Join every task - none of them may have panicked
        for handle in handles {
            handle.await.expect("Task should not panic");
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-remove-by-prefix");
        fedimint_core::db::verify_remove_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        let db = open_temp_db("fcb-rocksdb-test-module-prefix");
        fedimint_core::db::verify_module_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-module-db-prefix")
            .tempdir()
            .unwrap();

        let raw_module_db = RocksDb::build(path.as_ref()).open_blocking().unwrap();
        let module_db = Database::new(raw_module_db, ModuleDecoderRegistry::default());

        fedimint_core::db::verify_module_db(
            open_temp_db("fcb-rocksdb-test-module-db"),
            module_db.with_prefix_module_id(module_instance_id).0,
        )
        .await;
    }

    #[test]
    fn test_next_prefix() {
        // Note: although we are testing the general case of a vector with N elements,
        // the prefixes currently use N = 1
        let with_successor: [(&[u8], &[u8]); 6] = [
            (&[1, 2, 3], &[1, 2, 4]),
            (&[1, 2, 254], &[1, 2, 255]),
            (&[1, 2, 255], &[1, 3, 0]),
            (&[1, 255, 255], &[2, 0, 0]),
            // these are the common case
            (&[0], &[1]),
            (&[254], &[255]),
        ];
        for (prefix, expected) in with_successor {
            assert_eq!(next_prefix(prefix).as_deref(), Some(expected));
        }

        // these are "max" prefixes
        let max_prefixes: [&[u8]; 2] = [&[255, 255, 255], &[255]];
        for prefix in max_prefixes {
            assert!(next_prefix(prefix).is_none());
        }
    }

    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        MaxTest = 255,
    }

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest, // max/last prefix
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    /// Entries expected under `DbPrefixTestPrefix`, highest key first.
    fn expected_descending() -> Vec<(TestKey, TestVal)> {
        vec![
            (TestKey(vec![255]), TestVal(vec![2])),
            (TestKey(vec![254]), TestVal(vec![1])),
            (TestKey(vec![0]), TestVal(vec![3])),
        ]
    }

    /// Entries expected under `DbPrefixTestPrefixMax`, highest key first.
    fn expected_descending_max() -> Vec<(TestKey2, TestVal2)> {
        vec![
            (TestKey2(vec![255]), TestVal2(vec![2])),
            (TestKey2(vec![254]), TestVal2(vec![1])),
            (TestKey2(vec![0]), TestVal2(vec![3])),
        ]
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();
        {
            let db = Database::new(
                RocksDb::build(&path).open().await.unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;
            for (key, val) in [(0, 3), (254, 1), (255, 2)] {
                dbtx.insert_entry(&TestKey(vec![key]), &TestVal(vec![val]))
                    .await;
            }
            for (key, val) in [(0, 3), (254, 1), (255, 2)] {
                dbtx.insert_entry(&TestKey2(vec![key]), &TestVal2(vec![val]))
                    .await;
            }

            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(query, expected_descending());

            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(query, expected_descending_max());

            dbtx.commit_tx().await;
        }

        // Test readonly implementation
        let db_readonly = RocksDbReadOnly::open_read_only(path).await.unwrap();
        let db_readonly = Database::new(db_readonly, ModuleRegistry::default());
        let mut dbtx = db_readonly.begin_transaction_nc().await;

        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(query, expected_descending());

        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(query, expected_descending_max());
    }
}
862}