Skip to main content

fedimint_connectors/
iroh.rs

1use std::collections::BTreeMap;
2use std::fmt;
3use std::pin::Pin;
4use std::str::FromStr;
5
6use anyhow::{Context, bail};
7use async_trait::async_trait;
8use fedimint_core::config::ALEPH_BFT_UNIT_BYTE_LIMIT;
9use fedimint_core::envs::{
10    FM_IROH_N0_DISCOVERY_ENABLE_ENV, FM_IROH_PKARR_RESOLVER_ENABLE_ENV, is_env_var_set_opt,
11    parse_kv_list_from_env,
12};
13use fedimint_core::module::{
14    ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, FEDIMINT_GATEWAY_ALPN,
15    IrohApiRequest, IrohGatewayRequest, IrohGatewayResponse,
16};
17
18/// The maximum number of bytes we are willing to buffer when reading an API
19/// response from an iroh QUIC stream. This must be large enough to accommodate
20/// the largest possible signed session outcome. A session can contain up to
21/// `broadcast_rounds_per_session` (default 3600) rounds, each peer produces one
22/// unit per round, and each unit can be up to `ALEPH_BFT_UNIT_BYTE_LIMIT`
23/// bytes. The response is JSON-serialized which hex-encodes the consensus
24/// bytes, roughly doubling the size. We use 2x the raw max as a conservative
25/// upper bound. For a 4-peer federation this is ~1.44 GB.
26const IROH_MAX_RESPONSE_BYTES: usize = ALEPH_BFT_UNIT_BYTE_LIMIT * 3600 * 4 * 2;
27use fedimint_core::task::spawn;
28use fedimint_core::util::{FmtCompact as _, SafeUrl};
29use fedimint_core::{apply, async_trait_maybe_send};
30use fedimint_logging::LOG_NET_IROH;
31use futures::Future;
32use futures::stream::{FuturesUnordered, StreamExt};
33use iroh::discovery::pkarr::PkarrResolver;
34use iroh::endpoint::Connection;
35use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
36use iroh_base::ticket::NodeTicket;
37use iroh_next::Watcher as _;
38use reqwest::{Method, StatusCode};
39use serde_json::Value;
40use tracing::{debug, trace, warn};
41
42use super::{DynGuaridianConnection, IGuardianConnection, ServerError, ServerResult};
43use crate::{DynGatewayConnection, IConnection, IGatewayConnection};
44
45#[derive(Clone)]
46pub(crate) struct IrohConnector {
47    stable: iroh::endpoint::Endpoint,
48    next: Option<iroh_next::endpoint::Endpoint>,
49
50    /// List of overrides to use when attempting to connect to given
51    /// `NodeId`
52    ///
53    /// This is useful for testing, or forcing non-default network
54    /// connectivity.
55    connection_overrides: BTreeMap<NodeId, NodeAddr>,
56}
57
58impl fmt::Debug for IrohConnector {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        f.debug_struct("IrohEndpoint")
61            .field("stable-id", &self.stable.node_id())
62            .field(
63                "next-id",
64                &self.next.as_ref().map(iroh_next::Endpoint::node_id),
65            )
66            .finish_non_exhaustive()
67    }
68}
69
70impl IrohConnector {
71    pub async fn new(
72        iroh_dns: Option<SafeUrl>,
73        iroh_enable_dht: bool,
74        iroh_enable_next: bool,
75    ) -> anyhow::Result<Self> {
76        const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
77        const FM_GW_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_GW_IROH_CONNECT_OVERRIDES";
78        let mut s = Self::new_no_overrides(iroh_dns, iroh_enable_dht, iroh_enable_next).await?;
79
80        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
81            s = s.with_connection_override(k, v.into());
82        }
83
84        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_GW_IROH_CONNECT_OVERRIDES_ENV)? {
85            s = s.with_connection_override(k, v.into());
86        }
87
88        Ok(s)
89    }
90
91    #[allow(clippy::too_many_lines)]
92    pub async fn new_no_overrides(
93        iroh_dns: Option<SafeUrl>,
94        iroh_enable_dht: bool,
95        iroh_enable_next: bool,
96    ) -> anyhow::Result<Self> {
97        let endpoint_stable = Box::pin({
98            let iroh_dns = iroh_dns.clone();
99            async {
100                let mut builder = Endpoint::builder();
101
102                if let Some(iroh_dns) = iroh_dns.map(SafeUrl::to_unsafe) {
103                    builder = builder.add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
104                }
105
106                // As a client, we don't need to register on any relays
107                let mut builder = builder.relay_mode(iroh::RelayMode::Disabled);
108
109                #[cfg(not(target_family = "wasm"))]
110                if iroh_enable_dht {
111                    builder = builder.discovery_dht();
112                }
113
114                // instead of `.discovery_n0`, which brings publisher we don't want
115                {
116                    if is_env_var_set_opt(FM_IROH_PKARR_RESOLVER_ENABLE_ENV).unwrap_or(true) {
117                        #[cfg(target_family = "wasm")]
118                        {
119                            builder = builder.add_discovery(move |_| Some(PkarrResolver::n0_dns()));
120                        }
121                    } else {
122                        warn!(
123                            target: LOG_NET_IROH,
124                            "Iroh pkarr resolver is disabled"
125                        );
126                    }
127
128                    if is_env_var_set_opt(FM_IROH_N0_DISCOVERY_ENABLE_ENV).unwrap_or(true) {
129                        #[cfg(not(target_family = "wasm"))]
130                        {
131                            builder = builder.add_discovery(move |_| {
132                                Some(iroh::discovery::dns::DnsDiscovery::n0_dns())
133                            });
134                        }
135                    } else {
136                        warn!(
137                            target: LOG_NET_IROH,
138                            "Iroh n0 discovery is disabled"
139                        );
140                    }
141                }
142
143                let endpoint = builder.bind().await?;
144                debug!(
145                    target: LOG_NET_IROH,
146                    node_id = %endpoint.node_id(),
147                    node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
148                    "Iroh api client endpoint (stable)"
149                );
150                Ok::<_, anyhow::Error>(endpoint)
151            }
152        });
153        let endpoint_next = Box::pin(async {
154            let mut builder = iroh_next::Endpoint::builder();
155
156            if let Some(iroh_dns) = iroh_dns.map(SafeUrl::to_unsafe) {
157                builder = builder.add_discovery(
158                    iroh_next::discovery::pkarr::PkarrResolver::builder(iroh_dns).build(),
159                );
160            }
161
162            // As a client, we don't need to register on any relays
163            let mut builder = builder.relay_mode(iroh_next::RelayMode::Disabled);
164
165            #[cfg(not(target_family = "wasm"))]
166            if iroh_enable_dht {
167                builder = builder.discovery_dht();
168            }
169
170            // instead of `.discovery_n0`, which brings publisher we don't want
171            {
172                // Resolve using HTTPS requests to our DNS server's /pkarr path in browsers
173                #[cfg(target_family = "wasm")]
174                {
175                    builder =
176                        builder.add_discovery(iroh_next::discovery::pkarr::PkarrResolver::n0_dns());
177                }
178                // Resolve using DNS queries outside browsers.
179                #[cfg(not(target_family = "wasm"))]
180                {
181                    builder =
182                        builder.add_discovery(iroh_next::discovery::dns::DnsDiscovery::n0_dns());
183                }
184            }
185
186            let endpoint = builder.bind().await?;
187            debug!(
188                target: LOG_NET_IROH,
189                node_id = %endpoint.node_id(),
190                node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
191                "Iroh api client endpoint (next)"
192            );
193            Ok(endpoint)
194        });
195
196        let (endpoint_stable, endpoint_next) = if iroh_enable_next {
197            let (s, n) = tokio::try_join!(endpoint_stable, endpoint_next)?;
198            (s, Some(n))
199        } else {
200            (endpoint_stable.await?, None)
201        };
202
203        Ok(Self {
204            stable: endpoint_stable,
205            next: endpoint_next,
206            connection_overrides: BTreeMap::new(),
207        })
208    }
209
210    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
211        self.connection_overrides.insert(node, addr);
212        self
213    }
214
215    pub fn node_id_from_url(url: &SafeUrl) -> anyhow::Result<NodeId> {
216        if url.scheme() != "iroh" {
217            bail!(
218                "Unsupported scheme: {}, passed to iroh endpoint handler",
219                url.scheme()
220            );
221        }
222        let host = url.host_str().context("Missing host string in Iroh URL")?;
223
224        let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
225
226        Ok(node_id)
227    }
228}
229
230#[async_trait::async_trait]
231impl crate::Connector for IrohConnector {
232    async fn connect_guardian(
233        &self,
234        url: &SafeUrl,
235        api_secret: Option<&str>,
236    ) -> ServerResult<DynGuaridianConnection> {
237        if api_secret.is_some() {
238            // There seem to be no way to pass secret over current Iroh calling
239            // convention
240            ServerError::Connection(anyhow::format_err!(
241                "Iroh api secrets currently not supported"
242            ));
243        }
244        let node_id =
245            Self::node_id_from_url(url).map_err(|source| ServerError::InvalidPeerUrl {
246                source,
247                url: url.to_owned(),
248            })?;
249        let mut futures = FuturesUnordered::<
250            Pin<
251                Box<
252                    dyn Future<Output = (ServerResult<DynGuaridianConnection>, &'static str)>
253                        + Send,
254                >,
255            >,
256        >::new();
257        let connection_override = self.connection_overrides.get(&node_id).cloned();
258
259        let self_clone = self.clone();
260        futures.push(Box::pin({
261            let connection_override = connection_override.clone();
262            async move {
263                (
264                    self_clone
265                        .make_new_connection_stable(node_id, connection_override)
266                        .await
267                        .map(super::IGuardianConnection::into_dyn),
268                    "stable",
269                )
270            }
271        }));
272
273        if let Some(endpoint_next) = &self.next {
274            let self_clone = self.clone();
275            let endpoint_next = endpoint_next.clone();
276            futures.push(Box::pin(async move {
277                (
278                    self_clone
279                        .make_new_connection_next(&endpoint_next, node_id, connection_override)
280                        .await
281                        .map(super::IGuardianConnection::into_dyn),
282                    "next",
283                )
284            }));
285        }
286
287        // Remember last error, so we have something to return if
288        // neither connection works.
289        let mut prev_err = None;
290
291        // Loop until first success, or running out of connections.
292        while let Some((result, iroh_stack)) = futures.next().await {
293            match result {
294                Ok(connection) => return Ok(connection),
295                Err(err) => {
296                    warn!(
297                        target: LOG_NET_IROH,
298                        err = %err.fmt_compact(),
299                        %iroh_stack,
300                        "Join error in iroh connection task"
301                    );
302                    prev_err = Some(err);
303                }
304            }
305        }
306
307        Err(prev_err.unwrap_or_else(|| {
308            ServerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
309        }))
310    }
311
312    async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
313        let node_id = Self::node_id_from_url(url)?;
314        if let Some(node_addr) = self.connection_overrides.get(&node_id).cloned() {
315            let conn = self
316                .stable
317                .connect(node_addr.clone(), FEDIMINT_GATEWAY_ALPN)
318                .await?;
319
320            #[cfg(not(target_family = "wasm"))]
321            Self::spawn_connection_monitoring_stable(&self.stable, node_id);
322
323            Ok(IGatewayConnection::into_dyn(conn))
324        } else {
325            let conn = self.stable.connect(node_id, FEDIMINT_GATEWAY_ALPN).await?;
326            Ok(IGatewayConnection::into_dyn(conn))
327        }
328    }
329}
330
331impl IrohConnector {
332    #[cfg(not(target_family = "wasm"))]
333    fn spawn_connection_monitoring_stable(endpoint: &Endpoint, node_id: NodeId) {
334        if let Ok(mut conn_type_watcher) = endpoint.conn_type(node_id) {
335            #[allow(clippy::let_underscore_future)]
336            let _ = spawn("iroh connection (stable)", async move {
337                if let Ok(conn_type) = conn_type_watcher.get() {
338                    debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
339                }
340                while let Ok(event) = conn_type_watcher.updated().await {
341                    debug!(target: LOG_NET_IROH, %node_id, type = %event, "Connection type (changed)");
342                }
343            });
344        }
345    }
346
347    #[cfg(not(target_family = "wasm"))]
348    fn spawn_connection_monitoring_next(
349        endpoint: &iroh_next::Endpoint,
350        node_addr: &iroh_next::NodeAddr,
351    ) {
352        if let Some(mut conn_type_watcher) = endpoint.conn_type(node_addr.node_id) {
353            let node_id = node_addr.node_id;
354            #[allow(clippy::let_underscore_future)]
355            let _ = spawn("iroh connection (next)", async move {
356                if let Ok(conn_type) = conn_type_watcher.get() {
357                    debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
358                }
359                while let Ok(event) = conn_type_watcher.updated().await {
360                    debug!(target: LOG_NET_IROH, node_id = %node_id, %event, "Connection type changed");
361                }
362            });
363        }
364    }
365
366    async fn make_new_connection_stable(
367        &self,
368        node_id: NodeId,
369        node_addr: Option<NodeAddr>,
370    ) -> ServerResult<Connection> {
371        trace!(target: LOG_NET_IROH, %node_id, "Creating new stable connection");
372        let conn = match node_addr.clone() {
373            Some(node_addr) => {
374                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
375                let conn = self.stable
376                    .connect(node_addr.clone(), FEDIMINT_API_ALPN)
377                    .await;
378
379                #[cfg(not(target_family = "wasm"))]
380                if conn.is_ok() {
381                    Self::spawn_connection_monitoring_stable(&self.stable, node_id);
382                }
383                conn
384            }
385            None => self.stable.connect(node_id, FEDIMINT_API_ALPN).await,
386        }.map_err(ServerError::Connection)?;
387
388        Ok(conn)
389    }
390
391    async fn make_new_connection_next(
392        &self,
393        endpoint_next: &iroh_next::Endpoint,
394        node_id: NodeId,
395        node_addr: Option<NodeAddr>,
396    ) -> ServerResult<iroh_next::endpoint::Connection> {
397        let next_node_id = iroh_next::NodeId::from_bytes(node_id.as_bytes()).expect("Can't fail");
398
399        let endpoint_next = endpoint_next.clone();
400
401        trace!(target: LOG_NET_IROH, %node_id, "Creating new next connection");
402        let conn = match node_addr.clone() {
403            Some(node_addr) => {
404                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
405                let node_addr = node_addr_stable_to_next(&node_addr);
406                let conn = endpoint_next
407                    .connect(node_addr.clone(), FEDIMINT_API_ALPN)
408                    .await;
409
410                #[cfg(not(target_family = "wasm"))]
411                if conn.is_ok() {
412                    Self::spawn_connection_monitoring_next(&endpoint_next, &node_addr);
413                }
414
415                conn
416            }
417            None => endpoint_next.connect(
418                next_node_id,
419                FEDIMINT_API_ALPN
420            ).await,
421        }
422        .map_err(Into::into)
423        .map_err(ServerError::Connection)?;
424
425        Ok(conn)
426    }
427}
428
429fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::NodeAddr {
430    iroh_next::NodeAddr {
431        node_id: iroh_next::NodeId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail"),
432        relay_url: stable
433            .relay_url
434            .as_ref()
435            .map(|u| iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail")),
436        direct_addresses: stable.direct_addresses.clone(),
437    }
438}
439
440#[apply(async_trait_maybe_send!)]
441impl IConnection for Connection {
442    async fn await_disconnection(&self) {
443        self.closed().await;
444    }
445
446    fn is_connected(&self) -> bool {
447        self.close_reason().is_none()
448    }
449}
450
451#[async_trait]
452impl IGuardianConnection for Connection {
453    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
454        let json = serde_json::to_vec(&IrohApiRequest { method, request })
455            .expect("Serialization to vec can't fail");
456
457        let (mut sink, mut stream) = self
458            .open_bi()
459            .await
460            .map_err(|e| ServerError::Transport(e.into()))?;
461
462        sink.write_all(&json)
463            .await
464            .map_err(|e| ServerError::Transport(e.into()))?;
465
466        sink.finish()
467            .map_err(|e| ServerError::Transport(e.into()))?;
468
469        let response = stream
470            .read_to_end(IROH_MAX_RESPONSE_BYTES)
471            .await
472            .map_err(|e| ServerError::Transport(e.into()))?;
473
474        // TODO: We should not be serializing Results on the wire
475        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
476            .map_err(|e| ServerError::InvalidResponse(e.into()))?;
477
478        response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
479    }
480}
481
482#[apply(async_trait_maybe_send!)]
483impl IConnection for iroh_next::endpoint::Connection {
484    async fn await_disconnection(&self) {
485        self.closed().await;
486    }
487
488    fn is_connected(&self) -> bool {
489        self.close_reason().is_none()
490    }
491}
492
493#[async_trait]
494impl IGuardianConnection for iroh_next::endpoint::Connection {
495    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
496        let json = serde_json::to_vec(&IrohApiRequest { method, request })
497            .expect("Serialization to vec can't fail");
498
499        let (mut sink, mut stream) = self
500            .open_bi()
501            .await
502            .map_err(|e| ServerError::Transport(e.into()))?;
503
504        sink.write_all(&json)
505            .await
506            .map_err(|e| ServerError::Transport(e.into()))?;
507
508        sink.finish()
509            .map_err(|e| ServerError::Transport(e.into()))?;
510
511        let response = stream
512            .read_to_end(IROH_MAX_RESPONSE_BYTES)
513            .await
514            .map_err(|e| ServerError::Transport(e.into()))?;
515
516        // TODO: We should not be serializing Results on the wire
517        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
518            .map_err(|e| ServerError::InvalidResponse(e.into()))?;
519
520        response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
521    }
522}
523
524#[apply(async_trait_maybe_send!)]
525impl IGatewayConnection for Connection {
526    async fn request(
527        &self,
528        password: Option<String>,
529        _method: Method,
530        route: &str,
531        payload: Option<Value>,
532    ) -> ServerResult<Value> {
533        let iroh_request = IrohGatewayRequest {
534            route: route.to_string(),
535            params: payload,
536            password,
537        };
538        let json = serde_json::to_vec(&iroh_request).expect("serialization cant fail");
539
540        let (mut sink, mut stream) = self
541            .open_bi()
542            .await
543            .map_err(|e| ServerError::Transport(e.into()))?;
544
545        sink.write_all(&json)
546            .await
547            .map_err(|e| ServerError::Transport(e.into()))?;
548
549        sink.finish()
550            .map_err(|e| ServerError::Transport(e.into()))?;
551
552        let response = stream
553            .read_to_end(IROH_MAX_RESPONSE_BYTES)
554            .await
555            .map_err(|e| ServerError::Transport(e.into()))?;
556
557        let response = serde_json::from_slice::<IrohGatewayResponse>(&response)
558            .map_err(|e| ServerError::InvalidResponse(e.into()))?;
559        match StatusCode::from_u16(response.status).map_err(|e| {
560            ServerError::InvalidResponse(anyhow::anyhow!("Invalid status code: {}", e))
561        })? {
562            StatusCode::OK => Ok(response.body),
563            status => Err(ServerError::ServerError(anyhow::anyhow!(
564                "Server returned status code: {}",
565                status
566            ))),
567        }
568    }
569}