fedimint_server/consensus/aleph_bft/
backup.rs1use async_trait::async_trait;
2use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
3use futures::StreamExt as _;
4use tracing::info;
5
6use crate::LOG_CONSENSUS;
7use crate::consensus::db::{AlephUnitsKey, AlephUnitsPrefix};
8
9pub struct BackupReader {
10 db: Database,
11}
12
13impl BackupReader {
14 pub fn new(db: Database) -> Self {
15 Self { db }
16 }
17}
18
19#[async_trait]
20impl aleph_bft::BackupReader for BackupReader {
21 async fn read(&mut self) -> std::io::Result<Vec<u8>> {
22 let mut dbtx = self.db.begin_transaction_nc().await;
23
24 let units = dbtx
25 .find_by_prefix(&AlephUnitsPrefix)
26 .await
27 .map(|entry| entry.1)
28 .collect::<Vec<Vec<u8>>>()
29 .await;
30
31 if !units.is_empty() {
32 info!(target: LOG_CONSENSUS, units_len = %units.len(), "Recovering from an in-session-shutdown");
33 }
34
35 Ok(units.into_iter().flatten().collect())
36 }
37}
38
39pub struct BackupWriter {
40 db: Database,
41 units_index: u64,
42}
43
44impl BackupWriter {
45 pub async fn new(db: Database) -> Self {
46 let units_index = db
47 .begin_transaction_nc()
48 .await
49 .find_by_prefix_sorted_descending(&AlephUnitsPrefix)
50 .await
51 .next()
52 .await
53 .map_or(0, |entry| (entry.0.0) + 1);
54
55 Self { db, units_index }
56 }
57}
58
59#[async_trait]
60impl aleph_bft::BackupWriter for BackupWriter {
61 async fn append(&mut self, data: &[u8]) -> std::io::Result<()> {
62 let mut dbtx = self.db.begin_transaction().await;
63
64 dbtx.insert_new_entry(&AlephUnitsKey(self.units_index), &data.to_owned())
65 .await;
66
67 self.units_index += 1;
68
69 dbtx.commit_tx_result()
70 .await
71 .expect("This is the only place where we write to this key");
72
73 Ok(())
74 }
75}