Skip to main content

fedimint_connectors/
iroh.rs

1use std::collections::BTreeMap;
2use std::fmt;
3use std::pin::Pin;
4use std::str::FromStr;
5use std::sync::Arc;
6
7use anyhow::{Context, bail};
8use async_trait::async_trait;
9use fedimint_core::config::ALEPH_BFT_UNIT_BYTE_LIMIT;
10use fedimint_core::envs::{
11    FM_IROH_N0_DISCOVERY_ENABLE_ENV, FM_IROH_PKARR_RESOLVER_ENABLE_ENV, is_env_var_set_opt,
12    parse_kv_list_from_env,
13};
14use fedimint_core::module::{
15    ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, FEDIMINT_GATEWAY_ALPN,
16    IrohApiRequest, IrohGatewayRequest, IrohGatewayResponse,
17};
18
19/// The maximum number of bytes we are willing to buffer when reading an API
20/// response from an iroh QUIC stream. This must be large enough to accommodate
21/// the largest possible signed session outcome. A session can contain up to
22/// `broadcast_rounds_per_session` (default 3600) rounds, each peer produces one
23/// unit per round, and each unit can be up to `ALEPH_BFT_UNIT_BYTE_LIMIT`
24/// bytes. The response is JSON-serialized which hex-encodes the consensus
25/// bytes, roughly doubling the size. We use 2x the raw max as a conservative
26/// upper bound. For a 4-peer federation this is ~1.44 GB.
27const IROH_MAX_RESPONSE_BYTES: usize = ALEPH_BFT_UNIT_BYTE_LIMIT * 3600 * 4 * 2;
28use fedimint_core::task::spawn;
29use fedimint_core::util::{FmtCompact as _, SafeUrl};
30use fedimint_core::{apply, async_trait_maybe_send};
31use fedimint_logging::LOG_NET_IROH;
32use futures::Future;
33use futures::stream::{FuturesUnordered, StreamExt};
34use iroh::discovery::pkarr::PkarrResolver;
35use iroh::endpoint::Connection;
36use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
37use iroh_base::ticket::NodeTicket;
38use iroh_next::Watcher as _;
39use reqwest::{Method, StatusCode};
40use serde_json::Value;
41use tokio::sync::watch;
42use tracing::{debug, trace, warn};
43
44use super::{DynGuaridianConnection, IGuardianConnection, ServerError, ServerResult};
45use crate::{Connectivity, DynGatewayConnection, IConnection, IGatewayConnection};
46
/// Client-side connector that opens iroh connections to guardians and
/// gateways.
///
/// It always owns an endpoint of the stable `iroh` crate and optionally a
/// second endpoint of the `iroh_next` crate.
#[derive(Clone)]
pub(crate) struct IrohConnector {
    /// Endpoint of the stable `iroh` stack; always present.
    stable: iroh::endpoint::Endpoint,
    /// Endpoint of the `iroh_next` stack; `None` when the connector was built
    /// with `iroh_enable_next == false`.
    next: Option<iroh_next::endpoint::Endpoint>,

    /// List of overrides to use when attempting to connect to given
    /// `NodeId`
    ///
    /// This is useful for testing, or forcing non-default network
    /// connectivity.
    connection_overrides: BTreeMap<NodeId, NodeAddr>,

