1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::must_use_candidate)]
4#![allow(clippy::needless_lifetimes)]
5
6pub mod envs;
7
8use std::fmt;
9use std::ops::Range;
10use std::path::Path;
11use std::str::FromStr;
12
13use anyhow::{Context, Result, bail};
14use async_trait::async_trait;
15use fedimint_core::db::{
16 IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase, IRawDatabaseTransaction,
17 PrefixStream,
18};
19use futures::stream;
20pub use rocksdb;
21use rocksdb::{
22 DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
23};
24use tracing::debug;
25
26use crate::envs::FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV;
27
28fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> + use<I>
31where
32 I: Iterator + Send + 'i,
33 I::Item: Send,
34{
35 stream::unfold(iter, |mut iter| async {
36 fedimint_core::runtime::block_in_place(|| {
37 let item = iter.next();
38 item.map(|item| (item, iter))
39 })
40 })
41}
42
43#[derive(Debug)]
44pub struct RocksDb(rocksdb::OptimisticTransactionDB);
45
46pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
47
48impl RocksDb {
49 pub fn open(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDb> {
50 let mut opts = get_default_options()?;
51 opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency);
54 let db: rocksdb::OptimisticTransactionDB =
55 rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, &db_path)?;
56 Ok(RocksDb(db))
57 }
58
59 pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
60 &self.0
61 }
62}
63
64fn is_power_of_two(num: usize) -> bool {
66 num.is_power_of_two()
67}
68
69impl<'a> fmt::Debug for RocksDbReadOnlyTransaction<'a> {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 f.write_str("RocksDbTransaction")
72 }
73}
74
75impl<'a> fmt::Debug for RocksDbTransaction<'a> {
76 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77 f.write_str("RocksDbTransaction")
78 }
79}
80
81#[test]
82fn is_power_of_two_sanity() {
83 assert!(!is_power_of_two(0));
84 assert!(is_power_of_two(1));
85 assert!(is_power_of_two(2));
86 assert!(!is_power_of_two(3));
87 assert!(is_power_of_two(4));
88 assert!(!is_power_of_two(5));
89 assert!(is_power_of_two(2 << 10));
90 assert!(!is_power_of_two((2 << 10) + 1));
91}
92
93fn get_default_options() -> anyhow::Result<rocksdb::Options> {
94 let mut opts = rocksdb::Options::default();
95 if let Ok(var) = std::env::var(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV) {
96 debug!(var, "Using custom write buffer size");
97 let size: usize = FromStr::from_str(&var)
98 .with_context(|| format!("Could not parse {FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV}"))?;
99 if !is_power_of_two(size) {
100 bail!("{} is not a power of 2", FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV);
101 }
102 opts.set_write_buffer_size(size);
103 }
104 opts.create_if_missing(true);
105 Ok(opts)
106}
107
108#[derive(Debug)]
109pub struct RocksDbReadOnly(rocksdb::DB);
110
111pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
112
113impl RocksDbReadOnly {
114 pub fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
115 let opts = get_default_options()?;
116 let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
117 Ok(RocksDbReadOnly(db))
118 }
119}
120
121impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
122 fn from(db: OptimisticTransactionDB) -> Self {
123 RocksDb(db)
124 }
125}
126
127impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
128 fn from(db: RocksDb) -> Self {
129 db.0
130 }
131}
132
133fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
139 let mut next_prefix = prefix.to_vec();
140 let mut is_last_prefix = true;
141 for i in (0..next_prefix.len()).rev() {
142 next_prefix[i] = next_prefix[i].wrapping_add(1);
143 if next_prefix[i] > 0 {
144 is_last_prefix = false;
145 break;
146 }
147 }
148 if is_last_prefix {
149 None
152 } else {
153 Some(next_prefix)
154 }
155}
156
157#[async_trait]
158impl IRawDatabase for RocksDb {
159 type Transaction<'a> = RocksDbTransaction<'a>;
160 async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
161 let mut optimistic_options = OptimisticTransactionOptions::default();
162 optimistic_options.set_snapshot(true);
163
164 let mut write_options = WriteOptions::default();
165 write_options.set_sync(true);
167
168 let mut rocksdb_tx =
169 RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options));
170 rocksdb_tx
171 .set_tx_savepoint()
172 .await
173 .expect("setting tx savepoint failed");
174
175 rocksdb_tx
176 }
177
178 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
179 let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
180 checkpoint.create_checkpoint(backup_path)?;
181 Ok(())
182 }
183}
184
185#[async_trait]
186impl IRawDatabase for RocksDbReadOnly {
187 type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
188 async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
189 RocksDbReadOnlyTransaction(&self.0)
190 }
191
192 fn checkpoint(&self, backup_path: &Path) -> Result<()> {
193 let checkpoint = rocksdb::checkpoint::Checkpoint::new(&self.0)?;
194 checkpoint.create_checkpoint(backup_path)?;
195 Ok(())
196 }
197}
198
199#[async_trait]
200impl<'a> IDatabaseTransactionOpsCore for RocksDbTransaction<'a> {
201 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
202 fedimint_core::runtime::block_in_place(|| {
203 let val = self.0.snapshot().get(key).unwrap();
204 self.0.put(key, value)?;
205 Ok(val)
206 })
207 }
208
209 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
210 fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
211 }
212
213 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
214 fedimint_core::runtime::block_in_place(|| {
215 let val = self.0.snapshot().get(key).unwrap();
216 self.0.delete(key)?;
217 Ok(val)
218 })
219 }
220
221 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
222 Ok(fedimint_core::runtime::block_in_place(|| {
223 let prefix = key_prefix.to_vec();
224 let mut options = rocksdb::ReadOptions::default();
225 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
226 let iter = self.0.snapshot().iterator_opt(
227 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
228 options,
229 );
230 let rocksdb_iter = iter.map_while(move |res| {
231 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
232 key_bytes
233 .starts_with(&prefix)
234 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
235 });
236 Box::pin(convert_to_async_stream(rocksdb_iter))
237 }))
238 }
239
240 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
241 Ok(fedimint_core::runtime::block_in_place(|| {
242 let range = Range {
243 start: range.start.to_vec(),
244 end: range.end.to_vec(),
245 };
246 let mut options = rocksdb::ReadOptions::default();
247 options.set_iterate_range(range.clone());
248 let iter = self.0.snapshot().iterator_opt(
249 rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
250 options,
251 );
252 let rocksdb_iter = iter.map_while(move |res| {
253 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
254 (key_bytes.as_ref() < range.end.as_slice())
255 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
256 });
257 Box::pin(convert_to_async_stream(rocksdb_iter))
258 }))
259 }
260
261 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> anyhow::Result<()> {
262 fedimint_core::runtime::block_in_place(|| {
263 let mut options = rocksdb::ReadOptions::default();
265 options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
266 let iter = self
267 .0
268 .snapshot()
269 .iterator_opt(
270 rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
271 options,
272 )
273 .map_while(|res| {
274 res.map(|(key_bytes, _)| {
275 key_bytes
276 .starts_with(key_prefix)
277 .then_some(key_bytes.to_vec())
278 })
279 .transpose()
280 });
281
282 for item in iter {
283 let key = item?;
284 self.0.delete(key)?;
285 }
286
287 Ok(())
288 })
289 }
290
291 async fn raw_find_by_prefix_sorted_descending(
292 &mut self,
293 key_prefix: &[u8],
294 ) -> Result<PrefixStream<'_>> {
295 let prefix = key_prefix.to_vec();
296 let next_prefix = next_prefix(&prefix);
297 let iterator_mode = if let Some(next_prefix) = &next_prefix {
298 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
299 } else {
300 rocksdb::IteratorMode::End
301 };
302 Ok(fedimint_core::runtime::block_in_place(|| {
303 let mut options = rocksdb::ReadOptions::default();
304 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
305 let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
306 let rocksdb_iter = iter.map_while(move |res| {
307 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
308 key_bytes
309 .starts_with(&prefix)
310 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
311 });
312 Box::pin(convert_to_async_stream(rocksdb_iter))
313 }))
314 }
315}
316
317#[async_trait]
318impl<'a> IDatabaseTransactionOps for RocksDbTransaction<'a> {
319 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
320 Ok(fedimint_core::runtime::block_in_place(|| {
321 self.0.rollback_to_savepoint()
322 })?)
323 }
324
325 async fn set_tx_savepoint(&mut self) -> Result<()> {
326 fedimint_core::runtime::block_in_place(|| self.0.set_savepoint());
327
328 Ok(())
329 }
330}
331
332#[async_trait]
333impl<'a> IRawDatabaseTransaction for RocksDbTransaction<'a> {
334 async fn commit_tx(self) -> Result<()> {
335 fedimint_core::runtime::block_in_place(|| {
336 self.0.commit()?;
337 Ok(())
338 })
339 }
340}
341
342#[async_trait]
343impl<'a> IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'a> {
344 async fn raw_insert_bytes(&mut self, _key: &[u8], _value: &[u8]) -> Result<Option<Vec<u8>>> {
345 panic!("Cannot insert into a read only transaction");
346 }
347
348 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
349 fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
350 }
351
352 async fn raw_remove_entry(&mut self, _key: &[u8]) -> Result<Option<Vec<u8>>> {
353 panic!("Cannot remove from a read only transaction");
354 }
355
356 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
357 Ok(fedimint_core::runtime::block_in_place(|| {
358 let range = Range {
359 start: range.start.to_vec(),
360 end: range.end.to_vec(),
361 };
362 let mut options = rocksdb::ReadOptions::default();
363 options.set_iterate_range(range.clone());
364 let iter = self.0.snapshot().iterator_opt(
365 rocksdb::IteratorMode::From(&range.start, rocksdb::Direction::Forward),
366 options,
367 );
368 let rocksdb_iter = iter.map_while(move |res| {
369 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
370 (key_bytes.as_ref() < range.end.as_slice())
371 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
372 });
373 Box::pin(convert_to_async_stream(rocksdb_iter))
374 }))
375 }
376
377 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
378 Ok(fedimint_core::runtime::block_in_place(|| {
379 let prefix = key_prefix.to_vec();
380 let mut options = rocksdb::ReadOptions::default();
381 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
382 let iter = self.0.snapshot().iterator_opt(
383 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
384 options,
385 );
386 let rocksdb_iter = iter.map_while(move |res| {
387 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
388 key_bytes
389 .starts_with(&prefix)
390 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
391 });
392 Box::pin(convert_to_async_stream(rocksdb_iter))
393 }))
394 }
395
396 async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
397 panic!("Cannot remove from a read only transaction");
398 }
399
400 async fn raw_find_by_prefix_sorted_descending(
401 &mut self,
402 key_prefix: &[u8],
403 ) -> Result<PrefixStream<'_>> {
404 let prefix = key_prefix.to_vec();
405 let next_prefix = next_prefix(&prefix);
406 let iterator_mode = if let Some(next_prefix) = &next_prefix {
407 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
408 } else {
409 rocksdb::IteratorMode::End
410 };
411 Ok(fedimint_core::runtime::block_in_place(|| {
412 let mut options = rocksdb::ReadOptions::default();
413 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
414 let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
415 let rocksdb_iter = iter.map_while(move |res| {
416 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
417 key_bytes
418 .starts_with(&prefix)
419 .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
420 });
421 Box::pin(stream::iter(rocksdb_iter))
422 }))
423 }
424}
425
426#[async_trait]
427impl<'a> IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'a> {
428 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
429 panic!("Cannot rollback a read only transaction");
430 }
431
432 async fn set_tx_savepoint(&mut self) -> Result<()> {
433 panic!("Cannot set a savepoint in a read only transaction");
434 }
435}
436
437#[async_trait]
438impl<'a> IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'a> {
439 async fn commit_tx(self) -> Result<()> {
440 panic!("Cannot commit a read only transaction");
441 }
442}
443
444#[cfg(test)]
445mod fedimint_rocksdb_tests {
446 use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
447 use fedimint_core::encoding::{Decodable, Encodable};
448 use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
449 use fedimint_core::{impl_db_lookup, impl_db_record};
450 use futures::StreamExt;
451
452 use super::*;
453
454 fn open_temp_db(temp_path: &str) -> Database {
455 let path = tempfile::Builder::new()
456 .prefix(temp_path)
457 .tempdir()
458 .unwrap();
459
460 Database::new(
461 RocksDb::open(path).unwrap(),
462 ModuleDecoderRegistry::default(),
463 )
464 }
465
466 #[tokio::test(flavor = "multi_thread")]
467 async fn test_dbtx_insert_elements() {
468 fedimint_core::db::verify_insert_elements(open_temp_db("fcb-rocksdb-test-insert-elements"))
469 .await;
470 }
471
472 #[tokio::test(flavor = "multi_thread")]
473 async fn test_dbtx_remove_nonexisting() {
474 fedimint_core::db::verify_remove_nonexisting(open_temp_db(
475 "fcb-rocksdb-test-remove-nonexisting",
476 ))
477 .await;
478 }
479
480 #[tokio::test(flavor = "multi_thread")]
481 async fn test_dbtx_remove_existing() {
482 fedimint_core::db::verify_remove_existing(open_temp_db("fcb-rocksdb-test-remove-existing"))
483 .await;
484 }
485
486 #[tokio::test(flavor = "multi_thread")]
487 async fn test_dbtx_read_own_writes() {
488 fedimint_core::db::verify_read_own_writes(open_temp_db("fcb-rocksdb-test-read-own-writes"))
489 .await;
490 }
491
492 #[tokio::test(flavor = "multi_thread")]
493 async fn test_dbtx_prevent_dirty_reads() {
494 fedimint_core::db::verify_prevent_dirty_reads(open_temp_db(
495 "fcb-rocksdb-test-prevent-dirty-reads",
496 ))
497 .await;
498 }
499
500 #[tokio::test(flavor = "multi_thread")]
501 async fn test_dbtx_find_by_range() {
502 fedimint_core::db::verify_find_by_range(open_temp_db("fcb-rocksdb-test-find-by-range"))
503 .await;
504 }
505
506 #[tokio::test(flavor = "multi_thread")]
507 async fn test_dbtx_find_by_prefix() {
508 fedimint_core::db::verify_find_by_prefix(open_temp_db("fcb-rocksdb-test-find-by-prefix"))
509 .await;
510 }
511
512 #[tokio::test(flavor = "multi_thread")]
513 async fn test_dbtx_commit() {
514 fedimint_core::db::verify_commit(open_temp_db("fcb-rocksdb-test-commit")).await;
515 }
516
517 #[tokio::test(flavor = "multi_thread")]
518 async fn test_dbtx_prevent_nonrepeatable_reads() {
519 fedimint_core::db::verify_prevent_nonrepeatable_reads(open_temp_db(
520 "fcb-rocksdb-test-prevent-nonrepeatable-reads",
521 ))
522 .await;
523 }
524
525 #[tokio::test(flavor = "multi_thread")]
526 async fn test_dbtx_snapshot_isolation() {
527 fedimint_core::db::verify_snapshot_isolation(open_temp_db(
528 "fcb-rocksdb-test-snapshot-isolation",
529 ))
530 .await;
531 }
532
533 #[tokio::test(flavor = "multi_thread")]
534 async fn test_dbtx_rollback_to_savepoint() {
535 fedimint_core::db::verify_rollback_to_savepoint(open_temp_db(
536 "fcb-rocksdb-test-rollback-to-savepoint",
537 ))
538 .await;
539 }
540
541 #[tokio::test(flavor = "multi_thread")]
542 async fn test_dbtx_phantom_entry() {
543 fedimint_core::db::verify_phantom_entry(open_temp_db("fcb-rocksdb-test-phantom-entry"))
544 .await;
545 }
546
547 #[tokio::test(flavor = "multi_thread")]
548 async fn test_dbtx_write_conflict() {
549 fedimint_core::db::expect_write_conflict(open_temp_db("fcb-rocksdb-test-write-conflict"))
550 .await;
551 }
552
553 #[tokio::test(flavor = "multi_thread")]
554 async fn test_dbtx_remove_by_prefix() {
555 fedimint_core::db::verify_remove_by_prefix(open_temp_db(
556 "fcb-rocksdb-test-remove-by-prefix",
557 ))
558 .await;
559 }
560
561 #[tokio::test(flavor = "multi_thread")]
562 async fn test_module_dbtx() {
563 fedimint_core::db::verify_module_prefix(open_temp_db("fcb-rocksdb-test-module-prefix"))
564 .await;
565 }
566
567 #[tokio::test(flavor = "multi_thread")]
568 async fn test_module_db() {
569 let module_instance_id = 1;
570 let path = tempfile::Builder::new()
571 .prefix("fcb-rocksdb-test-module-db-prefix")
572 .tempdir()
573 .unwrap();
574
575 let module_db = Database::new(
576 RocksDb::open(path).unwrap(),
577 ModuleDecoderRegistry::default(),
578 );
579
580 fedimint_core::db::verify_module_db(
581 open_temp_db("fcb-rocksdb-test-module-db"),
582 module_db.with_prefix_module_id(module_instance_id).0,
583 )
584 .await;
585 }
586
587 #[test]
588 fn test_next_prefix() {
589 assert_eq!(next_prefix(&[1, 2, 3]).unwrap(), vec![1, 2, 4]);
592 assert_eq!(next_prefix(&[1, 2, 254]).unwrap(), vec![1, 2, 255]);
593 assert_eq!(next_prefix(&[1, 2, 255]).unwrap(), vec![1, 3, 0]);
594 assert_eq!(next_prefix(&[1, 255, 255]).unwrap(), vec![2, 0, 0]);
595 assert!(next_prefix(&[255, 255, 255]).is_none());
597 assert_eq!(next_prefix(&[0]).unwrap(), vec![1]);
599 assert_eq!(next_prefix(&[254]).unwrap(), vec![255]);
600 assert!(next_prefix(&[255]).is_none()); }
602
603 #[repr(u8)]
604 #[derive(Clone)]
605 pub enum TestDbKeyPrefix {
606 Test = 254,
607 MaxTest = 255,
608 }
609
610 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
611 pub(super) struct TestKey(pub Vec<u8>);
612
613 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
614 pub(super) struct TestVal(pub Vec<u8>);
615
616 #[derive(Debug, Encodable, Decodable)]
617 struct DbPrefixTestPrefix;
618
619 impl_db_record!(
620 key = TestKey,
621 value = TestVal,
622 db_prefix = TestDbKeyPrefix::Test,
623 notify_on_modify = true,
624 );
625 impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);
626
627 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
628 pub(super) struct TestKey2(pub Vec<u8>);
629
630 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
631 pub(super) struct TestVal2(pub Vec<u8>);
632
633 #[derive(Debug, Encodable, Decodable)]
634 struct DbPrefixTestPrefixMax;
635
636 impl_db_record!(
637 key = TestKey2,
638 value = TestVal2,
639 db_prefix = TestDbKeyPrefix::MaxTest, notify_on_modify = true,
641 );
642 impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);
643
644 #[tokio::test(flavor = "multi_thread")]
645 async fn test_retrieve_descending_order() {
646 let path = tempfile::Builder::new()
647 .prefix("fcb-rocksdb-test-descending-order")
648 .tempdir()
649 .unwrap();
650 {
651 let db = Database::new(
652 RocksDb::open(&path).unwrap(),
653 ModuleDecoderRegistry::default(),
654 );
655 let mut dbtx = db.begin_transaction().await;
656 dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![3]))
657 .await;
658 dbtx.insert_entry(&TestKey(vec![254]), &TestVal(vec![1]))
659 .await;
660 dbtx.insert_entry(&TestKey(vec![255]), &TestVal(vec![2]))
661 .await;
662 dbtx.insert_entry(&TestKey2(vec![0]), &TestVal2(vec![3]))
663 .await;
664 dbtx.insert_entry(&TestKey2(vec![254]), &TestVal2(vec![1]))
665 .await;
666 dbtx.insert_entry(&TestKey2(vec![255]), &TestVal2(vec![2]))
667 .await;
668 let query = dbtx
669 .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
670 .await
671 .collect::<Vec<_>>()
672 .await;
673 assert_eq!(
674 query,
675 vec![
676 (TestKey(vec![255]), TestVal(vec![2])),
677 (TestKey(vec![254]), TestVal(vec![1])),
678 (TestKey(vec![0]), TestVal(vec![3]))
679 ]
680 );
681 let query = dbtx
682 .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
683 .await
684 .collect::<Vec<_>>()
685 .await;
686 assert_eq!(
687 query,
688 vec![
689 (TestKey2(vec![255]), TestVal2(vec![2])),
690 (TestKey2(vec![254]), TestVal2(vec![1])),
691 (TestKey2(vec![0]), TestVal2(vec![3]))
692 ]
693 );
694 dbtx.commit_tx().await;
695 }
696 let db_readonly = RocksDbReadOnly::open_read_only(path).unwrap();
698 let db_readonly = Database::new(db_readonly, ModuleRegistry::default());
699 let mut dbtx = db_readonly.begin_transaction_nc().await;
700 let query = dbtx
701 .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
702 .await
703 .collect::<Vec<_>>()
704 .await;
705 assert_eq!(
706 query,
707 vec![
708 (TestKey(vec![255]), TestVal(vec![2])),
709 (TestKey(vec![254]), TestVal(vec![1])),
710 (TestKey(vec![0]), TestVal(vec![3]))
711 ]
712 );
713 let query = dbtx
714 .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
715 .await
716 .collect::<Vec<_>>()
717 .await;
718 assert_eq!(
719 query,
720 vec![
721 (TestKey2(vec![255]), TestVal2(vec![2])),
722 (TestKey2(vec![254]), TestVal2(vec![1])),
723 (TestKey2(vec![0]), TestVal2(vec![3]))
724 ]
725 );
726 }
727}