fedimint_rocksdb/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4#![allow(clippy::needless_lifetimes)]
5
6pub mod envs;
7
8use std::fmt;
9use std::ops::Range;
10use std::path::Path;
11use std::str::FromStr;
12
13use anyhow::{Context as _, bail};
14use async_trait::async_trait;
15use fedimint_core::db::{
16    DatabaseError, DatabaseResult, IDatabaseTransactionOps, IDatabaseTransactionOpsCore,
17    IRawDatabase, IRawDatabaseTransaction, PrefixStream,
18};
19use fedimint_core::task::block_in_place;
20use fedimint_db_locked::{Locked, LockedBuilder};
21use futures::stream;
22pub use rocksdb;
23use rocksdb::{
24    DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
25};
26use tracing::debug;
27
28use crate::envs::FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV;
29
30// turn an `iter` into a `Stream` where every `next` is ran inside
31// `block_in_place` to offload the blocking calls
32fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> + use<I>
33where
34    I: Iterator + Send + 'i,
35    I::Item: Send,
36{
37    stream::unfold(iter, |mut iter| async {
38        fedimint_core::runtime::block_in_place(|| {
39            let item = iter.next();
40            item.map(|item| (item, iter))
41        })
42    })
43}
44
45#[derive(Debug)]
46pub struct RocksDb(rocksdb::OptimisticTransactionDB);
47
48pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
49
50#[bon::bon]
51impl RocksDb {
52    /// Open the database using blocking IO
53    #[builder(start_fn = build)]
54    #[builder(finish_fn = open_blocking)]
55    pub fn open_blocking(
56        #[builder(start_fn)] db_path: impl AsRef<Path>,
57        /// Relaxed consistency allows opening the database
58        /// even if the wal got corrupted.
59        relaxed_consistency: Option<bool>,
60    ) -> anyhow::Result<Locked<RocksDb>> {
61        let db_path = db_path.as_ref();
62
63        block_in_place(|| {
64            std::fs::create_dir_all(
65                db_path
66                    .parent()
67                    .ok_or_else(|| anyhow::anyhow!("db path must have a base dir"))?,
68            )?;
69            LockedBuilder::new(db_path)?.with_db(|| {
70                Self::open_blocking_unlocked(db_path, relaxed_consistency.unwrap_or_default())
71            })
72        })
73    }
74}
75
76impl<I1, S> RocksDbOpenBlockingBuilder<I1, S>
77where
78    S: rocks_db_open_blocking_builder::State,
79    I1: std::convert::AsRef<std::path::Path>,
80{
81    /// Open the database
82    #[allow(clippy::unused_async)]
83    pub async fn open(self) -> anyhow::Result<Locked<RocksDb>> {
84        block_in_place(|| self.open_blocking())
85    }
86}
87
88impl RocksDb {
89    fn open_blocking_unlocked(
90        db_path: &Path,
91        relaxed_consistency: bool,
92    ) -> anyhow::Result<RocksDb> {
93        let mut opts = get_default_options()?;
94        if relaxed_consistency {
95            // https://github.com/fedimint/fedimint/issues/8072
96            opts.set_wal_recovery_mode(DBRecoveryMode::TolerateCorruptedTailRecords);
97        } else {
98            // Since we turned synchronous writes one we should never encounter a corrupted
99            // WAL and should rather fail in this case
100            opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency);
101        }
102        let db: rocksdb::OptimisticTransactionDB =
103            rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, db_path)?;
104        Ok(RocksDb(db))
105    }
106
107    pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
108        &self.0
109    }
110}
111
// TODO: Remove this and inline it in the places where it's used.
/// Return `true` if `num` is a power of two; `0` is not a power of two.
fn is_power_of_two(num: usize) -> bool {
    usize::is_power_of_two(num)
}
116
117impl fmt::Debug for RocksDbReadOnlyTransaction<'_> {
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        f.write_str("RocksDbTransaction")
120    }
121}
122
123impl fmt::Debug for RocksDbTransaction<'_> {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        f.write_str("RocksDbTransaction")
126    }
127}
128
#[test]
fn is_power_of_two_sanity() {
    // (input, expected) pairs covering zero, small values and a larger power.
    let cases: [(usize, bool); 8] = [
        (0, false),
        (1, true),
        (2, true),
        (3, false),
        (4, true),
        (5, false),
        (2 << 10, true),
        ((2 << 10) + 1, false),
    ];
    for (num, expected) in cases {
        assert_eq!(is_power_of_two(num), expected, "is_power_of_two({num})");
    }
}
140
141fn get_default_options() -> anyhow::Result<rocksdb::Options> {
142    let mut opts = rocksdb::Options::default();
143    if let Ok(var) = std::env::var(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV) {
144        debug!(var, "Using custom write buffer size");
145        let size: usize = FromStr::from_str(&var)
146            .with_context(|| format!("Could not parse {FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV}"))?;
147        if !is_power_of_two(size) {
148            bail!("{} is not a power of 2", FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV);
149        }
150        opts.set_write_buffer_size(size);
151    }
152    opts.create_if_missing(true);
153    Ok(opts)
154}
155
156#[derive(Debug)]
157pub struct RocksDbReadOnly(rocksdb::DB);
158
159pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
160
161impl RocksDbReadOnly {
162    #[allow(clippy::unused_async)]
163    pub async fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
164        let db_path = db_path.as_ref();
165        block_in_place(|| Self::open_read_only_blocking(db_path))
166    }
167
168    pub fn open_read_only_blocking(db_path: &Path) -> anyhow::Result<RocksDbReadOnly> {
169        let opts = get_default_options()?;
170        // Note: rocksdb is OK if one process has write access, and other read-access
171        let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
172        Ok(RocksDbReadOnly(db))
173    }
174}
175
176impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
177    fn from(db: OptimisticTransactionDB) -> Self {
178        RocksDb(db)
179    }
180}
181
182impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
183    fn from(db: RocksDb) -> Self {
184        db.0
185    }
186}
187
/// Return the lexicographically next prefix after `prefix`, keeping its length.
///
/// When finding by prefix in reverse order, iteration has to start from
/// "prefix + 1" instead of "prefix" (see the tests below). The increment
/// carries like a big-endian counter: trailing `0xFF` bytes wrap to `0` and
/// the byte before them is incremented.
///
/// Returns `None` if there is no next prefix, i.e. `prefix` is empty or made
/// up entirely of `0xFF` bytes (it is already the last possible one).
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    // Right-most byte that can be bumped without overflowing.
    let pivot = prefix.iter().rposition(|&byte| byte != u8::MAX)?;

    let mut successor = prefix.to_vec();
    successor[pivot] += 1;
    // Every byte after the pivot was 0xFF and wraps around to zero.
    successor[pivot + 1..].fill(0);
    Some(successor)
}
211
212#[async_trait]
213impl IRawDatabase for RocksDb {
214    type Transaction<'a> = RocksDbTransaction<'a>;
215    async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
216        let mut optimistic_options = OptimisticTransactionOptions::default();
217        optimistic_options.set_snapshot(true);
218
219        let mut write_options = WriteOptions::default();
220        // Make sure we never lose data on unclean shutdown
221        write_options.set_sync(true);
222
223        let mut rocksdb_tx =
224            RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options));
225        rocksdb_tx
226            .set_tx_savepoint()
227            .await
228            .expect("setting tx savepoint failed");
229
230        rocksdb_tx
231    }
232
233    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
234        let checkpoint =
235            rocksdb::checkpoint::Checkpoint::new(&self.0).map_err(DatabaseError::backend)?;
236        checkpoint
237            .create_checkpoint(backup_path)
238            .map_err(DatabaseError::backend)?;
239        Ok(())
240    }
241}
242
243#[async_trait]
244impl IRawDatabase for RocksDbReadOnly {
245    type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
246    async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
247        RocksDbReadOnlyTransaction(&self.0)
248    }
249
250    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
251        let checkpoint =
252            rocksdb::checkpoint::Checkpoint::new(&self.0).map_err(DatabaseError::backend)?;
253        checkpoint
254            .create_checkpoint(backup_path)
255            .map_err(DatabaseError::backend)?;
256        Ok(())
257    }
258}
259
260#[async_trait]
261impl IDatabaseTransactionOpsCore for RocksDbTransaction<'_> {
262    async fn raw_insert_bytes(
263        &mut self,
264        key: &[u8],
265        value: &[u8],
266    ) -> DatabaseResult<Option<Vec<u8>>> {
267        fedimint_core::runtime::block_in_place(|| {
268            let val = self.0.snapshot().get(key).unwrap();
269            self.0.put(key, value).map_err(DatabaseError::backend)?;
270            Ok(val)
271        })
272    }
273
274    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
275        fedimint_core::runtime::block_in_place(|| {
276            self.0.snapshot().get(key).map_err(DatabaseError::backend)
277        })
278    }
279
280    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
281        fedimint_core::runtime::block_in_place(|| {
282            let val = self.0.snapshot().get(key).unwrap();
283            self.0.delete(key).map_err(DatabaseError::backend)?;
284            Ok(val)
285        })
286    }
287
288    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
289        Ok(fedimint_core::runtime::block_in_place(|| {
290            let prefix = key_prefix.to_vec();
291            let mut options = rocksdb::ReadOptions::default();
292            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
293            let iter = self.0.snapshot().iterator_opt(
294                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
295                options,
296            );
297            let rocksdb_iter = iter.map_while(move |res| {
298                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
299                key_bytes
300                    .starts_with(&prefix)
301                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
302            });
303            Box::pin(convert_to_async_stream(rocksdb_iter))
304        }))
305    }
306
307    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
308        Ok(fedimint_core::runtime::block_in_place(|| {
309            let range = Range {
310                start: range.start.to_vec(),
311                end: range.end.to_vec(),
312            };
313            let mut options = rocksdb::ReadOptions::default();
314            options.set_iterate_range(range.clone());
315            let iter = self.0.snapshot().iterator_opt(
316                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
317                options,
318            );
319            let rocksdb_iter = iter.map_while(move |res| {
320                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
321                (key_bytes.as_ref() < range.end.as_slice())
322                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
323            });
324            Box::pin(convert_to_async_stream(rocksdb_iter))
325        }))
326    }
327
328    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
329        fedimint_core::runtime::block_in_place(|| {
330            // Note: delete_range is not supported in Transactions :/
331            let mut options = rocksdb::ReadOptions::default();
332            options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
333            let iter = self
334                .0
335                .snapshot()
336                .iterator_opt(
337                    rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
338                    options,
339                )
340                .map_while(|res| {
341                    res.map(|(key_bytes, _)| {
342                        key_bytes
343                            .starts_with(key_prefix)
344                            .then_some(key_bytes.to_vec())
345                    })
346                    .transpose()
347                });
348
349            for item in iter {
350                let key = item.map_err(DatabaseError::backend)?;
351                self.0.delete(key).map_err(DatabaseError::backend)?;
352            }
353
354            Ok(())
355        })
356    }
357
358    async fn raw_find_by_prefix_sorted_descending(
359        &mut self,
360        key_prefix: &[u8],
361    ) -> DatabaseResult<PrefixStream<'_>> {
362        let prefix = key_prefix.to_vec();
363        let next_prefix = next_prefix(&prefix);
364        let iterator_mode = if let Some(next_prefix) = &next_prefix {
365            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
366        } else {
367            rocksdb::IteratorMode::End
368        };
369        Ok(fedimint_core::runtime::block_in_place(|| {
370            let mut options = rocksdb::ReadOptions::default();
371            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
372            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
373            let rocksdb_iter = iter.map_while(move |res| {
374                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
375                key_bytes
376                    .starts_with(&prefix)
377                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
378            });
379            Box::pin(convert_to_async_stream(rocksdb_iter))
380        }))
381    }
382}
383
384#[async_trait]
385impl IDatabaseTransactionOps for RocksDbTransaction<'_> {
386    async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()> {
387        fedimint_core::runtime::block_in_place(|| {
388            self.0
389                .rollback_to_savepoint()
390                .map_err(DatabaseError::backend)
391        })
392    }
393
394    async fn set_tx_savepoint(&mut self) -> DatabaseResult<()> {
395        fedimint_core::runtime::block_in_place(|| self.0.set_savepoint());
396
397        Ok(())
398    }
399}
400
401#[async_trait]
402impl IRawDatabaseTransaction for RocksDbTransaction<'_> {
403    async fn commit_tx(self) -> DatabaseResult<()> {
404        fedimint_core::runtime::block_in_place(|| {
405            match self.0.commit() {
406                Ok(()) => Ok(()),
407                Err(err) => {
408                    // RocksDB's OptimisticTransactionDB can return Busy/TryAgain errors
409                    // when concurrent transactions conflict on the same keys.
410                    // These are retriable - return WriteConflict so autocommit retries.
411                    // See: https://github.com/fedimint/fedimint/issues/8077
412                    match err.kind() {
413                        rocksdb::ErrorKind::Busy
414                        | rocksdb::ErrorKind::TryAgain
415                        | rocksdb::ErrorKind::MergeInProgress
416                        | rocksdb::ErrorKind::TimedOut => Err(DatabaseError::WriteConflict),
417                        _ => Err(DatabaseError::backend(err)),
418                    }
419                }
420            }
421        })
422    }
423}
424
425#[async_trait]
426impl IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'_> {
427    async fn raw_insert_bytes(
428        &mut self,
429        _key: &[u8],
430        _value: &[u8],
431    ) -> DatabaseResult<Option<Vec<u8>>> {
432        panic!("Cannot insert into a read only transaction");
433    }
434
435    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
436        fedimint_core::runtime::block_in_place(|| {
437            self.0.snapshot().get(key).map_err(DatabaseError::backend)
438        })
439    }
440
441    async fn raw_remove_entry(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
442        panic!("Cannot remove from a read only transaction");
443    }
444
445    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
446        Ok(fedimint_core::runtime::block_in_place(|| {
447            let range = Range {
448                start: range.start.to_vec(),
449                end: range.end.to_vec(),
450            };
451            let mut options = rocksdb::ReadOptions::default();
452            options.set_iterate_range(range.clone());
453            let iter = self.0.snapshot().iterator_opt(
454                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
455                options,
456            );
457            let rocksdb_iter = iter.map_while(move |res| {
458                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
459                (key_bytes.as_ref() < range.end.as_slice())
460                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
461            });
462            Box::pin(convert_to_async_stream(rocksdb_iter))
463        }))
464    }
465
466    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
467        Ok(fedimint_core::runtime::block_in_place(|| {
468            let prefix = key_prefix.to_vec();
469            let mut options = rocksdb::ReadOptions::default();
470            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
471            let iter = self.0.snapshot().iterator_opt(
472                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
473                options,
474            );
475            let rocksdb_iter = iter.map_while(move |res| {
476                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
477                key_bytes
478                    .starts_with(&prefix)
479                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
480            });
481            Box::pin(convert_to_async_stream(rocksdb_iter))
482        }))
483    }
484
485    async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> DatabaseResult<()> {
486        panic!("Cannot remove from a read only transaction");
487    }
488
489    async fn raw_find_by_prefix_sorted_descending(
490        &mut self,
491        key_prefix: &[u8],
492    ) -> DatabaseResult<PrefixStream<'_>> {
493        let prefix = key_prefix.to_vec();
494        let next_prefix = next_prefix(&prefix);
495        let iterator_mode = if let Some(next_prefix) = &next_prefix {
496            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
497        } else {
498            rocksdb::IteratorMode::End
499        };
500        Ok(fedimint_core::runtime::block_in_place(|| {
501            let mut options = rocksdb::ReadOptions::default();
502            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
503            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
504            let rocksdb_iter = iter.map_while(move |res| {
505                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
506                key_bytes
507                    .starts_with(&prefix)
508                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
509            });
510            Box::pin(stream::iter(rocksdb_iter))
511        }))
512    }
513}
514
515#[async_trait]
516impl IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'_> {
517    async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()> {
518        panic!("Cannot rollback a read only transaction");
519    }
520
521    async fn set_tx_savepoint(&mut self) -> DatabaseResult<()> {
522        panic!("Cannot set a savepoint in a read only transaction");
523    }
524}
525
526#[async_trait]
527impl IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'_> {
528    async fn commit_tx(self) -> DatabaseResult<()> {
529        panic!("Cannot commit a read only transaction");
530    }
531}
532
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    /// Open a fresh read-write database in a new temporary directory.
    ///
    /// The returned [`Database`] outlives this function. Dropping the
    /// `TempDir` guard here would delete the directory out from under the
    /// still-open RocksDB instance, so the guard is intentionally leaked and
    /// the directory is left on disk.
    fn open_temp_db(temp_path: &str) -> Database {
        let path = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();

        let db = Database::new(
            RocksDb::build(path.as_ref()).open_blocking().unwrap(),
            ModuleDecoderRegistry::default(),
        );
        // Keep the directory alive for as long as the database may be used.
        std::mem::forget(path);
        db
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        fedimint_core::db::verify_insert_elements(open_temp_db("fcb-rocksdb-test-insert-elements"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        fedimint_core::db::verify_remove_nonexisting(open_temp_db(
            "fcb-rocksdb-test-remove-nonexisting",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        fedimint_core::db::verify_remove_existing(open_temp_db("fcb-rocksdb-test-remove-existing"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        fedimint_core::db::verify_read_own_writes(open_temp_db("fcb-rocksdb-test-read-own-writes"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        fedimint_core::db::verify_prevent_dirty_reads(open_temp_db(
            "fcb-rocksdb-test-prevent-dirty-reads",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        fedimint_core::db::verify_find_by_range(open_temp_db("fcb-rocksdb-test-find-by-range"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        fedimint_core::db::verify_find_by_prefix(open_temp_db("fcb-rocksdb-test-find-by-prefix"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        fedimint_core::db::verify_commit(open_temp_db("fcb-rocksdb-test-commit")).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        fedimint_core::db::verify_prevent_nonrepeatable_reads(open_temp_db(
            "fcb-rocksdb-test-prevent-nonrepeatable-reads",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        fedimint_core::db::verify_snapshot_isolation(open_temp_db(
            "fcb-rocksdb-test-snapshot-isolation",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_rollback_to_savepoint() {
        fedimint_core::db::verify_rollback_to_savepoint(open_temp_db(
            "fcb-rocksdb-test-rollback-to-savepoint",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        fedimint_core::db::verify_phantom_entry(open_temp_db("fcb-rocksdb-test-phantom-entry"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        fedimint_core::db::expect_write_conflict(open_temp_db("fcb-rocksdb-test-write-conflict"))
            .await;
    }

    /// Test that concurrent transaction conflicts are handled gracefully
    /// with autocommit retry logic instead of panicking.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_concurrent_transaction_conflict_with_autocommit() {
        use std::sync::Arc;

        let db = Arc::new(open_temp_db("fcb-rocksdb-test-concurrent-conflict"));

        // Spawn multiple concurrent tasks that all write to the same key
        // This will trigger optimistic transaction conflicts
        let mut handles = Vec::new();

        for i in 0u64..10 {
            let db_clone = Arc::clone(&db);
            let handle =
                fedimint_core::runtime::spawn("rocksdb-transient-error-test", async move {
                    for j in 0u64..10 {
                        // Use autocommit which handles retriable errors with retry logic
                        let result = db_clone
                            .autocommit::<_, _, anyhow::Error>(
                                |dbtx, _| {
                                    #[allow(clippy::cast_possible_truncation)]
                                    let val = (i * 100 + j) as u8;
                                    Box::pin(async move {
                                        // All transactions write to the same key to force conflicts
                                        dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![val]))
                                            .await;
                                        Ok(())
                                    })
                                },
                                None, // unlimited retries
                            )
                            .await;

                        // Should succeed after retries, must NOT panic with "Resource busy"
                        assert!(
                            result.is_ok(),
                            "Transaction should succeed after retries, got: {result:?}",
                        );
                    }
                });
            handles.push(handle);
        }

        // Wait for all tasks - none should panic
        for handle in handles {
            handle.await.expect("Task should not panic");
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        fedimint_core::db::verify_remove_by_prefix(open_temp_db(
            "fcb-rocksdb-test-remove-by-prefix",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        fedimint_core::db::verify_module_prefix(open_temp_db("fcb-rocksdb-test-module-prefix"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-module-db-prefix")
            .tempdir()
            .unwrap();

        let module_db = Database::new(
            RocksDb::build(path.as_ref()).open_blocking().unwrap(),
            ModuleDecoderRegistry::default(),
        );

        fedimint_core::db::verify_module_db(
            open_temp_db("fcb-rocksdb-test-module-db"),
            module_db.with_prefix_module_id(module_instance_id).0,
        )
        .await;
    }

    #[test]
    fn test_next_prefix() {
        // Note: although we are testing the general case of a vector with N elements,
        // the prefixes currently use N = 1
        assert_eq!(next_prefix(&[1, 2, 3]).unwrap(), vec![1, 2, 4]);
        assert_eq!(next_prefix(&[1, 2, 254]).unwrap(), vec![1, 2, 255]);
        assert_eq!(next_prefix(&[1, 2, 255]).unwrap(), vec![1, 3, 0]);
        assert_eq!(next_prefix(&[1, 255, 255]).unwrap(), vec![2, 0, 0]);
        // this is a "max" prefix
        assert!(next_prefix(&[255, 255, 255]).is_none());
        // these are the common case
        assert_eq!(next_prefix(&[0]).unwrap(), vec![1]);
        assert_eq!(next_prefix(&[254]).unwrap(), vec![255]);
        assert!(next_prefix(&[255]).is_none()); // this is a "max" prefix
    }

    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        MaxTest = 255,
    }

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest, // max/last prefix
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();
        {
            let db = Database::new(
                RocksDb::build(&path).open().await.unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;
            dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![3]))
                .await;
            dbtx.insert_entry(&TestKey(vec![254]), &TestVal(vec![1]))
                .await;
            dbtx.insert_entry(&TestKey(vec![255]), &TestVal(vec![2]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![0]), &TestVal2(vec![3]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![254]), &TestVal2(vec![1]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![255]), &TestVal2(vec![2]))
                .await;
            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(
                query,
                vec![
                    (TestKey(vec![255]), TestVal(vec![2])),
                    (TestKey(vec![254]), TestVal(vec![1])),
                    (TestKey(vec![0]), TestVal(vec![3]))
                ]
            );
            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(
                query,
                vec![
                    (TestKey2(vec![255]), TestVal2(vec![2])),
                    (TestKey2(vec![254]), TestVal2(vec![1])),
                    (TestKey2(vec![0]), TestVal2(vec![3]))
                ]
            );
            dbtx.commit_tx().await;
        }
        // Test readonly implementation. Borrow `path` so the temporary
        // directory is not deleted while the read-only database is open.
        let db_readonly = RocksDbReadOnly::open_read_only(&path).await.unwrap();
        let db_readonly = Database::new(db_readonly, ModuleRegistry::default());
        let mut dbtx = db_readonly.begin_transaction_nc().await;
        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            query,
            vec![
                (TestKey(vec![255]), TestVal(vec![2])),
                (TestKey(vec![254]), TestVal(vec![1])),
                (TestKey(vec![0]), TestVal(vec![3]))
            ]
        );
        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            query,
            vec![
                (TestKey2(vec![255]), TestVal2(vec![2])),
                (TestKey2(vec![254]), TestVal2(vec![1])),
                (TestKey2(vec![0]), TestVal2(vec![3]))
            ]
        );
    }
}