1use std::collections::BTreeMap;
2use std::fmt;
3use std::pin::Pin;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::{Context, bail};
9use async_trait::async_trait;
10use fedimint_core::config::ALEPH_BFT_UNIT_BYTE_LIMIT;
11use fedimint_core::envs::{
12 FM_IROH_N0_DISCOVERY_ENABLE_ENV, FM_IROH_PKARR_RESOLVER_ENABLE_ENV, is_env_var_set_opt,
13 parse_kv_list_from_env,
14};
15use fedimint_core::module::{
16 ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, FEDIMINT_GATEWAY_ALPN,
17 IrohApiRequest, IrohGatewayRequest, IrohGatewayResponse,
18};
19use fedimint_core::net::iroh::{IROH_IDLE_TIMEOUT, IROH_KEEP_ALIVE_INTERVAL};
20
/// Upper bound, in bytes, on a single response body read from an iroh stream.
///
/// Passed as the size limit to `read_to_end` by the request implementations
/// in this file.
///
/// NOTE(review): the derivation of the `3600 * 4 * 2` multiplier is not
/// visible here — confirm and document it.
const IROH_MAX_RESPONSE_BYTES: usize = ALEPH_BFT_UNIT_BYTE_LIMIT * 3600 * 4 * 2;

/// Timeout applied to API requests whose method name does not start with
/// `await_` or `wait_` (see `request_timeout_for_method`).
const IROH_REQUEST_TIMEOUT_DEFAULT: Duration = Duration::from_secs(60);

/// Timeout applied to API requests whose method name starts with `await_` or
/// `wait_` (see `request_timeout_for_method`).
const IROH_REQUEST_TIMEOUT_LONG_POLL: Duration = Duration::from_secs(60 * 60);

/// Application error code passed to `Connection::close` when a guardian
/// request exceeds its timeout.
const IROH_REQUEST_TIMEOUT_ERROR_CODE: u32 = 1;
/// Close reason sent together with [`IROH_REQUEST_TIMEOUT_ERROR_CODE`].
const IROH_REQUEST_TIMEOUT_ERROR_REASON: &[u8] = b"request timeout";
54
55fn request_timeout_for_method(method: &ApiMethod) -> Duration {
65 let name = match method {
66 ApiMethod::Core(name) => name.as_str(),
67 ApiMethod::Module(_, name) => name.as_str(),
68 };
69 if name.starts_with("await_") || name.starts_with("wait_") {
70 IROH_REQUEST_TIMEOUT_LONG_POLL
71 } else {
72 IROH_REQUEST_TIMEOUT_DEFAULT
73 }
74}
75use fedimint_core::task::spawn;
76use fedimint_core::util::{FmtCompact as _, SafeUrl};
77use fedimint_core::{apply, async_trait_maybe_send};
78use fedimint_logging::LOG_NET_IROH;
79use futures::Future;
80use futures::stream::{FuturesUnordered, StreamExt};
81use iroh::discovery::pkarr::PkarrResolver;
82use iroh::endpoint::Connection;
83use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
84use iroh_base::ticket::NodeTicket;
85use reqwest::{Method, StatusCode};
86use serde_json::Value;
87use tokio::sync::watch;
88use tracing::{debug, trace, warn};
89
90use super::{DynGuaridianConnection, IGuardianConnection, ServerError, ServerResult};
91use crate::{Connectivity, DynGatewayConnection, IConnection, IGatewayConnection};
92
/// Connector that opens guardian and gateway connections over iroh.
///
/// Holds one endpoint for the `iroh` crate ("stable") and, optionally, one
/// for the `iroh_next` crate ("next").
#[derive(Clone)]
pub(crate) struct IrohConnector {
    /// Endpoint built with the `iroh` crate; always present.
    stable: iroh::endpoint::Endpoint,
    /// Endpoint built with the `iroh_next` crate; `None` when it was not
    /// enabled at construction time.
    next: Option<iroh_next::endpoint::Endpoint>,

    /// Per-node addresses used when connecting, in place of connecting by
    /// node id alone.
    connection_overrides: BTreeMap<NodeId, NodeAddr>,

