1use std::collections::BTreeMap;
2use std::fmt;
3use std::pin::Pin;
4use std::str::FromStr;
5
6use anyhow::{Context, bail};
7use async_trait::async_trait;
8use fedimint_core::envs::parse_kv_list_from_env;
9use fedimint_core::iroh_prod::FM_IROH_DNS_FEDIMINT_PROD;
10use fedimint_core::module::{
11 ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
12};
13use fedimint_core::task::spawn;
14use fedimint_core::util::{FmtCompact as _, SafeUrl};
15use fedimint_logging::LOG_NET_IROH;
16use futures::Future;
17use futures::stream::{FuturesUnordered, StreamExt};
18use iroh::discovery::pkarr::PkarrResolver;
19use iroh::endpoint::Connection;
20use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
21use iroh_base::ticket::NodeTicket;
22use iroh_next::Watcher as _;
23use serde_json::Value;
24use tracing::{debug, trace, warn};
25use url::Url;
26
27use super::{DynGuaridianConnection, IGuardianConnection, PeerError, PeerResult};
28
/// Client-side connector that reaches guardian APIs over iroh.
///
/// Holds an endpoint built with the stable `iroh` crate and, optionally, one
/// built with the `iroh_next` crate; `connect_guardian` tries every endpoint
/// present concurrently and keeps the first connection that succeeds.
#[derive(Clone)]
pub(crate) struct IrohConnector {
    /// Endpoint of the stable `iroh` crate; always present.
    stable: iroh::endpoint::Endpoint,
    /// Endpoint of the `iroh_next` crate; `Some` only when it was enabled
    /// (`iroh_enable_next`) at construction time.
    next: Option<iroh_next::endpoint::Endpoint>,

    /// Per-node addresses used in place of the bare `NodeId` when connecting
    /// to that node. Populated from `FM_IROH_CONNECT_OVERRIDES` by `new`, or
    /// through `with_connection_override`.
    connection_overrides: BTreeMap<NodeId, NodeAddr>,
}
41
42impl fmt::Debug for IrohConnector {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 f.debug_struct("IrohEndpoint")
45 .field("stable-id", &self.stable.node_id())
46 .field(
47 "next-id",
48 &self.next.as_ref().map(iroh_next::Endpoint::node_id),
49 )
50 .finish_non_exhaustive()
51 }
52}
53
impl IrohConnector {
    /// Creates a connector and then applies the connection overrides listed
    /// in the `FM_IROH_CONNECT_OVERRIDES` environment variable.
    ///
    /// The variable is parsed by `parse_kv_list_from_env` into `NodeId` ->
    /// `NodeTicket` pairs (see that function for the exact format). Each
    /// ticket is converted into a `NodeAddr` and registered through
    /// [`Self::with_connection_override`].
    ///
    /// # Errors
    ///
    /// Returns an error if the endpoints cannot be created (see
    /// [`Self::new_no_overrides`]) or if the environment variable cannot be
    /// parsed.
    pub async fn new(
        iroh_dns: Option<SafeUrl>,
        iroh_enable_dht: bool,
        iroh_enable_next: bool,
    ) -> anyhow::Result<Self> {
        const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
        let mut s = Self::new_no_overrides(iroh_dns, iroh_enable_dht, iroh_enable_next).await?;

        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
            s = s.with_connection_override(k, v.into());
        }

