fedimint_connectors/
iroh.rs

1use std::collections::BTreeMap;
2use std::fmt;
3use std::pin::Pin;
4use std::str::FromStr;
5
6use anyhow::{Context, bail};
7use async_trait::async_trait;
8use fedimint_core::envs::{
9    FM_IROH_N0_DISCOVERY_ENABLE_ENV, FM_IROH_PKARR_RESOLVER_ENABLE_ENV, is_env_var_set_opt,
10    parse_kv_list_from_env,
11};
12use fedimint_core::iroh_prod::FM_IROH_DNS_FEDIMINT_PROD;
13use fedimint_core::module::{
14    ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, FEDIMINT_GATEWAY_ALPN,
15    IrohApiRequest, IrohGatewayRequest, IrohGatewayResponse,
16};
17use fedimint_core::task::spawn;
18use fedimint_core::util::{FmtCompact as _, SafeUrl};
19use fedimint_core::{apply, async_trait_maybe_send};
20use fedimint_logging::LOG_NET_IROH;
21use futures::Future;
22use futures::stream::{FuturesUnordered, StreamExt};
23use iroh::discovery::pkarr::PkarrResolver;
24use iroh::endpoint::Connection;
25use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
26use iroh_base::ticket::NodeTicket;
27use iroh_next::Watcher as _;
28use reqwest::{Method, StatusCode};
29use serde_json::Value;
30use tracing::{debug, trace, warn};
31use url::Url;
32
33use super::{DynGuaridianConnection, IGuardianConnection, ServerError, ServerResult};
34use crate::{DynGatewayConnection, IConnection, IGatewayConnection};
35
36#[derive(Clone)]
37pub(crate) struct IrohConnector {
38    stable: iroh::endpoint::Endpoint,
39    next: Option<iroh_next::endpoint::Endpoint>,
40
41    /// List of overrides to use when attempting to connect to given
42    /// `NodeId`
43    ///
44    /// This is useful for testing, or forcing non-default network
45    /// connectivity.
46    connection_overrides: BTreeMap<NodeId, NodeAddr>,
47}
48
49impl fmt::Debug for IrohConnector {
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51        f.debug_struct("IrohEndpoint")
52            .field("stable-id", &self.stable.node_id())
53            .field(
54                "next-id",
55                &self.next.as_ref().map(iroh_next::Endpoint::node_id),
56            )
57            .finish_non_exhaustive()
58    }
59}
60
impl IrohConnector {
    /// Builds a connector and then applies connection overrides taken from
    /// the environment.
    ///
    /// Endpoint setup is delegated to [`Self::new_no_overrides`]. Afterwards
    /// `FM_IROH_CONNECT_OVERRIDES` and then `FM_GW_IROH_CONNECT_OVERRIDES` are
    /// parsed as key/value lists of `NodeId` -> `NodeTicket`, and each entry
    /// is registered through [`Self::with_connection_override`]. Overrides
    /// are stored in a map, so an entry from the second variable replaces one
    /// for the same node from the first.
    ///
    /// # Errors
    /// Fails if endpoint construction fails or if either environment
    /// variable cannot be parsed.
    pub async fn new(
        iroh_dns: Option<SafeUrl>,
        iroh_enable_dht: bool,
        iroh_enable_next: bool,
    ) -> anyhow::Result<Self> {
        const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
        const FM_GW_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_GW_IROH_CONNECT_OVERRIDES";
        let mut s = Self::new_no_overrides(iroh_dns, iroh_enable_dht, iroh_enable_next).await?;

        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
            s = s.with_connection_override(k, v.into());
        }

        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_GW_IROH_CONNECT_OVERRIDES_ENV)? {
            s = s.with_connection_override(k, v.into());
        }

        Ok(s)
    }

    /// Builds the stable (and optionally the `next`) Iroh client endpoints,
    /// with no connection overrides.
    ///
    /// * `iroh_dns`: pkarr resolver URL to use. When `None`, the URLs in
    ///   `FM_IROH_DNS_FEDIMINT_PROD` are used instead.
    /// * `iroh_enable_dht`: also enable DHT discovery (non-wasm targets
    ///   only).
    /// * `iroh_enable_next`: additionally bind an `iroh_next` endpoint.
    ///
    /// Both endpoints are built with relays disabled.
    ///
    /// # Errors
    /// Fails if binding any requested endpoint fails.
    ///
    /// # Panics
    /// Panics if one of the hardcoded production DNS URLs does not parse.
    #[allow(clippy::too_many_lines)]
    pub async fn new_no_overrides(
        iroh_dns: Option<SafeUrl>,
        iroh_enable_dht: bool,
        iroh_enable_next: bool,
    ) -> anyhow::Result<Self> {
        // Pkarr resolver base URLs: the caller-supplied one, or the hardcoded
        // Fedimint production servers.
        let iroh_dns_servers: Vec<_> = iroh_dns.map_or_else(
            || {
                FM_IROH_DNS_FEDIMINT_PROD
                    .into_iter()
                    .map(|url| Url::parse(url).expect("Hardcoded, can't fail"))
                    .collect()
            },
            |url| vec![url.to_unsafe()],
        );

        // Both endpoint setups are created as lazy boxed futures and only
        // driven at the end. The stable one gets its own copy of the DNS
        // server list because the `next` one consumes the original.
        let endpoint_stable = Box::pin({
            let iroh_dns_servers = iroh_dns_servers.clone();
            async {
                let mut builder = Endpoint::builder();

                for iroh_dns in iroh_dns_servers {
                    builder = builder.add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
                }

                // As a client, we don't need to register on any relays
                let mut builder = builder.relay_mode(iroh::RelayMode::Disabled);

                #[cfg(not(target_family = "wasm"))]
                if iroh_enable_dht {
                    builder = builder.discovery_dht();
                }

                // Add n0 discovery by hand instead of via `.discovery_n0`,
                // which would also bring in a publisher we don't want.
                {
                    // On non-wasm targets this toggle only affects the warning
                    // below; the n0 pkarr resolver is added on wasm only.
                    if is_env_var_set_opt(FM_IROH_PKARR_RESOLVER_ENABLE_ENV).unwrap_or(true) {
                        #[cfg(target_family = "wasm")]
                        {
                            builder = builder.add_discovery(move |_| Some(PkarrResolver::n0_dns()));
                        }
                    } else {
                        warn!(
                            target: LOG_NET_IROH,
                            "Iroh pkarr resolver is disabled"
                        );
                    }

                    // Conversely, DNS-based n0 discovery is added on non-wasm
                    // targets only.
                    if is_env_var_set_opt(FM_IROH_N0_DISCOVERY_ENABLE_ENV).unwrap_or(true) {
                        #[cfg(not(target_family = "wasm"))]
                        {
                            builder = builder.add_discovery(move |_| {
                                Some(iroh::discovery::dns::DnsDiscovery::n0_dns())
                            });
                        }
                    } else {
                        warn!(
                            target: LOG_NET_IROH,
                            "Iroh n0 discovery is disabled"
                        );
                    }
                }

                let endpoint = builder.bind().await?;
                debug!(
                    target: LOG_NET_IROH,
                    node_id = %endpoint.node_id(),
                    node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
                    "Iroh api client endpoint (stable)"
                );
                Ok::<_, anyhow::Error>(endpoint)
            }
        });
        let endpoint_next = Box::pin(async {
            let mut builder = iroh_next::Endpoint::builder();

            for iroh_dns in iroh_dns_servers {
                builder = builder.add_discovery(
                    iroh_next::discovery::pkarr::PkarrResolver::builder(iroh_dns).build(),
                );
            }

            // As a client, we don't need to register on any relays
            let mut builder = builder.relay_mode(iroh_next::RelayMode::Disabled);

            #[cfg(not(target_family = "wasm"))]
            if iroh_enable_dht {
                builder = builder.discovery_dht();
            }

            // Add n0 discovery by hand instead of via `.discovery_n0`, which
            // would also bring in a publisher we don't want.
            // NOTE(review): unlike the stable endpoint, the
            // `FM_IROH_PKARR_RESOLVER_ENABLE` / `FM_IROH_N0_DISCOVERY_ENABLE`
            // toggles are not consulted here. Confirm this is intended.
            {
                // Resolve using HTTPS requests to our DNS server's /pkarr path in browsers
                #[cfg(target_family = "wasm")]
                {
                    builder =
                        builder.add_discovery(iroh_next::discovery::pkarr::PkarrResolver::n0_dns());
                }
                // Resolve using DNS queries outside browsers.
                #[cfg(not(target_family = "wasm"))]
                {
                    builder =
                        builder.add_discovery(iroh_next::discovery::dns::DnsDiscovery::n0_dns());
                }
            }

            let endpoint = builder.bind().await?;
            debug!(
                target: LOG_NET_IROH,
                node_id = %endpoint.node_id(),
                node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
                "Iroh api client endpoint (next)"
            );
            Ok(endpoint)
        });

        // When `next` is enabled, both endpoints are bound concurrently and the
        // first error is returned. Otherwise the `next` future is never polled
        // and is simply dropped, so that endpoint is never bound.
        let (endpoint_stable, endpoint_next) = if iroh_enable_next {
            let (s, n) = tokio::try_join!(endpoint_stable, endpoint_next)?;
            (s, Some(n))
        } else {
            (endpoint_stable.await?, None)
        };

        Ok(Self {
            stable: endpoint_stable,
            next: endpoint_next,
            connection_overrides: BTreeMap::new(),
        })
    }

    /// Registers `addr` as the address to dial for `node`, replacing any
    /// earlier override for the same node. Builder style: consumes and
    /// returns `self`.
    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
        self.connection_overrides.insert(node, addr);
        self
    }

    /// Extracts the [`NodeId`] from an `iroh://<node-id>` URL.
    ///
    /// # Errors
    /// Fails if the scheme is not `iroh`, if the URL has no host, or if the
    /// host does not parse as a public key.
    pub fn node_id_from_url(url: &SafeUrl) -> anyhow::Result<NodeId> {
        if url.scheme() != "iroh" {
            bail!(
                "Unsupported scheme: {}, passed to iroh endpoint handler",
                url.scheme()
            );
        }
        let host = url.host_str().context("Missing host string in Iroh URL")?;

        let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;

        Ok(node_id)
    }
}
230
231#[async_trait::async_trait]
232impl crate::Connector for IrohConnector {
233    async fn connect_guardian(
234        &self,
235        url: &SafeUrl,
236        api_secret: Option<&str>,
237    ) -> ServerResult<DynGuaridianConnection> {
238        if api_secret.is_some() {
239            // There seem to be no way to pass secret over current Iroh calling
240            // convention
241            ServerError::Connection(anyhow::format_err!(
242                "Iroh api secrets currently not supported"
243            ));
244        }
245        let node_id =
246            Self::node_id_from_url(url).map_err(|source| ServerError::InvalidPeerUrl {
247                source,
248                url: url.to_owned(),
249            })?;
250        let mut futures = FuturesUnordered::<
251            Pin<
252                Box<
253                    dyn Future<Output = (ServerResult<DynGuaridianConnection>, &'static str)>
254                        + Send,
255                >,
256            >,
257        >::new();
258        let connection_override = self.connection_overrides.get(&node_id).cloned();
259
260        let self_clone = self.clone();
261        futures.push(Box::pin({
262            let connection_override = connection_override.clone();
263            async move {
264                (
265                    self_clone
266                        .make_new_connection_stable(node_id, connection_override)
267                        .await
268                        .map(super::IGuardianConnection::into_dyn),
269                    "stable",
270                )
271            }
272        }));
273
274        if let Some(endpoint_next) = &self.next {
275            let self_clone = self.clone();
276            let endpoint_next = endpoint_next.clone();
277            futures.push(Box::pin(async move {
278                (
279                    self_clone
280                        .make_new_connection_next(&endpoint_next, node_id, connection_override)
281                        .await
282                        .map(super::IGuardianConnection::into_dyn),
283                    "next",
284                )
285            }));
286        }
287
288        // Remember last error, so we have something to return if
289        // neither connection works.
290        let mut prev_err = None;
291
292        // Loop until first success, or running out of connections.
293        while let Some((result, iroh_stack)) = futures.next().await {
294            match result {
295                Ok(connection) => return Ok(connection),
296                Err(err) => {
297                    warn!(
298                        target: LOG_NET_IROH,
299                        err = %err.fmt_compact(),
300                        %iroh_stack,
301                        "Join error in iroh connection task"
302                    );
303                    prev_err = Some(err);
304                }
305            }
306        }
307
308        Err(prev_err.unwrap_or_else(|| {
309            ServerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
310        }))
311    }
312
313    async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
314        let node_id = Self::node_id_from_url(url)?;
315        if let Some(node_addr) = self.connection_overrides.get(&node_id).cloned() {
316            let conn = self
317                .stable
318                .connect(node_addr.clone(), FEDIMINT_GATEWAY_ALPN)
319                .await?;
320
321            #[cfg(not(target_family = "wasm"))]
322            Self::spawn_connection_monitoring_stable(&self.stable, node_id);
323
324            Ok(IGatewayConnection::into_dyn(conn))
325        } else {
326            let conn = self.stable.connect(node_id, FEDIMINT_GATEWAY_ALPN).await?;
327            Ok(IGatewayConnection::into_dyn(conn))
328        }
329    }
330}
331
332impl IrohConnector {
333    #[cfg(not(target_family = "wasm"))]
334    fn spawn_connection_monitoring_stable(endpoint: &Endpoint, node_id: NodeId) {
335        if let Ok(mut conn_type_watcher) = endpoint.conn_type(node_id) {
336            #[allow(clippy::let_underscore_future)]
337            let _ = spawn("iroh connection (stable)", async move {
338                if let Ok(conn_type) = conn_type_watcher.get() {
339                    debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
340                }
341                while let Ok(event) = conn_type_watcher.updated().await {
342                    debug!(target: LOG_NET_IROH, %node_id, type = %event, "Connection type (changed)");
343                }
344            });
345        }
346    }
347
348    #[cfg(not(target_family = "wasm"))]
349    fn spawn_connection_monitoring_next(
350        endpoint: &iroh_next::Endpoint,
351        node_addr: &iroh_next::NodeAddr,
352    ) {
353        if let Some(mut conn_type_watcher) = endpoint.conn_type(node_addr.node_id) {
354            let node_id = node_addr.node_id;
355            #[allow(clippy::let_underscore_future)]
356            let _ = spawn("iroh connection (next)", async move {
357                if let Ok(conn_type) = conn_type_watcher.get() {
358                    debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
359                }
360                while let Ok(event) = conn_type_watcher.updated().await {
361                    debug!(target: LOG_NET_IROH, node_id = %node_id, %event, "Connection type changed");
362                }
363            });
364        }
365    }
366
367    async fn make_new_connection_stable(
368        &self,
369        node_id: NodeId,
370        node_addr: Option<NodeAddr>,
371    ) -> ServerResult<Connection> {
372        trace!(target: LOG_NET_IROH, %node_id, "Creating new stable connection");
373        let conn = match node_addr.clone() {
374            Some(node_addr) => {
375                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
376                let conn = self.stable
377                    .connect(node_addr.clone(), FEDIMINT_API_ALPN)
378                    .await;
379
380                #[cfg(not(target_family = "wasm"))]
381                if conn.is_ok() {
382                    Self::spawn_connection_monitoring_stable(&self.stable, node_id);
383                }
384                conn
385            }
386            None => self.stable.connect(node_id, FEDIMINT_API_ALPN).await,
387        }.map_err(ServerError::Connection)?;
388
389        Ok(conn)
390    }
391
392    async fn make_new_connection_next(
393        &self,
394        endpoint_next: &iroh_next::Endpoint,
395        node_id: NodeId,
396        node_addr: Option<NodeAddr>,
397    ) -> ServerResult<iroh_next::endpoint::Connection> {
398        let next_node_id = iroh_next::NodeId::from_bytes(node_id.as_bytes()).expect("Can't fail");
399
400        let endpoint_next = endpoint_next.clone();
401
402        trace!(target: LOG_NET_IROH, %node_id, "Creating new next connection");
403        let conn = match node_addr.clone() {
404            Some(node_addr) => {
405                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
406                let node_addr = node_addr_stable_to_next(&node_addr);
407                let conn = endpoint_next
408                    .connect(node_addr.clone(), FEDIMINT_API_ALPN)
409                    .await;
410
411                #[cfg(not(target_family = "wasm"))]
412                if conn.is_ok() {
413                    Self::spawn_connection_monitoring_next(&endpoint_next, &node_addr);
414                }
415
416                conn
417            }
418            None => endpoint_next.connect(
419                next_node_id,
420                FEDIMINT_API_ALPN
421            ).await,
422        }
423        .map_err(Into::into)
424        .map_err(ServerError::Connection)?;
425
426        Ok(conn)
427    }
428}
429
430fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::NodeAddr {
431    iroh_next::NodeAddr {
432        node_id: iroh_next::NodeId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail"),
433        relay_url: stable
434            .relay_url
435            .as_ref()
436            .map(|u| iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail")),
437        direct_addresses: stable.direct_addresses.clone(),
438    }
439}
440
441#[apply(async_trait_maybe_send!)]
442impl IConnection for Connection {
443    async fn await_disconnection(&self) {
444        self.closed().await;
445    }
446
447    fn is_connected(&self) -> bool {
448        self.close_reason().is_none()
449    }
450}
451
452#[async_trait]
453impl IGuardianConnection for Connection {
454    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
455        let json = serde_json::to_vec(&IrohApiRequest { method, request })
456            .expect("Serialization to vec can't fail");
457
458        let (mut sink, mut stream) = self
459            .open_bi()
460            .await
461            .map_err(|e| ServerError::Transport(e.into()))?;
462
463        sink.write_all(&json)
464            .await
465            .map_err(|e| ServerError::Transport(e.into()))?;
466
467        sink.finish()
468            .map_err(|e| ServerError::Transport(e.into()))?;
469
470        let response = stream
471            .read_to_end(1_000_000)
472            .await
473            .map_err(|e| ServerError::Transport(e.into()))?;
474
475        // TODO: We should not be serializing Results on the wire
476        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
477            .map_err(|e| ServerError::InvalidResponse(e.into()))?;
478
479        response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
480    }
481}
482
483#[apply(async_trait_maybe_send!)]
484impl IConnection for iroh_next::endpoint::Connection {
485    async fn await_disconnection(&self) {
486        self.closed().await;
487    }
488
489    fn is_connected(&self) -> bool {
490        self.close_reason().is_none()
491    }
492}
493
494#[async_trait]
495impl IGuardianConnection for iroh_next::endpoint::Connection {
496    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
497        let json = serde_json::to_vec(&IrohApiRequest { method, request })
498            .expect("Serialization to vec can't fail");
499
500        let (mut sink, mut stream) = self
501            .open_bi()
502            .await
503            .map_err(|e| ServerError::Transport(e.into()))?;
504
505        sink.write_all(&json)
506            .await
507            .map_err(|e| ServerError::Transport(e.into()))?;
508
509        sink.finish()
510            .map_err(|e| ServerError::Transport(e.into()))?;
511
512        let response = stream
513            .read_to_end(1_000_000)
514            .await
515            .map_err(|e| ServerError::Transport(e.into()))?;
516
517        // TODO: We should not be serializing Results on the wire
518        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
519            .map_err(|e| ServerError::InvalidResponse(e.into()))?;
520
521        response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
522    }
523}
524
525#[apply(async_trait_maybe_send!)]
526impl IGatewayConnection for Connection {
527    async fn request(
528        &self,
529        password: Option<String>,
530        _method: Method,
531        route: &str,
532        payload: Option<Value>,
533    ) -> ServerResult<Value> {
534        let iroh_request = IrohGatewayRequest {
535            route: route.to_string(),
536            params: payload,
537            password,
538        };
539        let json = serde_json::to_vec(&iroh_request).expect("serialization cant fail");
540
541        let (mut sink, mut stream) = self
542            .open_bi()
543            .await
544            .map_err(|e| ServerError::Transport(e.into()))?;
545
546        sink.write_all(&json)
547            .await
548            .map_err(|e| ServerError::Transport(e.into()))?;
549
550        sink.finish()
551            .map_err(|e| ServerError::Transport(e.into()))?;
552
553        let response = stream
554            .read_to_end(1_000_000)
555            .await
556            .map_err(|e| ServerError::Transport(e.into()))?;
557
558        let response = serde_json::from_slice::<IrohGatewayResponse>(&response)
559            .map_err(|e| ServerError::InvalidResponse(e.into()))?;
560        match StatusCode::from_u16(response.status).map_err(|e| {
561            ServerError::InvalidResponse(anyhow::anyhow!("Invalid status code: {}", e))
562        })? {
563            StatusCode::OK => Ok(response.body),
564            status => Err(ServerError::ServerError(anyhow::anyhow!(
565                "Server returned status code: {}",
566                status
567            ))),
568        }
569    }
570}