fedimint_server/net/
p2p.rs

1//! Implements a connection manager for communication with other federation
2//! members
3//!
4//! The main interface is [`fedimint_core::net::peers::IP2PConnections`] and
5//! its main implementation is [`ReconnectP2PConnections`], see these for
6//! details.
7
8use std::collections::BTreeMap;
9use std::time::Duration;
10
11use async_channel::{Receiver, Sender, bounded};
12use async_trait::async_trait;
13use fedimint_core::PeerId;
14use fedimint_core::net::peers::{IP2PConnections, Recipient};
15use fedimint_core::task::{TaskGroup, sleep};
16use fedimint_core::util::FmtCompactAnyhow;
17use fedimint_core::util::backoff_util::{FibonacciBackoff, api_networking_backoff};
18use fedimint_logging::{LOG_CONSENSUS, LOG_NET_PEER};
19use futures::FutureExt;
20use futures::future::select_all;
21use tokio::sync::watch;
22use tracing::{Instrument, info, info_span, warn};
23
24use crate::metrics::{PEER_CONNECT_COUNT, PEER_DISCONNECT_COUNT, PEER_MESSAGES_COUNT};
25use crate::net::p2p_connection::DynP2PConnection;
26use crate::net::p2p_connector::DynP2PConnector;
27
/// Per-peer [`watch::Sender`]s used to publish the connection status of each
/// peer. The connection state machine in this file writes `None` while a peer
/// is disconnected and `Some(connection.rtt())` while it is connected.
pub type P2PStatusSenders = BTreeMap<PeerId, watch::Sender<Option<Duration>>>;
/// Receiving halves matching [`P2PStatusSenders`], keyed by the same peers.
pub type P2PStatusReceivers = BTreeMap<PeerId, watch::Receiver<Option<Duration>>>;
30
31pub fn p2p_status_channels(peers: Vec<PeerId>) -> (P2PStatusSenders, P2PStatusReceivers) {
32    let mut senders = BTreeMap::new();
33    let mut receivers = BTreeMap::new();
34
35    for peer in peers {
36        let (sender, receiver) = watch::channel(None);
37
38        senders.insert(peer, sender);
39        receivers.insert(peer, receiver);
40    }
41
42    (senders, receivers)
43}
44
/// Set of connections to the other federation members, one entry per peer.
///
/// Each entry only holds the channel ends used to talk to that peer's
/// background state-machine task (see `P2PConnection::new`), so cloning this
/// struct clones channel handles rather than spawning new tasks.
#[derive(Clone)]
pub struct ReconnectP2PConnections<M> {
    /// Channel handles for every peer returned by the connector's `peers()`.
    connections: BTreeMap<PeerId, P2PConnection<M>>,
}
49
50impl<M: Send + 'static> ReconnectP2PConnections<M> {
51    pub fn new(
52        identity: PeerId,
53        connector: DynP2PConnector<M>,
54        task_group: &TaskGroup,
55        status_senders: P2PStatusSenders,
56    ) -> Self {
57        let mut connection_senders = BTreeMap::new();
58        let mut connections = BTreeMap::new();
59
60        for peer_id in connector.peers() {
61            assert_ne!(peer_id, identity);
62
63            let (connection_sender, connection_receiver) = bounded(4);
64
65            let connection = P2PConnection::new(
66                identity,
67                peer_id,
68                connector.clone(),
69                connection_receiver,
70                status_senders
71                    .get(&peer_id)
72                    .expect("No p2p status sender for peer {peer}")
73                    .clone(),
74                task_group,
75            );
76
77            connection_senders.insert(peer_id, connection_sender);
78            connections.insert(peer_id, connection);
79        }
80
81        task_group.spawn_cancellable("handle-incoming-p2p-connections", async move {
82            info!(target: LOG_NET_PEER, "Starting listening task for p2p connections");
83
84            loop {
85                match connector.accept().await {
86                    Ok((peer, connection)) => {
87                        if connection_senders
88                            .get_mut(&peer)
89                            .expect("Authenticating connectors dont return unknown peers")
90                            .send(connection)
91                            .await
92                            .is_err()
93                        {
94                            break;
95                        }
96                    },
97                    Err(err) => {
98                        warn!(target: LOG_NET_PEER, our_id = %identity, err = %err.fmt_compact_anyhow(), "Error while opening incoming connection");
99                    }
100                }
101            }
102
103            info!(target: LOG_NET_PEER, "Shutting down listening task for p2p connections");
104        });
105
106        ReconnectP2PConnections { connections }
107    }
108}
109
110#[async_trait]
111impl<M: Clone + Send + 'static> IP2PConnections<M> for ReconnectP2PConnections<M> {
112    async fn send(&self, recipient: Recipient, message: M) {
113        match recipient {
114            Recipient::Everyone => {
115                for connection in self.connections.values() {
116                    connection.send(message.clone()).await;
117                }
118            }
119            Recipient::Peer(peer) => match self.connections.get(&peer) {
120                Some(connection) => {
121                    connection.send(message).await;
122                }
123                _ => {
124                    warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
125                }
126            },
127        }
128    }
129
130    fn try_send(&self, recipient: Recipient, message: M) {
131        match recipient {
132            Recipient::Everyone => {
133                for connection in self.connections.values() {
134                    connection.try_send(message.clone());
135                }
136            }
137            Recipient::Peer(peer) => match self.connections.get(&peer) {
138                Some(connection) => {
139                    connection.try_send(message);
140                }
141                _ => {
142                    warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
143                }
144            },
145        }
146    }
147
148    async fn receive(&self) -> Option<(PeerId, M)> {
149        select_all(self.connections.iter().map(|(&peer, connection)| {
150            Box::pin(connection.receive().map(move |m| m.map(|m| (peer, m))))
151        }))
152        .await
153        .0
154    }
155
156    async fn receive_from_peer(&self, peer: PeerId) -> Option<M> {
157        self.connections
158            .get(&peer)
159            .expect("No connection found for peer {peer}")
160            .receive()
161            .await
162    }
163}
164
/// Channel handles to the background state-machine task of a single peer.
#[derive(Clone)]
struct P2PConnection<M> {
    /// Messages queued here are picked up by the peer's task and sent over
    /// the current connection.
    outgoing: Sender<M>,
    /// Messages the peer's task received from the connection.
    incoming: Receiver<M>,
}
170
171impl<M: Send + 'static> P2PConnection<M> {
172    #[allow(clippy::too_many_arguments)]
173    fn new(
174        our_id: PeerId,
175        peer_id: PeerId,
176        connector: DynP2PConnector<M>,
177        incoming_connections: Receiver<DynP2PConnection<M>>,
178        status_sender: watch::Sender<Option<Duration>>,
179        task_group: &TaskGroup,
180    ) -> P2PConnection<M> {
181        let (outgoing_sender, outgoing_receiver) = bounded(1024);
182        let (incoming_sender, incoming_receiver) = bounded(1024);
183
184        task_group.spawn_cancellable(
185            format!("io-state-machine-{peer_id}"),
186            async move {
187                info!(target: LOG_NET_PEER, "Starting peer connection state machine");
188
189                let mut state_machine = P2PConnectionStateMachine {
190                    common: P2PConnectionSMCommon {
191                        incoming_sender,
192                        outgoing_receiver,
193                        our_id_str: our_id.to_string(),
194                        our_id,
195                        peer_id_str: peer_id.to_string(),
196                        peer_id,
197                        connector,
198                        incoming_connections,
199                        status_sender,
200                    },
201                    state: P2PConnectionSMState::Disconnected(api_networking_backoff()),
202                };
203
204                while let Some(sm) = state_machine.state_transition().await {
205                    state_machine = sm;
206                }
207
208                info!(target: LOG_NET_PEER, "Shutting down peer connection state machine");
209            }
210            .instrument(info_span!("io-state-machine", ?peer_id)),
211        );
212
213        P2PConnection {
214            outgoing: outgoing_sender,
215            incoming: incoming_receiver,
216        }
217    }
218
219    async fn send(&self, message: M) {
220        self.outgoing.send(message).await.ok();
221    }
222
223    fn try_send(&self, message: M) {
224        self.outgoing.try_send(message).ok();
225    }
226
227    async fn receive(&self) -> Option<M> {
228        self.incoming.recv().await.ok()
229    }
230}
231
/// State machine driving the connection to a single peer: the current
/// connection state plus the data shared by all states.
struct P2PConnectionStateMachine<M> {
    /// Whether we currently hold a connection to the peer.
    state: P2PConnectionSMState<M>,
    /// Channels, ids and connector that stay the same across transitions.
    common: P2PConnectionSMCommon<M>,
}
236
237struct P2PConnectionSMCommon<M> {
238    incoming_sender: async_channel::Sender<M>,
239    outgoing_receiver: async_channel::Receiver<M>,
240    our_id: PeerId,
241    our_id_str: String,
242    peer_id: PeerId,
243    peer_id_str: String,
244    connector: DynP2PConnector<M>,
245    incoming_connections: Receiver<DynP2PConnection<M>>,
246    status_sender: watch::Sender<Option<Duration>>,
247}
248
/// Connection state of a single peer.
enum P2PConnectionSMState<M> {
    /// No connection; carries the backoff that paces reconnection attempts.
    Disconnected(FibonacciBackoff),
    /// An established connection to the peer.
    Connected(DynP2PConnection<M>),
}
253
254impl<M: Send + 'static> P2PConnectionStateMachine<M> {
255    async fn state_transition(mut self) -> Option<Self> {
256        match self.state {
257            P2PConnectionSMState::Disconnected(backoff) => {
258                self.common.status_sender.send_replace(None);
259
260                self.common.transition_disconnected(backoff).await
261            }
262            P2PConnectionSMState::Connected(connection) => {
263                self.common
264                    .status_sender
265                    .send_replace(Some(connection.rtt()));
266
267                self.common.transition_connected(connection).await
268            }
269        }
270        .map(|state| P2PConnectionStateMachine {
271            common: self.common,
272            state,
273        })
274    }
275}
276
277impl<M: Send + 'static> P2PConnectionSMCommon<M> {
278    async fn transition_connected(
279        &mut self,
280        mut connection: DynP2PConnection<M>,
281    ) -> Option<P2PConnectionSMState<M>> {
282        tokio::select! {
283            message = self.outgoing_receiver.recv() => {
284                Some(self.send_message(connection, message.ok()?).await)
285            },
286            connection = self.incoming_connections.recv() => {
287                info!(target: LOG_NET_PEER, "Connected to peer");
288
289                Some(P2PConnectionSMState::Connected(connection.ok()?))
290            },
291            message = connection.receive() => {
292                match message {
293                    Ok(message) => {
294                        PEER_MESSAGES_COUNT
295                            .with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"])
296                            .inc();
297
298                         self.incoming_sender.send(message).await.ok()?;
299                    },
300                    Err(e) => return Some(self.disconnect(e)),
301                };
302
303                Some(P2PConnectionSMState::Connected(connection))
304            },
305        }
306    }
307
308    fn disconnect(&self, error: anyhow::Error) -> P2PConnectionSMState<M> {
309        info!(target: LOG_NET_PEER, "Disconnected from peer: {}",  error);
310
311        PEER_DISCONNECT_COUNT
312            .with_label_values(&[&self.our_id_str, &self.peer_id_str])
313            .inc();
314
315        P2PConnectionSMState::Disconnected(api_networking_backoff())
316    }
317
318    async fn send_message(
319        &mut self,
320        mut connection: DynP2PConnection<M>,
321        peer_message: M,
322    ) -> P2PConnectionSMState<M> {
323        PEER_MESSAGES_COUNT
324            .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
325            .inc();
326
327        if let Err(e) = connection.send(peer_message).await {
328            return self.disconnect(e);
329        }
330
331        P2PConnectionSMState::Connected(connection)
332    }
333
334    async fn transition_disconnected(
335        &mut self,
336        mut backoff: FibonacciBackoff,
337    ) -> Option<P2PConnectionSMState<M>> {
338        tokio::select! {
339            connection = self.incoming_connections.recv() => {
340                PEER_CONNECT_COUNT
341                    .with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"])
342                    .inc();
343
344                info!(target: LOG_NET_PEER, "Connected to peer");
345
346                Some(P2PConnectionSMState::Connected(connection.ok()?))
347            },
348            () = sleep(backoff.next().expect("Unlimited retries")), if self.our_id < self.peer_id => {
349                // to prevent "reconnection ping-pongs", only the side with lower PeerId is responsible for reconnecting
350
351                info!(target: LOG_NET_PEER, "Attempting to reconnect to peer");
352
353                match  self.connector.connect(self.peer_id).await {
354                    Ok(connection) => {
355                        PEER_CONNECT_COUNT
356                            .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
357                            .inc();
358
359                        info!(target: LOG_NET_PEER, "Connected to peer");
360
361                        return Some(P2PConnectionSMState::Connected(connection));
362                    }
363                    Err(e) => warn!(target: LOG_CONSENSUS, "Failed to connect to peer: {e}")
364                }
365
366                Some(P2PConnectionSMState::Disconnected(backoff))
367            },
368        }
369    }
370}