fedimint_server/consensus/aleph_bft/
network.rs

1use async_channel::Sender;
2use bitcoin::hashes::{Hash, sha256};
3use fedimint_core::PeerId;
4use fedimint_core::config::P2PMessage;
5use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
6use fedimint_core::encoding::Encodable;
7use fedimint_core::module::SerdeModuleEncoding;
8use fedimint_core::module::registry::ModuleRegistry;
9use fedimint_core::net::peers::{DynP2PConnections, Recipient};
10use fedimint_core::secp256k1::schnorr;
11use fedimint_core::session_outcome::SignedSessionOutcome;
12use fedimint_core::util::FmtCompact as _;
13use fedimint_logging::LOG_CONSENSUS;
14use parity_scale_codec::{Decode, Encode, IoReader};
15use tracing::error;
16
17use super::super::db::SignedSessionOutcomeKey;
18use super::data_provider::UnitData;
19use super::keychain::Keychain;
20
21#[derive(Debug, Clone, Eq, PartialEq)]
22pub struct Hasher;
23
24impl aleph_bft::Hasher for Hasher {
25    type Hash = [u8; 32];
26
27    fn hash(input: &[u8]) -> Self::Hash {
28        input.consensus_hash::<sha256::Hash>().to_byte_array()
29    }
30}
31
32pub type NetworkData = aleph_bft::NetworkData<
33    Hasher,
34    UnitData,
35    <Keychain as aleph_bft::Keychain>::Signature,
36    <Keychain as aleph_bft::MultiKeychain>::PartialMultisignature,
37>;
38
39pub struct Network {
40    connections: DynP2PConnections<P2PMessage>,
41    signed_outcomes_sender: Sender<(PeerId, SignedSessionOutcome)>,
42    signatures_sender: Sender<(PeerId, schnorr::Signature)>,
43    db: Database,
44}
45
46impl Network {
47    pub fn new(
48        connections: DynP2PConnections<P2PMessage>,
49        signed_outcomes_sender: Sender<(PeerId, SignedSessionOutcome)>,
50        signatures_sender: Sender<(PeerId, schnorr::Signature)>,
51        db: Database,
52    ) -> Self {
53        Self {
54            connections,
55            signed_outcomes_sender,
56            signatures_sender,
57            db,
58        }
59    }
60}
61
62#[async_trait::async_trait]
63impl aleph_bft::Network<NetworkData> for Network {
64    fn send(&self, network_data: NetworkData, recipient: aleph_bft::Recipient) {
65        // convert from aleph_bft::Recipient to session::Recipient
66        let recipient = match recipient {
67            aleph_bft::Recipient::Node(node_index) => {
68                Recipient::Peer(super::to_peer_id(node_index))
69            }
70            aleph_bft::Recipient::Everyone => Recipient::Everyone,
71        };
72
73        self.connections
74            .send(recipient, P2PMessage::Aleph(network_data.encode()));
75    }
76
77    async fn next_event(&mut self) -> Option<NetworkData> {
78        loop {
79            let (peer_id, message) = self.connections.receive().await?;
80
81            match message {
82                P2PMessage::Aleph(bytes) => {
83                    match NetworkData::decode(&mut IoReader(bytes.as_slice())) {
84                        Ok(network_data) => {
85                            // in order to bound the RAM consumption of a session we have to bound
86                            // the size of an individual unit in memory
87                            if network_data.included_data().iter().all(UnitData::is_valid) {
88                                return Some(network_data);
89                            }
90
91                            error!(
92                                target: LOG_CONSENSUS,
93                                %peer_id,
94                                "Received invalid unit data"
95                            );
96                        }
97                        Err(err) => {
98                            error!(
99                                target: LOG_CONSENSUS,
100                                %peer_id,
101                                err = %err.fmt_compact(),
102                                "Failed to decode Aleph BFT network data"
103                            );
104                        }
105                    }
106                }
107                P2PMessage::SessionSignature(signature) => {
108                    self.signatures_sender.try_send((peer_id, signature)).ok();
109                }
110                P2PMessage::SessionIndex(their_session) => {
111                    if let Some(outcome) = self
112                        .db
113                        .begin_transaction_nc()
114                        .await
115                        .get_value(&SignedSessionOutcomeKey(their_session))
116                        .await
117                    {
118                        self.connections.send(
119                            Recipient::Peer(peer_id),
120                            P2PMessage::SignedSessionOutcome(SerdeModuleEncoding::from(&outcome)),
121                        );
122                    }
123                }
124                P2PMessage::SignedSessionOutcome(encoded_outcome) => {
125                    match encoded_outcome.try_into_inner(&ModuleRegistry::default()) {
126                        Ok(outcome) => {
127                            self.signed_outcomes_sender
128                                .try_send((peer_id, outcome))
129                                .ok();
130                        }
131                        Err(err) => {
132                            error!(
133                                target: LOG_CONSENSUS,
134                                %peer_id,
135                                err = %err.fmt_compact(),
136                                "Failed to decode SignedSessionOutcome"
137                            );
138                        }
139                    }
140                }
141                message => {
142                    error!(
143                        target: LOG_CONSENSUS,
144                        %peer_id,
145                        ?message,
146                        "Received unexpected p2p message variant"
147                    );
148                }
149            }
150        }
151    }
152}