fedimint_api_client/api/
iroh.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::pin::Pin;
3use std::str::FromStr;
4
5use anyhow::Context;
6use async_trait::async_trait;
7use fedimint_core::PeerId;
8use fedimint_core::envs::parse_kv_list_from_env;
9use fedimint_core::iroh_prod::FM_DNS_PKARR_RELAY_PROD;
10use fedimint_core::module::{
11    ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
12};
13use fedimint_core::util::{FmtCompact as _, SafeUrl};
14use fedimint_logging::LOG_NET_IROH;
15use futures::Future;
16use futures::stream::{FuturesUnordered, StreamExt};
17use iroh::discovery::pkarr::{PkarrPublisher, PkarrResolver};
18use iroh::endpoint::Connection;
19use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, SecretKey};
20use iroh_base::ticket::NodeTicket;
21use serde_json::Value;
22use tracing::{debug, trace, warn};
23use url::Url;
24
25use super::{DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult};
26
/// Client connector that reaches federation peers through iroh endpoints.
///
/// Two endpoints are kept side by side: one built from the `iroh` crate
/// ("stable") and one built from the `iroh_next` crate ("next").
#[derive(Debug, Clone)]
pub struct IrohConnector {
    /// Iroh node id of every known peer.
    node_ids: BTreeMap<PeerId, NodeId>,
    /// Endpoint created with the `iroh` crate.
    endpoint_stable: Endpoint,
    /// Endpoint created with the `iroh_next` crate.
    endpoint_next: iroh_next::Endpoint,

