Skip to main content

fedimint_connectors/
iroh.rs

1use std::collections::BTreeMap;
2use std::fmt;
3use std::pin::Pin;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::{Context, bail};
9use async_trait::async_trait;
10use fedimint_core::config::ALEPH_BFT_UNIT_BYTE_LIMIT;
11use fedimint_core::envs::{
12    FM_IROH_N0_DISCOVERY_ENABLE_ENV, FM_IROH_PKARR_RESOLVER_ENABLE_ENV, is_env_var_set_opt,
13    parse_kv_list_from_env,
14};
15use fedimint_core::module::{
16    ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, FEDIMINT_GATEWAY_ALPN,
17    IrohApiRequest, IrohGatewayRequest, IrohGatewayResponse,
18};
19use fedimint_core::net::iroh::{IROH_IDLE_TIMEOUT, IROH_KEEP_ALIVE_INTERVAL};
20
21/// The maximum number of bytes we are willing to buffer when reading an API
22/// response from an iroh QUIC stream. This must be large enough to accommodate
23/// the largest possible signed session outcome. A session can contain up to
24/// `broadcast_rounds_per_session` (default 3600) rounds, each peer produces one
25/// unit per round, and each unit can be up to `ALEPH_BFT_UNIT_BYTE_LIMIT`
26/// bytes. The response is JSON-serialized which hex-encodes the consensus
27/// bytes, roughly doubling the size. We use 2x the raw max as a conservative
28/// upper bound. For a 4-peer federation this is ~1.44 GB.
29const IROH_MAX_RESPONSE_BYTES: usize = ALEPH_BFT_UNIT_BYTE_LIMIT * 3600 * 4 * 2;
30
31/// Wall-clock budget for a single iroh API request to make it through the QUIC
32/// bi-stream (open + write + finish + read response). If exceeded we close the
33/// underlying [`Connection`], which causes [`IConnection::is_connected`] to
34/// return false on the next pool lookup so a fresh connection is established
35/// for the retry. Used for endpoints that respond promptly (`block_count`,
36/// `status`, etc).
37const IROH_REQUEST_TIMEOUT_DEFAULT: Duration = Duration::from_secs(60);
38
39/// Wall-clock budget for an iroh API request to a server-side long-poll
40/// endpoint (`await_*` / `wait_*`). These wait on the server until an event
41/// fires (block height reached, contract cancelled, etc.) before responding,
42/// so they need a generous bound. Set well above realistic mainnet block
43/// intervals; if a long-poll legitimately needs longer than this the upstream
44/// `request_current_consensus_retry` loop will reconnect and retry.
45const IROH_REQUEST_TIMEOUT_LONG_POLL: Duration = Duration::from_secs(60 * 60);
46
47/// Application-level QUIC error code we use when closing a [`Connection`]
48/// after a request timeout. Recorded by the peer as the close reason; chosen
49/// arbitrarily but stable across stable and `iroh_next` impls so the two
50/// emit identical telemetry. The value 1 distinguishes us from a graceful
51/// close (0).
52const IROH_REQUEST_TIMEOUT_ERROR_CODE: u32 = 1;
53const IROH_REQUEST_TIMEOUT_ERROR_REASON: &[u8] = b"request timeout";
54
55/// Request timeout strategy: long-poll endpoints (`await_*` / `wait_*`)
56/// get the long bound, everything else gets the default. The string match
57/// is a heuristic; it covers all currently-defined fedimint long-poll
58/// endpoints and stays correct if new ones follow the existing naming
59/// convention. False positives (a non-long-poll endpoint that happens to
60/// match the prefix) just give that one method a longer leash; the worse
61/// case is a false negative — a long-poll method that doesn't match
62/// either prefix would get the 60s default and fail fast on legitimate
63/// waits, but the upstream retry loop would reconnect and try again.
64fn request_timeout_for_method(method: &ApiMethod) -> Duration {
65    let name = match method {
66        ApiMethod::Core(name) => name.as_str(),
67        ApiMethod::Module(_, name) => name.as_str(),
68    };
69    if name.starts_with("await_") || name.starts_with("wait_") {
70        IROH_REQUEST_TIMEOUT_LONG_POLL
71    } else {
72        IROH_REQUEST_TIMEOUT_DEFAULT
73    }
74}
75use fedimint_core::task::spawn;
76use fedimint_core::util::{FmtCompact as _, SafeUrl};
77use fedimint_core::{apply, async_trait_maybe_send};
78use fedimint_logging::LOG_NET_IROH;
79use futures::Future;
80use futures::stream::{FuturesUnordered, StreamExt};
81use iroh::discovery::pkarr::PkarrResolver;
82use iroh::endpoint::Connection;
83use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
84use iroh_base::ticket::NodeTicket;
85use reqwest::{Method, StatusCode};
86use serde_json::Value;
87use tokio::sync::watch;
88use tracing::{debug, trace, warn};
89
90use super::{DynGuaridianConnection, IGuardianConnection, ServerError, ServerResult};
91use crate::{Connectivity, DynGatewayConnection, IConnection, IGatewayConnection};
92
93#[derive(Clone)]
94pub(crate) struct IrohConnector {
95    stable: iroh::endpoint::Endpoint,
96    next: Option<iroh_next::endpoint::Endpoint>,
97
98    /// List of overrides to use when attempting to connect to given
99    /// `NodeId`
100    ///
101    /// This is useful for testing, or forcing non-default network
102    /// connectivity.
103    connection_overrides: BTreeMap<NodeId, NodeAddr>,
104
105    /// Registry-owned signal bumped whenever any per-connection monitoring
106    /// task observes a transport-level path change (e.g. iroh relay →
107    /// direct). Consumers of [`crate::ConnectorRegistry`] subscribe via
108    /// [`crate::ConnectorRegistry::connectivity_change_notifier`].
109    path_change: Arc<watch::Sender<u64>>,
110}
111
112impl fmt::Debug for IrohConnector {
113    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114        f.debug_struct("IrohEndpoint")
115            .field("stable-id", &self.stable.node_id())
116            .field("next-id", &self.next.as_ref().map(iroh_next::Endpoint::id))
117            .finish_non_exhaustive()
118    }
119}
120
121impl IrohConnector {
122    pub async fn new(
123        iroh_dns: Option<SafeUrl>,
124        iroh_enable_dht: bool,
125        iroh_enable_next: bool,
126        path_change: Arc<watch::Sender<u64>>,
127    ) -> anyhow::Result<Self> {
128        const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
129        const FM_GW_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_GW_IROH_CONNECT_OVERRIDES";
130        let mut s =
131            Self::new_no_overrides(iroh_dns, iroh_enable_dht, iroh_enable_next, path_change)
132                .await?;
133
134        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
135            s = s.with_connection_override(k, v.into());
136        }
137
138        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_GW_IROH_CONNECT_OVERRIDES_ENV)? {
139            s = s.with_connection_override(k, v.into());
140        }
141
142        Ok(s)
143    }
144
145    #[allow(clippy::too_many_lines)]
146    pub async fn new_no_overrides(
147        iroh_dns: Option<SafeUrl>,
148        iroh_enable_dht: bool,
149        iroh_enable_next: bool,
150        path_change: Arc<watch::Sender<u64>>,
151    ) -> anyhow::Result<Self> {
152        let endpoint_stable = Box::pin({
153            let iroh_dns = iroh_dns.clone();
154            async {
155                let mut builder = Endpoint::builder();
156
157                if let Some(iroh_dns) = iroh_dns.map(SafeUrl::to_unsafe) {
158                    builder = builder.add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
159                }
160
161                // As a client, we don't need to register on any relays
162                let mut builder = builder.relay_mode(iroh::RelayMode::Disabled);
163
164                #[cfg(not(target_family = "wasm"))]
165                if iroh_enable_dht {
166                    builder = builder.discovery_dht();
167                }
168
169                // Add only resolver services here; the stable n0 convenience also
170                // installs a publisher.
171                {
172                    if is_env_var_set_opt(FM_IROH_PKARR_RESOLVER_ENABLE_ENV).unwrap_or(true) {
173                        builder = builder.add_discovery(move |_| Some(PkarrResolver::n0_dns()));
174                    } else {
175                        warn!(
176                            target: LOG_NET_IROH,
177                            "Iroh pkarr resolver is disabled"
178                        );
179                    }
180
181                    if is_env_var_set_opt(FM_IROH_N0_DISCOVERY_ENABLE_ENV).unwrap_or(true) {
182                        #[cfg(not(target_family = "wasm"))]
183                        {
184                            builder = builder.add_discovery(move |_| {
185                                Some(iroh::discovery::dns::DnsDiscovery::n0_dns())
186                            });
187                        }
188                    } else {
189                        warn!(
190                            target: LOG_NET_IROH,
191                            "Iroh n0 discovery is disabled"
192                        );
193                    }
194                }
195
196                let endpoint = builder
197                    .transport_config(quic_transport_config())
198                    .bind()
199                    .await?;
200                debug!(
201                    target: LOG_NET_IROH,
202                    node_id = %endpoint.node_id(),
203                    node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
204                    "Iroh api client endpoint (stable)"
205                );
206                Ok::<_, anyhow::Error>(endpoint)
207            }
208        });
209        let endpoint_next = Box::pin(async {
210            let mut builder = iroh_next::Endpoint::builder(iroh_next::endpoint::presets::Minimal);
211
212            if let Some(iroh_dns) = iroh_dns.map(SafeUrl::to_unsafe) {
213                builder = builder
214                    .address_lookup(iroh_next::address_lookup::PkarrResolver::builder(iroh_dns));
215            }
216
217            // As a client, we don't need to register on any relays
218            let mut builder = builder.relay_mode(iroh_next::RelayMode::Disabled);
219
220            #[cfg(not(target_family = "wasm"))]
221            if iroh_enable_dht {
222                builder = builder
223                    .address_lookup(iroh_mainline_address_lookup::DhtAddressLookup::builder());
224            }
225
226            // Add only resolver services here; the iroh preset convenience also
227            // installs a publisher.
228            {
229                // Resolve using HTTPS requests to our DNS server's /pkarr path.
230                builder =
231                    builder.address_lookup(iroh_next::address_lookup::PkarrResolver::n0_dns());
232                // Resolve using DNS queries outside browsers.
233                #[cfg(not(target_family = "wasm"))]
234                {
235                    builder = builder
236                        .address_lookup(iroh_next::address_lookup::DnsAddressLookup::n0_dns());
237                }
238            }
239
240            let endpoint = builder
241                .transport_config(quic_transport_config_next())
242                .bind()
243                .await?;
244            debug!(
245                target: LOG_NET_IROH,
246                node_id = %endpoint.id(),
247                node_id_pkarr = %z32::encode(endpoint.id().as_bytes()),
248                "Iroh api client endpoint (next)"
249            );
250            Ok(endpoint)
251        });
252
253        let (endpoint_stable, endpoint_next) = if iroh_enable_next {
254            let (s, n) = tokio::try_join!(endpoint_stable, endpoint_next)?;
255            (s, Some(n))
256        } else {
257            (endpoint_stable.await?, None)
258        };
259
260        Ok(Self {
261            stable: endpoint_stable,
262            next: endpoint_next,
263            connection_overrides: BTreeMap::new(),
264            path_change,
265        })
266    }
267
268    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
269        self.connection_overrides.insert(node, addr);
270        self
271    }
272
273    pub fn node_id_from_url(url: &SafeUrl) -> anyhow::Result<NodeId> {
274        if url.scheme() != "iroh" {
275            bail!(
276                "Unsupported scheme: {}, passed to iroh endpoint handler",
277                url.scheme()
278            );
279        }
280        let host = url.host_str().context("Missing host string in Iroh URL")?;
281
282        let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
283
284        Ok(node_id)
285    }
286}
287
288#[async_trait::async_trait]
289impl crate::Connector for IrohConnector {
290    async fn connect_guardian(
291        &self,
292        url: &SafeUrl,
293        api_secret: Option<&str>,
294    ) -> ServerResult<DynGuaridianConnection> {
295        if api_secret.is_some() {
296            // There seem to be no way to pass secret over current Iroh calling
297            // convention
298            ServerError::Connection(anyhow::format_err!(
299                "Iroh api secrets currently not supported"
300            ));
301        }
302        let node_id =
303            Self::node_id_from_url(url).map_err(|source| ServerError::InvalidPeerUrl {
304                source,
305                url: url.to_owned(),
306            })?;
307        let mut futures = FuturesUnordered::<
308            Pin<
309                Box<
310                    dyn Future<Output = (ServerResult<DynGuaridianConnection>, &'static str)>
311                        + Send,
312                >,
313            >,
314        >::new();
315        let connection_override = self.connection_overrides.get(&node_id).cloned();
316
317        let self_clone = self.clone();
318        futures.push(Box::pin({
319            let connection_override = connection_override.clone();
320            async move {
321                (
322                    self_clone
323                        .make_new_connection_stable(node_id, connection_override)
324                        .await
325                        .map(super::IGuardianConnection::into_dyn),
326                    "stable",
327                )
328            }
329        }));
330
331        if let Some(endpoint_next) = &self.next {
332            let self_clone = self.clone();
333            let endpoint_next = endpoint_next.clone();
334            futures.push(Box::pin(async move {
335                (
336                    self_clone
337                        .make_new_connection_next(&endpoint_next, node_id, connection_override)
338                        .await
339                        .map(super::IGuardianConnection::into_dyn),
340                    "next",
341                )
342            }));
343        }
344
345        // Remember last error, so we have something to return if
346        // neither connection works.
347        let mut prev_err = None;
348
349        // Loop until first success, or running out of connections.
350        while let Some((result, iroh_stack)) = futures.next().await {
351            match result {
352                Ok(connection) => return Ok(connection),
353                Err(err) => {
354                    warn!(
355                        target: LOG_NET_IROH,
356                        err = %err.fmt_compact(),
357                        %iroh_stack,
358                        "Join error in iroh connection task"
359                    );
360                    prev_err = Some(err);
361                }
362            }
363        }
364
365        Err(prev_err.unwrap_or_else(|| {
366            ServerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
367        }))
368    }
369
370    async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
371        let node_id = Self::node_id_from_url(url)?;
372        if let Some(node_addr) = self.connection_overrides.get(&node_id).cloned() {
373            let conn = self
374                .stable
375                .connect(node_addr.clone(), FEDIMINT_GATEWAY_ALPN)
376                .await?;
377
378            #[cfg(not(target_family = "wasm"))]
379            Self::spawn_connection_monitoring_stable(
380                &self.stable,
381                node_id,
382                self.path_change.clone(),
383            );
384
385            Ok(IGatewayConnection::into_dyn(conn))
386        } else {
387            let conn = self.stable.connect(node_id, FEDIMINT_GATEWAY_ALPN).await?;
388            Ok(IGatewayConnection::into_dyn(conn))
389        }
390    }
391
392    fn connectivity(&self, url: &SafeUrl) -> Connectivity {
393        let Ok(node_id) = Self::node_id_from_url(url) else {
394            return Connectivity::Unknown;
395        };
396        let Ok(watcher) = self.stable.conn_type(node_id) else {
397            return Connectivity::Unknown;
398        };
399        match watcher.get() {
400            Ok(iroh::endpoint::ConnectionType::Direct(_)) => Connectivity::Direct,
401            Ok(iroh::endpoint::ConnectionType::Relay(_)) => Connectivity::Relay,
402            Ok(iroh::endpoint::ConnectionType::Mixed(..)) => Connectivity::Mixed,
403            Ok(iroh::endpoint::ConnectionType::None) | Err(_) => Connectivity::Unknown,
404        }
405    }
406}
407
408impl IrohConnector {
409    #[cfg(not(target_family = "wasm"))]
410    fn spawn_connection_monitoring_stable(
411        endpoint: &Endpoint,
412        node_id: NodeId,
413        path_change: Arc<watch::Sender<u64>>,
414    ) {
415        if let Ok(mut conn_type_watcher) = endpoint.conn_type(node_id) {
416            #[allow(clippy::let_underscore_future)]
417            let _ = spawn("iroh connection (stable)", async move {
418                if let Ok(conn_type) = conn_type_watcher.get() {
419                    debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
420                }
421                while let Ok(event) = conn_type_watcher.updated().await {
422                    debug!(target: LOG_NET_IROH, %node_id, type = %event, "Connection type (changed)");
423                    path_change.send_modify(|c| *c = c.wrapping_add(1));
424                }
425            });
426        }
427    }
428
429    #[cfg(not(target_family = "wasm"))]
430    fn spawn_connection_monitoring_next(
431        conn: &iroh_next::endpoint::Connection,
432        node_id: iroh_next::EndpointId,
433        path_change: Arc<watch::Sender<u64>>,
434    ) {
435        let conn = conn.clone();
436        #[allow(clippy::let_underscore_future)]
437        let _ = spawn("iroh connection (next)", async move {
438            let mut paths = conn.paths_stream();
439            if let Some(paths) = paths.next().await {
440                debug!(target: LOG_NET_IROH, %node_id, ?paths, "Connection paths (initial)");
441            }
442            while let Some(paths) = paths.next().await {
443                debug!(target: LOG_NET_IROH, %node_id, ?paths, "Connection paths changed");
444                path_change.send_modify(|c| *c = c.wrapping_add(1));
445            }
446        });
447    }
448
449    async fn make_new_connection_stable(
450        &self,
451        node_id: NodeId,
452        node_addr: Option<NodeAddr>,
453    ) -> ServerResult<Connection> {
454        trace!(target: LOG_NET_IROH, %node_id, "Creating new stable connection");
455        let conn = match node_addr.clone() {
456            Some(node_addr) => {
457                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
458                let conn = self.stable
459                    .connect(node_addr.clone(), FEDIMINT_API_ALPN)
460                    .await;
461
462                #[cfg(not(target_family = "wasm"))]
463                if conn.is_ok() {
464                    Self::spawn_connection_monitoring_stable(
465                        &self.stable,
466                        node_id,
467                        self.path_change.clone(),
468                    );
469                }
470                conn
471            }
472            None => self.stable.connect(node_id, FEDIMINT_API_ALPN).await,
473        }.map_err(ServerError::Connection)?;
474
475        Ok(conn)
476    }
477
478    async fn make_new_connection_next(
479        &self,
480        endpoint_next: &iroh_next::Endpoint,
481        node_id: NodeId,
482        node_addr: Option<NodeAddr>,
483    ) -> ServerResult<iroh_next::endpoint::Connection> {
484        let next_node_id =
485            iroh_next::EndpointId::from_bytes(node_id.as_bytes()).expect("Can't fail");
486
487        let endpoint_next = endpoint_next.clone();
488
489        trace!(target: LOG_NET_IROH, %node_id, "Creating new next connection");
490        let conn = match node_addr.clone() {
491            Some(node_addr) => {
492                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
493                let node_addr = node_addr_stable_to_next(&node_addr);
494                let conn = endpoint_next
495                    .connect(node_addr.clone(), FEDIMINT_API_ALPN)
496                    .await;
497
498                #[cfg(not(target_family = "wasm"))]
499                if let Ok(conn) = &conn {
500                    Self::spawn_connection_monitoring_next(
501                        conn,
502                        node_addr.id,
503                        self.path_change.clone(),
504                    );
505                }
506
507                conn
508            }
509            None => endpoint_next.connect(
510                next_node_id,
511                FEDIMINT_API_ALPN
512            ).await,
513        }
514        .map_err(Into::into)
515        .map_err(ServerError::Connection)?;
516
517        Ok(conn)
518    }
519}
520
521/// QUIC transport config with explicit idle timeout and keep-alive
522/// for the stable iroh endpoint.
523fn quic_transport_config() -> iroh::endpoint::TransportConfig {
524    let mut config = iroh::endpoint::TransportConfig::default();
525    config.max_idle_timeout(Some(
526        IROH_IDLE_TIMEOUT
527            .try_into()
528            .expect("idle timeout fits in IdleTimeout"),
529    ));
530    config.keep_alive_interval(Some(IROH_KEEP_ALIVE_INTERVAL));
531    config
532}
533
534/// QUIC transport config with explicit idle timeout and keep-alive
535/// for the next iroh endpoint.
536fn quic_transport_config_next() -> iroh_next::endpoint::QuicTransportConfig {
537    iroh_next::endpoint::QuicTransportConfig::builder()
538        .max_idle_timeout(Some(
539            IROH_IDLE_TIMEOUT
540                .try_into()
541                .expect("idle timeout fits in IdleTimeout"),
542        ))
543        .keep_alive_interval(IROH_KEEP_ALIVE_INTERVAL)
544        .build()
545}
546
547fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::EndpointAddr {
548    let next_node_id =
549        iroh_next::EndpointId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail");
550    let relay_addrs = stable.relay_url.iter().map(|u| {
551        iroh_next::TransportAddr::Relay(
552            iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail"),
553        )
554    });
555    let direct_addrs = stable
556        .direct_addresses
557        .iter()
558        .copied()
559        .map(iroh_next::TransportAddr::Ip);
560
561    iroh_next::EndpointAddr::from_parts(next_node_id, relay_addrs.chain(direct_addrs))
562}
563
564#[apply(async_trait_maybe_send!)]
565impl IConnection for Connection {
566    async fn await_disconnection(&self) {
567        self.closed().await;
568    }
569
570    fn is_connected(&self) -> bool {
571        self.close_reason().is_none()
572    }
573}
574
575#[async_trait]
576impl IGuardianConnection for Connection {
577    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
578        let timeout = request_timeout_for_method(&method);
579        let method_str = method.to_string();
580        let json = serde_json::to_vec(&IrohApiRequest { method, request })
581            .expect("Serialization to vec can't fail");
582
583        let result = fedimint_core::runtime::timeout(timeout, async {
584            let (mut sink, mut stream) = self
585                .open_bi()
586                .await
587                .map_err(|e| ServerError::Transport(e.into()))?;
588
589            sink.write_all(&json)
590                .await
591                .map_err(|e| ServerError::Transport(e.into()))?;
592
593            sink.finish()
594                .map_err(|e| ServerError::Transport(e.into()))?;
595
596            stream
597                .read_to_end(IROH_MAX_RESPONSE_BYTES)
598                .await
599                .map_err(|e| ServerError::Transport(e.into()))
600        })
601        .await;
602
603        let response = match result {
604            Ok(Ok(bytes)) => bytes,
605            Ok(Err(err)) => return Err(err),
606            Err(_) => {
607                // The bi-stream stalled past our budget. Close the QUIC
608                // connection so [`Self::is_connected`] (which reads
609                // `close_reason`) starts returning false; the connection
610                // pool's `get_or_init_pool_entry` will then evict this
611                // entry on the next access and the upstream retry loop
612                // will get a fresh connection.
613                warn!(
614                    target: LOG_NET_IROH,
615                    method = %method_str,
616                    timeout_secs = timeout.as_secs(),
617                    "iroh request timed out, closing connection",
618                );
619                self.close(
620                    iroh::endpoint::VarInt::from_u32(IROH_REQUEST_TIMEOUT_ERROR_CODE),
621                    IROH_REQUEST_TIMEOUT_ERROR_REASON,
622                );
623                return Err(ServerError::Transport(anyhow::anyhow!(
624                    "iroh request {method_str} timed out after {timeout:?}"
625                )));
626            }
627        };
628
629        // TODO: We should not be serializing Results on the wire
630        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
631            .map_err(|e| ServerError::InvalidResponse(e.into()))?;
632
633        response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
634    }
635}
636
637#[apply(async_trait_maybe_send!)]
638impl IConnection for iroh_next::endpoint::Connection {
639    async fn await_disconnection(&self) {
640        self.closed().await;
641    }
642
643    fn is_connected(&self) -> bool {
644        self.close_reason().is_none()
645    }
646}
647
648#[async_trait]
649impl IGuardianConnection for iroh_next::endpoint::Connection {
650    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
651        let timeout = request_timeout_for_method(&method);
652        let method_str = method.to_string();
653        let json = serde_json::to_vec(&IrohApiRequest { method, request })
654            .expect("Serialization to vec can't fail");
655
656        let result = fedimint_core::runtime::timeout(timeout, async {
657            let (mut sink, mut stream) = self
658                .open_bi()
659                .await
660                .map_err(|e| ServerError::Transport(e.into()))?;
661
662            sink.write_all(&json)
663                .await
664                .map_err(|e| ServerError::Transport(e.into()))?;
665
666            sink.finish()
667                .map_err(|e| ServerError::Transport(e.into()))?;
668
669            stream
670                .read_to_end(IROH_MAX_RESPONSE_BYTES)
671                .await
672                .map_err(|e| ServerError::Transport(e.into()))
673        })
674        .await;
675
676        let response = match result {
677            Ok(Ok(bytes)) => bytes,
678            Ok(Err(err)) => return Err(err),
679            Err(_) => {
680                warn!(
681                    target: LOG_NET_IROH,
682                    method = %method_str,
683                    timeout_secs = timeout.as_secs(),
684                    "iroh request timed out, closing connection",
685                );
686                self.close(
687                    iroh_next::endpoint::VarInt::from_u32(IROH_REQUEST_TIMEOUT_ERROR_CODE),
688                    IROH_REQUEST_TIMEOUT_ERROR_REASON,
689                );
690                return Err(ServerError::Transport(anyhow::anyhow!(
691                    "iroh request {method_str} timed out after {timeout:?}"
692                )));
693            }
694        };
695
696        // TODO: We should not be serializing Results on the wire
697        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
698            .map_err(|e| ServerError::InvalidResponse(e.into()))?;
699
700        response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
701    }
702}
703
704#[apply(async_trait_maybe_send!)]
705impl IGatewayConnection for Connection {
706    async fn request(
707        &self,
708        password: Option<String>,
709        _method: Method,
710        route: &str,
711        payload: Option<Value>,
712    ) -> ServerResult<Value> {
713        let iroh_request = IrohGatewayRequest {
714            route: route.to_string(),
715            params: payload,
716            password,
717        };
718        let json = serde_json::to_vec(&iroh_request).expect("serialization cant fail");
719
720        let (mut sink, mut stream) = self
721            .open_bi()
722            .await
723            .map_err(|e| ServerError::Transport(e.into()))?;
724
725        sink.write_all(&json)
726            .await
727            .map_err(|e| ServerError::Transport(e.into()))?;
728
729        sink.finish()
730            .map_err(|e| ServerError::Transport(e.into()))?;
731
732        let response = stream
733            .read_to_end(IROH_MAX_RESPONSE_BYTES)
734            .await
735            .map_err(|e| ServerError::Transport(e.into()))?;
736
737        let response = serde_json::from_slice::<IrohGatewayResponse>(&response)
738            .map_err(|e| ServerError::InvalidResponse(e.into()))?;
739        match StatusCode::from_u16(response.status).map_err(|e| {
740            ServerError::InvalidResponse(anyhow::anyhow!("Invalid status code: {}", e))
741        })? {
742            StatusCode::OK => Ok(response.body),
743            status => Err(ServerError::ServerError(anyhow::anyhow!(
744                "Server returned status code: {}",
745                status
746            ))),
747        }
748    }
749}
750
#[cfg(test)]
mod tests {
    use fedimint_core::module::ApiMethod;

