fedimint_rocksdb/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4#![allow(clippy::needless_lifetimes)]
5
6pub mod envs;
7
8use std::fmt;
9use std::ops::Range;
10use std::path::Path;
11use std::str::FromStr;
12
13use anyhow::{Context, Result, bail};
14use async_trait::async_trait;
15use fedimint_core::db::{
16    IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase, IRawDatabaseTransaction,
17    PrefixStream,
18};
19use fedimint_core::task::block_in_place;
20use fedimint_db_locked::{Locked, LockedBuilder};
21use futures::stream;
22pub use rocksdb;
23use rocksdb::{
24    DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
25};
26use tracing::debug;
27
28use crate::envs::FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV;
29
30// turn an `iter` into a `Stream` where every `next` is ran inside
31// `block_in_place` to offload the blocking calls
32fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> + use<I>
33where
34    I: Iterator + Send + 'i,
35    I::Item: Send,
36{
37    stream::unfold(iter, |mut iter| async {
38        fedimint_core::runtime::block_in_place(|| {
39            let item = iter.next();
40            item.map(|item| (item, iter))
41        })
42    })
43}
44
/// Read-write database handle wrapping a `RocksDB`
/// [`rocksdb::OptimisticTransactionDB`].
#[derive(Debug)]
pub struct RocksDb(rocksdb::OptimisticTransactionDB);

/// Transaction on a [`RocksDb`], wrapping a `RocksDB` transaction on the
/// underlying [`rocksdb::OptimisticTransactionDB`].
pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
49
50impl RocksDb {
51    #[allow(clippy::unused_async)]
52    pub async fn open(db_path: impl AsRef<Path>) -> anyhow::Result<Locked<RocksDb>> {
53        let db_path = db_path.as_ref();
54
55        block_in_place(|| Self::open_blocking(db_path))
56    }
57
58    pub fn open_blocking(db_path: &Path) -> anyhow::Result<Locked<RocksDb>> {
59        block_in_place(|| {
60            std::fs::create_dir_all(
61                db_path
62                    .parent()
63                    .ok_or_else(|| anyhow::anyhow!("db path must have a base dir"))?,
64            )?;
65            LockedBuilder::new(db_path)?.with_db(|| Self::open_blocking_unlocked(db_path))
66        })
67    }
68
69    pub fn open_blocking_unlocked(db_path: &Path) -> anyhow::Result<RocksDb> {
70        let mut opts = get_default_options()?;
71        // Since we turned synchronous writes one we should never encounter a corrupted
72        // WAL and should rather fail in this case
73        opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency);
74        let db: rocksdb::OptimisticTransactionDB =
75            rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, db_path)?;
76        Ok(RocksDb(db))
77    }
78
79    pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
80        &self.0
81    }
82}
83
/// Returns `true` when `num` is a power of two; `0` is not.
// TODO: Remove this and inline it in the places where it's used.
fn is_power_of_two(num: usize) -> bool {
    usize::is_power_of_two(num)
}
88
89impl fmt::Debug for RocksDbReadOnlyTransaction<'_> {
90    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91        f.write_str("RocksDbTransaction")
92    }
93}
94
95impl fmt::Debug for RocksDbTransaction<'_> {
96    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97        f.write_str("RocksDbTransaction")
98    }
99}
100
// Spot-checks `is_power_of_two` on small values and around `2 << 10`.
#[test]
fn is_power_of_two_sanity() {
    let powers: [usize; 4] = [1, 2, 4, 2 << 10];
    let non_powers: [usize; 4] = [0, 3, 5, (2 << 10) + 1];
    for n in powers {
        assert!(is_power_of_two(n));
    }
    for n in non_powers {
        assert!(!is_power_of_two(n));
    }
}
112
113fn get_default_options() -> anyhow::Result<rocksdb::Options> {
114    let mut opts = rocksdb::Options::default();
115    if let Ok(var) = std::env::var(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV) {
116        debug!(var, "Using custom write buffer size");
117        let size: usize = FromStr::from_str(&var)
118            .with_context(|| format!("Could not parse {FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV}"))?;
119        if !is_power_of_two(size) {
120            bail!("{} is not a power of 2", FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV);
121        }
122        opts.set_write_buffer_size(size);
123    }
124    opts.create_if_missing(true);
125    Ok(opts)
126}
127
/// Read-only handle to a `RocksDB` database (a plain [`rocksdb::DB`]).
#[derive(Debug)]
pub struct RocksDbReadOnly(rocksdb::DB);

