fedimint_cursed_redb/
lib.rs

//! Key-value database that keeps its whole state in an immutable in-memory
//! map (`imbl::OrdMap`) and persists every committed transaction to redb.
2
3use std::fmt::Debug;
4use std::ops::Range;
5use std::path::Path;
6use std::sync::{Arc, Mutex};
7
8use anyhow::{Context as _, Result};
9use fedimint_core::db::{
10    IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase, IRawDatabaseTransaction,
11    PrefixStream,
12};
13use fedimint_core::{apply, async_trait_maybe_send};
14use futures::stream;
15use imbl::OrdMap;
16use redb::{Database, ReadableTable, TableDefinition};
17
/// Definition of the single redb table (named `fedimint_kv`) used by this
/// crate; it maps raw byte keys to raw byte values.
const KV_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("fedimint_kv");
19
/// An insert recorded by a transaction so it can be replayed at commit time.
#[derive(Debug, Default)]
pub struct DatabaseInsertOperation {
    /// Raw key that was written.
    pub key: Vec<u8>,
    /// Raw value that was written under `key`.
    pub value: Vec<u8>,
    /// Value the transaction saw for `key` just before this insert (`None` if
    /// the key was absent). On commit it is compared with the shared state to
    /// detect write-write conflicts.
    pub old_value: Option<Vec<u8>>,
}
26
/// A delete recorded by a transaction so it can be replayed at commit time.
#[derive(Debug, Default)]
pub struct DatabaseDeleteOperation {
    /// Raw key that was removed.
    pub key: Vec<u8>,
    /// Value the transaction saw for `key` just before the delete (`None` if
    /// the key was absent). On commit it is compared with the shared state to
    /// detect write-write conflicts.
    pub old_value: Option<Vec<u8>>,
}
32
/// A single write recorded by a transaction. Transactions keep these in the
/// order they were issued and replay them on commit.
#[derive(Debug)]
pub enum DatabaseOperation {
    /// A key was inserted or overwritten.
    Insert(DatabaseInsertOperation),
    /// A key was removed.
    Delete(DatabaseDeleteOperation),
}
38
/// Database handle that pairs an in-memory map of all key-value pairs with the
/// redb database those pairs are persisted to.
///
/// Cloning the handle is cheap: both fields are `Arc`s, so clones share the
/// same in-memory state and the same redb database.
#[derive(Clone)]
pub struct MemAndRedb {
    /// Latest committed state. It is cloned to give each new transaction its
    /// snapshot and is replaced when a transaction commits.
    data: Arc<Mutex<OrdMap<Vec<u8>, Vec<u8>>>>,
    /// The redb database that committed writes are persisted to.
    db: Arc<Database>,
}
44
45impl Debug for MemAndRedb {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        f.debug_struct("MemDatabase").finish_non_exhaustive()
48    }
49}
50
/// A transaction on a [`MemAndRedb`] database.
///
/// Reads and writes go to a private snapshot; the writes are additionally
/// logged so they can be applied to the shared state and to redb on commit.
#[derive(Debug)]
pub struct MemAndRedbTransaction<'a> {
    /// Writes issued by this transaction, in the order they were made.
    operations: Vec<DatabaseOperation>,
    /// Private copy of the database state taken when the transaction began,
    /// updated by this transaction's own writes.
    tx_data: OrdMap<Vec<u8>, Vec<u8>>,
    /// The database this transaction commits into.
    db: &'a MemAndRedb,
}
57
// Platform-specific code lives in separate modules; exactly one of them is
// compiled, depending on whether the target family is wasm.
// NOTE(review): presumably each module provides the `MemAndRedb::new`
// constructor (the tests in this file call it) — confirm.
#[cfg(not(target_family = "wasm"))]
mod native;

