fedimint_api_client/api/
iroh.rs

1use std::collections::BTreeMap;
2use std::fmt;
3use std::pin::Pin;
4use std::str::FromStr;
5
6use anyhow::{Context, bail};
7use async_trait::async_trait;
8use fedimint_core::envs::parse_kv_list_from_env;
9use fedimint_core::iroh_prod::FM_IROH_DNS_FEDIMINT_PROD;
10use fedimint_core::module::{
11    ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
12};
13use fedimint_core::task::spawn;
14use fedimint_core::util::{FmtCompact as _, SafeUrl};
15use fedimint_logging::LOG_NET_IROH;
16use futures::Future;
17use futures::stream::{FuturesUnordered, StreamExt};
18use iroh::discovery::pkarr::PkarrResolver;
19use iroh::endpoint::Connection;
20use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
21use iroh_base::ticket::NodeTicket;
22use iroh_next::Watcher as _;
23use serde_json::Value;
24use tracing::{debug, trace, warn};
25use url::Url;
26
27use super::{DynGuaridianConnection, IGuardianConnection, PeerError, PeerResult};
28
29#[derive(Clone)]
30pub(crate) struct IrohConnector {
31    stable: iroh::endpoint::Endpoint,
32    next: Option<iroh_next::endpoint::Endpoint>,
33
34    /// List of overrides to use when attempting to connect to given
35    /// `NodeId`
36    ///
37    /// This is useful for testing, or forcing non-default network
38    /// connectivity.
39    connection_overrides: BTreeMap<NodeId, NodeAddr>,
40}
41
42impl fmt::Debug for IrohConnector {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        f.debug_struct("IrohEndpoint")
45            .field("stable-id", &self.stable.node_id())
46            .field(
47                "next-id",
48                &self.next.as_ref().map(iroh_next::Endpoint::node_id),
49            )
50            .finish_non_exhaustive()
51    }
52}
53
54impl IrohConnector {
55    pub async fn new(
56        iroh_dns: Option<SafeUrl>,
57        iroh_enable_dht: bool,
58        iroh_enable_next: bool,
59    ) -> anyhow::Result<Self> {
60        const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
61        let mut s = Self::new_no_overrides(iroh_dns, iroh_enable_dht, iroh_enable_next).await?;
62
63        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
64            s = s.with_connection_override(k, v.into());
65        }
66
67        Ok(s)
68    }
69
70    pub async fn new_no_overrides(
71        iroh_dns: Option<SafeUrl>,
72        iroh_enable_dht: bool,
73        iroh_enable_next: bool,
74    ) -> anyhow::Result<Self> {
75        let iroh_dns_servers: Vec<_> = iroh_dns.map_or_else(
76            || {
77                FM_IROH_DNS_FEDIMINT_PROD
78                    .into_iter()
79                    .map(|url| Url::parse(url).expect("Hardcoded, can't fail"))
80                    .collect()
81            },
82            |url| vec![url.to_unsafe()],
83        );
84
85        let endpoint_stable = Box::pin({
86            let iroh_dns_servers = iroh_dns_servers.clone();
87            async {
88                let mut builder = Endpoint::builder();
89
90                for iroh_dns in iroh_dns_servers {
91                    builder = builder.add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
92                }
93
94                // As a client, we don't need to register on any relays
95                let mut builder = builder.relay_mode(iroh::RelayMode::Disabled);
96
97                #[cfg(not(target_family = "wasm"))]
98                if iroh_enable_dht {
99                    builder = builder.discovery_dht();
100                }
101
102                // instead of `.discovery_n0`, which brings publisher we don't want
103                {
104                    #[cfg(target_family = "wasm")]
105                    {
106                        builder = builder.add_discovery(move |_| Some(PkarrResolver::n0_dns()));
107                    }
108
109                    #[cfg(not(target_family = "wasm"))]
110                    {
111                        builder = builder.add_discovery(move |_| {
112                            Some(iroh::discovery::dns::DnsDiscovery::n0_dns())
113                        });
114                    }
115                }
116
117                let endpoint = builder.bind().await?;
118                debug!(
119                    target: LOG_NET_IROH,
120                    node_id = %endpoint.node_id(),
121                    node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
122                    "Iroh api client endpoint (stable)"
123                );
124                Ok::<_, anyhow::Error>(endpoint)
125            }
126        });
127        let endpoint_next = Box::pin(async {
128            let mut builder = iroh_next::Endpoint::builder();
129
130            for iroh_dns in iroh_dns_servers {
131                builder = builder.add_discovery(
132                    iroh_next::discovery::pkarr::PkarrResolver::builder(iroh_dns).build(),
133                );
134            }
135
136            // As a client, we don't need to register on any relays
137            let mut builder = builder.relay_mode(iroh_next::RelayMode::Disabled);
138
139            #[cfg(not(target_family = "wasm"))]
140            if iroh_enable_dht {
141                builder = builder.discovery_dht();
142            }
143
144            // instead of `.discovery_n0`, which brings publisher we don't want
145            {
146                // Resolve using HTTPS requests to our DNS server's /pkarr path in browsers
147                #[cfg(target_family = "wasm")]
148                {
149                    builder =
150                        builder.add_discovery(iroh_next::discovery::pkarr::PkarrResolver::n0_dns());
151                }
152                // Resolve using DNS queries outside browsers.
153                #[cfg(not(target_family = "wasm"))]
154                {
155                    builder =
156                        builder.add_discovery(iroh_next::discovery::dns::DnsDiscovery::n0_dns());
157                }
158            }
159
160            let endpoint = builder.bind().await?;
161            debug!(
162                target: LOG_NET_IROH,
163                node_id = %endpoint.node_id(),
164                node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
165                "Iroh api client endpoint (next)"
166            );
167            Ok(endpoint)
168        });
169
170        let (endpoint_stable, endpoint_next) = if iroh_enable_next {
171            let (s, n) = tokio::try_join!(endpoint_stable, endpoint_next)?;
172            (s, Some(n))
173        } else {
174            (endpoint_stable.await?, None)
175        };
176
177        Ok(Self {
178            stable: endpoint_stable,
179            next: endpoint_next,
180            connection_overrides: BTreeMap::new(),
181        })
182    }
183
184    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
185        self.connection_overrides.insert(node, addr);
186        self
187    }
188
189    pub fn node_id_from_url(url: &SafeUrl) -> anyhow::Result<NodeId> {
190        if url.scheme() != "iroh" {
191            bail!(
192                "Unsupported scheme: {}, passed to iroh endpoint handler",
193                url.scheme()
194            );
195        }
196        let host = url.host_str().context("Missing host string in Iroh URL")?;
197
198        let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
199
200        Ok(node_id)
201    }
202}
203
204#[async_trait::async_trait]
205impl crate::api::Connector for IrohConnector {
206    async fn connect_guardian(
207        &self,
208        url: &SafeUrl,
209        api_secret: Option<&str>,
210    ) -> PeerResult<DynGuaridianConnection> {
211        if api_secret.is_some() {
212            // There seem to be no way to pass secret over current Iroh calling
213            // convention
214            PeerError::Connection(anyhow::format_err!(
215                "Iroh api secrets currently not supported"
216            ));
217        }
218        let node_id = Self::node_id_from_url(url).map_err(|source| PeerError::InvalidPeerUrl {
219            source,
220            url: url.to_owned(),
221        })?;
222        let mut futures = FuturesUnordered::<
223            Pin<
224                Box<dyn Future<Output = (PeerResult<DynGuaridianConnection>, &'static str)> + Send>,
225            >,
226        >::new();
227        let connection_override = self.connection_overrides.get(&node_id).cloned();
228
229        let self_clone = self.clone();
230        futures.push(Box::pin({
231            let connection_override = connection_override.clone();
232            async move {
233                (
234                    self_clone
235                        .make_new_connection_stable(node_id, connection_override)
236                        .await
237                        .map(super::IGuardianConnection::into_dyn),
238                    "stable",
239                )
240            }
241        }));
242
243        if let Some(endpoint_next) = &self.next {
244            let self_clone = self.clone();
245            let endpoint_next = endpoint_next.clone();
246            futures.push(Box::pin(async move {
247                (
248                    self_clone
249                        .make_new_connection_next(&endpoint_next, node_id, connection_override)
250                        .await
251                        .map(super::IGuardianConnection::into_dyn),
252                    "next",
253                )
254            }));
255        }
256
257        // Remember last error, so we have something to return if
258        // neither connection works.
259        let mut prev_err = None;
260
261        // Loop until first success, or running out of connections.
262        while let Some((result, iroh_stack)) = futures.next().await {
263            match result {
264                Ok(connection) => return Ok(connection),
265                Err(err) => {
266                    warn!(
267                        target: LOG_NET_IROH,
268                        err = %err.fmt_compact(),
269                        %iroh_stack,
270                        "Join error in iroh connection task"
271                    );
272                    prev_err = Some(err);
273                }
274            }
275        }
276
277        Err(prev_err.unwrap_or_else(|| {
278            PeerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
279        }))
280    }
281}
282
283impl IrohConnector {
284    #[cfg(not(target_family = "wasm"))]
285    fn spawn_connection_monitoring_stable(endpoint: &Endpoint, node_id: NodeId) {
286        if let Ok(mut conn_type_watcher) = endpoint.conn_type(node_id) {
287            #[allow(clippy::let_underscore_future)]
288            let _ = spawn("iroh connection (stable)", async move {
289                if let Ok(conn_type) = conn_type_watcher.get() {
290                    debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
291                }
292                while let Ok(event) = conn_type_watcher.updated().await {
293                    debug!(target: LOG_NET_IROH, %node_id, type = %event, "Connection type (changed)");
294                }
295            });
296        }
297    }
298
299    #[cfg(not(target_family = "wasm"))]
300    fn spawn_connection_monitoring_next(
301        endpoint: &iroh_next::Endpoint,
302        node_addr: &iroh_next::NodeAddr,
303    ) {
304        if let Some(mut conn_type_watcher) = endpoint.conn_type(node_addr.node_id) {
305            let node_id = node_addr.node_id;
306            #[allow(clippy::let_underscore_future)]
307            let _ = spawn("iroh connection (next)", async move {
308                if let Ok(conn_type) = conn_type_watcher.get() {
309                    debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
310                }
311                while let Ok(event) = conn_type_watcher.updated().await {
312                    debug!(target: LOG_NET_IROH, node_id = %node_id, %event, "Connection type changed");
313                }
314            });
315        }
316    }
317    async fn make_new_connection_stable(
318        &self,
319        node_id: NodeId,
320        node_addr: Option<NodeAddr>,
321    ) -> PeerResult<Connection> {
322        trace!(target: LOG_NET_IROH, %node_id, "Creating new stable connection");
323        let conn = match node_addr.clone() {
324                    Some(node_addr) => {
325                        trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
326                        let conn = self.stable
327                            .connect(node_addr.clone(), FEDIMINT_API_ALPN)
328                            .await;
329
330                        #[cfg(not(target_family = "wasm"))]
331                        if conn.is_ok() {
332                            Self::spawn_connection_monitoring_stable(&self.stable, node_id);
333                        }
334                        conn
335                    }
336                    None => self.stable.connect(node_id, FEDIMINT_API_ALPN).await,
337                }.map_err(PeerError::Connection)?;
338
339        Ok(conn)
340    }
341
342    async fn make_new_connection_next(
343        &self,
344        endpoint_next: &iroh_next::Endpoint,
345        node_id: NodeId,
346        node_addr: Option<NodeAddr>,
347    ) -> PeerResult<iroh_next::endpoint::Connection> {
348        let next_node_id = iroh_next::NodeId::from_bytes(node_id.as_bytes()).expect("Can't fail");
349
350        let endpoint_next = endpoint_next.clone();
351
352        trace!(target: LOG_NET_IROH, %node_id, "Creating new next connection");
353        let conn = match node_addr.clone() {
354                    Some(node_addr) => {
355                        trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
356                        let node_addr = node_addr_stable_to_next(&node_addr);
357                        let conn = endpoint_next
358                            .connect(node_addr.clone(), FEDIMINT_API_ALPN)
359                            .await;
360
361                        #[cfg(not(target_family = "wasm"))]
362                        if conn.is_ok() {
363                            Self::spawn_connection_monitoring_next(&endpoint_next, &node_addr);
364                        }
365
366                        conn
367                    }
368                    None => endpoint_next.connect(
369                        next_node_id,
370                        FEDIMINT_API_ALPN
371                    ).await,
372                }
373                .map_err(Into::into)
374                .map_err(PeerError::Connection)?;
375
376        Ok(conn)
377    }
378}
379
380fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::NodeAddr {
381    iroh_next::NodeAddr {
382        node_id: iroh_next::NodeId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail"),
383        relay_url: stable
384            .relay_url
385            .as_ref()
386            .map(|u| iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail")),
387        direct_addresses: stable.direct_addresses.clone(),
388    }
389}
390#[async_trait]
391impl IGuardianConnection for Connection {
392    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
393        let json = serde_json::to_vec(&IrohApiRequest { method, request })
394            .expect("Serialization to vec can't fail");
395
396        let (mut sink, mut stream) = self
397            .open_bi()
398            .await
399            .map_err(|e| PeerError::Transport(e.into()))?;
400
401        sink.write_all(&json)
402            .await
403            .map_err(|e| PeerError::Transport(e.into()))?;
404
405        sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
406
407        let response = stream
408            .read_to_end(1_000_000)
409            .await
410            .map_err(|e| PeerError::Transport(e.into()))?;
411
412        // TODO: We should not be serializing Results on the wire
413        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
414            .map_err(|e| PeerError::InvalidResponse(e.into()))?;
415
416        response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
417    }
418
419    async fn await_disconnection(&self) {
420        self.closed().await;
421    }
422
423    fn is_connected(&self) -> bool {
424        self.close_reason().is_none()
425    }
426}
427
428#[async_trait]
429impl IGuardianConnection for iroh_next::endpoint::Connection {
430    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
431        let json = serde_json::to_vec(&IrohApiRequest { method, request })
432            .expect("Serialization to vec can't fail");
433
434        let (mut sink, mut stream) = self
435            .open_bi()
436            .await
437            .map_err(|e| PeerError::Transport(e.into()))?;
438
439        sink.write_all(&json)
440            .await
441            .map_err(|e| PeerError::Transport(e.into()))?;
442
443        sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
444
445        let response = stream
446            .read_to_end(1_000_000)
447            .await
448            .map_err(|e| PeerError::Transport(e.into()))?;
449
450        // TODO: We should not be serializing Results on the wire
451        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
452            .map_err(|e| PeerError::InvalidResponse(e.into()))?;
453
454        response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
455    }
456
457    async fn await_disconnection(&self) {
458        self.closed().await;
459    }
460
461    fn is_connected(&self) -> bool {
462        self.close_reason().is_none()
463    }
464}