/// Read-only "transaction" over a [`RocksDbReadOnly`]. It only borrows the
/// database handle; its write, commit and savepoint methods panic.
pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
132
133impl RocksDbReadOnly {
134    #[allow(clippy::unused_async)]
135    pub async fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
136        let db_path = db_path.as_ref();
137        block_in_place(|| Self::open_read_only_blocking(db_path))
138    }
139
140    pub fn open_read_only_blocking(db_path: &Path) -> anyhow::Result<RocksDbReadOnly> {
141        let opts = get_default_options()?;
142        // Note: rocksdb is OK if one process has write access, and other read-access
143        let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
144        Ok(RocksDbReadOnly(db))
145    }
146}
147
148impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
149    fn from(db: OptimisticTransactionDB) -> Self {
150        RocksDb(db)
151    }
152}
153
154impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
155    fn from(db: RocksDb) -> Self {
156        db.0
157    }
158}
159
/// Returns `prefix` incremented by one, treating it as a big-endian byte
/// string: trailing `0xff` bytes wrap to `0` and carry into the byte before
/// them. For example, `[1, 2, 255]` becomes `[1, 3, 0]`.
///
/// Reverse prefix iteration starts from this "prefix+1" key (lexicographic
/// ordering). Returns `None` when there is no next prefix, i.e. every byte is
/// `0xff` or `prefix` is empty.
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    let mut incremented = prefix.to_vec();
    for byte in incremented.iter_mut().rev() {
        let (bumped, carried) = byte.overflowing_add(1);
        *byte = bumped;
        if !carried {
            return Some(incremented);
        }
    }
    // Every byte carried (or there were none): already the last prefix.
    None
}
183
184#[async_trait]
185impl IRawDatabase for RocksDb {
186    type Transaction<'a> = RocksDbTransaction<'a>;
187    async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
188        let mut optimistic_options = OptimisticTransactionOptions::default();
189        optimistic_options.set_snapshot(true);
190
191        let mut write_options = WriteOptions::default();
192        // Make sure we never lose data on unclean shutdown
193        write_options.set_sync(true);
194
195        let mut rocksdb_tx =
196            RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options));
197        rocksdb_tx
198            .set_tx_savepoint()
199            .await
200            .expect("setting tx savepoint failed");
201
202        rocksdb_tx
203    }
204
205    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
206        let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
207        checkpoint.create_checkpoint(backup_path)?;
208        Ok(())
209    }
210}
211
212#[async_trait]
213impl IRawDatabase for RocksDbReadOnly {
214    type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
215    async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
216        RocksDbReadOnlyTransaction(&self.0)
217    }
218
219    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
220        let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
221        checkpoint.create_checkpoint(backup_path)?;
222        Ok(())
223    }
224}
225
226#[async_trait]
227impl IDatabaseTransactionOpsCore for RocksDbTransaction<'_> {
228    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
229        fedimint_core::runtime::block_in_place(|| {
230            let val = self.0.snapshot().get(key).unwrap();
231            self.0.put(key, value)?;
232            Ok(val)
233        })
234    }
235
236    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
237        fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
238    }
239
240    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
241        fedimint_core::runtime::block_in_place(|| {
242            let val = self.0.snapshot().get(key).unwrap();
243            self.0.delete(key)?;
244            Ok(val)
245        })
246    }
247
248    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
249        Ok(fedimint_core::runtime::block_in_place(|| {
250            let prefix = key_prefix.to_vec();
251            let mut options = rocksdb::ReadOptions::default();
252            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
253            let iter = self.0.snapshot().iterator_opt(
254                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
255                options,
256            );
257            let rocksdb_iter = iter.map_while(move |res| {
258                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
259                key_bytes
260                    .starts_with(&prefix)
261                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
262            });
263            Box::pin(convert_to_async_stream(rocksdb_iter))
264        }))
265    }
266
267    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
268        Ok(fedimint_core::runtime::block_in_place(|| {
269            let range = Range {
270                start: range.start.to_vec(),
271                end: range.end.to_vec(),
272            };
273            let mut options = rocksdb::ReadOptions::default();
274            options.set_iterate_range(range.clone());
275            let iter = self.0.snapshot().iterator_opt(
276                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
277                options,
278            );
279            let rocksdb_iter = iter.map_while(move |res| {
280                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
281                (key_bytes.as_ref() < range.end.as_slice())
282                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
283            });
284            Box::pin(convert_to_async_stream(rocksdb_iter))
285        }))
286    }
287
288    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> anyhow::Result<()> {
289        fedimint_core::runtime::block_in_place(|| {
290            // Note: delete_range is not supported in Transactions :/
291            let mut options = rocksdb::ReadOptions::default();
292            options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
293            let iter = self
294                .0
295                .snapshot()
296                .iterator_opt(
297                    rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
298                    options,
299                )
300                .map_while(|res| {
301                    res.map(|(key_bytes, _)| {
302                        key_bytes
303                            .starts_with(key_prefix)
304                            .then_some(key_bytes.to_vec())
305                    })
306                    .transpose()
307                });
308
309            for item in iter {
310                let key = item?;
311                self.0.delete(key)?;
312            }
313
314            Ok(())
315        })
316    }
317
318    async fn raw_find_by_prefix_sorted_descending(
319        &mut self,
320        key_prefix: &[u8],
321    ) -> Result<PrefixStream<'_>> {
322        let prefix = key_prefix.to_vec();
323        let next_prefix = next_prefix(&prefix);
324        let iterator_mode = if let Some(next_prefix) = &next_prefix {
325            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
326        } else {
327            rocksdb::IteratorMode::End
328        };
329        Ok(fedimint_core::runtime::block_in_place(|| {
330            let mut options = rocksdb::ReadOptions::default();
331            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
332            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
333            let rocksdb_iter = iter.map_while(move |res| {
334                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
335                key_bytes
336                    .starts_with(&prefix)
337                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
338            });
339            Box::pin(convert_to_async_stream(rocksdb_iter))
340        }))
341    }
342}
343
344#[async_trait]
345impl IDatabaseTransactionOps for RocksDbTransaction<'_> {
346    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
347        Ok(fedimint_core::runtime::block_in_place(|| {
348            self.0.rollback_to_savepoint()
349        })?)
350    }
351
352    async fn set_tx_savepoint(&mut self) -> Result<()> {
353        fedimint_core::runtime::block_in_place(|| self.0.set_savepoint());
354
355        Ok(())
356    }
357}
358
359#[async_trait]
360impl IRawDatabaseTransaction for RocksDbTransaction<'_> {
361    async fn commit_tx(self) -> Result<()> {
362        fedimint_core::runtime::block_in_place(|| {
363            self.0.commit()?;
364            Ok(())
365        })
366    }
367}
368
369#[async_trait]
370impl IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'_> {
371    async fn raw_insert_bytes(&mut self, _key: &[u8], _value: &[u8]) -> Result<Option<Vec<u8>>> {
372        panic!("Cannot insert into a read only transaction");
373    }
374
375    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
376        fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
377    }
378
379    async fn raw_remove_entry(&mut self, _key: &[u8]) -> Result<Option<Vec<u8>>> {
380        panic!("Cannot remove from a read only transaction");
381    }
382
383    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
384        Ok(fedimint_core::runtime::block_in_place(|| {
385            let range = Range {
386                start: range.start.to_vec(),
387                end: range.end.to_vec(),
388            };
389            let mut options = rocksdb::ReadOptions::default();
390            options.set_iterate_range(range.clone());
391            let iter = self.0.snapshot().iterator_opt(
392                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
393                options,
394            );
395            let rocksdb_iter = iter.map_while(move |res| {
396                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
397                (key_bytes.as_ref() < range.end.as_slice())
398                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
399            });
400            Box::pin(convert_to_async_stream(rocksdb_iter))
401        }))
402    }
403
404    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
405        Ok(fedimint_core::runtime::block_in_place(|| {
406            let prefix = key_prefix.to_vec();
407            let mut options = rocksdb::ReadOptions::default();
408            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
409            let iter = self.0.snapshot().iterator_opt(
410                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
411                options,
412            );
413            let rocksdb_iter = iter.map_while(move |res| {
414                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
415                key_bytes
416                    .starts_with(&prefix)
417                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
418            });
419            Box::pin(convert_to_async_stream(rocksdb_iter))
420        }))
421    }
422
423    async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
424        panic!("Cannot remove from a read only transaction");
425    }
426
427    async fn raw_find_by_prefix_sorted_descending(
428        &mut self,
429        key_prefix: &[u8],
430    ) -> Result<PrefixStream<'_>> {
431        let prefix = key_prefix.to_vec();
432        let next_prefix = next_prefix(&prefix);
433        let iterator_mode = if let Some(next_prefix) = &next_prefix {
434            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
435        } else {
436            rocksdb::IteratorMode::End
437        };
438        Ok(fedimint_core::runtime::block_in_place(|| {
439            let mut options = rocksdb::ReadOptions::default();
440            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
441            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
442            let rocksdb_iter = iter.map_while(move |res| {
443                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
444                key_bytes
445                    .starts_with(&prefix)
446                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
447            });
448            Box::pin(stream::iter(rocksdb_iter))
449        }))
450    }
451}
452
// Savepoints are not supported on a read-only transaction: both methods panic.
#[async_trait]
impl IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'_> {
    /// Unsupported on a read-only transaction.
    ///
    /// # Panics
    /// Always panics.
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
        panic!("Cannot rollback a read only transaction");
    }

    /// Unsupported on a read-only transaction.
    ///
    /// # Panics
    /// Always panics.
    async fn set_tx_savepoint(&mut self) -> Result<()> {
        panic!("Cannot set a savepoint in a read only transaction");
    }
}
463
// A read-only transaction has nothing to commit; committing panics.
#[async_trait]
impl IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'_> {
    /// Unsupported on a read-only transaction.
    ///
    /// # Panics
    /// Always panics.
    async fn commit_tx(self) -> Result<()> {
        panic!("Cannot commit a read only transaction");
    }
}
470
// Tests for the `RocksDb` and `RocksDbReadOnly` backends. Most of them run the
// shared `fedimint_core::db::verify_*` suites against a fresh database.
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    /// Opens a fresh [`RocksDb`] in a new temporary directory whose name starts
    /// with `temp_path`.
    ///
    /// The [`tempfile::TempDir`] guard is returned alongside the database
    /// because dropping it deletes the directory; callers must keep it alive
    /// for as long as the database is in use.
    fn open_temp_db(temp_path: &str) -> (Database, tempfile::TempDir) {
        let path = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();

        let db = Database::new(
            RocksDb::open_blocking(path.as_ref()).unwrap(),
            ModuleDecoderRegistry::default(),
        );
        (db, path)
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-insert-elements");
        fedimint_core::db::verify_insert_elements(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-remove-nonexisting");
        fedimint_core::db::verify_remove_nonexisting(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-remove-existing");
        fedimint_core::db::verify_remove_existing(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-read-own-writes");
        fedimint_core::db::verify_read_own_writes(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-prevent-dirty-reads");
        fedimint_core::db::verify_prevent_dirty_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-find-by-range");
        fedimint_core::db::verify_find_by_range(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-find-by-prefix");
        fedimint_core::db::verify_find_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-commit");
        fedimint_core::db::verify_commit(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-prevent-nonrepeatable-reads");
        fedimint_core::db::verify_prevent_nonrepeatable_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-snapshot-isolation");
        fedimint_core::db::verify_snapshot_isolation(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_rollback_to_savepoint() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-rollback-to-savepoint");
        fedimint_core::db::verify_rollback_to_savepoint(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-phantom-entry");
        fedimint_core::db::verify_phantom_entry(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-write-conflict");
        fedimint_core::db::expect_write_conflict(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-remove-by-prefix");
        fedimint_core::db::verify_remove_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-module-prefix");
        fedimint_core::db::verify_module_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-module-db-prefix")
            .tempdir()
            .unwrap();

        let module_db = Database::new(
            RocksDb::open_blocking(path.as_ref()).unwrap(),
            ModuleDecoderRegistry::default(),
        );
        // Keep the guard alive so the second database's directory survives
        // until the end of the test.
        let (db, _db_dir) = open_temp_db("fcb-rocksdb-test-module-db");

        fedimint_core::db::verify_module_db(
            db,
            module_db.with_prefix_module_id(module_instance_id).0,
        )
        .await;
    }

    #[test]
    fn test_next_prefix() {
        // Note: although we are testing the general case of a vector with N elements,
        // the prefixes currently use N = 1
        assert_eq!(next_prefix(&[1, 2, 3]).unwrap(), vec![1, 2, 4]);
        assert_eq!(next_prefix(&[1, 2, 254]).unwrap(), vec![1, 2, 255]);
        assert_eq!(next_prefix(&[1, 2, 255]).unwrap(), vec![1, 3, 0]);
        assert_eq!(next_prefix(&[1, 255, 255]).unwrap(), vec![2, 0, 0]);
        // this is a "max" prefix
        assert!(next_prefix(&[255, 255, 255]).is_none());
        // these are the common case
        assert_eq!(next_prefix(&[0]).unwrap(), vec![1]);
        assert_eq!(next_prefix(&[254]).unwrap(), vec![255]);
        assert!(next_prefix(&[255]).is_none()); // this is a "max" prefix
    }

    // Key prefixes for the descending-order test; `MaxTest` is the last
    // possible single-byte prefix, exercising the `next_prefix == None` path.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        MaxTest = 255,
    }

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest, // max/last prefix
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();
        {
            let db = Database::new(
                RocksDb::open(&path).await.unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;
            dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![3]))
                .await;
            dbtx.insert_entry(&TestKey(vec![254]), &TestVal(vec![1]))
                .await;
            dbtx.insert_entry(&TestKey(vec![255]), &TestVal(vec![2]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![0]), &TestVal2(vec![3]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![254]), &TestVal2(vec![1]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![255]), &TestVal2(vec![2]))
                .await;
            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(
                query,
                vec![
                    (TestKey(vec![255]), TestVal(vec![2])),
                    (TestKey(vec![254]), TestVal(vec![1])),
                    (TestKey(vec![0]), TestVal(vec![3]))
                ]
            );
            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(
                query,
                vec![
                    (TestKey2(vec![255]), TestVal2(vec![2])),
                    (TestKey2(vec![254]), TestVal2(vec![1])),
                    (TestKey2(vec![0]), TestVal2(vec![3]))
                ]
            );
            dbtx.commit_tx().await;
        }
        // Test readonly implementation. Pass `&path` rather than `path`: moving
        // the `TempDir` into `open_read_only` would drop it (deleting the
        // database directory) as soon as the open call returned.
        let db_readonly = RocksDbReadOnly::open_read_only(&path).await.unwrap();
        let db_readonly = Database::new(db_readonly, ModuleRegistry::default());
        let mut dbtx = db_readonly.begin_transaction_nc().await;
        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            query,
            vec![
                (TestKey(vec![255]), TestVal(vec![2])),
                (TestKey(vec![254]), TestVal(vec![1])),
                (TestKey(vec![0]), TestVal(vec![3]))
            ]
        );
        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            query,
            vec![
                (TestKey2(vec![255]), TestVal2(vec![2])),
                (TestKey2(vec![254]), TestVal2(vec![1])),
                (TestKey2(vec![0]), TestVal2(vec![3]))
            ]
        );
    }
}