fedimint_server/net/
p2p_connector.rs

1//! Provides an abstract network connector interface and multiple
2//! implementations
3
4use std::collections::BTreeMap;
5use std::fmt::Debug;
6use std::net::SocketAddr;
7use std::sync::Arc;
8
9use anyhow::{Context, ensure};
10use async_trait::async_trait;
11use fedimint_core::PeerId;
12use fedimint_core::config::PeerUrl;
13use fedimint_core::encoding::{Decodable, Encodable};
14use fedimint_core::envs::{FM_IROH_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
15use fedimint_core::net::STANDARD_FEDIMINT_P2P_PORT;
16use fedimint_core::util::SafeUrl;
17use fedimint_logging::LOG_NET_IROH;
18use iroh::{Endpoint, NodeAddr, NodeId, SecretKey};
19use iroh_base::ticket::NodeTicket;
20use rustls::ServerName;
21use serde::Serialize;
22use serde::de::DeserializeOwned;
23use tokio::net::{TcpListener, TcpStream};
24use tokio_rustls::rustls::RootCertStore;
25use tokio_rustls::rustls::server::AllowAnyAuthenticatedClient;
26use tokio_rustls::{TlsAcceptor, TlsConnector, TlsStream, rustls};
27use tokio_util::codec::LengthDelimitedCodec;
28use tracing::trace;
29
30use crate::net::p2p_connection::{DynP2PConnection, IP2PConnection};
31
32pub type DynP2PConnector<M> = Arc<dyn IP2PConnector<M>>;
33
34/// Allows to connect to peers and to listen for incoming connections.
35/// Connections are message based and should be authenticated and encrypted for
36/// production deployments.
37#[async_trait]
38pub trait IP2PConnector<M>: Send + Sync + 'static {
39    fn peers(&self) -> Vec<PeerId>;
40
41    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>>;
42
43    async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)>;
44
45    fn into_dyn(self) -> DynP2PConnector<M>
46    where
47        Self: Sized,
48    {
49        Arc::new(self)
50    }
51}
52
53#[derive(Debug, Clone)]
54pub struct TlsConfig {
55    pub private_key: rustls::PrivateKey,
56    pub certificates: BTreeMap<PeerId, rustls::Certificate>,
57    pub peer_names: BTreeMap<PeerId, String>,
58}
59
60/// TCP connector with encryption and authentication
61pub struct TlsTcpConnector {
62    cfg: TlsConfig,
63    peers: BTreeMap<PeerId, SafeUrl>,
64    identity: PeerId,
65    listener: TcpListener,
66    acceptor: TlsAcceptor,
67}
68
69impl TlsTcpConnector {
70    pub async fn new(
71        cfg: TlsConfig,
72        p2p_bind_addr: SocketAddr,
73        peers: BTreeMap<PeerId, PeerUrl>,
74        identity: PeerId,
75    ) -> TlsTcpConnector {
76        let mut root_cert_store = RootCertStore::empty();
77
78        for cert in cfg.certificates.values() {
79            root_cert_store
80                .add(cert)
81                .expect("Could not add peer certificate");
82        }
83
84        let verifier = AllowAnyAuthenticatedClient::new(root_cert_store);
85
86        let certificate = cfg
87            .certificates
88            .get(&identity)
89            .expect("No certificate for ourself found")
90            .clone();
91
92        let config = rustls::ServerConfig::builder()
93            .with_safe_defaults()
94            .with_client_cert_verifier(Arc::from(verifier))
95            .with_single_cert(vec![certificate], cfg.private_key.clone())
96            .expect("Failed to create TLS config");
97
98        let listener = TcpListener::bind(p2p_bind_addr)
99            .await
100            .expect("Could not bind to port");
101
102        let acceptor = TlsAcceptor::from(Arc::new(config.clone()));
103
104        TlsTcpConnector {
105            cfg,
106            peers: peers.into_iter().map(|(id, peer)| (id, peer.url)).collect(),
107            identity,
108            listener,
109            acceptor,
110        }
111    }
112}
113
114#[async_trait]
115impl<M> IP2PConnector<M> for TlsTcpConnector
116where
117    M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
118{
119    fn peers(&self) -> Vec<PeerId> {
120        self.peers
121            .keys()
122            .filter(|peer| **peer != self.identity)
123            .copied()
124            .collect()
125    }
126
127    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
128        let mut root_cert_store = RootCertStore::empty();
129
130        for cert in self.cfg.certificates.values() {
131            root_cert_store
132                .add(cert)
133                .expect("Could not add peer certificate");
134        }
135
136        let certificate = self
137            .cfg
138            .certificates
139            .get(&self.identity)
140            .expect("No certificate for ourself found")
141            .clone();
142
143        let cfg = rustls::ClientConfig::builder()
144            .with_safe_defaults()
145            .with_root_certificates(root_cert_store)
146            .with_client_auth_cert(vec![certificate], self.cfg.private_key.clone())
147            .expect("Failed to create TLS config");
148
149        let domain = ServerName::try_from(dns_sanitize(&self.cfg.peer_names[&peer]).as_str())
150            .expect("Always a valid DNS name");
151
152        let destination = self.peers.get(&peer).expect("No url for peer {peer}");
153
154        let tls = TlsConnector::from(Arc::new(cfg))
155            .connect(domain, TcpStream::connect(parse_p2p(destination)?).await?)
156            .await?;
157
158        let certificate = tls
159            .get_ref()
160            .1
161            .peer_certificates()
162            .context("Peer did not authenticate itself")?
163            .first()
164            .context("Received certificate chain of length zero")?;
165
166        let auth_peer = self
167            .cfg
168            .certificates
169            .iter()
170            .find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
171            .context("Unknown certificate")?;
172
173        ensure!(auth_peer == peer, "Connected to unexpected peer");
174
175        let framed = LengthDelimitedCodec::builder()
176            .length_field_type::<u64>()
177            .new_framed(TlsStream::Client(tls));
178
179        Ok(framed.into_dyn())
180    }
181
182    async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)> {
183        let tls = self
184            .acceptor
185            .accept(self.listener.accept().await?.0)
186            .await?;
187
188        let certificate = tls
189            .get_ref()
190            .1
191            .peer_certificates()
192            .context("Peer did not authenticate itself")?
193            .first()
194            .context("Received certificate chain of length zero")?;
195
196        let auth_peer = self
197            .cfg
198            .certificates
199            .iter()
200            .find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
201            .context("Unknown certificate")?;
202
203        let framed = LengthDelimitedCodec::builder()
204            .length_field_type::<u64>()
205            .new_framed(TlsStream::Server(tls));
206
207        Ok((auth_peer, framed.into_dyn()))
208    }
209}
210
211pub fn gen_cert_and_key(
212    name: &str,
213) -> Result<(rustls::Certificate, rustls::PrivateKey), anyhow::Error> {
214    let cert_key = rcgen::generate_simple_self_signed(vec![dns_sanitize(name)])?;
215
216    Ok((
217        rustls::Certificate(cert_key.cert.der().to_vec()),
218        rustls::PrivateKey(cert_key.key_pair.serialize_der()),
219    ))
220}
221
222/// Sanitizes name as valid domain name
223pub fn dns_sanitize(name: &str) -> String {
224    format!(
225        "peer{}",
226        name.replace(|c: char| !c.is_ascii_alphanumeric(), "_")
227    )
228}
229
230/// Parses the host and port from a url
231pub fn parse_p2p(url: &SafeUrl) -> anyhow::Result<String> {
232    ensure!(url.scheme() == "fedimint", "p2p url has invalid scheme");
233
234    let host = url.host_str().context("p2p url is missing host")?;
235
236    let port = url.port().unwrap_or(STANDARD_FEDIMINT_P2P_PORT);
237
238    Ok(format!("{host}:{port}"))
239}
240
241#[derive(Debug, Clone)]
242pub struct IrohConnector {
243    /// Map of all peers' connection information we want to be connected to
244    pub node_ids: BTreeMap<PeerId, NodeId>,
245    /// The Iroh endpoint
246    pub endpoint: Endpoint,
247    /// List of overrides to use when attempting to connect to given `NodeId`
248    ///
249    /// This is useful for testing, or forcing non-default network connectivity.
250    pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
251}
252
253const FEDIMINT_P2P_ALPN: &[u8] = b"FEDIMINT_P2P_ALPN";
254
255impl IrohConnector {
256    pub async fn new(
257        secret_key: SecretKey,
258        p2p_bind_addr: SocketAddr,
259        node_ids: BTreeMap<PeerId, NodeId>,
260    ) -> anyhow::Result<Self> {
261        let mut s = Self::new_no_overrides(secret_key, p2p_bind_addr, node_ids).await;
262
263        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
264            s = s.with_connection_override(k, v.into());
265        }
266
267        Ok(s)
268    }
269
270    pub async fn new_no_overrides(
271        secret_key: SecretKey,
272        bind_addr: SocketAddr,
273        node_ids: BTreeMap<PeerId, NodeId>,
274    ) -> Self {
275        let identity = *node_ids
276            .iter()
277            .find(|entry| entry.1 == &secret_key.public())
278            .expect("Our public key is not part of the keyset")
279            .0;
280
281        let builder = Endpoint::builder()
282            .discovery_n0()
283            .discovery_dht()
284            .secret_key(secret_key)
285            .alpns(vec![FEDIMINT_P2P_ALPN.to_vec()]);
286
287        let builder = match bind_addr {
288            SocketAddr::V4(addr_v4) => builder.bind_addr_v4(addr_v4),
289            SocketAddr::V6(addr_v6) => builder.bind_addr_v6(addr_v6),
290        };
291        Self {
292            node_ids: node_ids
293                .into_iter()
294                .filter(|entry| entry.0 != identity)
295                .collect(),
296            endpoint: builder.bind().await.expect("Could not bind to port"),
297            connection_overrides: BTreeMap::default(),
298        }
299    }
300
301    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
302        self.connection_overrides.insert(node, addr);
303        self
304    }
305}
306
307#[async_trait]
308impl<M> IP2PConnector<M> for IrohConnector
309where
310    M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
311{
312    fn peers(&self) -> Vec<PeerId> {
313        self.node_ids.keys().copied().collect()
314    }
315
316    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
317        let node_id = *self
318            .node_ids
319            .get(&peer)
320            .expect("No node id found for peer {peer}");
321
322        let connection = match self.connection_overrides.get(&node_id) {
323            Some(node_addr) => {
324                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
325                self.endpoint
326                    .connect(node_addr.clone(), FEDIMINT_P2P_ALPN)
327                    .await?
328            }
329            None => self.endpoint.connect(node_id, FEDIMINT_P2P_ALPN).await?,
330        };
331
332        Ok(connection.into_dyn())
333    }
334
335    async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)> {
336        let connection = self
337            .endpoint
338            .accept()
339            .await
340            .context("Listener closed unexpectedly")?
341            .accept()?
342            .await?;
343
344        let node_id = connection.remote_node_id()?;
345
346        let auth_peer = self
347            .node_ids
348            .iter()
349            .find(|entry| entry.1 == &node_id)
350            .context("Node id {node_id} is unknown")?
351            .0;
352
353        Ok((*auth_peer, connection.into_dyn()))
354    }
355}