    /// List of overrides to use when attempting to connect to given
    /// `NodeId`
    ///
    /// This is useful for testing, or forcing non-default network
    /// connectivity.
    pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
}
40
41impl IrohConnector {
42    pub async fn new(
43        peers: BTreeMap<PeerId, SafeUrl>,
44        iroh_dns: Option<SafeUrl>,
45    ) -> anyhow::Result<Self> {
46        const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
47        warn!(target: LOG_NET_IROH, "Iroh support is experimental");
48        let mut s = Self::new_no_overrides(peers, iroh_dns).await?;
49
50        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
51            s = s.with_connection_override(k, v.into());
52        }
53
54        Ok(s)
55    }
56
57    pub async fn new_no_overrides(
58        peers: BTreeMap<PeerId, SafeUrl>,
59        iroh_dns: Option<SafeUrl>,
60    ) -> anyhow::Result<Self> {
61        let iroh_dns = iroh_dns.map_or(
62            Url::parse(FM_DNS_PKARR_RELAY_PROD).expect("Hardcoded, can't fail"),
63            SafeUrl::to_unsafe,
64        );
65        let node_ids = peers
66            .into_iter()
67            .map(|(peer, url)| {
68                let host = url.host_str().context("Url is missing host")?;
69
70                let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
71
72                Ok((peer, node_id))
73            })
74            .collect::<anyhow::Result<BTreeMap<PeerId, NodeId>>>()?;
75
76        let endpoint_stable = {
77            let builder = Endpoint::builder()
78                .add_discovery({
79                    let iroh_dns = iroh_dns.clone();
80                    move |sk: &SecretKey| Some(PkarrPublisher::new(sk.clone(), iroh_dns))
81                })
82                .add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
83            #[cfg(not(target_family = "wasm"))]
84            let builder = builder.discovery_dht();
85            let endpoint = builder.bind().await?;
86            debug!(
87                target: LOG_NET_IROH,
88                node_id = %endpoint.node_id(),
89                node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
90                "Iroh api client endpoint (stable)"
91            );
92            endpoint
93        };
94        let endpoint_next = {
95            let builder = iroh_next::Endpoint::builder().discovery_n0();
96            #[cfg(not(target_family = "wasm"))]
97            let builder = builder.discovery_dht();
98            let endpoint = builder.bind().await?;
99            debug!(
100                target: LOG_NET_IROH,
101                node_id = %endpoint.node_id(),
102                node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
103                "Iroh api client endpoint (next)"
104            );
105            endpoint
106        };
107
108        Ok(Self {
109            node_ids,
110            endpoint_stable,
111            endpoint_next,
112            connection_overrides: BTreeMap::new(),
113        })
114    }
115
116    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
117        self.connection_overrides.insert(node, addr);
118        self
119    }
120}
121
122#[async_trait]
123impl IClientConnector for IrohConnector {
124    fn peers(&self) -> BTreeSet<PeerId> {
125        self.node_ids.keys().copied().collect()
126    }
127
128    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
129        let node_id = *self
130            .node_ids
131            .get(&peer_id)
132            .ok_or(PeerError::InvalidPeerId { peer_id })?;
133
134        let mut futures = FuturesUnordered::<
135            Pin<Box<dyn Future<Output = PeerResult<DynClientConnection>> + Send>>,
136        >::new();
137        let connection_override = self.connection_overrides.get(&node_id).cloned();
138        let endpoint_stable = self.endpoint_stable.clone();
139        let endpoint_next = self.endpoint_next.clone();
140
141        futures.push(Box::pin({
142            let connection_override = connection_override.clone();
143            async move {
144                match connection_override {
145                    Some(node_addr) => {
146                        trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
147                        endpoint_stable
148                            .connect(node_addr.clone(), FEDIMINT_API_ALPN)
149                            .await
150                    }
151                    None => endpoint_stable.connect(node_id, FEDIMINT_API_ALPN).await,
152                }.map_err(PeerError::Connection)
153                .map(super::IClientConnection::into_dyn)
154            }
155        }));
156
157        futures.push(Box::pin(async move {
158            match connection_override {
159                Some(node_addr) => {
160                    trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
161                    endpoint_next
162                        .connect(node_addr_stable_to_next(&node_addr), FEDIMINT_API_ALPN)
163                        .await
164                }
165                None => endpoint_next.connect(
166                        iroh_next::NodeId::from_bytes(node_id.as_bytes()).expect("Can't fail"),
167                        FEDIMINT_API_ALPN
168                    ).await,
169                }
170                .map_err(Into::into)
171                .map_err(PeerError::Connection)
172                .map(super::IClientConnection::into_dyn)
173        }));
174
175        // Remember last error, so we have something to return if
176        // neither connection works.
177        let mut prev_err = None;
178
179        // Loop until first success, or running out of connections.
180        while let Some(result) = futures.next().await {
181            match result {
182                Ok(connection) => return Ok(connection),
183                Err(err) => {
184                    warn!(
185                        target: LOG_NET_IROH,
186                        err = %err.fmt_compact(),
187                        "Join error in iroh connection task"
188                    );
189                    prev_err = Some(err);
190                }
191            }
192        }
193
194        Err(prev_err.unwrap_or_else(|| {
195            PeerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
196        }))
197    }
198}
199
200fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::NodeAddr {
201    iroh_next::NodeAddr {
202        node_id: iroh_next::NodeId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail"),
203        relay_url: stable
204            .relay_url
205            .as_ref()
206            .map(|u| iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail")),
207        direct_addresses: stable.direct_addresses.clone(),
208    }
209}
210#[async_trait]
211impl IClientConnection for Connection {
212    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
213        let json = serde_json::to_vec(&IrohApiRequest { method, request })
214            .expect("Serialization to vec can't fail");
215
216        let (mut sink, mut stream) = self
217            .open_bi()
218            .await
219            .map_err(|e| PeerError::Transport(e.into()))?;
220
221        sink.write_all(&json)
222            .await
223            .map_err(|e| PeerError::Transport(e.into()))?;
224
225        sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
226
227        let response = stream
228            .read_to_end(1_000_000)
229            .await
230            .map_err(|e| PeerError::Transport(e.into()))?;
231
232        // TODO: We should not be serializing Results on the wire
233        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
234            .map_err(|e| PeerError::InvalidResponse(e.into()))?;
235
236        response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
237    }
238
239    async fn await_disconnection(&self) {
240        self.closed().await;
241    }
242}
243
244#[async_trait]
245impl IClientConnection for iroh_next::endpoint::Connection {
246    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
247        let json = serde_json::to_vec(&IrohApiRequest { method, request })
248            .expect("Serialization to vec can't fail");
249
250        let (mut sink, mut stream) = self
251            .open_bi()
252            .await
253            .map_err(|e| PeerError::Transport(e.into()))?;
254
255        sink.write_all(&json)
256            .await
257            .map_err(|e| PeerError::Transport(e.into()))?;
258
259        sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
260
261        let response = stream
262            .read_to_end(1_000_000)
263            .await
264            .map_err(|e| PeerError::Transport(e.into()))?;
265
266        // TODO: We should not be serializing Results on the wire
267        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
268            .map_err(|e| PeerError::InvalidResponse(e.into()))?;
269
270        response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
271    }
272
273    async fn await_disconnection(&self) {
274        self.closed().await;
275    }
276}