#[cfg(target_family = "wasm")]
mod wasm;
63
64impl MemAndRedb {
65    fn new_from_redb(db: Database) -> Result<Self> {
66        let db = Arc::new(db);
67        let mut data = OrdMap::new();
68
69        // Load existing data from redb
70        let read_txn = db
71            .begin_read()
72            .context("Failed to begin read transaction")?;
73        if let Ok(table) = read_txn.open_table(KV_TABLE) {
74            for entry in table.iter()? {
75                let (key, value) = entry?;
76                data.insert(key.value().to_vec(), value.value().to_vec());
77            }
78        }
79        // Table might not exist on first run, which is fine
80
81        Ok(Self {
82            data: Arc::new(Mutex::new(data)),
83            db,
84        })
85    }
86}
87
88#[apply(async_trait_maybe_send!)]
89impl IRawDatabase for MemAndRedb {
90    type Transaction<'a> = MemAndRedbTransaction<'a>;
91
92    async fn begin_transaction<'a>(&'a self) -> MemAndRedbTransaction<'a> {
93        MemAndRedbTransaction {
94            operations: Vec::new(),
95            tx_data: {
96                let data_lock = self.data.lock().expect("poison");
97                data_lock.clone()
98            },
99            db: self,
100        }
101    }
102
103    fn checkpoint(&self, _: &Path) -> Result<()> {
104        unimplemented!()
105    }
106}
107
108#[apply(async_trait_maybe_send!)]
109impl<'a> IDatabaseTransactionOpsCore for MemAndRedbTransaction<'a> {
110    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
111        let val = IDatabaseTransactionOpsCore::raw_get_bytes(self, key).await;
112        // Insert data from copy so we can read our own writes
113        let old_value = self.tx_data.insert(key.to_vec(), value.to_vec());
114        self.operations
115            .push(DatabaseOperation::Insert(DatabaseInsertOperation {
116                key: key.to_vec(),
117                value: value.to_vec(),
118                old_value,
119            }));
120        val
121    }
122
123    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
124        Ok(self.tx_data.get(key).cloned())
125    }
126
127    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
128        // Remove data from copy so we can read our own writes
129        let old_value = self.tx_data.remove(&key.to_vec());
130        self.operations
131            .push(DatabaseOperation::Delete(DatabaseDeleteOperation {
132                key: key.to_vec(),
133                old_value: old_value.clone(),
134            }));
135        Ok(old_value)
136    }
137
138    async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
139        let data = self
140            .tx_data
141            .range::<_, Vec<u8>>(Range {
142                start: range.start.to_vec(),
143                end: range.end.to_vec(),
144            })
145            .map(|(key, value)| (key.clone(), value.clone()))
146            .collect::<Vec<_>>();
147
148        Ok(Box::pin(stream::iter(data)))
149    }
150
151    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
152        let data = self
153            .tx_data
154            .range::<_, Vec<u8>>((key_prefix.to_vec())..)
155            .take_while(|(key, _)| key.starts_with(key_prefix))
156            .map(|(key, value)| (key.clone(), value.clone()))
157            .collect::<Vec<_>>();
158
159        Ok(Box::pin(stream::iter(data)))
160    }
161
162    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> anyhow::Result<()> {
163        let keys = self
164            .tx_data
165            .range::<_, Vec<u8>>((key_prefix.to_vec())..)
166            .take_while(|(key, _)| key.starts_with(key_prefix))
167            .map(|(key, _)| key.clone())
168            .collect::<Vec<_>>();
169        for key in keys.iter() {
170            let old_value = self.tx_data.remove(&key.to_vec());
171            self.operations
172                .push(DatabaseOperation::Delete(DatabaseDeleteOperation {
173                    key: key.to_vec(),
174                    old_value,
175                }));
176        }
177        Ok(())
178    }
179
180    async fn raw_find_by_prefix_sorted_descending(
181        &mut self,
182        key_prefix: &[u8],
183    ) -> Result<PrefixStream<'_>> {
184        let mut data = self
185            .tx_data
186            .range::<_, Vec<u8>>((key_prefix.to_vec())..)
187            .take_while(|(key, _)| key.starts_with(key_prefix))
188            .map(|(key, value)| (key.clone(), value.clone()))
189            .collect::<Vec<_>>();
190        data.sort_by(|a, b| a.cmp(b).reverse());
191
192        Ok(Box::pin(stream::iter(data)))
193    }
194}
195
// Savepoints are not supported by this backend: both methods below are stubs
// that panic when called.
#[apply(async_trait_maybe_send!)]
impl<'a> IDatabaseTransactionOps for MemAndRedbTransaction<'a> {
    /// Not supported.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!()`.
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
        unimplemented!()
    }

    /// Not supported.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!()`.
    async fn set_tx_savepoint(&mut self) -> Result<()> {
        unimplemented!()
    }
}
206
207// In-memory database transaction should only be used for test code and never
208// for production as it doesn't properly implement MVCC
209#[apply(async_trait_maybe_send!)]
210impl<'a> IRawDatabaseTransaction for MemAndRedbTransaction<'a> {
211    async fn commit_tx(self) -> Result<()> {
212        let mut data_locked = self.db.data.lock().expect("poison");
213        let write_txn = self.db.db.begin_write()?;
214        let operations = self.operations;
215        let mut data_new = data_locked.clone();
216        {
217            let mut table = write_txn
218                .open_table(KV_TABLE)
219                .context("Failed to open redb table")?;
220
221            // Apply all operations
222            for op in operations {
223                match op {
224                    DatabaseOperation::Insert(insert_op) => {
225                        table
226                            .insert(&insert_op.key[..], &insert_op.value[..])
227                            .context("Failed to insert into redb")?;
228                        let old_value = data_new.insert(insert_op.key, insert_op.value);
229                        anyhow::ensure!(old_value == insert_op.old_value, "write-write conflict");
230                    }
231                    DatabaseOperation::Delete(delete_op) => {
232                        table
233                            .remove(&delete_op.key[..])
234                            .context("Failed to delete from redb")?;
235                        let old_value = data_new.remove(&delete_op.key);
236                        anyhow::ensure!(old_value == delete_op.old_value, "write-write conflict");
237                    }
238                }
239            }
240        }
241        // Commit redb transaction
242        write_txn
243            .commit()
244            .context("Failed to commit redb transaction")?;
245
246        // Update in-memory data
247        *data_locked = data_new;
248        Ok(())
249    }
250}
251
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use fedimint_core::db::Database;
    use fedimint_core::module::registry::ModuleDecoderRegistry;
    use tempfile::TempDir;

    use super::*;

    /// Creates a fresh database file `test.redb` inside a new temporary
    /// directory whose name starts with `temp_path`.
    ///
    /// The `TempDir` is returned so the caller can keep the directory alive
    /// for as long as the database is in use.
    async fn open_temp_db(temp_path: &str) -> (Database, TempDir) {
        let scratch_dir = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();

        let redb_file = scratch_dir.path().join("test.redb");
        let raw_db = MemAndRedb::new(&redb_file).await.unwrap();

        (
            Database::new(raw_db, ModuleDecoderRegistry::default()),
            scratch_dir,
        )
    }

    // Each test below opens its own temporary database and hands it to one of
    // the shared `fedimint_core::db::verify_*` checks.

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        let (database, _tmp_dir) = open_temp_db("fcb-redb-test-insert-elements").await;
        fedimint_core::db::verify_insert_elements(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        let (database, _tmp_dir) = open_temp_db("fcb-redb-test-remove-nonexisting").await;
        fedimint_core::db::verify_remove_nonexisting(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        let (database, _tmp_dir) = open_temp_db("fcb-redb-test-remove-existing").await;
        fedimint_core::db::verify_remove_existing(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        let (database, _tmp_dir) = open_temp_db("fcb-redb-test-read-own-writes").await;
        fedimint_core::db::verify_read_own_writes(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        let (database, _tmp_dir) = open_temp_db("fcb-redb-test-prevent-dirty-reads").await;
        fedimint_core::db::verify_prevent_dirty_reads(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        let (database, _tmp_dir) = open_temp_db("fcb-redb-test-find-by-range").await;
        fedimint_core::db::verify_find_by_range(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        let (database, _tmp_dir) = open_temp_db("fcb-redb-test-find-by-prefix").await;
        fedimint_core::db::verify_find_by_prefix(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        let (database, _tmp_dir) = open_temp_db("fcb-redb-test-commit").await;
        fedimint_core::db::verify_commit(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        let (database, _tmp_dir) =
            open_temp_db("fcb-redb-test-prevent-nonrepeatable-reads").await;
        fedimint_core::db::verify_prevent_nonrepeatable_reads(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        let (database, _tmp_dir) = open_temp_db("fcb-redb-test-phantom-entry").await;
        fedimint_core::db::verify_phantom_entry(database).await;
    }

    // Despite its name, this test runs the snapshot-isolation check.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        let (database, _tmp_dir) = open_temp_db("fcb-redb-test-write-conflict").await;
        fedimint_core::db::verify_snapshot_isolation(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        let (database, _tmp_dir) = open_temp_db("fcb-redb-test-remove-by-prefix").await;
        fedimint_core::db::verify_remove_by_prefix(database).await;
    }
}