    /// Counter incremented (wrapping) by the connection monitoring tasks
    /// whenever they observe a connection type / path change.
    path_change: Arc<watch::Sender<u64>>,
}
111
112impl fmt::Debug for IrohConnector {
113 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114 f.debug_struct("IrohEndpoint")
115 .field("stable-id", &self.stable.node_id())
116 .field("next-id", &self.next.as_ref().map(iroh_next::Endpoint::id))
117 .finish_non_exhaustive()
118 }
119}
120
impl IrohConnector {
    /// Creates a connector and applies connection overrides read from the
    /// environment.
    ///
    /// The connector itself is built by [`Self::new_no_overrides`]. Afterwards
    /// the `FM_IROH_CONNECT_OVERRIDES` and `FM_GW_IROH_CONNECT_OVERRIDES`
    /// environment variables are parsed as key/value lists of node tickets and
    /// each entry is registered via [`Self::with_connection_override`]. Because
    /// overrides are stored in a map, a later entry for the same node replaces
    /// an earlier one.
    ///
    /// # Errors
    ///
    /// Returns an error if building the endpoints fails or if either
    /// environment variable cannot be parsed.
    pub async fn new(
        iroh_dns: Option<SafeUrl>,
        iroh_enable_dht: bool,
        iroh_enable_next: bool,
        path_change: Arc<watch::Sender<u64>>,
    ) -> anyhow::Result<Self> {
        const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
        const FM_GW_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_GW_IROH_CONNECT_OVERRIDES";
        let mut s =
            Self::new_no_overrides(iroh_dns, iroh_enable_dht, iroh_enable_next, path_change)
                .await?;

        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
            s = s.with_connection_override(k, v.into());
        }

        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_GW_IROH_CONNECT_OVERRIDES_ENV)? {
            s = s.with_connection_override(k, v.into());
        }

        Ok(s)
    }

    /// Builds the iroh endpoint(s) without registering any connection
    /// overrides.
    ///
    /// * `iroh_dns` - optional URL of a pkarr resolver added as an address
    ///   lookup source to both endpoints.
    /// * `iroh_enable_dht` - adds DHT-based discovery (non-wasm targets only).
    /// * `iroh_enable_next` - when `true`, the `iroh_next` endpoint is bound
    ///   concurrently with the stable one; when `false`, its future is never
    ///   polled and `next` is left as `None`.
    /// * `path_change` - stored as-is on the returned connector.
    ///
    /// # Errors
    ///
    /// Returns an error if binding any of the requested endpoints fails.
    #[allow(clippy::too_many_lines)]
    pub async fn new_no_overrides(
        iroh_dns: Option<SafeUrl>,
        iroh_enable_dht: bool,
        iroh_enable_next: bool,
        path_change: Arc<watch::Sender<u64>>,
    ) -> anyhow::Result<Self> {
        // Both endpoint builders are boxed futures; nothing runs until they
        // are awaited at the bottom of this function.
        let endpoint_stable = Box::pin({
            // The "next" future below consumes `iroh_dns` as well, so the
            // stable future works on its own copy.
            let iroh_dns = iroh_dns.clone();
            async {
                let mut builder = Endpoint::builder();

                if let Some(iroh_dns) = iroh_dns.map(SafeUrl::to_unsafe) {
                    builder = builder.add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
                }

                // Relay mode is explicitly disabled for this endpoint.
                let mut builder = builder.relay_mode(iroh::RelayMode::Disabled);

                #[cfg(not(target_family = "wasm"))]
                if iroh_enable_dht {
                    builder = builder.discovery_dht();
                }

                {
                    // Each n0 lookup source stays enabled unless its
                    // environment toggle evaluates to `false`; when the check
                    // yields `None` the source defaults to enabled.
                    if is_env_var_set_opt(FM_IROH_PKARR_RESOLVER_ENABLE_ENV).unwrap_or(true) {
                        builder = builder.add_discovery(move |_| Some(PkarrResolver::n0_dns()));
                    } else {
                        warn!(
                            target: LOG_NET_IROH,
                            "Iroh pkarr resolver is disabled"
                        );
                    }

                    if is_env_var_set_opt(FM_IROH_N0_DISCOVERY_ENABLE_ENV).unwrap_or(true) {
                        // DNS-based discovery is only compiled in outside of
                        // wasm targets.
                        #[cfg(not(target_family = "wasm"))]
                        {
                            builder = builder.add_discovery(move |_| {
                                Some(iroh::discovery::dns::DnsDiscovery::n0_dns())
                            });
                        }
                    } else {
                        warn!(
                            target: LOG_NET_IROH,
                            "Iroh n0 discovery is disabled"
                        );
                    }
                }

                let endpoint = builder
                    .transport_config(quic_transport_config())
                    .bind()
                    .await?;
                debug!(
                    target: LOG_NET_IROH,
                    node_id = %endpoint.node_id(),
                    node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
                    "Iroh api client endpoint (stable)"
                );
                Ok::<_, anyhow::Error>(endpoint)
            }
        });
        let endpoint_next = Box::pin(async {
            let mut builder = iroh_next::Endpoint::builder(iroh_next::endpoint::presets::Minimal);

            if let Some(iroh_dns) = iroh_dns.map(SafeUrl::to_unsafe) {
                builder = builder
                    .address_lookup(iroh_next::address_lookup::PkarrResolver::builder(iroh_dns));
            }

            // Relay mode is explicitly disabled for this endpoint as well.
            let mut builder = builder.relay_mode(iroh_next::RelayMode::Disabled);

            #[cfg(not(target_family = "wasm"))]
            if iroh_enable_dht {
                builder = builder
                    .address_lookup(iroh_mainline_address_lookup::DhtAddressLookup::builder());
            }

            {
                // NOTE(review): unlike the stable endpoint above, the n0
                // lookups are added here unconditionally —
                // FM_IROH_PKARR_RESOLVER_ENABLE_ENV and
                // FM_IROH_N0_DISCOVERY_ENABLE_ENV are not consulted. Confirm
                // whether that asymmetry is intended.
                builder =
                    builder.address_lookup(iroh_next::address_lookup::PkarrResolver::n0_dns());
                #[cfg(not(target_family = "wasm"))]
                {
                    builder = builder
                        .address_lookup(iroh_next::address_lookup::DnsAddressLookup::n0_dns());
                }
            }

            let endpoint = builder
                .transport_config(quic_transport_config_next())
                .bind()
                .await?;
            debug!(
                target: LOG_NET_IROH,
                node_id = %endpoint.id(),
                node_id_pkarr = %z32::encode(endpoint.id().as_bytes()),
                "Iroh api client endpoint (next)"
            );
            Ok(endpoint)
        });

        // Bind both endpoints concurrently when "next" is enabled; otherwise
        // only the stable future is driven.
        let (endpoint_stable, endpoint_next) = if iroh_enable_next {
            let (s, n) = tokio::try_join!(endpoint_stable, endpoint_next)?;
            (s, Some(n))
        } else {
            (endpoint_stable.await?, None)
        };

        Ok(Self {
            stable: endpoint_stable,
            next: endpoint_next,
            connection_overrides: BTreeMap::new(),
            path_change,
        })
    }

    /// Registers `addr` as the address to use when connecting to `node`,
    /// replacing any override previously stored for that node, and returns
    /// the updated connector.
    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
        self.connection_overrides.insert(node, addr);
        self
    }

    /// Extracts the iroh node id from an `iroh://<node-id>` URL.
    ///
    /// # Errors
    ///
    /// Returns an error if the scheme is not `iroh`, if the URL has no host
    /// part, or if the host does not parse as a [`PublicKey`].
    pub fn node_id_from_url(url: &SafeUrl) -> anyhow::Result<NodeId> {
        if url.scheme() != "iroh" {
            bail!(
                "Unsupported scheme: {}, passed to iroh endpoint handler",
                url.scheme()
            );
        }
        let host = url.host_str().context("Missing host string in Iroh URL")?;

        let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;

        Ok(node_id)
    }
}
287
288#[async_trait::async_trait]
289impl crate::Connector for IrohConnector {
290 async fn connect_guardian(
291 &self,
292 url: &SafeUrl,
293 api_secret: Option<&str>,
294 ) -> ServerResult<DynGuaridianConnection> {
295 if api_secret.is_some() {
296 ServerError::Connection(anyhow::format_err!(
299 "Iroh api secrets currently not supported"
300 ));
301 }
302 let node_id =
303 Self::node_id_from_url(url).map_err(|source| ServerError::InvalidPeerUrl {
304 source,
305 url: url.to_owned(),
306 })?;
307 let mut futures = FuturesUnordered::<
308 Pin<
309 Box<
310 dyn Future<Output = (ServerResult<DynGuaridianConnection>, &'static str)>
311 + Send,
312 >,
313 >,
314 >::new();
315 let connection_override = self.connection_overrides.get(&node_id).cloned();
316
317 let self_clone = self.clone();
318 futures.push(Box::pin({
319 let connection_override = connection_override.clone();
320 async move {
321 (
322 self_clone
323 .make_new_connection_stable(node_id, connection_override)
324 .await
325 .map(super::IGuardianConnection::into_dyn),
326 "stable",
327 )
328 }
329 }));
330
331 if let Some(endpoint_next) = &self.next {
332 let self_clone = self.clone();
333 let endpoint_next = endpoint_next.clone();
334 futures.push(Box::pin(async move {
335 (
336 self_clone
337 .make_new_connection_next(&endpoint_next, node_id, connection_override)
338 .await
339 .map(super::IGuardianConnection::into_dyn),
340 "next",
341 )
342 }));
343 }
344
345 let mut prev_err = None;
348
349 while let Some((result, iroh_stack)) = futures.next().await {
351 match result {
352 Ok(connection) => return Ok(connection),
353 Err(err) => {
354 warn!(
355 target: LOG_NET_IROH,
356 err = %err.fmt_compact(),
357 %iroh_stack,
358 "Join error in iroh connection task"
359 );
360 prev_err = Some(err);
361 }
362 }
363 }
364
365 Err(prev_err.unwrap_or_else(|| {
366 ServerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
367 }))
368 }
369
370 async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
371 let node_id = Self::node_id_from_url(url)?;
372 if let Some(node_addr) = self.connection_overrides.get(&node_id).cloned() {
373 let conn = self
374 .stable
375 .connect(node_addr.clone(), FEDIMINT_GATEWAY_ALPN)
376 .await?;
377
378 #[cfg(not(target_family = "wasm"))]
379 Self::spawn_connection_monitoring_stable(
380 &self.stable,
381 node_id,
382 self.path_change.clone(),
383 );
384
385 Ok(IGatewayConnection::into_dyn(conn))
386 } else {
387 let conn = self.stable.connect(node_id, FEDIMINT_GATEWAY_ALPN).await?;
388 Ok(IGatewayConnection::into_dyn(conn))
389 }
390 }
391
392 fn connectivity(&self, url: &SafeUrl) -> Connectivity {
393 let Ok(node_id) = Self::node_id_from_url(url) else {
394 return Connectivity::Unknown;
395 };
396 let Ok(watcher) = self.stable.conn_type(node_id) else {
397 return Connectivity::Unknown;
398 };
399 match watcher.get() {
400 Ok(iroh::endpoint::ConnectionType::Direct(_)) => Connectivity::Direct,
401 Ok(iroh::endpoint::ConnectionType::Relay(_)) => Connectivity::Relay,
402 Ok(iroh::endpoint::ConnectionType::Mixed(..)) => Connectivity::Mixed,
403 Ok(iroh::endpoint::ConnectionType::None) | Err(_) => Connectivity::Unknown,
404 }
405 }
406}
407
408impl IrohConnector {
409 #[cfg(not(target_family = "wasm"))]
410 fn spawn_connection_monitoring_stable(
411 endpoint: &Endpoint,
412 node_id: NodeId,
413 path_change: Arc<watch::Sender<u64>>,
414 ) {
415 if let Ok(mut conn_type_watcher) = endpoint.conn_type(node_id) {
416 #[allow(clippy::let_underscore_future)]
417 let _ = spawn("iroh connection (stable)", async move {
418 if let Ok(conn_type) = conn_type_watcher.get() {
419 debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
420 }
421 while let Ok(event) = conn_type_watcher.updated().await {
422 debug!(target: LOG_NET_IROH, %node_id, type = %event, "Connection type (changed)");
423 path_change.send_modify(|c| *c = c.wrapping_add(1));
424 }
425 });
426 }
427 }
428
429 #[cfg(not(target_family = "wasm"))]
430 fn spawn_connection_monitoring_next(
431 conn: &iroh_next::endpoint::Connection,
432 node_id: iroh_next::EndpointId,
433 path_change: Arc<watch::Sender<u64>>,
434 ) {
435 let conn = conn.clone();
436 #[allow(clippy::let_underscore_future)]
437 let _ = spawn("iroh connection (next)", async move {
438 let mut paths = conn.paths_stream();
439 if let Some(paths) = paths.next().await {
440 debug!(target: LOG_NET_IROH, %node_id, ?paths, "Connection paths (initial)");
441 }
442 while let Some(paths) = paths.next().await {
443 debug!(target: LOG_NET_IROH, %node_id, ?paths, "Connection paths changed");
444 path_change.send_modify(|c| *c = c.wrapping_add(1));
445 }
446 });
447 }
448
449 async fn make_new_connection_stable(
450 &self,
451 node_id: NodeId,
452 node_addr: Option<NodeAddr>,
453 ) -> ServerResult<Connection> {
454 trace!(target: LOG_NET_IROH, %node_id, "Creating new stable connection");
455 let conn = match node_addr.clone() {
456 Some(node_addr) => {
457 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
458 let conn = self.stable
459 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
460 .await;
461
462 #[cfg(not(target_family = "wasm"))]
463 if conn.is_ok() {
464 Self::spawn_connection_monitoring_stable(
465 &self.stable,
466 node_id,
467 self.path_change.clone(),
468 );
469 }
470 conn
471 }
472 None => self.stable.connect(node_id, FEDIMINT_API_ALPN).await,
473 }.map_err(ServerError::Connection)?;
474
475 Ok(conn)
476 }
477
478 async fn make_new_connection_next(
479 &self,
480 endpoint_next: &iroh_next::Endpoint,
481 node_id: NodeId,
482 node_addr: Option<NodeAddr>,
483 ) -> ServerResult<iroh_next::endpoint::Connection> {
484 let next_node_id =
485 iroh_next::EndpointId::from_bytes(node_id.as_bytes()).expect("Can't fail");
486
487 let endpoint_next = endpoint_next.clone();
488
489 trace!(target: LOG_NET_IROH, %node_id, "Creating new next connection");
490 let conn = match node_addr.clone() {
491 Some(node_addr) => {
492 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
493 let node_addr = node_addr_stable_to_next(&node_addr);
494 let conn = endpoint_next
495 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
496 .await;
497
498 #[cfg(not(target_family = "wasm"))]
499 if let Ok(conn) = &conn {
500 Self::spawn_connection_monitoring_next(
501 conn,
502 node_addr.id,
503 self.path_change.clone(),
504 );
505 }
506
507 conn
508 }
509 None => endpoint_next.connect(
510 next_node_id,
511 FEDIMINT_API_ALPN
512 ).await,
513 }
514 .map_err(Into::into)
515 .map_err(ServerError::Connection)?;
516
517 Ok(conn)
518 }
519}
520
521fn quic_transport_config() -> iroh::endpoint::TransportConfig {
524 let mut config = iroh::endpoint::TransportConfig::default();
525 config.max_idle_timeout(Some(
526 IROH_IDLE_TIMEOUT
527 .try_into()
528 .expect("idle timeout fits in IdleTimeout"),
529 ));
530 config.keep_alive_interval(Some(IROH_KEEP_ALIVE_INTERVAL));
531 config
532}
533
534fn quic_transport_config_next() -> iroh_next::endpoint::QuicTransportConfig {
537 iroh_next::endpoint::QuicTransportConfig::builder()
538 .max_idle_timeout(Some(
539 IROH_IDLE_TIMEOUT
540 .try_into()
541 .expect("idle timeout fits in IdleTimeout"),
542 ))
543 .keep_alive_interval(IROH_KEEP_ALIVE_INTERVAL)
544 .build()
545}
546
547fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::EndpointAddr {
548 let next_node_id =
549 iroh_next::EndpointId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail");
550 let relay_addrs = stable.relay_url.iter().map(|u| {
551 iroh_next::TransportAddr::Relay(
552 iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail"),
553 )
554 });
555 let direct_addrs = stable
556 .direct_addresses
557 .iter()
558 .copied()
559 .map(iroh_next::TransportAddr::Ip);
560
561 iroh_next::EndpointAddr::from_parts(next_node_id, relay_addrs.chain(direct_addrs))
562}
563
564#[apply(async_trait_maybe_send!)]
565impl IConnection for Connection {
566 async fn await_disconnection(&self) {
567 self.closed().await;
568 }
569
570 fn is_connected(&self) -> bool {
571 self.close_reason().is_none()
572 }
573}
574
575#[async_trait]
576impl IGuardianConnection for Connection {
577 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
578 let timeout = request_timeout_for_method(&method);
579 let method_str = method.to_string();
580 let json = serde_json::to_vec(&IrohApiRequest { method, request })
581 .expect("Serialization to vec can't fail");
582
583 let result = fedimint_core::runtime::timeout(timeout, async {
584 let (mut sink, mut stream) = self
585 .open_bi()
586 .await
587 .map_err(|e| ServerError::Transport(e.into()))?;
588
589 sink.write_all(&json)
590 .await
591 .map_err(|e| ServerError::Transport(e.into()))?;
592
593 sink.finish()
594 .map_err(|e| ServerError::Transport(e.into()))?;
595
596 stream
597 .read_to_end(IROH_MAX_RESPONSE_BYTES)
598 .await
599 .map_err(|e| ServerError::Transport(e.into()))
600 })
601 .await;
602
603 let response = match result {
604 Ok(Ok(bytes)) => bytes,
605 Ok(Err(err)) => return Err(err),
606 Err(_) => {
607 warn!(
614 target: LOG_NET_IROH,
615 method = %method_str,
616 timeout_secs = timeout.as_secs(),
617 "iroh request timed out, closing connection",
618 );
619 self.close(
620 iroh::endpoint::VarInt::from_u32(IROH_REQUEST_TIMEOUT_ERROR_CODE),
621 IROH_REQUEST_TIMEOUT_ERROR_REASON,
622 );
623 return Err(ServerError::Transport(anyhow::anyhow!(
624 "iroh request {method_str} timed out after {timeout:?}"
625 )));
626 }
627 };
628
629 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
631 .map_err(|e| ServerError::InvalidResponse(e.into()))?;
632
633 response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
634 }
635}
636
637#[apply(async_trait_maybe_send!)]
638impl IConnection for iroh_next::endpoint::Connection {
639 async fn await_disconnection(&self) {
640 self.closed().await;
641 }
642
643 fn is_connected(&self) -> bool {
644 self.close_reason().is_none()
645 }
646}
647
648#[async_trait]
649impl IGuardianConnection for iroh_next::endpoint::Connection {
650 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
651 let timeout = request_timeout_for_method(&method);
652 let method_str = method.to_string();
653 let json = serde_json::to_vec(&IrohApiRequest { method, request })
654 .expect("Serialization to vec can't fail");
655
656 let result = fedimint_core::runtime::timeout(timeout, async {
657 let (mut sink, mut stream) = self
658 .open_bi()
659 .await
660 .map_err(|e| ServerError::Transport(e.into()))?;
661
662 sink.write_all(&json)
663 .await
664 .map_err(|e| ServerError::Transport(e.into()))?;
665
666 sink.finish()
667 .map_err(|e| ServerError::Transport(e.into()))?;
668
669 stream
670 .read_to_end(IROH_MAX_RESPONSE_BYTES)
671 .await
672 .map_err(|e| ServerError::Transport(e.into()))
673 })
674 .await;
675
676 let response = match result {
677 Ok(Ok(bytes)) => bytes,
678 Ok(Err(err)) => return Err(err),
679 Err(_) => {
680 warn!(
681 target: LOG_NET_IROH,
682 method = %method_str,
683 timeout_secs = timeout.as_secs(),
684 "iroh request timed out, closing connection",
685 );
686 self.close(
687 iroh_next::endpoint::VarInt::from_u32(IROH_REQUEST_TIMEOUT_ERROR_CODE),
688 IROH_REQUEST_TIMEOUT_ERROR_REASON,
689 );
690 return Err(ServerError::Transport(anyhow::anyhow!(
691 "iroh request {method_str} timed out after {timeout:?}"
692 )));
693 }
694 };
695
696 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
698 .map_err(|e| ServerError::InvalidResponse(e.into()))?;
699
700 response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
701 }
702}
703
704#[apply(async_trait_maybe_send!)]
705impl IGatewayConnection for Connection {
706 async fn request(
707 &self,
708 password: Option<String>,
709 _method: Method,
710 route: &str,
711 payload: Option<Value>,
712 ) -> ServerResult<Value> {
713 let iroh_request = IrohGatewayRequest {
714 route: route.to_string(),
715 params: payload,
716 password,
717 };
718 let json = serde_json::to_vec(&iroh_request).expect("serialization cant fail");
719
720 let (mut sink, mut stream) = self
721 .open_bi()
722 .await
723 .map_err(|e| ServerError::Transport(e.into()))?;
724
725 sink.write_all(&json)
726 .await
727 .map_err(|e| ServerError::Transport(e.into()))?;
728
729 sink.finish()
730 .map_err(|e| ServerError::Transport(e.into()))?;
731
732 let response = stream
733 .read_to_end(IROH_MAX_RESPONSE_BYTES)
734 .await
735 .map_err(|e| ServerError::Transport(e.into()))?;
736
737 let response = serde_json::from_slice::<IrohGatewayResponse>(&response)
738 .map_err(|e| ServerError::InvalidResponse(e.into()))?;
739 match StatusCode::from_u16(response.status).map_err(|e| {
740 ServerError::InvalidResponse(anyhow::anyhow!("Invalid status code: {}", e))
741 })? {
742 StatusCode::OK => Ok(response.body),
743 status => Err(ServerError::ServerError(anyhow::anyhow!(
744 "Server returned status code: {}",
745 status
746 ))),
747 }
748 }
749}
750
#[cfg(test)]
mod tests {
    use fedimint_core::module::ApiMethod;

