1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4#![allow(clippy::needless_lifetimes)]
5
6pub mod db_locked;
7pub mod envs;
8
9use std::fmt;
10use std::ops::Range;
11use std::path::Path;
12use std::str::FromStr;
13
14use anyhow::{Context, Result, bail};
15use async_trait::async_trait;
16use db_locked::{Locked, LockedBuilder};
17use fedimint_core::db::{
18 IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase, IRawDatabaseTransaction,
19 PrefixStream,
20};
21use fedimint_core::task::block_in_place;
22use futures::stream;
23pub use rocksdb;
24use rocksdb::{
25 DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
26};
27use tracing::debug;
28
29use crate::envs::FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV;
30
31fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> + use<I>
34where
35 I: Iterator + Send + 'i,
36 I::Item: Send,
37{
38 stream::unfold(iter, |mut iter| async {
39 fedimint_core::runtime::block_in_place(|| {
40 let item = iter.next();
41 item.map(|item| (item, iter))
42 })
43 })
44}
45
/// Read-write database handle: a newtype around
/// [`rocksdb::OptimisticTransactionDB`].
#[derive(Debug)]
pub struct RocksDb(rocksdb::OptimisticTransactionDB);
48
/// A transaction on a [`RocksDb`]: a newtype around a [`rocksdb::Transaction`]
/// over an [`rocksdb::OptimisticTransactionDB`], borrowed for `'a`.
pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
50
51impl RocksDb {
52 #[allow(clippy::unused_async)]
53 pub async fn open(db_path: impl AsRef<Path>) -> anyhow::Result<Locked<RocksDb>> {
54 let db_path = db_path.as_ref();
55
56 block_in_place(|| Self::open_blocking(db_path))
57 }
58
59 pub fn open_blocking(db_path: &Path) -> anyhow::Result<Locked<RocksDb>> {
60 block_in_place(|| {
61 std::fs::create_dir_all(
62 db_path
63 .parent()
64 .ok_or_else(|| anyhow::anyhow!("db path must have a base dir"))?,
65 )?;
66 LockedBuilder::new(db_path)?.with_db(|| Self::open_blocking_unlocked(db_path))
67 })
68 }
69
70 pub fn open_blocking_unlocked(db_path: &Path) -> anyhow::Result<RocksDb> {
71 let mut opts = get_default_options()?;
72 opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency);
75 let db: rocksdb::OptimisticTransactionDB =
76 rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, db_path)?;
77 Ok(RocksDb(db))
78 }
79
80 pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
81 &self.0
82 }
83}
84
/// Returns `true` when `num` is an exact power of two, i.e. when exactly one
/// bit of `num` is set. Zero is not a power of two.
fn is_power_of_two(num: usize) -> bool {
    num.count_ones() == 1
}
89
90impl fmt::Debug for RocksDbReadOnlyTransaction<'_> {
91 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92 f.write_str("RocksDbTransaction")
93 }
94}
95
96impl fmt::Debug for RocksDbTransaction<'_> {
97 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98 f.write_str("RocksDbTransaction")
99 }
100}
101
/// Spot-checks `is_power_of_two` around small values and around `2 << 10`.
#[test]
fn is_power_of_two_sanity() {
    let cases = [
        (0, false),
        (1, true),
        (2, true),
        (3, false),
        (4, true),
        (5, false),
        (2 << 10, true),
        ((2 << 10) + 1, false),
    ];

    for (num, expected) in cases {
        assert_eq!(is_power_of_two(num), expected);
    }
}
113
114fn get_default_options() -> anyhow::Result<rocksdb::Options> {
115 let mut opts = rocksdb::Options::default();
116 if let Ok(var) = std::env::var(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV) {
117 debug!(var, "Using custom write buffer size");
118 let size: usize = FromStr::from_str(&var)
119 .with_context(|| format!("Could not parse {FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV}"))?;
120 if !is_power_of_two(size) {
121 bail!("{} is not a power of 2", FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV);
122 }
123 opts.set_write_buffer_size(size);
124 }
125 opts.create_if_missing(true);
126 Ok(opts)
127}
128
/// Read-only database handle: a newtype around a plain [`rocksdb::DB`].
#[derive(Debug)]
pub struct RocksDbReadOnly(rocksdb::DB);
131
/// Transaction-shaped view over a [`RocksDbReadOnly`]; it only holds a shared
/// reference to the underlying [`rocksdb::DB`].
pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
133
134impl RocksDbReadOnly {
135 #[allow(clippy::unused_async)]
136 pub async fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
137 let db_path = db_path.as_ref();
138 block_in_place(|| Self::open_read_only_blocking(db_path))
139 }
140
141 pub fn open_read_only_blocking(db_path: &Path) -> anyhow::Result<RocksDbReadOnly> {
142 let opts = get_default_options()?;
143 let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
145 Ok(RocksDbReadOnly(db))
146 }
147}
148
149impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
150 fn from(db: OptimisticTransactionDB) -> Self {
151 RocksDb(db)
152 }
153}
154
155impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
156 fn from(db: RocksDb) -> Self {
157 db.0
158 }
159}
160
/// Computes the smallest byte string of the same length that sorts strictly
/// after every key starting with `prefix`.
///
/// The prefix is treated as a big-endian number and incremented by one:
/// trailing `0xFF` bytes wrap to `0x00` and the carry moves left. Returns
/// `None` when there is no such successor, i.e. when `prefix` is empty or
/// consists solely of `0xFF` bytes.
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    // Rightmost byte that can absorb the carry without overflowing.
    let pivot = prefix.iter().rposition(|&byte| byte != u8::MAX)?;

    let mut successor = prefix.to_vec();
    successor[pivot] += 1;
    // Everything to the right of the pivot was 0xFF and wraps around to zero.
    successor[pivot + 1..].fill(0);

    Some(successor)
}
184
185#[async_trait]
186impl IRawDatabase for RocksDb {
187 type Transaction<'a> = RocksDbTransaction<'a>;
188 async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
189 let mut optimistic_options = OptimisticTransactionOptions::default();
190 optimistic_options.set_snapshot(true);
191
192 let mut write_options = WriteOptions::default();
193 write_options.set_sync(true);
195
196 let mut rocksdb_tx =
197 RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options));
198 rocksdb_tx
199 .set_tx_savepoint()
200 .await
201 .expect("setting tx savepoint failed");
202
203 rocksdb_tx
204 }
205
206 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
207 let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
208 checkpoint.create_checkpoint(backup_path)?;
209 Ok(())
210 }
211}
212
213#[async_trait]
214impl IRawDatabase for RocksDbReadOnly {
215 type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
216 async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
217 RocksDbReadOnlyTransaction(&self.0)
218 }
219
220 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
221 let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
222 checkpoint.create_checkpoint(backup_path)?;
223 Ok(())
224 }
225}
226
227#[async_trait]
228impl IDatabaseTransactionOpsCore for RocksDbTransaction<'_> {
229 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
230 fedimint_core::runtime::block_in_place(|| {
231 let val = self.0.snapshot().get(key).unwrap();
232 self.0.put(key, value)?;
233 Ok(val)
234 })
235 }
236
237 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
238 fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
239 }
240
241 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
242 fedimint_core::runtime::block_in_place(|| {
243 let val = self.0.snapshot().get(key).unwrap();
244 self.0.delete(key)?;
245 Ok(val)
246 })
247 }
248
249 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
250 Ok(fedimint_core::runtime::block_in_place(|| {
251 let prefix = key_prefix.to_vec();
252 let mut options = rocksdb::ReadOptions::default();
253 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
254 let iter = self.0.snapshot().iterator_opt(
255 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
256 options,
257 );
258 let rocksdb_iter = iter.map_while(move |res| {
259 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
260 key_bytes
261 .starts_with(&prefix)
262 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
263 });
264 Box::pin(convert_to_async_stream(rocksdb_iter))
265 }))
266 }
267
268 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
269 Ok(fedimint_core::runtime::block_in_place(|| {
270 let range = Range {
271 start: range.start.to_vec(),
272 end: range.end.to_vec(),
273 };
274 let mut options = rocksdb::ReadOptions::default();
275 options.set_iterate_range(range.clone());
276 let iter = self.0.snapshot().iterator_opt(
277 rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
278 options,
279 );
280 let rocksdb_iter = iter.map_while(move |res| {
281 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
282 (key_bytes.as_ref() < range.end.as_slice())
283 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
284 });
285 Box::pin(convert_to_async_stream(rocksdb_iter))
286 }))
287 }
288
289 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> anyhow::Result<()> {
290 fedimint_core::runtime::block_in_place(|| {
291 let mut options = rocksdb::ReadOptions::default();
293 options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
294 let iter = self
295 .0
296 .snapshot()
297 .iterator_opt(
298 rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
299 options,
300 )
301 .map_while(|res| {
302 res.map(|(key_bytes, _)| {
303 key_bytes
304 .starts_with(key_prefix)
305 .then_some(key_bytes.to_vec())
306 })
307 .transpose()
308 });
309
310 for item in iter {
311 let key = item?;
312 self.0.delete(key)?;
313 }
314
315 Ok(())
316 })
317 }
318
319 async fn raw_find_by_prefix_sorted_descending(
320 &mut self,
321 key_prefix: &[u8],
322 ) -> Result<PrefixStream<'_>> {
323 let prefix = key_prefix.to_vec();
324 let next_prefix = next_prefix(&prefix);
325 let iterator_mode = if let Some(next_prefix) = &next_prefix {
326 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
327 } else {
328 rocksdb::IteratorMode::End
329 };
330 Ok(fedimint_core::runtime::block_in_place(|| {
331 let mut options = rocksdb::ReadOptions::default();
332 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
333 let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
334 let rocksdb_iter = iter.map_while(move |res| {
335 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
336 key_bytes
337 .starts_with(&prefix)
338 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
339 });
340 Box::pin(convert_to_async_stream(rocksdb_iter))
341 }))
342 }
343}
344
345#[async_trait]
346impl IDatabaseTransactionOps for RocksDbTransaction<'_> {
347 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
348 Ok(fedimint_core::runtime::block_in_place(|| {
349 self.0.rollback_to_savepoint()
350 })?)
351 }
352
353 async fn set_tx_savepoint(&mut self) -> Result<()> {
354 fedimint_core::runtime::block_in_place(|| self.0.set_savepoint());
355
356 Ok(())
357 }
358}
359
360#[async_trait]
361impl IRawDatabaseTransaction for RocksDbTransaction<'_> {
362 async fn commit_tx(self) -> Result<()> {
363 fedimint_core::runtime::block_in_place(|| {
364 self.0.commit()?;
365 Ok(())
366 })
367 }
368}
369
370#[async_trait]
371impl IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'_> {
372 async fn raw_insert_bytes(&mut self, _key: &[u8], _value: &[u8]) -> Result<Option<Vec<u8>>> {
373 panic!("Cannot insert into a read only transaction");
374 }
375
376 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
377 fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
378 }
379
380 async fn raw_remove_entry(&mut self, _key: &[u8]) -> Result<Option<Vec<u8>>> {
381 panic!("Cannot remove from a read only transaction");
382 }
383
384 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
385 Ok(fedimint_core::runtime::block_in_place(|| {
386 let range = Range {
387 start: range.start.to_vec(),
388 end: range.end.to_vec(),
389 };
390 let mut options = rocksdb::ReadOptions::default();
391 options.set_iterate_range(range.clone());
392 let iter = self.0.snapshot().iterator_opt(
393 rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
394 options,
395 );
396 let rocksdb_iter = iter.map_while(move |res| {
397 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
398 (key_bytes.as_ref() < range.end.as_slice())
399 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
400 });
401 Box::pin(convert_to_async_stream(rocksdb_iter))
402 }))
403 }
404
405 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
406 Ok(fedimint_core::runtime::block_in_place(|| {
407 let prefix = key_prefix.to_vec();
408 let mut options = rocksdb::ReadOptions::default();
409 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
410 let iter = self.0.snapshot().iterator_opt(
411 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
412 options,
413 );
414 let rocksdb_iter = iter.map_while(move |res| {
415 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
416 key_bytes
417 .starts_with(&prefix)
418 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
419 });
420 Box::pin(convert_to_async_stream(rocksdb_iter))
421 }))
422 }
423
424 async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
425 panic!("Cannot remove from a read only transaction");
426 }
427
428 async fn raw_find_by_prefix_sorted_descending(
429 &mut self,
430 key_prefix: &[u8],
431 ) -> Result<PrefixStream<'_>> {
432 let prefix = key_prefix.to_vec();
433 let next_prefix = next_prefix(&prefix);
434 let iterator_mode = if let Some(next_prefix) = &next_prefix {
435 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
436 } else {
437 rocksdb::IteratorMode::End
438 };
439 Ok(fedimint_core::runtime::block_in_place(|| {
440 let mut options = rocksdb::ReadOptions::default();
441 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
442 let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
443 let rocksdb_iter = iter.map_while(move |res| {
444 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
445 key_bytes
446 .starts_with(&prefix)
447 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
448 });
449 Box::pin(stream::iter(rocksdb_iter))
450 }))
451 }
452}
453
// Savepoint operations on the read-only view: both methods panic
// unconditionally.
#[async_trait]
impl IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'_> {
    /// # Panics
    ///
    /// Always panics.
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
        panic!("Cannot rollback a read only transaction");
    }

    /// # Panics
    ///
    /// Always panics.
    async fn set_tx_savepoint(&mut self) -> Result<()> {
        panic!("Cannot set a savepoint in a read only transaction");
    }
}
464
// Committing the read-only view is not possible: `commit_tx` panics
// unconditionally.
#[async_trait]
impl IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'_> {
    /// # Panics
    ///
    /// Always panics.
    async fn commit_tx(self) -> Result<()> {
        panic!("Cannot commit a read only transaction");
    }
}
471
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    /// Opens a fresh read-write database inside a new temporary directory
    /// whose name starts with `temp_path`.
    fn open_temp_db(temp_path: &str) -> Database {
        let dir = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();
        let raw_db = RocksDb::open_blocking(dir.as_ref()).unwrap();

        Database::new(raw_db, ModuleDecoderRegistry::default())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        let db = open_temp_db("fcb-rocksdb-test-insert-elements");
        fedimint_core::db::verify_insert_elements(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        let db = open_temp_db("fcb-rocksdb-test-remove-nonexisting");
        fedimint_core::db::verify_remove_nonexisting(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        let db = open_temp_db("fcb-rocksdb-test-remove-existing");
        fedimint_core::db::verify_remove_existing(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        let db = open_temp_db("fcb-rocksdb-test-read-own-writes");
        fedimint_core::db::verify_read_own_writes(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-dirty-reads");
        fedimint_core::db::verify_prevent_dirty_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-range");
        fedimint_core::db::verify_find_by_range(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-prefix");
        fedimint_core::db::verify_find_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        let db = open_temp_db("fcb-rocksdb-test-commit");
        fedimint_core::db::verify_commit(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-nonrepeatable-reads");
        fedimint_core::db::verify_prevent_nonrepeatable_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        let db = open_temp_db("fcb-rocksdb-test-snapshot-isolation");
        fedimint_core::db::verify_snapshot_isolation(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_rollback_to_savepoint() {
        let db = open_temp_db("fcb-rocksdb-test-rollback-to-savepoint");
        fedimint_core::db::verify_rollback_to_savepoint(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        let db = open_temp_db("fcb-rocksdb-test-phantom-entry");
        fedimint_core::db::verify_phantom_entry(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        let db = open_temp_db("fcb-rocksdb-test-write-conflict");
        fedimint_core::db::expect_write_conflict(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-remove-by-prefix");
        fedimint_core::db::verify_remove_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        let db = open_temp_db("fcb-rocksdb-test-module-prefix");
        fedimint_core::db::verify_module_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-module-db-prefix")
            .tempdir()
            .unwrap();

        let module_db = Database::new(
            RocksDb::open_blocking(path.as_ref()).unwrap(),
            ModuleDecoderRegistry::default(),
        );

        // Open the plain database first, then derive the module-prefixed view.
        let plain_db = open_temp_db("fcb-rocksdb-test-module-db");
        let prefixed_db = module_db.with_prefix_module_id(module_instance_id).0;

        fedimint_core::db::verify_module_db(plain_db, prefixed_db).await;
    }

    /// `next_prefix` increments with carry and yields `None` for all-`0xFF`
    /// prefixes.
    #[test]
    fn test_next_prefix() {
        let cases: Vec<(Vec<u8>, Option<Vec<u8>>)> = vec![
            (vec![1, 2, 3], Some(vec![1, 2, 4])),
            (vec![1, 2, 254], Some(vec![1, 2, 255])),
            (vec![1, 2, 255], Some(vec![1, 3, 0])),
            (vec![1, 255, 255], Some(vec![2, 0, 0])),
            (vec![255, 255, 255], None),
            (vec![0], Some(vec![1])),
            (vec![254], Some(vec![255])),
            (vec![255], None),
        ];

        for (prefix, expected) in cases {
            assert_eq!(next_prefix(&prefix), expected);
        }
    }

    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        MaxTest = 255,
    }

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    /// Descending prefix scans must return keys from highest to lowest, both
    /// for an ordinary prefix (254) and for the maximal prefix (255), and both
    /// through a read-write transaction and through a read-only database.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        // The key/value types are not `Clone`, so the expected listings are
        // rebuilt on demand.
        let expected_test = || {
            vec![
                (TestKey(vec![255]), TestVal(vec![2])),
                (TestKey(vec![254]), TestVal(vec![1])),
                (TestKey(vec![0]), TestVal(vec![3])),
            ]
        };
        let expected_max_test = || {
            vec![
                (TestKey2(vec![255]), TestVal2(vec![2])),
                (TestKey2(vec![254]), TestVal2(vec![1])),
                (TestKey2(vec![0]), TestVal2(vec![3])),
            ]
        };

        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();
        {
            let db = Database::new(
                RocksDb::open(&path).await.unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;

            for (key, val) in [(0u8, 3u8), (254, 1), (255, 2)] {
                dbtx.insert_entry(&TestKey(vec![key]), &TestVal(vec![val]))
                    .await;
            }
            for (key, val) in [(0u8, 3u8), (254, 1), (255, 2)] {
                dbtx.insert_entry(&TestKey2(vec![key]), &TestVal2(vec![val]))
                    .await;
            }

            let found = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(found, expected_test());

            let found = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(found, expected_max_test());

            dbtx.commit_tx().await;
        }

        let db_readonly = RocksDbReadOnly::open_read_only(path).await.unwrap();
        let db_readonly = Database::new(db_readonly, ModuleRegistry::default());
        let mut dbtx = db_readonly.begin_transaction_nc().await;

        let found = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(found, expected_test());

        let found = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(found, expected_max_test());
    }
}