1use std::collections::BTreeMap;
9use std::time::Duration;
10
11use async_channel::{Receiver, Sender, bounded};
12use async_trait::async_trait;
13use fedimint_core::PeerId;
14use fedimint_core::net::peers::{IP2PConnections, Recipient};
15use fedimint_core::task::{TaskGroup, sleep};
16use fedimint_core::util::FmtCompactAnyhow;
17use fedimint_core::util::backoff_util::{FibonacciBackoff, api_networking_backoff};
18use fedimint_logging::{LOG_CONSENSUS, LOG_NET_PEER};
19use fedimint_server_core::dashboard_ui::ConnectionType;
20use futures::FutureExt;
21use futures::future::select_all;
22use tokio::sync::watch;
23use tracing::{Instrument, debug, info, info_span, warn};
24
25use crate::metrics::{PEER_CONNECT_COUNT, PEER_DISCONNECT_COUNT, PEER_MESSAGES_COUNT};
26use crate::net::p2p_connection::DynP2PConnection;
27use crate::net::p2p_connector::DynP2PConnector;
28
/// Per-peer senders that publish a peer's connection status: `None` while the
/// connection state machine is disconnected, `Some(connection.rtt())` while it
/// is connected.
pub type P2PStatusSenders = BTreeMap<PeerId, watch::Sender<Option<Duration>>>;
/// Receiving halves matching [`P2PStatusSenders`], keyed by peer id.
pub type P2PStatusReceivers = BTreeMap<PeerId, watch::Receiver<Option<Duration>>>;

