fedimint_server/net/
p2p.rs

1//! Implements a connection manager for communication with other federation
2//! members
3//!
4//! The main interface is [`fedimint_core::net::peers::IP2PConnections`] and
5//! its main implementation is [`ReconnectP2PConnections`], see these for
6//! details.
7
8use std::collections::BTreeMap;
9use std::time::Duration;
10
11use async_channel::{Receiver, Sender, bounded};
12use async_trait::async_trait;
13use fedimint_core::PeerId;
14use fedimint_core::net::peers::{IP2PConnections, Recipient};
15use fedimint_core::task::{TaskGroup, sleep};
16use fedimint_core::util::FmtCompactAnyhow;
17use fedimint_core::util::backoff_util::{FibonacciBackoff, api_networking_backoff};
18use fedimint_logging::{LOG_CONSENSUS, LOG_NET_PEER};
19use fedimint_server_core::dashboard_ui::ConnectionType;
20use futures::FutureExt;
21use futures::future::select_all;
22use tokio::sync::watch;
23use tracing::{Instrument, debug, info, info_span, warn};
24
25use crate::metrics::{PEER_CONNECT_COUNT, PEER_DISCONNECT_COUNT, PEER_MESSAGES_COUNT};
26use crate::net::p2p_connection::DynP2PConnection;
27use crate::net::p2p_connector::DynP2PConnector;
28
/// Sending halves of the per-peer status channels, keyed by peer id.
///
/// The connection state machine in this file publishes `None` while a peer is
/// disconnected and `Some(rtt)` while it is connected.
pub type P2PStatusSenders = BTreeMap<PeerId, watch::Sender<Option<Duration>>>;
/// Receiving halves of the per-peer status channels, keyed by peer id.
pub type P2PStatusReceivers = BTreeMap<PeerId, watch::Receiver<Option<Duration>>>;

