fedimint_server/net/
p2p_connector.rs

1//! Provides an abstract network connector interface and multiple
2//! implementations
3
4use std::collections::BTreeMap;
5use std::fmt::Debug;
6use std::net::SocketAddr;
7use std::sync::Arc;
8
9use anyhow::{Context, ensure};
10use async_trait::async_trait;
11use fedimint_core::PeerId;
12use fedimint_core::config::PeerUrl;
13use fedimint_core::encoding::{Decodable, Encodable};
14use fedimint_core::envs::{
15    FM_IROH_CONNECT_OVERRIDES_ENV, FM_IROH_ENABLE_DHT_ENV, is_env_var_disabled,
16    parse_kv_list_from_env,
17};
18use fedimint_core::iroh_prod::{FM_IROH_DNS_FEDIMINT_PROD, FM_IROH_RELAYS_FEDIMINT_PROD};
19use fedimint_core::net::STANDARD_FEDIMINT_P2P_PORT;
20use fedimint_core::util::SafeUrl;
21use fedimint_logging::LOG_NET_IROH;
22use fedimint_server_core::dashboard_ui::ConnectionType;
23use iroh::defaults::DEFAULT_STUN_PORT;
24use iroh::discovery::pkarr::{PkarrPublisher, PkarrResolver};
25use iroh::{Endpoint, NodeAddr, NodeId, RelayMode, RelayNode, RelayUrl, SecretKey};
26use iroh_base::ticket::NodeTicket;
27use iroh_relay::RelayQuicConfig;
28use rustls::pki_types::ServerName;
29use serde::Serialize;
30use serde::de::DeserializeOwned;
31use tokio::net::{TcpListener, TcpStream};
32use tokio_rustls::rustls::RootCertStore;
33use tokio_rustls::rustls::server::WebPkiClientVerifier;
34use tokio_rustls::{TlsAcceptor, TlsConnector, TlsStream, rustls};
35use tokio_util::codec::LengthDelimitedCodec;
36use tracing::{info, trace};
37use url::Url;
38
39use crate::net::p2p_connection::{DynP2PConnection, IP2PConnection};
40
41pub type DynP2PConnector<M> = Arc<dyn IP2PConnector<M>>;
42
43/// Allows to connect to peers and to listen for incoming connections.
44/// Connections are message based and should be authenticated and encrypted for
45/// production deployments.
46#[async_trait]
47pub trait IP2PConnector<M>: Send + Sync + 'static {
48    fn peers(&self) -> Vec<PeerId>;
49
50    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>>;
51
52    async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)>;
53
54    /// Get the connection type for a specific peer
55    async fn connection_type(&self, peer: PeerId) -> ConnectionType;
56
57    fn into_dyn(self) -> DynP2PConnector<M>
58    where
59        Self: Sized,
60    {
61        Arc::new(self)
62    }
63}
64
65#[derive(Debug, Clone)]
66pub struct TlsConfig {
67    pub private_key: Arc<rustls::pki_types::PrivateKeyDer<'static>>,
68    pub certificates: BTreeMap<PeerId, rustls::pki_types::CertificateDer<'static>>,
69    pub peer_names: BTreeMap<PeerId, String>,
70}
71
72/// TCP connector with encryption and authentication
73pub struct TlsTcpConnector {
74    cfg: TlsConfig,
75    peers: BTreeMap<PeerId, SafeUrl>,
76    identity: PeerId,
77    listener: TcpListener,
78    acceptor: TlsAcceptor,
79}
80
81impl TlsTcpConnector {
82    pub async fn new(
83        cfg: TlsConfig,
84        p2p_bind_addr: SocketAddr,
85        peers: BTreeMap<PeerId, PeerUrl>,
86        identity: PeerId,
87    ) -> TlsTcpConnector {
88        let mut root_cert_store = RootCertStore::empty();
89
90        for cert in cfg.certificates.values() {
91            root_cert_store
92                .add(cert.clone())
93                .expect("Could not add peer certificate");
94        }
95
96        let verifier = WebPkiClientVerifier::builder(root_cert_store.into())
97            .build()
98            .expect("Failed to create client verifier");
99
100        let certificate = cfg
101            .certificates
102            .get(&identity)
103            .expect("No certificate for ourself found")
104            .clone();
105
106        let config = rustls::ServerConfig::builder()
107            .with_client_cert_verifier(verifier)
108            .with_single_cert(vec![certificate], cfg.private_key.clone_key())
109            .expect("Failed to create TLS config");
110
111        let listener = TcpListener::bind(p2p_bind_addr)
112            .await
113            .expect("Could not bind to port");
114
115        let acceptor = TlsAcceptor::from(Arc::new(config.clone()));
116
117        TlsTcpConnector {
118            cfg,
119            peers: peers.into_iter().map(|(id, peer)| (id, peer.url)).collect(),
120            identity,
121            listener,
122            acceptor,
123        }
124    }
125}
126
127#[async_trait]
128impl<M> IP2PConnector<M> for TlsTcpConnector
129where
130    M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
131{
132    fn peers(&self) -> Vec<PeerId> {
133        self.peers
134            .keys()
135            .filter(|peer| **peer != self.identity)
136            .copied()
137            .collect()
138    }
139
140    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
141        let mut root_cert_store = RootCertStore::empty();
142
143        for cert in self.cfg.certificates.values() {
144            root_cert_store
145                .add(cert.clone())
146                .expect("Could not add peer certificate");
147        }
148
149        let certificate = self
150            .cfg
151            .certificates
152            .get(&self.identity)
153            .expect("No certificate for ourself found")
154            .clone();
155
156        let cfg = rustls::ClientConfig::builder()
157            .with_root_certificates(root_cert_store)
158            .with_client_auth_cert(vec![certificate], self.cfg.private_key.clone_key())
159            .expect("Failed to create TLS config");
160
161        let domain = ServerName::try_from(dns_sanitize(&self.cfg.peer_names[&peer]))
162            .expect("Always a valid DNS name");
163
164        let destination = self.peers.get(&peer).expect("No url for peer");
165
166        let tls = TlsConnector::from(Arc::new(cfg))
167            .connect(domain, TcpStream::connect(parse_p2p(destination)?).await?)
168            .await?;
169
170        let certificate = tls
171            .get_ref()
172            .1
173            .peer_certificates()
174            .context("Peer did not authenticate itself")?
175            .first()
176            .context("Received certificate chain of length zero")?;
177
178        let auth_peer = self
179            .cfg
180            .certificates
181            .iter()
182            .find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
183            .context("Unknown certificate")?;
184
185        ensure!(auth_peer == peer, "Connected to unexpected peer");
186
187        let framed = LengthDelimitedCodec::builder()
188            .length_field_type::<u64>()
189            .new_framed(TlsStream::Client(tls));
190
191        Ok(framed.into_dyn())
192    }
193
194    async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)> {
195        let tls = self
196            .acceptor
197            .accept(self.listener.accept().await?.0)
198            .await?;
199
200        let certificate = tls
201            .get_ref()
202            .1
203            .peer_certificates()
204            .context("Peer did not authenticate itself")?
205            .first()
206            .context("Received certificate chain of length zero")?;
207
208        let auth_peer = self
209            .cfg
210            .certificates
211            .iter()
212            .find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
213            .context("Unknown certificate")?;
214
215        let framed = LengthDelimitedCodec::builder()
216            .length_field_type::<u64>()
217            .new_framed(TlsStream::Server(tls));
218
219        Ok((auth_peer, framed.into_dyn()))
220    }
221
222    async fn connection_type(&self, _peer: PeerId) -> ConnectionType {
223        // TLS connections are always direct
224        ConnectionType::Direct
225    }
226}
227
228pub fn gen_cert_and_key(
229    name: &str,
230) -> Result<
231    (
232        rustls::pki_types::CertificateDer<'static>,
233        Arc<rustls::pki_types::PrivateKeyDer<'static>>,
234    ),
235    anyhow::Error,
236> {
237    let cert_key = rcgen::generate_simple_self_signed(vec![dns_sanitize(name)])?;
238
239    Ok((
240        rustls::pki_types::CertificateDer::from(cert_key.cert.der().to_vec()),
241        Arc::new(
242            rustls::pki_types::PrivateKeyDer::try_from(cert_key.key_pair.serialize_der())
243                .expect("Failed to create private key"),
244        ),
245    ))
246}
247
248/// Sanitizes name as valid domain name
249pub fn dns_sanitize(name: &str) -> String {
250    format!(
251        "peer{}",
252        name.replace(|c: char| !c.is_ascii_alphanumeric(), "_")
253    )
254}
255
256/// Parses the host and port from a url
257pub fn parse_p2p(url: &SafeUrl) -> anyhow::Result<String> {
258    ensure!(url.scheme() == "fedimint", "p2p url has invalid scheme");
259
260    let host = url.host_str().context("p2p url is missing host")?;
261
262    let port = url.port().unwrap_or(STANDARD_FEDIMINT_P2P_PORT);
263
264    Ok(format!("{host}:{port}"))
265}
266
267#[derive(Debug, Clone)]
268pub struct IrohConnector {
269    /// Map of all peers' connection information we want to be connected to
270    pub node_ids: BTreeMap<PeerId, NodeId>,
271    /// The Iroh endpoint
272    pub endpoint: Endpoint,
273    /// List of overrides to use when attempting to connect to given `NodeId`
274    ///
275    /// This is useful for testing, or forcing non-default network connectivity.
276    pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
277}
278
279const FEDIMINT_P2P_ALPN: &[u8] = b"FEDIMINT_P2P_ALPN";
280
281impl IrohConnector {
282    pub async fn new(
283        secret_key: SecretKey,
284        p2p_bind_addr: SocketAddr,
285        iroh_dns: Option<SafeUrl>,
286        iroh_relays: Vec<SafeUrl>,
287        node_ids: BTreeMap<PeerId, NodeId>,
288    ) -> anyhow::Result<Self> {
289        let mut s =
290            Self::new_no_overrides(secret_key, p2p_bind_addr, iroh_dns, iroh_relays, node_ids)
291                .await?;
292
293        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
294            s = s.with_connection_override(k, v.into());
295        }
296
297        Ok(s)
298    }
299
300    pub async fn new_no_overrides(
301        secret_key: SecretKey,
302        bind_addr: SocketAddr,
303        iroh_dns: Option<SafeUrl>,
304        iroh_relays: Vec<SafeUrl>,
305        node_ids: BTreeMap<PeerId, NodeId>,
306    ) -> anyhow::Result<Self> {
307        let identity = *node_ids
308            .iter()
309            .find(|entry| entry.1 == &secret_key.public())
310            .expect("Our public key is not part of the keyset")
311            .0;
312
313        let endpoint = build_iroh_endpoint(
314            secret_key,
315            bind_addr,
316            iroh_dns,
317            iroh_relays,
318            FEDIMINT_P2P_ALPN,
319        )
320        .await?;
321
322        Ok(Self {
323            node_ids: node_ids
324                .into_iter()
325                .filter(|entry| entry.0 != identity)
326                .collect(),
327            endpoint,
328            connection_overrides: BTreeMap::default(),
329        })
330    }
331
332    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
333        self.connection_overrides.insert(node, addr);
334        self
335    }
336}
337
338pub(crate) async fn build_iroh_endpoint(
339    secret_key: SecretKey,
340    bind_addr: SocketAddr,
341    iroh_dns: Option<SafeUrl>,
342    iroh_relays: Vec<SafeUrl>,
343    alpn: &[u8],
344) -> Result<Endpoint, anyhow::Error> {
345    let iroh_dns_servers: Vec<_> = iroh_dns.clone().map_or_else(
346        || {
347            FM_IROH_DNS_FEDIMINT_PROD
348                .into_iter()
349                .map(|dns| dns.parse().expect("Can't fail"))
350                .collect()
351        },
352        |iroh_dns| vec![iroh_dns.to_unsafe()],
353    );
354
355    let relay_mode = if iroh_relays.is_empty() {
356        RelayMode::Custom(
357            FM_IROH_RELAYS_FEDIMINT_PROD
358                .into_iter()
359                .map(|url| RelayNode {
360                    url: RelayUrl::from(Url::parse(url).expect("Hardcoded, can't fail")),
361                    stun_only: false,
362                    stun_port: DEFAULT_STUN_PORT,
363                    quic: Some(RelayQuicConfig::default()),
364                })
365                .collect(),
366        )
367    } else {
368        RelayMode::Custom(
369            iroh_relays
370                .into_iter()
371                .map(|url| RelayNode {
372                    url: RelayUrl::from(url.to_unsafe()),
373                    stun_only: false,
374                    stun_port: DEFAULT_STUN_PORT,
375                    quic: Some(RelayQuicConfig::default()),
376                })
377                .collect(),
378        )
379    };
380
381    let mut builder = Endpoint::builder();
382
383    for iroh_dns in iroh_dns_servers {
384        builder = builder
385            .add_discovery({
386                let iroh_dns = iroh_dns.clone();
387                move |sk: &SecretKey| Some(PkarrPublisher::new(sk.clone(), iroh_dns))
388            })
389            .add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
390    }
391
392    // See <https://github.com/fedimint/fedimint/issues/7811>
393    let builder = if is_env_var_disabled(FM_IROH_ENABLE_DHT_ENV) {
394        info!(
395            target: LOG_NET_IROH,
396            "Iroh DHT is disabled"
397        );
398        builder
399    } else {
400        builder.discovery_dht()
401    };
402
403    let builder = builder
404        .discovery_n0()
405        .relay_mode(relay_mode)
406        .secret_key(secret_key)
407        .alpns(vec![alpn.to_vec()]);
408
409    let builder = match bind_addr {
410        SocketAddr::V4(addr_v4) => builder.bind_addr_v4(addr_v4),
411        SocketAddr::V6(addr_v6) => builder.bind_addr_v6(addr_v6),
412    };
413
414    let endpoint = builder.bind().await.expect("Could not bind to port");
415
416    info!(
417        target: LOG_NET_IROH,
418        %bind_addr,
419        node_id = %endpoint.node_id(),
420        node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
421        "Iroh p2p server endpoint"
422    );
423
424    Ok(endpoint)
425}
426
427#[async_trait]
428impl<M> IP2PConnector<M> for IrohConnector
429where
430    M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
431{
432    fn peers(&self) -> Vec<PeerId> {
433        self.node_ids.keys().copied().collect()
434    }
435
436    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
437        let node_id = *self.node_ids.get(&peer).expect("No node id found for peer");
438
439        let connection = match self.connection_overrides.get(&node_id) {
440            Some(node_addr) => {
441                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
442                self.endpoint
443                    .connect(node_addr.clone(), FEDIMINT_P2P_ALPN)
444                    .await?
445            }
446            None => self.endpoint.connect(node_id, FEDIMINT_P2P_ALPN).await?,
447        };
448
449        Ok(connection.into_dyn())
450    }
451
452    async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)> {
453        let connection = self
454            .endpoint
455            .accept()
456            .await
457            .context("Listener closed unexpectedly")?
458            .accept()?
459            .await?;
460
461        let node_id = connection.remote_node_id()?;
462
463        let auth_peer = self
464            .node_ids
465            .iter()
466            .find(|entry| entry.1 == &node_id)
467            .with_context(|| format!("Node id {node_id} is unknown"))?
468            .0;
469
470        Ok((*auth_peer, connection.into_dyn()))
471    }
472
473    async fn connection_type(&self, peer: PeerId) -> ConnectionType {
474        let node_id = *self.node_ids.get(&peer).expect("No node id found for peer");
475
476        // Try to get connection information from Iroh endpoint
477        let conn_type_watcher = if let Ok(watcher) = self.endpoint.conn_type(node_id) {
478            watcher
479        } else {
480            // If conn_type returns None, return Unknown
481            return ConnectionType::Unknown;
482        };
483
484        let conn_type = if let Ok(conn_type) = conn_type_watcher.get() {
485            conn_type
486        } else {
487            // If we can't get the connection type, return Unknown
488            return ConnectionType::Unknown;
489        };
490
491        match conn_type {
492            iroh::endpoint::ConnectionType::Relay(_) => ConnectionType::Relay,
493            iroh::endpoint::ConnectionType::Direct(_)
494            | iroh::endpoint::ConnectionType::Mixed(_, _) => ConnectionType::Direct, /* Mixed connections include direct, so consider as Direct */
495            iroh::endpoint::ConnectionType::None => ConnectionType::Unknown,
496        }
497    }
498}