1use std::collections::BTreeMap;
9
10use async_channel::{Receiver, Sender, bounded};
11use async_trait::async_trait;
12use fedimint_api_client::api::P2PConnectionStatus;
13use fedimint_core::PeerId;
14use fedimint_core::net::peers::{IP2PConnections, Recipient};
15use fedimint_core::task::{TaskGroup, sleep};
16use fedimint_core::util::FmtCompactAnyhow;
17use fedimint_core::util::backoff_util::{FibonacciBackoff, api_networking_backoff};
18use fedimint_logging::{LOG_CONSENSUS, LOG_NET_PEER};
19use futures::FutureExt;
20use futures::future::select_all;
21use tokio::sync::watch;
22use tracing::{Instrument, info, info_span, warn};
23
24use crate::metrics::{PEER_CONNECT_COUNT, PEER_DISCONNECT_COUNT, PEER_MESSAGES_COUNT};
25use crate::net::p2p_connection::DynP2PConnection;
26use crate::net::p2p_connector::DynP2PConnector;
27
/// Sending halves of the per-peer `watch` channels that carry each peer's
/// [`P2PConnectionStatus`], keyed by peer id.
pub type P2PStatusSenders = BTreeMap<PeerId, watch::Sender<P2PConnectionStatus>>;
/// Receiving halves of the per-peer `watch` channels that carry each peer's
/// [`P2PConnectionStatus`], keyed by peer id.
pub type P2PStatusReceivers = BTreeMap<PeerId, watch::Receiver<P2PConnectionStatus>>;
30
31pub fn p2p_status_channels(peers: Vec<PeerId>) -> (P2PStatusSenders, P2PStatusReceivers) {
32 let mut senders = BTreeMap::new();
33 let mut receivers = BTreeMap::new();
34
35 for peer in peers {
36 let (sender, receiver) = watch::channel(P2PConnectionStatus::Disconnected);
37
38 senders.insert(peer, sender);
39 receivers.insert(peer, receiver);
40 }
41
42 (senders, receivers)
43}
44
/// Set of per-peer connections, one [`P2PConnection`] for every peer reported
/// by the connector passed to [`ReconnectP2PConnections::new`].
#[derive(Clone)]
pub struct ReconnectP2PConnections<M> {
    /// Message queue handles for each peer, keyed by peer id.
    connections: BTreeMap<PeerId, P2PConnection<M>>,
}
49
50impl<M: Send + 'static> ReconnectP2PConnections<M> {
51 pub fn new(
52 identity: PeerId,
53 connector: DynP2PConnector<M>,
54 task_group: &TaskGroup,
55 status_senders: P2PStatusSenders,
56 ) -> Self {
57 let mut connection_senders = BTreeMap::new();
58 let mut connections = BTreeMap::new();
59
60 for peer_id in connector.peers() {
61 assert_ne!(peer_id, identity);
62
63 let (connection_sender, connection_receiver) = bounded(4);
64
65 let connection = P2PConnection::new(
66 identity,
67 peer_id,
68 connector.clone(),
69 connection_receiver,
70 status_senders
71 .get(&peer_id)
72 .expect("No p2p status sender for peer {peer}")
73 .clone(),
74 task_group,
75 );
76
77 connection_senders.insert(peer_id, connection_sender);
78 connections.insert(peer_id, connection);
79 }
80
81 task_group.spawn_cancellable("handle-incoming-p2p-connections", async move {
82 info!(target: LOG_NET_PEER, "Starting listening task for p2p connections");
83
84 loop {
85 match connector.accept().await {
86 Ok((peer, connection)) => {
87 if connection_senders
88 .get_mut(&peer)
89 .expect("Authenticating connectors dont return unknown peers")
90 .send(connection)
91 .await
92 .is_err()
93 {
94 break;
95 }
96 },
97 Err(err) => {
98 warn!(target: LOG_NET_PEER, our_id = %identity, err = %err.fmt_compact_anyhow(), "Error while opening incoming connection");
99 }
100 }
101 }
102
103 info!(target: LOG_NET_PEER, "Shutting down listening task for p2p connections");
104 });
105
106 ReconnectP2PConnections { connections }
107 }
108}
109
110#[async_trait]
111impl<M: Clone + Send + 'static> IP2PConnections<M> for ReconnectP2PConnections<M> {
112 async fn send(&self, recipient: Recipient, message: M) {
113 match recipient {
114 Recipient::Everyone => {
115 for connection in self.connections.values() {
116 connection.send(message.clone()).await;
117 }
118 }
119 Recipient::Peer(peer) => match self.connections.get(&peer) {
120 Some(connection) => {
121 connection.send(message).await;
122 }
123 _ => {
124 warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
125 }
126 },
127 }
128 }
129
130 fn try_send(&self, recipient: Recipient, message: M) {
131 match recipient {
132 Recipient::Everyone => {
133 for connection in self.connections.values() {
134 connection.try_send(message.clone());
135 }
136 }
137 Recipient::Peer(peer) => match self.connections.get(&peer) {
138 Some(connection) => {
139 connection.try_send(message);
140 }
141 _ => {
142 warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
143 }
144 },
145 }
146 }
147
148 async fn receive(&self) -> Option<(PeerId, M)> {
149 select_all(self.connections.iter().map(|(&peer, connection)| {
150 Box::pin(connection.receive().map(move |m| m.map(|m| (peer, m))))
151 }))
152 .await
153 .0
154 }
155
156 async fn receive_from_peer(&self, peer: PeerId) -> Option<M> {
157 self.connections
158 .get(&peer)
159 .expect("No connection found for peer {peer}")
160 .receive()
161 .await
162 }
163}
164
/// Handle to the message queues of a single peer. The other ends of both
/// channels are owned by the state machine task spawned in
/// `P2PConnection::new`.
#[derive(Clone)]
struct P2PConnection<M> {
    /// Queue of messages waiting to be written to the peer.
    outgoing: Sender<M>,
    /// Queue of messages that were read from the peer.
    incoming: Receiver<M>,
}
170
171impl<M: Send + 'static> P2PConnection<M> {
172 #[allow(clippy::too_many_arguments)]
173 fn new(
174 our_id: PeerId,
175 peer_id: PeerId,
176 connector: DynP2PConnector<M>,
177 incoming_connections: Receiver<DynP2PConnection<M>>,
178 status_sender: watch::Sender<P2PConnectionStatus>,
179 task_group: &TaskGroup,
180 ) -> P2PConnection<M> {
181 let (outgoing_sender, outgoing_receiver) = bounded(1024);
182 let (incoming_sender, incoming_receiver) = bounded(1024);
183
184 task_group.spawn_cancellable(
185 format!("io-state-machine-{peer_id}"),
186 async move {
187 info!(target: LOG_NET_PEER, "Starting peer connection state machine");
188
189 let mut state_machine = P2PConnectionStateMachine {
190 common: P2PConnectionSMCommon {
191 incoming_sender,
192 outgoing_receiver,
193 our_id_str: our_id.to_string(),
194 our_id,
195 peer_id_str: peer_id.to_string(),
196 peer_id,
197 connector,
198 incoming_connections,
199 status_sender,
200 },
201 state: P2PConnectionSMState::Disconnected(api_networking_backoff()),
202 };
203
204 while let Some(sm) = state_machine.state_transition().await {
205 state_machine = sm;
206 }
207
208 info!(target: LOG_NET_PEER, "Shutting down peer connection state machine");
209 }
210 .instrument(info_span!("io-state-machine", ?peer_id)),
211 );
212
213 P2PConnection {
214 outgoing: outgoing_sender,
215 incoming: incoming_receiver,
216 }
217 }
218
219 async fn send(&self, message: M) {
220 self.outgoing.send(message).await.ok();
221 }
222
223 fn try_send(&self, message: M) {
224 self.outgoing.try_send(message).ok();
225 }
226
227 async fn receive(&self) -> Option<M> {
228 self.incoming.recv().await.ok()
229 }
230}
231
/// State machine for the connection to a single peer: the current
/// connection state plus the data that is carried across every transition.
struct P2PConnectionStateMachine<M> {
    /// Whether we are currently connected, and the data specific to that state.
    state: P2PConnectionSMState<M>,
    /// Channels, identities and connector shared by all states.
    common: P2PConnectionSMCommon<M>,
}
236
/// Data shared by all states of a peer's connection state machine.
struct P2PConnectionSMCommon<M> {
    /// Messages read from the peer are pushed here.
    incoming_sender: async_channel::Sender<M>,
    /// Messages waiting to be written to the peer are pulled from here.
    outgoing_receiver: async_channel::Receiver<M>,
    /// Our own peer id.
    our_id: PeerId,
    /// `our_id` rendered once as a string, used as a metrics label.
    our_id_str: String,
    /// Id of the remote peer this state machine talks to.
    peer_id: PeerId,
    /// `peer_id` rendered once as a string, used as a metrics label.
    peer_id_str: String,
    /// Used to open outgoing connections to the peer.
    connector: DynP2PConnector<M>,
    /// Connections to this peer that were accepted elsewhere and handed over.
    incoming_connections: Receiver<DynP2PConnection<M>>,
    /// Publishes the current connection status of this peer.
    status_sender: watch::Sender<P2PConnectionStatus>,
}
248
/// Connection state of a single peer.
enum P2PConnectionSMState<M> {
    /// No connection; the backoff yields the delays between reconnect attempts.
    Disconnected(FibonacciBackoff),
    /// An established connection to the peer.
    Connected(DynP2PConnection<M>),
}
253
254impl<M: Send + 'static> P2PConnectionStateMachine<M> {
255 async fn state_transition(mut self) -> Option<Self> {
256 match self.state {
257 P2PConnectionSMState::Disconnected(disconnected) => {
258 self.common
259 .status_sender
260 .send(P2PConnectionStatus::Disconnected)
261 .ok();
262
263 self.common.transition_disconnected(disconnected).await
264 }
265 P2PConnectionSMState::Connected(connected) => {
266 self.common
267 .status_sender
268 .send(P2PConnectionStatus::Connected)
269 .ok();
270
271 self.common.transition_connected(connected).await
272 }
273 }
274 .map(|state| P2PConnectionStateMachine {
275 common: self.common,
276 state,
277 })
278 }
279}
280
281impl<M: Send + 'static> P2PConnectionSMCommon<M> {
282 async fn transition_connected(
283 &mut self,
284 mut connection: DynP2PConnection<M>,
285 ) -> Option<P2PConnectionSMState<M>> {
286 tokio::select! {
287 message = self.outgoing_receiver.recv() => {
288 Some(self.send_message(connection, message.ok()?).await)
289 },
290 connection = self.incoming_connections.recv() => {
291 info!(target: LOG_NET_PEER, "Connected to peer");
292
293 Some(P2PConnectionSMState::Connected(connection.ok()?))
294 },
295 message = connection.receive() => {
296 match message {
297 Ok(message) => {
298 PEER_MESSAGES_COUNT
299 .with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"])
300 .inc();
301
302 self.incoming_sender.send(message).await.ok()?;
303 },
304 Err(e) => return Some(self.disconnect(e)),
305 };
306
307 Some(P2PConnectionSMState::Connected(connection))
308 },
309 }
310 }
311
312 fn disconnect(&self, error: anyhow::Error) -> P2PConnectionSMState<M> {
313 info!(target: LOG_NET_PEER, "Disconnected from peer: {}", error);
314
315 PEER_DISCONNECT_COUNT
316 .with_label_values(&[&self.our_id_str, &self.peer_id_str])
317 .inc();
318
319 P2PConnectionSMState::Disconnected(api_networking_backoff())
320 }
321
322 async fn send_message(
323 &mut self,
324 mut connection: DynP2PConnection<M>,
325 peer_message: M,
326 ) -> P2PConnectionSMState<M> {
327 PEER_MESSAGES_COUNT
328 .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
329 .inc();
330
331 if let Err(e) = connection.send(peer_message).await {
332 return self.disconnect(e);
333 }
334
335 P2PConnectionSMState::Connected(connection)
336 }
337
338 async fn transition_disconnected(
339 &mut self,
340 mut backoff: FibonacciBackoff,
341 ) -> Option<P2PConnectionSMState<M>> {
342 tokio::select! {
343 connection = self.incoming_connections.recv() => {
344 PEER_CONNECT_COUNT
345 .with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"])
346 .inc();
347
348 info!(target: LOG_NET_PEER, "Connected to peer");
349
350 Some(P2PConnectionSMState::Connected(connection.ok()?))
351 },
352 () = sleep(backoff.next().expect("Unlimited retries")), if self.our_id < self.peer_id => {
353 info!(target: LOG_NET_PEER, "Attempting to reconnect to peer");
356
357 match self.connector.connect(self.peer_id).await {
358 Ok(connection) => {
359 PEER_CONNECT_COUNT
360 .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
361 .inc();
362
363 info!(target: LOG_NET_PEER, "Connected to peer");
364
365 return Some(P2PConnectionSMState::Connected(connection));
366 }
367 Err(e) => warn!(target: LOG_CONSENSUS, "Failed to connect to peer: {e}")
368 }
369
370 Some(P2PConnectionSMState::Disconnected(backoff))
371 },
372 }
373 }
374}