fedimint_api_client/api/
iroh.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::pin::Pin;
3use std::str::FromStr;
4
5use anyhow::Context;
6use async_trait::async_trait;
7use fedimint_core::PeerId;
8use fedimint_core::envs::parse_kv_list_from_env;
9use fedimint_core::iroh_prod::FM_DNS_PKARR_RELAY_PROD;
10use fedimint_core::module::{
11    ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
12};
13use fedimint_core::task::spawn;
14use fedimint_core::util::{FmtCompact as _, SafeUrl};
15use fedimint_logging::LOG_NET_IROH;
16use futures::Future;
17use futures::stream::{FuturesUnordered, StreamExt};
18use iroh::discovery::pkarr::{PkarrPublisher, PkarrResolver};
19use iroh::endpoint::Connection;
20use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, SecretKey};
21use iroh_base::ticket::NodeTicket;
22use iroh_next::Watcher as _;
23use serde_json::Value;
24use tracing::{debug, trace, warn};
25use url::Url;
26
27use super::{DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult};
28
/// Client connector that reaches federation peers over iroh.
///
/// It keeps one endpoint from the `iroh` crate and one from the `iroh_next`
/// crate, together with the mapping from federation [`PeerId`]s to iroh
/// [`NodeId`]s.
#[derive(Debug, Clone)]
pub struct IrohConnector {
    /// Iroh node id of every known peer, keyed by its federation `PeerId`.
    node_ids: BTreeMap<PeerId, NodeId>,
    /// Endpoint created with the `iroh` crate.
    endpoint_stable: Endpoint,
    /// Endpoint created with the `iroh_next` crate.
    endpoint_next: iroh_next::Endpoint,

