1use std::fmt::Debug;
4use std::ops::Range;
5use std::path::Path;
6use std::sync::{Arc, Mutex};
7
8use anyhow::{Context as _, Result};
9use fedimint_core::db::{
10 IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase, IRawDatabaseTransaction,
11 PrefixStream,
12};
13use fedimint_core::{apply, async_trait_maybe_send};
14use futures::stream;
15use imbl::OrdMap;
16use redb::{Database, ReadableTable, TableDefinition};
17
/// The single redb table used by this backend; keys and values are stored as
/// raw byte slices under the table name `fedimint_kv`.
const KV_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("fedimint_kv");
19
/// A buffered insert recorded by a transaction and replayed at commit time.
#[derive(Debug, Default)]
pub struct DatabaseInsertOperation {
    /// Key being written.
    pub key: Vec<u8>,
    /// Value being written.
    pub value: Vec<u8>,
    /// Value the transaction's snapshot held for `key` before the insert
    /// (`None` if the key was absent). Compared against the shared state at
    /// commit to detect write-write conflicts.
    pub old_value: Option<Vec<u8>>,
}
26
/// A buffered delete recorded by a transaction and replayed at commit time.
#[derive(Debug, Default)]
pub struct DatabaseDeleteOperation {
    /// Key being removed.
    pub key: Vec<u8>,
    /// Value the transaction's snapshot held for `key` before the removal
    /// (`None` if the key was absent). Compared against the shared state at
    /// commit to detect write-write conflicts.
    pub old_value: Option<Vec<u8>>,
}
32
/// One entry of a transaction's write log, applied in order on commit.
#[derive(Debug)]
pub enum DatabaseOperation {
    /// Write a key/value pair.
    Insert(DatabaseInsertOperation),
    /// Remove a key.
    Delete(DatabaseDeleteOperation),
}
38
/// Database that serves all reads from an in-memory ordered map and writes
/// every committed change to a redb file as well.
///
/// Both fields are behind `Arc`, so clones of this handle share the same
/// in-memory state and the same redb database.
#[derive(Clone)]
pub struct MemAndRedb {
    /// Committed state, loaded from redb at construction and replaced on each
    /// successful commit.
    data: Arc<Mutex<OrdMap<Vec<u8>, Vec<u8>>>>,
    /// Underlying redb database that receives the committed operations.
    db: Arc<Database>,
}
44
45impl Debug for MemAndRedb {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("MemDatabase").finish_non_exhaustive()
48 }
49}
50
/// A transaction over [`MemAndRedb`].
///
/// Reads and writes go to a private copy of the map taken when the
/// transaction began; writes are additionally logged in `operations` so they
/// can be replayed against the shared state and redb on commit.
#[derive(Debug)]
pub struct MemAndRedbTransaction<'a> {
    /// Ordered log of inserts and deletes performed in this transaction.
    operations: Vec<DatabaseOperation>,
    /// Transaction-local view of the data, including this transaction's own
    /// uncommitted writes.
    tx_data: OrdMap<Vec<u8>, Vec<u8>>,
    /// Database this transaction was started from and will commit into.
    db: &'a MemAndRedb,
}
57
// Platform-specific code, selected at compile time by target family.
// NOTE(review): presumably these modules provide the public constructors
// (the tests below call `MemAndRedb::new`, which is not defined in this
// file) — confirm against `native.rs` / `wasm.rs`.
#[cfg(not(target_family = "wasm"))]
mod native;

