fedimint_api_client/api/
iroh.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::str::FromStr;
3
4use anyhow::Context;
5use async_trait::async_trait;
6use fedimint_core::PeerId;
7use fedimint_core::envs::parse_kv_list_from_env;
8use fedimint_core::module::{
9    ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
10};
11use fedimint_core::util::SafeUrl;
12use fedimint_logging::LOG_NET_IROH;
13use iroh::endpoint::Connection;
14use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
15use iroh_base::ticket::NodeTicket;
16use serde_json::Value;
17use tracing::{debug, trace, warn};
18
19use super::{DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult};
20
21#[derive(Debug, Clone)]
22pub struct IrohConnector {
23    node_ids: BTreeMap<PeerId, NodeId>,
24    endpoint: Endpoint,
25
26    /// List of overrides to use when attempting to connect to given
27    /// `NodeId`
28    ///
29    /// This is useful for testing, or forcing non-default network
30    /// connectivity.
31    pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
32}
33
34impl IrohConnector {
35    pub async fn new(peers: BTreeMap<PeerId, SafeUrl>) -> anyhow::Result<Self> {
36        const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
37        warn!(target: LOG_NET_IROH, "Iroh support is experimental");
38        let mut s = Self::new_no_overrides(peers).await?;
39
40        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
41            s = s.with_connection_override(k, v.into());
42        }
43
44        Ok(s)
45    }
46
47    pub async fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>) -> anyhow::Result<Self> {
48        let node_ids = peers
49            .into_iter()
50            .map(|(peer, url)| {
51                let host = url.host_str().context("Url is missing host")?;
52
53                let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
54
55                Ok((peer, node_id))
56            })
57            .collect::<anyhow::Result<BTreeMap<PeerId, NodeId>>>()?;
58
59        let builder = Endpoint::builder().discovery_n0();
60        #[cfg(not(target_family = "wasm"))]
61        let builder = builder.discovery_dht();
62        let endpoint = builder.bind().await?;
63        debug!(
64            target: LOG_NET_IROH,
65            node_id = %endpoint.node_id(),
66            node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
67            "Iroh api client endpoint"
68        );
69
70        Ok(Self {
71            node_ids,
72            endpoint,
73            connection_overrides: BTreeMap::new(),
74        })
75    }
76
77    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
78        self.connection_overrides.insert(node, addr);
79        self
80    }
81}
82
83#[async_trait]
84impl IClientConnector for IrohConnector {
85    fn peers(&self) -> BTreeSet<PeerId> {
86        self.node_ids.keys().copied().collect()
87    }
88
89    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
90        let node_id = *self
91            .node_ids
92            .get(&peer_id)
93            .ok_or(PeerError::InvalidPeerId { peer_id })?;
94
95        let connection = match self.connection_overrides.get(&node_id) {
96            Some(node_addr) => {
97                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
98                self.endpoint
99                    .connect(node_addr.clone(), FEDIMINT_API_ALPN)
100                    .await
101            }
102            None => self.endpoint.connect(node_id, FEDIMINT_API_ALPN).await,
103        }.map_err(PeerError::Connection)?;
104
105        Ok(connection.into_dyn())
106    }
107}
108
109#[async_trait]
110impl IClientConnection for Connection {
111    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
112        let json = serde_json::to_vec(&IrohApiRequest { method, request })
113            .expect("Serialization to vec can't fail");
114
115        let (mut sink, mut stream) = self
116            .open_bi()
117            .await
118            .map_err(|e| PeerError::Transport(e.into()))?;
119
120        sink.write_all(&json)
121            .await
122            .map_err(|e| PeerError::Transport(e.into()))?;
123
124        sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
125
126        let response = stream
127            .read_to_end(1_000_000)
128            .await
129            .map_err(|e| PeerError::Transport(e.into()))?;
130
131        // TODO: We should not be serializing Results on the wire
132        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
133            .map_err(|e| PeerError::InvalidResponse(e.into()))?;
134
135        response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
136    }
137
138    async fn await_disconnection(&self) {
139        self.closed().await;
140    }
141}