/// Per-peer senders for the connection type reported by the connector; each
/// peer's value is refreshed by a background poller every 10 seconds.
pub type P2PConnectionTypeSenders = BTreeMap<PeerId, watch::Sender<ConnectionType>>;
/// Receiving halves matching [`P2PConnectionTypeSenders`], keyed by peer id.
pub type P2PConnectionTypeReceivers = BTreeMap<PeerId, watch::Receiver<ConnectionType>>;
34
35pub fn p2p_status_channels(peers: Vec<PeerId>) -> (P2PStatusSenders, P2PStatusReceivers) {
36 let mut senders = BTreeMap::new();
37 let mut receivers = BTreeMap::new();
38
39 for peer in peers {
40 let (sender, receiver) = watch::channel(None);
41
42 senders.insert(peer, sender);
43 receivers.insert(peer, receiver);
44 }
45
46 (senders, receivers)
47}
48
49pub fn p2p_connection_type_channels(
50 peers: Vec<PeerId>,
51) -> (P2PConnectionTypeSenders, P2PConnectionTypeReceivers) {
52 let mut senders = BTreeMap::new();
53 let mut receivers = BTreeMap::new();
54
55 for peer in peers {
56 let (sender, receiver) = watch::channel(ConnectionType::Direct);
57
58 senders.insert(peer, sender);
59 receivers.insert(peer, receiver);
60 }
61
62 (senders, receivers)
63}
64
/// Connections to every peer reported by the connector, keyed by peer id.
///
/// Each [`P2PConnection`] is backed by a background state machine that
/// re-establishes the connection after failures. Cloning only clones the
/// channel handles, so clones talk to the same background tasks.
#[derive(Clone)]
pub struct ReconnectP2PConnections<M> {
    connections: BTreeMap<PeerId, P2PConnection<M>>,
}
69
70impl<M: Send + 'static> ReconnectP2PConnections<M> {
71 pub fn new(
72 identity: PeerId,
73 connector: DynP2PConnector<M>,
74 task_group: &TaskGroup,
75 status_senders: P2PStatusSenders,
76 connection_type_senders: P2PConnectionTypeSenders,
77 ) -> Self {
78 let mut connection_senders = BTreeMap::new();
79 let mut connections = BTreeMap::new();
80
81 for peer_id in connector.peers() {
82 assert_ne!(peer_id, identity);
83
84 let (connection_sender, connection_receiver) = bounded(4);
85
86 let connection = P2PConnection::new(
87 identity,
88 peer_id,
89 connector.clone(),
90 connection_receiver,
91 status_senders
92 .get(&peer_id)
93 .expect("No p2p status sender for peer")
94 .clone(),
95 connection_type_senders
96 .get(&peer_id)
97 .expect("No p2p connection type sender for peer")
98 .clone(),
99 task_group,
100 );
101
102 connection_senders.insert(peer_id, connection_sender);
103 connections.insert(peer_id, connection);
104 }
105
106 task_group.spawn_cancellable("handle-incoming-p2p-connections", async move {
107 info!(target: LOG_NET_PEER, "Starting listening task for p2p connections");
108
109 loop {
110 match connector.accept().await {
111 Ok((peer, connection)) => {
112 if connection_senders
113 .get_mut(&peer)
114 .expect("Authenticating connectors dont return unknown peers")
115 .send(connection)
116 .await
117 .is_err()
118 {
119 break;
120 }
121 },
122 Err(err) => {
123 warn!(target: LOG_NET_PEER, our_id = %identity, err = %err.fmt_compact_anyhow(), "Error while opening incoming connection");
124 }
125 }
126 }
127
128 info!(target: LOG_NET_PEER, "Shutting down listening task for p2p connections");
129 });
130
131 ReconnectP2PConnections { connections }
132 }
133}
134
135#[async_trait]
136impl<M: Clone + Send + 'static> IP2PConnections<M> for ReconnectP2PConnections<M> {
137 fn send(&self, recipient: Recipient, message: M) {
138 match recipient {
139 Recipient::Everyone => {
140 for connection in self.connections.values() {
141 connection.try_send(message.clone());
142 }
143 }
144 Recipient::Peer(peer) => match self.connections.get(&peer) {
145 Some(connection) => {
146 connection.try_send(message);
147 }
148 _ => {
149 warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
150 }
151 },
152 }
153 }
154
155 async fn receive(&self) -> Option<(PeerId, M)> {
156 select_all(self.connections.iter().map(|(&peer, connection)| {
157 Box::pin(connection.receive().map(move |m| m.map(|m| (peer, m))))
158 }))
159 .await
160 .0
161 }
162
163 async fn receive_from_peer(&self, peer: PeerId) -> Option<M> {
164 self.connections
165 .get(&peer)
166 .expect("No connection found for peer")
167 .receive()
168 .await
169 }
170}
171
/// Handle to the connection with a single peer.
///
/// Messages put into `outgoing_sender` are sent to the peer by the background
/// state machine; messages read from the peer are delivered through
/// `incoming_receiver`.
#[derive(Clone)]
struct P2PConnection<M> {
    outgoing_sender: Sender<M>,
    incoming_receiver: Receiver<M>,
}
177
178impl<M: Send + 'static> P2PConnection<M> {
179 #[allow(clippy::too_many_arguments)]
180 fn new(
181 our_id: PeerId,
182 peer_id: PeerId,
183 connector: DynP2PConnector<M>,
184 incoming_connections: Receiver<DynP2PConnection<M>>,
185 status_sender: watch::Sender<Option<Duration>>,
186 connection_type_sender: watch::Sender<ConnectionType>,
187 task_group: &TaskGroup,
188 ) -> P2PConnection<M> {
189 let (outgoing_sender, outgoing_receiver) = bounded(5);
196 let (incoming_sender, incoming_receiver) = bounded(5);
197
198 let connector_clone = connector.clone();
199 let connection_type_sender_clone = connection_type_sender.clone();
200
201 task_group.spawn_cancellable(
203 format!("connection-type-poller-{peer_id}"),
204 async move {
205 let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
206 loop {
207 interval.tick().await;
208 let connection_type = connector_clone.connection_type(peer_id).await;
209 let _ = connection_type_sender_clone.send_replace(connection_type);
210 }
211 }
212 .instrument(info_span!("connection-type-poller", ?peer_id)),
213 );
214
215 task_group.spawn_cancellable(
216 format!("io-state-machine-{peer_id}"),
217 async move {
218 info!(target: LOG_NET_PEER, "Starting peer connection state machine");
219
220 let mut state_machine = P2PConnectionStateMachine {
221 common: P2PConnectionSMCommon {
222 incoming_sender,
223 outgoing_receiver,
224 our_id_str: our_id.to_string(),
225 our_id,
226 peer_id_str: peer_id.to_string(),
227 peer_id,
228 connector,
229 incoming_connections,
230 status_sender,
231 },
232 state: P2PConnectionSMState::Disconnected(api_networking_backoff()),
233 };
234
235 while let Some(sm) = state_machine.state_transition().await {
236 state_machine = sm;
237 }
238
239 info!(target: LOG_NET_PEER, "Shutting down peer connection state machine");
240 }
241 .instrument(info_span!("io-state-machine", ?peer_id)),
242 );
243
244 P2PConnection {
245 outgoing_sender,
246 incoming_receiver,
247 }
248 }
249
250 fn try_send(&self, message: M) {
251 if self.outgoing_sender.try_send(message).is_err() {
252 debug!(target: LOG_NET_PEER, "Outgoing message channel is full");
253 }
254 }
255
256 async fn receive(&self) -> Option<M> {
257 self.incoming_receiver.recv().await.ok()
258 }
259}
260
/// Connection state machine for a single peer: the current `state` plus the
/// channels and identifiers shared across all states (`common`).
struct P2PConnectionStateMachine<M> {
    state: P2PConnectionSMState<M>,
    common: P2PConnectionSMCommon<M>,
}
265
/// Data shared by every state of a peer's connection state machine.
struct P2PConnectionSMCommon<M> {
    // Forwards messages read from the peer to `P2PConnection::receive`.
    incoming_sender: async_channel::Sender<M>,
    // Messages queued via `P2PConnection::try_send`, to be sent to the peer.
    outgoing_receiver: async_channel::Receiver<M>,
    our_id: PeerId,
    // String form of `our_id`, used as a metric label.
    our_id_str: String,
    peer_id: PeerId,
    // String form of `peer_id`, used as a metric label.
    peer_id_str: String,
    // Used to dial the peer when we are the side that initiates connections.
    connector: DynP2PConnector<M>,
    // Connections from this peer accepted by the listening task.
    incoming_connections: Receiver<DynP2PConnection<M>>,
    // Publishes `None` while disconnected and `Some(rtt)` while connected.
    status_sender: watch::Sender<Option<Duration>>,
}
277
/// State of a peer's connection state machine.
enum P2PConnectionSMState<M> {
    /// No connection; the backoff yields the delays between dial attempts.
    Disconnected(FibonacciBackoff),
    /// An established connection to the peer.
    Connected(DynP2PConnection<M>),
}
282
283impl<M: Send + 'static> P2PConnectionStateMachine<M> {
284 async fn state_transition(mut self) -> Option<Self> {
285 match self.state {
286 P2PConnectionSMState::Disconnected(backoff) => {
287 self.common.status_sender.send_replace(None);
288
289 self.common.transition_disconnected(backoff).await
290 }
291 P2PConnectionSMState::Connected(connection) => {
292 self.common
293 .status_sender
294 .send_replace(Some(connection.rtt()));
295
296 self.common.transition_connected(connection).await
297 }
298 }
299 .map(|state| P2PConnectionStateMachine {
300 common: self.common,
301 state,
302 })
303 }
304}
305
306impl<M: Send + 'static> P2PConnectionSMCommon<M> {
307 async fn transition_connected(
308 &mut self,
309 mut connection: DynP2PConnection<M>,
310 ) -> Option<P2PConnectionSMState<M>> {
311 tokio::select! {
312 message = self.outgoing_receiver.recv() => {
313 Some(self.send_message(connection, message.ok()?).await)
314 },
315 connection = self.incoming_connections.recv() => {
316 info!(target: LOG_NET_PEER, "Connected to peer");
317
318 Some(P2PConnectionSMState::Connected(connection.ok()?))
319 },
320 message = connection.receive() => {
321 let mut message = match message {
322 Ok(message) => message,
323 Err(e) => return Some(self.disconnect(e)),
324 };
325
326 match message.read_to_end().await {
327 Ok(message) => {
328 PEER_MESSAGES_COUNT
329 .with_label_values(&[self.our_id_str.as_str(), self.peer_id_str.as_str(), "incoming"])
330 .inc();
331
332 if self.incoming_sender.try_send(message).is_err() {
333 debug!(target: LOG_NET_PEER, "Incoming message channel is full");
334 }
335 },
336 Err(e) => return Some(self.disconnect(e)),
337 }
338
339 Some(P2PConnectionSMState::Connected(connection))
340 },
341 }
342 }
343
344 fn disconnect(&self, error: anyhow::Error) -> P2PConnectionSMState<M> {
345 info!(target: LOG_NET_PEER, "Disconnected from peer: {}", error);
346
347 PEER_DISCONNECT_COUNT
348 .with_label_values(&[&self.our_id_str, &self.peer_id_str])
349 .inc();
350
351 P2PConnectionSMState::Disconnected(api_networking_backoff())
352 }
353
354 async fn send_message(
355 &mut self,
356 mut connection: DynP2PConnection<M>,
357 peer_message: M,
358 ) -> P2PConnectionSMState<M> {
359 PEER_MESSAGES_COUNT
360 .with_label_values(&[
361 self.our_id_str.as_str(),
362 self.peer_id_str.as_str(),
363 "outgoing",
364 ])
365 .inc();
366
367 if let Err(e) = connection.send(peer_message).await {
368 return self.disconnect(e);
369 }
370
371 P2PConnectionSMState::Connected(connection)
372 }
373
374 async fn transition_disconnected(
375 &mut self,
376 mut backoff: FibonacciBackoff,
377 ) -> Option<P2PConnectionSMState<M>> {
378 tokio::select! {
379 connection = self.incoming_connections.recv() => {
380 PEER_CONNECT_COUNT
381 .with_label_values(&[self.our_id_str.as_str(), self.peer_id_str.as_str(), "incoming"])
382 .inc();
383
384 info!(target: LOG_NET_PEER, "Connected to peer");
385
386 Some(P2PConnectionSMState::Connected(connection.ok()?))
387 },
388 () = sleep(backoff.next().expect("Unlimited retries")), if self.our_id < self.peer_id => {
390
391
392 info!(target: LOG_NET_PEER, "Attempting to reconnect to peer");
393
394 match self.connector.connect(self.peer_id).await {
395 Ok(connection) => {
396 PEER_CONNECT_COUNT
397 .with_label_values(&[self.our_id_str.as_str(), self.peer_id_str.as_str(), "outgoing"])
398 .inc();
399
400 info!(target: LOG_NET_PEER, "Connected to peer");
401
402 return Some(P2PConnectionSMState::Connected(connection));
403 }
404 Err(e) => warn!(target: LOG_CONSENSUS, "Failed to connect to peer: {e}")
405 }
406
407 Some(P2PConnectionSMState::Disconnected(backoff))
408 },
409 }
410 }
411}