fedimint_server/consensus/aleph_bft/
backup.rs

1use async_trait::async_trait;
2use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
3use futures::StreamExt as _;
4use tracing::info;
5
6use crate::LOG_CONSENSUS;
7use crate::consensus::db::{AlephUnitsKey, AlephUnitsPrefix};
8
9pub struct BackupReader {
10    db: Database,
11}
12
13impl BackupReader {
14    pub fn new(db: Database) -> Self {
15        Self { db }
16    }
17}
18
19#[async_trait]
20impl aleph_bft::BackupReader for BackupReader {
21    async fn read(&mut self) -> std::io::Result<Vec<u8>> {
22        let mut dbtx = self.db.begin_transaction_nc().await;
23
24        let units = dbtx
25            .find_by_prefix(&AlephUnitsPrefix)
26            .await
27            .map(|entry| entry.1)
28            .collect::<Vec<Vec<u8>>>()
29            .await;
30
31        if !units.is_empty() {
32            info!(target: LOG_CONSENSUS, units_len = %units.len(), "Recovering from an in-session-shutdown");
33        }
34
35        Ok(units.into_iter().flatten().collect())
36    }
37}
38
39pub struct BackupWriter {
40    db: Database,
41    units_index: u64,
42}
43
44impl BackupWriter {
45    pub async fn new(db: Database) -> Self {
46        let units_index = db
47            .begin_transaction_nc()
48            .await
49            .find_by_prefix_sorted_descending(&AlephUnitsPrefix)
50            .await
51            .next()
52            .await
53            .map_or(0, |entry| (entry.0.0) + 1);
54
55        Self { db, units_index }
56    }
57}
58
59#[async_trait]
60impl aleph_bft::BackupWriter for BackupWriter {
61    async fn append(&mut self, data: &[u8]) -> std::io::Result<()> {
62        let mut dbtx = self.db.begin_transaction().await;
63
64        dbtx.insert_new_entry(&AlephUnitsKey(self.units_index), &data.to_owned())
65            .await;
66
67        self.units_index += 1;
68
69        dbtx.commit_tx_result()
70            .await
71            .expect("This is the only place where we write to this key");
72
73        Ok(())
74    }
75}