1use std::collections::BTreeMap;
2use std::fmt;
3use std::pin::Pin;
4use std::str::FromStr;
5use std::sync::Arc;
6
7use anyhow::{Context, bail};
8use async_trait::async_trait;
9use fedimint_core::config::ALEPH_BFT_UNIT_BYTE_LIMIT;
10use fedimint_core::envs::{
11 FM_IROH_N0_DISCOVERY_ENABLE_ENV, FM_IROH_PKARR_RESOLVER_ENABLE_ENV, is_env_var_set_opt,
12 parse_kv_list_from_env,
13};
14use fedimint_core::module::{
15 ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, FEDIMINT_GATEWAY_ALPN,
16 IrohApiRequest, IrohGatewayRequest, IrohGatewayResponse,
17};
18
/// Upper bound, in bytes, on a single response buffered from an iroh stream.
///
/// Passed as the size limit to `read_to_end` by the guardian and gateway
/// request implementations in this file.
// NOTE(review): the `3600 * 4 * 2` multiplier is not explained here — confirm
// what payload it is sized for before changing it.
const IROH_MAX_RESPONSE_BYTES: usize = ALEPH_BFT_UNIT_BYTE_LIMIT * 3600 * 4 * 2;
28use fedimint_core::task::spawn;
29use fedimint_core::util::{FmtCompact as _, SafeUrl};
30use fedimint_core::{apply, async_trait_maybe_send};
31use fedimint_logging::LOG_NET_IROH;
32use futures::Future;
33use futures::stream::{FuturesUnordered, StreamExt};
34use iroh::discovery::pkarr::PkarrResolver;
35use iroh::endpoint::Connection;
36use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
37use iroh_base::ticket::NodeTicket;
38use iroh_next::Watcher as _;
39use reqwest::{Method, StatusCode};
40use serde_json::Value;
41use tokio::sync::watch;
42use tracing::{debug, trace, warn};
43
44use super::{DynGuaridianConnection, IGuardianConnection, ServerError, ServerResult};
45use crate::{Connectivity, DynGatewayConnection, IConnection, IGatewayConnection};
46
/// Client-side connector that dials guardians and gateways over iroh.
///
/// Holds one endpoint from the `iroh` crate and, optionally, a second one from
/// the `iroh_next` crate; guardian connections are attempted on every
/// configured endpoint.
#[derive(Clone)]
pub(crate) struct IrohConnector {
    /// Endpoint built from the `iroh` crate; always present.
    stable: iroh::endpoint::Endpoint,
    /// Endpoint built from the `iroh_next` crate; `None` unless the connector
    /// was constructed with `iroh_enable_next` set.
    next: Option<iroh_next::endpoint::Endpoint>,

    /// Per-node addresses that are dialed directly instead of connecting by
    /// node id alone.
    connection_overrides: BTreeMap<NodeId, NodeAddr>,