        Ok(s)
    }

    /// Binds the iroh endpoint(s) without applying any connection overrides.
    ///
    /// The stable `iroh` endpoint is always created. An `iroh_next` endpoint
    /// is created as well, concurrently, only when `iroh_enable_next` is set.
    /// Both endpoints are configured the same way:
    /// - pkarr resolution through `iroh_dns` if given, otherwise through the
    ///   hardcoded `FM_IROH_DNS_FEDIMINT_PROD` servers;
    /// - relays disabled;
    /// - DHT discovery if `iroh_enable_dht` (not available on wasm);
    /// - n0 DNS discovery, always.
    ///
    /// # Errors
    ///
    /// Returns an error if binding any endpoint that is created fails.
    pub async fn new_no_overrides(
        iroh_dns: Option<SafeUrl>,
        iroh_enable_dht: bool,
        iroh_enable_next: bool,
    ) -> anyhow::Result<Self> {
        // A caller-supplied DNS server replaces the built-in production list
        // rather than being added to it.
        let iroh_dns_servers: Vec<_> = iroh_dns.map_or_else(
            || {
                FM_IROH_DNS_FEDIMINT_PROD
                    .into_iter()
                    .map(|url| Url::parse(url).expect("Hardcoded, can't fail"))
                    .collect()
            },
            |url| vec![url.to_unsafe()],
        );

        let endpoint_stable = Box::pin({
            // The `next` future below consumes the original list, so the
            // stable one works on its own copy.
            let iroh_dns_servers = iroh_dns_servers.clone();
            async {
                let mut builder = Endpoint::builder();

                for iroh_dns in iroh_dns_servers {
                    builder = builder.add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
                }

                let mut builder = builder.relay_mode(iroh::RelayMode::Disabled);

                #[cfg(not(target_family = "wasm"))]
                if iroh_enable_dht {
                    builder = builder.discovery_dht();
                }

                // n0 DNS discovery is added unconditionally. On wasm the pkarr
                // resolver flavour is used instead of `DnsDiscovery`,
                // presumably because plain DNS lookups are unavailable there
                // — NOTE(review): confirm.
                {
                    #[cfg(target_family = "wasm")]
                    {
                        builder = builder.add_discovery(move |_| Some(PkarrResolver::n0_dns()));
                    }

                    #[cfg(not(target_family = "wasm"))]
                    {
                        builder = builder.add_discovery(move |_| {
                            Some(iroh::discovery::dns::DnsDiscovery::n0_dns())
                        });
                    }
                }

                let endpoint = builder.bind().await?;
                debug!(
                    target: LOG_NET_IROH,
                    node_id = %endpoint.node_id(),
                    node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
                    "Iroh api client endpoint (stable)"
                );
                Ok::<_, anyhow::Error>(endpoint)
            }
        });
        // Same configuration as above, using the `iroh_next` API. Futures are
        // lazy, so this one only binds an endpoint if it is polled below.
        let endpoint_next = Box::pin(async {
            let mut builder = iroh_next::Endpoint::builder();

            for iroh_dns in iroh_dns_servers {
                builder = builder.add_discovery(
                    iroh_next::discovery::pkarr::PkarrResolver::builder(iroh_dns).build(),
                );
            }

            let mut builder = builder.relay_mode(iroh_next::RelayMode::Disabled);

            #[cfg(not(target_family = "wasm"))]
            if iroh_enable_dht {
                builder = builder.discovery_dht();
            }

            {
                #[cfg(target_family = "wasm")]
                {
                    builder =
                        builder.add_discovery(iroh_next::discovery::pkarr::PkarrResolver::n0_dns());
                }
                #[cfg(not(target_family = "wasm"))]
                {
                    builder =
                        builder.add_discovery(iroh_next::discovery::dns::DnsDiscovery::n0_dns());
                }
            }

            let endpoint = builder.bind().await?;
            debug!(
                target: LOG_NET_IROH,
                node_id = %endpoint.node_id(),
                node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
                "Iroh api client endpoint (next)"
            );
            Ok(endpoint)
        });

        // Bind both endpoints concurrently when `next` is enabled; otherwise
        // the `next` future is dropped without ever being polled.
        let (endpoint_stable, endpoint_next) = if iroh_enable_next {
            let (s, n) = tokio::try_join!(endpoint_stable, endpoint_next)?;
            (s, Some(n))
        } else {
            (endpoint_stable.await?, None)
        };

        Ok(Self {
            stable: endpoint_stable,
            next: endpoint_next,
            connection_overrides: BTreeMap::new(),
        })
    }

    /// Registers `addr` to be used instead of the bare `node` id when
    /// connecting to `node`. A later call for the same node replaces the
    /// earlier address.
    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
        self.connection_overrides.insert(node, addr);
        self
    }

    /// Extracts the iroh `NodeId` from a URL of the form `iroh://<node-id>`.
    ///
    /// # Errors
    ///
    /// Fails if the scheme is not `iroh`, the URL has no host, or the host
    /// does not parse as a `PublicKey`.
    pub fn node_id_from_url(url: &SafeUrl) -> anyhow::Result<NodeId> {
        if url.scheme() != "iroh" {
            bail!(
                "Unsupported scheme: {}, passed to iroh endpoint handler",
                url.scheme()
            );
        }
        let host = url.host_str().context("Missing host string in Iroh URL")?;

        let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;

        Ok(node_id)
    }
}
203
204#[async_trait::async_trait]
205impl crate::api::Connector for IrohConnector {
206 async fn connect_guardian(
207 &self,
208 url: &SafeUrl,
209 api_secret: Option<&str>,
210 ) -> PeerResult<DynGuaridianConnection> {
211 if api_secret.is_some() {
212 PeerError::Connection(anyhow::format_err!(
215 "Iroh api secrets currently not supported"
216 ));
217 }
218 let node_id = Self::node_id_from_url(url).map_err(|source| PeerError::InvalidPeerUrl {
219 source,
220 url: url.to_owned(),
221 })?;
222 let mut futures = FuturesUnordered::<
223 Pin<
224 Box<dyn Future<Output = (PeerResult<DynGuaridianConnection>, &'static str)> + Send>,
225 >,
226 >::new();
227 let connection_override = self.connection_overrides.get(&node_id).cloned();
228
229 let self_clone = self.clone();
230 futures.push(Box::pin({
231 let connection_override = connection_override.clone();
232 async move {
233 (
234 self_clone
235 .make_new_connection_stable(node_id, connection_override)
236 .await
237 .map(super::IGuardianConnection::into_dyn),
238 "stable",
239 )
240 }
241 }));
242
243 if let Some(endpoint_next) = &self.next {
244 let self_clone = self.clone();
245 let endpoint_next = endpoint_next.clone();
246 futures.push(Box::pin(async move {
247 (
248 self_clone
249 .make_new_connection_next(&endpoint_next, node_id, connection_override)
250 .await
251 .map(super::IGuardianConnection::into_dyn),
252 "next",
253 )
254 }));
255 }
256
257 let mut prev_err = None;
260
261 while let Some((result, iroh_stack)) = futures.next().await {
263 match result {
264 Ok(connection) => return Ok(connection),
265 Err(err) => {
266 warn!(
267 target: LOG_NET_IROH,
268 err = %err.fmt_compact(),
269 %iroh_stack,
270 "Join error in iroh connection task"
271 );
272 prev_err = Some(err);
273 }
274 }
275 }
276
277 Err(prev_err.unwrap_or_else(|| {
278 PeerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
279 }))
280 }
281}
282
283impl IrohConnector {
284 #[cfg(not(target_family = "wasm"))]
285 fn spawn_connection_monitoring_stable(endpoint: &Endpoint, node_id: NodeId) {
286 if let Ok(mut conn_type_watcher) = endpoint.conn_type(node_id) {
287 #[allow(clippy::let_underscore_future)]
288 let _ = spawn("iroh connection (stable)", async move {
289 if let Ok(conn_type) = conn_type_watcher.get() {
290 debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
291 }
292 while let Ok(event) = conn_type_watcher.updated().await {
293 debug!(target: LOG_NET_IROH, %node_id, type = %event, "Connection type (changed)");
294 }
295 });
296 }
297 }
298
299 #[cfg(not(target_family = "wasm"))]
300 fn spawn_connection_monitoring_next(
301 endpoint: &iroh_next::Endpoint,
302 node_addr: &iroh_next::NodeAddr,
303 ) {
304 if let Some(mut conn_type_watcher) = endpoint.conn_type(node_addr.node_id) {
305 let node_id = node_addr.node_id;
306 #[allow(clippy::let_underscore_future)]
307 let _ = spawn("iroh connection (next)", async move {
308 if let Ok(conn_type) = conn_type_watcher.get() {
309 debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
310 }
311 while let Ok(event) = conn_type_watcher.updated().await {
312 debug!(target: LOG_NET_IROH, node_id = %node_id, %event, "Connection type changed");
313 }
314 });
315 }
316 }
317 async fn make_new_connection_stable(
318 &self,
319 node_id: NodeId,
320 node_addr: Option<NodeAddr>,
321 ) -> PeerResult<Connection> {
322 trace!(target: LOG_NET_IROH, %node_id, "Creating new stable connection");
323 let conn = match node_addr.clone() {
324 Some(node_addr) => {
325 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
326 let conn = self.stable
327 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
328 .await;
329
330 #[cfg(not(target_family = "wasm"))]
331 if conn.is_ok() {
332 Self::spawn_connection_monitoring_stable(&self.stable, node_id);
333 }
334 conn
335 }
336 None => self.stable.connect(node_id, FEDIMINT_API_ALPN).await,
337 }.map_err(PeerError::Connection)?;
338
339 Ok(conn)
340 }
341
342 async fn make_new_connection_next(
343 &self,
344 endpoint_next: &iroh_next::Endpoint,
345 node_id: NodeId,
346 node_addr: Option<NodeAddr>,
347 ) -> PeerResult<iroh_next::endpoint::Connection> {
348 let next_node_id = iroh_next::NodeId::from_bytes(node_id.as_bytes()).expect("Can't fail");
349
350 let endpoint_next = endpoint_next.clone();
351
352 trace!(target: LOG_NET_IROH, %node_id, "Creating new next connection");
353 let conn = match node_addr.clone() {
354 Some(node_addr) => {
355 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
356 let node_addr = node_addr_stable_to_next(&node_addr);
357 let conn = endpoint_next
358 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
359 .await;
360
361 #[cfg(not(target_family = "wasm"))]
362 if conn.is_ok() {
363 Self::spawn_connection_monitoring_next(&endpoint_next, &node_addr);
364 }
365
366 conn
367 }
368 None => endpoint_next.connect(
369 next_node_id,
370 FEDIMINT_API_ALPN
371 ).await,
372 }
373 .map_err(Into::into)
374 .map_err(PeerError::Connection)?;
375
376 Ok(conn)
377 }
378}
379
380fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::NodeAddr {
381 iroh_next::NodeAddr {
382 node_id: iroh_next::NodeId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail"),
383 relay_url: stable
384 .relay_url
385 .as_ref()
386 .map(|u| iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail")),
387 direct_addresses: stable.direct_addresses.clone(),
388 }
389}
390#[async_trait]
391impl IGuardianConnection for Connection {
392 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
393 let json = serde_json::to_vec(&IrohApiRequest { method, request })
394 .expect("Serialization to vec can't fail");
395
396 let (mut sink, mut stream) = self
397 .open_bi()
398 .await
399 .map_err(|e| PeerError::Transport(e.into()))?;
400
401 sink.write_all(&json)
402 .await
403 .map_err(|e| PeerError::Transport(e.into()))?;
404
405 sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
406
407 let response = stream
408 .read_to_end(1_000_000)
409 .await
410 .map_err(|e| PeerError::Transport(e.into()))?;
411
412 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
414 .map_err(|e| PeerError::InvalidResponse(e.into()))?;
415
416 response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
417 }
418
419 async fn await_disconnection(&self) {
420 self.closed().await;
421 }
422
423 fn is_connected(&self) -> bool {
424 self.close_reason().is_none()
425 }
426}
427
428#[async_trait]
429impl IGuardianConnection for iroh_next::endpoint::Connection {
430 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
431 let json = serde_json::to_vec(&IrohApiRequest { method, request })
432 .expect("Serialization to vec can't fail");
433
434 let (mut sink, mut stream) = self
435 .open_bi()
436 .await
437 .map_err(|e| PeerError::Transport(e.into()))?;
438
439 sink.write_all(&json)
440 .await
441 .map_err(|e| PeerError::Transport(e.into()))?;
442
443 sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
444
445 let response = stream
446 .read_to_end(1_000_000)
447 .await
448 .map_err(|e| PeerError::Transport(e.into()))?;
449
450 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
452 .map_err(|e| PeerError::InvalidResponse(e.into()))?;
453
454 response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
455 }
456
457 async fn await_disconnection(&self) {
458 self.closed().await;
459 }
460
461 fn is_connected(&self) -> bool {
462 self.close_reason().is_none()
463 }
464}