1use std::collections::BTreeMap;
9use std::time::Duration;
10
11use async_channel::{Receiver, Sender, bounded};
12use async_trait::async_trait;
13use fedimint_core::PeerId;
14use fedimint_core::net::peers::{IP2PConnections, Recipient};
15use fedimint_core::task::{TaskGroup, sleep};
16use fedimint_core::util::FmtCompactAnyhow;
17use fedimint_core::util::backoff_util::{FibonacciBackoff, api_networking_backoff};
18use fedimint_logging::{LOG_CONSENSUS, LOG_NET_PEER};
19use futures::FutureExt;
20use futures::future::select_all;
21use tokio::sync::watch;
22use tracing::{Instrument, info, info_span, warn};
23
24use crate::metrics::{PEER_CONNECT_COUNT, PEER_DISCONNECT_COUNT, PEER_MESSAGES_COUNT};
25use crate::net::p2p_connection::DynP2PConnection;
26use crate::net::p2p_connector::DynP2PConnector;
27
/// Per-peer [`watch::Sender`] handles through which connection status is
/// published.
///
/// Within this file the published value is `None` while the peer is
/// disconnected and `Some(rtt)` (as returned by the connection's `rtt()`)
/// while it is connected.
pub type P2PStatusSenders = BTreeMap<PeerId, watch::Sender<Option<Duration>>>;
/// Per-peer [`watch::Receiver`] handles; the receiving halves of the channels
/// created by [`p2p_status_channels`].
pub type P2PStatusReceivers = BTreeMap<PeerId, watch::Receiver<Option<Duration>>>;
30
31pub fn p2p_status_channels(peers: Vec<PeerId>) -> (P2PStatusSenders, P2PStatusReceivers) {
32 let mut senders = BTreeMap::new();
33 let mut receivers = BTreeMap::new();
34
35 for peer in peers {
36 let (sender, receiver) = watch::channel(None);
37
38 senders.insert(peer, sender);
39 receivers.insert(peer, receiver);
40 }
41
42 (senders, receivers)
43}
44
/// Set of per-peer connections, each driven by its own background state
/// machine task (see [`P2PConnection::new`]).
///
/// Cloning clones the channel handles held by each [`P2PConnection`].
#[derive(Clone)]
pub struct ReconnectP2PConnections<M> {
    /// One connection handle per peer returned by the connector's `peers()`.
    connections: BTreeMap<PeerId, P2PConnection<M>>,
}
49
50impl<M: Send + 'static> ReconnectP2PConnections<M> {
51 pub fn new(
52 identity: PeerId,
53 connector: DynP2PConnector<M>,
54 task_group: &TaskGroup,
55 status_senders: P2PStatusSenders,
56 ) -> Self {
57 let mut connection_senders = BTreeMap::new();
58 let mut connections = BTreeMap::new();
59
60 for peer_id in connector.peers() {
61 assert_ne!(peer_id, identity);
62
63 let (connection_sender, connection_receiver) = bounded(4);
64
65 let connection = P2PConnection::new(
66 identity,
67 peer_id,
68 connector.clone(),
69 connection_receiver,
70 status_senders
71 .get(&peer_id)
72 .expect("No p2p status sender for peer {peer}")
73 .clone(),
74 task_group,
75 );
76
77 connection_senders.insert(peer_id, connection_sender);
78 connections.insert(peer_id, connection);
79 }
80
81 task_group.spawn_cancellable("handle-incoming-p2p-connections", async move {
82 info!(target: LOG_NET_PEER, "Starting listening task for p2p connections");
83
84 loop {
85 match connector.accept().await {
86 Ok((peer, connection)) => {
87 if connection_senders
88 .get_mut(&peer)
89 .expect("Authenticating connectors dont return unknown peers")
90 .send(connection)
91 .await
92 .is_err()
93 {
94 break;
95 }
96 },
97 Err(err) => {
98 warn!(target: LOG_NET_PEER, our_id = %identity, err = %err.fmt_compact_anyhow(), "Error while opening incoming connection");
99 }
100 }
101 }
102
103 info!(target: LOG_NET_PEER, "Shutting down listening task for p2p connections");
104 });
105
106 ReconnectP2PConnections { connections }
107 }
108}
109
110#[async_trait]
111impl<M: Clone + Send + 'static> IP2PConnections<M> for ReconnectP2PConnections<M> {
112 async fn send(&self, recipient: Recipient, message: M) {
113 match recipient {
114 Recipient::Everyone => {
115 for connection in self.connections.values() {
116 connection.send(message.clone()).await;
117 }
118 }
119 Recipient::Peer(peer) => match self.connections.get(&peer) {
120 Some(connection) => {
121 connection.send(message).await;
122 }
123 _ => {
124 warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
125 }
126 },
127 }
128 }
129
130 fn try_send(&self, recipient: Recipient, message: M) {
131 match recipient {
132 Recipient::Everyone => {
133 for connection in self.connections.values() {
134 connection.try_send(message.clone());
135 }
136 }
137 Recipient::Peer(peer) => match self.connections.get(&peer) {
138 Some(connection) => {
139 connection.try_send(message);
140 }
141 _ => {
142 warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
143 }
144 },
145 }
146 }
147
148 async fn receive(&self) -> Option<(PeerId, M)> {
149 select_all(self.connections.iter().map(|(&peer, connection)| {
150 Box::pin(connection.receive().map(move |m| m.map(|m| (peer, m))))
151 }))
152 .await
153 .0
154 }
155
156 async fn receive_from_peer(&self, peer: PeerId) -> Option<M> {
157 self.connections
158 .get(&peer)
159 .expect("No connection found for peer {peer}")
160 .receive()
161 .await
162 }
163}
164
/// Handle to a single peer: the two channel ends that talk to that peer's
/// background state machine task.
#[derive(Clone)]
struct P2PConnection<M> {
    /// Messages queued here are picked up by the state machine and sent to
    /// the peer.
    outgoing: Sender<M>,
    /// Messages the state machine received from the peer.
    incoming: Receiver<M>,
}
170
171impl<M: Send + 'static> P2PConnection<M> {
172 #[allow(clippy::too_many_arguments)]
173 fn new(
174 our_id: PeerId,
175 peer_id: PeerId,
176 connector: DynP2PConnector<M>,
177 incoming_connections: Receiver<DynP2PConnection<M>>,
178 status_sender: watch::Sender<Option<Duration>>,
179 task_group: &TaskGroup,
180 ) -> P2PConnection<M> {
181 let (outgoing_sender, outgoing_receiver) = bounded(1024);
182 let (incoming_sender, incoming_receiver) = bounded(1024);
183
184 task_group.spawn_cancellable(
185 format!("io-state-machine-{peer_id}"),
186 async move {
187 info!(target: LOG_NET_PEER, "Starting peer connection state machine");
188
189 let mut state_machine = P2PConnectionStateMachine {
190 common: P2PConnectionSMCommon {
191 incoming_sender,
192 outgoing_receiver,
193 our_id_str: our_id.to_string(),
194 our_id,
195 peer_id_str: peer_id.to_string(),
196 peer_id,
197 connector,
198 incoming_connections,
199 status_sender,
200 },
201 state: P2PConnectionSMState::Disconnected(api_networking_backoff()),
202 };
203
204 while let Some(sm) = state_machine.state_transition().await {
205 state_machine = sm;
206 }
207
208 info!(target: LOG_NET_PEER, "Shutting down peer connection state machine");
209 }
210 .instrument(info_span!("io-state-machine", ?peer_id)),
211 );
212
213 P2PConnection {
214 outgoing: outgoing_sender,
215 incoming: incoming_receiver,
216 }
217 }
218
219 async fn send(&self, message: M) {
220 self.outgoing.send(message).await.ok();
221 }
222
223 fn try_send(&self, message: M) {
224 self.outgoing.try_send(message).ok();
225 }
226
227 async fn receive(&self) -> Option<M> {
228 self.incoming.recv().await.ok()
229 }
230}
231
/// State machine managing the connection to one peer: the current connection
/// state plus the data shared by all states.
struct P2PConnectionStateMachine<M> {
    /// Whether we are currently connected to the peer or not.
    state: P2PConnectionSMState<M>,
    /// Channels, identities and connector used by every transition.
    common: P2PConnectionSMCommon<M>,
}
236
/// State-independent data of a peer connection state machine.
struct P2PConnectionSMCommon<M> {
    /// Messages received from the peer are forwarded into this queue.
    incoming_sender: async_channel::Sender<M>,
    /// Messages waiting to be sent to the peer.
    outgoing_receiver: async_channel::Receiver<M>,
    /// Our own peer id.
    our_id: PeerId,
    /// `our_id` rendered as a string, used as a metrics label.
    our_id_str: String,
    /// Id of the peer this state machine is responsible for.
    peer_id: PeerId,
    /// `peer_id` rendered as a string, used as a metrics label.
    peer_id_str: String,
    /// Connector used to open outgoing connections to the peer.
    connector: DynP2PConnector<M>,
    /// Connections from the peer that were accepted by the listening task.
    incoming_connections: Receiver<DynP2PConnection<M>>,
    /// Publishes `None` while disconnected and `Some(rtt)` while connected.
    status_sender: watch::Sender<Option<Duration>>,
}
248
/// Connection state of a peer connection state machine.
enum P2PConnectionSMState<M> {
    /// No connection; carries the backoff that spaces out reconnect attempts.
    Disconnected(FibonacciBackoff),
    /// An established connection to the peer.
    Connected(DynP2PConnection<M>),
}
253
254impl<M: Send + 'static> P2PConnectionStateMachine<M> {
255 async fn state_transition(mut self) -> Option<Self> {
256 match self.state {
257 P2PConnectionSMState::Disconnected(backoff) => {
258 self.common.status_sender.send_replace(None);
259
260 self.common.transition_disconnected(backoff).await
261 }
262 P2PConnectionSMState::Connected(connection) => {
263 self.common
264 .status_sender
265 .send_replace(Some(connection.rtt()));
266
267 self.common.transition_connected(connection).await
268 }
269 }
270 .map(|state| P2PConnectionStateMachine {
271 common: self.common,
272 state,
273 })
274 }
275}
276
277impl<M: Send + 'static> P2PConnectionSMCommon<M> {
278 async fn transition_connected(
279 &mut self,
280 mut connection: DynP2PConnection<M>,
281 ) -> Option<P2PConnectionSMState<M>> {
282 tokio::select! {
283 message = self.outgoing_receiver.recv() => {
284 Some(self.send_message(connection, message.ok()?).await)
285 },
286 connection = self.incoming_connections.recv() => {
287 info!(target: LOG_NET_PEER, "Connected to peer");
288
289 Some(P2PConnectionSMState::Connected(connection.ok()?))
290 },
291 message = connection.receive() => {
292 match message {
293 Ok(message) => {
294 PEER_MESSAGES_COUNT
295 .with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"])
296 .inc();
297
298 self.incoming_sender.send(message).await.ok()?;
299 },
300 Err(e) => return Some(self.disconnect(e)),
301 };
302
303 Some(P2PConnectionSMState::Connected(connection))
304 },
305 }
306 }
307
308 fn disconnect(&self, error: anyhow::Error) -> P2PConnectionSMState<M> {
309 info!(target: LOG_NET_PEER, "Disconnected from peer: {}", error);
310
311 PEER_DISCONNECT_COUNT
312 .with_label_values(&[&self.our_id_str, &self.peer_id_str])
313 .inc();
314
315 P2PConnectionSMState::Disconnected(api_networking_backoff())
316 }
317
318 async fn send_message(
319 &mut self,
320 mut connection: DynP2PConnection<M>,
321 peer_message: M,
322 ) -> P2PConnectionSMState<M> {
323 PEER_MESSAGES_COUNT
324 .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
325 .inc();
326
327 if let Err(e) = connection.send(peer_message).await {
328 return self.disconnect(e);
329 }
330
331 P2PConnectionSMState::Connected(connection)
332 }
333
334 async fn transition_disconnected(
335 &mut self,
336 mut backoff: FibonacciBackoff,
337 ) -> Option<P2PConnectionSMState<M>> {
338 tokio::select! {
339 connection = self.incoming_connections.recv() => {
340 PEER_CONNECT_COUNT
341 .with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"])
342 .inc();
343
344 info!(target: LOG_NET_PEER, "Connected to peer");
345
346 Some(P2PConnectionSMState::Connected(connection.ok()?))
347 },
348 () = sleep(backoff.next().expect("Unlimited retries")), if self.our_id < self.peer_id => {
349 info!(target: LOG_NET_PEER, "Attempting to reconnect to peer");
352
353 match self.connector.connect(self.peer_id).await {
354 Ok(connection) => {
355 PEER_CONNECT_COUNT
356 .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
357 .inc();
358
359 info!(target: LOG_NET_PEER, "Connected to peer");
360
361 return Some(P2PConnectionSMState::Connected(connection));
362 }
363 Err(e) => warn!(target: LOG_CONSENSUS, "Failed to connect to peer: {e}")
364 }
365
366 Some(P2PConnectionSMState::Disconnected(backoff))
367 },
368 }
369 }
370}