1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4#![allow(clippy::needless_lifetimes)]
5
6pub mod db_locked;
7pub mod envs;
8
9use std::fmt;
10use std::ops::Range;
11use std::path::Path;
12use std::str::FromStr;
13
14use anyhow::{Context, Result, bail};
15use async_trait::async_trait;
16use db_locked::{Locked, LockedBuilder};
17use fedimint_core::db::{
18 IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase, IRawDatabaseTransaction,
19 PrefixStream,
20};
21use fedimint_core::task::block_in_place;
22use futures::stream;
23pub use rocksdb;
24use rocksdb::{
25 DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
26};
27use tracing::debug;
28
29use crate::envs::FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV;
30
31fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> + use<I>
34where
35 I: Iterator + Send + 'i,
36 I::Item: Send,
37{
38 stream::unfold(iter, |mut iter| async {
39 fedimint_core::runtime::block_in_place(|| {
40 let item = iter.next();
41 item.map(|item| (item, iter))
42 })
43 })
44}
45
/// Newtype around a `rocksdb::OptimisticTransactionDB`; this is the type that
/// implements `IRawDatabase` with read/write transactions.
#[derive(Debug)]
pub struct RocksDb(rocksdb::OptimisticTransactionDB);
48
/// Newtype around a `rocksdb::Transaction` borrowed from the
/// `rocksdb::OptimisticTransactionDB` held by a [`RocksDb`].
pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
50
51impl RocksDb {
52 #[allow(clippy::unused_async)]
53 pub async fn open(db_path: impl AsRef<Path>) -> anyhow::Result<Locked<RocksDb>> {
54 let db_path = db_path.as_ref();
55
56 block_in_place(|| Self::open_blocking(db_path))
57 }
58 pub fn open_blocking(db_path: &Path) -> anyhow::Result<Locked<RocksDb>> {
59 block_in_place(|| {
60 std::fs::create_dir_all(
61 db_path
62 .parent()
63 .ok_or_else(|| anyhow::anyhow!("db path must have a base dir"))?,
64 )?;
65 Ok(LockedBuilder::new(db_path)?.with_db(Self::open_blocking_unlocked(db_path)?))
66 })
67 }
68 pub fn open_blocking_unlocked(db_path: &Path) -> anyhow::Result<RocksDb> {
69 let mut opts = get_default_options()?;
70 opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency);
73 let db: rocksdb::OptimisticTransactionDB =
74 rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, db_path)?;
75 Ok(RocksDb(db))
76 }
77
78 pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
79 &self.0
80 }
81}
82
/// Returns `true` when `num` is a power of two, i.e. exactly one bit is set.
///
/// Zero has no bit set and is therefore rejected.
fn is_power_of_two(num: usize) -> bool {
    num.count_ones() == 1
}
87
88impl<'a> fmt::Debug for RocksDbReadOnlyTransaction<'a> {
89 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90 f.write_str("RocksDbTransaction")
91 }
92}
93
94impl<'a> fmt::Debug for RocksDbTransaction<'a> {
95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 f.write_str("RocksDbTransaction")
97 }
98}
99
// Table-driven sanity check for `is_power_of_two` on small values and around
// `2 << 10`.
#[test]
fn is_power_of_two_sanity() {
    let cases: [(usize, bool); 8] = [
        (0, false),
        (1, true),
        (2, true),
        (3, false),
        (4, true),
        (5, false),
        (2 << 10, true),
        ((2 << 10) + 1, false),
    ];

    for (num, expected) in cases {
        assert_eq!(is_power_of_two(num), expected);
    }
}
111
112fn get_default_options() -> anyhow::Result<rocksdb::Options> {
113 let mut opts = rocksdb::Options::default();
114 if let Ok(var) = std::env::var(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV) {
115 debug!(var, "Using custom write buffer size");
116 let size: usize = FromStr::from_str(&var)
117 .with_context(|| format!("Could not parse {FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV}"))?;
118 if !is_power_of_two(size) {
119 bail!("{} is not a power of 2", FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV);
120 }
121 opts.set_write_buffer_size(size);
122 }
123 opts.create_if_missing(true);
124 Ok(opts)
125}
126
/// Newtype around a plain `rocksdb::DB`; see `RocksDbReadOnly::open_read_only`
/// for how it is opened.
#[derive(Debug)]
pub struct RocksDbReadOnly(rocksdb::DB);
129
/// Transaction type of [`RocksDbReadOnly`]: a shared borrow of the underlying
/// `rocksdb::DB`.
pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
131
132impl RocksDbReadOnly {
133 #[allow(clippy::unused_async)]
134 pub async fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
135 let db_path = db_path.as_ref();
136 block_in_place(|| Self::open_read_only_blocking(db_path))
137 }
138
139 pub fn open_read_only_blocking(db_path: &Path) -> anyhow::Result<RocksDbReadOnly> {
140 let opts = get_default_options()?;
141 let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
143 Ok(RocksDbReadOnly(db))
144 }
145}
146
147impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
148 fn from(db: OptimisticTransactionDB) -> Self {
149 RocksDb(db)
150 }
151}
152
153impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
154 fn from(db: RocksDb) -> Self {
155 db.0
156 }
157}
158
/// Computes the byte string of the same length that directly follows `prefix`
/// when both are read as big-endian numbers.
///
/// Returns `None` when no such string exists, i.e. when `prefix` is empty or
/// consists solely of `0xff` bytes.
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    // Right-most byte that can be incremented without overflowing.
    let pivot = prefix.iter().rposition(|&byte| byte != u8::MAX)?;

    let mut successor = prefix.to_vec();
    successor[pivot] += 1;
    // Every byte after the pivot was `0xff` and wraps around to zero.
    successor[pivot + 1..].fill(0);
    Some(successor)
}
182
183#[async_trait]
184impl IRawDatabase for RocksDb {
185 type Transaction<'a> = RocksDbTransaction<'a>;
186 async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
187 let mut optimistic_options = OptimisticTransactionOptions::default();
188 optimistic_options.set_snapshot(true);
189
190 let mut write_options = WriteOptions::default();
191 write_options.set_sync(true);
193
194 let mut rocksdb_tx =
195 RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options));
196 rocksdb_tx
197 .set_tx_savepoint()
198 .await
199 .expect("setting tx savepoint failed");
200
201 rocksdb_tx
202 }
203
204 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
205 let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
206 checkpoint.create_checkpoint(backup_path)?;
207 Ok(())
208 }
209}
210
211#[async_trait]
212impl IRawDatabase for RocksDbReadOnly {
213 type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
214 async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
215 RocksDbReadOnlyTransaction(&self.0)
216 }
217
218 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
219 let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
220 checkpoint.create_checkpoint(backup_path)?;
221 Ok(())
222 }
223}
224
225#[async_trait]
226impl<'a> IDatabaseTransactionOpsCore for RocksDbTransaction<'a> {
227 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
228 fedimint_core::runtime::block_in_place(|| {
229 let val = self.0.snapshot().get(key).unwrap();
230 self.0.put(key, value)?;
231 Ok(val)
232 })
233 }
234
235 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
236 fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
237 }
238
239 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
240 fedimint_core::runtime::block_in_place(|| {
241 let val = self.0.snapshot().get(key).unwrap();
242 self.0.delete(key)?;
243 Ok(val)
244 })
245 }
246
247 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
248 Ok(fedimint_core::runtime::block_in_place(|| {
249 let prefix = key_prefix.to_vec();
250 let mut options = rocksdb::ReadOptions::default();
251 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
252 let iter = self.0.snapshot().iterator_opt(
253 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
254 options,
255 );
256 let rocksdb_iter = iter.map_while(move |res| {
257 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
258 key_bytes
259 .starts_with(&prefix)
260 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
261 });
262 Box::pin(convert_to_async_stream(rocksdb_iter))
263 }))
264 }
265
266 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
267 Ok(fedimint_core::runtime::block_in_place(|| {
268 let range = Range {
269 start: range.start.to_vec(),
270 end: range.end.to_vec(),
271 };
272 let mut options = rocksdb::ReadOptions::default();
273 options.set_iterate_range(range.clone());
274 let iter = self.0.snapshot().iterator_opt(
275 rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
276 options,
277 );
278 let rocksdb_iter = iter.map_while(move |res| {
279 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
280 (key_bytes.as_ref() < range.end.as_slice())
281 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
282 });
283 Box::pin(convert_to_async_stream(rocksdb_iter))
284 }))
285 }
286
287 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> anyhow::Result<()> {
288 fedimint_core::runtime::block_in_place(|| {
289 let mut options = rocksdb::ReadOptions::default();
291 options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
292 let iter = self
293 .0
294 .snapshot()
295 .iterator_opt(
296 rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
297 options,
298 )
299 .map_while(|res| {
300 res.map(|(key_bytes, _)| {
301 key_bytes
302 .starts_with(key_prefix)
303 .then_some(key_bytes.to_vec())
304 })
305 .transpose()
306 });
307
308 for item in iter {
309 let key = item?;
310 self.0.delete(key)?;
311 }
312
313 Ok(())
314 })
315 }
316
317 async fn raw_find_by_prefix_sorted_descending(
318 &mut self,
319 key_prefix: &[u8],
320 ) -> Result<PrefixStream<'_>> {
321 let prefix = key_prefix.to_vec();
322 let next_prefix = next_prefix(&prefix);
323 let iterator_mode = if let Some(next_prefix) = &next_prefix {
324 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
325 } else {
326 rocksdb::IteratorMode::End
327 };
328 Ok(fedimint_core::runtime::block_in_place(|| {
329 let mut options = rocksdb::ReadOptions::default();
330 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
331 let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
332 let rocksdb_iter = iter.map_while(move |res| {
333 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
334 key_bytes
335 .starts_with(&prefix)
336 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
337 });
338 Box::pin(convert_to_async_stream(rocksdb_iter))
339 }))
340 }
341}
342
343#[async_trait]
344impl<'a> IDatabaseTransactionOps for RocksDbTransaction<'a> {
345 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
346 Ok(fedimint_core::runtime::block_in_place(|| {
347 self.0.rollback_to_savepoint()
348 })?)
349 }
350
351 async fn set_tx_savepoint(&mut self) -> Result<()> {
352 fedimint_core::runtime::block_in_place(|| self.0.set_savepoint());
353
354 Ok(())
355 }
356}
357
358#[async_trait]
359impl<'a> IRawDatabaseTransaction for RocksDbTransaction<'a> {
360 async fn commit_tx(self) -> Result<()> {
361 fedimint_core::runtime::block_in_place(|| {
362 self.0.commit()?;
363 Ok(())
364 })
365 }
366}
367
368#[async_trait]
369impl<'a> IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'a> {
370 async fn raw_insert_bytes(&mut self, _key: &[u8], _value: &[u8]) -> Result<Option<Vec<u8>>> {
371 panic!("Cannot insert into a read only transaction");
372 }
373
374 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
375 fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
376 }
377
378 async fn raw_remove_entry(&mut self, _key: &[u8]) -> Result<Option<Vec<u8>>> {
379 panic!("Cannot remove from a read only transaction");
380 }
381
382 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
383 Ok(fedimint_core::runtime::block_in_place(|| {
384 let range = Range {
385 start: range.start.to_vec(),
386 end: range.end.to_vec(),
387 };
388 let mut options = rocksdb::ReadOptions::default();
389 options.set_iterate_range(range.clone());
390 let iter = self.0.snapshot().iterator_opt(
391 rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
392 options,
393 );
394 let rocksdb_iter = iter.map_while(move |res| {
395 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
396 (key_bytes.as_ref() < range.end.as_slice())
397 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
398 });
399 Box::pin(convert_to_async_stream(rocksdb_iter))
400 }))
401 }
402
403 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
404 Ok(fedimint_core::runtime::block_in_place(|| {
405 let prefix = key_prefix.to_vec();
406 let mut options = rocksdb::ReadOptions::default();
407 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
408 let iter = self.0.snapshot().iterator_opt(
409 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
410 options,
411 );
412 let rocksdb_iter = iter.map_while(move |res| {
413 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
414 key_bytes
415 .starts_with(&prefix)
416 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
417 });
418 Box::pin(convert_to_async_stream(rocksdb_iter))
419 }))
420 }
421
422 async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
423 panic!("Cannot remove from a read only transaction");
424 }
425
426 async fn raw_find_by_prefix_sorted_descending(
427 &mut self,
428 key_prefix: &[u8],
429 ) -> Result<PrefixStream<'_>> {
430 let prefix = key_prefix.to_vec();
431 let next_prefix = next_prefix(&prefix);
432 let iterator_mode = if let Some(next_prefix) = &next_prefix {
433 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
434 } else {
435 rocksdb::IteratorMode::End
436 };
437 Ok(fedimint_core::runtime::block_in_place(|| {
438 let mut options = rocksdb::ReadOptions::default();
439 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
440 let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
441 let rocksdb_iter = iter.map_while(move |res| {
442 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
443 key_bytes
444 .starts_with(&prefix)
445 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
446 });
447 Box::pin(stream::iter(rocksdb_iter))
448 }))
449 }
450}
451
// Savepoint operations are not available on a read-only transaction; both
// methods panic unconditionally.
#[async_trait]
impl<'a> IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'a> {
    /// Always panics: a read-only transaction cannot be rolled back.
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
        panic!("Cannot rollback a read only transaction");
    }

    /// Always panics: a read-only transaction cannot set a savepoint.
    async fn set_tx_savepoint(&mut self) -> Result<()> {
        panic!("Cannot set a savepoint in a read only transaction");
    }
}
462
// Committing is not available on a read-only transaction.
#[async_trait]
impl<'a> IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'a> {
    /// Always panics: a read-only transaction cannot be committed.
    async fn commit_tx(self) -> Result<()> {
        panic!("Cannot commit a read only transaction");
    }
}
469
// Tests for the RocksDB backend: most of them hand a freshly opened database
// to the shared `fedimint_core::db` verification helpers; the rest cover
// `next_prefix` and descending prefix scans.
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    // Opens a database in a new temporary directory whose name starts with
    // `temp_path`.
    // NOTE(review): the temp-dir guard `path` is a local and goes out of scope
    // when this function returns while the database stays open -- confirm that
    // is intended.
    fn open_temp_db(temp_path: &str) -> Database {
        let path = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();

        Database::new(
            RocksDb::open_blocking(path.as_ref()).unwrap(),
            ModuleDecoderRegistry::default(),
        )
    }

    // The tests below use the multi-thread runtime flavor; the backend wraps
    // its database calls in `block_in_place`.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        fedimint_core::db::verify_insert_elements(open_temp_db("fcb-rocksdb-test-insert-elements"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        fedimint_core::db::verify_remove_nonexisting(open_temp_db(
            "fcb-rocksdb-test-remove-nonexisting",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        fedimint_core::db::verify_remove_existing(open_temp_db("fcb-rocksdb-test-remove-existing"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        fedimint_core::db::verify_read_own_writes(open_temp_db("fcb-rocksdb-test-read-own-writes"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        fedimint_core::db::verify_prevent_dirty_reads(open_temp_db(
            "fcb-rocksdb-test-prevent-dirty-reads",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        fedimint_core::db::verify_find_by_range(open_temp_db("fcb-rocksdb-test-find-by-range"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        fedimint_core::db::verify_find_by_prefix(open_temp_db("fcb-rocksdb-test-find-by-prefix"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        fedimint_core::db::verify_commit(open_temp_db("fcb-rocksdb-test-commit")).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        fedimint_core::db::verify_prevent_nonrepeatable_reads(open_temp_db(
            "fcb-rocksdb-test-prevent-nonrepeatable-reads",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        fedimint_core::db::verify_snapshot_isolation(open_temp_db(
            "fcb-rocksdb-test-snapshot-isolation",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_rollback_to_savepoint() {
        fedimint_core::db::verify_rollback_to_savepoint(open_temp_db(
            "fcb-rocksdb-test-rollback-to-savepoint",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        fedimint_core::db::verify_phantom_entry(open_temp_db("fcb-rocksdb-test-phantom-entry"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        fedimint_core::db::expect_write_conflict(open_temp_db("fcb-rocksdb-test-write-conflict"))
            .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        fedimint_core::db::verify_remove_by_prefix(open_temp_db(
            "fcb-rocksdb-test-remove-by-prefix",
        ))
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        fedimint_core::db::verify_module_prefix(open_temp_db("fcb-rocksdb-test-module-prefix"))
            .await;
    }

    // Runs `verify_module_db` with one plain database and a second database
    // that is wrapped with the prefix of module instance 1.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-module-db-prefix")
            .tempdir()
            .unwrap();

        let module_db = Database::new(
            RocksDb::open_blocking(path.as_ref()).unwrap(),
            ModuleDecoderRegistry::default(),
        );

        fedimint_core::db::verify_module_db(
            open_temp_db("fcb-rocksdb-test-module-db"),
            module_db.with_prefix_module_id(module_instance_id).0,
        )
        .await;
    }

    // Covers a plain increment, carries across one and two trailing 0xff
    // bytes, and the all-0xff inputs that have no successor.
    #[test]
    fn test_next_prefix() {
        assert_eq!(next_prefix(&[1, 2, 3]).unwrap(), vec![1, 2, 4]);
        assert_eq!(next_prefix(&[1, 2, 254]).unwrap(), vec![1, 2, 255]);
        assert_eq!(next_prefix(&[1, 2, 255]).unwrap(), vec![1, 3, 0]);
        assert_eq!(next_prefix(&[1, 255, 255]).unwrap(), vec![2, 0, 0]);
        assert!(next_prefix(&[255, 255, 255]).is_none());
        assert_eq!(next_prefix(&[0]).unwrap(), vec![1]);
        assert_eq!(next_prefix(&[254]).unwrap(), vec![255]);
        assert!(next_prefix(&[255]).is_none());
    }

    // Key prefixes used by the records below. `MaxTest` is 255, the largest
    // possible prefix byte, so scans over it have no "next prefix".
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        MaxTest = 255,
    }

    // Record stored under the `Test` prefix.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    // Query prefix used to look up `TestKey` entries.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    // Record stored under the `MaxTest` prefix.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    // Query prefix used to look up `TestKey2` entries.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    // Inserts three entries under each prefix and checks that the descending
    // scan returns them in reverse key order: first inside the writing
    // transaction, then again through a read-only database opened on the same
    // path.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();
        // The read-write database lives only inside this scope; the read-only
        // handle is opened after the scope has ended.
        {
            let db = Database::new(
                RocksDb::open(&path).await.unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;
            dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![3]))
                .await;
            dbtx.insert_entry(&TestKey(vec![254]), &TestVal(vec![1]))
                .await;
            dbtx.insert_entry(&TestKey(vec![255]), &TestVal(vec![2]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![0]), &TestVal2(vec![3]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![254]), &TestVal2(vec![1]))
                .await;
            dbtx.insert_entry(&TestKey2(vec![255]), &TestVal2(vec![2]))
                .await;
            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(
                query,
                vec![
                    (TestKey(vec![255]), TestVal(vec![2])),
                    (TestKey(vec![254]), TestVal(vec![1])),
                    (TestKey(vec![0]), TestVal(vec![3]))
                ]
            );
            let query = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(
                query,
                vec![
                    (TestKey2(vec![255]), TestVal2(vec![2])),
                    (TestKey2(vec![254]), TestVal2(vec![1])),
                    (TestKey2(vec![0]), TestVal2(vec![3]))
                ]
            );
            dbtx.commit_tx().await;
        }
        // Repeat both scans through the read-only handle.
        let db_readonly = RocksDbReadOnly::open_read_only(path).await.unwrap();
        let db_readonly = Database::new(db_readonly, ModuleRegistry::default());
        let mut dbtx = db_readonly.begin_transaction_nc().await;
        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            query,
            vec![
                (TestKey(vec![255]), TestVal(vec![2])),
                (TestKey(vec![254]), TestVal(vec![1])),
                (TestKey(vec![0]), TestVal(vec![3]))
            ]
        );
        let query = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            query,
            vec![
                (TestKey2(vec![255]), TestVal2(vec![2])),
                (TestKey2(vec![254]), TestVal2(vec![1])),
                (TestKey2(vec![0]), TestVal2(vec![3]))
            ]
        );
    }
}