#[cfg(target_family = "wasm")]
mod wasm;
63
64impl MemAndRedb {
65 fn new_from_redb(db: Database) -> Result<Self> {
66 let db = Arc::new(db);
67 let mut data = OrdMap::new();
68
69 let read_txn = db
71 .begin_read()
72 .context("Failed to begin read transaction")?;
73 if let Ok(table) = read_txn.open_table(KV_TABLE) {
74 for entry in table.iter()? {
75 let (key, value) = entry?;
76 data.insert(key.value().to_vec(), value.value().to_vec());
77 }
78 }
79 Ok(Self {
82 data: Arc::new(Mutex::new(data)),
83 db,
84 })
85 }
86}
87
88#[apply(async_trait_maybe_send!)]
89impl IRawDatabase for MemAndRedb {
90 type Transaction<'a> = MemAndRedbTransaction<'a>;
91
92 async fn begin_transaction<'a>(&'a self) -> MemAndRedbTransaction<'a> {
93 MemAndRedbTransaction {
94 operations: Vec::new(),
95 tx_data: {
96 let data_lock = self.data.lock().expect("poison");
97 data_lock.clone()
98 },
99 db: self,
100 }
101 }
102
103 fn checkpoint(&self, _: &Path) -> Result<()> {
104 unimplemented!()
105 }
106}
107
108#[apply(async_trait_maybe_send!)]
109impl<'a> IDatabaseTransactionOpsCore for MemAndRedbTransaction<'a> {
110 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
111 let val = IDatabaseTransactionOpsCore::raw_get_bytes(self, key).await;
112 let old_value = self.tx_data.insert(key.to_vec(), value.to_vec());
114 self.operations
115 .push(DatabaseOperation::Insert(DatabaseInsertOperation {
116 key: key.to_vec(),
117 value: value.to_vec(),
118 old_value,
119 }));
120 val
121 }
122
123 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
124 Ok(self.tx_data.get(key).cloned())
125 }
126
127 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
128 let old_value = self.tx_data.remove(&key.to_vec());
130 self.operations
131 .push(DatabaseOperation::Delete(DatabaseDeleteOperation {
132 key: key.to_vec(),
133 old_value: old_value.clone(),
134 }));
135 Ok(old_value)
136 }
137
138 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
139 let data = self
140 .tx_data
141 .range::<_, Vec<u8>>(Range {
142 start: range.start.to_vec(),
143 end: range.end.to_vec(),
144 })
145 .map(|(key, value)| (key.clone(), value.clone()))
146 .collect::<Vec<_>>();
147
148 Ok(Box::pin(stream::iter(data)))
149 }
150
151 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
152 let data = self
153 .tx_data
154 .range::<_, Vec<u8>>((key_prefix.to_vec())..)
155 .take_while(|(key, _)| key.starts_with(key_prefix))
156 .map(|(key, value)| (key.clone(), value.clone()))
157 .collect::<Vec<_>>();
158
159 Ok(Box::pin(stream::iter(data)))
160 }
161
162 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> anyhow::Result<()> {
163 let keys = self
164 .tx_data
165 .range::<_, Vec<u8>>((key_prefix.to_vec())..)
166 .take_while(|(key, _)| key.starts_with(key_prefix))
167 .map(|(key, _)| key.clone())
168 .collect::<Vec<_>>();
169 for key in keys.iter() {
170 let old_value = self.tx_data.remove(&key.to_vec());
171 self.operations
172 .push(DatabaseOperation::Delete(DatabaseDeleteOperation {
173 key: key.to_vec(),
174 old_value,
175 }));
176 }
177 Ok(())
178 }
179
180 async fn raw_find_by_prefix_sorted_descending(
181 &mut self,
182 key_prefix: &[u8],
183 ) -> Result<PrefixStream<'_>> {
184 let mut data = self
185 .tx_data
186 .range::<_, Vec<u8>>((key_prefix.to_vec())..)
187 .take_while(|(key, _)| key.starts_with(key_prefix))
188 .map(|(key, value)| (key.clone(), value.clone()))
189 .collect::<Vec<_>>();
190 data.sort_by(|a, b| a.cmp(b).reverse());
191
192 Ok(Box::pin(stream::iter(data)))
193 }
194}
195
/// Savepoint support for the transaction. Neither operation is implemented
/// here; both panic via `unimplemented!()` when called.
#[apply(async_trait_maybe_send!)]
impl<'a> IDatabaseTransactionOps for MemAndRedbTransaction<'a> {
    /// Not implemented; panics when called.
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
        unimplemented!()
    }

    /// Not implemented; panics when called.
    async fn set_tx_savepoint(&mut self) -> Result<()> {
        unimplemented!()
    }
}
206
#[apply(async_trait_maybe_send!)]
impl<'a> IRawDatabaseTransaction for MemAndRedbTransaction<'a> {
    /// Replays this transaction's operation log against both the redb table
    /// and a copy of the shared in-memory map, then publishes the result.
    ///
    /// # Errors
    ///
    /// Returns an error if the redb write transaction cannot be started, the
    /// table cannot be opened, an insert/remove/commit on redb fails, or an
    /// operation's recorded `old_value` differs from the current state
    /// ("write-write conflict"). On every error path the function returns
    /// before the shared map is assigned, so the in-memory state is unchanged.
    ///
    /// Panics with `"poison"` if the data mutex is poisoned.
    async fn commit_tx(self) -> Result<()> {
        // The lock is held for the whole commit, so commits are serialized and
        // no new transaction can snapshot the data in the meantime.
        let mut data_locked = self.db.data.lock().expect("poison");
        let write_txn = self.db.db.begin_write()?;
        let operations = self.operations;
        // Work on a copy; the shared map is only replaced after redb commits.
        let mut data_new = data_locked.clone();
        {
            // Inner scope so the table handle is dropped before `commit()`.
            let mut table = write_txn
                .open_table(KV_TABLE)
                .context("Failed to open redb table")?;

            for op in operations {
                match op {
                    DatabaseOperation::Insert(insert_op) => {
                        table
                            .insert(&insert_op.key[..], &insert_op.value[..])
                            .context("Failed to insert into redb")?;
                        let old_value = data_new.insert(insert_op.key, insert_op.value);
                        // The value replaced now must equal what the
                        // transaction saw when it performed the insert.
                        anyhow::ensure!(old_value == insert_op.old_value, "write-write conflict");
                    }
                    DatabaseOperation::Delete(delete_op) => {
                        table
                            .remove(&delete_op.key[..])
                            .context("Failed to delete from redb")?;
                        let old_value = data_new.remove(&delete_op.key);
                        // Same check for deletes, including "was absent".
                        anyhow::ensure!(old_value == delete_op.old_value, "write-write conflict");
                    }
                }
            }
        }
        // NOTE(review): on the early returns above `write_txn` is dropped
        // without `commit()`; presumably redb discards the pending writes in
        // that case — confirm against the redb documentation.
        write_txn
            .commit()
            .context("Failed to commit redb transaction")?;

        *data_locked = data_new;
        Ok(())
    }
}
251
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use fedimint_core::db::Database;
    use fedimint_core::module::registry::ModuleDecoderRegistry;
    use tempfile::TempDir;

    use super::*;

    /// Opens a fresh database file inside a new temporary directory whose name
    /// starts with `temp_path`.
    ///
    /// The directory guard is returned alongside the database so the caller
    /// can keep it alive for the duration of the test.
    async fn open_temp_db(temp_path: &str) -> (Database, TempDir) {
        let scratch_dir = tempfile::Builder::new().prefix(temp_path).tempdir().unwrap();

        let redb_file = scratch_dir.path().join("test.redb");
        let raw_db = MemAndRedb::new(&redb_file).await.unwrap();

        (
            Database::new(raw_db, ModuleDecoderRegistry::default()),
            scratch_dir,
        )
    }

    // Each test below runs one of the shared `fedimint_core::db::verify_*`
    // checks against a freshly created database.

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        let (database, _guard) = open_temp_db("fcb-redb-test-insert-elements").await;
        fedimint_core::db::verify_insert_elements(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        let (database, _guard) = open_temp_db("fcb-redb-test-remove-nonexisting").await;
        fedimint_core::db::verify_remove_nonexisting(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        let (database, _guard) = open_temp_db("fcb-redb-test-remove-existing").await;
        fedimint_core::db::verify_remove_existing(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        let (database, _guard) = open_temp_db("fcb-redb-test-read-own-writes").await;
        fedimint_core::db::verify_read_own_writes(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        let (database, _guard) = open_temp_db("fcb-redb-test-prevent-dirty-reads").await;
        fedimint_core::db::verify_prevent_dirty_reads(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_range() {
        let (database, _guard) = open_temp_db("fcb-redb-test-find-by-range").await;
        fedimint_core::db::verify_find_by_range(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        let (database, _guard) = open_temp_db("fcb-redb-test-find-by-prefix").await;
        fedimint_core::db::verify_find_by_prefix(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        let (database, _guard) = open_temp_db("fcb-redb-test-commit").await;
        fedimint_core::db::verify_commit(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        let (database, _guard) = open_temp_db("fcb-redb-test-prevent-nonrepeatable-reads").await;
        fedimint_core::db::verify_prevent_nonrepeatable_reads(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        let (database, _guard) = open_temp_db("fcb-redb-test-phantom-entry").await;
        fedimint_core::db::verify_phantom_entry(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        let (database, _guard) = open_temp_db("fcb-redb-test-write-conflict").await;
        fedimint_core::db::verify_snapshot_isolation(database).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        let (database, _guard) = open_temp_db("fcb-redb-test-remove-by-prefix").await;
        fedimint_core::db::verify_remove_by_prefix(database).await;
    }
}