1use std::collections::{BTreeMap, BTreeSet};
2use std::pin::Pin;
3use std::str::FromStr;
4
5use anyhow::Context;
6use async_trait::async_trait;
7use fedimint_core::PeerId;
8use fedimint_core::envs::parse_kv_list_from_env;
9use fedimint_core::iroh_prod::FM_DNS_PKARR_RELAY_PROD;
10use fedimint_core::module::{
11 ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
12};
13use fedimint_core::util::{FmtCompact as _, SafeUrl};
14use fedimint_logging::LOG_NET_IROH;
15use futures::Future;
16use futures::stream::{FuturesUnordered, StreamExt};
17use iroh::discovery::pkarr::{PkarrPublisher, PkarrResolver};
18use iroh::endpoint::Connection;
19use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, SecretKey};
20use iroh_base::ticket::NodeTicket;
21use serde_json::Value;
22use tracing::{debug, trace, warn};
23use url::Url;
24
25use super::{DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult};
26
27#[derive(Debug, Clone)]
28pub struct IrohConnector {
29 node_ids: BTreeMap<PeerId, NodeId>,
30 endpoint_stable: Endpoint,
31 endpoint_next: iroh_next::Endpoint,
32
33 pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
39}
40
41impl IrohConnector {
42 pub async fn new(
43 peers: BTreeMap<PeerId, SafeUrl>,
44 iroh_dns: Option<SafeUrl>,
45 ) -> anyhow::Result<Self> {
46 const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
47 warn!(target: LOG_NET_IROH, "Iroh support is experimental");
48 let mut s = Self::new_no_overrides(peers, iroh_dns).await?;
49
50 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
51 s = s.with_connection_override(k, v.into());
52 }
53
54 Ok(s)
55 }
56
57 pub async fn new_no_overrides(
58 peers: BTreeMap<PeerId, SafeUrl>,
59 iroh_dns: Option<SafeUrl>,
60 ) -> anyhow::Result<Self> {
61 let iroh_dns = iroh_dns.map_or(
62 Url::parse(FM_DNS_PKARR_RELAY_PROD).expect("Hardcoded, can't fail"),
63 SafeUrl::to_unsafe,
64 );
65 let node_ids = peers
66 .into_iter()
67 .map(|(peer, url)| {
68 let host = url.host_str().context("Url is missing host")?;
69
70 let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
71
72 Ok((peer, node_id))
73 })
74 .collect::<anyhow::Result<BTreeMap<PeerId, NodeId>>>()?;
75
76 let endpoint_stable = {
77 let builder = Endpoint::builder()
78 .add_discovery({
79 let iroh_dns = iroh_dns.clone();
80 move |sk: &SecretKey| Some(PkarrPublisher::new(sk.clone(), iroh_dns))
81 })
82 .add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
83 #[cfg(not(target_family = "wasm"))]
84 let builder = builder.discovery_dht();
85 let endpoint = builder.bind().await?;
86 debug!(
87 target: LOG_NET_IROH,
88 node_id = %endpoint.node_id(),
89 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
90 "Iroh api client endpoint (stable)"
91 );
92 endpoint
93 };
94 let endpoint_next = {
95 let builder = iroh_next::Endpoint::builder().discovery_n0();
96 #[cfg(not(target_family = "wasm"))]
97 let builder = builder.discovery_dht();
98 let endpoint = builder.bind().await?;
99 debug!(
100 target: LOG_NET_IROH,
101 node_id = %endpoint.node_id(),
102 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
103 "Iroh api client endpoint (next)"
104 );
105 endpoint
106 };
107
108 Ok(Self {
109 node_ids,
110 endpoint_stable,
111 endpoint_next,
112 connection_overrides: BTreeMap::new(),
113 })
114 }
115
116 pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
117 self.connection_overrides.insert(node, addr);
118 self
119 }
120}
121
122#[async_trait]
123impl IClientConnector for IrohConnector {
124 fn peers(&self) -> BTreeSet<PeerId> {
125 self.node_ids.keys().copied().collect()
126 }
127
128 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
129 let node_id = *self
130 .node_ids
131 .get(&peer_id)
132 .ok_or(PeerError::InvalidPeerId { peer_id })?;
133
134 let mut futures = FuturesUnordered::<
135 Pin<Box<dyn Future<Output = PeerResult<DynClientConnection>> + Send>>,
136 >::new();
137 let connection_override = self.connection_overrides.get(&node_id).cloned();
138 let endpoint_stable = self.endpoint_stable.clone();
139 let endpoint_next = self.endpoint_next.clone();
140
141 futures.push(Box::pin({
142 let connection_override = connection_override.clone();
143 async move {
144 match connection_override {
145 Some(node_addr) => {
146 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
147 endpoint_stable
148 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
149 .await
150 }
151 None => endpoint_stable.connect(node_id, FEDIMINT_API_ALPN).await,
152 }.map_err(PeerError::Connection)
153 .map(super::IClientConnection::into_dyn)
154 }
155 }));
156
157 futures.push(Box::pin(async move {
158 match connection_override {
159 Some(node_addr) => {
160 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
161 endpoint_next
162 .connect(node_addr_stable_to_next(&node_addr), FEDIMINT_API_ALPN)
163 .await
164 }
165 None => endpoint_next.connect(
166 iroh_next::NodeId::from_bytes(node_id.as_bytes()).expect("Can't fail"),
167 FEDIMINT_API_ALPN
168 ).await,
169 }
170 .map_err(Into::into)
171 .map_err(PeerError::Connection)
172 .map(super::IClientConnection::into_dyn)
173 }));
174
175 let mut prev_err = None;
178
179 while let Some(result) = futures.next().await {
181 match result {
182 Ok(connection) => return Ok(connection),
183 Err(err) => {
184 warn!(
185 target: LOG_NET_IROH,
186 err = %err.fmt_compact(),
187 "Join error in iroh connection task"
188 );
189 prev_err = Some(err);
190 }
191 }
192 }
193
194 Err(prev_err.unwrap_or_else(|| {
195 PeerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
196 }))
197 }
198}
199
200fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::NodeAddr {
201 iroh_next::NodeAddr {
202 node_id: iroh_next::NodeId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail"),
203 relay_url: stable
204 .relay_url
205 .as_ref()
206 .map(|u| iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail")),
207 direct_addresses: stable.direct_addresses.clone(),
208 }
209}
210#[async_trait]
211impl IClientConnection for Connection {
212 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
213 let json = serde_json::to_vec(&IrohApiRequest { method, request })
214 .expect("Serialization to vec can't fail");
215
216 let (mut sink, mut stream) = self
217 .open_bi()
218 .await
219 .map_err(|e| PeerError::Transport(e.into()))?;
220
221 sink.write_all(&json)
222 .await
223 .map_err(|e| PeerError::Transport(e.into()))?;
224
225 sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
226
227 let response = stream
228 .read_to_end(1_000_000)
229 .await
230 .map_err(|e| PeerError::Transport(e.into()))?;
231
232 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
234 .map_err(|e| PeerError::InvalidResponse(e.into()))?;
235
236 response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
237 }
238
239 async fn await_disconnection(&self) {
240 self.closed().await;
241 }
242}
243
244#[async_trait]
245impl IClientConnection for iroh_next::endpoint::Connection {
246 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
247 let json = serde_json::to_vec(&IrohApiRequest { method, request })
248 .expect("Serialization to vec can't fail");
249
250 let (mut sink, mut stream) = self
251 .open_bi()
252 .await
253 .map_err(|e| PeerError::Transport(e.into()))?;
254
255 sink.write_all(&json)
256 .await
257 .map_err(|e| PeerError::Transport(e.into()))?;
258
259 sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
260
261 let response = stream
262 .read_to_end(1_000_000)
263 .await
264 .map_err(|e| PeerError::Transport(e.into()))?;
265
266 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
268 .map_err(|e| PeerError::InvalidResponse(e.into()))?;
269
270 response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
271 }
272
273 async fn await_disconnection(&self) {
274 self.closed().await;
275 }
276}