Skip to main content

fedimint_connectors/
iroh.rs

1use std::collections::BTreeMap;
2use std::fmt;
3use std::pin::Pin;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::{Context, bail};
9use async_trait::async_trait;
10use fedimint_core::config::ALEPH_BFT_UNIT_BYTE_LIMIT;
11use fedimint_core::envs::{
12    FM_IROH_N0_DISCOVERY_ENABLE_ENV, FM_IROH_PKARR_RESOLVER_ENABLE_ENV, is_env_var_set_opt,
13    parse_kv_list_from_env,
14};
15use fedimint_core::module::{
16    ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, FEDIMINT_GATEWAY_ALPN,
17    IrohApiRequest, IrohGatewayRequest, IrohGatewayResponse,
18};
19
20/// The maximum number of bytes we are willing to buffer when reading an API
21/// response from an iroh QUIC stream. This must be large enough to accommodate
22/// the largest possible signed session outcome. A session can contain up to
23/// `broadcast_rounds_per_session` (default 3600) rounds, each peer produces one
24/// unit per round, and each unit can be up to `ALEPH_BFT_UNIT_BYTE_LIMIT`
25/// bytes. The response is JSON-serialized which hex-encodes the consensus
26/// bytes, roughly doubling the size. We use 2x the raw max as a conservative
27/// upper bound. For a 4-peer federation this is ~1.44 GB.
28const IROH_MAX_RESPONSE_BYTES: usize = ALEPH_BFT_UNIT_BYTE_LIMIT * 3600 * 4 * 2;
29
30/// Wall-clock budget for a single iroh API request to make it through the QUIC
31/// bi-stream (open + write + finish + read response). If exceeded we close the
32/// underlying [`Connection`], which causes [`IConnection::is_connected`] to
33/// return false on the next pool lookup so a fresh connection is established
34/// for the retry. Used for endpoints that respond promptly (`block_count`,
35/// `status`, etc).
36const IROH_REQUEST_TIMEOUT_DEFAULT: Duration = Duration::from_secs(60);
37
38/// Wall-clock budget for an iroh API request to a server-side long-poll
39/// endpoint (`await_*` / `wait_*`). These wait on the server until an event
40/// fires (block height reached, contract cancelled, etc.) before responding,
41/// so they need a generous bound. Set well above realistic mainnet block
42/// intervals; if a long-poll legitimately needs longer than this the upstream
43/// `request_current_consensus_retry` loop will reconnect and retry.
44const IROH_REQUEST_TIMEOUT_LONG_POLL: Duration = Duration::from_secs(60 * 60);
45
46/// Application-level QUIC error code we use when closing a [`Connection`]
47/// after a request timeout. Recorded by the peer as the close reason; chosen
48/// arbitrarily but stable across stable and `iroh_next` impls so the two
49/// emit identical telemetry. The value 1 distinguishes us from a graceful
50/// close (0).
51const IROH_REQUEST_TIMEOUT_ERROR_CODE: u32 = 1;
52const IROH_REQUEST_TIMEOUT_ERROR_REASON: &[u8] = b"request timeout";
53
54/// Request timeout strategy: long-poll endpoints (`await_*` / `wait_*`)
55/// get the long bound, everything else gets the default. The string match
56/// is a heuristic; it covers all currently-defined fedimint long-poll
57/// endpoints and stays correct if new ones follow the existing naming
58/// convention. False positives (a non-long-poll endpoint that happens to
59/// match the prefix) just give that one method a longer leash; the worse
60/// case is a false negative — a long-poll method that doesn't match
61/// either prefix would get the 60s default and fail fast on legitimate
62/// waits, but the upstream retry loop would reconnect and try again.
63fn request_timeout_for_method(method: &ApiMethod) -> Duration {
64    let name = match method {
65        ApiMethod::Core(name) => name.as_str(),
66        ApiMethod::Module(_, name) => name.as_str(),
67    };
68    if name.starts_with("await_") || name.starts_with("wait_") {
69        IROH_REQUEST_TIMEOUT_LONG_POLL
70    } else {
71        IROH_REQUEST_TIMEOUT_DEFAULT
72    }
73}
74use fedimint_core::task::spawn;
75use fedimint_core::util::{FmtCompact as _, SafeUrl};
76use fedimint_core::{apply, async_trait_maybe_send};
77use fedimint_logging::LOG_NET_IROH;
78use futures::Future;
79use futures::stream::{FuturesUnordered, StreamExt};
80use iroh::discovery::pkarr::PkarrResolver;
81use iroh::endpoint::Connection;
82use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
83use iroh_base::ticket::NodeTicket;
84use iroh_next::Watcher as _;
85use reqwest::{Method, StatusCode};
86use serde_json::Value;
87use tokio::sync::watch;
88use tracing::{debug, trace, warn};
89
90use super::{DynGuaridianConnection, IGuardianConnection, ServerError, ServerResult};
91use crate::{Connectivity, DynGatewayConnection, IConnection, IGatewayConnection};
92
93#[derive(Clone)]
94pub(crate) struct IrohConnector {
95    stable: iroh::endpoint::Endpoint,
96    next: Option<iroh_next::endpoint::Endpoint>,
97
98    /// List of overrides to use when attempting to connect to given
99    /// `NodeId`
100    ///
101    /// This is useful for testing, or forcing non-default network
102    /// connectivity.
103    connection_overrides: BTreeMap<NodeId, NodeAddr>,
104
105    /// Registry-owned signal bumped whenever any per-connection monitoring
106    /// task observes a transport-level path change (e.g. iroh relay →
107    /// direct). Consumers of [`crate::ConnectorRegistry`] subscribe via
108    /// [`crate::ConnectorRegistry::connectivity_change_notifier`].
109    path_change: Arc<watch::Sender<u64>>,
110}
111
112impl fmt::Debug for IrohConnector {
113    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114        f.debug_struct("IrohEndpoint")
115            .field("stable-id", &self.stable.node_id())
116            .field("next-id", &self.next.as_ref().map(iroh_next::Endpoint::id))
117            .finish_non_exhaustive()
118    }
119}
120
121impl IrohConnector {
122    pub async fn new(
123        iroh_dns: Option<SafeUrl>,
124        iroh_enable_dht: bool,
125        iroh_enable_next: bool,
126        path_change: Arc<watch::Sender<u64>>,
127    ) -> anyhow::Result<Self> {
128        const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
129        const FM_GW_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_GW_IROH_CONNECT_OVERRIDES";
130        let mut s =
131            Self::new_no_overrides(iroh_dns, iroh_enable_dht, iroh_enable_next, path_change)
132                .await?;
133
134        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
135            s = s.with_connection_override(k, v.into());
136        }
137
138        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_GW_IROH_CONNECT_OVERRIDES_ENV)? {
139            s = s.with_connection_override(k, v.into());
140        }
141
142        Ok(s)
143    }
144
145    #[allow(clippy::too_many_lines)]
146    pub async fn new_no_overrides(
147        iroh_dns: Option<SafeUrl>,
148        iroh_enable_dht: bool,
149        iroh_enable_next: bool,
150        path_change: Arc<watch::Sender<u64>>,
151    ) -> anyhow::Result<Self> {
152        let endpoint_stable = Box::pin({
153            let iroh_dns = iroh_dns.clone();
154            async {
155                let mut builder = Endpoint::builder();
156
157                if let Some(iroh_dns) = iroh_dns.map(SafeUrl::to_unsafe) {
158                    builder = builder.add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
159                }
160
161                // As a client, we don't need to register on any relays
162                let mut builder = builder.relay_mode(iroh::RelayMode::Disabled);
163
164                #[cfg(not(target_family = "wasm"))]
165                if iroh_enable_dht {
166                    builder = builder.discovery_dht();
167                }
168
169                // Add only resolver services here; the stable n0 convenience also
170                // installs a publisher.
171                {
172                    if is_env_var_set_opt(FM_IROH_PKARR_RESOLVER_ENABLE_ENV).unwrap_or(true) {
173                        #[cfg(target_family = "wasm")]
174                        {
175                            builder = builder.add_discovery(move |_| Some(PkarrResolver::n0_dns()));
176                        }
177                    } else {
178                        warn!(
179                            target: LOG_NET_IROH,
180                            "Iroh pkarr resolver is disabled"
181                        );
182                    }
183
184                    if is_env_var_set_opt(FM_IROH_N0_DISCOVERY_ENABLE_ENV).unwrap_or(true) {
185                        #[cfg(not(target_family = "wasm"))]
186                        {
187                            builder = builder.add_discovery(move |_| {
188                                Some(iroh::discovery::dns::DnsDiscovery::n0_dns())
189                            });
190                        }
191                    } else {
192                        warn!(
193                            target: LOG_NET_IROH,
194                            "Iroh n0 discovery is disabled"
195                        );
196                    }
197                }
198
199                let endpoint = builder.bind().await?;
200                debug!(
201                    target: LOG_NET_IROH,
202                    node_id = %endpoint.node_id(),
203                    node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
204                    "Iroh api client endpoint (stable)"
205                );
206                Ok::<_, anyhow::Error>(endpoint)
207            }
208        });
209        let endpoint_next = Box::pin(async {
210            let mut builder = iroh_next::Endpoint::builder();
211
212            if let Some(iroh_dns) = iroh_dns.map(SafeUrl::to_unsafe) {
213                builder = builder.address_lookup(
214                    iroh_next::address_lookup::PkarrResolver::builder(iroh_dns).build(),
215                );
216            }
217
218            // As a client, we don't need to register on any relays
219            let mut builder = builder.relay_mode(iroh_next::RelayMode::Disabled);
220
221            #[cfg(not(target_family = "wasm"))]
222            if iroh_enable_dht {
223                builder =
224                    builder.address_lookup(iroh_next::address_lookup::DhtAddressLookup::builder());
225            }
226
227            // Add only resolver services here; the iroh 0.96
228            // `.preset(presets::N0)` convenience also installs a publisher.
229            {
230                // Resolve using HTTPS requests to our DNS server's /pkarr path in browsers
231                #[cfg(target_family = "wasm")]
232                {
233                    builder =
234                        builder.address_lookup(iroh_next::address_lookup::PkarrResolver::n0_dns());
235                }
236                // Resolve using DNS queries outside browsers.
237                #[cfg(not(target_family = "wasm"))]
238                {
239                    builder = builder
240                        .address_lookup(iroh_next::address_lookup::DnsAddressLookup::n0_dns());
241                }
242            }
243
244            let endpoint = builder.bind().await?;
245            debug!(
246                target: LOG_NET_IROH,
247                node_id = %endpoint.id(),
248                node_id_pkarr = %z32::encode(endpoint.id().as_bytes()),
249                "Iroh api client endpoint (next)"
250            );
251            Ok(endpoint)
252        });
253
254        let (endpoint_stable, endpoint_next) = if iroh_enable_next {
255            let (s, n) = tokio::try_join!(endpoint_stable, endpoint_next)?;
256            (s, Some(n))
257        } else {
258            (endpoint_stable.await?, None)
259        };
260
261        Ok(Self {
262            stable: endpoint_stable,
263            next: endpoint_next,
264            connection_overrides: BTreeMap::new(),
265            path_change,
266        })
267    }
268
269    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
270        self.connection_overrides.insert(node, addr);
271        self
272    }
273
274    pub fn node_id_from_url(url: &SafeUrl) -> anyhow::Result<NodeId> {
275        if url.scheme() != "iroh" {
276            bail!(
277                "Unsupported scheme: {}, passed to iroh endpoint handler",
278                url.scheme()
279            );
280        }
281        let host = url.host_str().context("Missing host string in Iroh URL")?;
282
283        let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
284
285        Ok(node_id)
286    }
287}
288
289#[async_trait::async_trait]
290impl crate::Connector for IrohConnector {
291    async fn connect_guardian(
292        &self,
293        url: &SafeUrl,
294        api_secret: Option<&str>,
295    ) -> ServerResult<DynGuaridianConnection> {
296        if api_secret.is_some() {
297            // There seem to be no way to pass secret over current Iroh calling
298            // convention
299            ServerError::Connection(anyhow::format_err!(
300                "Iroh api secrets currently not supported"
301            ));
302        }
303        let node_id =
304            Self::node_id_from_url(url).map_err(|source| ServerError::InvalidPeerUrl {
305                source,
306                url: url.to_owned(),
307            })?;
308        let mut futures = FuturesUnordered::<
309            Pin<
310                Box<
311                    dyn Future<Output = (ServerResult<DynGuaridianConnection>, &'static str)>
312                        + Send,
313                >,
314            >,
315        >::new();
316        let connection_override = self.connection_overrides.get(&node_id).cloned();
317
318        let self_clone = self.clone();
319        futures.push(Box::pin({
320            let connection_override = connection_override.clone();
321            async move {
322                (
323                    self_clone
324                        .make_new_connection_stable(node_id, connection_override)
325                        .await
326                        .map(super::IGuardianConnection::into_dyn),
327                    "stable",
328                )
329            }
330        }));
331
332        if let Some(endpoint_next) = &self.next {
333            let self_clone = self.clone();
334            let endpoint_next = endpoint_next.clone();
335            futures.push(Box::pin(async move {
336                (
337                    self_clone
338                        .make_new_connection_next(&endpoint_next, node_id, connection_override)
339                        .await
340                        .map(super::IGuardianConnection::into_dyn),
341                    "next",
342                )
343            }));
344        }
345
346        // Remember last error, so we have something to return if
347        // neither connection works.
348        let mut prev_err = None;
349
350        // Loop until first success, or running out of connections.
351        while let Some((result, iroh_stack)) = futures.next().await {
352            match result {
353                Ok(connection) => return Ok(connection),
354                Err(err) => {
355                    warn!(
356                        target: LOG_NET_IROH,
357                        err = %err.fmt_compact(),
358                        %iroh_stack,
359                        "Join error in iroh connection task"
360                    );
361                    prev_err = Some(err);
362                }
363            }
364        }
365
366        Err(prev_err.unwrap_or_else(|| {
367            ServerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
368        }))
369    }
370
371    async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
372        let node_id = Self::node_id_from_url(url)?;
373        if let Some(node_addr) = self.connection_overrides.get(&node_id).cloned() {
374            let conn = self
375                .stable
376                .connect(node_addr.clone(), FEDIMINT_GATEWAY_ALPN)
377                .await?;
378
379            #[cfg(not(target_family = "wasm"))]
380            Self::spawn_connection_monitoring_stable(
381                &self.stable,
382                node_id,
383                self.path_change.clone(),
384            );
385
386            Ok(IGatewayConnection::into_dyn(conn))
387        } else {
388            let conn = self.stable.connect(node_id, FEDIMINT_GATEWAY_ALPN).await?;
389            Ok(IGatewayConnection::into_dyn(conn))
390        }
391    }
392
393    fn connectivity(&self, url: &SafeUrl) -> Connectivity {
394        let Ok(node_id) = Self::node_id_from_url(url) else {
395            return Connectivity::Unknown;
396        };
397        let Ok(watcher) = self.stable.conn_type(node_id) else {
398            return Connectivity::Unknown;
399        };
400        match watcher.get() {
401            Ok(iroh::endpoint::ConnectionType::Direct(_)) => Connectivity::Direct,
402            Ok(iroh::endpoint::ConnectionType::Relay(_)) => Connectivity::Relay,
403            Ok(iroh::endpoint::ConnectionType::Mixed(..)) => Connectivity::Mixed,
404            Ok(iroh::endpoint::ConnectionType::None) | Err(_) => Connectivity::Unknown,
405        }
406    }
407}
408
409impl IrohConnector {
410    #[cfg(not(target_family = "wasm"))]
411    fn spawn_connection_monitoring_stable(
412        endpoint: &Endpoint,
413        node_id: NodeId,
414        path_change: Arc<watch::Sender<u64>>,
415    ) {
416        if let Ok(mut conn_type_watcher) = endpoint.conn_type(node_id) {
417            #[allow(clippy::let_underscore_future)]
418            let _ = spawn("iroh connection (stable)", async move {
419                if let Ok(conn_type) = conn_type_watcher.get() {
420                    debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
421                }
422                while let Ok(event) = conn_type_watcher.updated().await {
423                    debug!(target: LOG_NET_IROH, %node_id, type = %event, "Connection type (changed)");
424                    path_change.send_modify(|c| *c = c.wrapping_add(1));
425                }
426            });
427        }
428    }
429
430    #[cfg(not(target_family = "wasm"))]
431    fn spawn_connection_monitoring_next(
432        conn: &iroh_next::endpoint::Connection,
433        node_id: iroh_next::EndpointId,
434        path_change: Arc<watch::Sender<u64>>,
435    ) {
436        let mut paths_watcher = conn.paths();
437        #[allow(clippy::let_underscore_future)]
438        let _ = spawn("iroh connection (next)", async move {
439            let paths = paths_watcher.get();
440            debug!(target: LOG_NET_IROH, %node_id, ?paths, "Connection paths (initial)");
441            while let Ok(paths) = paths_watcher.updated().await {
442                debug!(target: LOG_NET_IROH, %node_id, ?paths, "Connection paths changed");
443                path_change.send_modify(|c| *c = c.wrapping_add(1));
444            }
445        });
446    }
447
448    async fn make_new_connection_stable(
449        &self,
450        node_id: NodeId,
451        node_addr: Option<NodeAddr>,
452    ) -> ServerResult<Connection> {
453        trace!(target: LOG_NET_IROH, %node_id, "Creating new stable connection");
454        let conn = match node_addr.clone() {
455            Some(node_addr) => {
456                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
457                let conn = self.stable
458                    .connect(node_addr.clone(), FEDIMINT_API_ALPN)
459                    .await;
460
461                #[cfg(not(target_family = "wasm"))]
462                if conn.is_ok() {
463                    Self::spawn_connection_monitoring_stable(
464                        &self.stable,
465                        node_id,
466                        self.path_change.clone(),
467                    );
468                }
469                conn
470            }
471            None => self.stable.connect(node_id, FEDIMINT_API_ALPN).await,
472        }.map_err(ServerError::Connection)?;
473
474        Ok(conn)
475    }
476
477    async fn make_new_connection_next(
478        &self,
479        endpoint_next: &iroh_next::Endpoint,
480        node_id: NodeId,
481        node_addr: Option<NodeAddr>,
482    ) -> ServerResult<iroh_next::endpoint::Connection> {
483        let next_node_id =
484            iroh_next::EndpointId::from_bytes(node_id.as_bytes()).expect("Can't fail");
485
486        let endpoint_next = endpoint_next.clone();
487
488        trace!(target: LOG_NET_IROH, %node_id, "Creating new next connection");
489        let conn = match node_addr.clone() {
490            Some(node_addr) => {
491                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
492                let node_addr = node_addr_stable_to_next(&node_addr);
493                let conn = endpoint_next
494                    .connect(node_addr.clone(), FEDIMINT_API_ALPN)
495                    .await;
496
497                #[cfg(not(target_family = "wasm"))]
498                if let Ok(conn) = &conn {
499                    Self::spawn_connection_monitoring_next(
500                        conn,
501                        node_addr.id,
502                        self.path_change.clone(),
503                    );
504                }
505
506                conn
507            }
508            None => endpoint_next.connect(
509                next_node_id,
510                FEDIMINT_API_ALPN
511            ).await,
512        }
513        .map_err(Into::into)
514        .map_err(ServerError::Connection)?;
515
516        Ok(conn)
517    }
518}
519
520fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::EndpointAddr {
521    let next_node_id =
522        iroh_next::EndpointId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail");
523    let relay_addrs = stable.relay_url.iter().map(|u| {
524        iroh_next::TransportAddr::Relay(
525            iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail"),
526        )
527    });
528    let direct_addrs = stable
529        .direct_addresses
530        .iter()
531        .copied()
532        .map(iroh_next::TransportAddr::Ip);
533
534    iroh_next::EndpointAddr::from_parts(next_node_id, relay_addrs.chain(direct_addrs))
535}
536
537#[apply(async_trait_maybe_send!)]
538impl IConnection for Connection {
539    async fn await_disconnection(&self) {
540        self.closed().await;
541    }
542
543    fn is_connected(&self) -> bool {
544        self.close_reason().is_none()
545    }
546}
547
548#[async_trait]
549impl IGuardianConnection for Connection {
550    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
551        let timeout = request_timeout_for_method(&method);
552        let method_str = method.to_string();
553        let json = serde_json::to_vec(&IrohApiRequest { method, request })
554            .expect("Serialization to vec can't fail");
555
556        let result = fedimint_core::runtime::timeout(timeout, async {
557            let (mut sink, mut stream) = self
558                .open_bi()
559                .await
560                .map_err(|e| ServerError::Transport(e.into()))?;
561
562            sink.write_all(&json)
563                .await
564                .map_err(|e| ServerError::Transport(e.into()))?;
565
566            sink.finish()
567                .map_err(|e| ServerError::Transport(e.into()))?;
568
569            stream
570                .read_to_end(IROH_MAX_RESPONSE_BYTES)
571                .await
572                .map_err(|e| ServerError::Transport(e.into()))
573        })
574        .await;
575
576        let response = match result {
577            Ok(Ok(bytes)) => bytes,
578            Ok(Err(err)) => return Err(err),
579            Err(_) => {
580                // The bi-stream stalled past our budget. Close the QUIC
581                // connection so [`Self::is_connected`] (which reads
582                // `close_reason`) starts returning false; the connection
583                // pool's `get_or_init_pool_entry` will then evict this
584                // entry on the next access and the upstream retry loop
585                // will get a fresh connection.
586                warn!(
587                    target: LOG_NET_IROH,
588                    method = %method_str,
589                    timeout_secs = timeout.as_secs(),
590                    "iroh request timed out, closing connection",
591                );
592                self.close(
593                    iroh::endpoint::VarInt::from_u32(IROH_REQUEST_TIMEOUT_ERROR_CODE),
594                    IROH_REQUEST_TIMEOUT_ERROR_REASON,
595                );
596                return Err(ServerError::Transport(anyhow::anyhow!(
597                    "iroh request {method_str} timed out after {timeout:?}"
598                )));
599            }
600        };
601
602        // TODO: We should not be serializing Results on the wire
603        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
604            .map_err(|e| ServerError::InvalidResponse(e.into()))?;
605
606        response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
607    }
608}
609
610#[apply(async_trait_maybe_send!)]
611impl IConnection for iroh_next::endpoint::Connection {
612    async fn await_disconnection(&self) {
613        self.closed().await;
614    }
615
616    fn is_connected(&self) -> bool {
617        self.close_reason().is_none()
618    }
619}
620
621#[async_trait]
622impl IGuardianConnection for iroh_next::endpoint::Connection {
623    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
624        let timeout = request_timeout_for_method(&method);
625        let method_str = method.to_string();
626        let json = serde_json::to_vec(&IrohApiRequest { method, request })
627            .expect("Serialization to vec can't fail");
628
629        let result = fedimint_core::runtime::timeout(timeout, async {
630            let (mut sink, mut stream) = self
631                .open_bi()
632                .await
633                .map_err(|e| ServerError::Transport(e.into()))?;
634
635            sink.write_all(&json)
636                .await
637                .map_err(|e| ServerError::Transport(e.into()))?;
638
639            sink.finish()
640                .map_err(|e| ServerError::Transport(e.into()))?;
641
642            stream
643                .read_to_end(IROH_MAX_RESPONSE_BYTES)
644                .await
645                .map_err(|e| ServerError::Transport(e.into()))
646        })
647        .await;
648
649        let response = match result {
650            Ok(Ok(bytes)) => bytes,
651            Ok(Err(err)) => return Err(err),
652            Err(_) => {
653                warn!(
654                    target: LOG_NET_IROH,
655                    method = %method_str,
656                    timeout_secs = timeout.as_secs(),
657                    "iroh request timed out, closing connection",
658                );
659                self.close(
660                    iroh_next::endpoint::VarInt::from_u32(IROH_REQUEST_TIMEOUT_ERROR_CODE),
661                    IROH_REQUEST_TIMEOUT_ERROR_REASON,
662                );
663                return Err(ServerError::Transport(anyhow::anyhow!(
664                    "iroh request {method_str} timed out after {timeout:?}"
665                )));
666            }
667        };
668
669        // TODO: We should not be serializing Results on the wire
670        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
671            .map_err(|e| ServerError::InvalidResponse(e.into()))?;
672
673        response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
674    }
675}
676
677#[apply(async_trait_maybe_send!)]
678impl IGatewayConnection for Connection {
679    async fn request(
680        &self,
681        password: Option<String>,
682        _method: Method,
683        route: &str,
684        payload: Option<Value>,
685    ) -> ServerResult<Value> {
686        let iroh_request = IrohGatewayRequest {
687            route: route.to_string(),
688            params: payload,
689            password,
690        };
691        let json = serde_json::to_vec(&iroh_request).expect("serialization cant fail");
692
693        let (mut sink, mut stream) = self
694            .open_bi()
695            .await
696            .map_err(|e| ServerError::Transport(e.into()))?;
697
698        sink.write_all(&json)
699            .await
700            .map_err(|e| ServerError::Transport(e.into()))?;
701
702        sink.finish()
703            .map_err(|e| ServerError::Transport(e.into()))?;
704
705        let response = stream
706            .read_to_end(IROH_MAX_RESPONSE_BYTES)
707            .await
708            .map_err(|e| ServerError::Transport(e.into()))?;
709
710        let response = serde_json::from_slice::<IrohGatewayResponse>(&response)
711            .map_err(|e| ServerError::InvalidResponse(e.into()))?;
712        match StatusCode::from_u16(response.status).map_err(|e| {
713            ServerError::InvalidResponse(anyhow::anyhow!("Invalid status code: {}", e))
714        })? {
715            StatusCode::OK => Ok(response.body),
716            status => Err(ServerError::ServerError(anyhow::anyhow!(
717                "Server returned status code: {}",
718                status
719            ))),
720        }
721    }
722}
723
#[cfg(test)]
mod tests {
    use fedimint_core::module::ApiMethod;

