fedimint_rocksdb/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4#![allow(clippy::needless_lifetimes)]
5
6pub mod envs;
7
8use std::fmt;
9use std::ops::Range;
10use std::path::Path;
11use std::str::FromStr;
12
13use anyhow::{Context as _, bail};
14use async_trait::async_trait;
15use fedimint_core::db::{
16    DatabaseError, DatabaseResult, IDatabaseTransactionOps, IDatabaseTransactionOpsCore,
17    IRawDatabase, IRawDatabaseTransaction, PrefixStream,
18};
19use fedimint_core::task::block_in_place;
20use fedimint_db_locked::{Locked, LockedBuilder};
21use futures::stream;
22pub use rocksdb;
23use rocksdb::{
24    DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
25};
26use tracing::debug;
27
28use crate::envs::FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV;
29
30// turn an `iter` into a `Stream` where every `next` is ran inside
31// `block_in_place` to offload the blocking calls
32fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> + use<I>
33where
34    I: Iterator + Send + 'i,
35    I::Item: Send,
36{
37    stream::unfold(iter, |mut iter| async {
38        fedimint_core::runtime::block_in_place(|| {
39            let item = iter.next();
40            item.map(|item| (item, iter))
41        })
42    })
43}
44
45#[derive(Debug)]
46pub struct RocksDb(rocksdb::OptimisticTransactionDB);
47
48pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
49
50#[bon::bon]
51impl RocksDb {
52    /// Open the database using blocking IO
53    #[builder(start_fn = build)]
54    #[builder(finish_fn = open_blocking)]
55    pub fn open_blocking(
56        #[builder(start_fn)] db_path: impl AsRef<Path>,
57        /// Relaxed consistency allows opening the database
58        /// even if the wal got corrupted.
59        relaxed_consistency: Option<bool>,
60    ) -> anyhow::Result<Locked<RocksDb>> {
61        let db_path = db_path.as_ref();
62
63        block_in_place(|| {
64            std::fs::create_dir_all(
65                db_path
66                    .parent()
67                    .ok_or_else(|| anyhow::anyhow!("db path must have a base dir"))?,
68            )?;
69            LockedBuilder::new(db_path)?.with_db(|| {
70                Self::open_blocking_unlocked(db_path, relaxed_consistency.unwrap_or_default())
71            })
72        })
73    }
74}
75
76impl<I1, S> RocksDbOpenBlockingBuilder<I1, S>
77where
78    S: rocks_db_open_blocking_builder::State,
79    I1: std::convert::AsRef<std::path::Path>,
80{
81    /// Open the database
82    #[allow(clippy::unused_async)]
83    pub async fn open(self) -> anyhow::Result<Locked<RocksDb>> {
84        block_in_place(|| self.open_blocking())
85    }
86}
87
88impl RocksDb {
89    fn open_blocking_unlocked(
90        db_path: &Path,
91        relaxed_consistency: bool,
92    ) -> anyhow::Result<RocksDb> {
93        let mut opts = get_default_options()?;
94        if relaxed_consistency {
95            // https://github.com/fedimint/fedimint/issues/8072
96            opts.set_wal_recovery_mode(DBRecoveryMode::TolerateCorruptedTailRecords);
97        } else {
98            // Since we turned synchronous writes one we should never encounter a corrupted
99            // WAL and should rather fail in this case
100            opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency);
101        }
102        let db: rocksdb::OptimisticTransactionDB =
103            rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, db_path)?;
104        Ok(RocksDb(db))
105    }
106
107    pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
108        &self.0
109    }
110}
111
// TODO: Remove this and inline it in the places where it's used.
/// Returns `true` when `num` is a power of two; `0` is not one.
fn is_power_of_two(num: usize) -> bool {
    usize::is_power_of_two(num)
}
116
117impl fmt::Debug for RocksDbReadOnlyTransaction<'_> {
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        f.write_str("RocksDbTransaction")
120    }
121}
122
123impl fmt::Debug for RocksDbTransaction<'_> {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        f.write_str("RocksDbTransaction")
126    }
127}
128
/// Checks `is_power_of_two` on a handful of small values and around `2 << 10`.
#[test]
fn is_power_of_two_sanity() {
    let cases: [(usize, bool); 8] = [
        (0, false),
        (1, true),
        (2, true),
        (3, false),
        (4, true),
        (5, false),
        (2 << 10, true),
        ((2 << 10) + 1, false),
    ];

    for (num, expected) in cases {
        assert_eq!(is_power_of_two(num), expected, "unexpected result for {num}");
    }
}
140
141fn get_default_options() -> anyhow::Result<rocksdb::Options> {
142    let mut opts = rocksdb::Options::default();
143    if let Ok(var) = std::env::var(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV) {
144        debug!(var, "Using custom write buffer size");
145        let size: usize = FromStr::from_str(&var)
146            .with_context(|| format!("Could not parse {FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV}"))?;
147        if !is_power_of_two(size) {
148            bail!("{} is not a power of 2", FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV);
149        }
150        opts.set_write_buffer_size(size);
151    }
152    opts.create_if_missing(true);
153    Ok(opts)
154}
155
156#[derive(Debug)]
157pub struct RocksDbReadOnly(rocksdb::DB);
158
159pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
160
161impl RocksDbReadOnly {
162    #[allow(clippy::unused_async)]
163    pub async fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
164        let db_path = db_path.as_ref();
165        block_in_place(|| Self::open_read_only_blocking(db_path))
166    }
167
168    pub fn open_read_only_blocking(db_path: &Path) -> anyhow::Result<RocksDbReadOnly> {
169        let opts = get_default_options()?;
170        // Note: rocksdb is OK if one process has write access, and other read-access
171        let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
172        Ok(RocksDbReadOnly(db))
173    }
174}
175
176impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
177    fn from(db: OptimisticTransactionDB) -> Self {
178        RocksDb(db)
179    }
180}
181
182impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
183    fn from(db: RocksDb) -> Self {
184        db.0
185    }
186}
187
// Reverse prefix scans have to be seeded at "prefix + 1" (in lexicographic
// byte order) rather than at "prefix" itself. See the tests below.
//
// Returns `None` when no such successor exists, i.e. when every byte is
// already `0xff` (this includes the empty prefix).
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    // Rightmost byte that can be incremented without overflowing; if there is
    // none, the prefix is already the last/max one.
    let pivot = prefix.iter().rposition(|&byte| byte != u8::MAX)?;

    let mut successor = prefix.to_vec();
    successor[pivot] += 1;
    // Every byte after the pivot was 0xff and carries over to zero.
    successor[pivot + 1..].fill(0);
    Some(successor)
}
211
212#[async_trait]
213impl IRawDatabase for RocksDb {
214    type Transaction<'a> = RocksDbTransaction<'a>;
215    async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
216        let mut optimistic_options = OptimisticTransactionOptions::default();
217        optimistic_options.set_snapshot(true);
218
219        let mut write_options = WriteOptions::default();
220        // Make sure we never lose data on unclean shutdown
221        write_options.set_sync(true);
222
223        RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options))
224    }
225
226    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
227        let checkpoint =
228            rocksdb::checkpoint::Checkpoint::new(&self.0).map_err(DatabaseError::backend)?;
229        checkpoint
230            .create_checkpoint(backup_path)
231            .map_err(DatabaseError::backend)?;
232        Ok(())
233    }
234}
235
236#[async_trait]
237impl IRawDatabase for RocksDbReadOnly {
238    type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
239    async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
240        RocksDbReadOnlyTransaction(&self.0)
241    }
242
243    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
244        let checkpoint =
245            rocksdb::checkpoint::Checkpoint::new(&self.0).map_err(DatabaseError::backend)?;
246        checkpoint
247            .create_checkpoint(backup_path)
248            .map_err(DatabaseError::backend)?;
249        Ok(())
250    }
251}
252
253#[async_trait]
254impl IDatabaseTransactionOpsCore for RocksDbTransaction<'_> {
255    async fn raw_insert_bytes(
256        &mut self,
257        key: &[u8],
258        value: &[u8],
259    ) -> DatabaseResult<Option<Vec<u8>>> {
260        fedimint_core::runtime::block_in_place(|| {
261            let val = self.0.snapshot().get(key).unwrap();
262            self.0.put(key, value).map_err(DatabaseError::backend)?;
263            Ok(val)
264        })
265    }
266
267    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
268        fedimint_core::runtime::block_in_place(|| {
269            self.0.snapshot().get(key).map_err(DatabaseError::backend)
270        })
271    }
272
273    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
274        fedimint_core::runtime::block_in_place(|| {
275            let val = self.0.snapshot().get(key).unwrap();
276            self.0.delete(key).map_err(DatabaseError::backend)?;
277            Ok(val)
278        })
279    }
280
281    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
282        Ok(fedimint_core::runtime::block_in_place(|| {
283            let prefix = key_prefix.to_vec();
284            let mut options = rocksdb::ReadOptions::default();
285            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
286            let iter = self.0.snapshot().iterator_opt(
287                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
288                options,
289            );
290            let rocksdb_iter = iter.map_while(move |res| {
291                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
292                key_bytes
293                    .starts_with(&prefix)
294                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
295            });
296            Box::pin(convert_to_async_stream(rocksdb_iter))
297        }))
298    }
299
300    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
301        Ok(fedimint_core::runtime::block_in_place(|| {
302            let range = Range {
303                start: range.start.to_vec(),
304                end: range.end.to_vec(),
305            };
306            let mut options = rocksdb::ReadOptions::default();
307            options.set_iterate_range(range.clone());
308            let iter = self.0.snapshot().iterator_opt(
309                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
310                options,
311            );
312            let rocksdb_iter = iter.map_while(move |res| {
313                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
314                (key_bytes.as_ref() < range.end.as_slice())
315                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
316            });
317            Box::pin(convert_to_async_stream(rocksdb_iter))
318        }))
319    }
320
321    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
322        fedimint_core::runtime::block_in_place(|| {
323            // Note: delete_range is not supported in Transactions :/
324            let mut options = rocksdb::ReadOptions::default();
325            options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
326            let iter = self
327                .0
328                .snapshot()
329                .iterator_opt(
330                    rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
331                    options,
332                )
333                .map_while(|res| {
334                    res.map(|(key_bytes, _)| {
335                        key_bytes
336                            .starts_with(key_prefix)
337                            .then_some(key_bytes.to_vec())
338                    })
339                    .transpose()
340                });
341
342            for item in iter {
343                let key = item.map_err(DatabaseError::backend)?;
344                self.0.delete(key).map_err(DatabaseError::backend)?;
345            }
346
347            Ok(())
348        })
349    }
350
351    async fn raw_find_by_prefix_sorted_descending(
352        &mut self,
353        key_prefix: &[u8],
354    ) -> DatabaseResult<PrefixStream<'_>> {
355        let prefix = key_prefix.to_vec();
356        let next_prefix = next_prefix(&prefix);
357        let iterator_mode = if let Some(next_prefix) = &next_prefix {
358            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
359        } else {
360            rocksdb::IteratorMode::End
361        };
362        Ok(fedimint_core::runtime::block_in_place(|| {
363            let mut options = rocksdb::ReadOptions::default();
364            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
365            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
366            let rocksdb_iter = iter.map_while(move |res| {
367                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
368                key_bytes
369                    .starts_with(&prefix)
370                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
371            });
372            Box::pin(convert_to_async_stream(rocksdb_iter))
373        }))
374    }
375}
376
377impl IDatabaseTransactionOps for RocksDbTransaction<'_> {}
378
379#[async_trait]
380impl IRawDatabaseTransaction for RocksDbTransaction<'_> {
381    async fn commit_tx(self) -> DatabaseResult<()> {
382        fedimint_core::runtime::block_in_place(|| {
383            match self.0.commit() {
384                Ok(()) => Ok(()),
385                Err(err) => {
386                    // RocksDB's OptimisticTransactionDB can return Busy/TryAgain errors
387                    // when concurrent transactions conflict on the same keys.
388                    // These are retriable - return WriteConflict so autocommit retries.
389                    // See: https://github.com/fedimint/fedimint/issues/8077
390                    match err.kind() {
391                        rocksdb::ErrorKind::Busy
392                        | rocksdb::ErrorKind::TryAgain
393                        | rocksdb::ErrorKind::MergeInProgress
394                        | rocksdb::ErrorKind::TimedOut => Err(DatabaseError::WriteConflict),
395                        _ => Err(DatabaseError::backend(err)),
396                    }
397                }
398            }
399        })
400    }
401}
402
403#[async_trait]
404impl IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'_> {
405    async fn raw_insert_bytes(
406        &mut self,
407        _key: &[u8],
408        _value: &[u8],
409    ) -> DatabaseResult<Option<Vec<u8>>> {
410        panic!("Cannot insert into a read only transaction");
411    }
412
413    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
414        fedimint_core::runtime::block_in_place(|| {
415            self.0.snapshot().get(key).map_err(DatabaseError::backend)
416        })
417    }
418
419    async fn raw_remove_entry(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
420        panic!("Cannot remove from a read only transaction");
421    }
422
423    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
424        Ok(fedimint_core::runtime::block_in_place(|| {
425            let range = Range {
426                start: range.start.to_vec(),
427                end: range.end.to_vec(),
428            };
429            let mut options = rocksdb::ReadOptions::default();
430            options.set_iterate_range(range.clone());
431            let iter = self.0.snapshot().iterator_opt(
432                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
433                options,
434            );
435            let rocksdb_iter = iter.map_while(move |res| {
436                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
437                (key_bytes.as_ref() < range.end.as_slice())
438                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
439            });
440            Box::pin(convert_to_async_stream(rocksdb_iter))
441        }))
442    }
443
444    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
445        Ok(fedimint_core::runtime::block_in_place(|| {
446            let prefix = key_prefix.to_vec();
447            let mut options = rocksdb::ReadOptions::default();
448            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
449            let iter = self.0.snapshot().iterator_opt(
450                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
451                options,
452            );
453            let rocksdb_iter = iter.map_while(move |res| {
454                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
455                key_bytes
456                    .starts_with(&prefix)
457                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
458            });
459            Box::pin(convert_to_async_stream(rocksdb_iter))
460        }))
461    }
462
463    async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> DatabaseResult<()> {
464        panic!("Cannot remove from a read only transaction");
465    }
466
467    async fn raw_find_by_prefix_sorted_descending(
468        &mut self,
469        key_prefix: &[u8],
470    ) -> DatabaseResult<PrefixStream<'_>> {
471        let prefix = key_prefix.to_vec();
472        let next_prefix = next_prefix(&prefix);
473        let iterator_mode = if let Some(next_prefix) = &next_prefix {
474            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
475        } else {
476            rocksdb::IteratorMode::End
477        };
478        Ok(fedimint_core::runtime::block_in_place(|| {
479            let mut options = rocksdb::ReadOptions::default();
480            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
481            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
482            let rocksdb_iter = iter.map_while(move |res| {
483                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
484                key_bytes
485                    .starts_with(&prefix)
486                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
487            });
488            Box::pin(stream::iter(rocksdb_iter))
489        }))
490    }
491}
492
493impl IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'_> {}
494
495#[async_trait]
496impl IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'_> {
497    async fn commit_tx(self) -> DatabaseResult<()> {
498        panic!("Cannot commit a read only transaction");
499    }
500}
501
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    /// Opens a fresh database in a temporary directory whose name starts with
    /// `temp_path`.
    ///
    /// NOTE(review): the temp-dir guard goes out of scope when this function
    /// returns while the database stays open - confirm this is intended.
    fn open_temp_db(temp_path: &str) -> Database {
        let temp_dir = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();
        let raw_db = RocksDb::build(temp_dir.path()).open_blocking().unwrap();

        Database::new(raw_db, ModuleDecoderRegistry::default())
    }

    /// Entries expected from a descending scan over the `Test` prefix.
    fn expected_descending() -> Vec<(TestKey, TestVal)> {
        [(255, 2), (254, 1), (0, 3)]
            .into_iter()
            .map(|(key, val)| (TestKey(vec![key]), TestVal(vec![val])))
            .collect()
    }

    /// Entries expected from a descending scan over the `MaxTest` prefix.
    fn expected_descending_max() -> Vec<(TestKey2, TestVal2)> {
        [(255, 2), (254, 1), (0, 3)]
            .into_iter()
            .map(|(key, val)| (TestKey2(vec![key]), TestVal2(vec![val])))
            .collect()
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        let db = open_temp_db("fcb-rocksdb-test-insert-elements");
        fedimint_core::db::verify_insert_elements(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        let db = open_temp_db("fcb-rocksdb-test-remove-nonexisting");
        fedimint_core::db::verify_remove_nonexisting(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        let db = open_temp_db("fcb-rocksdb-test-remove-existing");
        fedimint_core::db::verify_remove_existing(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        let db = open_temp_db("fcb-rocksdb-test-read-own-writes");
        fedimint_core::db::verify_read_own_writes(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-dirty-reads");
        fedimint_core::db::verify_prevent_dirty_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-range");
        fedimint_core::db::verify_find_by_range(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-prefix");
        fedimint_core::db::verify_find_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        let db = open_temp_db("fcb-rocksdb-test-commit");
        fedimint_core::db::verify_commit(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-nonrepeatable-reads");
        fedimint_core::db::verify_prevent_nonrepeatable_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        let db = open_temp_db("fcb-rocksdb-test-snapshot-isolation");
        fedimint_core::db::verify_snapshot_isolation(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        let db = open_temp_db("fcb-rocksdb-test-phantom-entry");
        fedimint_core::db::verify_phantom_entry(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        let db = open_temp_db("fcb-rocksdb-test-write-conflict");
        fedimint_core::db::expect_write_conflict(db).await;
    }

    /// Test that concurrent transaction conflicts are handled gracefully
    /// with autocommit retry logic instead of panicking.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_concurrent_transaction_conflict_with_autocommit() {
        use std::sync::Arc;

        let db = Arc::new(open_temp_db("fcb-rocksdb-test-concurrent-conflict"));

        // Spawn multiple concurrent tasks that all write to the same key
        // This will trigger optimistic transaction conflicts
        let handles: Vec<_> = (0u64..10)
            .map(|i| {
                let task_db = Arc::clone(&db);
                fedimint_core::runtime::spawn("rocksdb-transient-error-test", async move {
                    for j in 0u64..10 {
                        // Use autocommit which handles retriable errors with retry logic
                        let result = task_db
                            .autocommit::<_, _, anyhow::Error>(
                                |dbtx, _| {
                                    #[allow(clippy::cast_possible_truncation)]
                                    let val = (i * 100 + j) as u8;
                                    Box::pin(async move {
                                        // All transactions write to the same key to force conflicts
                                        dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![val]))
                                            .await;
                                        Ok(())
                                    })
                                },
                                None, // unlimited retries
                            )
                            .await;

                        // Should succeed after retries, must NOT panic with "Resource busy"
                        assert!(
                            result.is_ok(),
                            "Transaction should succeed after retries, got: {result:?}",
                        );
                    }
                })
            })
            .collect();

        // Wait for all tasks - none should panic
        for handle in handles {
            handle.await.expect("Task should not panic");
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-remove-by-prefix");
        fedimint_core::db::verify_remove_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        let db = open_temp_db("fcb-rocksdb-test-module-prefix");
        fedimint_core::db::verify_module_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        let temp_dir = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-module-db-prefix")
            .tempdir()
            .unwrap();

        let raw_db = RocksDb::build(temp_dir.path()).open_blocking().unwrap();
        let module_db = Database::new(raw_db, ModuleDecoderRegistry::default());

        let plain_db = open_temp_db("fcb-rocksdb-test-module-db");
        let prefixed_db = module_db.with_prefix_module_id(module_instance_id).0;
        fedimint_core::db::verify_module_db(plain_db, prefixed_db).await;
    }

    #[test]
    fn test_next_prefix() {
        // Note: although we are testing the general case of a vector with N elements,
        // the prefixes currently use N = 1
        let with_successor: [(&[u8], &[u8]); 6] = [
            (&[1, 2, 3], &[1, 2, 4]),
            (&[1, 2, 254], &[1, 2, 255]),
            (&[1, 2, 255], &[1, 3, 0]),
            (&[1, 255, 255], &[2, 0, 0]),
            // these are the common case
            (&[0], &[1]),
            (&[254], &[255]),
        ];
        for (prefix, expected) in with_successor {
            assert_eq!(next_prefix(prefix).unwrap(), expected.to_vec());
        }

        // these are "max" prefixes
        let without_successor: [&[u8]; 2] = [&[255, 255, 255], &[255]];
        for prefix in without_successor {
            assert!(next_prefix(prefix).is_none());
        }
    }

    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        MaxTest = 255,
    }

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest, // max/last prefix
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();
        {
            // The read-write database lives only inside this scope, so it is
            // gone before the read-only handle is opened below.
            let db = Database::new(
                RocksDb::build(&path).open().await.unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;

            for (key, val) in [(0, 3), (254, 1), (255, 2)] {
                dbtx.insert_entry(&TestKey(vec![key]), &TestVal(vec![val]))
                    .await;
            }
            for (key, val) in [(0, 3), (254, 1), (255, 2)] {
                dbtx.insert_entry(&TestKey2(vec![key]), &TestVal2(vec![val]))
                    .await;
            }

            let found = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(found, expected_descending());

            let found_max = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(found_max, expected_descending_max());

            dbtx.commit_tx().await;
        }

        // Test readonly implementation
        let db_readonly = RocksDbReadOnly::open_read_only(path).await.unwrap();
        let db_readonly = Database::new(db_readonly, ModuleRegistry::default());
        let mut dbtx = db_readonly.begin_transaction_nc().await;

        let found = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(found, expected_descending());

        let found_max = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(found_max, expected_descending_max());
    }
}
827}