fedimint_server/net/
p2p_connector.rs

//! Provides an abstract network connector interface and multiple
//! implementations.
3
4use std::collections::BTreeMap;
5use std::fmt::Debug;
6use std::net::SocketAddr;
7use std::sync::Arc;
8
9use anyhow::{Context, ensure};
10use async_trait::async_trait;
11use fedimint_core::PeerId;
12use fedimint_core::config::PeerUrl;
13use fedimint_core::encoding::{Decodable, Encodable};
14use fedimint_core::envs::{FM_IROH_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
15use fedimint_core::iroh_prod::{FM_DNS_PKARR_RELAY_PROD, FM_IROH_RELAYS_PROD};
16use fedimint_core::net::STANDARD_FEDIMINT_P2P_PORT;
17use fedimint_core::util::SafeUrl;
18use fedimint_logging::LOG_NET_IROH;
19use iroh::defaults::DEFAULT_STUN_PORT;
20use iroh::discovery::pkarr::{PkarrPublisher, PkarrResolver};
21use iroh::{Endpoint, NodeAddr, NodeId, RelayMode, RelayNode, RelayUrl, SecretKey};
22use iroh_base::ticket::NodeTicket;
23use iroh_relay::RelayQuicConfig;
24use rustls::ServerName;
25use serde::Serialize;
26use serde::de::DeserializeOwned;
27use tokio::net::{TcpListener, TcpStream};
28use tokio_rustls::rustls::RootCertStore;
29use tokio_rustls::rustls::server::AllowAnyAuthenticatedClient;
30use tokio_rustls::{TlsAcceptor, TlsConnector, TlsStream, rustls};
31use tokio_util::codec::LengthDelimitedCodec;
32use tracing::{info, trace};
33use url::Url;
34
35use crate::net::p2p_connection::{DynP2PConnection, IP2PConnection};
36
37pub type DynP2PConnector<M> = Arc<dyn IP2PConnector<M>>;
38
39/// Allows to connect to peers and to listen for incoming connections.
40/// Connections are message based and should be authenticated and encrypted for
41/// production deployments.
42#[async_trait]
43pub trait IP2PConnector<M>: Send + Sync + 'static {
44    fn peers(&self) -> Vec<PeerId>;
45
46    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>>;
47
48    async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)>;
49
50    fn into_dyn(self) -> DynP2PConnector<M>
51    where
52        Self: Sized,
53    {
54        Arc::new(self)
55    }
56}
57
58#[derive(Debug, Clone)]
59pub struct TlsConfig {
60    pub private_key: rustls::PrivateKey,
61    pub certificates: BTreeMap<PeerId, rustls::Certificate>,
62    pub peer_names: BTreeMap<PeerId, String>,
63}
64
65/// TCP connector with encryption and authentication
66pub struct TlsTcpConnector {
67    cfg: TlsConfig,
68    peers: BTreeMap<PeerId, SafeUrl>,
69    identity: PeerId,
70    listener: TcpListener,
71    acceptor: TlsAcceptor,
72}
73
74impl TlsTcpConnector {
75    pub async fn new(
76        cfg: TlsConfig,
77        p2p_bind_addr: SocketAddr,
78        peers: BTreeMap<PeerId, PeerUrl>,
79        identity: PeerId,
80    ) -> TlsTcpConnector {
81        let mut root_cert_store = RootCertStore::empty();
82
83        for cert in cfg.certificates.values() {
84            root_cert_store
85                .add(cert)
86                .expect("Could not add peer certificate");
87        }
88
89        let verifier = AllowAnyAuthenticatedClient::new(root_cert_store);
90
91        let certificate = cfg
92            .certificates
93            .get(&identity)
94            .expect("No certificate for ourself found")
95            .clone();
96
97        let config = rustls::ServerConfig::builder()
98            .with_safe_defaults()
99            .with_client_cert_verifier(Arc::from(verifier))
100            .with_single_cert(vec![certificate], cfg.private_key.clone())
101            .expect("Failed to create TLS config");
102
103        let listener = TcpListener::bind(p2p_bind_addr)
104            .await
105            .expect("Could not bind to port");
106
107        let acceptor = TlsAcceptor::from(Arc::new(config.clone()));
108
109        TlsTcpConnector {
110            cfg,
111            peers: peers.into_iter().map(|(id, peer)| (id, peer.url)).collect(),
112            identity,
113            listener,
114            acceptor,
115        }
116    }
117}
118
119#[async_trait]
120impl<M> IP2PConnector<M> for TlsTcpConnector
121where
122    M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
123{
124    fn peers(&self) -> Vec<PeerId> {
125        self.peers
126            .keys()
127            .filter(|peer| **peer != self.identity)
128            .copied()
129            .collect()
130    }
131
132    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
133        let mut root_cert_store = RootCertStore::empty();
134
135        for cert in self.cfg.certificates.values() {
136            root_cert_store
137                .add(cert)
138                .expect("Could not add peer certificate");
139        }
140
141        let certificate = self
142            .cfg
143            .certificates
144            .get(&self.identity)
145            .expect("No certificate for ourself found")
146            .clone();
147
148        let cfg = rustls::ClientConfig::builder()
149            .with_safe_defaults()
150            .with_root_certificates(root_cert_store)
151            .with_client_auth_cert(vec![certificate], self.cfg.private_key.clone())
152            .expect("Failed to create TLS config");
153
154        let domain = ServerName::try_from(dns_sanitize(&self.cfg.peer_names[&peer]).as_str())
155            .expect("Always a valid DNS name");
156
157        let destination = self.peers.get(&peer).expect("No url for peer");
158
159        let tls = TlsConnector::from(Arc::new(cfg))
160            .connect(domain, TcpStream::connect(parse_p2p(destination)?).await?)
161            .await?;
162
163        let certificate = tls
164            .get_ref()
165            .1
166            .peer_certificates()
167            .context("Peer did not authenticate itself")?
168            .first()
169            .context("Received certificate chain of length zero")?;
170
171        let auth_peer = self
172            .cfg
173            .certificates
174            .iter()
175            .find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
176            .context("Unknown certificate")?;
177
178        ensure!(auth_peer == peer, "Connected to unexpected peer");
179
180        let framed = LengthDelimitedCodec::builder()
181            .length_field_type::<u64>()
182            .new_framed(TlsStream::Client(tls));
183
184        Ok(framed.into_dyn())
185    }
186
187    async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)> {
188        let tls = self
189            .acceptor
190            .accept(self.listener.accept().await?.0)
191            .await?;
192
193        let certificate = tls
194            .get_ref()
195            .1
196            .peer_certificates()
197            .context("Peer did not authenticate itself")?
198            .first()
199            .context("Received certificate chain of length zero")?;
200
201        let auth_peer = self
202            .cfg
203            .certificates
204            .iter()
205            .find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
206            .context("Unknown certificate")?;
207
208        let framed = LengthDelimitedCodec::builder()
209            .length_field_type::<u64>()
210            .new_framed(TlsStream::Server(tls));
211
212        Ok((auth_peer, framed.into_dyn()))
213    }
214}
215
216pub fn gen_cert_and_key(
217    name: &str,
218) -> Result<(rustls::Certificate, rustls::PrivateKey), anyhow::Error> {
219    let cert_key = rcgen::generate_simple_self_signed(vec![dns_sanitize(name)])?;
220
221    Ok((
222        rustls::Certificate(cert_key.cert.der().to_vec()),
223        rustls::PrivateKey(cert_key.key_pair.serialize_der()),
224    ))
225}
226
/// Turns an arbitrary peer name into the DNS-style name used for that peer.
///
/// Every character that is not an ASCII letter or digit is replaced by `_`
/// and the result is prefixed with `peer`.
pub fn dns_sanitize(name: &str) -> String {
    let sanitized: String = name
        .chars()
        .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
        .collect();

    format!("peer{sanitized}")
}
234
235/// Parses the host and port from a url
236pub fn parse_p2p(url: &SafeUrl) -> anyhow::Result<String> {
237    ensure!(url.scheme() == "fedimint", "p2p url has invalid scheme");
238
239    let host = url.host_str().context("p2p url is missing host")?;
240
241    let port = url.port().unwrap_or(STANDARD_FEDIMINT_P2P_PORT);
242
243    Ok(format!("{host}:{port}"))
244}
245
246#[derive(Debug, Clone)]
247pub struct IrohConnector {
248    /// Map of all peers' connection information we want to be connected to
249    pub node_ids: BTreeMap<PeerId, NodeId>,
250    /// The Iroh endpoint
251    pub endpoint: Endpoint,
252    /// List of overrides to use when attempting to connect to given `NodeId`
253    ///
254    /// This is useful for testing, or forcing non-default network connectivity.
255    pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
256}
257
258const FEDIMINT_P2P_ALPN: &[u8] = b"FEDIMINT_P2P_ALPN";
259
260impl IrohConnector {
261    pub async fn new(
262        secret_key: SecretKey,
263        p2p_bind_addr: SocketAddr,
264        iroh_dns: Option<SafeUrl>,
265        iroh_relays: Vec<SafeUrl>,
266        node_ids: BTreeMap<PeerId, NodeId>,
267    ) -> anyhow::Result<Self> {
268        let mut s =
269            Self::new_no_overrides(secret_key, p2p_bind_addr, iroh_dns, iroh_relays, node_ids)
270                .await?;
271
272        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
273            s = s.with_connection_override(k, v.into());
274        }
275
276        Ok(s)
277    }
278
279    pub async fn new_no_overrides(
280        secret_key: SecretKey,
281        bind_addr: SocketAddr,
282        iroh_dns: Option<SafeUrl>,
283        iroh_relays: Vec<SafeUrl>,
284        node_ids: BTreeMap<PeerId, NodeId>,
285    ) -> anyhow::Result<Self> {
286        let identity = *node_ids
287            .iter()
288            .find(|entry| entry.1 == &secret_key.public())
289            .expect("Our public key is not part of the keyset")
290            .0;
291
292        let endpoint = build_iroh_endpoint(
293            secret_key,
294            bind_addr,
295            iroh_dns,
296            iroh_relays,
297            FEDIMINT_P2P_ALPN,
298        )
299        .await?;
300
301        Ok(Self {
302            node_ids: node_ids
303                .into_iter()
304                .filter(|entry| entry.0 != identity)
305                .collect(),
306            endpoint,
307            connection_overrides: BTreeMap::default(),
308        })
309    }
310
311    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
312        self.connection_overrides.insert(node, addr);
313        self
314    }
315}
316
317pub(crate) async fn build_iroh_endpoint(
318    secret_key: SecretKey,
319    bind_addr: SocketAddr,
320    iroh_dns: Option<SafeUrl>,
321    iroh_relays: Vec<SafeUrl>,
322    alpn: &[u8],
323) -> Result<Endpoint, anyhow::Error> {
324    let iroh_dns_servers: Vec<_> = iroh_dns.clone().map_or_else(
325        || {
326            FM_DNS_PKARR_RELAY_PROD
327                .into_iter()
328                .map(|dns| dns.parse().expect("Can't fail"))
329                .collect()
330        },
331        |iroh_dns| vec![iroh_dns.to_unsafe()],
332    );
333
334    let relay_mode = if iroh_relays.is_empty() {
335        RelayMode::Custom(
336            FM_IROH_RELAYS_PROD
337                .into_iter()
338                .map(|url| RelayNode {
339                    url: RelayUrl::from(Url::parse(url).expect("Hardcoded, can't fail")),
340                    stun_only: false,
341                    stun_port: DEFAULT_STUN_PORT,
342                    quic: Some(RelayQuicConfig::default()),
343                })
344                .collect(),
345        )
346    } else {
347        RelayMode::Custom(
348            iroh_relays
349                .into_iter()
350                .map(|url| RelayNode {
351                    url: RelayUrl::from(url.to_unsafe()),
352                    stun_only: false,
353                    stun_port: DEFAULT_STUN_PORT,
354                    quic: Some(RelayQuicConfig::default()),
355                })
356                .collect(),
357        )
358    };
359
360    let mut builder = Endpoint::builder();
361
362    for iroh_dns in iroh_dns_servers {
363        builder = builder
364            .add_discovery({
365                let iroh_dns = iroh_dns.clone();
366                move |sk: &SecretKey| Some(PkarrPublisher::new(sk.clone(), iroh_dns))
367            })
368            .add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
369    }
370
371    builder = builder
372        .discovery_dht()
373        .discovery_n0()
374        .relay_mode(relay_mode)
375        .secret_key(secret_key)
376        .alpns(vec![alpn.to_vec()]);
377
378    let builder = match bind_addr {
379        SocketAddr::V4(addr_v4) => builder.bind_addr_v4(addr_v4),
380        SocketAddr::V6(addr_v6) => builder.bind_addr_v6(addr_v6),
381    };
382
383    let endpoint = builder.bind().await.expect("Could not bind to port");
384
385    info!(
386        target: LOG_NET_IROH,
387        %bind_addr,
388        node_id = %endpoint.node_id(),
389        node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
390        "Iroh p2p server endpoint"
391    );
392
393    Ok(endpoint)
394}
395
396#[async_trait]
397impl<M> IP2PConnector<M> for IrohConnector
398where
399    M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
400{
401    fn peers(&self) -> Vec<PeerId> {
402        self.node_ids.keys().copied().collect()
403    }
404
405    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
406        let node_id = *self.node_ids.get(&peer).expect("No node id found for peer");
407
408        let connection = match self.connection_overrides.get(&node_id) {
409            Some(node_addr) => {
410                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
411                self.endpoint
412                    .connect(node_addr.clone(), FEDIMINT_P2P_ALPN)
413                    .await?
414            }
415            None => self.endpoint.connect(node_id, FEDIMINT_P2P_ALPN).await?,
416        };
417
418        Ok(connection.into_dyn())
419    }
420
421    async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)> {
422        let connection = self
423            .endpoint
424            .accept()
425            .await
426            .context("Listener closed unexpectedly")?
427            .accept()?
428            .await?;
429
430        let node_id = connection.remote_node_id()?;
431
432        let auth_peer = self
433            .node_ids
434            .iter()
435            .find(|entry| entry.1 == &node_id)
436            .with_context(|| format!("Node id {node_id} is unknown"))?
437            .0;
438
439        Ok((*auth_peer, connection.into_dyn()))
440    }
441}