    use super::{
        IROH_REQUEST_TIMEOUT_DEFAULT, IROH_REQUEST_TIMEOUT_LONG_POLL, request_timeout_for_method,
    };

    /// Endpoint names with the `await_` prefix; all must get the long-poll
    /// timeout.
    const AWAIT_ENDPOINTS: &[&str] = &[
        "await_output_outcome",
        "await_outputs_outcomes",
        "await_session_outcome",
        "await_signed_session_outcome",
        "await_transaction",
        "await_account",
        "await_block_height",
        "await_offer",
        "await_outgoing_contract_cancelled",
        "await_preimage_decryption",
        "await_incoming_contract",
        "await_incoming_contracts",
        "await_preimage",
    ];

    /// Endpoint names without a long-poll prefix; all must get the default
    /// timeout.
    const PROMPT_ENDPOINTS: &[&str] = &[
        "block_count",
        "session_count",
        "session_status",
        "status",
        "version",
        "client_config",
        "audit",
        "account",
        "offer",
        "list_gateways",
        "submit_transaction",
        "consensus_block_count",
    ];

    /// Timeout selected for `endpoint` when addressed as a core API method.
    fn core_timeout(endpoint: &str) -> std::time::Duration {
        request_timeout_for_method(&ApiMethod::Core(endpoint.to_owned()))
    }