    use super::{
        IROH_REQUEST_TIMEOUT_DEFAULT, IROH_REQUEST_TIMEOUT_LONG_POLL, request_timeout_for_method,
    };

    /// Every `await_*` endpoint currently exposed by fedimint modules.
    ///
    /// All of these must be classified as long-poll. An endpoint added
    /// without the prefix would silently get the 60s default budget. This list
    /// documents the contract, and renames show up as test churn.
    const AWAIT_ENDPOINTS: &[&str] = &[
        // fedimint-core
        "await_output_outcome",
        "await_outputs_outcomes",
        "await_session_outcome",
        "await_signed_session_outcome",
        "await_transaction",
        // fedimint-ln-common
        "await_account",
        "await_block_height",
        "await_offer",
        "await_outgoing_contract_cancelled",
        "await_preimage_decryption",
        // fedimint-lnv2-common
        "await_incoming_contract",
        "await_incoming_contracts",
        "await_preimage",
    ];

    /// A representative sample of prompt endpoints, which respond without
    /// blocking on the server.
    const PROMPT_ENDPOINTS: &[&str] = &[
        "block_count",
        "session_count",
        "session_status",
        "status",
        "version",
        "client_config",
        "audit",
        "account",
        "offer",
        "list_gateways",
        "submit_transaction",
        "consensus_block_count",
    ];

