1use std::collections::BTreeMap;
2use std::fmt;
3use std::pin::Pin;
4use std::str::FromStr;
5
6use anyhow::{Context, bail};
7use async_trait::async_trait;
8use fedimint_core::config::ALEPH_BFT_UNIT_BYTE_LIMIT;
9use fedimint_core::envs::{
10 FM_IROH_N0_DISCOVERY_ENABLE_ENV, FM_IROH_PKARR_RESOLVER_ENABLE_ENV, is_env_var_set_opt,
11 parse_kv_list_from_env,
12};
13use fedimint_core::module::{
14 ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, FEDIMINT_GATEWAY_ALPN,
15 IrohApiRequest, IrohGatewayRequest, IrohGatewayResponse,
16};
17
/// Upper bound, in bytes, on a single response read from an iroh stream; it is
/// the limit handed to `read_to_end` by the request implementations in this
/// file.
///
/// NOTE(review): the factors `3600 * 4 * 2` applied to
/// `ALEPH_BFT_UNIT_BYTE_LIMIT` are not explained here — presumably they size
/// the buffer for the largest expected payload; confirm and document the
/// derivation.
const IROH_MAX_RESPONSE_BYTES: usize = ALEPH_BFT_UNIT_BYTE_LIMIT * 3600 * 4 * 2;
27use fedimint_core::task::spawn;
28use fedimint_core::util::{FmtCompact as _, SafeUrl};
29use fedimint_core::{apply, async_trait_maybe_send};
30use fedimint_logging::LOG_NET_IROH;
31use futures::Future;
32use futures::stream::{FuturesUnordered, StreamExt};
33use iroh::discovery::pkarr::PkarrResolver;
34use iroh::endpoint::Connection;
35use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
36use iroh_base::ticket::NodeTicket;
37use iroh_next::Watcher as _;
38use reqwest::{Method, StatusCode};
39use serde_json::Value;
40use tracing::{debug, trace, warn};
41
42use super::{DynGuaridianConnection, IGuardianConnection, ServerError, ServerResult};
43use crate::{DynGatewayConnection, IConnection, IGatewayConnection};
44
45#[derive(Clone)]
46pub(crate) struct IrohConnector {
47 stable: iroh::endpoint::Endpoint,
48 next: Option<iroh_next::endpoint::Endpoint>,
49
50 connection_overrides: BTreeMap<NodeId, NodeAddr>,
56}
57
58impl fmt::Debug for IrohConnector {
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60 f.debug_struct("IrohEndpoint")
61 .field("stable-id", &self.stable.node_id())
62 .field(
63 "next-id",
64 &self.next.as_ref().map(iroh_next::Endpoint::node_id),
65 )
66 .finish_non_exhaustive()
67 }
68}
69
70impl IrohConnector {
71 pub async fn new(
72 iroh_dns: Option<SafeUrl>,
73 iroh_enable_dht: bool,
74 iroh_enable_next: bool,
75 ) -> anyhow::Result<Self> {
76 const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
77 const FM_GW_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_GW_IROH_CONNECT_OVERRIDES";
78 let mut s = Self::new_no_overrides(iroh_dns, iroh_enable_dht, iroh_enable_next).await?;
79
80 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
81 s = s.with_connection_override(k, v.into());
82 }
83
84 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_GW_IROH_CONNECT_OVERRIDES_ENV)? {
85 s = s.with_connection_override(k, v.into());
86 }
87
88 Ok(s)
89 }
90
91 #[allow(clippy::too_many_lines)]
92 pub async fn new_no_overrides(
93 iroh_dns: Option<SafeUrl>,
94 iroh_enable_dht: bool,
95 iroh_enable_next: bool,
96 ) -> anyhow::Result<Self> {
97 let endpoint_stable = Box::pin({
98 let iroh_dns = iroh_dns.clone();
99 async {
100 let mut builder = Endpoint::builder();
101
102 if let Some(iroh_dns) = iroh_dns.map(SafeUrl::to_unsafe) {
103 builder = builder.add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
104 }
105
106 let mut builder = builder.relay_mode(iroh::RelayMode::Disabled);
108
109 #[cfg(not(target_family = "wasm"))]
110 if iroh_enable_dht {
111 builder = builder.discovery_dht();
112 }
113
114 {
116 if is_env_var_set_opt(FM_IROH_PKARR_RESOLVER_ENABLE_ENV).unwrap_or(true) {
117 #[cfg(target_family = "wasm")]
118 {
119 builder = builder.add_discovery(move |_| Some(PkarrResolver::n0_dns()));
120 }
121 } else {
122 warn!(
123 target: LOG_NET_IROH,
124 "Iroh pkarr resolver is disabled"
125 );
126 }
127
128 if is_env_var_set_opt(FM_IROH_N0_DISCOVERY_ENABLE_ENV).unwrap_or(true) {
129 #[cfg(not(target_family = "wasm"))]
130 {
131 builder = builder.add_discovery(move |_| {
132 Some(iroh::discovery::dns::DnsDiscovery::n0_dns())
133 });
134 }
135 } else {
136 warn!(
137 target: LOG_NET_IROH,
138 "Iroh n0 discovery is disabled"
139 );
140 }
141 }
142
143 let endpoint = builder.bind().await?;
144 debug!(
145 target: LOG_NET_IROH,
146 node_id = %endpoint.node_id(),
147 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
148 "Iroh api client endpoint (stable)"
149 );
150 Ok::<_, anyhow::Error>(endpoint)
151 }
152 });
153 let endpoint_next = Box::pin(async {
154 let mut builder = iroh_next::Endpoint::builder();
155
156 if let Some(iroh_dns) = iroh_dns.map(SafeUrl::to_unsafe) {
157 builder = builder.add_discovery(
158 iroh_next::discovery::pkarr::PkarrResolver::builder(iroh_dns).build(),
159 );
160 }
161
162 let mut builder = builder.relay_mode(iroh_next::RelayMode::Disabled);
164
165 #[cfg(not(target_family = "wasm"))]
166 if iroh_enable_dht {
167 builder = builder.discovery_dht();
168 }
169
170 {
172 #[cfg(target_family = "wasm")]
174 {
175 builder =
176 builder.add_discovery(iroh_next::discovery::pkarr::PkarrResolver::n0_dns());
177 }
178 #[cfg(not(target_family = "wasm"))]
180 {
181 builder =
182 builder.add_discovery(iroh_next::discovery::dns::DnsDiscovery::n0_dns());
183 }
184 }
185
186 let endpoint = builder.bind().await?;
187 debug!(
188 target: LOG_NET_IROH,
189 node_id = %endpoint.node_id(),
190 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
191 "Iroh api client endpoint (next)"
192 );
193 Ok(endpoint)
194 });
195
196 let (endpoint_stable, endpoint_next) = if iroh_enable_next {
197 let (s, n) = tokio::try_join!(endpoint_stable, endpoint_next)?;
198 (s, Some(n))
199 } else {
200 (endpoint_stable.await?, None)
201 };
202
203 Ok(Self {
204 stable: endpoint_stable,
205 next: endpoint_next,
206 connection_overrides: BTreeMap::new(),
207 })
208 }
209
210 pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
211 self.connection_overrides.insert(node, addr);
212 self
213 }
214
215 pub fn node_id_from_url(url: &SafeUrl) -> anyhow::Result<NodeId> {
216 if url.scheme() != "iroh" {
217 bail!(
218 "Unsupported scheme: {}, passed to iroh endpoint handler",
219 url.scheme()
220 );
221 }
222 let host = url.host_str().context("Missing host string in Iroh URL")?;
223
224 let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
225
226 Ok(node_id)
227 }
228}
229
230#[async_trait::async_trait]
231impl crate::Connector for IrohConnector {
232 async fn connect_guardian(
233 &self,
234 url: &SafeUrl,
235 api_secret: Option<&str>,
236 ) -> ServerResult<DynGuaridianConnection> {
237 if api_secret.is_some() {
238 ServerError::Connection(anyhow::format_err!(
241 "Iroh api secrets currently not supported"
242 ));
243 }
244 let node_id =
245 Self::node_id_from_url(url).map_err(|source| ServerError::InvalidPeerUrl {
246 source,
247 url: url.to_owned(),
248 })?;
249 let mut futures = FuturesUnordered::<
250 Pin<
251 Box<
252 dyn Future<Output = (ServerResult<DynGuaridianConnection>, &'static str)>
253 + Send,
254 >,
255 >,
256 >::new();
257 let connection_override = self.connection_overrides.get(&node_id).cloned();
258
259 let self_clone = self.clone();
260 futures.push(Box::pin({
261 let connection_override = connection_override.clone();
262 async move {
263 (
264 self_clone
265 .make_new_connection_stable(node_id, connection_override)
266 .await
267 .map(super::IGuardianConnection::into_dyn),
268 "stable",
269 )
270 }
271 }));
272
273 if let Some(endpoint_next) = &self.next {
274 let self_clone = self.clone();
275 let endpoint_next = endpoint_next.clone();
276 futures.push(Box::pin(async move {
277 (
278 self_clone
279 .make_new_connection_next(&endpoint_next, node_id, connection_override)
280 .await
281 .map(super::IGuardianConnection::into_dyn),
282 "next",
283 )
284 }));
285 }
286
287 let mut prev_err = None;
290
291 while let Some((result, iroh_stack)) = futures.next().await {
293 match result {
294 Ok(connection) => return Ok(connection),
295 Err(err) => {
296 warn!(
297 target: LOG_NET_IROH,
298 err = %err.fmt_compact(),
299 %iroh_stack,
300 "Join error in iroh connection task"
301 );
302 prev_err = Some(err);
303 }
304 }
305 }
306
307 Err(prev_err.unwrap_or_else(|| {
308 ServerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
309 }))
310 }
311
312 async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
313 let node_id = Self::node_id_from_url(url)?;
314 if let Some(node_addr) = self.connection_overrides.get(&node_id).cloned() {
315 let conn = self
316 .stable
317 .connect(node_addr.clone(), FEDIMINT_GATEWAY_ALPN)
318 .await?;
319
320 #[cfg(not(target_family = "wasm"))]
321 Self::spawn_connection_monitoring_stable(&self.stable, node_id);
322
323 Ok(IGatewayConnection::into_dyn(conn))
324 } else {
325 let conn = self.stable.connect(node_id, FEDIMINT_GATEWAY_ALPN).await?;
326 Ok(IGatewayConnection::into_dyn(conn))
327 }
328 }
329}
330
331impl IrohConnector {
332 #[cfg(not(target_family = "wasm"))]
333 fn spawn_connection_monitoring_stable(endpoint: &Endpoint, node_id: NodeId) {
334 if let Ok(mut conn_type_watcher) = endpoint.conn_type(node_id) {
335 #[allow(clippy::let_underscore_future)]
336 let _ = spawn("iroh connection (stable)", async move {
337 if let Ok(conn_type) = conn_type_watcher.get() {
338 debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
339 }
340 while let Ok(event) = conn_type_watcher.updated().await {
341 debug!(target: LOG_NET_IROH, %node_id, type = %event, "Connection type (changed)");
342 }
343 });
344 }
345 }
346
347 #[cfg(not(target_family = "wasm"))]
348 fn spawn_connection_monitoring_next(
349 endpoint: &iroh_next::Endpoint,
350 node_addr: &iroh_next::NodeAddr,
351 ) {
352 if let Some(mut conn_type_watcher) = endpoint.conn_type(node_addr.node_id) {
353 let node_id = node_addr.node_id;
354 #[allow(clippy::let_underscore_future)]
355 let _ = spawn("iroh connection (next)", async move {
356 if let Ok(conn_type) = conn_type_watcher.get() {
357 debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
358 }
359 while let Ok(event) = conn_type_watcher.updated().await {
360 debug!(target: LOG_NET_IROH, node_id = %node_id, %event, "Connection type changed");
361 }
362 });
363 }
364 }
365
366 async fn make_new_connection_stable(
367 &self,
368 node_id: NodeId,
369 node_addr: Option<NodeAddr>,
370 ) -> ServerResult<Connection> {
371 trace!(target: LOG_NET_IROH, %node_id, "Creating new stable connection");
372 let conn = match node_addr.clone() {
373 Some(node_addr) => {
374 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
375 let conn = self.stable
376 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
377 .await;
378
379 #[cfg(not(target_family = "wasm"))]
380 if conn.is_ok() {
381 Self::spawn_connection_monitoring_stable(&self.stable, node_id);
382 }
383 conn
384 }
385 None => self.stable.connect(node_id, FEDIMINT_API_ALPN).await,
386 }.map_err(ServerError::Connection)?;
387
388 Ok(conn)
389 }
390
391 async fn make_new_connection_next(
392 &self,
393 endpoint_next: &iroh_next::Endpoint,
394 node_id: NodeId,
395 node_addr: Option<NodeAddr>,
396 ) -> ServerResult<iroh_next::endpoint::Connection> {
397 let next_node_id = iroh_next::NodeId::from_bytes(node_id.as_bytes()).expect("Can't fail");
398
399 let endpoint_next = endpoint_next.clone();
400
401 trace!(target: LOG_NET_IROH, %node_id, "Creating new next connection");
402 let conn = match node_addr.clone() {
403 Some(node_addr) => {
404 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
405 let node_addr = node_addr_stable_to_next(&node_addr);
406 let conn = endpoint_next
407 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
408 .await;
409
410 #[cfg(not(target_family = "wasm"))]
411 if conn.is_ok() {
412 Self::spawn_connection_monitoring_next(&endpoint_next, &node_addr);
413 }
414
415 conn
416 }
417 None => endpoint_next.connect(
418 next_node_id,
419 FEDIMINT_API_ALPN
420 ).await,
421 }
422 .map_err(Into::into)
423 .map_err(ServerError::Connection)?;
424
425 Ok(conn)
426 }
427}
428
429fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::NodeAddr {
430 iroh_next::NodeAddr {
431 node_id: iroh_next::NodeId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail"),
432 relay_url: stable
433 .relay_url
434 .as_ref()
435 .map(|u| iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail")),
436 direct_addresses: stable.direct_addresses.clone(),
437 }
438}
439
440#[apply(async_trait_maybe_send!)]
441impl IConnection for Connection {
442 async fn await_disconnection(&self) {
443 self.closed().await;
444 }
445
446 fn is_connected(&self) -> bool {
447 self.close_reason().is_none()
448 }
449}
450
451#[async_trait]
452impl IGuardianConnection for Connection {
453 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
454 let json = serde_json::to_vec(&IrohApiRequest { method, request })
455 .expect("Serialization to vec can't fail");
456
457 let (mut sink, mut stream) = self
458 .open_bi()
459 .await
460 .map_err(|e| ServerError::Transport(e.into()))?;
461
462 sink.write_all(&json)
463 .await
464 .map_err(|e| ServerError::Transport(e.into()))?;
465
466 sink.finish()
467 .map_err(|e| ServerError::Transport(e.into()))?;
468
469 let response = stream
470 .read_to_end(IROH_MAX_RESPONSE_BYTES)
471 .await
472 .map_err(|e| ServerError::Transport(e.into()))?;
473
474 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
476 .map_err(|e| ServerError::InvalidResponse(e.into()))?;
477
478 response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
479 }
480}
481
482#[apply(async_trait_maybe_send!)]
483impl IConnection for iroh_next::endpoint::Connection {
484 async fn await_disconnection(&self) {
485 self.closed().await;
486 }
487
488 fn is_connected(&self) -> bool {
489 self.close_reason().is_none()
490 }
491}
492
493#[async_trait]
494impl IGuardianConnection for iroh_next::endpoint::Connection {
495 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
496 let json = serde_json::to_vec(&IrohApiRequest { method, request })
497 .expect("Serialization to vec can't fail");
498
499 let (mut sink, mut stream) = self
500 .open_bi()
501 .await
502 .map_err(|e| ServerError::Transport(e.into()))?;
503
504 sink.write_all(&json)
505 .await
506 .map_err(|e| ServerError::Transport(e.into()))?;
507
508 sink.finish()
509 .map_err(|e| ServerError::Transport(e.into()))?;
510
511 let response = stream
512 .read_to_end(IROH_MAX_RESPONSE_BYTES)
513 .await
514 .map_err(|e| ServerError::Transport(e.into()))?;
515
516 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
518 .map_err(|e| ServerError::InvalidResponse(e.into()))?;
519
520 response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
521 }
522}
523
524#[apply(async_trait_maybe_send!)]
525impl IGatewayConnection for Connection {
526 async fn request(
527 &self,
528 password: Option<String>,
529 _method: Method,
530 route: &str,
531 payload: Option<Value>,
532 ) -> ServerResult<Value> {
533 let iroh_request = IrohGatewayRequest {
534 route: route.to_string(),
535 params: payload,
536 password,
537 };
538 let json = serde_json::to_vec(&iroh_request).expect("serialization cant fail");
539
540 let (mut sink, mut stream) = self
541 .open_bi()
542 .await
543 .map_err(|e| ServerError::Transport(e.into()))?;
544
545 sink.write_all(&json)
546 .await
547 .map_err(|e| ServerError::Transport(e.into()))?;
548
549 sink.finish()
550 .map_err(|e| ServerError::Transport(e.into()))?;
551
552 let response = stream
553 .read_to_end(IROH_MAX_RESPONSE_BYTES)
554 .await
555 .map_err(|e| ServerError::Transport(e.into()))?;
556
557 let response = serde_json::from_slice::<IrohGatewayResponse>(&response)
558 .map_err(|e| ServerError::InvalidResponse(e.into()))?;
559 match StatusCode::from_u16(response.status).map_err(|e| {
560 ServerError::InvalidResponse(anyhow::anyhow!("Invalid status code: {}", e))
561 })? {
562 StatusCode::OK => Ok(response.body),
563 status => Err(ServerError::ServerError(anyhow::anyhow!(
564 "Server returned status code: {}",
565 status
566 ))),
567 }
568 }
569}