    /// List of overrides to use when attempting to connect to given
    /// `NodeId`
    ///
    /// This is useful for testing, or forcing non-default network
    /// connectivity.
    pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
}
42
43impl IrohConnector {
44    #[cfg(not(target_family = "wasm"))]
45    fn spawn_connection_monitoring_stable(endpoint: &Endpoint, node_id: NodeId) {
46        if let Ok(mut conn_type_watcher) = endpoint.conn_type(node_id) {
47            #[allow(clippy::let_underscore_future)]
48            let _ = spawn("iroh connection (stable)", async move {
49                if let Ok(conn_type) = conn_type_watcher.get() {
50                    debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
51                }
52                while let Ok(event) = conn_type_watcher.updated().await {
53                    debug!(target: LOG_NET_IROH, %node_id, type = %event, "Connection type (changed)");
54                }
55            });
56        }
57    }
58
59    #[cfg(not(target_family = "wasm"))]
60    fn spawn_connection_monitoring_next(
61        endpoint: &iroh_next::Endpoint,
62        node_addr: &iroh_next::NodeAddr,
63    ) {
64        if let Some(mut conn_type_watcher) = endpoint.conn_type(node_addr.node_id) {
65            let node_id = node_addr.node_id;
66            #[allow(clippy::let_underscore_future)]
67            let _ = spawn("iroh connection (next)", async move {
68                if let Ok(conn_type) = conn_type_watcher.get() {
69                    debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
70                }
71                while let Ok(event) = conn_type_watcher.updated().await {
72                    debug!(target: LOG_NET_IROH, node_id = %node_id, %event, "Connection type changed");
73                }
74            });
75        }
76    }
77
78    pub async fn new(
79        peers: BTreeMap<PeerId, SafeUrl>,
80        iroh_dns: Option<SafeUrl>,
81    ) -> anyhow::Result<Self> {
82        const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
83        warn!(target: LOG_NET_IROH, "Iroh support is experimental");
84        let mut s = Self::new_no_overrides(peers, iroh_dns).await?;
85
86        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
87            s = s.with_connection_override(k, v.into());
88        }
89
90        Ok(s)
91    }
92
93    pub async fn new_no_overrides(
94        peers: BTreeMap<PeerId, SafeUrl>,
95        iroh_dns: Option<SafeUrl>,
96    ) -> anyhow::Result<Self> {
97        let iroh_dns_servers: Vec<_> = iroh_dns.map_or_else(
98            || {
99                FM_DNS_PKARR_RELAY_PROD
100                    .into_iter()
101                    .map(|url| Url::parse(url).expect("Hardcoded, can't fail"))
102                    .collect()
103            },
104            |url| vec![url.to_unsafe()],
105        );
106        let node_ids = peers
107            .into_iter()
108            .map(|(peer, url)| {
109                let host = url.host_str().context("Url is missing host")?;
110
111                let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
112
113                Ok((peer, node_id))
114            })
115            .collect::<anyhow::Result<BTreeMap<PeerId, NodeId>>>()?;
116
117        let endpoint_stable = {
118            let mut builder = Endpoint::builder();
119
120            for iroh_dns in iroh_dns_servers {
121                builder = builder
122                    .add_discovery({
123                        let iroh_dns = iroh_dns.clone();
124                        move |sk: &SecretKey| Some(PkarrPublisher::new(sk.clone(), iroh_dns))
125                    })
126                    .add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
127            }
128
129            #[cfg(not(target_family = "wasm"))]
130            let builder = builder.discovery_dht().discovery_n0();
131
132            let endpoint = builder.discovery_n0().bind().await?;
133            debug!(
134                target: LOG_NET_IROH,
135                node_id = %endpoint.node_id(),
136                node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
137                "Iroh api client endpoint (stable)"
138            );
139            endpoint
140        };
141        let endpoint_next = {
142            let builder = iroh_next::Endpoint::builder().discovery_n0();
143            #[cfg(not(target_family = "wasm"))]
144            let builder = builder.discovery_dht();
145            let endpoint = builder.bind().await?;
146            debug!(
147                target: LOG_NET_IROH,
148                node_id = %endpoint.node_id(),
149                node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
150                "Iroh api client endpoint (next)"
151            );
152            endpoint
153        };
154
155        Ok(Self {
156            node_ids,
157            endpoint_stable,
158            endpoint_next,
159            connection_overrides: BTreeMap::new(),
160        })
161    }
162
163    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
164        self.connection_overrides.insert(node, addr);
165        self
166    }
167}
168
169#[async_trait]
170impl IClientConnector for IrohConnector {
171    fn peers(&self) -> BTreeSet<PeerId> {
172        self.node_ids.keys().copied().collect()
173    }
174
175    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
176        let node_id = *self
177            .node_ids
178            .get(&peer_id)
179            .ok_or(PeerError::InvalidPeerId { peer_id })?;
180
181        let mut futures = FuturesUnordered::<
182            Pin<Box<dyn Future<Output = PeerResult<DynClientConnection>> + Send>>,
183        >::new();
184        let connection_override = self.connection_overrides.get(&node_id).cloned();
185        let endpoint_stable = self.endpoint_stable.clone();
186        let endpoint_next = self.endpoint_next.clone();
187
188        futures.push(Box::pin({
189            let connection_override = connection_override.clone();
190            async move {
191                match connection_override {
192                    Some(node_addr) => {
193                        trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
194                        let conn = endpoint_stable
195                            .connect(node_addr.clone(), FEDIMINT_API_ALPN)
196                            .await;
197
198                        #[cfg(not(target_family = "wasm"))]
199                        if conn.is_ok() {
200                            Self::spawn_connection_monitoring_stable(&endpoint_stable, node_id);
201                        }
202
203                        conn
204                    }
205                    None => endpoint_stable.connect(node_id, FEDIMINT_API_ALPN).await,
206                }.map_err(PeerError::Connection)
207                .map(super::IClientConnection::into_dyn)
208            }
209        }));
210
211        futures.push(Box::pin(async move {
212            match connection_override {
213                Some(node_addr) => {
214                    trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
215                    let node_addr = node_addr_stable_to_next(&node_addr);
216                    let conn = endpoint_next
217                        .connect(node_addr.clone(), FEDIMINT_API_ALPN)
218                        .await;
219
220                    #[cfg(not(target_family = "wasm"))]
221                    if conn.is_ok() {
222                        Self::spawn_connection_monitoring_next(&endpoint_next, &node_addr);
223                    }
224
225                    conn
226                }
227                None => endpoint_next.connect(
228                        iroh_next::NodeId::from_bytes(node_id.as_bytes()).expect("Can't fail"),
229                        FEDIMINT_API_ALPN
230                    ).await,
231                }
232                .map_err(Into::into)
233                .map_err(PeerError::Connection)
234                .map(super::IClientConnection::into_dyn)
235        }));
236
237        // Remember last error, so we have something to return if
238        // neither connection works.
239        let mut prev_err = None;
240
241        // Loop until first success, or running out of connections.
242        while let Some(result) = futures.next().await {
243            match result {
244                Ok(connection) => return Ok(connection),
245                Err(err) => {
246                    warn!(
247                        target: LOG_NET_IROH,
248                        err = %err.fmt_compact(),
249                        "Join error in iroh connection task"
250                    );
251                    prev_err = Some(err);
252                }
253            }
254        }
255
256        Err(prev_err.unwrap_or_else(|| {
257            PeerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
258        }))
259    }
260}
261
262fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::NodeAddr {
263    iroh_next::NodeAddr {
264        node_id: iroh_next::NodeId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail"),
265        relay_url: stable
266            .relay_url
267            .as_ref()
268            .map(|u| iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail")),
269        direct_addresses: stable.direct_addresses.clone(),
270    }
271}
272#[async_trait]
273impl IClientConnection for Connection {
274    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
275        let json = serde_json::to_vec(&IrohApiRequest { method, request })
276            .expect("Serialization to vec can't fail");
277
278        let (mut sink, mut stream) = self
279            .open_bi()
280            .await
281            .map_err(|e| PeerError::Transport(e.into()))?;
282
283        sink.write_all(&json)
284            .await
285            .map_err(|e| PeerError::Transport(e.into()))?;
286
287        sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
288
289        let response = stream
290            .read_to_end(1_000_000)
291            .await
292            .map_err(|e| PeerError::Transport(e.into()))?;
293
294        // TODO: We should not be serializing Results on the wire
295        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
296            .map_err(|e| PeerError::InvalidResponse(e.into()))?;
297
298        response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
299    }
300
301    async fn await_disconnection(&self) {
302        self.closed().await;
303    }
304}
305
306#[async_trait]
307impl IClientConnection for iroh_next::endpoint::Connection {
308    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
309        let json = serde_json::to_vec(&IrohApiRequest { method, request })
310            .expect("Serialization to vec can't fail");
311
312        let (mut sink, mut stream) = self
313            .open_bi()
314            .await
315            .map_err(|e| PeerError::Transport(e.into()))?;
316
317        sink.write_all(&json)
318            .await
319            .map_err(|e| PeerError::Transport(e.into()))?;
320
321        sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
322
323        let response = stream
324            .read_to_end(1_000_000)
325            .await
326            .map_err(|e| PeerError::Transport(e.into()))?;
327
328        // TODO: We should not be serializing Results on the wire
329        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
330            .map_err(|e| PeerError::InvalidResponse(e.into()))?;
331
332        response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
333    }
334
335    async fn await_disconnection(&self) {
336        self.closed().await;
337    }
338}