1use std::collections::BTreeMap;
5use std::fmt::Debug;
6use std::net::SocketAddr;
7use std::sync::Arc;
8
9use anyhow::{Context, ensure};
10use async_trait::async_trait;
11use fedimint_core::PeerId;
12use fedimint_core::config::PeerUrl;
13use fedimint_core::encoding::{Decodable, Encodable};
14use fedimint_core::envs::{FM_IROH_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
15use fedimint_core::iroh_prod::{FM_DNS_PKARR_RELAY_PROD, FM_IROH_RELAYS_PROD};
16use fedimint_core::net::STANDARD_FEDIMINT_P2P_PORT;
17use fedimint_core::util::SafeUrl;
18use fedimint_logging::LOG_NET_IROH;
19use iroh::defaults::DEFAULT_STUN_PORT;
20use iroh::discovery::pkarr::{PkarrPublisher, PkarrResolver};
21use iroh::{Endpoint, NodeAddr, NodeId, RelayMode, RelayNode, RelayUrl, SecretKey};
22use iroh_base::ticket::NodeTicket;
23use iroh_relay::RelayQuicConfig;
24use rustls::pki_types::ServerName;
25use serde::Serialize;
26use serde::de::DeserializeOwned;
27use tokio::net::{TcpListener, TcpStream};
28use tokio_rustls::rustls::RootCertStore;
29use tokio_rustls::rustls::server::WebPkiClientVerifier;
30use tokio_rustls::{TlsAcceptor, TlsConnector, TlsStream, rustls};
31use tokio_util::codec::LengthDelimitedCodec;
32use tracing::{info, trace};
33use url::Url;
34
35use crate::net::p2p_connection::{DynP2PConnection, IP2PConnection};
36
/// Shared, type-erased handle to an [`IP2PConnector`] implementation for
/// messages of type `M`.
pub type DynP2PConnector<M> = Arc<dyn IP2PConnector<M>>;
38
/// Transport abstraction used to establish peer-to-peer connections carrying
/// messages of type `M`.
///
/// Implementations in this file authenticate the remote side and report which
/// [`PeerId`] it belongs to.
#[async_trait]
pub trait IP2PConnector<M>: Send + Sync + 'static {
    /// Returns the ids of the peers this connector can connect to.
    ///
    /// Both implementations in this file leave the local peer out of the list.
    fn peers(&self) -> Vec<PeerId>;

    /// Opens an outgoing connection to `peer`.
    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>>;

    /// Waits for the next incoming connection and returns it together with the
    /// id of the peer that opened it.
    async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)>;

    /// Moves the connector into an [`Arc`], erasing its concrete type.
    fn into_dyn(self) -> DynP2PConnector<M>
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
57
/// TLS material used by [`TlsTcpConnector`] for mutually authenticated
/// connections.
#[derive(Debug, Clone)]
pub struct TlsConfig {
    /// Private key presented together with our own certificate, both when
    /// accepting and when opening connections.
    pub private_key: Arc<rustls::pki_types::PrivateKeyDer<'static>>,
    /// Certificate of every peer. The connector looks up its own certificate
    /// here, trusts all entries as roots, and maps a presented certificate
    /// back to the peer id it is stored under.
    pub certificates: BTreeMap<PeerId, rustls::pki_types::CertificateDer<'static>>,
    /// Name of every peer; its `dns_sanitize`d form is the TLS server name
    /// expected when connecting to that peer.
    pub peer_names: BTreeMap<PeerId, String>,
}
64
/// [`IP2PConnector`] implementation that speaks TLS over plain TCP, with both
/// sides presenting a certificate.
pub struct TlsTcpConnector {
    /// Keys, certificates and names of all peers.
    cfg: TlsConfig,
    /// P2P url of each peer, used as the dial target in `connect`.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Our own peer id.
    identity: PeerId,
    /// TCP listener on which incoming connections are accepted.
    listener: TcpListener,
    /// TLS acceptor that performs the server side of the handshake and
    /// verifies the client certificate.
    acceptor: TlsAcceptor,
}
73
74impl TlsTcpConnector {
75 pub async fn new(
76 cfg: TlsConfig,
77 p2p_bind_addr: SocketAddr,
78 peers: BTreeMap<PeerId, PeerUrl>,
79 identity: PeerId,
80 ) -> TlsTcpConnector {
81 let mut root_cert_store = RootCertStore::empty();
82
83 for cert in cfg.certificates.values() {
84 root_cert_store
85 .add(cert.clone())
86 .expect("Could not add peer certificate");
87 }
88
89 let verifier = WebPkiClientVerifier::builder(root_cert_store.into())
90 .build()
91 .expect("Failed to create client verifier");
92
93 let certificate = cfg
94 .certificates
95 .get(&identity)
96 .expect("No certificate for ourself found")
97 .clone();
98
99 let config = rustls::ServerConfig::builder()
100 .with_client_cert_verifier(verifier)
101 .with_single_cert(vec![certificate], cfg.private_key.clone_key())
102 .expect("Failed to create TLS config");
103
104 let listener = TcpListener::bind(p2p_bind_addr)
105 .await
106 .expect("Could not bind to port");
107
108 let acceptor = TlsAcceptor::from(Arc::new(config.clone()));
109
110 TlsTcpConnector {
111 cfg,
112 peers: peers.into_iter().map(|(id, peer)| (id, peer.url)).collect(),
113 identity,
114 listener,
115 acceptor,
116 }
117 }
118}
119
120#[async_trait]
121impl<M> IP2PConnector<M> for TlsTcpConnector
122where
123 M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
124{
125 fn peers(&self) -> Vec<PeerId> {
126 self.peers
127 .keys()
128 .filter(|peer| **peer != self.identity)
129 .copied()
130 .collect()
131 }
132
133 async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
134 let mut root_cert_store = RootCertStore::empty();
135
136 for cert in self.cfg.certificates.values() {
137 root_cert_store
138 .add(cert.clone())
139 .expect("Could not add peer certificate");
140 }
141
142 let certificate = self
143 .cfg
144 .certificates
145 .get(&self.identity)
146 .expect("No certificate for ourself found")
147 .clone();
148
149 let cfg = rustls::ClientConfig::builder()
150 .with_root_certificates(root_cert_store)
151 .with_client_auth_cert(vec![certificate], self.cfg.private_key.clone_key())
152 .expect("Failed to create TLS config");
153
154 let domain = ServerName::try_from(dns_sanitize(&self.cfg.peer_names[&peer]))
155 .expect("Always a valid DNS name");
156
157 let destination = self.peers.get(&peer).expect("No url for peer");
158
159 let tls = TlsConnector::from(Arc::new(cfg))
160 .connect(domain, TcpStream::connect(parse_p2p(destination)?).await?)
161 .await?;
162
163 let certificate = tls
164 .get_ref()
165 .1
166 .peer_certificates()
167 .context("Peer did not authenticate itself")?
168 .first()
169 .context("Received certificate chain of length zero")?;
170
171 let auth_peer = self
172 .cfg
173 .certificates
174 .iter()
175 .find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
176 .context("Unknown certificate")?;
177
178 ensure!(auth_peer == peer, "Connected to unexpected peer");
179
180 let framed = LengthDelimitedCodec::builder()
181 .length_field_type::<u64>()
182 .new_framed(TlsStream::Client(tls));
183
184 Ok(framed.into_dyn())
185 }
186
187 async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)> {
188 let tls = self
189 .acceptor
190 .accept(self.listener.accept().await?.0)
191 .await?;
192
193 let certificate = tls
194 .get_ref()
195 .1
196 .peer_certificates()
197 .context("Peer did not authenticate itself")?
198 .first()
199 .context("Received certificate chain of length zero")?;
200
201 let auth_peer = self
202 .cfg
203 .certificates
204 .iter()
205 .find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
206 .context("Unknown certificate")?;
207
208 let framed = LengthDelimitedCodec::builder()
209 .length_field_type::<u64>()
210 .new_framed(TlsStream::Server(tls));
211
212 Ok((auth_peer, framed.into_dyn()))
213 }
214}
215
216pub fn gen_cert_and_key(
217 name: &str,
218) -> Result<
219 (
220 rustls::pki_types::CertificateDer<'static>,
221 Arc<rustls::pki_types::PrivateKeyDer<'static>>,
222 ),
223 anyhow::Error,
224> {
225 let cert_key = rcgen::generate_simple_self_signed(vec![dns_sanitize(name)])?;
226
227 Ok((
228 rustls::pki_types::CertificateDer::from(cert_key.cert.der().to_vec()),
229 Arc::new(
230 rustls::pki_types::PrivateKeyDer::try_from(cert_key.key_pair.serialize_der())
231 .expect("Failed to create private key"),
232 ),
233 ))
234}
235
/// Turns an arbitrary peer name into a string usable as a DNS name.
///
/// The result is `"peer"` followed by `name`, with every character that is
/// not an ASCII letter or digit replaced by `_`.
pub fn dns_sanitize(name: &str) -> String {
    const PREFIX: &str = "peer";

    let mut sanitized = String::with_capacity(PREFIX.len() + name.len());
    sanitized.push_str(PREFIX);
    sanitized.extend(
        name.chars()
            .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' }),
    );
    sanitized
}
243
244pub fn parse_p2p(url: &SafeUrl) -> anyhow::Result<String> {
246 ensure!(url.scheme() == "fedimint", "p2p url has invalid scheme");
247
248 let host = url.host_str().context("p2p url is missing host")?;
249
250 let port = url.port().unwrap_or(STANDARD_FEDIMINT_P2P_PORT);
251
252 Ok(format!("{host}:{port}"))
253}
254
/// [`IP2PConnector`] implementation backed by an iroh [`Endpoint`].
#[derive(Debug, Clone)]
pub struct IrohConnector {
    /// Iroh node id of each peer. The constructors in this file remove our own
    /// entry, so this only lists the other peers.
    pub node_ids: BTreeMap<PeerId, NodeId>,
    /// The iroh endpoint used to open and accept connections.
    pub endpoint: Endpoint,
    /// Explicit node addresses to dial instead of the bare node id when
    /// connecting to the given node.
    pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
}
266
/// ALPN protocol identifier the iroh endpoint is built with and that outgoing
/// p2p connections request.
const FEDIMINT_P2P_ALPN: &[u8] = b"FEDIMINT_P2P_ALPN";
268
269impl IrohConnector {
270 pub async fn new(
271 secret_key: SecretKey,
272 p2p_bind_addr: SocketAddr,
273 iroh_dns: Option<SafeUrl>,
274 iroh_relays: Vec<SafeUrl>,
275 node_ids: BTreeMap<PeerId, NodeId>,
276 ) -> anyhow::Result<Self> {
277 let mut s =
278 Self::new_no_overrides(secret_key, p2p_bind_addr, iroh_dns, iroh_relays, node_ids)
279 .await?;
280
281 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
282 s = s.with_connection_override(k, v.into());
283 }
284
285 Ok(s)
286 }
287
288 pub async fn new_no_overrides(
289 secret_key: SecretKey,
290 bind_addr: SocketAddr,
291 iroh_dns: Option<SafeUrl>,
292 iroh_relays: Vec<SafeUrl>,
293 node_ids: BTreeMap<PeerId, NodeId>,
294 ) -> anyhow::Result<Self> {
295 let identity = *node_ids
296 .iter()
297 .find(|entry| entry.1 == &secret_key.public())
298 .expect("Our public key is not part of the keyset")
299 .0;
300
301 let endpoint = build_iroh_endpoint(
302 secret_key,
303 bind_addr,
304 iroh_dns,
305 iroh_relays,
306 FEDIMINT_P2P_ALPN,
307 )
308 .await?;
309
310 Ok(Self {
311 node_ids: node_ids
312 .into_iter()
313 .filter(|entry| entry.0 != identity)
314 .collect(),
315 endpoint,
316 connection_overrides: BTreeMap::default(),
317 })
318 }
319
320 pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
321 self.connection_overrides.insert(node, addr);
322 self
323 }
324}
325
326pub(crate) async fn build_iroh_endpoint(
327 secret_key: SecretKey,
328 bind_addr: SocketAddr,
329 iroh_dns: Option<SafeUrl>,
330 iroh_relays: Vec<SafeUrl>,
331 alpn: &[u8],
332) -> Result<Endpoint, anyhow::Error> {
333 let iroh_dns_servers: Vec<_> = iroh_dns.clone().map_or_else(
334 || {
335 FM_DNS_PKARR_RELAY_PROD
336 .into_iter()
337 .map(|dns| dns.parse().expect("Can't fail"))
338 .collect()
339 },
340 |iroh_dns| vec![iroh_dns.to_unsafe()],
341 );
342
343 let relay_mode = if iroh_relays.is_empty() {
344 RelayMode::Custom(
345 FM_IROH_RELAYS_PROD
346 .into_iter()
347 .map(|url| RelayNode {
348 url: RelayUrl::from(Url::parse(url).expect("Hardcoded, can't fail")),
349 stun_only: false,
350 stun_port: DEFAULT_STUN_PORT,
351 quic: Some(RelayQuicConfig::default()),
352 })
353 .collect(),
354 )
355 } else {
356 RelayMode::Custom(
357 iroh_relays
358 .into_iter()
359 .map(|url| RelayNode {
360 url: RelayUrl::from(url.to_unsafe()),
361 stun_only: false,
362 stun_port: DEFAULT_STUN_PORT,
363 quic: Some(RelayQuicConfig::default()),
364 })
365 .collect(),
366 )
367 };
368
369 let mut builder = Endpoint::builder();
370
371 for iroh_dns in iroh_dns_servers {
372 builder = builder
373 .add_discovery({
374 let iroh_dns = iroh_dns.clone();
375 move |sk: &SecretKey| Some(PkarrPublisher::new(sk.clone(), iroh_dns))
376 })
377 .add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
378 }
379
380 builder = builder
381 .discovery_dht()
382 .discovery_n0()
383 .relay_mode(relay_mode)
384 .secret_key(secret_key)
385 .alpns(vec![alpn.to_vec()]);
386
387 let builder = match bind_addr {
388 SocketAddr::V4(addr_v4) => builder.bind_addr_v4(addr_v4),
389 SocketAddr::V6(addr_v6) => builder.bind_addr_v6(addr_v6),
390 };
391
392 let endpoint = builder.bind().await.expect("Could not bind to port");
393
394 info!(
395 target: LOG_NET_IROH,
396 %bind_addr,
397 node_id = %endpoint.node_id(),
398 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
399 "Iroh p2p server endpoint"
400 );
401
402 Ok(endpoint)
403}
404
405#[async_trait]
406impl<M> IP2PConnector<M> for IrohConnector
407where
408 M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
409{
410 fn peers(&self) -> Vec<PeerId> {
411 self.node_ids.keys().copied().collect()
412 }
413
414 async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
415 let node_id = *self.node_ids.get(&peer).expect("No node id found for peer");
416
417 let connection = match self.connection_overrides.get(&node_id) {
418 Some(node_addr) => {
419 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
420 self.endpoint
421 .connect(node_addr.clone(), FEDIMINT_P2P_ALPN)
422 .await?
423 }
424 None => self.endpoint.connect(node_id, FEDIMINT_P2P_ALPN).await?,
425 };
426
427 Ok(connection.into_dyn())
428 }
429
430 async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)> {
431 let connection = self
432 .endpoint
433 .accept()
434 .await
435 .context("Listener closed unexpectedly")?
436 .accept()?
437 .await?;
438
439 let node_id = connection.remote_node_id()?;
440
441 let auth_peer = self
442 .node_ids
443 .iter()
444 .find(|entry| entry.1 == &node_id)
445 .with_context(|| format!("Node id {node_id} is unknown"))?
446 .0;
447
448 Ok((*auth_peer, connection.into_dyn()))
449 }
450}