fedimint_rocksdb/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4#![allow(clippy::needless_lifetimes)]
5
6pub mod db_locked;
7pub mod envs;
8
9use std::fmt;
10use std::ops::Range;
11use std::path::Path;
12use std::str::FromStr;
13
14use anyhow::{Context, Result, bail};
15use async_trait::async_trait;
16use db_locked::{Locked, LockedBuilder};
17use fedimint_core::db::{
18    IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase, IRawDatabaseTransaction,
19    PrefixStream,
20};
21use fedimint_core::task::block_in_place;
22use futures::stream;
23pub use rocksdb;
24use rocksdb::{
25    DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
26};
27use tracing::debug;
28
29use crate::envs::FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV;
30
31// turn an `iter` into a `Stream` where every `next` is ran inside
32// `block_in_place` to offload the blocking calls
33fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> + use<I>
34where
35    I: Iterator + Send + 'i,
36    I::Item: Send,
37{
38    stream::unfold(iter, |mut iter| async {
39        fedimint_core::runtime::block_in_place(|| {
40            let item = iter.next();
41            item.map(|item| (item, iter))
42        })
43    })
44}
45
/// Read-write database handle backed by a RocksDB
/// [`rocksdb::OptimisticTransactionDB`].
#[derive(Debug)]
pub struct RocksDb(rocksdb::OptimisticTransactionDB);
48
/// A single optimistic RocksDB transaction borrowed from a [`RocksDb`]
/// (created by its `begin_transaction`).
pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
50
51impl RocksDb {
52    #[allow(clippy::unused_async)]
53    pub async fn open(db_path: impl AsRef<Path>) -> anyhow::Result<Locked<RocksDb>> {
54        let db_path = db_path.as_ref();
55
56        block_in_place(|| Self::open_blocking(db_path))
57    }
58
59    pub fn open_blocking(db_path: &Path) -> anyhow::Result<Locked<RocksDb>> {
60        block_in_place(|| {
61            std::fs::create_dir_all(
62                db_path
63                    .parent()
64                    .ok_or_else(|| anyhow::anyhow!("db path must have a base dir"))?,
65            )?;
66            LockedBuilder::new(db_path)?.with_db(|| Self::open_blocking_unlocked(db_path))
67        })
68    }
69
70    pub fn open_blocking_unlocked(db_path: &Path) -> anyhow::Result<RocksDb> {
71        let mut opts = get_default_options()?;
72        // Since we turned synchronous writes one we should never encounter a corrupted
73        // WAL and should rather fail in this case
74        opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency);
75        let db: rocksdb::OptimisticTransactionDB =
76            rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, db_path)?;
77        Ok(RocksDb(db))
78    }
79
80    pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
81        &self.0
82    }
83}
84
// TODO: Remove this and inline it in the places where it's used.
/// Returns `true` when exactly one bit of `num` is set, i.e. `num` is a power
/// of two (`0` is not).
fn is_power_of_two(num: usize) -> bool {
    num.count_ones() == 1
}
89
90impl fmt::Debug for RocksDbReadOnlyTransaction<'_> {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        f.write_str("RocksDbTransaction")
93    }
94}
95
96impl fmt::Debug for RocksDbTransaction<'_> {
97    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98        f.write_str("RocksDbTransaction")
99    }
100}
101
#[test]
fn is_power_of_two_sanity() {
    // (input, expected) pairs: zero, small values, and a larger power of two
    // together with its successor.
    let cases = [
        (0, false),
        (1, true),
        (2, true),
        (3, false),
        (4, true),
        (5, false),
        (2 << 10, true),
        ((2 << 10) + 1, false),
    ];
    for (num, expected) in cases {
        assert_eq!(is_power_of_two(num), expected, "is_power_of_two({num})");
    }
}
113
114fn get_default_options() -> anyhow::Result<rocksdb::Options> {
115    let mut opts = rocksdb::Options::default();
116    if let Ok(var) = std::env::var(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV) {
117        debug!(var, "Using custom write buffer size");
118        let size: usize = FromStr::from_str(&var)
119            .with_context(|| format!("Could not parse {FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV}"))?;
120        if !is_power_of_two(size) {
121            bail!("{} is not a power of 2", FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV);
122        }
123        opts.set_write_buffer_size(size);
124    }
125    opts.create_if_missing(true);
126    Ok(opts)
127}
128
/// Read-only handle to a RocksDB database, opened via
/// `RocksDbReadOnly::open_read_only`.
#[derive(Debug)]
pub struct RocksDbReadOnly(rocksdb::DB);
131
/// "Transaction" over a [`RocksDbReadOnly`] that only borrows the database.
/// Reads are supported; write, savepoint and commit operations panic.
pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
133
134impl RocksDbReadOnly {
135    #[allow(clippy::unused_async)]
136    pub async fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
137        let db_path = db_path.as_ref();
138        block_in_place(|| Self::open_read_only_blocking(db_path))
139    }
140
141    pub fn open_read_only_blocking(db_path: &Path) -> anyhow::Result<RocksDbReadOnly> {
142        let opts = get_default_options()?;
143        // Note: rocksdb is OK if one process has write access, and other read-access
144        let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
145        Ok(RocksDbReadOnly(db))
146    }
147}
148
149impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
150    fn from(db: OptimisticTransactionDB) -> Self {
151        RocksDb(db)
152    }
153}
154
155impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
156    fn from(db: RocksDb) -> Self {
157        db.0
158    }
159}
160
/// Computes "prefix + 1": `prefix` incremented as a big-endian number while
/// keeping its length (trailing `0xFF` bytes wrap to `0` and carry left).
///
/// When finding by prefix in reverse order, the scan must start from this key
/// rather than from `prefix` itself (lexicographic ordering; see the tests).
///
/// Returns `None` when there is no next prefix, i.e. `prefix` is empty or made
/// up solely of `0xFF` bytes (it is already the last/max possible prefix).
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    // Right-most byte that can be incremented without overflowing.
    let carry_pos = prefix.iter().rposition(|&byte| byte != u8::MAX)?;
    let mut next = prefix.to_vec();
    next[carry_pos] += 1;
    // Every byte after it was 0xFF and wraps around to zero.
    next[carry_pos + 1..].fill(0);
    Some(next)
}
184
185#[async_trait]
186impl IRawDatabase for RocksDb {
187    type Transaction<'a> = RocksDbTransaction<'a>;
188    async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
189        let mut optimistic_options = OptimisticTransactionOptions::default();
190        optimistic_options.set_snapshot(true);
191
192        let mut write_options = WriteOptions::default();
193        // Make sure we never lose data on unclean shutdown
194        write_options.set_sync(true);
195
196        let mut rocksdb_tx =
197            RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options));
198        rocksdb_tx
199            .set_tx_savepoint()
200            .await
201            .expect("setting tx savepoint failed");
202
203        rocksdb_tx
204    }
205
206    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
207        let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
208        checkpoint.create_checkpoint(backup_path)?;
209        Ok(())
210    }
211}
212
213#[async_trait]
214impl IRawDatabase for RocksDbReadOnly {
215    type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
216    async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
217        RocksDbReadOnlyTransaction(&self.0)
218    }
219
220    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
221        let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
222        checkpoint.create_checkpoint(backup_path)?;
223        Ok(())
224    }
225}
226
227#[async_trait]
228impl IDatabaseTransactionOpsCore for RocksDbTransaction<'_> {
229    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
230        fedimint_core::runtime::block_in_place(|| {
231            let val = self.0.snapshot().get(key).unwrap();
232            self.0.put(key, value)?;
233            Ok(val)
234        })
235    }
236
237    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
238        fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
239    }
240
241    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
242        fedimint_core::runtime::block_in_place(|| {
243            let val = self.0.snapshot().get(key).unwrap();
244            self.0.delete(key)?;
245            Ok(val)
246        })
247    }
248
249    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
250        Ok(fedimint_core::runtime::block_in_place(|| {
251            let prefix = key_prefix.to_vec();
252            let mut options = rocksdb::ReadOptions::default();
253            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
254            let iter = self.0.snapshot().iterator_opt(
255                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
256                options,
257            );
258            let rocksdb_iter = iter.map_while(move |res| {
259                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
260                key_bytes
261                    .starts_with(&prefix)
262                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
263            });
264            Box::pin(convert_to_async_stream(rocksdb_iter))
265        }))
266    }
267
268    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
269        Ok(fedimint_core::runtime::block_in_place(|| {
270            let range = Range {
271                start: range.start.to_vec(),
272                end: range.end.to_vec(),
273            };
274            let mut options = rocksdb::ReadOptions::default();
275            options.set_iterate_range(range.clone());
276            let iter = self.0.snapshot().iterator_opt(
277                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
278                options,
279            );
280            let rocksdb_iter = iter.map_while(move |res| {
281                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
282                (key_bytes.as_ref() < range.end.as_slice())
283                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
284            });
285            Box::pin(convert_to_async_stream(rocksdb_iter))
286        }))
287    }
288
289    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> anyhow::Result<()> {
290        fedimint_core::runtime::block_in_place(|| {
291            // Note: delete_range is not supported in Transactions :/
292            let mut options = rocksdb::ReadOptions::default();
293            options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
294            let iter = self
295                .0
296                .snapshot()
297                .iterator_opt(
298                    rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
299                    options,
300                )
301                .map_while(|res| {
302                    res.map(|(key_bytes, _)| {
303                        key_bytes
304                            .starts_with(key_prefix)
305                            .then_some(key_bytes.to_vec())
306                    })
307                    .transpose()
308                });
309
310            for item in iter {
311                let key = item?;
312                self.0.delete(key)?;
313            }
314
315            Ok(())
316        })
317    }
318
319    async fn raw_find_by_prefix_sorted_descending(
320        &mut self,
321        key_prefix: &[u8],
322    ) -> Result<PrefixStream<'_>> {
323        let prefix = key_prefix.to_vec();
324        let next_prefix = next_prefix(&prefix);
325        let iterator_mode = if let Some(next_prefix) = &next_prefix {
326            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
327        } else {
328            rocksdb::IteratorMode::End
329        };
330        Ok(fedimint_core::runtime::block_in_place(|| {
331            let mut options = rocksdb::ReadOptions::default();
332            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
333            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
334            let rocksdb_iter = iter.map_while(move |res| {
335                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
336                key_bytes
337                    .starts_with(&prefix)
338                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
339            });
340            Box::pin(convert_to_async_stream(rocksdb_iter))
341        }))
342    }
343}
344
345#[async_trait]
346impl IDatabaseTransactionOps for RocksDbTransaction<'_> {
347    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
348        Ok(fedimint_core::runtime::block_in_place(|| {
349            self.0.rollback_to_savepoint()
350        })?)
351    }
352
353    async fn set_tx_savepoint(&mut self) -> Result<()> {
354        fedimint_core::runtime::block_in_place(|| self.0.set_savepoint());
355
356        Ok(())
357    }
358}
359
360#[async_trait]
361impl IRawDatabaseTransaction for RocksDbTransaction<'_> {
362    async fn commit_tx(self) -> Result<()> {
363        fedimint_core::runtime::block_in_place(|| {
364            self.0.commit()?;
365            Ok(())
366        })
367    }
368}
369
370#[async_trait]
371impl IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'_> {
372    async fn raw_insert_bytes(&mut self, _key: &[u8], _value: &[u8]) -> Result<Option<Vec<u8>>> {
373        panic!("Cannot insert into a read only transaction");
374    }
375
376    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
377        fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
378    }
379
380    async fn raw_remove_entry(&mut self, _key: &[u8]) -> Result<Option<Vec<u8>>> {
381        panic!("Cannot remove from a read only transaction");
382    }
383
384    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
385        Ok(fedimint_core::runtime::block_in_place(|| {
386            let range = Range {
387                start: range.start.to_vec(),
388                end: range.end.to_vec(),
389            };
390            let mut options = rocksdb::ReadOptions::default();
391            options.set_iterate_range(range.clone());
392            let iter = self.0.snapshot().iterator_opt(
393                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
394                options,
395            );
396            let rocksdb_iter = iter.map_while(move |res| {
397                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
398                (key_bytes.as_ref() < range.end.as_slice())
399                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
400            });
401            Box::pin(convert_to_async_stream(rocksdb_iter))
402        }))
403    }
404
405    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
406        Ok(fedimint_core::runtime::block_in_place(|| {
407            let prefix = key_prefix.to_vec();
408            let mut options = rocksdb::ReadOptions::default();
409            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
410            let iter = self.0.snapshot().iterator_opt(
411                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
412                options,
413            );
414            let rocksdb_iter = iter.map_while(move |res| {
415                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
416                key_bytes
417                    .starts_with(&prefix)
418                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
419            });
420            Box::pin(convert_to_async_stream(rocksdb_iter))
421        }))
422    }
423
424    async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
425        panic!("Cannot remove from a read only transaction");
426    }
427
428    async fn raw_find_by_prefix_sorted_descending(
429        &mut self,
430        key_prefix: &[u8],
431    ) -> Result<PrefixStream<'_>> {
432        let prefix = key_prefix.to_vec();
433        let next_prefix = next_prefix(&prefix);
434        let iterator_mode = if let Some(next_prefix) = &next_prefix {
435            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
436        } else {
437            rocksdb::IteratorMode::End
438        };
439        Ok(fedimint_core::runtime::block_in_place(|| {
440            let mut options = rocksdb::ReadOptions::default();
441            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
442            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
443            let rocksdb_iter = iter.map_while(move |res| {
444                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
445                key_bytes
446                    .starts_with(&prefix)
447                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
448            });
449            Box::pin(stream::iter(rocksdb_iter))
450        }))
451    }
452}
453
/// Savepoints are not supported on a read-only transaction: both methods panic.
#[async_trait]
impl IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'_> {
    /// # Panics
    ///
    /// Always panics: a read-only transaction cannot be rolled back.
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
        panic!("Cannot rollback a read only transaction");
    }

    /// # Panics
    ///
    /// Always panics: a read-only transaction has no savepoints.
    async fn set_tx_savepoint(&mut self) -> Result<()> {
        panic!("Cannot set a savepoint in a read only transaction");
    }
}
464
/// A read-only transaction has nothing to commit.
#[async_trait]
impl IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'_> {
    /// # Panics
    ///
    /// Always panics: a read-only transaction cannot be committed.
    async fn commit_tx(self) -> Result<()> {
        panic!("Cannot commit a read only transaction");
    }
}
471
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    /// Opens a fresh [`RocksDb`] in a new temporary directory whose name starts
    /// with `temp_path`.
    ///
    /// The returned [`tempfile::TempDir`] guard deletes the directory when it
    /// is dropped, so callers must keep it alive (bind it to a named variable
    /// such as `_dir`, not `_`) for as long as the database is in use.
    fn open_temp_db(temp_path: &str) -> (Database, tempfile::TempDir) {
        let dir = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();

        let db = Database::new(
            RocksDb::open_blocking(dir.path()).unwrap(),
            ModuleDecoderRegistry::default(),
        );
        (db, dir)
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-insert-elements");
        fedimint_core::db::verify_insert_elements(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-remove-nonexisting");
        fedimint_core::db::verify_remove_nonexisting(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-remove-existing");
        fedimint_core::db::verify_remove_existing(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-read-own-writes");
        fedimint_core::db::verify_read_own_writes(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-prevent-dirty-reads");
        fedimint_core::db::verify_prevent_dirty_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-find-by-range");
        fedimint_core::db::verify_find_by_range(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-find-by-prefix");
        fedimint_core::db::verify_find_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-commit");
        fedimint_core::db::verify_commit(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-prevent-nonrepeatable-reads");
        fedimint_core::db::verify_prevent_nonrepeatable_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-snapshot-isolation");
        fedimint_core::db::verify_snapshot_isolation(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_rollback_to_savepoint() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-rollback-to-savepoint");
        fedimint_core::db::verify_rollback_to_savepoint(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-phantom-entry");
        fedimint_core::db::verify_phantom_entry(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-write-conflict");
        fedimint_core::db::expect_write_conflict(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-remove-by-prefix");
        fedimint_core::db::verify_remove_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        let (db, _dir) = open_temp_db("fcb-rocksdb-test-module-prefix");
        fedimint_core::db::verify_module_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-module-db-prefix")
            .tempdir()
            .unwrap();

        let module_db = Database::new(
            RocksDb::open_blocking(path.as_ref()).unwrap(),
            ModuleDecoderRegistry::default(),
        );

        let (db, _dir) = open_temp_db("fcb-rocksdb-test-module-db");
        fedimint_core::db::verify_module_db(
            db,
            module_db.with_prefix_module_id(module_instance_id).0,
        )
        .await;
    }

    #[test]
    fn test_next_prefix() {
        // Note: although we are testing the general case of a vector with N elements,
        // the prefixes currently use N = 1
        assert_eq!(next_prefix(&[1, 2, 3]).unwrap(), vec![1, 2, 4]);
        assert_eq!(next_prefix(&[1, 2, 254]).unwrap(), vec![1, 2, 255]);
        assert_eq!(next_prefix(&[1, 2, 255]).unwrap(), vec![1, 3, 0]);
        assert_eq!(next_prefix(&[1, 255, 255]).unwrap(), vec![2, 0, 0]);
        // this is a "max" prefix
        assert!(next_prefix(&[255, 255, 255]).is_none());
        // these are the common case
        assert_eq!(next_prefix(&[0]).unwrap(), vec![1]);
        assert_eq!(next_prefix(&[254]).unwrap(), vec![255]);
        assert!(next_prefix(&[255]).is_none()); // this is a "max" prefix
    }

    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        MaxTest = 255,
    }

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest, // max/last prefix
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();
        {
            let db = Database::new(
                RocksDb::open(&path).await.unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;
            dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![3]))
                .await;
            dbtx.insert_entry(&TestKey(vec![254]), &TestVal(vec![1]))
                .await;
            dbtx.insert_entry(&TestKey(vec![255]), &TestVal(vec![2]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![0]), &TestVal2(vec![3]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![254]), &TestVal2(vec![1]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![255]), &TestVal2(vec![2]))
                .await;
            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(
                query,
                vec![
                    (TestKey(vec![255]), TestVal(vec![2])),
                    (TestKey(vec![254]), TestVal(vec![1])),
                    (TestKey(vec![0]), TestVal(vec![3]))
                ]
            );
            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(
                query,
                vec![
                    (TestKey2(vec![255]), TestVal2(vec![2])),
                    (TestKey2(vec![254]), TestVal2(vec![1])),
                    (TestKey2(vec![0]), TestVal2(vec![3]))
                ]
            );
            dbtx.commit_tx().await;
        }
        // Test readonly implementation. Borrow `path` rather than moving it in,
        // so the temporary directory is not deleted while the database is open.
        let db_readonly = RocksDbReadOnly::open_read_only(&path).await.unwrap();
        let db_readonly = Database::new(db_readonly, ModuleRegistry::default());
        let mut dbtx = db_readonly.begin_transaction_nc().await;
        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            query,
            vec![
                (TestKey(vec![255]), TestVal(vec![2])),
                (TestKey(vec![254]), TestVal(vec![1])),
                (TestKey(vec![0]), TestVal(vec![3]))
            ]
        );
        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            query,
            vec![
                (TestKey2(vec![255]), TestVal2(vec![2])),
                (TestKey2(vec![254]), TestVal2(vec![1])),
                (TestKey2(vec![0]), TestVal2(vec![3]))
            ]
        );
    }
}