    #[test]
    fn await_prefix_gets_long_poll_timeout() {
        for &name in AWAIT_ENDPOINTS {
            assert_eq!(
                core_timeout(name),
                IROH_REQUEST_TIMEOUT_LONG_POLL,
                "core endpoint {name} should map to the long-poll timeout"
            );

            let as_module = ApiMethod::Module(0, name.to_owned());
            assert_eq!(
                request_timeout_for_method(&as_module),
                IROH_REQUEST_TIMEOUT_LONG_POLL,
                "module endpoint {name} should map to the long-poll timeout"
            );
        }
    }

    #[test]
    fn wait_prefix_also_gets_long_poll_timeout() {
        assert_eq!(
            core_timeout("wait_for_event"),
            IROH_REQUEST_TIMEOUT_LONG_POLL,
        );
    }

    #[test]
    fn prompt_endpoints_get_default_timeout() {
        for &name in PROMPT_ENDPOINTS {
            assert_eq!(
                core_timeout(name),
                IROH_REQUEST_TIMEOUT_DEFAULT,
                "endpoint {name} should map to the default timeout"
            );
        }
    }

    #[test]
    fn endpoints_that_merely_contain_await_are_not_misclassified() {
        // The prefix check must not match in the middle of a name.
        assert_eq!(
            core_timeout("submit_await_thing"),
            IROH_REQUEST_TIMEOUT_DEFAULT,
        );
    }
}