fedimint_api_client/api/
iroh.rs
1use std::collections::{BTreeMap, BTreeSet};
2use std::str::FromStr;
3
4use anyhow::Context;
5use async_trait::async_trait;
6use fedimint_core::PeerId;
7use fedimint_core::envs::parse_kv_list_from_env;
8use fedimint_core::module::{
9 ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
10};
11use fedimint_core::util::SafeUrl;
12use fedimint_logging::LOG_NET_IROH;
13use iroh::endpoint::Connection;
14use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
15use iroh_base::ticket::NodeTicket;
16use serde_json::Value;
17use tracing::{debug, trace, warn};
18
19use super::{DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult};
20
21#[derive(Debug, Clone)]
22pub struct IrohConnector {
23 node_ids: BTreeMap<PeerId, NodeId>,
24 endpoint: Endpoint,
25
26 pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
32}
33
34impl IrohConnector {
35 pub async fn new(peers: BTreeMap<PeerId, SafeUrl>) -> anyhow::Result<Self> {
36 const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
37 warn!(target: LOG_NET_IROH, "Iroh support is experimental");
38 let mut s = Self::new_no_overrides(peers).await?;
39
40 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
41 s = s.with_connection_override(k, v.into());
42 }
43
44 Ok(s)
45 }
46
47 pub async fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>) -> anyhow::Result<Self> {
48 let node_ids = peers
49 .into_iter()
50 .map(|(peer, url)| {
51 let host = url.host_str().context("Url is missing host")?;
52
53 let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
54
55 Ok((peer, node_id))
56 })
57 .collect::<anyhow::Result<BTreeMap<PeerId, NodeId>>>()?;
58
59 let builder = Endpoint::builder().discovery_n0();
60 #[cfg(not(target_family = "wasm"))]
61 let builder = builder.discovery_dht();
62 let endpoint = builder.bind().await?;
63 debug!(
64 target: LOG_NET_IROH,
65 node_id = %endpoint.node_id(),
66 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
67 "Iroh api client endpoint"
68 );
69
70 Ok(Self {
71 node_ids,
72 endpoint,
73 connection_overrides: BTreeMap::new(),
74 })
75 }
76
77 pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
78 self.connection_overrides.insert(node, addr);
79 self
80 }
81}
82
83#[async_trait]
84impl IClientConnector for IrohConnector {
85 fn peers(&self) -> BTreeSet<PeerId> {
86 self.node_ids.keys().copied().collect()
87 }
88
89 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
90 let node_id = *self
91 .node_ids
92 .get(&peer_id)
93 .ok_or(PeerError::InvalidPeerId { peer_id })?;
94
95 let connection = match self.connection_overrides.get(&node_id) {
96 Some(node_addr) => {
97 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
98 self.endpoint
99 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
100 .await
101 }
102 None => self.endpoint.connect(node_id, FEDIMINT_API_ALPN).await,
103 }.map_err(PeerError::Connection)?;
104
105 Ok(connection.into_dyn())
106 }
107}
108
109#[async_trait]
110impl IClientConnection for Connection {
111 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
112 let json = serde_json::to_vec(&IrohApiRequest { method, request })
113 .expect("Serialization to vec can't fail");
114
115 let (mut sink, mut stream) = self
116 .open_bi()
117 .await
118 .map_err(|e| PeerError::Transport(e.into()))?;
119
120 sink.write_all(&json)
121 .await
122 .map_err(|e| PeerError::Transport(e.into()))?;
123
124 sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
125
126 let response = stream
127 .read_to_end(1_000_000)
128 .await
129 .map_err(|e| PeerError::Transport(e.into()))?;
130
131 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
133 .map_err(|e| PeerError::InvalidResponse(e.into()))?;
134
135 response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
136 }
137
138 async fn await_disconnection(&self) {
139 self.closed().await;
140 }
141}