fedimint_server/net/
p2p.rs

1//! Implements a connection manager for communication with other federation
2//! members
3//!
4//! The main interface is [`fedimint_core::net::peers::IP2PConnections`] and
5//! its main implementation is [`ReconnectP2PConnections`], see these for
6//! details.
7
8use std::collections::BTreeMap;
9
10use async_channel::{Receiver, Sender, bounded};
11use async_trait::async_trait;
12use fedimint_core::PeerId;
13use fedimint_core::net::peers::{IP2PConnections, Recipient};
14use fedimint_core::task::{TaskGroup, sleep};
15use fedimint_core::util::FmtCompactAnyhow;
16use fedimint_core::util::backoff_util::{FibonacciBackoff, api_networking_backoff};
17use fedimint_logging::{LOG_CONSENSUS, LOG_NET_PEER};
18use fedimint_server_core::dashboard_ui::P2PConnectionStatus;
19use futures::FutureExt;
20use futures::future::select_all;
21use tokio::sync::watch;
22use tracing::{Instrument, debug, info, info_span, warn};
23
24use crate::metrics::{PEER_CONNECT_COUNT, PEER_DISCONNECT_COUNT, PEER_MESSAGES_COUNT};
25use crate::net::p2p_connection::DynP2PConnection;
26use crate::net::p2p_connector::DynP2PConnector;
27
28pub type P2PStatusSenders = BTreeMap<PeerId, watch::Sender<Option<P2PConnectionStatus>>>;
29pub type P2PStatusReceivers = BTreeMap<PeerId, watch::Receiver<Option<P2PConnectionStatus>>>;
30
31pub fn p2p_status_channels(peers: Vec<PeerId>) -> (P2PStatusSenders, P2PStatusReceivers) {
32    let mut senders = BTreeMap::new();
33    let mut receivers = BTreeMap::new();
34
35    for peer in peers {
36        let (sender, receiver) = watch::channel(None);
37
38        senders.insert(peer, sender);
39        receivers.insert(peer, receiver);
40    }
41
42    (senders, receivers)
43}
44
45#[derive(Clone)]
46pub struct ReconnectP2PConnections<M> {
47    connections: BTreeMap<PeerId, P2PConnection<M>>,
48}
49
50impl<M: Send + 'static> ReconnectP2PConnections<M> {
51    pub fn new(
52        identity: PeerId,
53        connector: DynP2PConnector<M>,
54        task_group: &TaskGroup,
55        status_senders: P2PStatusSenders,
56    ) -> Self {
57        let mut connection_senders = BTreeMap::new();
58        let mut connections = BTreeMap::new();
59
60        for peer_id in connector.peers() {
61            assert_ne!(peer_id, identity);
62
63            let (connection_sender, connection_receiver) = bounded(4);
64
65            let connection = P2PConnection::new(
66                identity,
67                peer_id,
68                connector.clone(),
69                connection_receiver,
70                status_senders
71                    .get(&peer_id)
72                    .expect("No p2p status sender for peer")
73                    .clone(),
74                task_group,
75            );
76
77            connection_senders.insert(peer_id, connection_sender);
78            connections.insert(peer_id, connection);
79        }
80
81        task_group.spawn_cancellable("handle-incoming-p2p-connections", async move {
82            info!(target: LOG_NET_PEER, "Starting listening task for p2p connections");
83
84            loop {
85                match connector.accept().await {
86                    Ok((peer, connection)) => {
87                        if connection_senders
88                            .get_mut(&peer)
89                            .expect("Authenticating connectors dont return unknown peers")
90                            .send(connection)
91                            .await
92                            .is_err()
93                        {
94                            break;
95                        }
96                    },
97                    Err(err) => {
98                        warn!(target: LOG_NET_PEER, our_id = %identity, err = %err.fmt_compact_anyhow(), "Error while opening incoming connection");
99                    }
100                }
101            }
102
103            info!(target: LOG_NET_PEER, "Shutting down listening task for p2p connections");
104        });
105
106        ReconnectP2PConnections { connections }
107    }
108}
109
110#[async_trait]
111impl<M: Clone + Send + 'static> IP2PConnections<M> for ReconnectP2PConnections<M> {
112    fn send(&self, recipient: Recipient, message: M) {
113        match recipient {
114            Recipient::Everyone => {
115                for connection in self.connections.values() {
116                    connection.try_send(message.clone());
117                }
118            }
119            Recipient::Peer(peer) => match self.connections.get(&peer) {
120                Some(connection) => {
121                    connection.try_send(message);
122                }
123                _ => {
124                    warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
125                }
126            },
127        }
128    }
129
130    async fn receive(&self) -> Option<(PeerId, M)> {
131        select_all(self.connections.iter().map(|(&peer, connection)| {
132            Box::pin(connection.receive().map(move |m| m.map(|m| (peer, m))))
133        }))
134        .await
135        .0
136    }
137
138    async fn receive_from_peer(&self, peer: PeerId) -> Option<M> {
139        self.connections
140            .get(&peer)
141            .expect("No connection found for peer")
142            .receive()
143            .await
144    }
145}
146
147#[derive(Clone)]
148struct P2PConnection<M> {
149    outgoing_sender: Sender<M>,
150    incoming_receiver: Receiver<M>,
151}
152
153impl<M: Send + 'static> P2PConnection<M> {
154    fn new(
155        our_id: PeerId,
156        peer_id: PeerId,
157        connector: DynP2PConnector<M>,
158        incoming_connections: Receiver<DynP2PConnection<M>>,
159        status_sender: watch::Sender<Option<P2PConnectionStatus>>,
160        task_group: &TaskGroup,
161    ) -> P2PConnection<M> {
162        // We use small message queues here to avoid outdated messages such as requests
163        // for signed session outcomes to queue up while a peer is disconnected. The
164        // consensus expects an unreliable networking layer and will resend lost
165        // messages accordingly. Furthermore, during the DKG there will never be more
166        // than two messages in those channels at once, due to its sequential
167        // nature there.
168        let (outgoing_sender, outgoing_receiver) = bounded(5);
169        let (incoming_sender, incoming_receiver) = bounded(5);
170
171        task_group.spawn_cancellable(
172            format!("io-state-machine-{peer_id}"),
173            async move {
174                info!(target: LOG_NET_PEER, "Starting peer connection state machine");
175
176                let mut state_machine = P2PConnectionStateMachine {
177                    common: P2PConnectionSMCommon {
178                        incoming_sender,
179                        outgoing_receiver,
180                        our_id_str: our_id.to_string(),
181                        our_id,
182                        peer_id_str: peer_id.to_string(),
183                        peer_id,
184                        connector,
185                        incoming_connections,
186                        status_sender,
187                    },
188                    state: P2PConnectionSMState::Disconnected(api_networking_backoff()),
189                };
190
191                while let Some(sm) = state_machine.state_transition().await {
192                    state_machine = sm;
193                }
194
195                info!(target: LOG_NET_PEER, "Shutting down peer connection state machine");
196            }
197            .instrument(info_span!("io-state-machine", ?peer_id)),
198        );
199
200        P2PConnection {
201            outgoing_sender,
202            incoming_receiver,
203        }
204    }
205
206    fn try_send(&self, message: M) {
207        if self.outgoing_sender.try_send(message).is_err() {
208            debug!(target: LOG_NET_PEER, "Outgoing message channel is full");
209        }
210    }
211
212    async fn receive(&self) -> Option<M> {
213        self.incoming_receiver.recv().await.ok()
214    }
215}
216
217struct P2PConnectionStateMachine<M> {
218    state: P2PConnectionSMState<M>,
219    common: P2PConnectionSMCommon<M>,
220}
221
222struct P2PConnectionSMCommon<M> {
223    incoming_sender: async_channel::Sender<M>,
224    outgoing_receiver: async_channel::Receiver<M>,
225    our_id: PeerId,
226    our_id_str: String,
227    peer_id: PeerId,
228    peer_id_str: String,
229    connector: DynP2PConnector<M>,
230    incoming_connections: Receiver<DynP2PConnection<M>>,
231    status_sender: watch::Sender<Option<P2PConnectionStatus>>,
232}
233
234enum P2PConnectionSMState<M> {
235    Disconnected(FibonacciBackoff),
236    Connected(DynP2PConnection<M>),
237}
238
239impl<M: Send + 'static> P2PConnectionStateMachine<M> {
240    async fn state_transition(mut self) -> Option<Self> {
241        match self.state {
242            P2PConnectionSMState::Disconnected(backoff) => {
243                self.common.status_sender.send_replace(None);
244
245                self.common.transition_disconnected(backoff).await
246            }
247            P2PConnectionSMState::Connected(connection) => {
248                let status = P2PConnectionStatus {
249                    conn_type: self.common.connector.connection_type(self.common.peer_id),
250                    rtt: connection.rtt(),
251                };
252
253                self.common.status_sender.send_replace(Some(status));
254
255                self.common.transition_connected(connection).await
256            }
257        }
258        .map(|state| P2PConnectionStateMachine {
259            common: self.common,
260            state,
261        })
262    }
263}
264
265impl<M: Send + 'static> P2PConnectionSMCommon<M> {
266    async fn transition_connected(
267        &mut self,
268        mut connection: DynP2PConnection<M>,
269    ) -> Option<P2PConnectionSMState<M>> {
270        tokio::select! {
271            message = self.outgoing_receiver.recv() => {
272                Some(self.send_message(connection, message.ok()?).await)
273            },
274            connection = self.incoming_connections.recv() => {
275                info!(target: LOG_NET_PEER, "Connected to peer");
276
277                Some(P2PConnectionSMState::Connected(connection.ok()?))
278            },
279            message = connection.receive() => {
280                let mut message = match message {
281                    Ok(message) => message,
282                    Err(e) => return Some(self.disconnect(e)),
283                };
284
285                match message.read_to_end().await {
286                    Ok(message) => {
287                        PEER_MESSAGES_COUNT
288                            .with_label_values(&[self.our_id_str.as_str(), self.peer_id_str.as_str(), "incoming"])
289                            .inc();
290
291                        if self.incoming_sender.try_send(message).is_err() {
292                            debug!(target: LOG_NET_PEER, "Incoming message channel is full");
293                        }
294                    },
295                    Err(e) => return Some(self.disconnect(e)),
296                }
297
298                Some(P2PConnectionSMState::Connected(connection))
299            },
300        }
301    }
302
303    fn disconnect(&self, error: anyhow::Error) -> P2PConnectionSMState<M> {
304        info!(target: LOG_NET_PEER, "Disconnected from peer: {}",  error);
305
306        PEER_DISCONNECT_COUNT
307            .with_label_values(&[&self.our_id_str, &self.peer_id_str])
308            .inc();
309
310        P2PConnectionSMState::Disconnected(api_networking_backoff())
311    }
312
313    async fn send_message(
314        &mut self,
315        mut connection: DynP2PConnection<M>,
316        peer_message: M,
317    ) -> P2PConnectionSMState<M> {
318        PEER_MESSAGES_COUNT
319            .with_label_values(&[
320                self.our_id_str.as_str(),
321                self.peer_id_str.as_str(),
322                "outgoing",
323            ])
324            .inc();
325
326        if let Err(e) = connection.send(peer_message).await {
327            return self.disconnect(e);
328        }
329
330        P2PConnectionSMState::Connected(connection)
331    }
332
333    async fn transition_disconnected(
334        &mut self,
335        mut backoff: FibonacciBackoff,
336    ) -> Option<P2PConnectionSMState<M>> {
337        tokio::select! {
338            connection = self.incoming_connections.recv() => {
339                PEER_CONNECT_COUNT
340                    .with_label_values(&[self.our_id_str.as_str(), self.peer_id_str.as_str(), "incoming"])
341                    .inc();
342
343                info!(target: LOG_NET_PEER, "Connected to peer");
344
345                Some(P2PConnectionSMState::Connected(connection.ok()?))
346            },
347            // to prevent "reconnection ping-pongs", only the side with lower PeerId reconnects
348            () = sleep(backoff.next().expect("Unlimited retries")), if self.our_id < self.peer_id => {
349
350
351                info!(target: LOG_NET_PEER, "Attempting to reconnect to peer");
352
353                match  self.connector.connect(self.peer_id).await {
354                    Ok(connection) => {
355                        PEER_CONNECT_COUNT
356                            .with_label_values(&[self.our_id_str.as_str(), self.peer_id_str.as_str(), "outgoing"])
357                            .inc();
358
359                        info!(target: LOG_NET_PEER, "Connected to peer");
360
361                        return Some(P2PConnectionSMState::Connected(connection));
362                    }
363                    Err(e) => warn!(target: LOG_CONSENSUS, "Failed to connect to peer: {e}")
364                }
365
366                Some(P2PConnectionSMState::Disconnected(backoff))
367            },
368        }
369    }
370}