    use super::{
        IROH_REQUEST_TIMEOUT_DEFAULT, IROH_REQUEST_TIMEOUT_LONG_POLL, request_timeout_for_method,
    };

    /// All `await_*` endpoints currently exposed by fedimint modules; each
    /// must be classified as long-poll. An endpoint added without the prefix
    /// silently falls back to the default 60s budget. This list pins the
    /// contract, so renames show up as test churn.
    const AWAIT_ENDPOINTS: &[&str] = &[
        // fedimint-core
        "await_output_outcome",
        "await_outputs_outcomes",
        "await_session_outcome",
        "await_signed_session_outcome",
        "await_transaction",
        // fedimint-ln-common
        "await_account",
        "await_block_height",
        "await_offer",
        "await_outgoing_contract_cancelled",
        "await_preimage_decryption",
        // fedimint-lnv2-common
        "await_incoming_contract",
        "await_incoming_contracts",
        "await_preimage",
    ];

    /// A representative sample of endpoints that answer without server-side
    /// blocking.
    const PROMPT_ENDPOINTS: &[&str] = &[
        "block_count",
        "session_count",
        "session_status",
        "status",
        "version",
        "client_config",
        "audit",
        "account",
        "offer",
        "list_gateways",
        "submit_transaction",
        "consensus_block_count",
    ];

