fedimint_cursed_redb/
lib.rs

1//! Uses immutable data structures and saves to redb on commit.
2
3use std::fmt::Debug;
4use std::ops::Range;
5use std::path::Path;
6use std::sync::{Arc, Mutex};
7
8use fedimint_core::db::{
9    DatabaseError, DatabaseResult, IDatabaseTransactionOps, IDatabaseTransactionOpsCore,
10    IRawDatabase, IRawDatabaseTransaction, PrefixStream,
11};
12use fedimint_core::{apply, async_trait_maybe_send};
13use futures::stream;
14use imbl::OrdMap;
15use redb::{Database, ReadableTable, TableDefinition};
16
17const KV_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("fedimint_kv");
18
19#[derive(Debug, Default)]
20pub struct DatabaseInsertOperation {
21    pub key: Vec<u8>,
22    pub value: Vec<u8>,
23    pub old_value: Option<Vec<u8>>,
24}
25
26#[derive(Debug, Default)]
27pub struct DatabaseDeleteOperation {
28    pub key: Vec<u8>,
29    pub old_value: Option<Vec<u8>>,
30}
31
32#[derive(Debug)]
33pub enum DatabaseOperation {
34    Insert(DatabaseInsertOperation),
35    Delete(DatabaseDeleteOperation),
36}
37
38#[derive(Clone)]
39pub struct MemAndRedb {
40    data: Arc<Mutex<OrdMap<Vec<u8>, Vec<u8>>>>,
41    db: Arc<Database>,
42}
43
44impl Debug for MemAndRedb {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        f.debug_struct("MemDatabase").finish_non_exhaustive()
47    }
48}
49
50#[derive(Debug)]
51pub struct MemAndRedbTransaction<'a> {
52    operations: Vec<DatabaseOperation>,
53    tx_data: OrdMap<Vec<u8>, Vec<u8>>,
54    db: &'a MemAndRedb,
55}
56
57#[cfg(not(target_family = "wasm"))]
58mod native;
59
60#[cfg(target_family = "wasm")]
61mod wasm;
62
63impl MemAndRedb {
64    fn new_from_redb(db: Database) -> DatabaseResult<Self> {
65        let db = Arc::new(db);
66        let mut data = OrdMap::new();
67
68        // Load existing data from redb
69        let read_txn = db.begin_read().map_err(DatabaseError::backend)?;
70        if let Ok(table) = read_txn.open_table(KV_TABLE) {
71            for entry in table.iter().map_err(DatabaseError::backend)? {
72                let (key, value) = entry.map_err(DatabaseError::backend)?;
73                data.insert(key.value().to_vec(), value.value().to_vec());
74            }
75        }
76        // Table might not exist on first run, which is fine
77
78        Ok(Self {
79            data: Arc::new(Mutex::new(data)),
80            db,
81        })
82    }
83}
84
85#[apply(async_trait_maybe_send!)]
86impl IRawDatabase for MemAndRedb {
87    type Transaction<'a> = MemAndRedbTransaction<'a>;
88
89    async fn begin_transaction<'a>(&'a self) -> MemAndRedbTransaction<'a> {
90        MemAndRedbTransaction {
91            operations: Vec::new(),
92            tx_data: {
93                let data_lock = self.data.lock().expect("poison");
94                data_lock.clone()
95            },
96            db: self,
97        }
98    }
99
100    fn checkpoint(&self, _: &Path) -> DatabaseResult<()> {
101        unimplemented!()
102    }
103}
104
105#[apply(async_trait_maybe_send!)]
106impl<'a> IDatabaseTransactionOpsCore for MemAndRedbTransaction<'a> {
107    async fn raw_insert_bytes(
108        &mut self,
109        key: &[u8],
110        value: &[u8],
111    ) -> DatabaseResult<Option<Vec<u8>>> {
112        let val = IDatabaseTransactionOpsCore::raw_get_bytes(self, key).await;
113        // Insert data from copy so we can read our own writes
114        let old_value = self.tx_data.insert(key.to_vec(), value.to_vec());
115        self.operations
116            .push(DatabaseOperation::Insert(DatabaseInsertOperation {
117                key: key.to_vec(),
118                value: value.to_vec(),
119                old_value,
120            }));
121        val
122    }
123
124    async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
125        Ok(self.tx_data.get(key).cloned())
126    }
127
128    async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
129        // Remove data from copy so we can read our own writes
130        let old_value = self.tx_data.remove(&key.to_vec());
131        self.operations
132            .push(DatabaseOperation::Delete(DatabaseDeleteOperation {
133                key: key.to_vec(),
134                old_value: old_value.clone(),
135            }));
136        Ok(old_value)
137    }
138
139    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
140        let data = self
141            .tx_data
142            .range::<_, Vec<u8>>(Range {
143                start: range.start.to_vec(),
144                end: range.end.to_vec(),
145            })
146            .map(|(key, value)| (key.clone(), value.clone()))
147            .collect::<Vec<_>>();
148
149        Ok(Box::pin(stream::iter(data)))
150    }
151
152    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
153        let data = self
154            .tx_data
155            .range::<_, Vec<u8>>((key_prefix.to_vec())..)
156            .take_while(|(key, _)| key.starts_with(key_prefix))
157            .map(|(key, value)| (key.clone(), value.clone()))
158            .collect::<Vec<_>>();
159
160        Ok(Box::pin(stream::iter(data)))
161    }
162
163    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
164        let keys = self
165            .tx_data
166            .range::<_, Vec<u8>>((key_prefix.to_vec())..)
167            .take_while(|(key, _)| key.starts_with(key_prefix))
168            .map(|(key, _)| key.clone())
169            .collect::<Vec<_>>();
170        for key in keys.iter() {
171            let old_value = self.tx_data.remove(&key.to_vec());
172            self.operations
173                .push(DatabaseOperation::Delete(DatabaseDeleteOperation {
174                    key: key.to_vec(),
175                    old_value,
176                }));
177        }
178        Ok(())
179    }
180
181    async fn raw_find_by_prefix_sorted_descending(
182        &mut self,
183        key_prefix: &[u8],
184    ) -> DatabaseResult<PrefixStream<'_>> {
185        let mut data = self
186            .tx_data
187            .range::<_, Vec<u8>>((key_prefix.to_vec())..)
188            .take_while(|(key, _)| key.starts_with(key_prefix))
189            .map(|(key, value)| (key.clone(), value.clone()))
190            .collect::<Vec<_>>();
191        data.sort_by(|a, b| a.cmp(b).reverse());
192
193        Ok(Box::pin(stream::iter(data)))
194    }
195}
196
197impl<'a> IDatabaseTransactionOps for MemAndRedbTransaction<'a> {}
198
199// In-memory database transaction should only be used for test code and never
200// for production as it doesn't properly implement MVCC
201#[apply(async_trait_maybe_send!)]
202impl<'a> IRawDatabaseTransaction for MemAndRedbTransaction<'a> {
203    async fn commit_tx(self) -> DatabaseResult<()> {
204        let mut data_locked = self.db.data.lock().expect("poison");
205        let write_txn = self.db.db.begin_write().map_err(DatabaseError::backend)?;
206        let operations = self.operations;
207        let mut data_new = data_locked.clone();
208        {
209            let mut table = write_txn
210                .open_table(KV_TABLE)
211                .map_err(DatabaseError::backend)?;
212
213            // Apply all operations
214            for op in operations {
215                match op {
216                    DatabaseOperation::Insert(insert_op) => {
217                        table
218                            .insert(&insert_op.key[..], &insert_op.value[..])
219                            .map_err(DatabaseError::backend)?;
220                        let old_value = data_new.insert(insert_op.key, insert_op.value);
221                        if old_value != insert_op.old_value {
222                            return Err(DatabaseError::WriteConflict);
223                        }
224                    }
225                    DatabaseOperation::Delete(delete_op) => {
226                        table
227                            .remove(&delete_op.key[..])
228                            .map_err(DatabaseError::backend)?;
229                        let old_value = data_new.remove(&delete_op.key);
230                        if old_value != delete_op.old_value {
231                            return Err(DatabaseError::WriteConflict);
232                        }
233                    }
234                }
235            }
236        }
237        // Commit redb transaction
238        write_txn.commit().map_err(DatabaseError::backend)?;
239
240        // Update in-memory data
241        *data_locked = data_new;
242        Ok(())
243    }
244}
245
246#[cfg(all(test, not(target_family = "wasm")))]
247mod tests {
248    use fedimint_core::db::Database;
249    use fedimint_core::module::registry::ModuleDecoderRegistry;
250    use tempfile::TempDir;
251
252    use super::*;
253
254    async fn open_temp_db(temp_path: &str) -> (Database, TempDir) {
255        let temp_dir = tempfile::Builder::new()
256            .prefix(temp_path)
257            .tempdir()
258            .unwrap();
259
260        let db_path = temp_dir.path().join("test.redb");
261        let locked_db = MemAndRedb::new(&db_path).await.unwrap();
262
263        let database = Database::new(locked_db, ModuleDecoderRegistry::default());
264        (database, temp_dir)
265    }
266
267    #[tokio::test(flavor = "multi_thread")]
268    async fn test_dbtx_insert_elements() {
269        let (db, _dir) = open_temp_db("fcb-redb-test-insert-elements").await;
270        fedimint_core::db::verify_insert_elements(db).await;
271    }
272
273    #[tokio::test(flavor = "multi_thread")]
274    async fn test_dbtx_remove_nonexisting() {
275        let (db, _dir) = open_temp_db("fcb-redb-test-remove-nonexisting").await;
276        fedimint_core::db::verify_remove_nonexisting(db).await;
277    }
278
279    #[tokio::test(flavor = "multi_thread")]
280    async fn test_dbtx_remove_existing() {
281        let (db, _dir) = open_temp_db("fcb-redb-test-remove-existing").await;
282        fedimint_core::db::verify_remove_existing(db).await;
283    }
284
285    #[tokio::test(flavor = "multi_thread")]
286    async fn test_dbtx_read_own_writes() {
287        let (db, _dir) = open_temp_db("fcb-redb-test-read-own-writes").await;
288        fedimint_core::db::verify_read_own_writes(db).await;
289    }
290
291    #[tokio::test(flavor = "multi_thread")]
292    async fn test_dbtx_prevent_dirty_reads() {
293        let (db, _dir) = open_temp_db("fcb-redb-test-prevent-dirty-reads").await;
294        fedimint_core::db::verify_prevent_dirty_reads(db).await;
295    }
296
297    #[tokio::test(flavor = "multi_thread")]
298    async fn test_dbtx_find_by_range() {
299        let (db, _dir) = open_temp_db("fcb-redb-test-find-by-range").await;
300        fedimint_core::db::verify_find_by_range(db).await;
301    }
302
303    #[tokio::test(flavor = "multi_thread")]
304    async fn test_dbtx_find_by_prefix() {
305        let (db, _dir) = open_temp_db("fcb-redb-test-find-by-prefix").await;
306        fedimint_core::db::verify_find_by_prefix(db).await;
307    }
308
309    #[tokio::test(flavor = "multi_thread")]
310    async fn test_dbtx_commit() {
311        let (db, _dir) = open_temp_db("fcb-redb-test-commit").await;
312        fedimint_core::db::verify_commit(db).await;
313    }
314
315    #[tokio::test(flavor = "multi_thread")]
316    async fn test_dbtx_prevent_nonrepeatable_reads() {
317        let (db, _dir) = open_temp_db("fcb-redb-test-prevent-nonrepeatable-reads").await;
318        fedimint_core::db::verify_prevent_nonrepeatable_reads(db).await;
319    }
320
321    #[tokio::test(flavor = "multi_thread")]
322    async fn test_dbtx_phantom_entry() {
323        let (db, _dir) = open_temp_db("fcb-redb-test-phantom-entry").await;
324        fedimint_core::db::verify_phantom_entry(db).await;
325    }
326
327    #[tokio::test(flavor = "multi_thread")]
328    async fn test_dbtx_write_conflict() {
329        let (db, _dir) = open_temp_db("fcb-redb-test-write-conflict").await;
330        fedimint_core::db::verify_snapshot_isolation(db).await;
331    }
332
333    #[tokio::test(flavor = "multi_thread")]
334    async fn test_dbtx_remove_by_prefix() {
335        let (db, _dir) = open_temp_db("fcb-redb-test-remove-by-prefix").await;
336        fedimint_core::db::verify_remove_by_prefix(db).await;
337    }
338}