fedimint_server/net/
p2p_connector.rs

1//! Provides an abstract network connector interface and multiple
2//! implementations
3
4use std::collections::BTreeMap;
5use std::fmt::Debug;
6use std::net::SocketAddr;
7use std::sync::Arc;
8
9use anyhow::{Context, ensure};
10use async_trait::async_trait;
11use fedimint_core::PeerId;
12use fedimint_core::config::PeerUrl;
13use fedimint_core::encoding::{Decodable, Encodable};
14use fedimint_core::envs::{FM_IROH_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
15use fedimint_core::net::STANDARD_FEDIMINT_P2P_PORT;
16use fedimint_core::net::iroh::build_iroh_endpoint;
17use fedimint_core::util::SafeUrl;
18use fedimint_logging::LOG_NET_IROH;
19use fedimint_server_core::dashboard_ui::ConnectionType;
20use iroh::{Endpoint, NodeAddr, NodeId, SecretKey};
21use iroh_base::ticket::NodeTicket;
22use rustls::pki_types::ServerName;
23use serde::Serialize;
24use serde::de::DeserializeOwned;
25use tokio::net::{TcpListener, TcpStream};
26use tokio_rustls::rustls::RootCertStore;
27use tokio_rustls::rustls::server::WebPkiClientVerifier;
28use tokio_rustls::{TlsAcceptor, TlsConnector, TlsStream, rustls};
29use tokio_util::codec::LengthDelimitedCodec;
30use tracing::trace;
31
32use crate::net::p2p_connection::{DynP2PConnection, IP2PConnection};
33
34pub type DynP2PConnector<M> = Arc<dyn IP2PConnector<M>>;
35
36/// Allows to connect to peers and to listen for incoming connections.
37/// Connections are message based and should be authenticated and encrypted for
38/// production deployments.
39#[async_trait]
40pub trait IP2PConnector<M>: Send + Sync + 'static {
41    fn peers(&self) -> Vec<PeerId>;
42
43    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>>;
44
45    async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)>;
46
47    /// Get the connection type for a specific peer
48    async fn connection_type(&self, peer: PeerId) -> ConnectionType;
49
50    fn into_dyn(self) -> DynP2PConnector<M>
51    where
52        Self: Sized,
53    {
54        Arc::new(self)
55    }
56}
57
58#[derive(Debug, Clone)]
59pub struct TlsConfig {
60    pub private_key: Arc<rustls::pki_types::PrivateKeyDer<'static>>,
61    pub certificates: BTreeMap<PeerId, rustls::pki_types::CertificateDer<'static>>,
62    pub peer_names: BTreeMap<PeerId, String>,
63}
64
65/// TCP connector with encryption and authentication
66pub struct TlsTcpConnector {
67    cfg: TlsConfig,
68    peers: BTreeMap<PeerId, SafeUrl>,
69    identity: PeerId,
70    listener: TcpListener,
71    acceptor: TlsAcceptor,
72}
73
74impl TlsTcpConnector {
75    pub async fn new(
76        cfg: TlsConfig,
77        p2p_bind_addr: SocketAddr,
78        peers: BTreeMap<PeerId, PeerUrl>,
79        identity: PeerId,
80    ) -> TlsTcpConnector {
81        let mut root_cert_store = RootCertStore::empty();
82
83        for cert in cfg.certificates.values() {
84            root_cert_store
85                .add(cert.clone())
86                .expect("Could not add peer certificate");
87        }
88
89        let verifier = WebPkiClientVerifier::builder(root_cert_store.into())
90            .build()
91            .expect("Failed to create client verifier");
92
93        let certificate = cfg
94            .certificates
95            .get(&identity)
96            .expect("No certificate for ourself found")
97            .clone();
98
99        let config = rustls::ServerConfig::builder()
100            .with_client_cert_verifier(verifier)
101            .with_single_cert(vec![certificate], cfg.private_key.clone_key())
102            .expect("Failed to create TLS config");
103
104        let listener = TcpListener::bind(p2p_bind_addr)
105            .await
106            .expect("Could not bind to port");
107
108        let acceptor = TlsAcceptor::from(Arc::new(config.clone()));
109
110        TlsTcpConnector {
111            cfg,
112            peers: peers.into_iter().map(|(id, peer)| (id, peer.url)).collect(),
113            identity,
114            listener,
115            acceptor,
116        }
117    }
118}
119
120#[async_trait]
121impl<M> IP2PConnector<M> for TlsTcpConnector
122where
123    M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
124{
125    fn peers(&self) -> Vec<PeerId> {
126        self.peers
127            .keys()
128            .filter(|peer| **peer != self.identity)
129            .copied()
130            .collect()
131    }
132
133    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
134        let mut root_cert_store = RootCertStore::empty();
135
136        for cert in self.cfg.certificates.values() {
137            root_cert_store
138                .add(cert.clone())
139                .expect("Could not add peer certificate");
140        }
141
142        let certificate = self
143            .cfg
144            .certificates
145            .get(&self.identity)
146            .expect("No certificate for ourself found")
147            .clone();
148
149        let cfg = rustls::ClientConfig::builder()
150            .with_root_certificates(root_cert_store)
151            .with_client_auth_cert(vec![certificate], self.cfg.private_key.clone_key())
152            .expect("Failed to create TLS config");
153
154        let domain = ServerName::try_from(dns_sanitize(&self.cfg.peer_names[&peer]))
155            .expect("Always a valid DNS name");
156
157        let destination = self.peers.get(&peer).expect("No url for peer");
158
159        let tls = TlsConnector::from(Arc::new(cfg))
160            .connect(domain, TcpStream::connect(parse_p2p(destination)?).await?)
161            .await?;
162
163        let certificate = tls
164            .get_ref()
165            .1
166            .peer_certificates()
167            .context("Peer did not authenticate itself")?
168            .first()
169            .context("Received certificate chain of length zero")?;
170
171        let auth_peer = self
172            .cfg
173            .certificates
174            .iter()
175            .find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
176            .context("Unknown certificate")?;
177
178        ensure!(auth_peer == peer, "Connected to unexpected peer");
179
180        let framed = LengthDelimitedCodec::builder()
181            .length_field_type::<u64>()
182            .new_framed(TlsStream::Client(tls));
183
184        Ok(framed.into_dyn())
185    }
186
187    async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)> {
188        let tls = self
189            .acceptor
190            .accept(self.listener.accept().await?.0)
191            .await?;
192
193        let certificate = tls
194            .get_ref()
195            .1
196            .peer_certificates()
197            .context("Peer did not authenticate itself")?
198            .first()
199            .context("Received certificate chain of length zero")?;
200
201        let auth_peer = self
202            .cfg
203            .certificates
204            .iter()
205            .find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
206            .context("Unknown certificate")?;
207
208        let framed = LengthDelimitedCodec::builder()
209            .length_field_type::<u64>()
210            .new_framed(TlsStream::Server(tls));
211
212        Ok((auth_peer, framed.into_dyn()))
213    }
214
215    async fn connection_type(&self, _peer: PeerId) -> ConnectionType {
216        // TLS connections are always direct
217        ConnectionType::Direct
218    }
219}
220
221pub fn gen_cert_and_key(
222    name: &str,
223) -> Result<
224    (
225        rustls::pki_types::CertificateDer<'static>,
226        Arc<rustls::pki_types::PrivateKeyDer<'static>>,
227    ),
228    anyhow::Error,
229> {
230    let cert_key = rcgen::generate_simple_self_signed(vec![dns_sanitize(name)])?;
231
232    Ok((
233        rustls::pki_types::CertificateDer::from(cert_key.cert.der().to_vec()),
234        Arc::new(
235            rustls::pki_types::PrivateKeyDer::try_from(cert_key.key_pair.serialize_der())
236                .expect("Failed to create private key"),
237        ),
238    ))
239}
240
241/// Sanitizes name as valid domain name
242pub fn dns_sanitize(name: &str) -> String {
243    format!(
244        "peer{}",
245        name.replace(|c: char| !c.is_ascii_alphanumeric(), "_")
246    )
247}
248
249/// Parses the host and port from a url
250pub fn parse_p2p(url: &SafeUrl) -> anyhow::Result<String> {
251    ensure!(url.scheme() == "fedimint", "p2p url has invalid scheme");
252
253    let host = url.host_str().context("p2p url is missing host")?;
254
255    let port = url.port().unwrap_or(STANDARD_FEDIMINT_P2P_PORT);
256
257    Ok(format!("{host}:{port}"))
258}
259
260#[derive(Debug, Clone)]
261pub struct IrohConnector {
262    /// Map of all peers' connection information we want to be connected to
263    pub node_ids: BTreeMap<PeerId, NodeId>,
264    /// The Iroh endpoint
265    pub endpoint: Endpoint,
266    /// List of overrides to use when attempting to connect to given `NodeId`
267    ///
268    /// This is useful for testing, or forcing non-default network connectivity.
269    pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
270}
271
272const FEDIMINT_P2P_ALPN: &[u8] = b"FEDIMINT_P2P_ALPN";
273
274impl IrohConnector {
275    pub async fn new(
276        secret_key: SecretKey,
277        p2p_bind_addr: SocketAddr,
278        iroh_dns: Option<SafeUrl>,
279        iroh_relays: Vec<SafeUrl>,
280        node_ids: BTreeMap<PeerId, NodeId>,
281    ) -> anyhow::Result<Self> {
282        let mut s =
283            Self::new_no_overrides(secret_key, p2p_bind_addr, iroh_dns, iroh_relays, node_ids)
284                .await?;
285
286        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
287            s = s.with_connection_override(k, v.into());
288        }
289
290        Ok(s)
291    }
292
293    pub async fn new_no_overrides(
294        secret_key: SecretKey,
295        bind_addr: SocketAddr,
296        iroh_dns: Option<SafeUrl>,
297        iroh_relays: Vec<SafeUrl>,
298        node_ids: BTreeMap<PeerId, NodeId>,
299    ) -> anyhow::Result<Self> {
300        let identity = *node_ids
301            .iter()
302            .find(|entry| entry.1 == &secret_key.public())
303            .expect("Our public key is not part of the keyset")
304            .0;
305
306        let endpoint = build_iroh_endpoint(
307            secret_key,
308            bind_addr,
309            iroh_dns,
310            iroh_relays,
311            FEDIMINT_P2P_ALPN,
312        )
313        .await?;
314
315        Ok(Self {
316            node_ids: node_ids
317                .into_iter()
318                .filter(|entry| entry.0 != identity)
319                .collect(),
320            endpoint,
321            connection_overrides: BTreeMap::default(),
322        })
323    }
324
325    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
326        self.connection_overrides.insert(node, addr);
327        self
328    }
329}
330
331#[async_trait]
332impl<M> IP2PConnector<M> for IrohConnector
333where
334    M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
335{
336    fn peers(&self) -> Vec<PeerId> {
337        self.node_ids.keys().copied().collect()
338    }
339
340    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
341        let node_id = *self.node_ids.get(&peer).expect("No node id found for peer");
342
343        let connection = match self.connection_overrides.get(&node_id) {
344            Some(node_addr) => {
345                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
346                self.endpoint
347                    .connect(node_addr.clone(), FEDIMINT_P2P_ALPN)
348                    .await?
349            }
350            None => self.endpoint.connect(node_id, FEDIMINT_P2P_ALPN).await?,
351        };
352
353        Ok(connection.into_dyn())
354    }
355
356    async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)> {
357        let connection = self
358            .endpoint
359            .accept()
360            .await
361            .context("Listener closed unexpectedly")?
362            .accept()?
363            .await?;
364
365        let node_id = connection.remote_node_id()?;
366
367        let auth_peer = self
368            .node_ids
369            .iter()
370            .find(|entry| entry.1 == &node_id)
371            .with_context(|| format!("Node id {node_id} is unknown"))?
372            .0;
373
374        Ok((*auth_peer, connection.into_dyn()))
375    }
376
377    async fn connection_type(&self, peer: PeerId) -> ConnectionType {
378        let node_id = *self.node_ids.get(&peer).expect("No node id found for peer");
379
380        // Try to get connection information from Iroh endpoint
381        let conn_type_watcher = if let Ok(watcher) = self.endpoint.conn_type(node_id) {
382            watcher
383        } else {
384            // If conn_type returns None, return Unknown
385            return ConnectionType::Unknown;
386        };
387
388        let conn_type = if let Ok(conn_type) = conn_type_watcher.get() {
389            conn_type
390        } else {
391            // If we can't get the connection type, return Unknown
392            return ConnectionType::Unknown;
393        };
394
395        match conn_type {
396            iroh::endpoint::ConnectionType::Relay(_) => ConnectionType::Relay,
397            iroh::endpoint::ConnectionType::Direct(_)
398            | iroh::endpoint::ConnectionType::Mixed(_, _) => ConnectionType::Direct, /* Mixed connections include direct, so consider as Direct */
399            iroh::endpoint::ConnectionType::None => ConnectionType::Unknown,
400        }
401    }
402}