    /// Builds a core API method with the given name.
    fn core(name: &str) -> ApiMethod {
        ApiMethod::Core(name.to_owned())
    }

    /// Builds a method on module instance 0 with the given name.
    fn module(name: &str) -> ApiMethod {
        ApiMethod::Module(0, name.to_owned())
    }

    #[test]
    fn await_prefix_gets_long_poll_timeout() {
        for &name in AWAIT_ENDPOINTS {
            assert_eq!(
                request_timeout_for_method(&core(name)),
                IROH_REQUEST_TIMEOUT_LONG_POLL,
                "core endpoint {name} should map to the long-poll timeout"
            );
            assert_eq!(
                request_timeout_for_method(&module(name)),
                IROH_REQUEST_TIMEOUT_LONG_POLL,
                "module endpoint {name} should map to the long-poll timeout"
            );
        }
    }

    #[test]
    fn wait_prefix_also_gets_long_poll_timeout() {
        // No fedimint endpoint uses this prefix today; the selector accepts it
        // so endpoints following that naming convention are not silently
        // given the short default.
        assert_eq!(
            request_timeout_for_method(&core("wait_for_event")),
            IROH_REQUEST_TIMEOUT_LONG_POLL,
        );
    }

    #[test]
    fn prompt_endpoints_get_default_timeout() {
        for &name in PROMPT_ENDPOINTS {
            assert_eq!(
                request_timeout_for_method(&core(name)),
                IROH_REQUEST_TIMEOUT_DEFAULT,
                "endpoint {name} should map to the default timeout"
            );
        }
    }

    #[test]
    fn endpoints_that_merely_contain_await_are_not_misclassified() {
        // Classification is prefix-based; "await" elsewhere in the name must
        // not earn the long budget.
        assert_eq!(
            request_timeout_for_method(&core("submit_await_thing")),
            IROH_REQUEST_TIMEOUT_DEFAULT,
        );
    }
}