    /// Counter that the connection-monitoring tasks increment (wrapping)
    /// whenever a monitored connection reports a type or path update.
    path_change: Arc<watch::Sender<u64>>,
}
65
66impl fmt::Debug for IrohConnector {
67 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68 f.debug_struct("IrohEndpoint")
69 .field("stable-id", &self.stable.node_id())
70 .field("next-id", &self.next.as_ref().map(iroh_next::Endpoint::id))
71 .finish_non_exhaustive()
72 }
73}
74
75impl IrohConnector {
76 pub async fn new(
77 iroh_dns: Option<SafeUrl>,
78 iroh_enable_dht: bool,
79 iroh_enable_next: bool,
80 path_change: Arc<watch::Sender<u64>>,
81 ) -> anyhow::Result<Self> {
82 const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
83 const FM_GW_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_GW_IROH_CONNECT_OVERRIDES";
84 let mut s =
85 Self::new_no_overrides(iroh_dns, iroh_enable_dht, iroh_enable_next, path_change)
86 .await?;
87
88 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
89 s = s.with_connection_override(k, v.into());
90 }
91
92 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_GW_IROH_CONNECT_OVERRIDES_ENV)? {
93 s = s.with_connection_override(k, v.into());
94 }
95
96 Ok(s)
97 }
98
    /// Builds a connector without any connection overrides.
    ///
    /// Always binds an endpoint from the `iroh` crate ("stable"). When
    /// `iroh_enable_next` is set, an `iroh_next` endpoint is bound concurrently
    /// as well; otherwise the future that would create it is never awaited.
    ///
    /// * `iroh_dns` - optional URL used to configure a pkarr resolver on both
    ///   endpoints.
    /// * `iroh_enable_dht` - enables DHT-based discovery on non-wasm targets.
    /// * `iroh_enable_next` - also create the `iroh_next` endpoint.
    /// * `path_change` - stored on the connector as-is.
    ///
    /// # Errors
    ///
    /// Returns the error of whichever endpoint fails to bind.
    // The function is long because both endpoint builders are configured inline.
    #[allow(clippy::too_many_lines)]
    pub async fn new_no_overrides(
        iroh_dns: Option<SafeUrl>,
        iroh_enable_dht: bool,
        iroh_enable_next: bool,
        path_change: Arc<watch::Sender<u64>>,
    ) -> anyhow::Result<Self> {
        // Future that configures and binds the stable endpoint. It works on its
        // own clone of `iroh_dns` so the original can be consumed by the next
        // endpoint future below.
        let endpoint_stable = Box::pin({
            let iroh_dns = iroh_dns.clone();
            async {
                let mut builder = Endpoint::builder();

                // A custom DNS URL, when given, is used for a pkarr resolver.
                if let Some(iroh_dns) = iroh_dns.map(SafeUrl::to_unsafe) {
                    builder = builder.add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
                }

                // Relays are switched off for this client endpoint.
                let mut builder = builder.relay_mode(iroh::RelayMode::Disabled);

                // DHT discovery is only compiled in on non-wasm targets.
                #[cfg(not(target_family = "wasm"))]
                if iroh_enable_dht {
                    builder = builder.discovery_dht();
                }

                {
                    // Both flags count as enabled when the helper yields `None`
                    // (presumably: the variable is unset).
                    if is_env_var_set_opt(FM_IROH_PKARR_RESOLVER_ENABLE_ENV).unwrap_or(true) {
                        // The n0 pkarr resolver is only added on wasm; on other
                        // targets this flag merely controls the warning below.
                        #[cfg(target_family = "wasm")]
                        {
                            builder = builder.add_discovery(move |_| Some(PkarrResolver::n0_dns()));
                        }
                    } else {
                        warn!(
                            target: LOG_NET_IROH,
                            "Iroh pkarr resolver is disabled"
                        );
                    }

                    if is_env_var_set_opt(FM_IROH_N0_DISCOVERY_ENABLE_ENV).unwrap_or(true) {
                        // n0 DNS discovery is only added on non-wasm targets.
                        #[cfg(not(target_family = "wasm"))]
                        {
                            builder = builder.add_discovery(move |_| {
                                Some(iroh::discovery::dns::DnsDiscovery::n0_dns())
                            });
                        }
                    } else {
                        warn!(
                            target: LOG_NET_IROH,
                            "Iroh n0 discovery is disabled"
                        );
                    }
                }

                let endpoint = builder.bind().await?;
                debug!(
                    target: LOG_NET_IROH,
                    node_id = %endpoint.node_id(),
                    node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
                    "Iroh api client endpoint (stable)"
                );
                // The explicit error type pins what `?` converts into.
                Ok::<_, anyhow::Error>(endpoint)
            }
        });
        // Future that configures and binds the next endpoint. Unlike the stable
        // one, it does not consult the pkarr / n0 discovery environment flags.
        let endpoint_next = Box::pin(async {
            let mut builder = iroh_next::Endpoint::builder();

            if let Some(iroh_dns) = iroh_dns.map(SafeUrl::to_unsafe) {
                builder = builder.address_lookup(
                    iroh_next::address_lookup::PkarrResolver::builder(iroh_dns).build(),
                );
            }

            // Relays are switched off for this client endpoint.
            let mut builder = builder.relay_mode(iroh_next::RelayMode::Disabled);

            // DHT address lookup is only compiled in on non-wasm targets.
            #[cfg(not(target_family = "wasm"))]
            if iroh_enable_dht {
                builder =
                    builder.address_lookup(iroh_next::address_lookup::DhtAddressLookup::builder());
            }

            {
                // wasm uses the n0 pkarr resolver, other targets the n0 DNS
                // address lookup.
                #[cfg(target_family = "wasm")]
                {
                    builder =
                        builder.address_lookup(iroh_next::address_lookup::PkarrResolver::n0_dns());
                }
                #[cfg(not(target_family = "wasm"))]
                {
                    builder = builder
                        .address_lookup(iroh_next::address_lookup::DnsAddressLookup::n0_dns());
                }
            }

            let endpoint = builder.bind().await?;
            debug!(
                target: LOG_NET_IROH,
                node_id = %endpoint.id(),
                node_id_pkarr = %z32::encode(endpoint.id().as_bytes()),
                "Iroh api client endpoint (next)"
            );
            Ok(endpoint)
        });

        // Bind both endpoints concurrently when the next stack is enabled;
        // otherwise only the stable future is driven.
        let (endpoint_stable, endpoint_next) = if iroh_enable_next {
            let (s, n) = tokio::try_join!(endpoint_stable, endpoint_next)?;
            (s, Some(n))
        } else {
            (endpoint_stable.await?, None)
        };

        Ok(Self {
            stable: endpoint_stable,
            next: endpoint_next,
            connection_overrides: BTreeMap::new(),
            path_change,
        })
    }
