fedimint_server/net/
p2p_connector.rs

1//! Provides an abstract network connector interface and multiple
2//! implementations
3
4use std::collections::BTreeMap;
5use std::fmt::Debug;
6use std::net::SocketAddr;
7use std::sync::Arc;
8
9use anyhow::{Context, ensure};
10use async_trait::async_trait;
11use fedimint_core::PeerId;
12use fedimint_core::config::PeerUrl;
13use fedimint_core::encoding::{Decodable, Encodable};
14use fedimint_core::envs::{FM_IROH_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
15use fedimint_core::net::STANDARD_FEDIMINT_P2P_PORT;
16use fedimint_core::util::SafeUrl;
17use fedimint_logging::LOG_NET_IROH;
18use iroh::defaults::DEFAULT_STUN_PORT;
19use iroh::discovery::pkarr::{N0_DNS_PKARR_RELAY_PROD, PkarrPublisher, PkarrResolver};
20use iroh::{Endpoint, NodeAddr, NodeId, RelayMap, RelayMode, RelayNode, RelayUrl, SecretKey};
21use iroh_base::ticket::NodeTicket;
22use iroh_relay::RelayQuicConfig;
23use rustls::ServerName;
24use serde::Serialize;
25use serde::de::DeserializeOwned;
26use tokio::net::{TcpListener, TcpStream};
27use tokio_rustls::rustls::RootCertStore;
28use tokio_rustls::rustls::server::AllowAnyAuthenticatedClient;
29use tokio_rustls::{TlsAcceptor, TlsConnector, TlsStream, rustls};
30use tokio_util::codec::LengthDelimitedCodec;
31use tracing::{info, trace};
32
33use crate::net::p2p_connection::{DynP2PConnection, IP2PConnection};
34
35pub type DynP2PConnector<M> = Arc<dyn IP2PConnector<M>>;
36
37/// Allows to connect to peers and to listen for incoming connections.
38/// Connections are message based and should be authenticated and encrypted for
39/// production deployments.
40#[async_trait]
41pub trait IP2PConnector<M>: Send + Sync + 'static {
42    fn peers(&self) -> Vec<PeerId>;
43
44    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>>;
45
46    async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)>;
47
48    fn into_dyn(self) -> DynP2PConnector<M>
49    where
50        Self: Sized,
51    {
52        Arc::new(self)
53    }
54}
55
56#[derive(Debug, Clone)]
57pub struct TlsConfig {
58    pub private_key: rustls::PrivateKey,
59    pub certificates: BTreeMap<PeerId, rustls::Certificate>,
60    pub peer_names: BTreeMap<PeerId, String>,
61}
62
63/// TCP connector with encryption and authentication
64pub struct TlsTcpConnector {
65    cfg: TlsConfig,
66    peers: BTreeMap<PeerId, SafeUrl>,
67    identity: PeerId,
68    listener: TcpListener,
69    acceptor: TlsAcceptor,
70}
71
72impl TlsTcpConnector {
73    pub async fn new(
74        cfg: TlsConfig,
75        p2p_bind_addr: SocketAddr,
76        peers: BTreeMap<PeerId, PeerUrl>,
77        identity: PeerId,
78    ) -> TlsTcpConnector {
79        let mut root_cert_store = RootCertStore::empty();
80
81        for cert in cfg.certificates.values() {
82            root_cert_store
83                .add(cert)
84                .expect("Could not add peer certificate");
85        }
86
87        let verifier = AllowAnyAuthenticatedClient::new(root_cert_store);
88
89        let certificate = cfg
90            .certificates
91            .get(&identity)
92            .expect("No certificate for ourself found")
93            .clone();
94
95        let config = rustls::ServerConfig::builder()
96            .with_safe_defaults()
97            .with_client_cert_verifier(Arc::from(verifier))
98            .with_single_cert(vec![certificate], cfg.private_key.clone())
99            .expect("Failed to create TLS config");
100
101        let listener = TcpListener::bind(p2p_bind_addr)
102            .await
103            .expect("Could not bind to port");
104
105        let acceptor = TlsAcceptor::from(Arc::new(config.clone()));
106
107        TlsTcpConnector {
108            cfg,
109            peers: peers.into_iter().map(|(id, peer)| (id, peer.url)).collect(),
110            identity,
111            listener,
112            acceptor,
113        }
114    }
115}
116
117#[async_trait]
118impl<M> IP2PConnector<M> for TlsTcpConnector
119where
120    M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
121{
122    fn peers(&self) -> Vec<PeerId> {
123        self.peers
124            .keys()
125            .filter(|peer| **peer != self.identity)
126            .copied()
127            .collect()
128    }
129
130    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
131        let mut root_cert_store = RootCertStore::empty();
132
133        for cert in self.cfg.certificates.values() {
134            root_cert_store
135                .add(cert)
136                .expect("Could not add peer certificate");
137        }
138
139        let certificate = self
140            .cfg
141            .certificates
142            .get(&self.identity)
143            .expect("No certificate for ourself found")
144            .clone();
145
146        let cfg = rustls::ClientConfig::builder()
147            .with_safe_defaults()
148            .with_root_certificates(root_cert_store)
149            .with_client_auth_cert(vec![certificate], self.cfg.private_key.clone())
150            .expect("Failed to create TLS config");
151
152        let domain = ServerName::try_from(dns_sanitize(&self.cfg.peer_names[&peer]).as_str())
153            .expect("Always a valid DNS name");
154
155        let destination = self.peers.get(&peer).expect("No url for peer");
156
157        let tls = TlsConnector::from(Arc::new(cfg))
158            .connect(domain, TcpStream::connect(parse_p2p(destination)?).await?)
159            .await?;
160
161        let certificate = tls
162            .get_ref()
163            .1
164            .peer_certificates()
165            .context("Peer did not authenticate itself")?
166            .first()
167            .context("Received certificate chain of length zero")?;
168
169        let auth_peer = self
170            .cfg
171            .certificates
172            .iter()
173            .find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
174            .context("Unknown certificate")?;
175
176        ensure!(auth_peer == peer, "Connected to unexpected peer");
177
178        let framed = LengthDelimitedCodec::builder()
179            .length_field_type::<u64>()
180            .new_framed(TlsStream::Client(tls));
181
182        Ok(framed.into_dyn())
183    }
184
185    async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)> {
186        let tls = self
187            .acceptor
188            .accept(self.listener.accept().await?.0)
189            .await?;
190
191        let certificate = tls
192            .get_ref()
193            .1
194            .peer_certificates()
195            .context("Peer did not authenticate itself")?
196            .first()
197            .context("Received certificate chain of length zero")?;
198
199        let auth_peer = self
200            .cfg
201            .certificates
202            .iter()
203            .find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
204            .context("Unknown certificate")?;
205
206        let framed = LengthDelimitedCodec::builder()
207            .length_field_type::<u64>()
208            .new_framed(TlsStream::Server(tls));
209
210        Ok((auth_peer, framed.into_dyn()))
211    }
212}
213
214pub fn gen_cert_and_key(
215    name: &str,
216) -> Result<(rustls::Certificate, rustls::PrivateKey), anyhow::Error> {
217    let cert_key = rcgen::generate_simple_self_signed(vec![dns_sanitize(name)])?;
218
219    Ok((
220        rustls::Certificate(cert_key.cert.der().to_vec()),
221        rustls::PrivateKey(cert_key.key_pair.serialize_der()),
222    ))
223}
224
225/// Sanitizes name as valid domain name
226pub fn dns_sanitize(name: &str) -> String {
227    format!(
228        "peer{}",
229        name.replace(|c: char| !c.is_ascii_alphanumeric(), "_")
230    )
231}
232
233/// Parses the host and port from a url
234pub fn parse_p2p(url: &SafeUrl) -> anyhow::Result<String> {
235    ensure!(url.scheme() == "fedimint", "p2p url has invalid scheme");
236
237    let host = url.host_str().context("p2p url is missing host")?;
238
239    let port = url.port().unwrap_or(STANDARD_FEDIMINT_P2P_PORT);
240
241    Ok(format!("{host}:{port}"))
242}
243
244#[derive(Debug, Clone)]
245pub struct IrohConnector {
246    /// Map of all peers' connection information we want to be connected to
247    pub node_ids: BTreeMap<PeerId, NodeId>,
248    /// The Iroh endpoint
249    pub endpoint: Endpoint,
250    /// List of overrides to use when attempting to connect to given `NodeId`
251    ///
252    /// This is useful for testing, or forcing non-default network connectivity.
253    pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
254}
255
256const FEDIMINT_P2P_ALPN: &[u8] = b"FEDIMINT_P2P_ALPN";
257
258impl IrohConnector {
259    pub async fn new(
260        secret_key: SecretKey,
261        p2p_bind_addr: SocketAddr,
262        iroh_dns: Option<SafeUrl>,
263        iroh_relays: Vec<SafeUrl>,
264        node_ids: BTreeMap<PeerId, NodeId>,
265    ) -> anyhow::Result<Self> {
266        let mut s =
267            Self::new_no_overrides(secret_key, p2p_bind_addr, iroh_dns, iroh_relays, node_ids)
268                .await?;
269
270        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
271            s = s.with_connection_override(k, v.into());
272        }
273
274        Ok(s)
275    }
276
277    pub async fn new_no_overrides(
278        secret_key: SecretKey,
279        bind_addr: SocketAddr,
280        iroh_dns: Option<SafeUrl>,
281        iroh_relays: Vec<SafeUrl>,
282        node_ids: BTreeMap<PeerId, NodeId>,
283    ) -> anyhow::Result<Self> {
284        let identity = *node_ids
285            .iter()
286            .find(|entry| entry.1 == &secret_key.public())
287            .expect("Our public key is not part of the keyset")
288            .0;
289
290        let endpoint = build_iroh_endpoint(
291            secret_key,
292            bind_addr,
293            iroh_dns,
294            iroh_relays,
295            FEDIMINT_P2P_ALPN,
296        )
297        .await?;
298
299        Ok(Self {
300            node_ids: node_ids
301                .into_iter()
302                .filter(|entry| entry.0 != identity)
303                .collect(),
304            endpoint,
305            connection_overrides: BTreeMap::default(),
306        })
307    }
308
309    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
310        self.connection_overrides.insert(node, addr);
311        self
312    }
313}
314
315pub(crate) async fn build_iroh_endpoint(
316    secret_key: SecretKey,
317    bind_addr: SocketAddr,
318    iroh_dns: Option<SafeUrl>,
319    iroh_relays: Vec<SafeUrl>,
320    alpn: &[u8],
321) -> Result<Endpoint, anyhow::Error> {
322    let iroh_dns = iroh_dns.clone().map_or(
323        N0_DNS_PKARR_RELAY_PROD.parse().expect("Can't fail"),
324        SafeUrl::to_unsafe,
325    );
326
327    let relay_mode = if iroh_relays.is_empty() {
328        RelayMode::Default
329    } else {
330        RelayMode::Custom(RelayMap::from_nodes(iroh_relays.into_iter().map(|url| {
331            RelayNode {
332                url: RelayUrl::from(url.to_unsafe()),
333                stun_only: false,
334                stun_port: DEFAULT_STUN_PORT,
335                quic: Some(RelayQuicConfig::default()),
336            }
337        }))?)
338    };
339
340    let builder = Endpoint::builder()
341        .add_discovery({
342            let iroh_dns = iroh_dns.clone();
343            move |sk: &SecretKey| Some(PkarrPublisher::new(sk.clone(), iroh_dns))
344        })
345        .add_discovery(|_| Some(PkarrResolver::new(iroh_dns)))
346        .discovery_dht()
347        .relay_mode(relay_mode)
348        .secret_key(secret_key)
349        .alpns(vec![alpn.to_vec()]);
350
351    let builder = match bind_addr {
352        SocketAddr::V4(addr_v4) => builder.bind_addr_v4(addr_v4),
353        SocketAddr::V6(addr_v6) => builder.bind_addr_v6(addr_v6),
354    };
355
356    let endpoint = builder.bind().await.expect("Could not bind to port");
357
358    info!(
359        target: LOG_NET_IROH,
360        %bind_addr,
361        node_id = %endpoint.node_id(),
362        node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
363        "Iroh p2p server endpoint"
364    );
365
366    Ok(endpoint)
367}
368
369#[async_trait]
370impl<M> IP2PConnector<M> for IrohConnector
371where
372    M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
373{
374    fn peers(&self) -> Vec<PeerId> {
375        self.node_ids.keys().copied().collect()
376    }
377
378    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
379        let node_id = *self.node_ids.get(&peer).expect("No node id found for peer");
380
381        let connection = match self.connection_overrides.get(&node_id) {
382            Some(node_addr) => {
383                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
384                self.endpoint
385                    .connect(node_addr.clone(), FEDIMINT_P2P_ALPN)
386                    .await?
387            }
388            None => self.endpoint.connect(node_id, FEDIMINT_P2P_ALPN).await?,
389        };
390
391        Ok(connection.into_dyn())
392    }
393
394    async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)> {
395        let connection = self
396            .endpoint
397            .accept()
398            .await
399            .context("Listener closed unexpectedly")?
400            .accept()?
401            .await?;
402
403        let node_id = connection.remote_node_id()?;
404
405        let auth_peer = self
406            .node_ids
407            .iter()
408            .find(|entry| entry.1 == &node_id)
409            .with_context(|| format!("Node id {node_id} is unknown"))?
410            .0;
411
412        Ok((*auth_peer, connection.into_dyn()))
413    }
414}