fedimint_server/net/p2p_connector/
iroh.rs1use std::collections::BTreeMap;
2use std::net::SocketAddr;
3
4use anyhow::{Context as _, ensure};
5use async_trait::async_trait;
6use fedimint_core::PeerId;
7use fedimint_core::encoding::{Decodable, Encodable};
8use fedimint_core::envs::{FM_IROH_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
9use fedimint_core::net::STANDARD_FEDIMINT_P2P_PORT;
10use fedimint_core::net::iroh::build_iroh_endpoint;
11use fedimint_core::util::SafeUrl;
12use fedimint_logging::LOG_NET_IROH;
13use fedimint_server_core::dashboard_ui::ConnectionType;
14use iroh::{Endpoint, NodeAddr, NodeId, SecretKey};
15use iroh_base::ticket::NodeTicket;
16use serde::Serialize;
17use serde::de::DeserializeOwned;
18use tracing::trace;
19
20use super::IP2PConnector;
21use crate::net::p2p_connection::{DynP2PConnection, IP2PConnection as _};
22
23pub fn parse_p2p(url: &SafeUrl) -> anyhow::Result<String> {
25 ensure!(url.scheme() == "fedimint", "p2p url has invalid scheme");
26
27 let host = url.host_str().context("p2p url is missing host")?;
28
29 let port = url.port().unwrap_or(STANDARD_FEDIMINT_P2P_PORT);
30
31 Ok(format!("{host}:{port}"))
32}
33
34#[derive(Debug, Clone)]
35pub struct IrohConnector {
36 pub(crate) node_ids: BTreeMap<PeerId, NodeId>,
38 pub(crate) endpoint: Endpoint,
40 pub(crate) connection_overrides: BTreeMap<NodeId, NodeAddr>,
44}
45
46pub(crate) const FEDIMINT_P2P_ALPN: &[u8] = b"FEDIMINT_P2P_ALPN";
47
48impl IrohConnector {
49 pub async fn new(
50 secret_key: SecretKey,
51 p2p_bind_addr: SocketAddr,
52 iroh_dns: Option<SafeUrl>,
53 iroh_relays: Vec<SafeUrl>,
54 node_ids: BTreeMap<PeerId, NodeId>,
55 ) -> anyhow::Result<Self> {
56 let mut s =
57 Self::new_no_overrides(secret_key, p2p_bind_addr, iroh_dns, iroh_relays, node_ids)
58 .await?;
59
60 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
61 s = s.with_connection_override(k, v.into());
62 }
63
64 Ok(s)
65 }
66
67 pub async fn new_no_overrides(
68 secret_key: SecretKey,
69 bind_addr: SocketAddr,
70 iroh_dns: Option<SafeUrl>,
71 iroh_relays: Vec<SafeUrl>,
72 node_ids: BTreeMap<PeerId, NodeId>,
73 ) -> anyhow::Result<Self> {
74 let identity = *node_ids
75 .iter()
76 .find(|entry| entry.1 == &secret_key.public())
77 .expect("Our public key is not part of the keyset")
78 .0;
79
80 let endpoint = build_iroh_endpoint(
81 secret_key,
82 bind_addr,
83 iroh_dns,
84 iroh_relays,
85 FEDIMINT_P2P_ALPN,
86 )
87 .await?;
88
89 Ok(Self {
90 node_ids: node_ids
91 .into_iter()
92 .filter(|entry| entry.0 != identity)
93 .collect(),
94 endpoint,
95 connection_overrides: BTreeMap::default(),
96 })
97 }
98
99 pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
100 self.connection_overrides.insert(node, addr);
101 self
102 }
103}
104
105#[async_trait]
106impl<M> IP2PConnector<M> for IrohConnector
107where
108 M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
109{
110 fn peers(&self) -> Vec<PeerId> {
111 self.node_ids.keys().copied().collect()
112 }
113
114 async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
115 let node_id = *self.node_ids.get(&peer).expect("No node id found for peer");
116
117 let connection = match self.connection_overrides.get(&node_id) {
118 Some(node_addr) => {
119 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
120 self.endpoint
121 .connect(node_addr.clone(), FEDIMINT_P2P_ALPN)
122 .await?
123 }
124 None => self.endpoint.connect(node_id, FEDIMINT_P2P_ALPN).await?,
125 };
126
127 Ok(connection.into_dyn())
128 }
129
130 async fn accept(&self) -> anyhow::Result<(PeerId, DynP2PConnection<M>)> {
131 let connection = self
132 .endpoint
133 .accept()
134 .await
135 .context("Listener closed unexpectedly")?
136 .accept()?
137 .await?;
138
139 let node_id = connection.remote_node_id()?;
140
141 let auth_peer = self
142 .node_ids
143 .iter()
144 .find(|entry| entry.1 == &node_id)
145 .with_context(|| format!("Node id {node_id} is unknown"))?
146 .0;
147
148 Ok((*auth_peer, connection.into_dyn()))
149 }
150
151 async fn connection_type(&self, peer: PeerId) -> ConnectionType {
152 let node_id = *self.node_ids.get(&peer).expect("No node id found for peer");
153
154 let conn_type_watcher = if let Ok(watcher) = self.endpoint.conn_type(node_id) {
156 watcher
157 } else {
158 return ConnectionType::Unknown;
160 };
161
162 let conn_type = if let Ok(conn_type) = conn_type_watcher.get() {
163 conn_type
164 } else {
165 return ConnectionType::Unknown;
167 };
168
169 match conn_type {
170 iroh::endpoint::ConnectionType::Relay(_) => ConnectionType::Relay,
171 iroh::endpoint::ConnectionType::Direct(_)
172 | iroh::endpoint::ConnectionType::Mixed(_, _) => ConnectionType::Direct, iroh::endpoint::ConnectionType::None => ConnectionType::Unknown,
174 }
175 }
176}