/// Sending halves of the per-peer connection type channels, keyed by peer id.
///
/// A polling task in this file periodically publishes the connection type
/// reported by the connector.
pub type P2PConnectionTypeSenders = BTreeMap<PeerId, watch::Sender<ConnectionType>>;
/// Receiving halves of the per-peer connection type channels, keyed by peer
/// id.
pub type P2PConnectionTypeReceivers = BTreeMap<PeerId, watch::Receiver<ConnectionType>>;
34
35pub fn p2p_status_channels(peers: Vec<PeerId>) -> (P2PStatusSenders, P2PStatusReceivers) {
36    let mut senders = BTreeMap::new();
37    let mut receivers = BTreeMap::new();
38
39    for peer in peers {
40        let (sender, receiver) = watch::channel(None);
41
42        senders.insert(peer, sender);
43        receivers.insert(peer, receiver);
44    }
45
46    (senders, receivers)
47}
48
49pub fn p2p_connection_type_channels(
50    peers: Vec<PeerId>,
51) -> (P2PConnectionTypeSenders, P2PConnectionTypeReceivers) {
52    let mut senders = BTreeMap::new();
53    let mut receivers = BTreeMap::new();
54
55    for peer in peers {
56        let (sender, receiver) = watch::channel(ConnectionType::Direct);
57
58        senders.insert(peer, sender);
59        receivers.insert(peer, receiver);
60    }
61
62    (senders, receivers)
63}
64
/// Set of connections to the other federation members.
///
/// Holds one [`P2PConnection`] handle per remote peer. The handles only
/// contain channel endpoints; the actual network I/O is driven by background
/// tasks spawned in [`ReconnectP2PConnections::new`].
#[derive(Clone)]
pub struct ReconnectP2PConnections<M> {
    /// Message queue handles keyed by the id of the remote peer
    connections: BTreeMap<PeerId, P2PConnection<M>>,
}
69
70impl<M: Send + 'static> ReconnectP2PConnections<M> {
71    pub fn new(
72        identity: PeerId,
73        connector: DynP2PConnector<M>,
74        task_group: &TaskGroup,
75        status_senders: P2PStatusSenders,
76        connection_type_senders: P2PConnectionTypeSenders,
77    ) -> Self {
78        let mut connection_senders = BTreeMap::new();
79        let mut connections = BTreeMap::new();
80
81        for peer_id in connector.peers() {
82            assert_ne!(peer_id, identity);
83
84            let (connection_sender, connection_receiver) = bounded(4);
85
86            let connection = P2PConnection::new(
87                identity,
88                peer_id,
89                connector.clone(),
90                connection_receiver,
91                status_senders
92                    .get(&peer_id)
93                    .expect("No p2p status sender for peer")
94                    .clone(),
95                connection_type_senders
96                    .get(&peer_id)
97                    .expect("No p2p connection type sender for peer")
98                    .clone(),
99                task_group,
100            );
101
102            connection_senders.insert(peer_id, connection_sender);
103            connections.insert(peer_id, connection);
104        }
105
106        task_group.spawn_cancellable("handle-incoming-p2p-connections", async move {
107            info!(target: LOG_NET_PEER, "Starting listening task for p2p connections");
108
109            loop {
110                match connector.accept().await {
111                    Ok((peer, connection)) => {
112                        if connection_senders
113                            .get_mut(&peer)
114                            .expect("Authenticating connectors dont return unknown peers")
115                            .send(connection)
116                            .await
117                            .is_err()
118                        {
119                            break;
120                        }
121                    },
122                    Err(err) => {
123                        warn!(target: LOG_NET_PEER, our_id = %identity, err = %err.fmt_compact_anyhow(), "Error while opening incoming connection");
124                    }
125                }
126            }
127
128            info!(target: LOG_NET_PEER, "Shutting down listening task for p2p connections");
129        });
130
131        ReconnectP2PConnections { connections }
132    }
133}
134
135#[async_trait]
136impl<M: Clone + Send + 'static> IP2PConnections<M> for ReconnectP2PConnections<M> {
137    fn send(&self, recipient: Recipient, message: M) {
138        match recipient {
139            Recipient::Everyone => {
140                for connection in self.connections.values() {
141                    connection.try_send(message.clone());
142                }
143            }
144            Recipient::Peer(peer) => match self.connections.get(&peer) {
145                Some(connection) => {
146                    connection.try_send(message);
147                }
148                _ => {
149                    warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
150                }
151            },
152        }
153    }
154
155    async fn receive(&self) -> Option<(PeerId, M)> {
156        select_all(self.connections.iter().map(|(&peer, connection)| {
157            Box::pin(connection.receive().map(move |m| m.map(|m| (peer, m))))
158        }))
159        .await
160        .0
161    }
162
163    async fn receive_from_peer(&self, peer: PeerId) -> Option<M> {
164        self.connections
165            .get(&peer)
166            .expect("No connection found for peer")
167            .receive()
168            .await
169    }
170}
171
/// Handle to the message queues of the connection to a single peer.
///
/// The opposite ends of both queues are owned by the state machine task that
/// is spawned in `P2PConnection::new`.
#[derive(Clone)]
struct P2PConnection<M> {
    /// Queue of messages waiting to be sent to the peer
    outgoing_sender: Sender<M>,
    /// Queue of messages that were received from the peer
    incoming_receiver: Receiver<M>,
}
177
178impl<M: Send + 'static> P2PConnection<M> {
179    #[allow(clippy::too_many_arguments)]
180    fn new(
181        our_id: PeerId,
182        peer_id: PeerId,
183        connector: DynP2PConnector<M>,
184        incoming_connections: Receiver<DynP2PConnection<M>>,
185        status_sender: watch::Sender<Option<Duration>>,
186        connection_type_sender: watch::Sender<ConnectionType>,
187        task_group: &TaskGroup,
188    ) -> P2PConnection<M> {
189        // We use small message queues here to avoid outdated messages such as requests
190        // for signed session outcomes to queue up while a peer is disconnected. The
191        // consensus expects an unreliable networking layer and will resend lost
192        // messages accordingly. Furthermore, during the DKG there will never be more
193        // than two messages in those channels at once, due to its sequential
194        // nature there.
195        let (outgoing_sender, outgoing_receiver) = bounded(5);
196        let (incoming_sender, incoming_receiver) = bounded(5);
197
198        let connector_clone = connector.clone();
199        let connection_type_sender_clone = connection_type_sender.clone();
200
201        // Spawn periodic connection type polling task
202        task_group.spawn_cancellable(
203            format!("connection-type-poller-{peer_id}"),
204            async move {
205                let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
206                loop {
207                    interval.tick().await;
208                    let connection_type = connector_clone.connection_type(peer_id).await;
209                    let _ = connection_type_sender_clone.send_replace(connection_type);
210                }
211            }
212            .instrument(info_span!("connection-type-poller", ?peer_id)),
213        );
214
215        task_group.spawn_cancellable(
216            format!("io-state-machine-{peer_id}"),
217            async move {
218                info!(target: LOG_NET_PEER, "Starting peer connection state machine");
219
220                let mut state_machine = P2PConnectionStateMachine {
221                    common: P2PConnectionSMCommon {
222                        incoming_sender,
223                        outgoing_receiver,
224                        our_id_str: our_id.to_string(),
225                        our_id,
226                        peer_id_str: peer_id.to_string(),
227                        peer_id,
228                        connector,
229                        incoming_connections,
230                        status_sender,
231                    },
232                    state: P2PConnectionSMState::Disconnected(api_networking_backoff()),
233                };
234
235                while let Some(sm) = state_machine.state_transition().await {
236                    state_machine = sm;
237                }
238
239                info!(target: LOG_NET_PEER, "Shutting down peer connection state machine");
240            }
241            .instrument(info_span!("io-state-machine", ?peer_id)),
242        );
243
244        P2PConnection {
245            outgoing_sender,
246            incoming_receiver,
247        }
248    }
249
250    fn try_send(&self, message: M) {
251        if self.outgoing_sender.try_send(message).is_err() {
252            debug!(target: LOG_NET_PEER, "Outgoing message channel is full");
253        }
254    }
255
256    async fn receive(&self) -> Option<M> {
257        self.incoming_receiver.recv().await.ok()
258    }
259}
260
/// State machine driving the connection to a single peer.
///
/// Combines the current connection state with the resources that are kept
/// across all state transitions.
struct P2PConnectionStateMachine<M> {
    /// Current connection state
    state: P2PConnectionSMState<M>,
    /// Resources shared by all states
    common: P2PConnectionSMCommon<M>,
}
265
/// Resources of a peer connection state machine that outlive individual
/// states.
struct P2PConnectionSMCommon<M> {
    /// Queue into which messages received from the peer are placed
    incoming_sender: async_channel::Sender<M>,
    /// Queue from which messages to be sent to the peer are taken
    outgoing_receiver: async_channel::Receiver<M>,
    /// Our own peer id
    our_id: PeerId,
    /// String form of `our_id`, used as a metrics label value
    our_id_str: String,
    /// Id of the remote peer
    peer_id: PeerId,
    /// String form of `peer_id`, used as a metrics label value
    peer_id_str: String,
    /// Connector used to open outgoing connections to the peer
    connector: DynP2PConnector<M>,
    /// Connections to this peer that were accepted by the listener task
    incoming_connections: Receiver<DynP2PConnection<M>>,
    /// Publishes `None` while disconnected and `Some(rtt)` while connected
    status_sender: watch::Sender<Option<Duration>>,
}
277
/// Connection state of a peer connection state machine.
enum P2PConnectionSMState<M> {
    /// No connection is established; the backoff yields the delay before the
    /// next reconnection attempt
    Disconnected(FibonacciBackoff),
    /// A connection to the peer is established
    Connected(DynP2PConnection<M>),
}
282
283impl<M: Send + 'static> P2PConnectionStateMachine<M> {
284    async fn state_transition(mut self) -> Option<Self> {
285        match self.state {
286            P2PConnectionSMState::Disconnected(backoff) => {
287                self.common.status_sender.send_replace(None);
288
289                self.common.transition_disconnected(backoff).await
290            }
291            P2PConnectionSMState::Connected(connection) => {
292                self.common
293                    .status_sender
294                    .send_replace(Some(connection.rtt()));
295
296                self.common.transition_connected(connection).await
297            }
298        }
299        .map(|state| P2PConnectionStateMachine {
300            common: self.common,
301            state,
302        })
303    }
304}
305
306impl<M: Send + 'static> P2PConnectionSMCommon<M> {
307    async fn transition_connected(
308        &mut self,
309        mut connection: DynP2PConnection<M>,
310    ) -> Option<P2PConnectionSMState<M>> {
311        tokio::select! {
312            message = self.outgoing_receiver.recv() => {
313                Some(self.send_message(connection, message.ok()?).await)
314            },
315            connection = self.incoming_connections.recv() => {
316                info!(target: LOG_NET_PEER, "Connected to peer");
317
318                Some(P2PConnectionSMState::Connected(connection.ok()?))
319            },
320            message = connection.receive() => {
321                let mut message = match message {
322                    Ok(message) => message,
323                    Err(e) => return Some(self.disconnect(e)),
324                };
325
326                match message.read_to_end().await {
327                    Ok(message) => {
328                        PEER_MESSAGES_COUNT
329                            .with_label_values(&[self.our_id_str.as_str(), self.peer_id_str.as_str(), "incoming"])
330                            .inc();
331
332                        if self.incoming_sender.try_send(message).is_err() {
333                            debug!(target: LOG_NET_PEER, "Incoming message channel is full");
334                        }
335                    },
336                    Err(e) => return Some(self.disconnect(e)),
337                }
338
339                Some(P2PConnectionSMState::Connected(connection))
340            },
341        }
342    }
343
344    fn disconnect(&self, error: anyhow::Error) -> P2PConnectionSMState<M> {
345        info!(target: LOG_NET_PEER, "Disconnected from peer: {}",  error);
346
347        PEER_DISCONNECT_COUNT
348            .with_label_values(&[&self.our_id_str, &self.peer_id_str])
349            .inc();
350
351        P2PConnectionSMState::Disconnected(api_networking_backoff())
352    }
353
354    async fn send_message(
355        &mut self,
356        mut connection: DynP2PConnection<M>,
357        peer_message: M,
358    ) -> P2PConnectionSMState<M> {
359        PEER_MESSAGES_COUNT
360            .with_label_values(&[
361                self.our_id_str.as_str(),
362                self.peer_id_str.as_str(),
363                "outgoing",
364            ])
365            .inc();
366
367        if let Err(e) = connection.send(peer_message).await {
368            return self.disconnect(e);
369        }
370
371        P2PConnectionSMState::Connected(connection)
372    }
373
374    async fn transition_disconnected(
375        &mut self,
376        mut backoff: FibonacciBackoff,
377    ) -> Option<P2PConnectionSMState<M>> {
378        tokio::select! {
379            connection = self.incoming_connections.recv() => {
380                PEER_CONNECT_COUNT
381                    .with_label_values(&[self.our_id_str.as_str(), self.peer_id_str.as_str(), "incoming"])
382                    .inc();
383
384                info!(target: LOG_NET_PEER, "Connected to peer");
385
386                Some(P2PConnectionSMState::Connected(connection.ok()?))
387            },
388            // to prevent "reconnection ping-pongs", only the side with lower PeerId reconnects
389            () = sleep(backoff.next().expect("Unlimited retries")), if self.our_id < self.peer_id => {
390
391
392                info!(target: LOG_NET_PEER, "Attempting to reconnect to peer");
393
394                match  self.connector.connect(self.peer_id).await {
395                    Ok(connection) => {
396                        PEER_CONNECT_COUNT
397                            .with_label_values(&[self.our_id_str.as_str(), self.peer_id_str.as_str(), "outgoing"])
398                            .inc();
399
400                        info!(target: LOG_NET_PEER, "Connected to peer");
401
402                        return Some(P2PConnectionSMState::Connected(connection));
403                    }
404                    Err(e) => warn!(target: LOG_CONSENSUS, "Failed to connect to peer: {e}")
405                }
406
407                Some(P2PConnectionSMState::Disconnected(backoff))
408            },
409        }
410    }
411}