fedimint_rocksdb/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4#![allow(clippy::needless_lifetimes)]
5
6pub mod db_locked;
7pub mod envs;
8
9use std::fmt;
10use std::ops::Range;
11use std::path::Path;
12use std::str::FromStr;
13
14use anyhow::{Context, Result, bail};
15use async_trait::async_trait;
16use db_locked::{Locked, LockedBuilder};
17use fedimint_core::db::{
18    IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase, IRawDatabaseTransaction,
19    PrefixStream,
20};
21use fedimint_core::task::block_in_place;
22use futures::stream;
23pub use rocksdb;
24use rocksdb::{
25    DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
26};
27use tracing::debug;
28
29use crate::envs::FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV;
30
31// turn an `iter` into a `Stream` where every `next` is ran inside
32// `block_in_place` to offload the blocking calls
33fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> + use<I>
34where
35    I: Iterator + Send + 'i,
36    I::Item: Send,
37{
38    stream::unfold(iter, |mut iter| async {
39        fedimint_core::runtime::block_in_place(|| {
40            let item = iter.next();
41            item.map(|item| (item, iter))
42        })
43    })
44}
45
46#[derive(Debug)]
47pub struct RocksDb(rocksdb::OptimisticTransactionDB);
48
49pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
50
51impl RocksDb {
52    #[allow(clippy::unused_async)]
53    pub async fn open(db_path: impl AsRef<Path>) -> anyhow::Result<Locked<RocksDb>> {
54        let db_path = db_path.as_ref();
55
56        block_in_place(|| Self::open_blocking(db_path))
57    }
58    pub fn open_blocking(db_path: &Path) -> anyhow::Result<Locked<RocksDb>> {
59        block_in_place(|| {
60            std::fs::create_dir_all(
61                db_path
62                    .parent()
63                    .ok_or_else(|| anyhow::anyhow!("db path must have a base dir"))?,
64            )?;
65            Ok(LockedBuilder::new(db_path)?.with_db(Self::open_blocking_unlocked(db_path)?))
66        })
67    }
68    pub fn open_blocking_unlocked(db_path: &Path) -> anyhow::Result<RocksDb> {
69        let mut opts = get_default_options()?;
70        // Since we turned synchronous writes one we should never encounter a corrupted
71        // WAL and should rather fail in this case
72        opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency);
73        let db: rocksdb::OptimisticTransactionDB =
74            rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, db_path)?;
75        Ok(RocksDb(db))
76    }
77
78    pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
79        &self.0
80    }
81}
82
// TODO: Remove this and inline it in the places where it's used.
/// Returns `true` when `num` is an exact power of two (zero is not).
fn is_power_of_two(num: usize) -> bool {
    usize::is_power_of_two(num)
}
87
88impl<'a> fmt::Debug for RocksDbReadOnlyTransaction<'a> {
89    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90        f.write_str("RocksDbTransaction")
91    }
92}
93
94impl<'a> fmt::Debug for RocksDbTransaction<'a> {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        f.write_str("RocksDbTransaction")
97    }
98}
99
/// Spot-checks `is_power_of_two` around small values and around `2 << 10`.
#[test]
fn is_power_of_two_sanity() {
    let cases = [
        (0, false),
        (1, true),
        (2, true),
        (3, false),
        (4, true),
        (5, false),
        (2 << 10, true),
        ((2 << 10) + 1, false),
    ];

    for (num, expected) in cases {
        assert_eq!(is_power_of_two(num), expected, "unexpected result for {num}");
    }
}
111
112fn get_default_options() -> anyhow::Result<rocksdb::Options> {
113    let mut opts = rocksdb::Options::default();
114    if let Ok(var) = std::env::var(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV) {
115        debug!(var, "Using custom write buffer size");
116        let size: usize = FromStr::from_str(&var)
117            .with_context(|| format!("Could not parse {FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV}"))?;
118        if !is_power_of_two(size) {
119            bail!("{} is not a power of 2", FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV);
120        }
121        opts.set_write_buffer_size(size);
122    }
123    opts.create_if_missing(true);
124    Ok(opts)
125}
126
127#[derive(Debug)]
128pub struct RocksDbReadOnly(rocksdb::DB);
129
130pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
131
132impl RocksDbReadOnly {
133    #[allow(clippy::unused_async)]
134    pub async fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
135        let db_path = db_path.as_ref();
136        block_in_place(|| Self::open_read_only_blocking(db_path))
137    }
138
139    pub fn open_read_only_blocking(db_path: &Path) -> anyhow::Result<RocksDbReadOnly> {
140        let opts = get_default_options()?;
141        // Note: rocksdb is OK if one process has write access, and other read-access
142        let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
143        Ok(RocksDbReadOnly(db))
144    }
145}
146
147impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
148    fn from(db: OptimisticTransactionDB) -> Self {
149        RocksDb(db)
150    }
151}
152
153impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
154    fn from(db: RocksDb) -> Self {
155        db.0
156    }
157}
158
/// Computes the lexicographic successor of `prefix` ("prefix + 1").
///
/// When finding by prefix while iterating in reverse order, iteration has to
/// start from this successor rather than from `prefix` itself; see
/// `test_next_prefix` for examples.
///
/// Returns `None` if there is no next prefix, i.e. `prefix` is empty or
/// consists only of `0xff` bytes and is therefore already the last possible
/// one.
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    // The rightmost byte that can still be incremented absorbs the carry ...
    let pivot = prefix.iter().rposition(|&byte| byte != u8::MAX)?;

    let mut successor = prefix.to_vec();
    successor[pivot] += 1;
    // ... and every `0xff` byte to its right wraps around to zero.
    successor[pivot + 1..].fill(0);
    Some(successor)
}
182
183#[async_trait]
184impl IRawDatabase for RocksDb {
185    type Transaction<'a> = RocksDbTransaction<'a>;
186    async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
187        let mut optimistic_options = OptimisticTransactionOptions::default();
188        optimistic_options.set_snapshot(true);
189
190        let mut write_options = WriteOptions::default();
191        // Make sure we never lose data on unclean shutdown
192        write_options.set_sync(true);
193
194        let mut rocksdb_tx =
195            RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options));
196        rocksdb_tx
197            .set_tx_savepoint()
198            .await
199            .expect("setting tx savepoint failed");
200
201        rocksdb_tx
202    }
203
204    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
205        let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
206        checkpoint.create_checkpoint(backup_path)?;
207        Ok(())
208    }
209}
210
211#[async_trait]
212impl IRawDatabase for RocksDbReadOnly {
213    type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
214    async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
215        RocksDbReadOnlyTransaction(&self.0)
216    }
217
218    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
219        let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
220        checkpoint.create_checkpoint(backup_path)?;
221        Ok(())
222    }
223}
224
225#[async_trait]
226impl<'a> IDatabaseTransactionOpsCore for RocksDbTransaction<'a> {
227    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
228        fedimint_core::runtime::block_in_place(|| {
229            let val = self.0.snapshot().get(key).unwrap();
230            self.0.put(key, value)?;
231            Ok(val)
232        })
233    }
234
235    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
236        fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
237    }
238
239    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
240        fedimint_core::runtime::block_in_place(|| {
241            let val = self.0.snapshot().get(key).unwrap();
242            self.0.delete(key)?;
243            Ok(val)
244        })
245    }
246
247    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
248        Ok(fedimint_core::runtime::block_in_place(|| {
249            let prefix = key_prefix.to_vec();
250            let mut options = rocksdb::ReadOptions::default();
251            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
252            let iter = self.0.snapshot().iterator_opt(
253                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
254                options,
255            );
256            let rocksdb_iter = iter.map_while(move |res| {
257                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
258                key_bytes
259                    .starts_with(&prefix)
260                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
261            });
262            Box::pin(convert_to_async_stream(rocksdb_iter))
263        }))
264    }
265
266    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
267        Ok(fedimint_core::runtime::block_in_place(|| {
268            let range = Range {
269                start: range.start.to_vec(),
270                end: range.end.to_vec(),
271            };
272            let mut options = rocksdb::ReadOptions::default();
273            options.set_iterate_range(range.clone());
274            let iter = self.0.snapshot().iterator_opt(
275                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
276                options,
277            );
278            let rocksdb_iter = iter.map_while(move |res| {
279                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
280                (key_bytes.as_ref() < range.end.as_slice())
281                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
282            });
283            Box::pin(convert_to_async_stream(rocksdb_iter))
284        }))
285    }
286
287    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> anyhow::Result<()> {
288        fedimint_core::runtime::block_in_place(|| {
289            // Note: delete_range is not supported in Transactions :/
290            let mut options = rocksdb::ReadOptions::default();
291            options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
292            let iter = self
293                .0
294                .snapshot()
295                .iterator_opt(
296                    rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
297                    options,
298                )
299                .map_while(|res| {
300                    res.map(|(key_bytes, _)| {
301                        key_bytes
302                            .starts_with(key_prefix)
303                            .then_some(key_bytes.to_vec())
304                    })
305                    .transpose()
306                });
307
308            for item in iter {
309                let key = item?;
310                self.0.delete(key)?;
311            }
312
313            Ok(())
314        })
315    }
316
317    async fn raw_find_by_prefix_sorted_descending(
318        &mut self,
319        key_prefix: &[u8],
320    ) -> Result<PrefixStream<'_>> {
321        let prefix = key_prefix.to_vec();
322        let next_prefix = next_prefix(&prefix);
323        let iterator_mode = if let Some(next_prefix) = &next_prefix {
324            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
325        } else {
326            rocksdb::IteratorMode::End
327        };
328        Ok(fedimint_core::runtime::block_in_place(|| {
329            let mut options = rocksdb::ReadOptions::default();
330            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
331            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
332            let rocksdb_iter = iter.map_while(move |res| {
333                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
334                key_bytes
335                    .starts_with(&prefix)
336                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
337            });
338            Box::pin(convert_to_async_stream(rocksdb_iter))
339        }))
340    }
341}
342
343#[async_trait]
344impl<'a> IDatabaseTransactionOps for RocksDbTransaction<'a> {
345    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
346        Ok(fedimint_core::runtime::block_in_place(|| {
347            self.0.rollback_to_savepoint()
348        })?)
349    }
350
351    async fn set_tx_savepoint(&mut self) -> Result<()> {
352        fedimint_core::runtime::block_in_place(|| self.0.set_savepoint());
353
354        Ok(())
355    }
356}
357
358#[async_trait]
359impl<'a> IRawDatabaseTransaction for RocksDbTransaction<'a> {
360    async fn commit_tx(self) -> Result<()> {
361        fedimint_core::runtime::block_in_place(|| {
362            self.0.commit()?;
363            Ok(())
364        })
365    }
366}
367
368#[async_trait]
369impl<'a> IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'a> {
370    async fn raw_insert_bytes(&mut self, _key: &[u8], _value: &[u8]) -> Result<Option<Vec<u8>>> {
371        panic!("Cannot insert into a read only transaction");
372    }
373
374    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
375        fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
376    }
377
378    async fn raw_remove_entry(&mut self, _key: &[u8]) -> Result<Option<Vec<u8>>> {
379        panic!("Cannot remove from a read only transaction");
380    }
381
382    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
383        Ok(fedimint_core::runtime::block_in_place(|| {
384            let range = Range {
385                start: range.start.to_vec(),
386                end: range.end.to_vec(),
387            };
388            let mut options = rocksdb::ReadOptions::default();
389            options.set_iterate_range(range.clone());
390            let iter = self.0.snapshot().iterator_opt(
391                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
392                options,
393            );
394            let rocksdb_iter = iter.map_while(move |res| {
395                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
396                (key_bytes.as_ref() < range.end.as_slice())
397                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
398            });
399            Box::pin(convert_to_async_stream(rocksdb_iter))
400        }))
401    }
402
403    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
404        Ok(fedimint_core::runtime::block_in_place(|| {
405            let prefix = key_prefix.to_vec();
406            let mut options = rocksdb::ReadOptions::default();
407            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
408            let iter = self.0.snapshot().iterator_opt(
409                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
410                options,
411            );
412            let rocksdb_iter = iter.map_while(move |res| {
413                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
414                key_bytes
415                    .starts_with(&prefix)
416                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
417            });
418            Box::pin(convert_to_async_stream(rocksdb_iter))
419        }))
420    }
421
422    async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
423        panic!("Cannot remove from a read only transaction");
424    }
425
426    async fn raw_find_by_prefix_sorted_descending(
427        &mut self,
428        key_prefix: &[u8],
429    ) -> Result<PrefixStream<'_>> {
430        let prefix = key_prefix.to_vec();
431        let next_prefix = next_prefix(&prefix);
432        let iterator_mode = if let Some(next_prefix) = &next_prefix {
433            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
434        } else {
435            rocksdb::IteratorMode::End
436        };
437        Ok(fedimint_core::runtime::block_in_place(|| {
438            let mut options = rocksdb::ReadOptions::default();
439            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
440            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
441            let rocksdb_iter = iter.map_while(move |res| {
442                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
443                key_bytes
444                    .starts_with(&prefix)
445                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
446            });
447            Box::pin(stream::iter(rocksdb_iter))
448        }))
449    }
450}
451
452#[async_trait]
453impl<'a> IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'a> {
454    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
455        panic!("Cannot rollback a read only transaction");
456    }
457
458    async fn set_tx_savepoint(&mut self) -> Result<()> {
459        panic!("Cannot set a savepoint in a read only transaction");
460    }
461}
462
463#[async_trait]
464impl<'a> IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'a> {
465    async fn commit_tx(self) -> Result<()> {
466        panic!("Cannot commit a read only transaction");
467    }
468}
469
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    /// Opens a read-write database inside a fresh temporary directory whose
    /// name starts with `temp_path`.
    // NOTE(review): the temporary-directory guard is a local of this function,
    // so it is presumably dropped (and the directory removed) on return while
    // the database is still open - confirm this is intended.
    fn open_temp_db(temp_path: &str) -> Database {
        let temp_dir = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();
        let raw_db = RocksDb::open_blocking(temp_dir.as_ref()).unwrap();

        Database::new(raw_db, ModuleDecoderRegistry::default())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        let db = open_temp_db("fcb-rocksdb-test-insert-elements");
        fedimint_core::db::verify_insert_elements(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        let db = open_temp_db("fcb-rocksdb-test-remove-nonexisting");
        fedimint_core::db::verify_remove_nonexisting(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        let db = open_temp_db("fcb-rocksdb-test-remove-existing");
        fedimint_core::db::verify_remove_existing(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        let db = open_temp_db("fcb-rocksdb-test-read-own-writes");
        fedimint_core::db::verify_read_own_writes(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-dirty-reads");
        fedimint_core::db::verify_prevent_dirty_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-range");
        fedimint_core::db::verify_find_by_range(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-prefix");
        fedimint_core::db::verify_find_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        let db = open_temp_db("fcb-rocksdb-test-commit");
        fedimint_core::db::verify_commit(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-nonrepeatable-reads");
        fedimint_core::db::verify_prevent_nonrepeatable_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        let db = open_temp_db("fcb-rocksdb-test-snapshot-isolation");
        fedimint_core::db::verify_snapshot_isolation(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_rollback_to_savepoint() {
        let db = open_temp_db("fcb-rocksdb-test-rollback-to-savepoint");
        fedimint_core::db::verify_rollback_to_savepoint(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        let db = open_temp_db("fcb-rocksdb-test-phantom-entry");
        fedimint_core::db::verify_phantom_entry(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        let db = open_temp_db("fcb-rocksdb-test-write-conflict");
        fedimint_core::db::expect_write_conflict(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-remove-by-prefix");
        fedimint_core::db::verify_remove_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        let db = open_temp_db("fcb-rocksdb-test-module-prefix");
        fedimint_core::db::verify_module_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        let module_db_dir = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-module-db-prefix")
            .tempdir()
            .unwrap();
        let module_raw_db = RocksDb::open_blocking(module_db_dir.as_ref()).unwrap();
        let module_db = Database::new(module_raw_db, ModuleDecoderRegistry::default());

        let db = open_temp_db("fcb-rocksdb-test-module-db");
        let prefixed_module_db = module_db.with_prefix_module_id(module_instance_id).0;
        fedimint_core::db::verify_module_db(db, prefixed_module_db).await;
    }

    #[test]
    fn test_next_prefix() {
        // Note: although we are testing the general case of a vector with N elements,
        // the prefixes currently use N = 1
        assert_eq!(next_prefix(&[1, 2, 3]), Some(vec![1, 2, 4]));
        assert_eq!(next_prefix(&[1, 2, 254]), Some(vec![1, 2, 255]));
        assert_eq!(next_prefix(&[1, 2, 255]), Some(vec![1, 3, 0]));
        assert_eq!(next_prefix(&[1, 255, 255]), Some(vec![2, 0, 0]));
        // this is a "max" prefix
        assert_eq!(next_prefix(&[255, 255, 255]), None);
        // these are the common case
        assert_eq!(next_prefix(&[0]), Some(vec![1]));
        assert_eq!(next_prefix(&[254]), Some(vec![255]));
        // this is a "max" prefix
        assert_eq!(next_prefix(&[255]), None);
    }

    /// Key prefixes used by the test records below.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        MaxTest = 255,
    }

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest, // max/last prefix
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    /// Entries stored under the `Test` prefix, in the descending key order the
    /// queries are expected to return.
    fn expected_descending() -> Vec<(TestKey, TestVal)> {
        vec![
            (TestKey(vec![255]), TestVal(vec![2])),
            (TestKey(vec![254]), TestVal(vec![1])),
            (TestKey(vec![0]), TestVal(vec![3])),
        ]
    }

    /// Entries stored under the `MaxTest` prefix, in the descending key order
    /// the queries are expected to return.
    fn expected_descending_max() -> Vec<(TestKey2, TestVal2)> {
        vec![
            (TestKey2(vec![255]), TestVal2(vec![2])),
            (TestKey2(vec![254]), TestVal2(vec![1])),
            (TestKey2(vec![0]), TestVal2(vec![3])),
        ]
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();
        // Inner scope: the read-write handle goes out of scope before the
        // database is reopened read-only below.
        {
            let db = Database::new(
                RocksDb::open(&path).await.unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;
            for (key, val) in [(0, 3), (254, 1), (255, 2)] {
                dbtx.insert_entry(&TestKey(vec![key]), &TestVal(vec![val]))
                    .await;
            }
            for (key, val) in [(0, 3), (254, 1), (255, 2)] {
                dbtx.insert_entry(&TestKey2(vec![key]), &TestVal2(vec![val]))
                    .await;
            }

            let found: Vec<_> = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect()
                .await;
            assert_eq!(found, expected_descending());

            let found_max: Vec<_> = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect()
                .await;
            assert_eq!(found_max, expected_descending_max());

            dbtx.commit_tx().await;
        }

        // Test readonly implementation
        // NOTE(review): `path` is moved into `open_read_only`, so the temporary
        // directory guard presumably does not outlive that call - confirm this
        // is intended.
        let raw_readonly = RocksDbReadOnly::open_read_only(path).await.unwrap();
        let db_readonly = Database::new(raw_readonly, ModuleRegistry::default());
        let mut dbtx = db_readonly.begin_transaction_nc().await;

        let found: Vec<_> = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect()
            .await;
        assert_eq!(found, expected_descending());

        let found_max: Vec<_> = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect()
            .await;
        assert_eq!(found_max, expected_descending_max());
    }
}
753}