fedimint_server/consensus/aleph_bft/
network.rs1use async_channel::Sender;
2use bitcoin::hashes::{Hash, sha256};
3use fedimint_core::PeerId;
4use fedimint_core::config::P2PMessage;
5use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
6use fedimint_core::encoding::Encodable;
7use fedimint_core::module::SerdeModuleEncoding;
8use fedimint_core::module::registry::ModuleRegistry;
9use fedimint_core::net::peers::{DynP2PConnections, Recipient};
10use fedimint_core::secp256k1::schnorr;
11use fedimint_core::session_outcome::SignedSessionOutcome;
12use fedimint_core::util::FmtCompact as _;
13use fedimint_logging::LOG_CONSENSUS;
14use parity_scale_codec::{Decode, Encode, IoReader};
15use tracing::error;
16
17use super::super::db::SignedSessionOutcomeKey;
18use super::data_provider::UnitData;
19use super::keychain::Keychain;
20
/// Zero-sized marker type that carries the [`aleph_bft::Hasher`]
/// implementation used for Aleph BFT network data in this module.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Hasher;
23
24impl aleph_bft::Hasher for Hasher {
25 type Hash = [u8; 32];
26
27 fn hash(input: &[u8]) -> Self::Hash {
28 input.consensus_hash::<sha256::Hash>().to_byte_array()
29 }
30}
31
32pub type NetworkData = aleph_bft::NetworkData<
33 Hasher,
34 UnitData,
35 <Keychain as aleph_bft::Keychain>::Signature,
36 <Keychain as aleph_bft::MultiKeychain>::PartialMultisignature,
37>;
38
39pub struct Network {
40 connections: DynP2PConnections<P2PMessage>,
41 signed_outcomes_sender: Sender<(PeerId, SignedSessionOutcome)>,
42 signatures_sender: Sender<(PeerId, schnorr::Signature)>,
43 db: Database,
44}
45
46impl Network {
47 pub fn new(
48 connections: DynP2PConnections<P2PMessage>,
49 signed_outcomes_sender: Sender<(PeerId, SignedSessionOutcome)>,
50 signatures_sender: Sender<(PeerId, schnorr::Signature)>,
51 db: Database,
52 ) -> Self {
53 Self {
54 connections,
55 signed_outcomes_sender,
56 signatures_sender,
57 db,
58 }
59 }
60}
61
62#[async_trait::async_trait]
63impl aleph_bft::Network<NetworkData> for Network {
64 fn send(&self, network_data: NetworkData, recipient: aleph_bft::Recipient) {
65 let recipient = match recipient {
67 aleph_bft::Recipient::Node(node_index) => {
68 Recipient::Peer(super::to_peer_id(node_index))
69 }
70 aleph_bft::Recipient::Everyone => Recipient::Everyone,
71 };
72
73 self.connections
74 .send(recipient, P2PMessage::Aleph(network_data.encode()));
75 }
76
77 async fn next_event(&mut self) -> Option<NetworkData> {
78 loop {
79 let (peer_id, message) = self.connections.receive().await?;
80
81 match message {
82 P2PMessage::Aleph(bytes) => {
83 match NetworkData::decode(&mut IoReader(bytes.as_slice())) {
84 Ok(network_data) => {
85 if network_data.included_data().iter().all(UnitData::is_valid) {
88 return Some(network_data);
89 }
90
91 error!(
92 target: LOG_CONSENSUS,
93 %peer_id,
94 "Received invalid unit data"
95 );
96 }
97 Err(err) => {
98 error!(
99 target: LOG_CONSENSUS,
100 %peer_id,
101 err = %err.fmt_compact(),
102 "Failed to decode Aleph BFT network data"
103 );
104 }
105 }
106 }
107 P2PMessage::SessionSignature(signature) => {
108 self.signatures_sender.try_send((peer_id, signature)).ok();
109 }
110 P2PMessage::SessionIndex(their_session) => {
111 if let Some(outcome) = self
112 .db
113 .begin_transaction_nc()
114 .await
115 .get_value(&SignedSessionOutcomeKey(their_session))
116 .await
117 {
118 self.connections.send(
119 Recipient::Peer(peer_id),
120 P2PMessage::SignedSessionOutcome(SerdeModuleEncoding::from(&outcome)),
121 );
122 }
123 }
124 P2PMessage::SignedSessionOutcome(encoded_outcome) => {
125 match encoded_outcome.try_into_inner(&ModuleRegistry::default()) {
126 Ok(outcome) => {
127 self.signed_outcomes_sender
128 .try_send((peer_id, outcome))
129 .ok();
130 }
131 Err(err) => {
132 error!(
133 target: LOG_CONSENSUS,
134 %peer_id,
135 err = %err.fmt_compact(),
136 "Failed to decode SignedSessionOutcome"
137 );
138 }
139 }
140 }
141 message => {
142 error!(
143 target: LOG_CONSENSUS,
144 %peer_id,
145 ?message,
146 "Received unexpected p2p message variant"
147 );
148 }
149 }
150 }
151 }
152}