222
223 pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
224 self.connection_overrides.insert(node, addr);
225 self
226 }
227
228 pub fn node_id_from_url(url: &SafeUrl) -> anyhow::Result<NodeId> {
229 if url.scheme() != "iroh" {
230 bail!(
231 "Unsupported scheme: {}, passed to iroh endpoint handler",
232 url.scheme()
233 );
234 }
235 let host = url.host_str().context("Missing host string in Iroh URL")?;
236
237 let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
238
239 Ok(node_id)
240 }
241}
242
243#[async_trait::async_trait]
244impl crate::Connector for IrohConnector {
245 async fn connect_guardian(
246 &self,
247 url: &SafeUrl,
248 api_secret: Option<&str>,
249 ) -> ServerResult<DynGuaridianConnection> {
250 if api_secret.is_some() {
251 ServerError::Connection(anyhow::format_err!(
254 "Iroh api secrets currently not supported"
255 ));
256 }
257 let node_id =
258 Self::node_id_from_url(url).map_err(|source| ServerError::InvalidPeerUrl {
259 source,
260 url: url.to_owned(),
261 })?;
262 let mut futures = FuturesUnordered::<
263 Pin<
264 Box<
265 dyn Future<Output = (ServerResult<DynGuaridianConnection>, &'static str)>
266 + Send,
267 >,
268 >,
269 >::new();
270 let connection_override = self.connection_overrides.get(&node_id).cloned();
271
272 let self_clone = self.clone();
273 futures.push(Box::pin({
274 let connection_override = connection_override.clone();
275 async move {
276 (
277 self_clone
278 .make_new_connection_stable(node_id, connection_override)
279 .await
280 .map(super::IGuardianConnection::into_dyn),
281 "stable",
282 )
283 }
284 }));
285
286 if let Some(endpoint_next) = &self.next {
287 let self_clone = self.clone();
288 let endpoint_next = endpoint_next.clone();
289 futures.push(Box::pin(async move {
290 (
291 self_clone
292 .make_new_connection_next(&endpoint_next, node_id, connection_override)
293 .await
294 .map(super::IGuardianConnection::into_dyn),
295 "next",
296 )
297 }));
298 }
299
300 let mut prev_err = None;
303
304 while let Some((result, iroh_stack)) = futures.next().await {
306 match result {
307 Ok(connection) => return Ok(connection),
308 Err(err) => {
309 warn!(
310 target: LOG_NET_IROH,
311 err = %err.fmt_compact(),
312 %iroh_stack,
313 "Join error in iroh connection task"
314 );
315 prev_err = Some(err);
316 }
317 }
318 }
319
320 Err(prev_err.unwrap_or_else(|| {
321 ServerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
322 }))
323 }
324
325 async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
326 let node_id = Self::node_id_from_url(url)?;
327 if let Some(node_addr) = self.connection_overrides.get(&node_id).cloned() {
328 let conn = self
329 .stable
330 .connect(node_addr.clone(), FEDIMINT_GATEWAY_ALPN)
331 .await?;
332
333 #[cfg(not(target_family = "wasm"))]
334 Self::spawn_connection_monitoring_stable(
335 &self.stable,
336 node_id,
337 self.path_change.clone(),
338 );
339
340 Ok(IGatewayConnection::into_dyn(conn))
341 } else {
342 let conn = self.stable.connect(node_id, FEDIMINT_GATEWAY_ALPN).await?;
343 Ok(IGatewayConnection::into_dyn(conn))
344 }
345 }
346
347 fn connectivity(&self, url: &SafeUrl) -> Connectivity {
348 let Ok(node_id) = Self::node_id_from_url(url) else {
349 return Connectivity::Unknown;
350 };
351 let Ok(watcher) = self.stable.conn_type(node_id) else {
352 return Connectivity::Unknown;
353 };
354 match watcher.get() {
355 Ok(iroh::endpoint::ConnectionType::Direct(_)) => Connectivity::Direct,
356 Ok(iroh::endpoint::ConnectionType::Relay(_)) => Connectivity::Relay,
357 Ok(iroh::endpoint::ConnectionType::Mixed(..)) => Connectivity::Mixed,
358 Ok(iroh::endpoint::ConnectionType::None) | Err(_) => Connectivity::Unknown,
359 }
360 }
361}
362
363impl IrohConnector {
364 #[cfg(not(target_family = "wasm"))]
365 fn spawn_connection_monitoring_stable(
366 endpoint: &Endpoint,
367 node_id: NodeId,
368 path_change: Arc<watch::Sender<u64>>,
369 ) {
370 if let Ok(mut conn_type_watcher) = endpoint.conn_type(node_id) {
371 #[allow(clippy::let_underscore_future)]
372 let _ = spawn("iroh connection (stable)", async move {
373 if let Ok(conn_type) = conn_type_watcher.get() {
374 debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
375 }
376 while let Ok(event) = conn_type_watcher.updated().await {
377 debug!(target: LOG_NET_IROH, %node_id, type = %event, "Connection type (changed)");
378 path_change.send_modify(|c| *c = c.wrapping_add(1));
379 }
380 });
381 }
382 }
383
384 #[cfg(not(target_family = "wasm"))]
385 fn spawn_connection_monitoring_next(
386 conn: &iroh_next::endpoint::Connection,
387 node_id: iroh_next::EndpointId,
388 path_change: Arc<watch::Sender<u64>>,
389 ) {
390 let mut paths_watcher = conn.paths();
391 #[allow(clippy::let_underscore_future)]
392 let _ = spawn("iroh connection (next)", async move {
393 let paths = paths_watcher.get();
394 debug!(target: LOG_NET_IROH, %node_id, ?paths, "Connection paths (initial)");
395 while let Ok(paths) = paths_watcher.updated().await {
396 debug!(target: LOG_NET_IROH, %node_id, ?paths, "Connection paths changed");
397 path_change.send_modify(|c| *c = c.wrapping_add(1));
398 }
399 });
400 }
401
402 async fn make_new_connection_stable(
403 &self,
404 node_id: NodeId,
405 node_addr: Option<NodeAddr>,
406 ) -> ServerResult<Connection> {
407 trace!(target: LOG_NET_IROH, %node_id, "Creating new stable connection");
408 let conn = match node_addr.clone() {
409 Some(node_addr) => {
410 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
411 let conn = self.stable
412 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
413 .await;
414
415 #[cfg(not(target_family = "wasm"))]
416 if conn.is_ok() {
417 Self::spawn_connection_monitoring_stable(
418 &self.stable,
419 node_id,
420 self.path_change.clone(),
421 );
422 }
423 conn
424 }
425 None => self.stable.connect(node_id, FEDIMINT_API_ALPN).await,
426 }.map_err(ServerError::Connection)?;
427
428 Ok(conn)
429 }
430
431 async fn make_new_connection_next(
432 &self,
433 endpoint_next: &iroh_next::Endpoint,
434 node_id: NodeId,
435 node_addr: Option<NodeAddr>,
436 ) -> ServerResult<iroh_next::endpoint::Connection> {
437 let next_node_id =
438 iroh_next::EndpointId::from_bytes(node_id.as_bytes()).expect("Can't fail");
439
440 let endpoint_next = endpoint_next.clone();
441
442 trace!(target: LOG_NET_IROH, %node_id, "Creating new next connection");
443 let conn = match node_addr.clone() {
444 Some(node_addr) => {
445 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
446 let node_addr = node_addr_stable_to_next(&node_addr);
447 let conn = endpoint_next
448 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
449 .await;
450
451 #[cfg(not(target_family = "wasm"))]
452 if let Ok(conn) = &conn {
453 Self::spawn_connection_monitoring_next(
454 conn,
455 node_addr.id,
456 self.path_change.clone(),
457 );
458 }
459
460 conn
461 }
462 None => endpoint_next.connect(
463 next_node_id,
464 FEDIMINT_API_ALPN
465 ).await,
466 }
467 .map_err(Into::into)
468 .map_err(ServerError::Connection)?;
469
470 Ok(conn)
471 }
472}
473
474fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::EndpointAddr {
475 let next_node_id =
476 iroh_next::EndpointId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail");
477 let relay_addrs = stable.relay_url.iter().map(|u| {
478 iroh_next::TransportAddr::Relay(
479 iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail"),
480 )
481 });
482 let direct_addrs = stable
483 .direct_addresses
484 .iter()
485 .copied()
486 .map(iroh_next::TransportAddr::Ip);
487
488 iroh_next::EndpointAddr::from_parts(next_node_id, relay_addrs.chain(direct_addrs))
489}
490
491#[apply(async_trait_maybe_send!)]
492impl IConnection for Connection {
493 async fn await_disconnection(&self) {
494 self.closed().await;
495 }
496
497 fn is_connected(&self) -> bool {
498 self.close_reason().is_none()
499 }
500}
501
502#[async_trait]
503impl IGuardianConnection for Connection {
504 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
505 let json = serde_json::to_vec(&IrohApiRequest { method, request })
506 .expect("Serialization to vec can't fail");
507
508 let (mut sink, mut stream) = self
509 .open_bi()
510 .await
511 .map_err(|e| ServerError::Transport(e.into()))?;
512
513 sink.write_all(&json)
514 .await
515 .map_err(|e| ServerError::Transport(e.into()))?;
516
517 sink.finish()
518 .map_err(|e| ServerError::Transport(e.into()))?;
519
520 let response = stream
521 .read_to_end(IROH_MAX_RESPONSE_BYTES)
522 .await
523 .map_err(|e| ServerError::Transport(e.into()))?;
524
525 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
527 .map_err(|e| ServerError::InvalidResponse(e.into()))?;
528
529 response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
530 }
531}
532
533#[apply(async_trait_maybe_send!)]
534impl IConnection for iroh_next::endpoint::Connection {
535 async fn await_disconnection(&self) {
536 self.closed().await;
537 }
538
539 fn is_connected(&self) -> bool {
540 self.close_reason().is_none()
541 }
542}
543
544#[async_trait]
545impl IGuardianConnection for iroh_next::endpoint::Connection {
546 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
547 let json = serde_json::to_vec(&IrohApiRequest { method, request })
548 .expect("Serialization to vec can't fail");
549
550 let (mut sink, mut stream) = self
551 .open_bi()
552 .await
553 .map_err(|e| ServerError::Transport(e.into()))?;
554
555 sink.write_all(&json)
556 .await
557 .map_err(|e| ServerError::Transport(e.into()))?;
558
559 sink.finish()
560 .map_err(|e| ServerError::Transport(e.into()))?;
561
562 let response = stream
563 .read_to_end(IROH_MAX_RESPONSE_BYTES)
564 .await
565 .map_err(|e| ServerError::Transport(e.into()))?;
566
567 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
569 .map_err(|e| ServerError::InvalidResponse(e.into()))?;
570
571 response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
572 }
573}
574
575#[apply(async_trait_maybe_send!)]
576impl IGatewayConnection for Connection {
577 async fn request(
578 &self,
579 password: Option<String>,
580 _method: Method,
581 route: &str,
582 payload: Option<Value>,
583 ) -> ServerResult<Value> {
584 let iroh_request = IrohGatewayRequest {
585 route: route.to_string(),
586 params: payload,
587 password,
588 };
589 let json = serde_json::to_vec(&iroh_request).expect("serialization cant fail");
590
591 let (mut sink, mut stream) = self
592 .open_bi()
593 .await
594 .map_err(|e| ServerError::Transport(e.into()))?;
595
596 sink.write_all(&json)
597 .await
598 .map_err(|e| ServerError::Transport(e.into()))?;
599
600 sink.finish()
601 .map_err(|e| ServerError::Transport(e.into()))?;
602
603 let response = stream
604 .read_to_end(IROH_MAX_RESPONSE_BYTES)
605 .await
606 .map_err(|e| ServerError::Transport(e.into()))?;
607
608 let response = serde_json::from_slice::<IrohGatewayResponse>(&response)
609 .map_err(|e| ServerError::InvalidResponse(e.into()))?;
610 match StatusCode::from_u16(response.status).map_err(|e| {
611 ServerError::InvalidResponse(anyhow::anyhow!("Invalid status code: {}", e))
612 })? {
613 StatusCode::OK => Ok(response.body),
614 status => Err(ServerError::ServerError(anyhow::anyhow!(
615 "Server returned status code: {}",
616 status
617 ))),
618 }
619 }
620}