1use std::collections::BTreeMap;
5use std::fmt::Debug;
6use std::net::SocketAddr;
7use std::sync::Arc;
8
9use anyhow::{Context, ensure};
10use async_trait::async_trait;
11use fedimint_core::PeerId;
12use fedimint_core::config::PeerUrl;
13use fedimint_core::encoding::{Decodable, Encodable};
14use fedimint_core::envs::{
15 FM_IROH_CONNECT_OVERRIDES_ENV, FM_IROH_ENABLE_DHT_ENV, is_env_var_disabled,
16 parse_kv_list_from_env,
17};
18use fedimint_core::iroh_prod::{FM_IROH_DNS_FEDIMINT_PROD, FM_IROH_RELAYS_FEDIMINT_PROD};
19use fedimint_core::net::STANDARD_FEDIMINT_P2P_PORT;
20use fedimint_core::util::SafeUrl;
21use fedimint_logging::LOG_NET_IROH;
22use fedimint_server_core::dashboard_ui::ConnectionType;
23use iroh::defaults::DEFAULT_STUN_PORT;
24use iroh::discovery::pkarr::{PkarrPublisher, PkarrResolver};
25use iroh::{Endpoint, NodeAddr, NodeId, RelayMode, RelayNode, RelayUrl, SecretKey};
26use iroh_base::ticket::NodeTicket;
27use iroh_relay::RelayQuicConfig;
28use rustls::pki_types::ServerName;
29use serde::Serialize;
30use serde::de::DeserializeOwned;
31use tokio::net::{TcpListener, TcpStream};
32use tokio_rustls::rustls::RootCertStore;
33use tokio_rustls::rustls::server::WebPkiClientVerifier;
34use tokio_rustls::{TlsAcceptor, TlsConnector, TlsStream, rustls};
35use tokio_util::codec::LengthDelimitedCodec;
36use tracing::{info, trace};
37use url::Url;
38
39use crate::net::p2p_connection::{DynP2PConnection, IP2PConnection};
40
/// Shared, type-erased handle to an [`IP2PConnector`] implementation for
/// messages of type `M`.
pub type DynP2PConnector<M> = Arc<dyn IP2PConnector<M>>;
42
/// Abstraction over a transport that can open connections to peers and accept
/// connections from them, producing [`DynP2PConnection`]s that carry messages
/// of type `M`.
#[async_trait]
pub trait IP2PConnector<M>: Send + Sync + 'static {
    /// Returns the ids of the peers known to this connector. Both
    /// implementations in this file leave out our own id.
    fn peers(&self) -> Vec<PeerId>;

    /// Opens an outgoing connection to `peer`.
    async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>>;

    /// Waits for an incoming connection and returns it together with the id
    /// of the peer that the remote side was identified as.
    async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)>;

    /// Reports the [`ConnectionType`] associated with `peer`.
    async fn connection_type(&self, peer: PeerId) -> ConnectionType;

    /// Wraps the connector in an [`Arc`], turning it into a
    /// [`DynP2PConnector`].
    fn into_dyn(self) -> DynP2PConnector<M>
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
64
/// TLS material used by [`TlsTcpConnector`].
#[derive(Debug, Clone)]
pub struct TlsConfig {
    /// Our own private key; used both for the server side certificate and for
    /// client authentication when dialing a peer.
    pub private_key: Arc<rustls::pki_types::PrivateKeyDer<'static>>,
    /// Certificate of every peer, keyed by peer id. All of them are added to
    /// the trusted root store, and our own entry is looked up by our identity.
    pub certificates: BTreeMap<PeerId, rustls::pki_types::CertificateDer<'static>>,
    /// Name of every peer, keyed by peer id. The name is passed through
    /// `dns_sanitize` to obtain the TLS server name expected when connecting.
    pub peer_names: BTreeMap<PeerId, String>,
}
71
/// [`IP2PConnector`] implementation that talks to peers over TCP with
/// mutually authenticated TLS.
pub struct TlsTcpConnector {
    /// Keys, certificates and names used to set up and verify TLS sessions.
    cfg: TlsConfig,
    /// P2P url of every peer, keyed by peer id.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Our own peer id.
    identity: PeerId,
    /// TCP listener on which incoming connections are accepted.
    listener: TcpListener,
    /// TLS acceptor that performs the server side handshake.
    acceptor: TlsAcceptor,
}
80
81impl TlsTcpConnector {
82 pub async fn new(
83 cfg: TlsConfig,
84 p2p_bind_addr: SocketAddr,
85 peers: BTreeMap<PeerId, PeerUrl>,
86 identity: PeerId,
87 ) -> TlsTcpConnector {
88 let mut root_cert_store = RootCertStore::empty();
89
90 for cert in cfg.certificates.values() {
91 root_cert_store
92 .add(cert.clone())
93 .expect("Could not add peer certificate");
94 }
95
96 let verifier = WebPkiClientVerifier::builder(root_cert_store.into())
97 .build()
98 .expect("Failed to create client verifier");
99
100 let certificate = cfg
101 .certificates
102 .get(&identity)
103 .expect("No certificate for ourself found")
104 .clone();
105
106 let config = rustls::ServerConfig::builder()
107 .with_client_cert_verifier(verifier)
108 .with_single_cert(vec![certificate], cfg.private_key.clone_key())
109 .expect("Failed to create TLS config");
110
111 let listener = TcpListener::bind(p2p_bind_addr)
112 .await
113 .expect("Could not bind to port");
114
115 let acceptor = TlsAcceptor::from(Arc::new(config.clone()));
116
117 TlsTcpConnector {
118 cfg,
119 peers: peers.into_iter().map(|(id, peer)| (id, peer.url)).collect(),
120 identity,
121 listener,
122 acceptor,
123 }
124 }
125}
126
127#[async_trait]
128impl<M> IP2PConnector<M> for TlsTcpConnector
129where
130 M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
131{
132 fn peers(&self) -> Vec<PeerId> {
133 self.peers
134 .keys()
135 .filter(|peer| **peer != self.identity)
136 .copied()
137 .collect()
138 }
139
140 async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
141 let mut root_cert_store = RootCertStore::empty();
142
143 for cert in self.cfg.certificates.values() {
144 root_cert_store
145 .add(cert.clone())
146 .expect("Could not add peer certificate");
147 }
148
149 let certificate = self
150 .cfg
151 .certificates
152 .get(&self.identity)
153 .expect("No certificate for ourself found")
154 .clone();
155
156 let cfg = rustls::ClientConfig::builder()
157 .with_root_certificates(root_cert_store)
158 .with_client_auth_cert(vec![certificate], self.cfg.private_key.clone_key())
159 .expect("Failed to create TLS config");
160
161 let domain = ServerName::try_from(dns_sanitize(&self.cfg.peer_names[&peer]))
162 .expect("Always a valid DNS name");
163
164 let destination = self.peers.get(&peer).expect("No url for peer");
165
166 let tls = TlsConnector::from(Arc::new(cfg))
167 .connect(domain, TcpStream::connect(parse_p2p(destination)?).await?)
168 .await?;
169
170 let certificate = tls
171 .get_ref()
172 .1
173 .peer_certificates()
174 .context("Peer did not authenticate itself")?
175 .first()
176 .context("Received certificate chain of length zero")?;
177
178 let auth_peer = self
179 .cfg
180 .certificates
181 .iter()
182 .find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
183 .context("Unknown certificate")?;
184
185 ensure!(auth_peer == peer, "Connected to unexpected peer");
186
187 let framed = LengthDelimitedCodec::builder()
188 .length_field_type::<u64>()
189 .new_framed(TlsStream::Client(tls));
190
191 Ok(framed.into_dyn())
192 }
193
194 async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)> {
195 let tls = self
196 .acceptor
197 .accept(self.listener.accept().await?.0)
198 .await?;
199
200 let certificate = tls
201 .get_ref()
202 .1
203 .peer_certificates()
204 .context("Peer did not authenticate itself")?
205 .first()
206 .context("Received certificate chain of length zero")?;
207
208 let auth_peer = self
209 .cfg
210 .certificates
211 .iter()
212 .find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
213 .context("Unknown certificate")?;
214
215 let framed = LengthDelimitedCodec::builder()
216 .length_field_type::<u64>()
217 .new_framed(TlsStream::Server(tls));
218
219 Ok((auth_peer, framed.into_dyn()))
220 }
221
222 async fn connection_type(&self, _peer: PeerId) -> ConnectionType {
223 ConnectionType::Direct
225 }
226}
227
228pub fn gen_cert_and_key(
229 name: &str,
230) -> Result<
231 (
232 rustls::pki_types::CertificateDer<'static>,
233 Arc<rustls::pki_types::PrivateKeyDer<'static>>,
234 ),
235 anyhow::Error,
236> {
237 let cert_key = rcgen::generate_simple_self_signed(vec![dns_sanitize(name)])?;
238
239 Ok((
240 rustls::pki_types::CertificateDer::from(cert_key.cert.der().to_vec()),
241 Arc::new(
242 rustls::pki_types::PrivateKeyDer::try_from(cert_key.key_pair.serialize_der())
243 .expect("Failed to create private key"),
244 ),
245 ))
246}
247
/// Turns an arbitrary name into a string usable as a DNS name: every
/// character that is not ASCII alphanumeric becomes `_`, and the result is
/// prefixed with `peer`.
pub fn dns_sanitize(name: &str) -> String {
    const PREFIX: &str = "peer";

    let mut sanitized = String::with_capacity(PREFIX.len() + name.len());
    sanitized.push_str(PREFIX);
    sanitized.extend(
        name.chars()
            .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' }),
    );
    sanitized
}
255
256pub fn parse_p2p(url: &SafeUrl) -> anyhow::Result<String> {
258 ensure!(url.scheme() == "fedimint", "p2p url has invalid scheme");
259
260 let host = url.host_str().context("p2p url is missing host")?;
261
262 let port = url.port().unwrap_or(STANDARD_FEDIMINT_P2P_PORT);
263
264 Ok(format!("{host}:{port}"))
265}
266
/// [`IP2PConnector`] implementation backed by an iroh [`Endpoint`].
#[derive(Debug, Clone)]
pub struct IrohConnector {
    /// Iroh node id of every peer, keyed by peer id. The constructors in this
    /// file remove our own entry before storing the map.
    pub node_ids: BTreeMap<PeerId, NodeId>,
    /// Iroh endpoint used to dial peers and to accept incoming connections.
    pub endpoint: Endpoint,
    /// Explicit node addresses; when an entry exists for a node id, `connect`
    /// dials that address instead of the bare node id.
    pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
}
278
/// ALPN identifier used for p2p connections: configured on the endpoint and
/// passed along with every outgoing connection attempt.
const FEDIMINT_P2P_ALPN: &[u8] = b"FEDIMINT_P2P_ALPN";
280
281impl IrohConnector {
282 pub async fn new(
283 secret_key: SecretKey,
284 p2p_bind_addr: SocketAddr,
285 iroh_dns: Option<SafeUrl>,
286 iroh_relays: Vec<SafeUrl>,
287 node_ids: BTreeMap<PeerId, NodeId>,
288 ) -> anyhow::Result<Self> {
289 let mut s =
290 Self::new_no_overrides(secret_key, p2p_bind_addr, iroh_dns, iroh_relays, node_ids)
291 .await?;
292
293 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
294 s = s.with_connection_override(k, v.into());
295 }
296
297 Ok(s)
298 }
299
300 pub async fn new_no_overrides(
301 secret_key: SecretKey,
302 bind_addr: SocketAddr,
303 iroh_dns: Option<SafeUrl>,
304 iroh_relays: Vec<SafeUrl>,
305 node_ids: BTreeMap<PeerId, NodeId>,
306 ) -> anyhow::Result<Self> {
307 let identity = *node_ids
308 .iter()
309 .find(|entry| entry.1 == &secret_key.public())
310 .expect("Our public key is not part of the keyset")
311 .0;
312
313 let endpoint = build_iroh_endpoint(
314 secret_key,
315 bind_addr,
316 iroh_dns,
317 iroh_relays,
318 FEDIMINT_P2P_ALPN,
319 )
320 .await?;
321
322 Ok(Self {
323 node_ids: node_ids
324 .into_iter()
325 .filter(|entry| entry.0 != identity)
326 .collect(),
327 endpoint,
328 connection_overrides: BTreeMap::default(),
329 })
330 }
331
332 pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
333 self.connection_overrides.insert(node, addr);
334 self
335 }
336}
337
338pub(crate) async fn build_iroh_endpoint(
339 secret_key: SecretKey,
340 bind_addr: SocketAddr,
341 iroh_dns: Option<SafeUrl>,
342 iroh_relays: Vec<SafeUrl>,
343 alpn: &[u8],
344) -> Result<Endpoint, anyhow::Error> {
345 let iroh_dns_servers: Vec<_> = iroh_dns.clone().map_or_else(
346 || {
347 FM_IROH_DNS_FEDIMINT_PROD
348 .into_iter()
349 .map(|dns| dns.parse().expect("Can't fail"))
350 .collect()
351 },
352 |iroh_dns| vec![iroh_dns.to_unsafe()],
353 );
354
355 let relay_mode = if iroh_relays.is_empty() {
356 RelayMode::Custom(
357 FM_IROH_RELAYS_FEDIMINT_PROD
358 .into_iter()
359 .map(|url| RelayNode {
360 url: RelayUrl::from(Url::parse(url).expect("Hardcoded, can't fail")),
361 stun_only: false,
362 stun_port: DEFAULT_STUN_PORT,
363 quic: Some(RelayQuicConfig::default()),
364 })
365 .collect(),
366 )
367 } else {
368 RelayMode::Custom(
369 iroh_relays
370 .into_iter()
371 .map(|url| RelayNode {
372 url: RelayUrl::from(url.to_unsafe()),
373 stun_only: false,
374 stun_port: DEFAULT_STUN_PORT,
375 quic: Some(RelayQuicConfig::default()),
376 })
377 .collect(),
378 )
379 };
380
381 let mut builder = Endpoint::builder();
382
383 for iroh_dns in iroh_dns_servers {
384 builder = builder
385 .add_discovery({
386 let iroh_dns = iroh_dns.clone();
387 move |sk: &SecretKey| Some(PkarrPublisher::new(sk.clone(), iroh_dns))
388 })
389 .add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
390 }
391
392 let builder = if is_env_var_disabled(FM_IROH_ENABLE_DHT_ENV) {
394 info!(
395 target: LOG_NET_IROH,
396 "Iroh DHT is disabled"
397 );
398 builder
399 } else {
400 builder.discovery_dht()
401 };
402
403 let builder = builder
404 .discovery_n0()
405 .relay_mode(relay_mode)
406 .secret_key(secret_key)
407 .alpns(vec![alpn.to_vec()]);
408
409 let builder = match bind_addr {
410 SocketAddr::V4(addr_v4) => builder.bind_addr_v4(addr_v4),
411 SocketAddr::V6(addr_v6) => builder.bind_addr_v6(addr_v6),
412 };
413
414 let endpoint = builder.bind().await.expect("Could not bind to port");
415
416 info!(
417 target: LOG_NET_IROH,
418 %bind_addr,
419 node_id = %endpoint.node_id(),
420 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
421 "Iroh p2p server endpoint"
422 );
423
424 Ok(endpoint)
425}
426
427#[async_trait]
428impl<M> IP2PConnector<M> for IrohConnector
429where
430 M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
431{
432 fn peers(&self) -> Vec<PeerId> {
433 self.node_ids.keys().copied().collect()
434 }
435
436 async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
437 let node_id = *self.node_ids.get(&peer).expect("No node id found for peer");
438
439 let connection = match self.connection_overrides.get(&node_id) {
440 Some(node_addr) => {
441 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
442 self.endpoint
443 .connect(node_addr.clone(), FEDIMINT_P2P_ALPN)
444 .await?
445 }
446 None => self.endpoint.connect(node_id, FEDIMINT_P2P_ALPN).await?,
447 };
448
449 Ok(connection.into_dyn())
450 }
451
452 async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)> {
453 let connection = self
454 .endpoint
455 .accept()
456 .await
457 .context("Listener closed unexpectedly")?
458 .accept()?
459 .await?;
460
461 let node_id = connection.remote_node_id()?;
462
463 let auth_peer = self
464 .node_ids
465 .iter()
466 .find(|entry| entry.1 == &node_id)
467 .with_context(|| format!("Node id {node_id} is unknown"))?
468 .0;
469
470 Ok((*auth_peer, connection.into_dyn()))
471 }
472
473 async fn connection_type(&self, peer: PeerId) -> ConnectionType {
474 let node_id = *self.node_ids.get(&peer).expect("No node id found for peer");
475
476 let conn_type_watcher = if let Ok(watcher) = self.endpoint.conn_type(node_id) {
478 watcher
479 } else {
480 return ConnectionType::Unknown;
482 };
483
484 let conn_type = if let Ok(conn_type) = conn_type_watcher.get() {
485 conn_type
486 } else {
487 return ConnectionType::Unknown;
489 };
490
491 match conn_type {
492 iroh::endpoint::ConnectionType::Relay(_) => ConnectionType::Relay,
493 iroh::endpoint::ConnectionType::Direct(_)
494 | iroh::endpoint::ConnectionType::Mixed(_, _) => ConnectionType::Direct, iroh::endpoint::ConnectionType::None => ConnectionType::Unknown,
496 }
497 }
498}