    /// Builds a core API method with the given name.
    fn core(name: &str) -> ApiMethod {
        ApiMethod::Core(name.to_owned())
    }

    /// Builds a method of module instance 0 with the given name.
    fn module(name: &str) -> ApiMethod {
        ApiMethod::Module(0, name.to_owned())
    }

    #[test]
    fn await_prefix_gets_long_poll_timeout() {
        for &name in AWAIT_ENDPOINTS {
            assert_eq!(
                request_timeout_for_method(&core(name)),
                IROH_REQUEST_TIMEOUT_LONG_POLL,
                "core endpoint {name} should map to the long-poll timeout"
            );
            assert_eq!(
                request_timeout_for_method(&module(name)),
                IROH_REQUEST_TIMEOUT_LONG_POLL,
                "module endpoint {name} should map to the long-poll timeout"
            );
        }
    }

    #[test]
    fn wait_prefix_also_gets_long_poll_timeout() {
        // No fedimint endpoint uses this prefix yet. The selector accepts it
        // anyway, so future endpoints following that naming convention do
        // not silently get the default budget.
        assert_eq!(
            request_timeout_for_method(&core("wait_for_event")),
            IROH_REQUEST_TIMEOUT_LONG_POLL,
        );
    }

    #[test]
    fn prompt_endpoints_get_default_timeout() {
        for &name in PROMPT_ENDPOINTS {
            assert_eq!(
                request_timeout_for_method(&core(name)),
                IROH_REQUEST_TIMEOUT_DEFAULT,
                "endpoint {name} should map to the default timeout"
            );
        }
    }

    #[test]
    fn endpoints_that_merely_contain_await_are_not_misclassified() {
        // Classification looks only at the prefix, so "await" appearing
        // elsewhere in the name must not earn the long budget.
        assert_eq!(
            request_timeout_for_method(&core("submit_await_thing")),
            IROH_REQUEST_TIMEOUT_DEFAULT,
        );
    }
}