fedimint_rocksdb/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4#![allow(clippy::needless_lifetimes)]
5
6pub mod envs;
7
8use std::fmt;
9use std::ops::Range;
10use std::path::Path;
11use std::str::FromStr;
12
13use anyhow::{Context, Result, bail};
14use async_trait::async_trait;
15use fedimint_core::db::{
16    IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase, IRawDatabaseTransaction,
17    PrefixStream,
18};
19use futures::stream;
20pub use rocksdb;
21use rocksdb::{
22    DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
23};
24use tracing::debug;
25
26use crate::envs::FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV;
27
28// turn an `iter` into a `Stream` where every `next` is ran inside
29// `block_in_place` to offload the blocking calls
30fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> + use<I>
31where
32    I: Iterator + Send + 'i,
33    I::Item: Send,
34{
35    stream::unfold(iter, |mut iter| async {
36        fedimint_core::runtime::block_in_place(|| {
37            let item = iter.next();
38            item.map(|item| (item, iter))
39        })
40    })
41}
42
43#[derive(Debug)]
44pub struct RocksDb(rocksdb::OptimisticTransactionDB);
45
46pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
47
48impl RocksDb {
49    pub fn open(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDb> {
50        let mut opts = get_default_options()?;
51        // Since we turned synchronous writes one we should never encounter a corrupted
52        // WAL and should rather fail in this case
53        opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency);
54        let db: rocksdb::OptimisticTransactionDB =
55            rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, &db_path)?;
56        Ok(RocksDb(db))
57    }
58
59    pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
60        &self.0
61    }
62}
63
// TODO: Remove this and inline it in the places where it's used.
/// Reports whether `num` is an exact power of two (zero is not).
fn is_power_of_two(num: usize) -> bool {
    usize::is_power_of_two(num)
}
68
69impl<'a> fmt::Debug for RocksDbReadOnlyTransaction<'a> {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        f.write_str("RocksDbTransaction")
72    }
73}
74
75impl<'a> fmt::Debug for RocksDbTransaction<'a> {
76    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77        f.write_str("RocksDbTransaction")
78    }
79}
80
/// Sanity check for `is_power_of_two` on small values and around `2 << 10`.
#[test]
fn is_power_of_two_sanity() {
    for power in [1, 2, 4, 2 << 10] {
        assert!(is_power_of_two(power));
    }
    for other in [0, 3, 5, (2 << 10) + 1] {
        assert!(!is_power_of_two(other));
    }
}
92
93fn get_default_options() -> anyhow::Result<rocksdb::Options> {
94    let mut opts = rocksdb::Options::default();
95    if let Ok(var) = std::env::var(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV) {
96        debug!(var, "Using custom write buffer size");
97        let size: usize = FromStr::from_str(&var)
98            .with_context(|| format!("Could not parse {FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV}"))?;
99        if !is_power_of_two(size) {
100            bail!("{} is not a power of 2", FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV);
101        }
102        opts.set_write_buffer_size(size);
103    }
104    opts.create_if_missing(true);
105    Ok(opts)
106}
107
/// Read-only database handle: a newtype around a plain `rocksdb::DB`.
#[derive(Debug)]
pub struct RocksDbReadOnly(rocksdb::DB);
110
/// A "transaction" on a [`RocksDbReadOnly`]; it only holds a shared reference
/// to the underlying database for `'a`.
pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
112
113impl RocksDbReadOnly {
114    pub fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
115        let opts = get_default_options()?;
116        let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
117        Ok(RocksDbReadOnly(db))
118    }
119}
120
121impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
122    fn from(db: OptimisticTransactionDB) -> Self {
123        RocksDb(db)
124    }
125}
126
127impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
128    fn from(db: RocksDb) -> Self {
129        db.0
130    }
131}
132
// When finding by prefix iterating in Reverse order, we need to start from
// "prefix+1" instead of "prefix", using lexicographic ordering. See the tests
// below.
// Will return None if there is no next prefix (i.e. the prefix is empty or
// consists only of 0xff bytes and is therefore already the last/max one).
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    // Right-most byte that can be incremented without overflowing; if there
    // is none, the prefix has no successor.
    let pivot = prefix.iter().rposition(|&byte| byte != u8::MAX)?;

    let mut successor = prefix.to_vec();
    successor[pivot] += 1;
    // Every byte after the pivot was 0xff and wraps around to zero (the carry).
    successor[pivot + 1..].fill(0);
    Some(successor)
}
156
157#[async_trait]
158impl IRawDatabase for RocksDb {
159    type Transaction<'a> = RocksDbTransaction<'a>;
160    async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
161        let mut optimistic_options = OptimisticTransactionOptions::default();
162        optimistic_options.set_snapshot(true);
163
164        let mut write_options = WriteOptions::default();
165        // Make sure we never lose data on unclean shutdown
166        write_options.set_sync(true);
167
168        let mut rocksdb_tx =
169            RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options));
170        rocksdb_tx
171            .set_tx_savepoint()
172            .await
173            .expect("setting tx savepoint failed");
174
175        rocksdb_tx
176    }
177
178    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
179        let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
180        checkpoint.create_checkpoint(backup_path)?;
181        Ok(())
182    }
183}
184
185#[async_trait]
186impl IRawDatabase for RocksDbReadOnly {
187    type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
188    async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
189        RocksDbReadOnlyTransaction(&self.0)
190    }
191
192    fn checkpoint(&self, backup_path: &Path) -> Result<()> {
193        let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
194        checkpoint.create_checkpoint(backup_path)?;
195        Ok(())
196    }
197}
198
199#[async_trait]
200impl<'a> IDatabaseTransactionOpsCore for RocksDbTransaction<'a> {
201    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
202        fedimint_core::runtime::block_in_place(|| {
203            let val = self.0.snapshot().get(key).unwrap();
204            self.0.put(key, value)?;
205            Ok(val)
206        })
207    }
208
209    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
210        fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
211    }
212
213    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
214        fedimint_core::runtime::block_in_place(|| {
215            let val = self.0.snapshot().get(key).unwrap();
216            self.0.delete(key)?;
217            Ok(val)
218        })
219    }
220
221    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
222        Ok(fedimint_core::runtime::block_in_place(|| {
223            let prefix = key_prefix.to_vec();
224            let mut options = rocksdb::ReadOptions::default();
225            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
226            let iter = self.0.snapshot().iterator_opt(
227                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
228                options,
229            );
230            let rocksdb_iter = iter.map_while(move |res| {
231                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
232                key_bytes
233                    .starts_with(&prefix)
234                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
235            });
236            Box::pin(convert_to_async_stream(rocksdb_iter))
237        }))
238    }
239
240    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
241        Ok(fedimint_core::runtime::block_in_place(|| {
242            let range = Range {
243                start: range.start.to_vec(),
244                end: range.end.to_vec(),
245            };
246            let mut options = rocksdb::ReadOptions::default();
247            options.set_iterate_range(range.clone());
248            let iter = self.0.snapshot().iterator_opt(
249                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
250                options,
251            );
252            let rocksdb_iter = iter.map_while(move |res| {
253                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
254                (key_bytes.as_ref() < range.end.as_slice())
255                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
256            });
257            Box::pin(convert_to_async_stream(rocksdb_iter))
258        }))
259    }
260
261    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> anyhow::Result<()> {
262        fedimint_core::runtime::block_in_place(|| {
263            // Note: delete_range is not supported in Transactions :/
264            let mut options = rocksdb::ReadOptions::default();
265            options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
266            let iter = self
267                .0
268                .snapshot()
269                .iterator_opt(
270                    rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
271                    options,
272                )
273                .map_while(|res| {
274                    res.map(|(key_bytes, _)| {
275                        key_bytes
276                            .starts_with(key_prefix)
277                            .then_some(key_bytes.to_vec())
278                    })
279                    .transpose()
280                });
281
282            for item in iter {
283                let key = item?;
284                self.0.delete(key)?;
285            }
286
287            Ok(())
288        })
289    }
290
291    async fn raw_find_by_prefix_sorted_descending(
292        &mut self,
293        key_prefix: &[u8],
294    ) -> Result<PrefixStream<'_>> {
295        let prefix = key_prefix.to_vec();
296        let next_prefix = next_prefix(&prefix);
297        let iterator_mode = if let Some(next_prefix) = &next_prefix {
298            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
299        } else {
300            rocksdb::IteratorMode::End
301        };
302        Ok(fedimint_core::runtime::block_in_place(|| {
303            let mut options = rocksdb::ReadOptions::default();
304            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
305            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
306            let rocksdb_iter = iter.map_while(move |res| {
307                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
308                key_bytes
309                    .starts_with(&prefix)
310                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
311            });
312            Box::pin(convert_to_async_stream(rocksdb_iter))
313        }))
314    }
315}
316
317#[async_trait]
318impl<'a> IDatabaseTransactionOps for RocksDbTransaction<'a> {
319    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
320        Ok(fedimint_core::runtime::block_in_place(|| {
321            self.0.rollback_to_savepoint()
322        })?)
323    }
324
325    async fn set_tx_savepoint(&mut self) -> Result<()> {
326        fedimint_core::runtime::block_in_place(|| self.0.set_savepoint());
327
328        Ok(())
329    }
330}
331
332#[async_trait]
333impl<'a> IRawDatabaseTransaction for RocksDbTransaction<'a> {
334    async fn commit_tx(self) -> Result<()> {
335        fedimint_core::runtime::block_in_place(|| {
336            self.0.commit()?;
337            Ok(())
338        })
339    }
340}
341
342#[async_trait]
343impl<'a> IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'a> {
344    async fn raw_insert_bytes(&mut self, _key: &[u8], _value: &[u8]) -> Result<Option<Vec<u8>>> {
345        panic!("Cannot insert into a read only transaction");
346    }
347
348    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
349        fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
350    }
351
352    async fn raw_remove_entry(&mut self, _key: &[u8]) -> Result<Option<Vec<u8>>> {
353        panic!("Cannot remove from a read only transaction");
354    }
355
356    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
357        Ok(fedimint_core::runtime::block_in_place(|| {
358            let range = Range {
359                start: range.start.to_vec(),
360                end: range.end.to_vec(),
361            };
362            let mut options = rocksdb::ReadOptions::default();
363            options.set_iterate_range(range.clone());
364            let iter = self.0.snapshot().iterator_opt(
365                rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
366                options,
367            );
368            let rocksdb_iter = iter.map_while(move |res| {
369                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
370                (key_bytes.as_ref() < range.end.as_slice())
371                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
372            });
373            Box::pin(convert_to_async_stream(rocksdb_iter))
374        }))
375    }
376
377    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
378        Ok(fedimint_core::runtime::block_in_place(|| {
379            let prefix = key_prefix.to_vec();
380            let mut options = rocksdb::ReadOptions::default();
381            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
382            let iter = self.0.snapshot().iterator_opt(
383                rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
384                options,
385            );
386            let rocksdb_iter = iter.map_while(move |res| {
387                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
388                key_bytes
389                    .starts_with(&prefix)
390                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
391            });
392            Box::pin(convert_to_async_stream(rocksdb_iter))
393        }))
394    }
395
396    async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
397        panic!("Cannot remove from a read only transaction");
398    }
399
400    async fn raw_find_by_prefix_sorted_descending(
401        &mut self,
402        key_prefix: &[u8],
403    ) -> Result<PrefixStream<'_>> {
404        let prefix = key_prefix.to_vec();
405        let next_prefix = next_prefix(&prefix);
406        let iterator_mode = if let Some(next_prefix) = &next_prefix {
407            rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
408        } else {
409            rocksdb::IteratorMode::End
410        };
411        Ok(fedimint_core::runtime::block_in_place(|| {
412            let mut options = rocksdb::ReadOptions::default();
413            options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
414            let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
415            let rocksdb_iter = iter.map_while(move |res| {
416                let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
417                key_bytes
418                    .starts_with(&prefix)
419                    .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
420            });
421            Box::pin(stream::iter(rocksdb_iter))
422        }))
423    }
424}
425
#[async_trait]
impl<'a> IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'a> {
    /// Not supported: always panics for a read-only transaction.
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
        panic!("Cannot rollback a read only transaction");
    }

    /// Not supported: always panics for a read-only transaction.
    async fn set_tx_savepoint(&mut self) -> Result<()> {
        panic!("Cannot set a savepoint in a read only transaction");
    }
}
436
#[async_trait]
impl<'a> IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'a> {
    /// Not supported: always panics for a read-only transaction.
    async fn commit_tx(self) -> Result<()> {
        panic!("Cannot commit a read only transaction");
    }
}
443
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    /// Opens a fresh `RocksDb`-backed `Database` in a new temporary directory
    /// whose name starts with `temp_path`.
    fn open_temp_db(temp_path: &str) -> Database {
        let dir = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();
        let raw_db = RocksDb::open(dir).unwrap();

        Database::new(raw_db, ModuleDecoderRegistry::default())
    }

    // The tests below run the shared `fedimint_core::db` verification suite
    // against a fresh temporary database each.

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        let db = open_temp_db("fcb-rocksdb-test-insert-elements");
        fedimint_core::db::verify_insert_elements(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        let db = open_temp_db("fcb-rocksdb-test-remove-nonexisting");
        fedimint_core::db::verify_remove_nonexisting(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        let db = open_temp_db("fcb-rocksdb-test-remove-existing");
        fedimint_core::db::verify_remove_existing(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        let db = open_temp_db("fcb-rocksdb-test-read-own-writes");
        fedimint_core::db::verify_read_own_writes(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-dirty-reads");
        fedimint_core::db::verify_prevent_dirty_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-range");
        fedimint_core::db::verify_find_by_range(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-prefix");
        fedimint_core::db::verify_find_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        let db = open_temp_db("fcb-rocksdb-test-commit");
        fedimint_core::db::verify_commit(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-nonrepeatable-reads");
        fedimint_core::db::verify_prevent_nonrepeatable_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        let db = open_temp_db("fcb-rocksdb-test-snapshot-isolation");
        fedimint_core::db::verify_snapshot_isolation(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_rollback_to_savepoint() {
        let db = open_temp_db("fcb-rocksdb-test-rollback-to-savepoint");
        fedimint_core::db::verify_rollback_to_savepoint(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        let db = open_temp_db("fcb-rocksdb-test-phantom-entry");
        fedimint_core::db::verify_phantom_entry(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        let db = open_temp_db("fcb-rocksdb-test-write-conflict");
        fedimint_core::db::expect_write_conflict(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-remove-by-prefix");
        fedimint_core::db::verify_remove_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        let db = open_temp_db("fcb-rocksdb-test-module-prefix");
        fedimint_core::db::verify_module_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        // Separate database that is handed to the suite through its
        // module-prefixed view
        let module_db = open_temp_db("fcb-rocksdb-test-module-db-prefix");

        let db = open_temp_db("fcb-rocksdb-test-module-db");
        let prefixed_module_db = module_db.with_prefix_module_id(module_instance_id).0;
        fedimint_core::db::verify_module_db(db, prefixed_module_db).await;
    }

    #[test]
    fn test_next_prefix() {
        // Note: although we are testing the general case of a vector with N elements,
        // the prefixes currently use N = 1
        assert_eq!(next_prefix(&[1, 2, 3]), Some(vec![1, 2, 4]));
        assert_eq!(next_prefix(&[1, 2, 254]), Some(vec![1, 2, 255]));
        assert_eq!(next_prefix(&[1, 2, 255]), Some(vec![1, 3, 0]));
        assert_eq!(next_prefix(&[1, 255, 255]), Some(vec![2, 0, 0]));
        // this is a "max" prefix
        assert_eq!(next_prefix(&[255, 255, 255]), None);
        // these are the common case
        assert_eq!(next_prefix(&[0]), Some(vec![1]));
        assert_eq!(next_prefix(&[254]), Some(vec![255]));
        assert_eq!(next_prefix(&[255]), None); // this is a "max" prefix
    }

    // Key prefixes used by the descending-order test; `MaxTest` is the
    // largest possible single-byte prefix.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        MaxTest = 255,
    }

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest, // max/last prefix
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    /// Entries expected under `DbPrefixTestPrefix`, highest key first.
    fn expected_test_entries() -> Vec<(TestKey, TestVal)> {
        vec![
            (TestKey(vec![255]), TestVal(vec![2])),
            (TestKey(vec![254]), TestVal(vec![1])),
            (TestKey(vec![0]), TestVal(vec![3])),
        ]
    }

    /// Entries expected under `DbPrefixTestPrefixMax`, highest key first.
    fn expected_max_entries() -> Vec<(TestKey2, TestVal2)> {
        vec![
            (TestKey2(vec![255]), TestVal2(vec![2])),
            (TestKey2(vec![254]), TestVal2(vec![1])),
            (TestKey2(vec![0]), TestVal2(vec![3])),
        ]
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();
        {
            let db = Database::new(
                RocksDb::open(&path).unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;

            // Same three (key, value) pairs under both prefixes
            for (key, val) in [(0, 3), (254, 1), (255, 2)] {
                dbtx.insert_entry(&TestKey(vec![key]), &TestVal(vec![val]))
                    .await;
            }
            for (key, val) in [(0, 3), (254, 1), (255, 2)] {
                dbtx.insert_entry(&TestKey2(vec![key]), &TestVal2(vec![val]))
                    .await;
            }

            let found = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(found, expected_test_entries());

            let found_max = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(found_max, expected_max_entries());

            dbtx.commit_tx().await;
        }

        // Test readonly implementation
        let db_readonly = RocksDbReadOnly::open_read_only(path).unwrap();
        let db_readonly = Database::new(db_readonly, ModuleRegistry::default());
        let mut dbtx = db_readonly.begin_transaction_nc().await;

        let found = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(found, expected_test_entries());

        let found_max = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(found_max, expected_max_entries());
    }
}