1use std::collections::BTreeMap;
2use std::fmt;
3use std::pin::Pin;
4use std::str::FromStr;
5
6use anyhow::{Context, bail};
7use async_trait::async_trait;
8use fedimint_core::envs::{
9 FM_IROH_N0_DISCOVERY_ENABLE_ENV, FM_IROH_PKARR_RESOLVER_ENABLE_ENV, is_env_var_set_opt,
10 parse_kv_list_from_env,
11};
12use fedimint_core::module::{
13 ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, FEDIMINT_GATEWAY_ALPN,
14 IrohApiRequest, IrohGatewayRequest, IrohGatewayResponse,
15};
16use fedimint_core::task::spawn;
17use fedimint_core::util::{FmtCompact as _, SafeUrl};
18use fedimint_core::{apply, async_trait_maybe_send};
19use fedimint_logging::LOG_NET_IROH;
20use futures::Future;
21use futures::stream::{FuturesUnordered, StreamExt};
22use iroh::discovery::pkarr::PkarrResolver;
23use iroh::endpoint::Connection;
24use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
25use iroh_base::ticket::NodeTicket;
26use iroh_next::Watcher as _;
27use reqwest::{Method, StatusCode};
28use serde_json::Value;
29use tracing::{debug, trace, warn};
30
31use super::{DynGuaridianConnection, IGuardianConnection, ServerError, ServerResult};
32use crate::{DynGatewayConnection, IConnection, IGatewayConnection};
33
34#[derive(Clone)]
35pub(crate) struct IrohConnector {
36 stable: iroh::endpoint::Endpoint,
37 next: Option<iroh_next::endpoint::Endpoint>,
38
39 connection_overrides: BTreeMap<NodeId, NodeAddr>,
45}
46
47impl fmt::Debug for IrohConnector {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 f.debug_struct("IrohEndpoint")
50 .field("stable-id", &self.stable.node_id())
51 .field(
52 "next-id",
53 &self.next.as_ref().map(iroh_next::Endpoint::node_id),
54 )
55 .finish_non_exhaustive()
56 }
57}
58
59impl IrohConnector {
60 pub async fn new(
61 iroh_dns: Option<SafeUrl>,
62 iroh_enable_dht: bool,
63 iroh_enable_next: bool,
64 ) -> anyhow::Result<Self> {
65 const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
66 const FM_GW_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_GW_IROH_CONNECT_OVERRIDES";
67 let mut s = Self::new_no_overrides(iroh_dns, iroh_enable_dht, iroh_enable_next).await?;
68
69 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
70 s = s.with_connection_override(k, v.into());
71 }
72
73 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_GW_IROH_CONNECT_OVERRIDES_ENV)? {
74 s = s.with_connection_override(k, v.into());
75 }
76
77 Ok(s)
78 }
79
80 #[allow(clippy::too_many_lines)]
81 pub async fn new_no_overrides(
82 iroh_dns: Option<SafeUrl>,
83 iroh_enable_dht: bool,
84 iroh_enable_next: bool,
85 ) -> anyhow::Result<Self> {
86 let endpoint_stable = Box::pin({
87 let iroh_dns = iroh_dns.clone();
88 async {
89 let mut builder = Endpoint::builder();
90
91 if let Some(iroh_dns) = iroh_dns.map(SafeUrl::to_unsafe) {
92 builder = builder.add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
93 }
94
95 let mut builder = builder.relay_mode(iroh::RelayMode::Disabled);
97
98 #[cfg(not(target_family = "wasm"))]
99 if iroh_enable_dht {
100 builder = builder.discovery_dht();
101 }
102
103 {
105 if is_env_var_set_opt(FM_IROH_PKARR_RESOLVER_ENABLE_ENV).unwrap_or(true) {
106 #[cfg(target_family = "wasm")]
107 {
108 builder = builder.add_discovery(move |_| Some(PkarrResolver::n0_dns()));
109 }
110 } else {
111 warn!(
112 target: LOG_NET_IROH,
113 "Iroh pkarr resolver is disabled"
114 );
115 }
116
117 if is_env_var_set_opt(FM_IROH_N0_DISCOVERY_ENABLE_ENV).unwrap_or(true) {
118 #[cfg(not(target_family = "wasm"))]
119 {
120 builder = builder.add_discovery(move |_| {
121 Some(iroh::discovery::dns::DnsDiscovery::n0_dns())
122 });
123 }
124 } else {
125 warn!(
126 target: LOG_NET_IROH,
127 "Iroh n0 discovery is disabled"
128 );
129 }
130 }
131
132 let endpoint = builder.bind().await?;
133 debug!(
134 target: LOG_NET_IROH,
135 node_id = %endpoint.node_id(),
136 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
137 "Iroh api client endpoint (stable)"
138 );
139 Ok::<_, anyhow::Error>(endpoint)
140 }
141 });
142 let endpoint_next = Box::pin(async {
143 let mut builder = iroh_next::Endpoint::builder();
144
145 if let Some(iroh_dns) = iroh_dns.map(SafeUrl::to_unsafe) {
146 builder = builder.add_discovery(
147 iroh_next::discovery::pkarr::PkarrResolver::builder(iroh_dns).build(),
148 );
149 }
150
151 let mut builder = builder.relay_mode(iroh_next::RelayMode::Disabled);
153
154 #[cfg(not(target_family = "wasm"))]
155 if iroh_enable_dht {
156 builder = builder.discovery_dht();
157 }
158
159 {
161 #[cfg(target_family = "wasm")]
163 {
164 builder =
165 builder.add_discovery(iroh_next::discovery::pkarr::PkarrResolver::n0_dns());
166 }
167 #[cfg(not(target_family = "wasm"))]
169 {
170 builder =
171 builder.add_discovery(iroh_next::discovery::dns::DnsDiscovery::n0_dns());
172 }
173 }
174
175 let endpoint = builder.bind().await?;
176 debug!(
177 target: LOG_NET_IROH,
178 node_id = %endpoint.node_id(),
179 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
180 "Iroh api client endpoint (next)"
181 );
182 Ok(endpoint)
183 });
184
185 let (endpoint_stable, endpoint_next) = if iroh_enable_next {
186 let (s, n) = tokio::try_join!(endpoint_stable, endpoint_next)?;
187 (s, Some(n))
188 } else {
189 (endpoint_stable.await?, None)
190 };
191
192 Ok(Self {
193 stable: endpoint_stable,
194 next: endpoint_next,
195 connection_overrides: BTreeMap::new(),
196 })
197 }
198
199 pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
200 self.connection_overrides.insert(node, addr);
201 self
202 }
203
204 pub fn node_id_from_url(url: &SafeUrl) -> anyhow::Result<NodeId> {
205 if url.scheme() != "iroh" {
206 bail!(
207 "Unsupported scheme: {}, passed to iroh endpoint handler",
208 url.scheme()
209 );
210 }
211 let host = url.host_str().context("Missing host string in Iroh URL")?;
212
213 let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
214
215 Ok(node_id)
216 }
217}
218
219#[async_trait::async_trait]
220impl crate::Connector for IrohConnector {
221 async fn connect_guardian(
222 &self,
223 url: &SafeUrl,
224 api_secret: Option<&str>,
225 ) -> ServerResult<DynGuaridianConnection> {
226 if api_secret.is_some() {
227 ServerError::Connection(anyhow::format_err!(
230 "Iroh api secrets currently not supported"
231 ));
232 }
233 let node_id =
234 Self::node_id_from_url(url).map_err(|source| ServerError::InvalidPeerUrl {
235 source,
236 url: url.to_owned(),
237 })?;
238 let mut futures = FuturesUnordered::<
239 Pin<
240 Box<
241 dyn Future<Output = (ServerResult<DynGuaridianConnection>, &'static str)>
242 + Send,
243 >,
244 >,
245 >::new();
246 let connection_override = self.connection_overrides.get(&node_id).cloned();
247
248 let self_clone = self.clone();
249 futures.push(Box::pin({
250 let connection_override = connection_override.clone();
251 async move {
252 (
253 self_clone
254 .make_new_connection_stable(node_id, connection_override)
255 .await
256 .map(super::IGuardianConnection::into_dyn),
257 "stable",
258 )
259 }
260 }));
261
262 if let Some(endpoint_next) = &self.next {
263 let self_clone = self.clone();
264 let endpoint_next = endpoint_next.clone();
265 futures.push(Box::pin(async move {
266 (
267 self_clone
268 .make_new_connection_next(&endpoint_next, node_id, connection_override)
269 .await
270 .map(super::IGuardianConnection::into_dyn),
271 "next",
272 )
273 }));
274 }
275
276 let mut prev_err = None;
279
280 while let Some((result, iroh_stack)) = futures.next().await {
282 match result {
283 Ok(connection) => return Ok(connection),
284 Err(err) => {
285 warn!(
286 target: LOG_NET_IROH,
287 err = %err.fmt_compact(),
288 %iroh_stack,
289 "Join error in iroh connection task"
290 );
291 prev_err = Some(err);
292 }
293 }
294 }
295
296 Err(prev_err.unwrap_or_else(|| {
297 ServerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
298 }))
299 }
300
301 async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
302 let node_id = Self::node_id_from_url(url)?;
303 if let Some(node_addr) = self.connection_overrides.get(&node_id).cloned() {
304 let conn = self
305 .stable
306 .connect(node_addr.clone(), FEDIMINT_GATEWAY_ALPN)
307 .await?;
308
309 #[cfg(not(target_family = "wasm"))]
310 Self::spawn_connection_monitoring_stable(&self.stable, node_id);
311
312 Ok(IGatewayConnection::into_dyn(conn))
313 } else {
314 let conn = self.stable.connect(node_id, FEDIMINT_GATEWAY_ALPN).await?;
315 Ok(IGatewayConnection::into_dyn(conn))
316 }
317 }
318}
319
320impl IrohConnector {
321 #[cfg(not(target_family = "wasm"))]
322 fn spawn_connection_monitoring_stable(endpoint: &Endpoint, node_id: NodeId) {
323 if let Ok(mut conn_type_watcher) = endpoint.conn_type(node_id) {
324 #[allow(clippy::let_underscore_future)]
325 let _ = spawn("iroh connection (stable)", async move {
326 if let Ok(conn_type) = conn_type_watcher.get() {
327 debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
328 }
329 while let Ok(event) = conn_type_watcher.updated().await {
330 debug!(target: LOG_NET_IROH, %node_id, type = %event, "Connection type (changed)");
331 }
332 });
333 }
334 }
335
336 #[cfg(not(target_family = "wasm"))]
337 fn spawn_connection_monitoring_next(
338 endpoint: &iroh_next::Endpoint,
339 node_addr: &iroh_next::NodeAddr,
340 ) {
341 if let Some(mut conn_type_watcher) = endpoint.conn_type(node_addr.node_id) {
342 let node_id = node_addr.node_id;
343 #[allow(clippy::let_underscore_future)]
344 let _ = spawn("iroh connection (next)", async move {
345 if let Ok(conn_type) = conn_type_watcher.get() {
346 debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
347 }
348 while let Ok(event) = conn_type_watcher.updated().await {
349 debug!(target: LOG_NET_IROH, node_id = %node_id, %event, "Connection type changed");
350 }
351 });
352 }
353 }
354
355 async fn make_new_connection_stable(
356 &self,
357 node_id: NodeId,
358 node_addr: Option<NodeAddr>,
359 ) -> ServerResult<Connection> {
360 trace!(target: LOG_NET_IROH, %node_id, "Creating new stable connection");
361 let conn = match node_addr.clone() {
362 Some(node_addr) => {
363 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
364 let conn = self.stable
365 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
366 .await;
367
368 #[cfg(not(target_family = "wasm"))]
369 if conn.is_ok() {
370 Self::spawn_connection_monitoring_stable(&self.stable, node_id);
371 }
372 conn
373 }
374 None => self.stable.connect(node_id, FEDIMINT_API_ALPN).await,
375 }.map_err(ServerError::Connection)?;
376
377 Ok(conn)
378 }
379
380 async fn make_new_connection_next(
381 &self,
382 endpoint_next: &iroh_next::Endpoint,
383 node_id: NodeId,
384 node_addr: Option<NodeAddr>,
385 ) -> ServerResult<iroh_next::endpoint::Connection> {
386 let next_node_id = iroh_next::NodeId::from_bytes(node_id.as_bytes()).expect("Can't fail");
387
388 let endpoint_next = endpoint_next.clone();
389
390 trace!(target: LOG_NET_IROH, %node_id, "Creating new next connection");
391 let conn = match node_addr.clone() {
392 Some(node_addr) => {
393 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
394 let node_addr = node_addr_stable_to_next(&node_addr);
395 let conn = endpoint_next
396 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
397 .await;
398
399 #[cfg(not(target_family = "wasm"))]
400 if conn.is_ok() {
401 Self::spawn_connection_monitoring_next(&endpoint_next, &node_addr);
402 }
403
404 conn
405 }
406 None => endpoint_next.connect(
407 next_node_id,
408 FEDIMINT_API_ALPN
409 ).await,
410 }
411 .map_err(Into::into)
412 .map_err(ServerError::Connection)?;
413
414 Ok(conn)
415 }
416}
417
418fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::NodeAddr {
419 iroh_next::NodeAddr {
420 node_id: iroh_next::NodeId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail"),
421 relay_url: stable
422 .relay_url
423 .as_ref()
424 .map(|u| iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail")),
425 direct_addresses: stable.direct_addresses.clone(),
426 }
427}
428
429#[apply(async_trait_maybe_send!)]
430impl IConnection for Connection {
431 async fn await_disconnection(&self) {
432 self.closed().await;
433 }
434
435 fn is_connected(&self) -> bool {
436 self.close_reason().is_none()
437 }
438}
439
440#[async_trait]
441impl IGuardianConnection for Connection {
442 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
443 let json = serde_json::to_vec(&IrohApiRequest { method, request })
444 .expect("Serialization to vec can't fail");
445
446 let (mut sink, mut stream) = self
447 .open_bi()
448 .await
449 .map_err(|e| ServerError::Transport(e.into()))?;
450
451 sink.write_all(&json)
452 .await
453 .map_err(|e| ServerError::Transport(e.into()))?;
454
455 sink.finish()
456 .map_err(|e| ServerError::Transport(e.into()))?;
457
458 let response = stream
459 .read_to_end(1_000_000)
460 .await
461 .map_err(|e| ServerError::Transport(e.into()))?;
462
463 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
465 .map_err(|e| ServerError::InvalidResponse(e.into()))?;
466
467 response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
468 }
469}
470
471#[apply(async_trait_maybe_send!)]
472impl IConnection for iroh_next::endpoint::Connection {
473 async fn await_disconnection(&self) {
474 self.closed().await;
475 }
476
477 fn is_connected(&self) -> bool {
478 self.close_reason().is_none()
479 }
480}
481
482#[async_trait]
483impl IGuardianConnection for iroh_next::endpoint::Connection {
484 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
485 let json = serde_json::to_vec(&IrohApiRequest { method, request })
486 .expect("Serialization to vec can't fail");
487
488 let (mut sink, mut stream) = self
489 .open_bi()
490 .await
491 .map_err(|e| ServerError::Transport(e.into()))?;
492
493 sink.write_all(&json)
494 .await
495 .map_err(|e| ServerError::Transport(e.into()))?;
496
497 sink.finish()
498 .map_err(|e| ServerError::Transport(e.into()))?;
499
500 let response = stream
501 .read_to_end(1_000_000)
502 .await
503 .map_err(|e| ServerError::Transport(e.into()))?;
504
505 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
507 .map_err(|e| ServerError::InvalidResponse(e.into()))?;
508
509 response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
510 }
511}
512
513#[apply(async_trait_maybe_send!)]
514impl IGatewayConnection for Connection {
515 async fn request(
516 &self,
517 password: Option<String>,
518 _method: Method,
519 route: &str,
520 payload: Option<Value>,
521 ) -> ServerResult<Value> {
522 let iroh_request = IrohGatewayRequest {
523 route: route.to_string(),
524 params: payload,
525 password,
526 };
527 let json = serde_json::to_vec(&iroh_request).expect("serialization cant fail");
528
529 let (mut sink, mut stream) = self
530 .open_bi()
531 .await
532 .map_err(|e| ServerError::Transport(e.into()))?;
533
534 sink.write_all(&json)
535 .await
536 .map_err(|e| ServerError::Transport(e.into()))?;
537
538 sink.finish()
539 .map_err(|e| ServerError::Transport(e.into()))?;
540
541 let response = stream
542 .read_to_end(1_000_000)
543 .await
544 .map_err(|e| ServerError::Transport(e.into()))?;
545
546 let response = serde_json::from_slice::<IrohGatewayResponse>(&response)
547 .map_err(|e| ServerError::InvalidResponse(e.into()))?;
548 match StatusCode::from_u16(response.status).map_err(|e| {
549 ServerError::InvalidResponse(anyhow::anyhow!("Invalid status code: {}", e))
550 })? {
551 StatusCode::OK => Ok(response.body),
552 status => Err(ServerError::ServerError(anyhow::anyhow!(
553 "Server returned status code: {}",
554 status
555 ))),
556 }
557 }
558}