1use std::collections::BTreeMap;
2use std::fmt;
3use std::pin::Pin;
4use std::str::FromStr;
5
6use anyhow::{Context, bail};
7use async_trait::async_trait;
8use fedimint_core::envs::{
9 FM_IROH_N0_DISCOVERY_ENABLE_ENV, FM_IROH_PKARR_RESOLVER_ENABLE_ENV, is_env_var_set_opt,
10 parse_kv_list_from_env,
11};
12use fedimint_core::iroh_prod::FM_IROH_DNS_FEDIMINT_PROD;
13use fedimint_core::module::{
14 ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, FEDIMINT_GATEWAY_ALPN,
15 IrohApiRequest, IrohGatewayRequest, IrohGatewayResponse,
16};
17use fedimint_core::task::spawn;
18use fedimint_core::util::{FmtCompact as _, SafeUrl};
19use fedimint_core::{apply, async_trait_maybe_send};
20use fedimint_logging::LOG_NET_IROH;
21use futures::Future;
22use futures::stream::{FuturesUnordered, StreamExt};
23use iroh::discovery::pkarr::PkarrResolver;
24use iroh::endpoint::Connection;
25use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
26use iroh_base::ticket::NodeTicket;
27use iroh_next::Watcher as _;
28use reqwest::{Method, StatusCode};
29use serde_json::Value;
30use tracing::{debug, trace, warn};
31use url::Url;
32
33use super::{DynGuaridianConnection, IGuardianConnection, ServerError, ServerResult};
34use crate::{DynGatewayConnection, IConnection, IGatewayConnection};
35
36#[derive(Clone)]
37pub(crate) struct IrohConnector {
38 stable: iroh::endpoint::Endpoint,
39 next: Option<iroh_next::endpoint::Endpoint>,
40
41 connection_overrides: BTreeMap<NodeId, NodeAddr>,
47}
48
49impl fmt::Debug for IrohConnector {
50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51 f.debug_struct("IrohEndpoint")
52 .field("stable-id", &self.stable.node_id())
53 .field(
54 "next-id",
55 &self.next.as_ref().map(iroh_next::Endpoint::node_id),
56 )
57 .finish_non_exhaustive()
58 }
59}
60
61impl IrohConnector {
62 pub async fn new(
63 iroh_dns: Option<SafeUrl>,
64 iroh_enable_dht: bool,
65 iroh_enable_next: bool,
66 ) -> anyhow::Result<Self> {
67 const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
68 const FM_GW_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_GW_IROH_CONNECT_OVERRIDES";
69 let mut s = Self::new_no_overrides(iroh_dns, iroh_enable_dht, iroh_enable_next).await?;
70
71 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
72 s = s.with_connection_override(k, v.into());
73 }
74
75 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_GW_IROH_CONNECT_OVERRIDES_ENV)? {
76 s = s.with_connection_override(k, v.into());
77 }
78
79 Ok(s)
80 }
81
82 #[allow(clippy::too_many_lines)]
83 pub async fn new_no_overrides(
84 iroh_dns: Option<SafeUrl>,
85 iroh_enable_dht: bool,
86 iroh_enable_next: bool,
87 ) -> anyhow::Result<Self> {
88 let iroh_dns_servers: Vec<_> = iroh_dns.map_or_else(
89 || {
90 FM_IROH_DNS_FEDIMINT_PROD
91 .into_iter()
92 .map(|url| Url::parse(url).expect("Hardcoded, can't fail"))
93 .collect()
94 },
95 |url| vec![url.to_unsafe()],
96 );
97
98 let endpoint_stable = Box::pin({
99 let iroh_dns_servers = iroh_dns_servers.clone();
100 async {
101 let mut builder = Endpoint::builder();
102
103 for iroh_dns in iroh_dns_servers {
104 builder = builder.add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
105 }
106
107 let mut builder = builder.relay_mode(iroh::RelayMode::Disabled);
109
110 #[cfg(not(target_family = "wasm"))]
111 if iroh_enable_dht {
112 builder = builder.discovery_dht();
113 }
114
115 {
117 if is_env_var_set_opt(FM_IROH_PKARR_RESOLVER_ENABLE_ENV).unwrap_or(true) {
118 #[cfg(target_family = "wasm")]
119 {
120 builder = builder.add_discovery(move |_| Some(PkarrResolver::n0_dns()));
121 }
122 } else {
123 warn!(
124 target: LOG_NET_IROH,
125 "Iroh pkarr resolver is disabled"
126 );
127 }
128
129 if is_env_var_set_opt(FM_IROH_N0_DISCOVERY_ENABLE_ENV).unwrap_or(true) {
130 #[cfg(not(target_family = "wasm"))]
131 {
132 builder = builder.add_discovery(move |_| {
133 Some(iroh::discovery::dns::DnsDiscovery::n0_dns())
134 });
135 }
136 } else {
137 warn!(
138 target: LOG_NET_IROH,
139 "Iroh n0 discovery is disabled"
140 );
141 }
142 }
143
144 let endpoint = builder.bind().await?;
145 debug!(
146 target: LOG_NET_IROH,
147 node_id = %endpoint.node_id(),
148 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
149 "Iroh api client endpoint (stable)"
150 );
151 Ok::<_, anyhow::Error>(endpoint)
152 }
153 });
154 let endpoint_next = Box::pin(async {
155 let mut builder = iroh_next::Endpoint::builder();
156
157 for iroh_dns in iroh_dns_servers {
158 builder = builder.add_discovery(
159 iroh_next::discovery::pkarr::PkarrResolver::builder(iroh_dns).build(),
160 );
161 }
162
163 let mut builder = builder.relay_mode(iroh_next::RelayMode::Disabled);
165
166 #[cfg(not(target_family = "wasm"))]
167 if iroh_enable_dht {
168 builder = builder.discovery_dht();
169 }
170
171 {
173 #[cfg(target_family = "wasm")]
175 {
176 builder =
177 builder.add_discovery(iroh_next::discovery::pkarr::PkarrResolver::n0_dns());
178 }
179 #[cfg(not(target_family = "wasm"))]
181 {
182 builder =
183 builder.add_discovery(iroh_next::discovery::dns::DnsDiscovery::n0_dns());
184 }
185 }
186
187 let endpoint = builder.bind().await?;
188 debug!(
189 target: LOG_NET_IROH,
190 node_id = %endpoint.node_id(),
191 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
192 "Iroh api client endpoint (next)"
193 );
194 Ok(endpoint)
195 });
196
197 let (endpoint_stable, endpoint_next) = if iroh_enable_next {
198 let (s, n) = tokio::try_join!(endpoint_stable, endpoint_next)?;
199 (s, Some(n))
200 } else {
201 (endpoint_stable.await?, None)
202 };
203
204 Ok(Self {
205 stable: endpoint_stable,
206 next: endpoint_next,
207 connection_overrides: BTreeMap::new(),
208 })
209 }
210
211 pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
212 self.connection_overrides.insert(node, addr);
213 self
214 }
215
216 pub fn node_id_from_url(url: &SafeUrl) -> anyhow::Result<NodeId> {
217 if url.scheme() != "iroh" {
218 bail!(
219 "Unsupported scheme: {}, passed to iroh endpoint handler",
220 url.scheme()
221 );
222 }
223 let host = url.host_str().context("Missing host string in Iroh URL")?;
224
225 let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
226
227 Ok(node_id)
228 }
229}
230
231#[async_trait::async_trait]
232impl crate::Connector for IrohConnector {
233 async fn connect_guardian(
234 &self,
235 url: &SafeUrl,
236 api_secret: Option<&str>,
237 ) -> ServerResult<DynGuaridianConnection> {
238 if api_secret.is_some() {
239 ServerError::Connection(anyhow::format_err!(
242 "Iroh api secrets currently not supported"
243 ));
244 }
245 let node_id =
246 Self::node_id_from_url(url).map_err(|source| ServerError::InvalidPeerUrl {
247 source,
248 url: url.to_owned(),
249 })?;
250 let mut futures = FuturesUnordered::<
251 Pin<
252 Box<
253 dyn Future<Output = (ServerResult<DynGuaridianConnection>, &'static str)>
254 + Send,
255 >,
256 >,
257 >::new();
258 let connection_override = self.connection_overrides.get(&node_id).cloned();
259
260 let self_clone = self.clone();
261 futures.push(Box::pin({
262 let connection_override = connection_override.clone();
263 async move {
264 (
265 self_clone
266 .make_new_connection_stable(node_id, connection_override)
267 .await
268 .map(super::IGuardianConnection::into_dyn),
269 "stable",
270 )
271 }
272 }));
273
274 if let Some(endpoint_next) = &self.next {
275 let self_clone = self.clone();
276 let endpoint_next = endpoint_next.clone();
277 futures.push(Box::pin(async move {
278 (
279 self_clone
280 .make_new_connection_next(&endpoint_next, node_id, connection_override)
281 .await
282 .map(super::IGuardianConnection::into_dyn),
283 "next",
284 )
285 }));
286 }
287
288 let mut prev_err = None;
291
292 while let Some((result, iroh_stack)) = futures.next().await {
294 match result {
295 Ok(connection) => return Ok(connection),
296 Err(err) => {
297 warn!(
298 target: LOG_NET_IROH,
299 err = %err.fmt_compact(),
300 %iroh_stack,
301 "Join error in iroh connection task"
302 );
303 prev_err = Some(err);
304 }
305 }
306 }
307
308 Err(prev_err.unwrap_or_else(|| {
309 ServerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
310 }))
311 }
312
313 async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
314 let node_id = Self::node_id_from_url(url)?;
315 if let Some(node_addr) = self.connection_overrides.get(&node_id).cloned() {
316 let conn = self
317 .stable
318 .connect(node_addr.clone(), FEDIMINT_GATEWAY_ALPN)
319 .await?;
320
321 #[cfg(not(target_family = "wasm"))]
322 Self::spawn_connection_monitoring_stable(&self.stable, node_id);
323
324 Ok(IGatewayConnection::into_dyn(conn))
325 } else {
326 let conn = self.stable.connect(node_id, FEDIMINT_GATEWAY_ALPN).await?;
327 Ok(IGatewayConnection::into_dyn(conn))
328 }
329 }
330}
331
332impl IrohConnector {
333 #[cfg(not(target_family = "wasm"))]
334 fn spawn_connection_monitoring_stable(endpoint: &Endpoint, node_id: NodeId) {
335 if let Ok(mut conn_type_watcher) = endpoint.conn_type(node_id) {
336 #[allow(clippy::let_underscore_future)]
337 let _ = spawn("iroh connection (stable)", async move {
338 if let Ok(conn_type) = conn_type_watcher.get() {
339 debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
340 }
341 while let Ok(event) = conn_type_watcher.updated().await {
342 debug!(target: LOG_NET_IROH, %node_id, type = %event, "Connection type (changed)");
343 }
344 });
345 }
346 }
347
348 #[cfg(not(target_family = "wasm"))]
349 fn spawn_connection_monitoring_next(
350 endpoint: &iroh_next::Endpoint,
351 node_addr: &iroh_next::NodeAddr,
352 ) {
353 if let Some(mut conn_type_watcher) = endpoint.conn_type(node_addr.node_id) {
354 let node_id = node_addr.node_id;
355 #[allow(clippy::let_underscore_future)]
356 let _ = spawn("iroh connection (next)", async move {
357 if let Ok(conn_type) = conn_type_watcher.get() {
358 debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
359 }
360 while let Ok(event) = conn_type_watcher.updated().await {
361 debug!(target: LOG_NET_IROH, node_id = %node_id, %event, "Connection type changed");
362 }
363 });
364 }
365 }
366
367 async fn make_new_connection_stable(
368 &self,
369 node_id: NodeId,
370 node_addr: Option<NodeAddr>,
371 ) -> ServerResult<Connection> {
372 trace!(target: LOG_NET_IROH, %node_id, "Creating new stable connection");
373 let conn = match node_addr.clone() {
374 Some(node_addr) => {
375 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
376 let conn = self.stable
377 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
378 .await;
379
380 #[cfg(not(target_family = "wasm"))]
381 if conn.is_ok() {
382 Self::spawn_connection_monitoring_stable(&self.stable, node_id);
383 }
384 conn
385 }
386 None => self.stable.connect(node_id, FEDIMINT_API_ALPN).await,
387 }.map_err(ServerError::Connection)?;
388
389 Ok(conn)
390 }
391
392 async fn make_new_connection_next(
393 &self,
394 endpoint_next: &iroh_next::Endpoint,
395 node_id: NodeId,
396 node_addr: Option<NodeAddr>,
397 ) -> ServerResult<iroh_next::endpoint::Connection> {
398 let next_node_id = iroh_next::NodeId::from_bytes(node_id.as_bytes()).expect("Can't fail");
399
400 let endpoint_next = endpoint_next.clone();
401
402 trace!(target: LOG_NET_IROH, %node_id, "Creating new next connection");
403 let conn = match node_addr.clone() {
404 Some(node_addr) => {
405 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
406 let node_addr = node_addr_stable_to_next(&node_addr);
407 let conn = endpoint_next
408 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
409 .await;
410
411 #[cfg(not(target_family = "wasm"))]
412 if conn.is_ok() {
413 Self::spawn_connection_monitoring_next(&endpoint_next, &node_addr);
414 }
415
416 conn
417 }
418 None => endpoint_next.connect(
419 next_node_id,
420 FEDIMINT_API_ALPN
421 ).await,
422 }
423 .map_err(Into::into)
424 .map_err(ServerError::Connection)?;
425
426 Ok(conn)
427 }
428}
429
430fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::NodeAddr {
431 iroh_next::NodeAddr {
432 node_id: iroh_next::NodeId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail"),
433 relay_url: stable
434 .relay_url
435 .as_ref()
436 .map(|u| iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail")),
437 direct_addresses: stable.direct_addresses.clone(),
438 }
439}
440
441#[apply(async_trait_maybe_send!)]
442impl IConnection for Connection {
443 async fn await_disconnection(&self) {
444 self.closed().await;
445 }
446
447 fn is_connected(&self) -> bool {
448 self.close_reason().is_none()
449 }
450}
451
452#[async_trait]
453impl IGuardianConnection for Connection {
454 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
455 let json = serde_json::to_vec(&IrohApiRequest { method, request })
456 .expect("Serialization to vec can't fail");
457
458 let (mut sink, mut stream) = self
459 .open_bi()
460 .await
461 .map_err(|e| ServerError::Transport(e.into()))?;
462
463 sink.write_all(&json)
464 .await
465 .map_err(|e| ServerError::Transport(e.into()))?;
466
467 sink.finish()
468 .map_err(|e| ServerError::Transport(e.into()))?;
469
470 let response = stream
471 .read_to_end(1_000_000)
472 .await
473 .map_err(|e| ServerError::Transport(e.into()))?;
474
475 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
477 .map_err(|e| ServerError::InvalidResponse(e.into()))?;
478
479 response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
480 }
481}
482
483#[apply(async_trait_maybe_send!)]
484impl IConnection for iroh_next::endpoint::Connection {
485 async fn await_disconnection(&self) {
486 self.closed().await;
487 }
488
489 fn is_connected(&self) -> bool {
490 self.close_reason().is_none()
491 }
492}
493
494#[async_trait]
495impl IGuardianConnection for iroh_next::endpoint::Connection {
496 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
497 let json = serde_json::to_vec(&IrohApiRequest { method, request })
498 .expect("Serialization to vec can't fail");
499
500 let (mut sink, mut stream) = self
501 .open_bi()
502 .await
503 .map_err(|e| ServerError::Transport(e.into()))?;
504
505 sink.write_all(&json)
506 .await
507 .map_err(|e| ServerError::Transport(e.into()))?;
508
509 sink.finish()
510 .map_err(|e| ServerError::Transport(e.into()))?;
511
512 let response = stream
513 .read_to_end(1_000_000)
514 .await
515 .map_err(|e| ServerError::Transport(e.into()))?;
516
517 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
519 .map_err(|e| ServerError::InvalidResponse(e.into()))?;
520
521 response.map_err(|e| ServerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
522 }
523}
524
525#[apply(async_trait_maybe_send!)]
526impl IGatewayConnection for Connection {
527 async fn request(
528 &self,
529 password: Option<String>,
530 _method: Method,
531 route: &str,
532 payload: Option<Value>,
533 ) -> ServerResult<Value> {
534 let iroh_request = IrohGatewayRequest {
535 route: route.to_string(),
536 params: payload,
537 password,
538 };
539 let json = serde_json::to_vec(&iroh_request).expect("serialization cant fail");
540
541 let (mut sink, mut stream) = self
542 .open_bi()
543 .await
544 .map_err(|e| ServerError::Transport(e.into()))?;
545
546 sink.write_all(&json)
547 .await
548 .map_err(|e| ServerError::Transport(e.into()))?;
549
550 sink.finish()
551 .map_err(|e| ServerError::Transport(e.into()))?;
552
553 let response = stream
554 .read_to_end(1_000_000)
555 .await
556 .map_err(|e| ServerError::Transport(e.into()))?;
557
558 let response = serde_json::from_slice::<IrohGatewayResponse>(&response)
559 .map_err(|e| ServerError::InvalidResponse(e.into()))?;
560 match StatusCode::from_u16(response.status).map_err(|e| {
561 ServerError::InvalidResponse(anyhow::anyhow!("Invalid status code: {}", e))
562 })? {
563 StatusCode::OK => Ok(response.body),
564 status => Err(ServerError::ServerError(anyhow::anyhow!(
565 "Server returned status code: {}",
566 status
567 ))),
568 }
569 }
570}