fedimint_rocksdb/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4#![allow(clippy::needless_lifetimes)]
5
6pub mod envs;
7
8use std::fmt;
9use std::ops::Range;
10use std::path::Path;
11use std::str::FromStr;
12
13use anyhow::{Context as _, bail};
14use async_trait::async_trait;
15use fedimint_core::db::{
16    DatabaseError, DatabaseResult, IDatabaseTransactionOps, IDatabaseTransactionOpsCore,
17    IRawDatabase, IRawDatabaseTransaction, PrefixStream,
18};
19use fedimint_core::task::block_in_place;
20use fedimint_db_locked::{Locked, LockedBuilder};
21use futures::stream;
22pub use rocksdb;
23use rocksdb::{
24    DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
25};
26use tracing::debug;
27
28use crate::envs::FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV;
29
30// turn an `iter` into a `Stream` where every `next` is ran inside
31// `block_in_place` to offload the blocking calls
32fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> + use<I>
33where
34    I: Iterator + Send + 'i,
35    I::Item: Send,
36{
37    stream::unfold(iter, |mut iter| async {
38        fedimint_core::runtime::block_in_place(|| {
39            let item = iter.next();
40            item.map(|item| (item, iter))
41        })
42    })
43}
44
45#[derive(Debug)]
46pub struct RocksDb(rocksdb::OptimisticTransactionDB);
47
48pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
49
50#[bon::bon]
51impl RocksDb {
52    /// Open the database using blocking IO
53    #[builder(start_fn = build)]
54    #[builder(finish_fn = open_blocking)]
55    pub fn open_blocking(
56        #[builder(start_fn)] db_path: impl AsRef<Path>,
57        /// Relaxed consistency allows opening the database
58        /// even if the wal got corrupted.
59        relaxed_consistency: Option<bool>,
60    ) -> anyhow::Result<Locked<RocksDb>> {
61        let db_path = db_path.as_ref();
62
63        block_in_place(|| {
64            std::fs::create_dir_all(
65                db_path
66                    .parent()
67                    .ok_or_else(|| anyhow::anyhow!("db path must have a base dir"))?,
68            )?;
69            LockedBuilder::new(db_path)?.with_db(|| {
70                Self::open_blocking_unlocked(db_path, relaxed_consistency.unwrap_or_default())
71            })
72        })
73    }
74}
75
76impl<I1, S> RocksDbOpenBlockingBuilder<I1, S>
77where
78    S: rocks_db_open_blocking_builder::State,
79    I1: std::convert::AsRef<std::path::Path>,
80{
81    /// Open the database
82    #[allow(clippy::unused_async)]
83    pub async fn open(self) -> anyhow::Result<Locked<RocksDb>> {
84        block_in_place(|| self.open_blocking())
85    }
86}
87
88impl RocksDb {
89    fn open_blocking_unlocked(
90        db_path: &Path,
91        relaxed_consistency: bool,
92    ) -> anyhow::Result<RocksDb> {
93        let mut opts = get_default_options()?;
94        if relaxed_consistency {
95            // https://github.com/fedimint/fedimint/issues/8072
96            opts.set_wal_recovery_mode(DBRecoveryMode::TolerateCorruptedTailRecords);
97        } else {
98            // Since we turned synchronous writes one we should never encounter a corrupted
99            // WAL and should rather fail in this case
100            opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency);
101        }
102        let db: rocksdb::OptimisticTransactionDB =
103            rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, db_path)?;
104        Ok(RocksDb(db))
105    }
106
107    pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
108        &self.0
109    }
110}
111
// TODO: Remove this and inline it in the places where it's used.
//
// Thin wrapper over the standard library check; `0` is not a power of two.
fn is_power_of_two(num: usize) -> bool {
    usize::is_power_of_two(num)
}
116
117impl fmt::Debug for RocksDbReadOnlyTransaction<'_> {
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        f.write_str("RocksDbTransaction")
120    }
121}
122
123impl fmt::Debug for RocksDbTransaction<'_> {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        f.write_str("RocksDbTransaction")
126    }
127}
128
#[test]
fn is_power_of_two_sanity() {
    // (input, whether it is expected to be a power of two)
    let cases = [
        (0, false),
        (1, true),
        (2, true),
        (3, false),
        (4, true),
        (5, false),
        (2 << 10, true),
        ((2 << 10) + 1, false),
    ];

    for (num, expected) in cases {
        assert_eq!(is_power_of_two(num), expected);
    }
}
140
141fn get_default_options() -> anyhow::Result<rocksdb::Options> {
142    let mut opts = rocksdb::Options::default();
143    if let Ok(var) = std::env::var(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV) {
144        debug!(var, "Using custom write buffer size");
145        let size: usize = FromStr::from_str(&var)
146            .with_context(|| format!("Could not parse {FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV}"))?;
147        if !is_power_of_two(size) {
148            bail!("{} is not a power of 2", FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV);
149        }
150        opts.set_write_buffer_size(size);
151    }
152    opts.create_if_missing(true);
153    Ok(opts)
154}
155
156#[derive(Debug)]
157pub struct RocksDbReadOnly(rocksdb::DB);
158
159pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
160
161impl RocksDbReadOnly {
162    #[allow(clippy::unused_async)]
163    pub async fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
164        let db_path = db_path.as_ref();
165        block_in_place(|| Self::open_read_only_blocking(db_path))
166    }
167
168    pub fn open_read_only_blocking(db_path: &Path) -> anyhow::Result<RocksDbReadOnly> {
169        let opts = get_default_options()?;
170        // Note: rocksdb is OK if one process has write access, and other read-access
171        let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
172        Ok(RocksDbReadOnly(db))
173    }
174}
175
176impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
177    fn from(db: OptimisticTransactionDB) -> Self {
178        RocksDb(db)
179    }
180}
181
182impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
183    fn from(db: RocksDb) -> Self {
184        db.0
185    }
186}
187
// Reverse iteration over a prefix has to start from "prefix + 1" (in
// lexicographic byte order) rather than from the prefix itself; see the tests
// below.
//
// Returns `None` when no such successor exists, i.e. every byte of `prefix` is
// already `0xff` (which includes the empty prefix).
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    // The rightmost byte that can be incremented without overflowing.
    let pivot = prefix.iter().rposition(|&byte| byte != u8::MAX)?;

    let mut successor = prefix.to_vec();
    successor[pivot] += 1;
    // Every byte after the pivot was 0xff and wraps around to zero.
    successor[pivot + 1..].fill(0);
    Some(successor)
}
211
212#[async_trait]
213impl IRawDatabase for RocksDb {
214    type Transaction<'a> = RocksDbTransaction<'a>;
215    async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
216        let mut optimistic_options = OptimisticTransactionOptions::default();
217        optimistic_options.set_snapshot(true);
218
219        let mut write_options = WriteOptions::default();
220        // Make sure we never lose data on unclean shutdown
221        write_options.set_sync(true);
222
223        let mut rocksdb_tx =
224            RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options));
225        rocksdb_tx
226            .set_tx_savepoint()
227            .await
228            .expect("setting tx savepoint failed");
229
230        rocksdb_tx
231    }
232
233    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
234        let checkpoint =
235            rocksdb::checkpoint::Checkpoint::new(&self.0).map_err(DatabaseError::backend)?;
236        checkpoint
237            .create_checkpoint(backup_path)
238            .map_err(DatabaseError::backend)?;
239        Ok(())
240    }
241}
242
243#[async_trait]
244impl IRawDatabase for RocksDbReadOnly {
245    type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
246    async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
247        RocksDbReadOnlyTransaction(&self.0)
248    }
249
250    fn checkpoint(&self, backup_path: &Path) -> DatabaseResult<()> {
251        let checkpoint =
252            rocksdb::checkpoint::Checkpoint::new(&self.0).map_err(DatabaseError::backend)?;
253        checkpoint
254            .create_checkpoint(backup_path)
255            .map_err(DatabaseError::backend)?;
256        Ok(())
257    }
258}
259
260#[async_trait]
261impl IDatabaseTransactionOpsCore for RocksDbTransaction<'_> {
262    async fn raw_insert_bytes(
263        &mut self,
264        key: &[u8],
265        value: &[u8],
266    ) -> DatabaseResult<Option<Vec<u8>>> {
267        fedimint_core::runtime::block_in_place(|| {
268            let val = self.0.snapshot().get(key).unwrap();
269            self.0.put(key, value).map_err(DatabaseError::backend)?;
270            Ok(val)
271        })
272    }
273
274    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
275        fedimint_core::runtime::block_in_place(|| {
276            self.0.snapshot().get(key).map_err(DatabaseError::backend)
277        })
278    }
279
280    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
281        fedimint_core::runtime::block_in_place(|| {
282            let val = self.0.snapshot().get(key).unwrap();
283            self.0.delete(key).map_err(DatabaseError::backend)?;
284            Ok(val)
285        })
286    }
287
288    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
289        Ok(fedimint_core::runtime::block_in_place(|| {
290            let prefix = key_prefix.to_vec();
291            let mut options = rocksdb::ReadOptions::default();
292            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
293            let iter = self.0.snapshot().iterator_opt(
294                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
295                options,
296            );
297            let rocksdb_iter = iter.map_while(move |res| {
298                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
299                key_bytes
300                    .starts_with(&prefix)
301                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
302            });
303            Box::pin(convert_to_async_stream(rocksdb_iter))
304        }))
305    }
306
307    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
308        Ok(fedimint_core::runtime::block_in_place(|| {
309            let range = Range {
310                start: range.start.to_vec(),
311                end: range.end.to_vec(),
312            };
313            let mut options = rocksdb::ReadOptions::default();
314            options.set_iterate_range(range.clone());
315            let iter = self.0.snapshot().iterator_opt(
316                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
317                options,
318            );
319            let rocksdb_iter = iter.map_while(move |res| {
320                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
321                (key_bytes.as_ref() < range.end.as_slice())
322                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
323            });
324            Box::pin(convert_to_async_stream(rocksdb_iter))
325        }))
326    }
327
328    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
329        fedimint_core::runtime::block_in_place(|| {
330            // Note: delete_range is not supported in Transactions :/
331            let mut options = rocksdb::ReadOptions::default();
332            options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
333            let iter = self
334                .0
335                .snapshot()
336                .iterator_opt(
337                    rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
338                    options,
339                )
340                .map_while(|res| {
341                    res.map(|(key_bytes, _)| {
342                        key_bytes
343                            .starts_with(key_prefix)
344                            .then_some(key_bytes.to_vec())
345                    })
346                    .transpose()
347                });
348
349            for item in iter {
350                let key = item.map_err(DatabaseError::backend)?;
351                self.0.delete(key).map_err(DatabaseError::backend)?;
352            }
353
354            Ok(())
355        })
356    }
357
358    async fn raw_find_by_prefix_sorted_descending(
359        &mut self,
360        key_prefix: &[u8],
361    ) -> DatabaseResult<PrefixStream<'_>> {
362        let prefix = key_prefix.to_vec();
363        let next_prefix = next_prefix(&prefix);
364        let iterator_mode = if let Some(next_prefix) = &next_prefix {
365            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
366        } else {
367            rocksdb::IteratorMode::End
368        };
369        Ok(fedimint_core::runtime::block_in_place(|| {
370            let mut options = rocksdb::ReadOptions::default();
371            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
372            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
373            let rocksdb_iter = iter.map_while(move |res| {
374                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
375                key_bytes
376                    .starts_with(&prefix)
377                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
378            });
379            Box::pin(convert_to_async_stream(rocksdb_iter))
380        }))
381    }
382}
383
384#[async_trait]
385impl IDatabaseTransactionOps for RocksDbTransaction<'_> {
386    async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()> {
387        fedimint_core::runtime::block_in_place(|| {
388            self.0
389                .rollback_to_savepoint()
390                .map_err(DatabaseError::backend)
391        })
392    }
393
394    async fn set_tx_savepoint(&mut self) -> DatabaseResult<()> {
395        fedimint_core::runtime::block_in_place(|| self.0.set_savepoint());
396
397        Ok(())
398    }
399}
400
401#[async_trait]
402impl IRawDatabaseTransaction for RocksDbTransaction<'_> {
403    async fn commit_tx(self) -> DatabaseResult<()> {
404        fedimint_core::runtime::block_in_place(|| {
405            self.0.commit().map_err(|e| {
406                // RocksDB optimistic transactions return errors on write conflicts
407                // The error message typically contains "Resource busy" for conflicts
408                let error_string = e.to_string();
409                if error_string.contains("Resource busy") || error_string.contains("Conflict") {
410                    DatabaseError::WriteConflict
411                } else {
412                    DatabaseError::backend(e)
413                }
414            })?;
415            Ok(())
416        })
417    }
418}
419
420#[async_trait]
421impl IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'_> {
422    async fn raw_insert_bytes(
423        &mut self,
424        _key: &[u8],
425        _value: &[u8],
426    ) -> DatabaseResult<Option<Vec<u8>>> {
427        panic!("Cannot insert into a read only transaction");
428    }
429
430    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
431        fedimint_core::runtime::block_in_place(|| {
432            self.0.snapshot().get(key).map_err(DatabaseError::backend)
433        })
434    }
435
436    async fn raw_remove_entry(&mut self, _key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
437        panic!("Cannot remove from a read only transaction");
438    }
439
440    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
441        Ok(fedimint_core::runtime::block_in_place(|| {
442            let range = Range {
443                start: range.start.to_vec(),
444                end: range.end.to_vec(),
445            };
446            let mut options = rocksdb::ReadOptions::default();
447            options.set_iterate_range(range.clone());
448            let iter = self.0.snapshot().iterator_opt(
449                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
450                options,
451            );
452            let rocksdb_iter = iter.map_while(move |res| {
453                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
454                (key_bytes.as_ref() < range.end.as_slice())
455                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
456            });
457            Box::pin(convert_to_async_stream(rocksdb_iter))
458        }))
459    }
460
461    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
462        Ok(fedimint_core::runtime::block_in_place(|| {
463            let prefix = key_prefix.to_vec();
464            let mut options = rocksdb::ReadOptions::default();
465            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
466            let iter = self.0.snapshot().iterator_opt(
467                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
468                options,
469            );
470            let rocksdb_iter = iter.map_while(move |res| {
471                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
472                key_bytes
473                    .starts_with(&prefix)
474                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
475            });
476            Box::pin(convert_to_async_stream(rocksdb_iter))
477        }))
478    }
479
480    async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> DatabaseResult<()> {
481        panic!("Cannot remove from a read only transaction");
482    }
483
484    async fn raw_find_by_prefix_sorted_descending(
485        &mut self,
486        key_prefix: &[u8],
487    ) -> DatabaseResult<PrefixStream<'_>> {
488        let prefix = key_prefix.to_vec();
489        let next_prefix = next_prefix(&prefix);
490        let iterator_mode = if let Some(next_prefix) = &next_prefix {
491            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
492        } else {
493            rocksdb::IteratorMode::End
494        };
495        Ok(fedimint_core::runtime::block_in_place(|| {
496            let mut options = rocksdb::ReadOptions::default();
497            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
498            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
499            let rocksdb_iter = iter.map_while(move |res| {
500                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
501                key_bytes
502                    .starts_with(&prefix)
503                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
504            });
505            Box::pin(stream::iter(rocksdb_iter))
506        }))
507    }
508}
509
510#[async_trait]
511impl IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'_> {
512    async fn rollback_tx_to_savepoint(&mut self) -> DatabaseResult<()> {
513        panic!("Cannot rollback a read only transaction");
514    }
515
516    async fn set_tx_savepoint(&mut self) -> DatabaseResult<()> {
517        panic!("Cannot set a savepoint in a read only transaction");
518    }
519}
520
521#[async_trait]
522impl IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'_> {
523    async fn commit_tx(self) -> DatabaseResult<()> {
524        panic!("Cannot commit a read only transaction");
525    }
526}
527
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    // Opens a fresh database in a temporary directory whose name starts with
    // `temp_path`.
    fn open_temp_db(temp_path: &str) -> Database {
        let path = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();

        let raw_db = RocksDb::build(path.as_ref()).open_blocking().unwrap();
        Database::new(raw_db, ModuleDecoderRegistry::default())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        let db = open_temp_db("fcb-rocksdb-test-insert-elements");
        fedimint_core::db::verify_insert_elements(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        let db = open_temp_db("fcb-rocksdb-test-remove-nonexisting");
        fedimint_core::db::verify_remove_nonexisting(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        let db = open_temp_db("fcb-rocksdb-test-remove-existing");
        fedimint_core::db::verify_remove_existing(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        let db = open_temp_db("fcb-rocksdb-test-read-own-writes");
        fedimint_core::db::verify_read_own_writes(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-dirty-reads");
        fedimint_core::db::verify_prevent_dirty_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-range");
        fedimint_core::db::verify_find_by_range(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-prefix");
        fedimint_core::db::verify_find_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        let db = open_temp_db("fcb-rocksdb-test-commit");
        fedimint_core::db::verify_commit(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-nonrepeatable-reads");
        fedimint_core::db::verify_prevent_nonrepeatable_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        let db = open_temp_db("fcb-rocksdb-test-snapshot-isolation");
        fedimint_core::db::verify_snapshot_isolation(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_rollback_to_savepoint() {
        let db = open_temp_db("fcb-rocksdb-test-rollback-to-savepoint");
        fedimint_core::db::verify_rollback_to_savepoint(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        let db = open_temp_db("fcb-rocksdb-test-phantom-entry");
        fedimint_core::db::verify_phantom_entry(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        let db = open_temp_db("fcb-rocksdb-test-write-conflict");
        fedimint_core::db::expect_write_conflict(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-remove-by-prefix");
        fedimint_core::db::verify_remove_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        let db = open_temp_db("fcb-rocksdb-test-module-prefix");
        fedimint_core::db::verify_module_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-module-db-prefix")
            .tempdir()
            .unwrap();

        let raw_module_db = RocksDb::build(path.as_ref()).open_blocking().unwrap();
        let module_db = Database::new(raw_module_db, ModuleDecoderRegistry::default());

        let db = open_temp_db("fcb-rocksdb-test-module-db");
        let prefixed_module_db = module_db.with_prefix_module_id(module_instance_id).0;
        fedimint_core::db::verify_module_db(db, prefixed_module_db).await;
    }

    #[test]
    fn test_next_prefix() {
        // Note: although we are testing the general case of a vector with N elements,
        // the prefixes currently use N = 1
        assert_eq!(next_prefix(&[1, 2, 3]), Some(vec![1, 2, 4]));
        assert_eq!(next_prefix(&[1, 2, 254]), Some(vec![1, 2, 255]));
        assert_eq!(next_prefix(&[1, 2, 255]), Some(vec![1, 3, 0]));
        assert_eq!(next_prefix(&[1, 255, 255]), Some(vec![2, 0, 0]));
        // this is a "max" prefix
        assert_eq!(next_prefix(&[255, 255, 255]), None);
        // these are the common case
        assert_eq!(next_prefix(&[0]), Some(vec![1]));
        assert_eq!(next_prefix(&[254]), Some(vec![255]));
        assert_eq!(next_prefix(&[255]), None); // this is a "max" prefix
    }

    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        MaxTest = 255,
    }

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest, // max/last prefix
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    // Entries under the `Test` prefix, in the descending key order the queries
    // are expected to return.
    fn expected_descending() -> Vec<(TestKey, TestVal)> {
        vec![
            (TestKey(vec![255]), TestVal(vec![2])),
            (TestKey(vec![254]), TestVal(vec![1])),
            (TestKey(vec![0]), TestVal(vec![3])),
        ]
    }

    // Same as `expected_descending`, for the max/last prefix.
    fn expected_descending_max() -> Vec<(TestKey2, TestVal2)> {
        vec![
            (TestKey2(vec![255]), TestVal2(vec![2])),
            (TestKey2(vec![254]), TestVal2(vec![1])),
            (TestKey2(vec![0]), TestVal2(vec![3])),
        ]
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();
        {
            let db = Database::new(
                RocksDb::build(&path).open().await.unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;

            // (key byte, value byte), inserted in ascending key order.
            let entries: [(u8, u8); 3] = [(0, 3), (254, 1), (255, 2)];
            for (key, val) in entries {
                dbtx.insert_entry(&TestKey(vec![key]), &TestVal(vec![val]))
                    .await;
            }
            for (key, val) in entries {
                dbtx.insert_entry(&TestKey2(vec![key]), &TestVal2(vec![val]))
                    .await;
            }

            let query: Vec<_> = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect()
                .await;
            assert_eq!(query, expected_descending());

            let query: Vec<_> = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect()
                .await;
            assert_eq!(query, expected_descending_max());

            dbtx.commit_tx().await;
        }
        // Test readonly implementation
        let db_readonly = RocksDbReadOnly::open_read_only(path).await.unwrap();
        let db_readonly = Database::new(db_readonly, ModuleRegistry::default());
        let mut dbtx = db_readonly.begin_transaction_nc().await;

        let query: Vec<_> = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect()
            .await;
        assert_eq!(query, expected_descending());

        let query: Vec<_> = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect()
            .await;
        assert_eq!(query, expected_descending_max());
    }
}
809        );
810    }
811}