fedimint_server/net/
p2p.rs

1//! Implements a connection manager for communication with other federation
2//! members
3//!
4//! The main interface is [`fedimint_core::net::peers::IP2PConnections`] and
5//! its main implementation is [`ReconnectP2PConnections`], see these for
6//! details.
7
8use std::collections::BTreeMap;
9
10use async_channel::{Receiver, Sender, bounded};
11use async_trait::async_trait;
12use fedimint_api_client::api::P2PConnectionStatus;
13use fedimint_core::PeerId;
14use fedimint_core::net::peers::{IP2PConnections, Recipient};
15use fedimint_core::task::{TaskGroup, sleep};
16use fedimint_core::util::FmtCompactAnyhow;
17use fedimint_core::util::backoff_util::{FibonacciBackoff, api_networking_backoff};
18use fedimint_logging::{LOG_CONSENSUS, LOG_NET_PEER};
19use futures::FutureExt;
20use futures::future::select_all;
21use tokio::sync::watch;
22use tracing::{Instrument, info, info_span, warn};
23
24use crate::metrics::{PEER_CONNECT_COUNT, PEER_DISCONNECT_COUNT, PEER_MESSAGES_COUNT};
25use crate::net::p2p_connection::DynP2PConnection;
26use crate::net::p2p_connector::DynP2PConnector;
27
28pub type P2PStatusSenders = BTreeMap<PeerId, watch::Sender<P2PConnectionStatus>>;
29pub type P2PStatusReceivers = BTreeMap<PeerId, watch::Receiver<P2PConnectionStatus>>;
30
31pub fn p2p_status_channels(peers: Vec<PeerId>) -> (P2PStatusSenders, P2PStatusReceivers) {
32    let mut senders = BTreeMap::new();
33    let mut receivers = BTreeMap::new();
34
35    for peer in peers {
36        let (sender, receiver) = watch::channel(P2PConnectionStatus::Disconnected);
37
38        senders.insert(peer, sender);
39        receivers.insert(peer, receiver);
40    }
41
42    (senders, receivers)
43}
44
/// Connection manager holding one `P2PConnection` handle per remote peer.
///
/// Instances are created with [`ReconnectP2PConnections::new`], which spawns
/// the per-peer state machine tasks and the task accepting incoming
/// connections.
45#[derive(Clone)]
46pub struct ReconnectP2PConnections<M> {
    /// Message queue handles of every remote peer, keyed by the peer's id.
47    connections: BTreeMap<PeerId, P2PConnection<M>>,
48}
49
50impl<M: Send + 'static> ReconnectP2PConnections<M> {
51    pub fn new(
52        identity: PeerId,
53        connector: DynP2PConnector<M>,
54        task_group: &TaskGroup,
55        status_senders: P2PStatusSenders,
56    ) -> Self {
57        let mut connection_senders = BTreeMap::new();
58        let mut connections = BTreeMap::new();
59
60        for peer_id in connector.peers() {
61            assert_ne!(peer_id, identity);
62
63            let (connection_sender, connection_receiver) = bounded(4);
64
65            let connection = P2PConnection::new(
66                identity,
67                peer_id,
68                connector.clone(),
69                connection_receiver,
70                status_senders
71                    .get(&peer_id)
72                    .expect("No p2p status sender for peer {peer}")
73                    .clone(),
74                task_group,
75            );
76
77            connection_senders.insert(peer_id, connection_sender);
78            connections.insert(peer_id, connection);
79        }
80
81        task_group.spawn_cancellable("handle-incoming-p2p-connections", async move {
82            info!(target: LOG_NET_PEER, "Starting listening task for p2p connections");
83
84            loop {
85                match connector.accept().await {
86                    Ok((peer, connection)) => {
87                        if connection_senders
88                            .get_mut(&peer)
89                            .expect("Authenticating connectors dont return unknown peers")
90                            .send(connection)
91                            .await
92                            .is_err()
93                        {
94                            break;
95                        }
96                    },
97                    Err(err) => {
98                        warn!(target: LOG_NET_PEER, our_id = %identity, err = %err.fmt_compact_anyhow(), "Error while opening incoming connection");
99                    }
100                }
101            }
102
103            info!(target: LOG_NET_PEER, "Shutting down listening task for p2p connections");
104        });
105
106        ReconnectP2PConnections { connections }
107    }
108}
109
110#[async_trait]
111impl<M: Clone + Send + 'static> IP2PConnections<M> for ReconnectP2PConnections<M> {
112    async fn send(&self, recipient: Recipient, message: M) {
113        match recipient {
114            Recipient::Everyone => {
115                for connection in self.connections.values() {
116                    connection.send(message.clone()).await;
117                }
118            }
119            Recipient::Peer(peer) => match self.connections.get(&peer) {
120                Some(connection) => {
121                    connection.send(message).await;
122                }
123                _ => {
124                    warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
125                }
126            },
127        }
128    }
129
130    fn try_send(&self, recipient: Recipient, message: M) {
131        match recipient {
132            Recipient::Everyone => {
133                for connection in self.connections.values() {
134                    connection.try_send(message.clone());
135                }
136            }
137            Recipient::Peer(peer) => match self.connections.get(&peer) {
138                Some(connection) => {
139                    connection.try_send(message);
140                }
141                _ => {
142                    warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
143                }
144            },
145        }
146    }
147
148    async fn receive(&self) -> Option<(PeerId, M)> {
149        select_all(self.connections.iter().map(|(&peer, connection)| {
150            Box::pin(connection.receive().map(move |m| m.map(|m| (peer, m))))
151        }))
152        .await
153        .0
154    }
155
156    async fn receive_from_peer(&self, peer: PeerId) -> Option<M> {
157        self.connections
158            .get(&peer)
159            .expect("No connection found for peer {peer}")
160            .receive()
161            .await
162    }
163}
164
/// Handle to the message queues of a single peer connection.
///
/// The other ends of both queues are owned by the state machine task that
/// `P2PConnection::new` spawns.
165#[derive(Clone)]
166struct P2PConnection<M> {
    /// Queue of messages waiting to be sent to the peer.
167    outgoing: Sender<M>,
    /// Queue of messages that were received from the peer.
168    incoming: Receiver<M>,
169}
170
171impl<M: Send + 'static> P2PConnection<M> {
172    #[allow(clippy::too_many_arguments)]
173    fn new(
174        our_id: PeerId,
175        peer_id: PeerId,
176        connector: DynP2PConnector<M>,
177        incoming_connections: Receiver<DynP2PConnection<M>>,
178        status_sender: watch::Sender<P2PConnectionStatus>,
179        task_group: &TaskGroup,
180    ) -> P2PConnection<M> {
181        let (outgoing_sender, outgoing_receiver) = bounded(1024);
182        let (incoming_sender, incoming_receiver) = bounded(1024);
183
184        task_group.spawn_cancellable(
185            format!("io-state-machine-{peer_id}"),
186            async move {
187                info!(target: LOG_NET_PEER, "Starting peer connection state machine");
188
189                let mut state_machine = P2PConnectionStateMachine {
190                    common: P2PConnectionSMCommon {
191                        incoming_sender,
192                        outgoing_receiver,
193                        our_id_str: our_id.to_string(),
194                        our_id,
195                        peer_id_str: peer_id.to_string(),
196                        peer_id,
197                        connector,
198                        incoming_connections,
199                        status_sender,
200                    },
201                    state: P2PConnectionSMState::Disconnected(api_networking_backoff()),
202                };
203
204                while let Some(sm) = state_machine.state_transition().await {
205                    state_machine = sm;
206                }
207
208                info!(target: LOG_NET_PEER, "Shutting down peer connection state machine");
209            }
210            .instrument(info_span!("io-state-machine", ?peer_id)),
211        );
212
213        P2PConnection {
214            outgoing: outgoing_sender,
215            incoming: incoming_receiver,
216        }
217    }
218
219    async fn send(&self, message: M) {
220        self.outgoing.send(message).await.ok();
221    }
222
223    fn try_send(&self, message: M) {
224        self.outgoing.try_send(message).ok();
225    }
226
227    async fn receive(&self) -> Option<M> {
228        self.incoming.recv().await.ok()
229    }
230}
231
/// State machine driving the connection to a single peer.
///
/// Combines the current connection state with the data that is shared by all
/// states.
232struct P2PConnectionStateMachine<M> {
    /// Whether we are currently connected to the peer or not.
233    state: P2PConnectionSMState<M>,
    /// Channels, ids and connector used independently of the current state.
234    common: P2PConnectionSMCommon<M>,
235}
236
/// Data of a peer connection state machine that is needed in every state.
237struct P2PConnectionSMCommon<M> {
    /// Messages received from the peer are forwarded into this queue.
238    incoming_sender: async_channel::Sender<M>,
    /// Messages queued locally for sending to the peer are taken from here.
239    outgoing_receiver: async_channel::Receiver<M>,
    /// Our own peer id.
240    our_id: PeerId,
    /// String form of `our_id`, used as a metric label value.
241    our_id_str: String,
    /// Id of the peer this state machine is responsible for.
242    peer_id: PeerId,
    /// String form of `peer_id`, used as a metric label value.
243    peer_id_str: String,
    /// Used to open outgoing connections to the peer.
244    connector: DynP2PConnector<M>,
    /// Already established incoming connections from the peer arrive here.
245    incoming_connections: Receiver<DynP2PConnection<M>>,
    /// Publishes whether the peer is currently connected or disconnected.
246    status_sender: watch::Sender<P2PConnectionStatus>,
247}
248
/// Connection state of a peer connection state machine.
249enum P2PConnectionSMState<M> {
    /// There is no connection to the peer; the backoff supplies the delays
    /// between reconnection attempts.
250    Disconnected(FibonacciBackoff),
    /// There is an established connection to the peer.
251    Connected(DynP2PConnection<M>),
252}
253
254impl<M: Send + 'static> P2PConnectionStateMachine<M> {
255    async fn state_transition(mut self) -> Option<Self> {
256        match self.state {
257            P2PConnectionSMState::Disconnected(disconnected) => {
258                self.common
259                    .status_sender
260                    .send(P2PConnectionStatus::Disconnected)
261                    .ok();
262
263                self.common.transition_disconnected(disconnected).await
264            }
265            P2PConnectionSMState::Connected(connected) => {
266                self.common
267                    .status_sender
268                    .send(P2PConnectionStatus::Connected)
269                    .ok();
270
271                self.common.transition_connected(connected).await
272            }
273        }
274        .map(|state| P2PConnectionStateMachine {
275            common: self.common,
276            state,
277        })
278    }
279}
280
281impl<M: Send + 'static> P2PConnectionSMCommon<M> {
282    async fn transition_connected(
283        &mut self,
284        mut connection: DynP2PConnection<M>,
285    ) -> Option<P2PConnectionSMState<M>> {
286        tokio::select! {
287            message = self.outgoing_receiver.recv() => {
288                Some(self.send_message(connection, message.ok()?).await)
289            },
290            connection = self.incoming_connections.recv() => {
291                info!(target: LOG_NET_PEER, "Connected to peer");
292
293                Some(P2PConnectionSMState::Connected(connection.ok()?))
294            },
295            message = connection.receive() => {
296                match message {
297                    Ok(message) => {
298                        PEER_MESSAGES_COUNT
299                            .with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"])
300                            .inc();
301
302                         self.incoming_sender.send(message).await.ok()?;
303                    },
304                    Err(e) => return Some(self.disconnect(e)),
305                };
306
307                Some(P2PConnectionSMState::Connected(connection))
308            },
309        }
310    }
311
312    fn disconnect(&self, error: anyhow::Error) -> P2PConnectionSMState<M> {
313        info!(target: LOG_NET_PEER, "Disconnected from peer: {}",  error);
314
315        PEER_DISCONNECT_COUNT
316            .with_label_values(&[&self.our_id_str, &self.peer_id_str])
317            .inc();
318
319        P2PConnectionSMState::Disconnected(api_networking_backoff())
320    }
321
322    async fn send_message(
323        &mut self,
324        mut connection: DynP2PConnection<M>,
325        peer_message: M,
326    ) -> P2PConnectionSMState<M> {
327        PEER_MESSAGES_COUNT
328            .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
329            .inc();
330
331        if let Err(e) = connection.send(peer_message).await {
332            return self.disconnect(e);
333        }
334
335        P2PConnectionSMState::Connected(connection)
336    }
337
338    async fn transition_disconnected(
339        &mut self,
340        mut backoff: FibonacciBackoff,
341    ) -> Option<P2PConnectionSMState<M>> {
342        tokio::select! {
343            connection = self.incoming_connections.recv() => {
344                PEER_CONNECT_COUNT
345                    .with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"])
346                    .inc();
347
348                info!(target: LOG_NET_PEER, "Connected to peer");
349
350                Some(P2PConnectionSMState::Connected(connection.ok()?))
351            },
352            () = sleep(backoff.next().expect("Unlimited retries")), if self.our_id < self.peer_id => {
353                // to prevent "reconnection ping-pongs", only the side with lower PeerId is responsible for reconnecting
354
355                info!(target: LOG_NET_PEER, "Attempting to reconnect to peer");
356
357                match  self.connector.connect(self.peer_id).await {
358                    Ok(connection) => {
359                        PEER_CONNECT_COUNT
360                            .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
361                            .inc();
362
363                        info!(target: LOG_NET_PEER, "Connected to peer");
364
365                        return Some(P2PConnectionSMState::Connected(connection));
366                    }
367                    Err(e) => warn!(target: LOG_CONSENSUS, "Failed to connect to peer: {e}")
368                }
369
370                Some(P2PConnectionSMState::Disconnected(backoff))
371            },
372        }
373    }
374}