1use std::fmt::Debug;
4use std::ops::Range;
5use std::path::Path;
6use std::sync::{Arc, Mutex};
7
8use fedimint_core::db::{
9 DatabaseError, DatabaseResult, IDatabaseTransactionOps, IDatabaseTransactionOpsCore,
10 IRawDatabase, IRawDatabaseTransaction, PrefixStream,
11};
12use fedimint_core::{apply, async_trait_maybe_send};
13use futures::stream;
14use imbl::OrdMap;
15use redb::{Database, ReadableTable, TableDefinition};
16
17const KV_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("fedimint_kv");
18
19#[derive(Debug, Default)]
20pub struct DatabaseInsertOperation {
21 pub key: Vec<u8>,
22 pub value: Vec<u8>,
23 pub old_value: Option<Vec<u8>>,
24}
25
26#[derive(Debug, Default)]
27pub struct DatabaseDeleteOperation {
28 pub key: Vec<u8>,
29 pub old_value: Option<Vec<u8>>,
30}
31
32#[derive(Debug)]
33pub enum DatabaseOperation {
34 Insert(DatabaseInsertOperation),
35 Delete(DatabaseDeleteOperation),
36}
37
38#[derive(Clone)]
39pub struct MemAndRedb {
40 data: Arc<Mutex<OrdMap<Vec<u8>, Vec<u8>>>>,
41 db: Arc<Database>,
42}
43
44impl Debug for MemAndRedb {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 f.debug_struct("MemDatabase").finish_non_exhaustive()
47 }
48}
49
50#[derive(Debug)]
51pub struct MemAndRedbTransaction<'a> {
52 operations: Vec<DatabaseOperation>,
53 tx_data: OrdMap<Vec<u8>, Vec<u8>>,
54 db: &'a MemAndRedb,
55}
56
57#[cfg(not(target_family = "wasm"))]
58mod native;
59
60#[cfg(target_family = "wasm")]
61mod wasm;
62
63impl MemAndRedb {
64 fn new_from_redb(db: Database) -> DatabaseResult<Self> {
65 let db = Arc::new(db);
66 let mut data = OrdMap::new();
67
68 let read_txn = db.begin_read().map_err(DatabaseError::backend)?;
70 if let Ok(table) = read_txn.open_table(KV_TABLE) {
71 for entry in table.iter().map_err(DatabaseError::backend)? {
72 let (key, value) = entry.map_err(DatabaseError::backend)?;
73 data.insert(key.value().to_vec(), value.value().to_vec());
74 }
75 }
76 Ok(Self {
79 data: Arc::new(Mutex::new(data)),
80 db,
81 })
82 }
83}
84
85#[apply(async_trait_maybe_send!)]
86impl IRawDatabase for MemAndRedb {
87 type Transaction<'a> = MemAndRedbTransaction<'a>;
88
89 async fn begin_transaction<'a>(&'a self) -> MemAndRedbTransaction<'a> {
90 MemAndRedbTransaction {
91 operations: Vec::new(),
92 tx_data: {
93 let data_lock = self.data.lock().expect("poison");
94 data_lock.clone()
95 },
96 db: self,
97 }
98 }
99
100 fn checkpoint(&self, _: &Path) -> DatabaseResult<()> {
101 unimplemented!()
102 }
103}
104
105#[apply(async_trait_maybe_send!)]
106impl<'a> IDatabaseTransactionOpsCore for MemAndRedbTransaction<'a> {
107 async fn raw_insert_bytes(
108 &mut self,
109 key: &[u8],
110 value: &[u8],
111 ) -> DatabaseResult<Option<Vec<u8>>> {
112 let val = IDatabaseTransactionOpsCore::raw_get_bytes(self, key).await;
113 let old_value = self.tx_data.insert(key.to_vec(), value.to_vec());
115 self.operations
116 .push(DatabaseOperation::Insert(DatabaseInsertOperation {
117 key: key.to_vec(),
118 value: value.to_vec(),
119 old_value,
120 }));
121 val
122 }
123
124 async fn raw_get_bytes(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
125 Ok(self.tx_data.get(key).cloned())
126 }
127
128 async fn raw_remove_entry(&mut self, key: &[u8]) -> DatabaseResult<Option<Vec<u8>>> {
129 let old_value = self.tx_data.remove(&key.to_vec());
131 self.operations
132 .push(DatabaseOperation::Delete(DatabaseDeleteOperation {
133 key: key.to_vec(),
134 old_value: old_value.clone(),
135 }));
136 Ok(old_value)
137 }
138
139 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> DatabaseResult<PrefixStream<'_>> {
140 let data = self
141 .tx_data
142 .range::<_, Vec<u8>>(Range {
143 start: range.start.to_vec(),
144 end: range.end.to_vec(),
145 })
146 .map(|(key, value)| (key.clone(), value.clone()))
147 .collect::<Vec<_>>();
148
149 Ok(Box::pin(stream::iter(data)))
150 }
151
152 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<PrefixStream<'_>> {
153 let data = self
154 .tx_data
155 .range::<_, Vec<u8>>((key_prefix.to_vec())..)
156 .take_while(|(key, _)| key.starts_with(key_prefix))
157 .map(|(key, value)| (key.clone(), value.clone()))
158 .collect::<Vec<_>>();
159
160 Ok(Box::pin(stream::iter(data)))
161 }
162
163 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> DatabaseResult<()> {
164 let keys = self
165 .tx_data
166 .range::<_, Vec<u8>>((key_prefix.to_vec())..)
167 .take_while(|(key, _)| key.starts_with(key_prefix))
168 .map(|(key, _)| key.clone())
169 .collect::<Vec<_>>();
170 for key in keys.iter() {
171 let old_value = self.tx_data.remove(&key.to_vec());
172 self.operations
173 .push(DatabaseOperation::Delete(DatabaseDeleteOperation {
174 key: key.to_vec(),
175 old_value,
176 }));
177 }
178 Ok(())
179 }
180
181 async fn raw_find_by_prefix_sorted_descending(
182 &mut self,
183 key_prefix: &[u8],
184 ) -> DatabaseResult<PrefixStream<'_>> {
185 let mut data = self
186 .tx_data
187 .range::<_, Vec<u8>>((key_prefix.to_vec())..)
188 .take_while(|(key, _)| key.starts_with(key_prefix))
189 .map(|(key, value)| (key.clone(), value.clone()))
190 .collect::<Vec<_>>();
191 data.sort_by(|a, b| a.cmp(b).reverse());
192
193 Ok(Box::pin(stream::iter(data)))
194 }
195}
196
197impl<'a> IDatabaseTransactionOps for MemAndRedbTransaction<'a> {}
198
199#[apply(async_trait_maybe_send!)]
202impl<'a> IRawDatabaseTransaction for MemAndRedbTransaction<'a> {
203 async fn commit_tx(self) -> DatabaseResult<()> {
204 let mut data_locked = self.db.data.lock().expect("poison");
205 let write_txn = self.db.db.begin_write().map_err(DatabaseError::backend)?;
206 let operations = self.operations;
207 let mut data_new = data_locked.clone();
208 {
209 let mut table = write_txn
210 .open_table(KV_TABLE)
211 .map_err(DatabaseError::backend)?;
212
213 for op in operations {
215 match op {
216 DatabaseOperation::Insert(insert_op) => {
217 table
218 .insert(&insert_op.key[..], &insert_op.value[..])
219 .map_err(DatabaseError::backend)?;
220 let old_value = data_new.insert(insert_op.key, insert_op.value);
221 if old_value != insert_op.old_value {
222 return Err(DatabaseError::WriteConflict);
223 }
224 }
225 DatabaseOperation::Delete(delete_op) => {
226 table
227 .remove(&delete_op.key[..])
228 .map_err(DatabaseError::backend)?;
229 let old_value = data_new.remove(&delete_op.key);
230 if old_value != delete_op.old_value {
231 return Err(DatabaseError::WriteConflict);
232 }
233 }
234 }
235 }
236 }
237 write_txn.commit().map_err(DatabaseError::backend)?;
239
240 *data_locked = data_new;
242 Ok(())
243 }
244}
245
246#[cfg(all(test, not(target_family = "wasm")))]
247mod tests {
248 use fedimint_core::db::Database;
249 use fedimint_core::module::registry::ModuleDecoderRegistry;
250 use tempfile::TempDir;
251
252 use super::*;
253
254 async fn open_temp_db(temp_path: &str) -> (Database, TempDir) {
255 let temp_dir = tempfile::Builder::new()
256 .prefix(temp_path)
257 .tempdir()
258 .unwrap();
259
260 let db_path = temp_dir.path().join("test.redb");
261 let locked_db = MemAndRedb::new(&db_path).await.unwrap();
262
263 let database = Database::new(locked_db, ModuleDecoderRegistry::default());
264 (database, temp_dir)
265 }
266
267 #[tokio::test(flavor = "multi_thread")]
268 async fn test_dbtx_insert_elements() {
269 let (db, _dir) = open_temp_db("fcb-redb-test-insert-elements").await;
270 fedimint_core::db::verify_insert_elements(db).await;
271 }
272
273 #[tokio::test(flavor = "multi_thread")]
274 async fn test_dbtx_remove_nonexisting() {
275 let (db, _dir) = open_temp_db("fcb-redb-test-remove-nonexisting").await;
276 fedimint_core::db::verify_remove_nonexisting(db).await;
277 }
278
279 #[tokio::test(flavor = "multi_thread")]
280 async fn test_dbtx_remove_existing() {
281 let (db, _dir) = open_temp_db("fcb-redb-test-remove-existing").await;
282 fedimint_core::db::verify_remove_existing(db).await;
283 }
284
285 #[tokio::test(flavor = "multi_thread")]
286 async fn test_dbtx_read_own_writes() {
287 let (db, _dir) = open_temp_db("fcb-redb-test-read-own-writes").await;
288 fedimint_core::db::verify_read_own_writes(db).await;
289 }
290
291 #[tokio::test(flavor = "multi_thread")]
292 async fn test_dbtx_prevent_dirty_reads() {
293 let (db, _dir) = open_temp_db("fcb-redb-test-prevent-dirty-reads").await;
294 fedimint_core::db::verify_prevent_dirty_reads(db).await;
295 }
296
297 #[tokio::test(flavor = "multi_thread")]
298 async fn test_dbtx_find_by_range() {
299 let (db, _dir) = open_temp_db("fcb-redb-test-find-by-range").await;
300 fedimint_core::db::verify_find_by_range(db).await;
301 }
302
303 #[tokio::test(flavor = "multi_thread")]
304 async fn test_dbtx_find_by_prefix() {
305 let (db, _dir) = open_temp_db("fcb-redb-test-find-by-prefix").await;
306 fedimint_core::db::verify_find_by_prefix(db).await;
307 }
308
309 #[tokio::test(flavor = "multi_thread")]
310 async fn test_dbtx_commit() {
311 let (db, _dir) = open_temp_db("fcb-redb-test-commit").await;
312 fedimint_core::db::verify_commit(db).await;
313 }
314
315 #[tokio::test(flavor = "multi_thread")]
316 async fn test_dbtx_prevent_nonrepeatable_reads() {
317 let (db, _dir) = open_temp_db("fcb-redb-test-prevent-nonrepeatable-reads").await;
318 fedimint_core::db::verify_prevent_nonrepeatable_reads(db).await;
319 }
320
321 #[tokio::test(flavor = "multi_thread")]
322 async fn test_dbtx_phantom_entry() {
323 let (db, _dir) = open_temp_db("fcb-redb-test-phantom-entry").await;
324 fedimint_core::db::verify_phantom_entry(db).await;
325 }
326
327 #[tokio::test(flavor = "multi_thread")]
328 async fn test_dbtx_write_conflict() {
329 let (db, _dir) = open_temp_db("fcb-redb-test-write-conflict").await;
330 fedimint_core::db::verify_snapshot_isolation(db).await;
331 }
332
333 #[tokio::test(flavor = "multi_thread")]
334 async fn test_dbtx_remove_by_prefix() {
335 let (db, _dir) = open_temp_db("fcb-redb-test-remove-by-prefix").await;
336 fedimint_core::db::verify_remove_by_prefix(db).await;
337 }
338}