    /// Registry-owned signal bumped whenever any per-connection monitoring
    /// task observes a transport-level path change (e.g. iroh relay →
    /// direct). Consumers of [`crate::ConnectorRegistry`] subscribe via
    /// [`crate::ConnectorRegistry::connectivity_change_notifier`].
    path_change: Arc<watch::Sender<u64>>,
}
65
66impl fmt::Debug for IrohConnector {
67    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68        f.debug_struct("IrohEndpoint")
69            .field("stable-id", &self.stable.node_id())
70            .field("next-id", &self.next.as_ref().map(iroh_next::Endpoint::id))
71            .finish_non_exhaustive()
72    }
73}
74
75impl IrohConnector {
76    pub async fn new(
77        iroh_dns: Option<SafeUrl>,
78        iroh_enable_dht: bool,
79        iroh_enable_next: bool,
80        path_change: Arc<watch::Sender<u64>>,
81    ) -> anyhow::Result<Self> {
82        const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
83        const FM_GW_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_GW_IROH_CONNECT_OVERRIDES";
84        let mut s =
85            Self::new_no_overrides(iroh_dns, iroh_enable_dht, iroh_enable_next, path_change)
86                .await?;
87
88        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
89            s = s.with_connection_override(k, v.into());
90        }
91
92        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_GW_IROH_CONNECT_OVERRIDES_ENV)? {
93            s = s.with_connection_override(k, v.into());
94        }
95
96        Ok(s)
97    }
98
    /// Builds the iroh client endpoint(s) without registering any connection
    /// overrides.
    ///
    /// * `iroh_dns` - when set, a pkarr resolver pointing at this URL is added
    ///   to both endpoints.
    /// * `iroh_enable_dht` - adds DHT-based discovery/address lookup to both
    ///   endpoints (compiled out on wasm targets).
    /// * `iroh_enable_next` - when `true`, the `iroh_next` endpoint is bound
    ///   concurrently with the stable one; otherwise only the stable endpoint
    ///   is bound and `next` stays `None`.
    /// * `path_change` - stored as-is in the returned connector.
    ///
    /// # Errors
    ///
    /// Returns an error if binding an endpoint that is actually awaited fails.
    #[allow(clippy::too_many_lines)]
    pub async fn new_no_overrides(
        iroh_dns: Option<SafeUrl>,
        iroh_enable_dht: bool,
        iroh_enable_next: bool,
        path_change: Arc<watch::Sender<u64>>,
    ) -> anyhow::Result<Self> {
        // Both endpoint constructions are prepared as (lazy) boxed futures;
        // which of them is awaited is decided further below.
        let endpoint_stable = Box::pin({
            // The stable future gets its own copy; the original `iroh_dns` is
            // consumed by the next-endpoint future.
            let iroh_dns = iroh_dns.clone();
            async {
                let mut builder = Endpoint::builder();

                if let Some(iroh_dns) = iroh_dns.map(SafeUrl::to_unsafe) {
                    builder = builder.add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
                }

                // As a client, we don't need to register on any relays
                let mut builder = builder.relay_mode(iroh::RelayMode::Disabled);

                #[cfg(not(target_family = "wasm"))]
                if iroh_enable_dht {
                    builder = builder.discovery_dht();
                }

                // Add only resolver services here; the stable n0 convenience also
                // installs a publisher.
                {
                    // Treated as enabled when the helper yields `None`
                    // (presumably: variable not set — TODO confirm).
                    if is_env_var_set_opt(FM_IROH_PKARR_RESOLVER_ENABLE_ENV).unwrap_or(true) {
                        // Only wasm builds add the n0 pkarr resolver; on other
                        // targets this branch adds nothing.
                        #[cfg(target_family = "wasm")]
                        {
                            builder = builder.add_discovery(move |_| Some(PkarrResolver::n0_dns()));
                        }
                    } else {
                        warn!(
                            target: LOG_NET_IROH,
                            "Iroh pkarr resolver is disabled"
                        );
                    }

                    // Same default-to-enabled handling as above.
                    if is_env_var_set_opt(FM_IROH_N0_DISCOVERY_ENABLE_ENV).unwrap_or(true) {
                        // Only non-wasm builds add the n0 DNS discovery.
                        #[cfg(not(target_family = "wasm"))]
                        {
                            builder = builder.add_discovery(move |_| {
                                Some(iroh::discovery::dns::DnsDiscovery::n0_dns())
                            });
                        }
                    } else {
                        warn!(
                            target: LOG_NET_IROH,
                            "Iroh n0 discovery is disabled"
                        );
                    }
                }

                let endpoint = builder.bind().await?;
                debug!(
                    target: LOG_NET_IROH,
                    node_id = %endpoint.node_id(),
                    node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
                    "Iroh api client endpoint (stable)"
                );
                // The error type is spelled out because this future may be
                // awaited on its own (see the `else` branch below).
                Ok::<_, anyhow::Error>(endpoint)
            }
        });
        // NOTE(review): unlike the stable endpoint, the next endpoint does not
        // consult FM_IROH_PKARR_RESOLVER_ENABLE_ENV or
        // FM_IROH_N0_DISCOVERY_ENABLE_ENV — confirm this is intentional.
        let endpoint_next = Box::pin(async {
            let mut builder = iroh_next::Endpoint::builder();

            if let Some(iroh_dns) = iroh_dns.map(SafeUrl::to_unsafe) {
                builder = builder.address_lookup(
                    iroh_next::address_lookup::PkarrResolver::builder(iroh_dns).build(),
                );
            }

            // As a client, we don't need to register on any relays
            let mut builder = builder.relay_mode(iroh_next::RelayMode::Disabled);

            #[cfg(not(target_family = "wasm"))]
            if iroh_enable_dht {
                builder =
                    builder.address_lookup(iroh_next::address_lookup::DhtAddressLookup::builder());
            }

            // Add only resolver services here; the iroh 0.96
            // `.preset(presets::N0)` convenience also installs a publisher.
            {
                // Resolve using HTTPS requests to our DNS server's /pkarr path in browsers
                #[cfg(target_family = "wasm")]
                {
                    builder =
                        builder.address_lookup(iroh_next::address_lookup::PkarrResolver::n0_dns());
                }
                // Resolve using DNS queries outside browsers.
                #[cfg(not(target_family = "wasm"))]
                {
                    builder = builder
                        .address_lookup(iroh_next::address_lookup::DnsAddressLookup::n0_dns());
                }
            }

            let endpoint = builder.bind().await?;
            debug!(
                target: LOG_NET_IROH,
                node_id = %endpoint.id(),
                node_id_pkarr = %z32::encode(endpoint.id().as_bytes()),
                "Iroh api client endpoint (next)"
            );
            Ok(endpoint)
        });

        // With `iroh_enable_next` both endpoints are bound concurrently and the
        // first failure aborts; otherwise the next future is never polled.
        let (endpoint_stable, endpoint_next) = if iroh_enable_next {
            let (s, n) = tokio::try_join!(endpoint_stable, endpoint_next)?;
            (s, Some(n))
        } else {
            (endpoint_stable.await?, None)
        };

        Ok(Self {
            stable: endpoint_stable,
            next: endpoint_next,
            // Overrides are added later via `with_connection_override`.
            connection_overrides: BTreeMap::new(),
            path_change,
        })
    }
