fedimint_connectors/
iroh.rs

1use std::collections::BTreeMap;
2use std::fmt;
3use std::pin::Pin;
4use std::str::FromStr;
5
6use anyhow::{Context, bail};
7use async_trait::async_trait;
8use fedimint_core::envs::{
9    FM_IROH_N0_DISCOVERY_ENABLE_ENV, FM_IROH_PKARR_RESOLVER_ENABLE_ENV, is_env_var_set_opt,
10    parse_kv_list_from_env,
11};
12use fedimint_core::module::{
13    ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, FEDIMINT_GATEWAY_ALPN,
14    IrohApiRequest, IrohGatewayRequest, IrohGatewayResponse,
15};
16use fedimint_core::task::spawn;
17use fedimint_core::util::{FmtCompact as _, SafeUrl};
18use fedimint_core::{apply, async_trait_maybe_send};
19use fedimint_logging::LOG_NET_IROH;
20use futures::Future;
21use futures::stream::{FuturesUnordered, StreamExt};
22use iroh::discovery::pkarr::PkarrResolver;
23use iroh::endpoint::Connection;
24use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
25use iroh_base::ticket::NodeTicket;
26use iroh_next::Watcher as _;
27use reqwest::{Method, StatusCode};
28use serde_json::Value;
29use tracing::{debug, trace, warn};
30
31use super::{DynGuaridianConnection, IGuardianConnection, ServerError, ServerResult};
32use crate::{DynGatewayConnection, IConnection, IGatewayConnection};
33
34#[derive(Clone)]
35pub(crate) struct IrohConnector {
36    stable: iroh::endpoint::Endpoint,
37    next: Option<iroh_next::endpoint::Endpoint>,
38
39    /// List of overrides to use when attempting to connect to given
40    /// `NodeId`
41    ///
42    /// This is useful for testing, or forcing non-default network
43    /// connectivity.
44    connection_overrides: BTreeMap<NodeId, NodeAddr>,
45}
46
47impl fmt::Debug for IrohConnector {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        f.debug_struct("IrohEndpoint")
50            .field("stable-id", &self.stable.node_id())
51            .field(
52                "next-id",
53                &self.next.as_ref().map(iroh_next::Endpoint::node_id),
54            )
55            .finish_non_exhaustive()
56    }
57}
58
59impl IrohConnector {
60    pub async fn new(
61        iroh_dns: Option<SafeUrl>,
62        iroh_enable_dht: bool,
63        iroh_enable_next: bool,
64    ) -> anyhow::Result<Self> {
65        const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
66        const FM_GW_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_GW_IROH_CONNECT_OVERRIDES";
67        let mut s = Self::new_no_overrides(iroh_dns, iroh_enable_dht, iroh_enable_next).await?;
68
69        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
70            s = s.with_connection_override(k, v.into());
71        }
72
73        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_GW_IROH_CONNECT_OVERRIDES_ENV)? {
74            s = s.with_connection_override(k, v.into());
75        }
76
77        Ok(s)
78    }
79
80    #[allow(clippy::too_many_lines)]
81    pub async fn new_no_overrides(
82        iroh_dns: Option<SafeUrl>,
83        iroh_enable_dht: bool,
84        iroh_enable_next: bool,
85    ) -> anyhow::Result<Self> {
86        let endpoint_stable = Box::pin({
87            let iroh_dns = iroh_dns.clone();
88            async {
89                let mut builder = Endpoint::builder();
90
91                if let Some(iroh_dns) = iroh_dns.map(SafeUrl::to_unsafe) {
92                    builder = builder.add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
93                }
94
95                // As a client, we don't need to register on any relays
96                let mut builder = builder.relay_mode(iroh::RelayMode::Disabled);
97
98                #[cfg(not(target_family = "wasm"))]
99                if iroh_enable_dht {
100                    builder = builder.discovery_dht();
101                }
102
103                // instead of `.discovery_n0`, which brings publisher we don't want
104                {
105                    if is_env_var_set_opt(FM_IROH_PKARR_RESOLVER_ENABLE_ENV).unwrap_or(true) {
106                        #[cfg(target_family = "wasm")]
107                        {
108                            builder = builder.add_discovery(move |_| Some(PkarrResolver::n0_dns()));
109                        }
110                    } else {
111                        warn!(
112                            target: LOG_NET_IROH,
113                            "Iroh pkarr resolver is disabled"
114                        );
115                    }
116
117                    if is_env_var_set_opt(FM_IROH_N0_DISCOVERY_ENABLE_ENV).unwrap_or(true) {
118                        #[cfg(not(target_family = "wasm"))]
119                        {
120                            builder = builder.add_discovery(move |_| {
121                                Some(iroh::discovery::dns::DnsDiscovery::n0_dns())
122                            });
123                        }
124                    } else {
125                        warn!(
126                            target: LOG_NET_IROH,
127                            "Iroh n0 discovery is disabled"
128                        );
129                    }
130                }
131
132                let endpoint = builder.bind().await?;
133                debug!(
134                    target: LOG_NET_IROH,
135                    node_id = %endpoint.node_id(),
136                    node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
137                    "Iroh api client endpoint (stable)"
138                );
139                Ok::<_, anyhow::Error>(endpoint)
140            }
141        });
142        let endpoint_next = Box::pin(async {
143            let mut builder = iroh_next::Endpoint::builder();
144
145            if let Some(iroh_dns) = iroh_dns.map(SafeUrl::to_unsafe) {
146                builder = builder.add_discovery(
147                    iroh_next::discovery::pkarr::PkarrResolver::builder(iroh_dns).build(),
148                );
149            }
150
151            // As a client, we don't need to register on any relays
152            let mut builder = builder.relay_mode(iroh_next::RelayMode::Disabled);
153
154            #[cfg(not(target_family = "wasm"))]
155            if iroh_enable_dht {
156                builder = builder.discovery_dht();
157            }
158
159            // instead of `.discovery_n0`, which brings publisher we don't want
160            {
161                // Resolve using HTTPS requests to our DNS server's /pkarr path in browsers
162                #[cfg(target_family = "wasm")]
163                {
164                    builder =
165                        builder.add_discovery(iroh_next::discovery::pkarr::PkarrResolver::n0_dns());
166                }
167                // Resolve using DNS queries outside browsers.
168                #[cfg(not(target_family = "wasm"))]
169                {
170                    builder =
171                        builder.add_discovery(iroh_next::discovery::dns::DnsDiscovery::n0_dns());
172                }
173            }
174
175            let endpoint = builder.bind().await?;
176            debug!(
177                target: LOG_NET_IROH,
178                node_id = %endpoint.node_id(),
179                node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
180                "Iroh api client endpoint (next)"
181            );
182            Ok(endpoint)
183        });
184
185        let (endpoint_stable, endpoint_next) = if iroh_enable_next {
186            let (s, n) = tokio::try_join!(endpoint_stable, endpoint_next)?;
187            (s, Some(n))
188        } else {
189            (endpoint_stable.await?, None)
190        };
191
192        Ok(Self {
193            stable: endpoint_stable,
194            next: endpoint_next,
195            connection_overrides: BTreeMap::new(),
196        })
197    }
198
199    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
200        self.connection_overrides.insert(node, addr);
201        self
202    }
203
204    pub fn node_id_from_url(url: &SafeUrl) -> anyhow::Result<NodeId> {
205        if url.scheme() != "iroh" {
206            bail!(
207                "Unsupported scheme: {}, passed to iroh endpoint handler",
208                url.scheme()
209            );
210        }
211        let host = url.host_str().context("Missing host string in Iroh URL")?;
212
213        let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
214
215        Ok(node_id)
216    }
217}
218
219#[async_trait::async_trait]
220impl crate::Connector for IrohConnector {
221    async fn connect_guardian(
222        &self,
223        url: &SafeUrl,
224        api_secret: Option<&str>,
225    ) -> ServerResult<DynGuaridianConnection> {
226        if api_secret.is_some() {
227            // There seem to be no way to pass secret over current Iroh calling
228            // convention
229            ServerError::Connection(anyhow::format_err!(
230                "Iroh api secrets currently not supported"
231            ));
232        }
233        let node_id =
234            Self::node_id_from_url(url).map_err(|source| ServerError::InvalidPeerUrl {
235                source,
236                url: url.to_owned(),
237            })?;
238        let mut futures = FuturesUnordered::<
239            Pin<
240                Box<
241                    dyn Future<Output = (ServerResult<DynGuaridianConnection>, &'static str)>
242                        + Send,
243                >,
244            >,
245        >::new();
246        let connection_override = self.connection_overrides.get(&node_id).cloned();
247
248        let self_clone = self.clone();
249        futures.push(Box::pin({
250            let connection_override = connection_override.clone();
251            async move {
252                (
253                    self_clone
254                        .make_new_connection_stable(node_id, connection_override)
255                        .await
256                        .map(super::IGuardianConnection::into_dyn),
257                    "stable",
258                )
259            }
260        }));
261
262        if let Some(endpoint_next) = &self.next {
263            let self_clone = self.clone();
264            let endpoint_next = endpoint_next.clone();
265            futures.push(Box::pin(async move {
266                (
267                    self_clone
268                        .make_new_connection_next(&endpoint_next, node_id, connection_override)
269                        .await
270                        .map(super::IGuardianConnection::into_dyn),
271                    "next",
272                )
273            }));
274        }
275
276        // Remember last error, so we have something to return if
277        // neither connection works.
278        let mut prev_err = None;
279
280        // Loop until first success, or running out of connections.
281        while let Some((result, iroh_stack)) = futures.next().await {
282            match result {
283                Ok(connection) => return Ok(connection),
284                Err(err) => {
285                    warn!(
286                        target: LOG_NET_IROH,
287                        err = %err.fmt_compact(),
288                        %iroh_stack,
289                        "Join error in iroh connection task"
290                    );
291                    prev_err = Some(err);
292                }
293            }
294        }
295
296        Err(prev_err.unwrap_or_else(|| {
297            ServerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
298        }))
299    }
300
301    async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
302        let node_id = Self::node_id_from_url(url)?;
303        if let Some(node_addr) = self.connection_overrides.get(&node_id).cloned() {
304            let conn = self
305                .stable
306                .connect(node_addr.clone(), FEDIMINT_GATEWAY_ALPN)
307                .await?;
308
309            #[cfg(not(target_family = "wasm"))]
310            Self::spawn_connection_monitoring_stable(&self.stable, node_id);
311
312            Ok(IGatewayConnection::into_dyn(conn))
313        } else {
314            let conn = self.stable.connect(node_id, FEDIMINT_GATEWAY_ALPN).await?;
315            Ok(IGatewayConnection::into_dyn(conn))
316        }
317    }
318}
319
320impl IrohConnector {
321    #[cfg(not(target_family = "wasm"))]
322    fn spawn_connection_monitoring_stable(endpoint: &Endpoint, node_id: NodeId) {
323        if let Ok(mut conn_type_watcher) = endpoint.conn_type(node_id) {
324            #[allow(clippy::let_underscore_future)]
325            let _ = spawn("iroh connection (stable)", async move {
326                if let Ok(conn_type) = conn_type_watcher.get() {
327                    debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
328                }
329                while let Ok(event) = conn_type_watcher.updated().await {
330                    debug!(target: LOG_NET_IROH, %node_id, type = %event, "Connection type (changed)");
331                }
332            });
333        }
334    }
335
336    #[cfg(not(target_family = "wasm"))]
337    fn spawn_connection_monitoring_next(
338        endpoint: &iroh_next::Endpoint,
339        node_addr: &iroh_next::NodeAddr,
340    ) {
341        if let Some(mut conn_type_watcher) = endpoint.conn_type(node_addr.node_id) {
342            let node_id = node_addr.node_id;
343            #[allow(clippy::let_underscore_future)]
344            let _ = spawn("iroh connection (next)", async move {
345                if let Ok(conn_type) = conn_type_watcher.get() {
346                    debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
347                }
348                while let Ok(event) = conn_type_watcher.updated().await {
349                    debug!(target: LOG_NET_IROH, node_id = %node_id, %event, "Connection type changed");
350                }
351            });
352        }
353    }
354
355    async fn make_new_connection_stable(
356        &self,
357        node_id: NodeId,
358        node_addr: Option<NodeAddr>,
359    ) -> ServerResult<Connection> {
360        trace!(target: LOG_NET_IROH, %node_id, "Creating new stable connection");
361        let conn = match node_addr.clone() {
362            Some(node_addr) => {
363                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
364                let conn = self.stable
365                    .connect(node_addr.clone(), FEDIMINT_API_ALPN)
366                    .await;
367
368                #[cfg(not(target_family = "wasm"))]
369                if conn.is_ok() {
370                    Self::spawn_connection_monitoring_stable(&self.stable, node_id);
371                }
372                conn
373            }
374            None => self.stable.connect(node_id, FEDIMINT_API_ALPN).await,
375        }.map_err(ServerError::Connection)?;
376
377        Ok(conn)
378    }
379
380    async fn make_new_connection_next(
381        &self,
382        endpoint_next: &iroh_next::Endpoint,
383        node_id: NodeId,
384        node_addr: Option<NodeAddr>,
385    ) -> ServerResult<iroh_next::endpoint::Connection> {
386        let next_node_id = iroh_next::NodeId::from_bytes(node_id.as_bytes()).expect("Can't fail");
387
388        let endpoint_next = endpoint_next.clone();
389
390        trace!(target: LOG_NET_IROH, %node_id, "Creating new next connection");
391        let conn = match node_addr.clone() {
392            Some(node_addr) => {
393                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
394                let node_addr = node_addr_stable_to_next(&node_addr);
395                let conn = endpoint_next
396                    .connect(node_addr.clone(), FEDIMINT_API_ALPN)
397                    .await;
398
399                #[cfg(not(target_family = "wasm"))]
400                if conn.is_ok() {
401                    Self::spawn_connection_monitoring_next(&endpoint_next, &node_addr);
402                }
403
404                conn
405            }
406            None => endpoint_next.connect(
407                next_node_id,
408                FEDIMINT_API_ALPN
409            ).await,
410        }
411        .map_err(Into::into)
412        .map_err(ServerError::Connection)?;
413
414        Ok(conn)
415    }
416}
417
418fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::NodeAddr {
419    iroh_next::NodeAddr {
420        node_id: iroh_next::NodeId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail"),
421        relay_url: stable
422            .relay_url
423            .as_ref()
424            .map(|u| iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail")),
425        direct_addresses: stable.direct_addresses.clone(),
426    }
427}
428
429#[apply(async_trait_maybe_send!)]
430impl IConnection for Connection {
431    async fn await_disconnection(&self) {
432        self.closed().await;
433    }
434
435    fn is_connected(&self) -> bool {
436        self.close_reason().is_none()
437    }
438}
439
440#[async_trait]
441impl IGuardianConnection for Connection {
442    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
443        let json = serde_json::to_vec(&IrohApiRequest { method, request })
444            .expect("Serialization to vec can't fail");
445
446        let (mut sink, mut stream) = self
447            .open_bi()
448            .await
449            .map_err(|e| ServerError::Transport(e.into()))?;
450
451        sink.write_all(&json)
452            .await
453            .map_err(|e| ServerError::Transport(e.into()))?;
454
455        sink.finish()
456            .map_err(|e| ServerError::Transport(e.into()))?;
457
458        let response = stream
459            .read_to_end(1_000_000)
460            .await
461            .map_err(|e| ServerError::Transport(e.into()))?;
462
463        // TODO: We should not be serializing Results on the wire
464        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
465            .map_err(|e| ServerError::InvalidResponse(e.into()))?;
466
467        response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
468    }
469}
470
471#[apply(async_trait_maybe_send!)]
472impl IConnection for iroh_next::endpoint::Connection {
473    async fn await_disconnection(&self) {
474        self.closed().await;
475    }
476
477    fn is_connected(&self) -> bool {
478        self.close_reason().is_none()
479    }
480}
481
482#[async_trait]
483impl IGuardianConnection for iroh_next::endpoint::Connection {
484    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
485        let json = serde_json::to_vec(&IrohApiRequest { method, request })
486            .expect("Serialization to vec can't fail");
487
488        let (mut sink, mut stream) = self
489            .open_bi()
490            .await
491            .map_err(|e| ServerError::Transport(e.into()))?;
492
493        sink.write_all(&json)
494            .await
495            .map_err(|e| ServerError::Transport(e.into()))?;
496
497        sink.finish()
498            .map_err(|e| ServerError::Transport(e.into()))?;
499
500        let response = stream
501            .read_to_end(1_000_000)
502            .await
503            .map_err(|e| ServerError::Transport(e.into()))?;
504
505        // TODO: We should not be serializing Results on the wire
506        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
507            .map_err(|e| ServerError::InvalidResponse(e.into()))?;
508
509        response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
510    }
511}
512
513#[apply(async_trait_maybe_send!)]
514impl IGatewayConnection for Connection {
515    async fn request(
516        &self,
517        password: Option<String>,
518        _method: Method,
519        route: &str,
520        payload: Option<Value>,
521    ) -> ServerResult<Value> {
522        let iroh_request = IrohGatewayRequest {
523            route: route.to_string(),
524            params: payload,
525            password,
526        };
527        let json = serde_json::to_vec(&iroh_request).expect("serialization cant fail");
528
529        let (mut sink, mut stream) = self
530            .open_bi()
531            .await
532            .map_err(|e| ServerError::Transport(e.into()))?;
533
534        sink.write_all(&json)
535            .await
536            .map_err(|e| ServerError::Transport(e.into()))?;
537
538        sink.finish()
539            .map_err(|e| ServerError::Transport(e.into()))?;
540
541        let response = stream
542            .read_to_end(1_000_000)
543            .await
544            .map_err(|e| ServerError::Transport(e.into()))?;
545
546        let response = serde_json::from_slice::<IrohGatewayResponse>(&response)
547            .map_err(|e| ServerError::InvalidResponse(e.into()))?;
548        match StatusCode::from_u16(response.status).map_err(|e| {
549            ServerError::InvalidResponse(anyhow::anyhow!("Invalid status code: {}", e))
550        })? {
551            StatusCode::OK => Ok(response.body),
552            status => Err(ServerError::ServerError(anyhow::anyhow!(
553                "Server returned status code: {}",
554                status
555            ))),
556        }
557    }
558}