1use std::collections::BTreeMap;
9
10use async_channel::{Receiver, Sender, bounded};
11use async_trait::async_trait;
12use fedimint_core::PeerId;
13use fedimint_core::net::peers::{IP2PConnections, Recipient};
14use fedimint_core::task::{TaskGroup, sleep};
15use fedimint_core::util::FmtCompactAnyhow;
16use fedimint_core::util::backoff_util::{FibonacciBackoff, api_networking_backoff};
17use fedimint_logging::{LOG_CONSENSUS, LOG_NET_PEER};
18use fedimint_server_core::dashboard_ui::P2PConnectionStatus;
19use futures::FutureExt;
20use futures::future::select_all;
21use tokio::sync::watch;
22use tracing::{Instrument, debug, info, info_span, warn};
23
24use crate::metrics::{PEER_CONNECT_COUNT, PEER_DISCONNECT_COUNT, PEER_MESSAGES_COUNT};
25use crate::net::p2p_connection::DynP2PConnection;
26use crate::net::p2p_connector::DynP2PConnector;
27
/// Sending halves of the per-peer connection status `watch` channels, keyed by
/// peer id. The published value is `None` or `Some(P2PConnectionStatus)`.
pub type P2PStatusSenders = BTreeMap<PeerId, watch::Sender<Option<P2PConnectionStatus>>>;
/// Receiving halves of the per-peer connection status `watch` channels, keyed
/// by peer id.
pub type P2PStatusReceivers = BTreeMap<PeerId, watch::Receiver<Option<P2PConnectionStatus>>>;
30
31pub fn p2p_status_channels(peers: Vec<PeerId>) -> (P2PStatusSenders, P2PStatusReceivers) {
32 let mut senders = BTreeMap::new();
33 let mut receivers = BTreeMap::new();
34
35 for peer in peers {
36 let (sender, receiver) = watch::channel(None);
37
38 senders.insert(peer, sender);
39 receivers.insert(peer, receiver);
40 }
41
42 (senders, receivers)
43}
44
/// Collection of message connections to the other peers, one entry per peer
/// id.
///
/// Each entry only holds the channel endpoints of a [`P2PConnection`]; cloning
/// this struct therefore clones channel handles, not the connections
/// themselves.
#[derive(Clone)]
pub struct ReconnectP2PConnections<M> {
    /// Channel endpoints for exchanging messages with each peer.
    connections: BTreeMap<PeerId, P2PConnection<M>>,
}
49
50impl<M: Send + 'static> ReconnectP2PConnections<M> {
51 pub fn new(
52 identity: PeerId,
53 connector: DynP2PConnector<M>,
54 task_group: &TaskGroup,
55 status_senders: P2PStatusSenders,
56 ) -> Self {
57 let mut connection_senders = BTreeMap::new();
58 let mut connections = BTreeMap::new();
59
60 for peer_id in connector.peers() {
61 assert_ne!(peer_id, identity);
62
63 let (connection_sender, connection_receiver) = bounded(4);
64
65 let connection = P2PConnection::new(
66 identity,
67 peer_id,
68 connector.clone(),
69 connection_receiver,
70 status_senders
71 .get(&peer_id)
72 .expect("No p2p status sender for peer")
73 .clone(),
74 task_group,
75 );
76
77 connection_senders.insert(peer_id, connection_sender);
78 connections.insert(peer_id, connection);
79 }
80
81 task_group.spawn_cancellable("handle-incoming-p2p-connections", async move {
82 info!(target: LOG_NET_PEER, "Starting listening task for p2p connections");
83
84 loop {
85 match connector.accept().await {
86 Ok((peer, connection)) => {
87 if connection_senders
88 .get_mut(&peer)
89 .expect("Authenticating connectors dont return unknown peers")
90 .send(connection)
91 .await
92 .is_err()
93 {
94 break;
95 }
96 },
97 Err(err) => {
98 warn!(target: LOG_NET_PEER, our_id = %identity, err = %err.fmt_compact_anyhow(), "Error while opening incoming connection");
99 }
100 }
101 }
102
103 info!(target: LOG_NET_PEER, "Shutting down listening task for p2p connections");
104 });
105
106 ReconnectP2PConnections { connections }
107 }
108}
109
110#[async_trait]
111impl<M: Clone + Send + 'static> IP2PConnections<M> for ReconnectP2PConnections<M> {
112 fn send(&self, recipient: Recipient, message: M) {
113 match recipient {
114 Recipient::Everyone => {
115 for connection in self.connections.values() {
116 connection.try_send(message.clone());
117 }
118 }
119 Recipient::Peer(peer) => match self.connections.get(&peer) {
120 Some(connection) => {
121 connection.try_send(message);
122 }
123 _ => {
124 warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
125 }
126 },
127 }
128 }
129
130 async fn receive(&self) -> Option<(PeerId, M)> {
131 select_all(self.connections.iter().map(|(&peer, connection)| {
132 Box::pin(connection.receive().map(move |m| m.map(|m| (peer, m))))
133 }))
134 .await
135 .0
136 }
137
138 async fn receive_from_peer(&self, peer: PeerId) -> Option<M> {
139 self.connections
140 .get(&peer)
141 .expect("No connection found for peer")
142 .receive()
143 .await
144 }
145}
146
/// Channel endpoints used to talk to the state machine task of a single peer.
#[derive(Clone)]
struct P2PConnection<M> {
    /// Messages pushed here are picked up by the state machine and sent to
    /// the peer.
    outgoing_sender: Sender<M>,
    /// Messages the state machine has read from the peer.
    incoming_receiver: Receiver<M>,
}
152
153impl<M: Send + 'static> P2PConnection<M> {
154 fn new(
155 our_id: PeerId,
156 peer_id: PeerId,
157 connector: DynP2PConnector<M>,
158 incoming_connections: Receiver<DynP2PConnection<M>>,
159 status_sender: watch::Sender<Option<P2PConnectionStatus>>,
160 task_group: &TaskGroup,
161 ) -> P2PConnection<M> {
162 let (outgoing_sender, outgoing_receiver) = bounded(5);
169 let (incoming_sender, incoming_receiver) = bounded(5);
170
171 task_group.spawn_cancellable(
172 format!("io-state-machine-{peer_id}"),
173 async move {
174 info!(target: LOG_NET_PEER, "Starting peer connection state machine");
175
176 let mut state_machine = P2PConnectionStateMachine {
177 common: P2PConnectionSMCommon {
178 incoming_sender,
179 outgoing_receiver,
180 our_id_str: our_id.to_string(),
181 our_id,
182 peer_id_str: peer_id.to_string(),
183 peer_id,
184 connector,
185 incoming_connections,
186 status_sender,
187 },
188 state: P2PConnectionSMState::Disconnected(api_networking_backoff()),
189 };
190
191 while let Some(sm) = state_machine.state_transition().await {
192 state_machine = sm;
193 }
194
195 info!(target: LOG_NET_PEER, "Shutting down peer connection state machine");
196 }
197 .instrument(info_span!("io-state-machine", ?peer_id)),
198 );
199
200 P2PConnection {
201 outgoing_sender,
202 incoming_receiver,
203 }
204 }
205
206 fn try_send(&self, message: M) {
207 if self.outgoing_sender.try_send(message).is_err() {
208 debug!(target: LOG_NET_PEER, "Outgoing message channel is full");
209 }
210 }
211
212 async fn receive(&self) -> Option<M> {
213 self.incoming_receiver.recv().await.ok()
214 }
215}
216
/// State machine that maintains the connection to a single peer: the current
/// connection state plus the data shared by all states.
struct P2PConnectionStateMachine<M> {
    /// Whether we currently hold a connection to the peer.
    state: P2PConnectionSMState<M>,
    /// Channels, ids and connector that stay the same across transitions.
    common: P2PConnectionSMCommon<M>,
}
221
/// Data shared by every state of the per-peer connection state machine.
struct P2PConnectionSMCommon<M> {
    /// Messages read from the peer are pushed into this channel.
    incoming_sender: async_channel::Sender<M>,
    /// Messages queued for sending to the peer are taken from this channel.
    outgoing_receiver: async_channel::Receiver<M>,
    /// Our own peer id.
    our_id: PeerId,
    /// String form of `our_id`, used as a metric label.
    our_id_str: String,
    /// Id of the peer this state machine is responsible for.
    peer_id: PeerId,
    /// String form of `peer_id`, used as a metric label.
    peer_id_str: String,
    /// Used to dial the peer and to query the connection type.
    connector: DynP2PConnector<M>,
    /// Connections from this peer handed over by the listening task.
    incoming_connections: Receiver<DynP2PConnection<M>>,
    /// Publishes the current connection status (`None` while disconnected).
    status_sender: watch::Sender<Option<P2PConnectionStatus>>,
}
233
/// Connection state of the per-peer state machine.
enum P2PConnectionSMState<M> {
    /// No connection; the backoff supplies the delay before the next dial
    /// attempt.
    Disconnected(FibonacciBackoff),
    /// An established connection to the peer.
    Connected(DynP2PConnection<M>),
}
238
239impl<M: Send + 'static> P2PConnectionStateMachine<M> {
240 async fn state_transition(mut self) -> Option<Self> {
241 match self.state {
242 P2PConnectionSMState::Disconnected(backoff) => {
243 self.common.status_sender.send_replace(None);
244
245 self.common.transition_disconnected(backoff).await
246 }
247 P2PConnectionSMState::Connected(connection) => {
248 let status = P2PConnectionStatus {
249 conn_type: self.common.connector.connection_type(self.common.peer_id),
250 rtt: connection.rtt(),
251 };
252
253 self.common.status_sender.send_replace(Some(status));
254
255 self.common.transition_connected(connection).await
256 }
257 }
258 .map(|state| P2PConnectionStateMachine {
259 common: self.common,
260 state,
261 })
262 }
263}
264
265impl<M: Send + 'static> P2PConnectionSMCommon<M> {
266 async fn transition_connected(
267 &mut self,
268 mut connection: DynP2PConnection<M>,
269 ) -> Option<P2PConnectionSMState<M>> {
270 tokio::select! {
271 message = self.outgoing_receiver.recv() => {
272 Some(self.send_message(connection, message.ok()?).await)
273 },
274 connection = self.incoming_connections.recv() => {
275 info!(target: LOG_NET_PEER, "Connected to peer");
276
277 Some(P2PConnectionSMState::Connected(connection.ok()?))
278 },
279 message = connection.receive() => {
280 let mut message = match message {
281 Ok(message) => message,
282 Err(e) => return Some(self.disconnect(e)),
283 };
284
285 match message.read_to_end().await {
286 Ok(message) => {
287 PEER_MESSAGES_COUNT
288 .with_label_values(&[self.our_id_str.as_str(), self.peer_id_str.as_str(), "incoming"])
289 .inc();
290
291 if self.incoming_sender.try_send(message).is_err() {
292 debug!(target: LOG_NET_PEER, "Incoming message channel is full");
293 }
294 },
295 Err(e) => return Some(self.disconnect(e)),
296 }
297
298 Some(P2PConnectionSMState::Connected(connection))
299 },
300 }
301 }
302
303 fn disconnect(&self, error: anyhow::Error) -> P2PConnectionSMState<M> {
304 info!(target: LOG_NET_PEER, "Disconnected from peer: {}", error);
305
306 PEER_DISCONNECT_COUNT
307 .with_label_values(&[&self.our_id_str, &self.peer_id_str])
308 .inc();
309
310 P2PConnectionSMState::Disconnected(api_networking_backoff())
311 }
312
313 async fn send_message(
314 &mut self,
315 mut connection: DynP2PConnection<M>,
316 peer_message: M,
317 ) -> P2PConnectionSMState<M> {
318 PEER_MESSAGES_COUNT
319 .with_label_values(&[
320 self.our_id_str.as_str(),
321 self.peer_id_str.as_str(),
322 "outgoing",
323 ])
324 .inc();
325
326 if let Err(e) = connection.send(peer_message).await {
327 return self.disconnect(e);
328 }
329
330 P2PConnectionSMState::Connected(connection)
331 }
332
333 async fn transition_disconnected(
334 &mut self,
335 mut backoff: FibonacciBackoff,
336 ) -> Option<P2PConnectionSMState<M>> {
337 tokio::select! {
338 connection = self.incoming_connections.recv() => {
339 PEER_CONNECT_COUNT
340 .with_label_values(&[self.our_id_str.as_str(), self.peer_id_str.as_str(), "incoming"])
341 .inc();
342
343 info!(target: LOG_NET_PEER, "Connected to peer");
344
345 Some(P2PConnectionSMState::Connected(connection.ok()?))
346 },
347 () = sleep(backoff.next().expect("Unlimited retries")), if self.our_id < self.peer_id => {
349
350
351 info!(target: LOG_NET_PEER, "Attempting to reconnect to peer");
352
353 match self.connector.connect(self.peer_id).await {
354 Ok(connection) => {
355 PEER_CONNECT_COUNT
356 .with_label_values(&[self.our_id_str.as_str(), self.peer_id_str.as_str(), "outgoing"])
357 .inc();
358
359 info!(target: LOG_NET_PEER, "Connected to peer");
360
361 return Some(P2PConnectionSMState::Connected(connection));
362 }
363 Err(e) => warn!(target: LOG_CONSENSUS, "Failed to connect to peer: {e}")
364 }
365
366 Some(P2PConnectionSMState::Disconnected(backoff))
367 },
368 }
369 }
370}