222
223    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
224        self.connection_overrides.insert(node, addr);
225        self
226    }
227
228    pub fn node_id_from_url(url: &SafeUrl) -> anyhow::Result<NodeId> {
229        if url.scheme() != "iroh" {
230            bail!(
231                "Unsupported scheme: {}, passed to iroh endpoint handler",
232                url.scheme()
233            );
234        }
235        let host = url.host_str().context("Missing host string in Iroh URL")?;
236
237        let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
238
239        Ok(node_id)
240    }
241}
242
243#[async_trait::async_trait]
244impl crate::Connector for IrohConnector {
245    async fn connect_guardian(
246        &self,
247        url: &SafeUrl,
248        api_secret: Option<&str>,
249    ) -> ServerResult<DynGuaridianConnection> {
250        if api_secret.is_some() {
251            // There seem to be no way to pass secret over current Iroh calling
252            // convention
253            ServerError::Connection(anyhow::format_err!(
254                "Iroh api secrets currently not supported"
255            ));
256        }
257        let node_id =
258            Self::node_id_from_url(url).map_err(|source| ServerError::InvalidPeerUrl {
259                source,
260                url: url.to_owned(),
261            })?;
262        let mut futures = FuturesUnordered::<
263            Pin<
264                Box<
265                    dyn Future<Output = (ServerResult<DynGuaridianConnection>, &'static str)>
266                        + Send,
267                >,
268            >,
269        >::new();
270        let connection_override = self.connection_overrides.get(&node_id).cloned();
271
272        let self_clone = self.clone();
273        futures.push(Box::pin({
274            let connection_override = connection_override.clone();
275            async move {
276                (
277                    self_clone
278                        .make_new_connection_stable(node_id, connection_override)
279                        .await
280                        .map(super::IGuardianConnection::into_dyn),
281                    "stable",
282                )
283            }
284        }));
285
286        if let Some(endpoint_next) = &self.next {
287            let self_clone = self.clone();
288            let endpoint_next = endpoint_next.clone();
289            futures.push(Box::pin(async move {
290                (
291                    self_clone
292                        .make_new_connection_next(&endpoint_next, node_id, connection_override)
293                        .await
294                        .map(super::IGuardianConnection::into_dyn),
295                    "next",
296                )
297            }));
298        }
299
300        // Remember last error, so we have something to return if
301        // neither connection works.
302        let mut prev_err = None;
303
304        // Loop until first success, or running out of connections.
305        while let Some((result, iroh_stack)) = futures.next().await {
306            match result {
307                Ok(connection) => return Ok(connection),
308                Err(err) => {
309                    warn!(
310                        target: LOG_NET_IROH,
311                        err = %err.fmt_compact(),
312                        %iroh_stack,
313                        "Join error in iroh connection task"
314                    );
315                    prev_err = Some(err);
316                }
317            }
318        }
319
320        Err(prev_err.unwrap_or_else(|| {
321            ServerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
322        }))
323    }
324
325    async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
326        let node_id = Self::node_id_from_url(url)?;
327        if let Some(node_addr) = self.connection_overrides.get(&node_id).cloned() {
328            let conn = self
329                .stable
330                .connect(node_addr.clone(), FEDIMINT_GATEWAY_ALPN)
331                .await?;
332
333            #[cfg(not(target_family = "wasm"))]
334            Self::spawn_connection_monitoring_stable(
335                &self.stable,
336                node_id,
337                self.path_change.clone(),
338            );
339
340            Ok(IGatewayConnection::into_dyn(conn))
341        } else {
342            let conn = self.stable.connect(node_id, FEDIMINT_GATEWAY_ALPN).await?;
343            Ok(IGatewayConnection::into_dyn(conn))
344        }
345    }
346
347    fn connectivity(&self, url: &SafeUrl) -> Connectivity {
348        let Ok(node_id) = Self::node_id_from_url(url) else {
349            return Connectivity::Unknown;
350        };
351        let Ok(watcher) = self.stable.conn_type(node_id) else {
352            return Connectivity::Unknown;
353        };
354        match watcher.get() {
355            Ok(iroh::endpoint::ConnectionType::Direct(_)) => Connectivity::Direct,
356            Ok(iroh::endpoint::ConnectionType::Relay(_)) => Connectivity::Relay,
357            Ok(iroh::endpoint::ConnectionType::Mixed(..)) => Connectivity::Mixed,
358            Ok(iroh::endpoint::ConnectionType::None) | Err(_) => Connectivity::Unknown,
359        }
360    }
361}
362
363impl IrohConnector {
364    #[cfg(not(target_family = "wasm"))]
365    fn spawn_connection_monitoring_stable(
366        endpoint: &Endpoint,
367        node_id: NodeId,
368        path_change: Arc<watch::Sender<u64>>,
369    ) {
370        if let Ok(mut conn_type_watcher) = endpoint.conn_type(node_id) {
371            #[allow(clippy::let_underscore_future)]
372            let _ = spawn("iroh connection (stable)", async move {
373                if let Ok(conn_type) = conn_type_watcher.get() {
374                    debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
375                }
376                while let Ok(event) = conn_type_watcher.updated().await {
377                    debug!(target: LOG_NET_IROH, %node_id, type = %event, "Connection type (changed)");
378                    path_change.send_modify(|c| *c = c.wrapping_add(1));
379                }
380            });
381        }
382    }
383
384    #[cfg(not(target_family = "wasm"))]
385    fn spawn_connection_monitoring_next(
386        conn: &iroh_next::endpoint::Connection,
387        node_id: iroh_next::EndpointId,
388        path_change: Arc<watch::Sender<u64>>,
389    ) {
390        let mut paths_watcher = conn.paths();
391        #[allow(clippy::let_underscore_future)]
392        let _ = spawn("iroh connection (next)", async move {
393            let paths = paths_watcher.get();
394            debug!(target: LOG_NET_IROH, %node_id, ?paths, "Connection paths (initial)");
395            while let Ok(paths) = paths_watcher.updated().await {
396                debug!(target: LOG_NET_IROH, %node_id, ?paths, "Connection paths changed");
397                path_change.send_modify(|c| *c = c.wrapping_add(1));
398            }
399        });
400    }
401
402    async fn make_new_connection_stable(
403        &self,
404        node_id: NodeId,
405        node_addr: Option<NodeAddr>,
406    ) -> ServerResult<Connection> {
407        trace!(target: LOG_NET_IROH, %node_id, "Creating new stable connection");
408        let conn = match node_addr.clone() {
409            Some(node_addr) => {
410                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
411                let conn = self.stable
412                    .connect(node_addr.clone(), FEDIMINT_API_ALPN)
413                    .await;
414
415                #[cfg(not(target_family = "wasm"))]
416                if conn.is_ok() {
417                    Self::spawn_connection_monitoring_stable(
418                        &self.stable,
419                        node_id,
420                        self.path_change.clone(),
421                    );
422                }
423                conn
424            }
425            None => self.stable.connect(node_id, FEDIMINT_API_ALPN).await,
426        }.map_err(ServerError::Connection)?;
427
428        Ok(conn)
429    }
430
431    async fn make_new_connection_next(
432        &self,
433        endpoint_next: &iroh_next::Endpoint,
434        node_id: NodeId,
435        node_addr: Option<NodeAddr>,
436    ) -> ServerResult<iroh_next::endpoint::Connection> {
437        let next_node_id =
438            iroh_next::EndpointId::from_bytes(node_id.as_bytes()).expect("Can't fail");
439
440        let endpoint_next = endpoint_next.clone();
441
442        trace!(target: LOG_NET_IROH, %node_id, "Creating new next connection");
443        let conn = match node_addr.clone() {
444            Some(node_addr) => {
445                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
446                let node_addr = node_addr_stable_to_next(&node_addr);
447                let conn = endpoint_next
448                    .connect(node_addr.clone(), FEDIMINT_API_ALPN)
449                    .await;
450
451                #[cfg(not(target_family = "wasm"))]
452                if let Ok(conn) = &conn {
453                    Self::spawn_connection_monitoring_next(
454                        conn,
455                        node_addr.id,
456                        self.path_change.clone(),
457                    );
458                }
459
460                conn
461            }
462            None => endpoint_next.connect(
463                next_node_id,
464                FEDIMINT_API_ALPN
465            ).await,
466        }
467        .map_err(Into::into)
468        .map_err(ServerError::Connection)?;
469
470        Ok(conn)
471    }
472}
473
474fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::EndpointAddr {
475    let next_node_id =
476        iroh_next::EndpointId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail");
477    let relay_addrs = stable.relay_url.iter().map(|u| {
478        iroh_next::TransportAddr::Relay(
479            iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail"),
480        )
481    });
482    let direct_addrs = stable
483        .direct_addresses
484        .iter()
485        .copied()
486        .map(iroh_next::TransportAddr::Ip);
487
488    iroh_next::EndpointAddr::from_parts(next_node_id, relay_addrs.chain(direct_addrs))
489}
490
491#[apply(async_trait_maybe_send!)]
492impl IConnection for Connection {
493    async fn await_disconnection(&self) {
494        self.closed().await;
495    }
496
497    fn is_connected(&self) -> bool {
498        self.close_reason().is_none()
499    }
500}
501
502#[async_trait]
503impl IGuardianConnection for Connection {
504    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
505        let json = serde_json::to_vec(&IrohApiRequest { method, request })
506            .expect("Serialization to vec can't fail");
507
508        let (mut sink, mut stream) = self
509            .open_bi()
510            .await
511            .map_err(|e| ServerError::Transport(e.into()))?;
512
513        sink.write_all(&json)
514            .await
515            .map_err(|e| ServerError::Transport(e.into()))?;
516
517        sink.finish()
518            .map_err(|e| ServerError::Transport(e.into()))?;
519
520        let response = stream
521            .read_to_end(IROH_MAX_RESPONSE_BYTES)
522            .await
523            .map_err(|e| ServerError::Transport(e.into()))?;
524
525        // TODO: We should not be serializing Results on the wire
526        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
527            .map_err(|e| ServerError::InvalidResponse(e.into()))?;
528
529        response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
530    }
531}
532
533#[apply(async_trait_maybe_send!)]
534impl IConnection for iroh_next::endpoint::Connection {
535    async fn await_disconnection(&self) {
536        self.closed().await;
537    }
538
539    fn is_connected(&self) -> bool {
540        self.close_reason().is_none()
541    }
542}
543
544#[async_trait]
545impl IGuardianConnection for iroh_next::endpoint::Connection {
546    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
547        let json = serde_json::to_vec(&IrohApiRequest { method, request })
548            .expect("Serialization to vec can't fail");
549
550        let (mut sink, mut stream) = self
551            .open_bi()
552            .await
553            .map_err(|e| ServerError::Transport(e.into()))?;
554
555        sink.write_all(&json)
556            .await
557            .map_err(|e| ServerError::Transport(e.into()))?;
558
559        sink.finish()
560            .map_err(|e| ServerError::Transport(e.into()))?;
561
562        let response = stream
563            .read_to_end(IROH_MAX_RESPONSE_BYTES)
564            .await
565            .map_err(|e| ServerError::Transport(e.into()))?;
566
567        // TODO: We should not be serializing Results on the wire
568        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
569            .map_err(|e| ServerError::InvalidResponse(e.into()))?;
570
571        response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
572    }
573}
574
575#[apply(async_trait_maybe_send!)]
576impl IGatewayConnection for Connection {
577    async fn request(
578        &self,
579        password: Option<String>,
580        _method: Method,
581        route: &str,
582        payload: Option<Value>,
583    ) -> ServerResult<Value> {
584        let iroh_request = IrohGatewayRequest {
585            route: route.to_string(),
586            params: payload,
587            password,
588        };
589        let json = serde_json::to_vec(&iroh_request).expect("serialization cant fail");
590
591        let (mut sink, mut stream) = self
592            .open_bi()
593            .await
594            .map_err(|e| ServerError::Transport(e.into()))?;
595
596        sink.write_all(&json)
597            .await
598            .map_err(|e| ServerError::Transport(e.into()))?;
599
600        sink.finish()
601            .map_err(|e| ServerError::Transport(e.into()))?;
602
603        let response = stream
604            .read_to_end(IROH_MAX_RESPONSE_BYTES)
605            .await
606            .map_err(|e| ServerError::Transport(e.into()))?;
607
608        let response = serde_json::from_slice::<IrohGatewayResponse>(&response)
609            .map_err(|e| ServerError::InvalidResponse(e.into()))?;
610        match StatusCode::from_u16(response.status).map_err(|e| {
611            ServerError::InvalidResponse(anyhow::anyhow!("Invalid status code: {}", e))
612        })? {
613            StatusCode::OK => Ok(response.body),
614            status => Err(ServerError::ServerError(anyhow::anyhow!(
615                "Server returned status code: {}",
616                status
617            ))),
618        }
619    }
620}