1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4#![allow(clippy::needless_lifetimes)]
5
6pub mod envs;
7
8use std::fmt;
9use std::ops::Range;
10use std::path::Path;
11use std::str::FromStr;
12
13use anyhow::{Context, Result, bail};
14use async_trait::async_trait;
15use fedimint_core::db::{
16 IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase, IRawDatabaseTransaction,
17 PrefixStream,
18};
19use fedimint_core::task::block_in_place;
20use fedimint_db_locked::{Locked, LockedBuilder};
21use futures::stream;
22pub use rocksdb;
23use rocksdb::{
24 DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
25};
26use tracing::debug;
27
28use crate::envs::FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV;
29
30fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> + use<I>
33where
34 I: Iterator + Send + 'i,
35 I::Item: Send,
36{
37 stream::unfold(iter, |mut iter| async {
38 fedimint_core::runtime::block_in_place(|| {
39 let item = iter.next();
40 item.map(|item| (item, iter))
41 })
42 })
43}
44
45#[derive(Debug)]
46pub struct RocksDb(rocksdb::OptimisticTransactionDB);
47
48pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
49
50impl RocksDb {
51 #[allow(clippy::unused_async)]
52 pub async fn open(db_path: impl AsRef<Path>) -> anyhow::Result<Locked<RocksDb>> {
53 let db_path = db_path.as_ref();
54
55 block_in_place(|| Self::open_blocking(db_path))
56 }
57
58 pub fn open_blocking(db_path: &Path) -> anyhow::Result<Locked<RocksDb>> {
59 block_in_place(|| {
60 std::fs::create_dir_all(
61 db_path
62 .parent()
63 .ok_or_else(|| anyhow::anyhow!("db path must have a base dir"))?,
64 )?;
65 LockedBuilder::new(db_path)?.with_db(|| Self::open_blocking_unlocked(db_path))
66 })
67 }
68
69 pub fn open_blocking_unlocked(db_path: &Path) -> anyhow::Result<RocksDb> {
70 let mut opts = get_default_options()?;
71 opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency);
74 let db: rocksdb::OptimisticTransactionDB =
75 rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, db_path)?;
76 Ok(RocksDb(db))
77 }
78
79 pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
80 &self.0
81 }
82}
83
/// Returns `true` when `num` is an exact power of two (`1`, `2`, `4`, ...).
///
/// Zero is not a power of two.
fn is_power_of_two(num: usize) -> bool {
    // A power of two has exactly one bit set in its binary representation.
    num.count_ones() == 1
}
88
89impl fmt::Debug for RocksDbReadOnlyTransaction<'_> {
90 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91 f.write_str("RocksDbTransaction")
92 }
93}
94
95impl fmt::Debug for RocksDbTransaction<'_> {
96 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97 f.write_str("RocksDbTransaction")
98 }
99}
100
/// Checks `is_power_of_two` against a small table of known answers.
#[test]
fn is_power_of_two_sanity() {
    let cases: [(usize, bool); 8] = [
        (0, false),
        (1, true),
        (2, true),
        (3, false),
        (4, true),
        (5, false),
        (2 << 10, true),
        ((2 << 10) + 1, false),
    ];

    for (num, expected) in cases {
        assert_eq!(is_power_of_two(num), expected, "is_power_of_two({num})");
    }
}
112
113fn get_default_options() -> anyhow::Result<rocksdb::Options> {
114 let mut opts = rocksdb::Options::default();
115 if let Ok(var) = std::env::var(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV) {
116 debug!(var, "Using custom write buffer size");
117 let size: usize = FromStr::from_str(&var)
118 .with_context(|| format!("Could not parse {FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV}"))?;
119 if !is_power_of_two(size) {
120 bail!("{} is not a power of 2", FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV);
121 }
122 opts.set_write_buffer_size(size);
123 }
124 opts.create_if_missing(true);
125 Ok(opts)
126}
127
128#[derive(Debug)]
129pub struct RocksDbReadOnly(rocksdb::DB);
130
131pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
132
133impl RocksDbReadOnly {
134 #[allow(clippy::unused_async)]
135 pub async fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
136 let db_path = db_path.as_ref();
137 block_in_place(|| Self::open_read_only_blocking(db_path))
138 }
139
140 pub fn open_read_only_blocking(db_path: &Path) -> anyhow::Result<RocksDbReadOnly> {
141 let opts = get_default_options()?;
142 let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
144 Ok(RocksDbReadOnly(db))
145 }
146}
147
148impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
149 fn from(db: OptimisticTransactionDB) -> Self {
150 RocksDb(db)
151 }
152}
153
154impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
155 fn from(db: RocksDb) -> Self {
156 db.0
157 }
158}
159
/// Computes the byte string obtained by incrementing `prefix` as a big-endian
/// number of fixed length.
///
/// The rightmost byte that is not `0xFF` is incremented and every byte after
/// it is reset to `0`. Returns `None` when no such byte exists, i.e. when
/// `prefix` is empty or consists solely of `0xFF` bytes.
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    // Position of the last byte that can be incremented without overflowing.
    let pivot = prefix.iter().rposition(|&byte| byte != u8::MAX)?;

    let mut successor = prefix.to_vec();
    successor[pivot] += 1;
    // Everything to the right of the pivot was 0xFF and wraps around to zero.
    successor[pivot + 1..].fill(0);
    Some(successor)
}
183
184#[async_trait]
185impl IRawDatabase for RocksDb {
186 type Transaction<'a> = RocksDbTransaction<'a>;
187 async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
188 let mut optimistic_options = OptimisticTransactionOptions::default();
189 optimistic_options.set_snapshot(true);
190
191 let mut write_options = WriteOptions::default();
192 write_options.set_sync(true);
194
195 let mut rocksdb_tx =
196 RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options));
197 rocksdb_tx
198 .set_tx_savepoint()
199 .await
200 .expect("setting tx savepoint failed");
201
202 rocksdb_tx
203 }
204
205 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
206 let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
207 checkpoint.create_checkpoint(backup_path)?;
208 Ok(())
209 }
210}
211
212#[async_trait]
213impl IRawDatabase for RocksDbReadOnly {
214 type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
215 async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
216 RocksDbReadOnlyTransaction(&self.0)
217 }
218
219 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
220 let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
221 checkpoint.create_checkpoint(backup_path)?;
222 Ok(())
223 }
224}
225
226#[async_trait]
227impl IDatabaseTransactionOpsCore for RocksDbTransaction<'_> {
228 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
229 fedimint_core::runtime::block_in_place(|| {
230 let val = self.0.snapshot().get(key).unwrap();
231 self.0.put(key, value)?;
232 Ok(val)
233 })
234 }
235
236 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
237 fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
238 }
239
240 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
241 fedimint_core::runtime::block_in_place(|| {
242 let val = self.0.snapshot().get(key).unwrap();
243 self.0.delete(key)?;
244 Ok(val)
245 })
246 }
247
248 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
249 Ok(fedimint_core::runtime::block_in_place(|| {
250 let prefix = key_prefix.to_vec();
251 let mut options = rocksdb::ReadOptions::default();
252 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
253 let iter = self.0.snapshot().iterator_opt(
254 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
255 options,
256 );
257 let rocksdb_iter = iter.map_while(move |res| {
258 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
259 key_bytes
260 .starts_with(&prefix)
261 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
262 });
263 Box::pin(convert_to_async_stream(rocksdb_iter))
264 }))
265 }
266
267 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
268 Ok(fedimint_core::runtime::block_in_place(|| {
269 let range = Range {
270 start: range.start.to_vec(),
271 end: range.end.to_vec(),
272 };
273 let mut options = rocksdb::ReadOptions::default();
274 options.set_iterate_range(range.clone());
275 let iter = self.0.snapshot().iterator_opt(
276 rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
277 options,
278 );
279 let rocksdb_iter = iter.map_while(move |res| {
280 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
281 (key_bytes.as_ref() < range.end.as_slice())
282 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
283 });
284 Box::pin(convert_to_async_stream(rocksdb_iter))
285 }))
286 }
287
288 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> anyhow::Result<()> {
289 fedimint_core::runtime::block_in_place(|| {
290 let mut options = rocksdb::ReadOptions::default();
292 options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
293 let iter = self
294 .0
295 .snapshot()
296 .iterator_opt(
297 rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
298 options,
299 )
300 .map_while(|res| {
301 res.map(|(key_bytes, _)| {
302 key_bytes
303 .starts_with(key_prefix)
304 .then_some(key_bytes.to_vec())
305 })
306 .transpose()
307 });
308
309 for item in iter {
310 let key = item?;
311 self.0.delete(key)?;
312 }
313
314 Ok(())
315 })
316 }
317
318 async fn raw_find_by_prefix_sorted_descending(
319 &mut self,
320 key_prefix: &[u8],
321 ) -> Result<PrefixStream<'_>> {
322 let prefix = key_prefix.to_vec();
323 let next_prefix = next_prefix(&prefix);
324 let iterator_mode = if let Some(next_prefix) = &next_prefix {
325 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
326 } else {
327 rocksdb::IteratorMode::End
328 };
329 Ok(fedimint_core::runtime::block_in_place(|| {
330 let mut options = rocksdb::ReadOptions::default();
331 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
332 let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
333 let rocksdb_iter = iter.map_while(move |res| {
334 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
335 key_bytes
336 .starts_with(&prefix)
337 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
338 });
339 Box::pin(convert_to_async_stream(rocksdb_iter))
340 }))
341 }
342}
343
344#[async_trait]
345impl IDatabaseTransactionOps for RocksDbTransaction<'_> {
346 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
347 Ok(fedimint_core::runtime::block_in_place(|| {
348 self.0.rollback_to_savepoint()
349 })?)
350 }
351
352 async fn set_tx_savepoint(&mut self) -> Result<()> {
353 fedimint_core::runtime::block_in_place(|| self.0.set_savepoint());
354
355 Ok(())
356 }
357}
358
359#[async_trait]
360impl IRawDatabaseTransaction for RocksDbTransaction<'_> {
361 async fn commit_tx(self) -> Result<()> {
362 fedimint_core::runtime::block_in_place(|| {
363 self.0.commit()?;
364 Ok(())
365 })
366 }
367}
368
369#[async_trait]
370impl IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'_> {
371 async fn raw_insert_bytes(&mut self, _key: &[u8], _value: &[u8]) -> Result<Option<Vec<u8>>> {
372 panic!("Cannot insert into a read only transaction");
373 }
374
375 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
376 fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
377 }
378
379 async fn raw_remove_entry(&mut self, _key: &[u8]) -> Result<Option<Vec<u8>>> {
380 panic!("Cannot remove from a read only transaction");
381 }
382
383 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
384 Ok(fedimint_core::runtime::block_in_place(|| {
385 let range = Range {
386 start: range.start.to_vec(),
387 end: range.end.to_vec(),
388 };
389 let mut options = rocksdb::ReadOptions::default();
390 options.set_iterate_range(range.clone());
391 let iter = self.0.snapshot().iterator_opt(
392 rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
393 options,
394 );
395 let rocksdb_iter = iter.map_while(move |res| {
396 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
397 (key_bytes.as_ref() < range.end.as_slice())
398 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
399 });
400 Box::pin(convert_to_async_stream(rocksdb_iter))
401 }))
402 }
403
404 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
405 Ok(fedimint_core::runtime::block_in_place(|| {
406 let prefix = key_prefix.to_vec();
407 let mut options = rocksdb::ReadOptions::default();
408 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
409 let iter = self.0.snapshot().iterator_opt(
410 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
411 options,
412 );
413 let rocksdb_iter = iter.map_while(move |res| {
414 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
415 key_bytes
416 .starts_with(&prefix)
417 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
418 });
419 Box::pin(convert_to_async_stream(rocksdb_iter))
420 }))
421 }
422
423 async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
424 panic!("Cannot remove from a read only transaction");
425 }
426
427 async fn raw_find_by_prefix_sorted_descending(
428 &mut self,
429 key_prefix: &[u8],
430 ) -> Result<PrefixStream<'_>> {
431 let prefix = key_prefix.to_vec();
432 let next_prefix = next_prefix(&prefix);
433 let iterator_mode = if let Some(next_prefix) = &next_prefix {
434 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
435 } else {
436 rocksdb::IteratorMode::End
437 };
438 Ok(fedimint_core::runtime::block_in_place(|| {
439 let mut options = rocksdb::ReadOptions::default();
440 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
441 let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
442 let rocksdb_iter = iter.map_while(move |res| {
443 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
444 key_bytes
445 .starts_with(&prefix)
446 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
447 });
448 Box::pin(stream::iter(rocksdb_iter))
449 }))
450 }
451}
452
453#[async_trait]
454impl IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'_> {
455 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
456 panic!("Cannot rollback a read only transaction");
457 }
458
459 async fn set_tx_savepoint(&mut self) -> Result<()> {
460 panic!("Cannot set a savepoint in a read only transaction");
461 }
462}
463
464#[async_trait]
465impl IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'_> {
466 async fn commit_tx(self) -> Result<()> {
467 panic!("Cannot commit a read only transaction");
468 }
469}
470
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    /// Opens a fresh [`RocksDb`] in a temporary directory whose name starts with
    /// `temp_path` and wraps it in a [`Database`].
    fn open_temp_db(temp_path: &str) -> Database {
        let temp_dir = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();
        let raw_db = RocksDb::open_blocking(temp_dir.as_ref()).unwrap();

        Database::new(raw_db, ModuleDecoderRegistry::default())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        let db = open_temp_db("fcb-rocksdb-test-insert-elements");
        fedimint_core::db::verify_insert_elements(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        let db = open_temp_db("fcb-rocksdb-test-remove-nonexisting");
        fedimint_core::db::verify_remove_nonexisting(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        let db = open_temp_db("fcb-rocksdb-test-remove-existing");
        fedimint_core::db::verify_remove_existing(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        let db = open_temp_db("fcb-rocksdb-test-read-own-writes");
        fedimint_core::db::verify_read_own_writes(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-dirty-reads");
        fedimint_core::db::verify_prevent_dirty_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-range");
        fedimint_core::db::verify_find_by_range(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-prefix");
        fedimint_core::db::verify_find_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        let db = open_temp_db("fcb-rocksdb-test-commit");
        fedimint_core::db::verify_commit(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-nonrepeatable-reads");
        fedimint_core::db::verify_prevent_nonrepeatable_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        let db = open_temp_db("fcb-rocksdb-test-snapshot-isolation");
        fedimint_core::db::verify_snapshot_isolation(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_rollback_to_savepoint() {
        let db = open_temp_db("fcb-rocksdb-test-rollback-to-savepoint");
        fedimint_core::db::verify_rollback_to_savepoint(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        let db = open_temp_db("fcb-rocksdb-test-phantom-entry");
        fedimint_core::db::verify_phantom_entry(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        let db = open_temp_db("fcb-rocksdb-test-write-conflict");
        fedimint_core::db::expect_write_conflict(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-remove-by-prefix");
        fedimint_core::db::verify_remove_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        let db = open_temp_db("fcb-rocksdb-test-module-prefix");
        fedimint_core::db::verify_module_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        let module_dir = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-module-db-prefix")
            .tempdir()
            .unwrap();

        let module_db = Database::new(
            RocksDb::open_blocking(module_dir.as_ref()).unwrap(),
            ModuleDecoderRegistry::default(),
        );

        let db = open_temp_db("fcb-rocksdb-test-module-db");
        let prefixed_module_db = module_db.with_prefix_module_id(module_instance_id).0;
        fedimint_core::db::verify_module_db(db, prefixed_module_db).await;
    }

    #[test]
    fn test_next_prefix() {
        // Plain increment of the last byte.
        assert_eq!(next_prefix(&[1, 2, 3]), Some(vec![1, 2, 4]));
        assert_eq!(next_prefix(&[1, 2, 254]), Some(vec![1, 2, 255]));
        // Trailing 0xFF bytes wrap to zero and carry into the byte before them.
        assert_eq!(next_prefix(&[1, 2, 255]), Some(vec![1, 3, 0]));
        assert_eq!(next_prefix(&[1, 255, 255]), Some(vec![2, 0, 0]));
        // All bytes 0xFF: there is no next prefix.
        assert_eq!(next_prefix(&[255, 255, 255]), None);
        // Single-byte prefixes.
        assert_eq!(next_prefix(&[0]), Some(vec![1]));
        assert_eq!(next_prefix(&[254]), Some(vec![255]));
        assert_eq!(next_prefix(&[255]), None);
    }

    /// Key prefixes used by the records below; `MaxTest` is the largest
    /// possible prefix byte.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        MaxTest = 255,
    }

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    /// Entries stored under `TestDbKeyPrefix::Test`, in descending key order.
    fn expected_test_entries() -> Vec<(TestKey, TestVal)> {
        vec![
            (TestKey(vec![255]), TestVal(vec![2])),
            (TestKey(vec![254]), TestVal(vec![1])),
            (TestKey(vec![0]), TestVal(vec![3])),
        ]
    }

    /// Entries stored under `TestDbKeyPrefix::MaxTest`, in descending key order.
    fn expected_max_test_entries() -> Vec<(TestKey2, TestVal2)> {
        vec![
            (TestKey2(vec![255]), TestVal2(vec![2])),
            (TestKey2(vec![254]), TestVal2(vec![1])),
            (TestKey2(vec![0]), TestVal2(vec![3])),
        ]
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        // (key byte, value byte) pairs inserted under both prefixes, in insertion order.
        const ENTRIES: [(u8, u8); 3] = [(0, 3), (254, 1), (255, 2)];

        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();

        // Scope the read-write database so it is dropped before the read-only open below.
        {
            let db = Database::new(
                RocksDb::open(&path).await.unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;

            for (key, val) in ENTRIES {
                dbtx.insert_entry(&TestKey(vec![key]), &TestVal(vec![val]))
                    .await;
            }
            for (key, val) in ENTRIES {
                dbtx.insert_entry(&TestKey2(vec![key]), &TestVal2(vec![val]))
                    .await;
            }

            let found = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(found, expected_test_entries());

            let found_max = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(found_max, expected_max_test_entries());

            dbtx.commit_tx().await;
        }

        // The same queries must give the same answers through the read-only backend.
        let db_readonly = RocksDbReadOnly::open_read_only(path).await.unwrap();
        let db_readonly = Database::new(db_readonly, ModuleRegistry::default());
        let mut dbtx = db_readonly.begin_transaction_nc().await;

        let found = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(found, expected_test_entries());

        let found_max = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(found_max, expected_max_test_entries());
    }
}