1use std::collections::{BTreeMap, BTreeSet};
2use std::pin::Pin;
3use std::str::FromStr;
4
5use anyhow::Context;
6use async_trait::async_trait;
7use fedimint_core::PeerId;
8use fedimint_core::envs::parse_kv_list_from_env;
9use fedimint_core::iroh_prod::FM_DNS_PKARR_RELAY_PROD;
10use fedimint_core::module::{
11 ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
12};
13use fedimint_core::task::spawn;
14use fedimint_core::util::{FmtCompact as _, SafeUrl};
15use fedimint_logging::LOG_NET_IROH;
16use futures::Future;
17use futures::stream::{FuturesUnordered, StreamExt};
18use iroh::discovery::pkarr::{PkarrPublisher, PkarrResolver};
19use iroh::endpoint::Connection;
20use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, SecretKey};
21use iroh_base::ticket::NodeTicket;
22use iroh_next::Watcher as _;
23use serde_json::Value;
24use tracing::{debug, trace, warn};
25use url::Url;
26
27use super::{DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult};
28
29#[derive(Debug, Clone)]
30pub struct IrohConnector {
31 node_ids: BTreeMap<PeerId, NodeId>,
32 endpoint_stable: Endpoint,
33 endpoint_next: iroh_next::Endpoint,
34
35 pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
41}
42
43impl IrohConnector {
44 #[cfg(not(target_family = "wasm"))]
45 fn spawn_connection_monitoring_stable(endpoint: &Endpoint, node_id: NodeId) {
46 if let Ok(mut conn_type_watcher) = endpoint.conn_type(node_id) {
47 #[allow(clippy::let_underscore_future)]
48 let _ = spawn("iroh connection (stable)", async move {
49 if let Ok(conn_type) = conn_type_watcher.get() {
50 debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
51 }
52 while let Ok(event) = conn_type_watcher.updated().await {
53 debug!(target: LOG_NET_IROH, %node_id, type = %event, "Connection type (changed)");
54 }
55 });
56 }
57 }
58
59 #[cfg(not(target_family = "wasm"))]
60 fn spawn_connection_monitoring_next(
61 endpoint: &iroh_next::Endpoint,
62 node_addr: &iroh_next::NodeAddr,
63 ) {
64 if let Some(mut conn_type_watcher) = endpoint.conn_type(node_addr.node_id) {
65 let node_id = node_addr.node_id;
66 #[allow(clippy::let_underscore_future)]
67 let _ = spawn("iroh connection (next)", async move {
68 if let Ok(conn_type) = conn_type_watcher.get() {
69 debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
70 }
71 while let Ok(event) = conn_type_watcher.updated().await {
72 debug!(target: LOG_NET_IROH, node_id = %node_id, %event, "Connection type changed");
73 }
74 });
75 }
76 }
77
78 pub async fn new(
79 peers: BTreeMap<PeerId, SafeUrl>,
80 iroh_dns: Option<SafeUrl>,
81 ) -> anyhow::Result<Self> {
82 const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
83 warn!(target: LOG_NET_IROH, "Iroh support is experimental");
84 let mut s = Self::new_no_overrides(peers, iroh_dns).await?;
85
86 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
87 s = s.with_connection_override(k, v.into());
88 }
89
90 Ok(s)
91 }
92
93 pub async fn new_no_overrides(
94 peers: BTreeMap<PeerId, SafeUrl>,
95 iroh_dns: Option<SafeUrl>,
96 ) -> anyhow::Result<Self> {
97 let iroh_dns_servers: Vec<_> = iroh_dns.map_or_else(
98 || {
99 FM_DNS_PKARR_RELAY_PROD
100 .into_iter()
101 .map(|url| Url::parse(url).expect("Hardcoded, can't fail"))
102 .collect()
103 },
104 |url| vec![url.to_unsafe()],
105 );
106 let node_ids = peers
107 .into_iter()
108 .map(|(peer, url)| {
109 let host = url.host_str().context("Url is missing host")?;
110
111 let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
112
113 Ok((peer, node_id))
114 })
115 .collect::<anyhow::Result<BTreeMap<PeerId, NodeId>>>()?;
116
117 let endpoint_stable = {
118 let mut builder = Endpoint::builder();
119
120 for iroh_dns in iroh_dns_servers {
121 builder = builder
122 .add_discovery({
123 let iroh_dns = iroh_dns.clone();
124 move |sk: &SecretKey| Some(PkarrPublisher::new(sk.clone(), iroh_dns))
125 })
126 .add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
127 }
128
129 #[cfg(not(target_family = "wasm"))]
130 let builder = builder.discovery_dht().discovery_n0();
131
132 let endpoint = builder.discovery_n0().bind().await?;
133 debug!(
134 target: LOG_NET_IROH,
135 node_id = %endpoint.node_id(),
136 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
137 "Iroh api client endpoint (stable)"
138 );
139 endpoint
140 };
141 let endpoint_next = {
142 let builder = iroh_next::Endpoint::builder().discovery_n0();
143 #[cfg(not(target_family = "wasm"))]
144 let builder = builder.discovery_dht();
145 let endpoint = builder.bind().await?;
146 debug!(
147 target: LOG_NET_IROH,
148 node_id = %endpoint.node_id(),
149 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
150 "Iroh api client endpoint (next)"
151 );
152 endpoint
153 };
154
155 Ok(Self {
156 node_ids,
157 endpoint_stable,
158 endpoint_next,
159 connection_overrides: BTreeMap::new(),
160 })
161 }
162
163 pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
164 self.connection_overrides.insert(node, addr);
165 self
166 }
167}
168
169#[async_trait]
170impl IClientConnector for IrohConnector {
171 fn peers(&self) -> BTreeSet<PeerId> {
172 self.node_ids.keys().copied().collect()
173 }
174
175 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
176 let node_id = *self
177 .node_ids
178 .get(&peer_id)
179 .ok_or(PeerError::InvalidPeerId { peer_id })?;
180
181 let mut futures = FuturesUnordered::<
182 Pin<Box<dyn Future<Output = PeerResult<DynClientConnection>> + Send>>,
183 >::new();
184 let connection_override = self.connection_overrides.get(&node_id).cloned();
185 let endpoint_stable = self.endpoint_stable.clone();
186 let endpoint_next = self.endpoint_next.clone();
187
188 futures.push(Box::pin({
189 let connection_override = connection_override.clone();
190 async move {
191 match connection_override {
192 Some(node_addr) => {
193 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
194 let conn = endpoint_stable
195 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
196 .await;
197
198 #[cfg(not(target_family = "wasm"))]
199 if conn.is_ok() {
200 Self::spawn_connection_monitoring_stable(&endpoint_stable, node_id);
201 }
202
203 conn
204 }
205 None => endpoint_stable.connect(node_id, FEDIMINT_API_ALPN).await,
206 }.map_err(PeerError::Connection)
207 .map(super::IClientConnection::into_dyn)
208 }
209 }));
210
211 futures.push(Box::pin(async move {
212 match connection_override {
213 Some(node_addr) => {
214 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
215 let node_addr = node_addr_stable_to_next(&node_addr);
216 let conn = endpoint_next
217 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
218 .await;
219
220 #[cfg(not(target_family = "wasm"))]
221 if conn.is_ok() {
222 Self::spawn_connection_monitoring_next(&endpoint_next, &node_addr);
223 }
224
225 conn
226 }
227 None => endpoint_next.connect(
228 iroh_next::NodeId::from_bytes(node_id.as_bytes()).expect("Can't fail"),
229 FEDIMINT_API_ALPN
230 ).await,
231 }
232 .map_err(Into::into)
233 .map_err(PeerError::Connection)
234 .map(super::IClientConnection::into_dyn)
235 }));
236
237 let mut prev_err = None;
240
241 while let Some(result) = futures.next().await {
243 match result {
244 Ok(connection) => return Ok(connection),
245 Err(err) => {
246 warn!(
247 target: LOG_NET_IROH,
248 err = %err.fmt_compact(),
249 "Join error in iroh connection task"
250 );
251 prev_err = Some(err);
252 }
253 }
254 }
255
256 Err(prev_err.unwrap_or_else(|| {
257 PeerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
258 }))
259 }
260}
261
262fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::NodeAddr {
263 iroh_next::NodeAddr {
264 node_id: iroh_next::NodeId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail"),
265 relay_url: stable
266 .relay_url
267 .as_ref()
268 .map(|u| iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail")),
269 direct_addresses: stable.direct_addresses.clone(),
270 }
271}
272#[async_trait]
273impl IClientConnection for Connection {
274 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
275 let json = serde_json::to_vec(&IrohApiRequest { method, request })
276 .expect("Serialization to vec can't fail");
277
278 let (mut sink, mut stream) = self
279 .open_bi()
280 .await
281 .map_err(|e| PeerError::Transport(e.into()))?;
282
283 sink.write_all(&json)
284 .await
285 .map_err(|e| PeerError::Transport(e.into()))?;
286
287 sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
288
289 let response = stream
290 .read_to_end(1_000_000)
291 .await
292 .map_err(|e| PeerError::Transport(e.into()))?;
293
294 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
296 .map_err(|e| PeerError::InvalidResponse(e.into()))?;
297
298 response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
299 }
300
301 async fn await_disconnection(&self) {
302 self.closed().await;
303 }
304}
305
306#[async_trait]
307impl IClientConnection for iroh_next::endpoint::Connection {
308 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
309 let json = serde_json::to_vec(&IrohApiRequest { method, request })
310 .expect("Serialization to vec can't fail");
311
312 let (mut sink, mut stream) = self
313 .open_bi()
314 .await
315 .map_err(|e| PeerError::Transport(e.into()))?;
316
317 sink.write_all(&json)
318 .await
319 .map_err(|e| PeerError::Transport(e.into()))?;
320
321 sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
322
323 let response = stream
324 .read_to_end(1_000_000)
325 .await
326 .map_err(|e| PeerError::Transport(e.into()))?;
327
328 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
330 .map_err(|e| PeerError::InvalidResponse(e.into()))?;
331
332 response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
333 }
334
335 async fn await_disconnection(&self) {
336 self.closed().await;
337 }
338}