1use core::panic;
2use std::collections::{BTreeMap, BTreeSet, HashMap};
3use std::fmt::Debug;
4use std::iter::once;
5use std::pin::Pin;
6use std::result;
7use std::sync::Arc;
8
9use anyhow::{Context, anyhow};
10#[cfg(all(feature = "tor", not(target_family = "wasm")))]
11use arti_client::{TorAddr, TorClient, TorClientConfig};
12use async_channel::bounded;
13use async_trait::async_trait;
14use base64::Engine as _;
15use bitcoin::hashes::sha256;
16use bitcoin::secp256k1;
17pub use error::{FederationError, OutputOutcomeError, PeerError};
18use fedimint_core::admin_client::{PeerServerParamsLegacy, ServerStatusLegacy, SetupStatus};
19use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
20use fedimint_core::core::backup::SignedBackupRequest;
21use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
22use fedimint_core::encoding::{Decodable, Encodable};
23use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
24use fedimint_core::module::audit::AuditSummary;
25use fedimint_core::module::registry::ModuleDecoderRegistry;
26use fedimint_core::module::{
27 ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
28};
29use fedimint_core::net::api_announcement::SignedApiAnnouncement;
30use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
31use fedimint_core::task::{MaybeSend, MaybeSync};
32use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
33use fedimint_core::util::backoff_util::api_networking_backoff;
34use fedimint_core::util::{FmtCompact as _, SafeUrl};
35use fedimint_core::{
36 NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
37};
38use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET_API, LOG_NET_WS};
39use futures::channel::oneshot;
40use futures::future::pending;
41use futures::stream::FuturesUnordered;
42use futures::{Future, StreamExt};
43use global_api::with_cache::GlobalFederationApiWithCache;
44use jsonrpsee_core::DeserializeOwned;
45use jsonrpsee_core::client::ClientT;
46pub use jsonrpsee_core::client::Error as JsonRpcClientError;
47use jsonrpsee_types::ErrorCode;
48#[cfg(target_family = "wasm")]
49use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
50#[cfg(not(target_family = "wasm"))]
51use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
52#[cfg(not(target_family = "wasm"))]
53use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
54use serde::{Deserialize, Serialize};
55use serde_json::Value;
56#[cfg(not(target_family = "wasm"))]
57use tokio_rustls::rustls::RootCertStore;
58#[cfg(all(feature = "tor", not(target_family = "wasm")))]
59use tokio_rustls::{TlsConnector, rustls::ClientConfig as TlsClientConfig};
60use tracing::{Instrument, debug, instrument, trace, trace_span, warn};
61
62use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
63mod error;
64pub mod global_api;
65pub mod net;
66
67pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
68
69pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
70 ApiVersion { major: 0, minor: 1 };
71
/// Result of a request to a single peer; fails with a [`PeerError`].
pub type PeerResult<T> = Result<T, PeerError>;
/// Result whose error is the raw JSON-RPC client error.
pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
/// Result of a federation-level request; fails with a [`FederationError`].
pub type FederationResult<T> = Result<T, FederationError>;
/// A [`DynOutputOutcome`] wrapped in [`SerdeModuleEncoding`].
pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;

/// Result of decoding an output outcome; fails with an
/// [`OutputOutcomeError`].
pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
78
/// A set of API versions: one for the core API and one per module instance.
#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
pub struct ApiVersionSet {
    /// API version of the core.
    pub core: ApiVersion,
    /// API version of each module, keyed by module instance id.
    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
}
87
#[apply(async_trait_maybe_send!)]
/// Lowest-level federation API abstraction: sends an untyped request to one
/// peer and returns the raw JSON response.
///
/// The typed helpers in `FederationApiExt` are all built on
/// [`IRawFederationApi::request_raw`].
pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
    /// Returns the ids of all peers this API can talk to.
    fn all_peers(&self) -> &BTreeSet<PeerId>;

    /// Returns the peer id used for admin requests, if one is set
    /// (`request_admin` fails with "Admin peer_id not set" when this is
    /// `None`).
    fn self_peer(&self) -> Option<PeerId>;

    /// Returns an API handle scoped to the module instance `id`.
    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;

    /// Sends `method` with `params` to `peer_id` and returns the raw JSON
    /// response, or a [`PeerError`] on failure.
    async fn request_raw(
        &self,
        peer_id: PeerId,
        method: &str,
        params: &ApiRequestErased,
    ) -> PeerResult<Value>;
}
116
117#[apply(async_trait_maybe_send!)]
120pub trait FederationApiExt: IRawFederationApi {
121 async fn request_single_peer<Ret>(
122 &self,
123 method: String,
124 params: ApiRequestErased,
125 peer: PeerId,
126 ) -> PeerResult<Ret>
127 where
128 Ret: DeserializeOwned,
129 {
130 self.request_raw(peer, &method, ¶ms)
131 .await
132 .and_then(|v| {
133 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
134 })
135 }
136
137 async fn request_single_peer_federation<FedRet>(
138 &self,
139 method: String,
140 params: ApiRequestErased,
141 peer_id: PeerId,
142 ) -> FederationResult<FedRet>
143 where
144 FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
145 {
146 self.request_raw(peer_id, &method, ¶ms)
147 .await
148 .and_then(|v| {
149 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
150 })
151 .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
152 }
153
154 #[instrument(target = LOG_NET_API, skip_all, fields(method=method))]
157 async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
158 &self,
159 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
160 method: String,
161 params: ApiRequestErased,
162 ) -> FederationResult<FR> {
163 #[cfg(not(target_family = "wasm"))]
167 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
168 #[cfg(target_family = "wasm")]
169 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
170
171 for peer in self.all_peers() {
172 futures.push(Box::pin({
173 let method = &method;
174 let params = ¶ms;
175 async move {
176 let result = self
177 .request_single_peer(method.clone(), params.clone(), *peer)
178 .await;
179
180 (*peer, result)
181 }
182 }));
183 }
184
185 let mut peer_errors = BTreeMap::new();
186 let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
187
188 loop {
189 let (peer, result) = futures
190 .next()
191 .await
192 .expect("Query strategy ran out of peers to query without returning a result");
193
194 match result {
195 Ok(response) => match strategy.process(peer, response) {
196 QueryStep::Retry(peers) => {
197 for peer in peers {
198 futures.push(Box::pin({
199 let method = &method;
200 let params = ¶ms;
201 async move {
202 let result = self
203 .request_single_peer(method.clone(), params.clone(), peer)
204 .await;
205
206 (peer, result)
207 }
208 }));
209 }
210 }
211 QueryStep::Success(response) => return Ok(response),
212 QueryStep::Failure(e) => {
213 peer_errors.insert(peer, e);
214 }
215 QueryStep::Continue => {}
216 },
217 Err(e) => {
218 e.report_if_unusual(peer, "RequestWithStrategy");
219 peer_errors.insert(peer, e);
220 }
221 }
222
223 if peer_errors.len() == peer_error_threshold {
224 return Err(FederationError::peer_errors(
225 method.clone(),
226 params.params.clone(),
227 peer_errors,
228 ));
229 }
230 }
231 }
232
233 async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
234 &self,
235 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
236 method: String,
237 params: ApiRequestErased,
238 ) -> FR {
239 #[cfg(not(target_family = "wasm"))]
243 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
244 #[cfg(target_family = "wasm")]
245 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
246
247 for peer in self.all_peers() {
248 futures.push(Box::pin({
249 let method = &method;
250 let params = ¶ms;
251 async move {
252 let response = util::retry(
253 "api-request-{method}-{peer}",
254 api_networking_backoff(),
255 || async {
256 self.request_single_peer(method.clone(), params.clone(), *peer)
257 .await
258 .inspect_err(|e| {
259 e.report_if_unusual(*peer, "QueryWithStrategyRetry");
260 })
261 .map_err(|e| anyhow!(e.to_string()))
262 },
263 )
264 .await
265 .expect("Number of retries has no limit");
266
267 (*peer, response)
268 }
269 }));
270 }
271
272 loop {
273 let (peer, response) = match futures.next().await {
274 Some(t) => t,
275 None => pending().await,
276 };
277
278 match strategy.process(peer, response) {
279 QueryStep::Retry(peers) => {
280 for peer in peers {
281 futures.push(Box::pin({
282 let method = &method;
283 let params = ¶ms;
284 async move {
285 let response = util::retry(
286 "api-request-{method}-{peer}",
287 api_networking_backoff(),
288 || async {
289 self.request_single_peer(
290 method.clone(),
291 params.clone(),
292 peer,
293 )
294 .await
295 .inspect_err(|err| {
296 if err.is_unusual() {
297 debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
298 }
299 })
300 .map_err(|e| anyhow!(e.to_string()))
301 },
302 )
303 .await
304 .expect("Number of retries has no limit");
305
306 (peer, response)
307 }
308 }));
309 }
310 }
311 QueryStep::Success(response) => return response,
312 QueryStep::Failure(e) => {
313 warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
314 }
315 QueryStep::Continue => {}
316 }
317 }
318 }
319
320 async fn request_current_consensus<Ret>(
321 &self,
322 method: String,
323 params: ApiRequestErased,
324 ) -> FederationResult<Ret>
325 where
326 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
327 {
328 self.request_with_strategy(
329 ThresholdConsensus::new(self.all_peers().to_num_peers()),
330 method,
331 params,
332 )
333 .await
334 }
335
336 async fn request_current_consensus_retry<Ret>(
337 &self,
338 method: String,
339 params: ApiRequestErased,
340 ) -> Ret
341 where
342 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
343 {
344 self.request_with_strategy_retry(
345 ThresholdConsensus::new(self.all_peers().to_num_peers()),
346 method,
347 params,
348 )
349 .await
350 }
351
352 async fn request_admin<Ret>(
353 &self,
354 method: &str,
355 params: ApiRequestErased,
356 auth: ApiAuth,
357 ) -> FederationResult<Ret>
358 where
359 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
360 {
361 let Some(self_peer_id) = self.self_peer() else {
362 return Err(FederationError::general(
363 method,
364 params,
365 anyhow::format_err!("Admin peer_id not set"),
366 ));
367 };
368
369 self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
370 .await
371 }
372
373 async fn request_admin_no_auth<Ret>(
374 &self,
375 method: &str,
376 params: ApiRequestErased,
377 ) -> FederationResult<Ret>
378 where
379 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
380 {
381 let Some(self_peer_id) = self.self_peer() else {
382 return Err(FederationError::general(
383 method,
384 params,
385 anyhow::format_err!("Admin peer_id not set"),
386 ));
387 };
388
389 self.request_single_peer_federation(method.into(), params, self_peer_id)
390 .await
391 }
392}
393
// Blanket impl: every `IRawFederationApi` (sized or not) gets the default
// methods of `FederationApiExt`.
#[apply(async_trait_maybe_send!)]
impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
396
/// Marker trait (no methods of its own) for an [`IRawFederationApi`] that is
/// used as a module API; it is the trait object behind `DynModuleApi`.
pub trait IModuleFederationApi: IRawFederationApi {}
399
// Declares `DynModuleApi`, a cloneable newtype over
// `Arc<dyn IModuleFederationApi>` (generated by `dyn_newtype_define!`).
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynModuleApi(Arc<IModuleFederationApi>)
}
404
// Declares `DynGlobalApi`, a cloneable newtype over
// `Arc<dyn IGlobalFederationApi>` (generated by `dyn_newtype_define!`).
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynGlobalApi(Arc<IGlobalFederationApi>)
}
409
/// Borrows the wrapped [`IGlobalFederationApi`] trait object.
impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
        // `inner` is the field generated by `dyn_newtype_define!`.
        self.inner.as_ref()
    }
}
415
416impl DynGlobalApi {
417 pub async fn new_admin(
418 peer: PeerId,
419 url: SafeUrl,
420 api_secret: &Option<String>,
421 ) -> anyhow::Result<DynGlobalApi> {
422 Ok(GlobalFederationApiWithCache::new(
423 ReconnectFederationApi::from_endpoints(once((peer, url)), api_secret, Some(peer))
424 .await?,
425 )
426 .into())
427 }
428
429 pub async fn from_setup_endpoint(
433 url: SafeUrl,
434 api_secret: &Option<String>,
435 ) -> anyhow::Result<Self> {
436 Self::new_admin(PeerId::from(1024), url, api_secret).await
440 }
441
442 pub async fn from_endpoints(
443 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
444 api_secret: &Option<String>,
445 ) -> anyhow::Result<Self> {
446 Ok(GlobalFederationApiWithCache::new(
447 ReconnectFederationApi::from_endpoints(peers, api_secret, None).await?,
448 )
449 .into())
450 }
451}
452
#[apply(async_trait_maybe_send!)]
/// Typed federation-level API calls on top of [`IRawFederationApi`].
///
/// Only signatures are declared here. The notes below restate what the
/// signatures show; the exact server-side semantics are defined by the
/// implementors and should be confirmed there.
pub trait IGlobalFederationApi: IRawFederationApi {
    /// Submits `tx` and returns the still-encoded submission outcome.
    async fn submit_transaction(
        &self,
        tx: Transaction,
    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;

    /// Waits for and returns the session outcome at `block_index`, decoded
    /// with `decoders`.
    async fn await_block(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
    ) -> anyhow::Result<SessionOutcome>;

    /// Returns the status of the session at `block_index`.
    ///
    /// NOTE(review): `core_api_version` presumably selects between the
    /// session-status API variants and `broadcast_public_keys` is presumably
    /// used for verification — confirm against the implementation.
    async fn get_session_status(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
        core_api_version: ApiVersion,
        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
    ) -> anyhow::Result<SessionStatus>;

    /// Returns the federation's session count.
    async fn session_count(&self) -> FederationResult<u64>;

    /// Waits on the transaction `txid` and returns a transaction id.
    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;

    /// Uploads a signed client backup request.
    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;

    /// Downloads the backup stored under `id`, returning each peer's
    /// snapshot (or `None`) keyed by peer id.
    async fn download_backup(
        &self,
        id: &secp256k1::PublicKey,
    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;

    /// Admin call taking `auth`; per its name, sets the password.
    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Admin call returning the current [`SetupStatus`].
    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;

    /// Admin call submitting the local `name` and an optional
    /// `federation_name`; returns a string.
    async fn set_local_params(
        &self,
        name: String,
        federation_name: Option<String>,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Admin call submitting a peer's connection `info`; returns a string.
    async fn add_peer_connection_info(
        &self,
        info: String,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Adds a config-gen peer (legacy parameter type).
    async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()>;

    /// Returns the known config-gen peers (legacy parameter type).
    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>>;

    /// Admin call; per its name, starts distributed key generation.
    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Admin call returning a hash per peer, for config verification.
    async fn get_verify_config_hash(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    /// Admin call returning a hash per peer for verified configs.
    async fn verified_configs(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    /// Admin call; per its name, starts consensus.
    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns a [`StatusResponse`].
    async fn status(&self) -> FederationResult<StatusResponse>;

    /// Admin call returning an [`AuditSummary`].
    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;

    /// Admin call returning a [`GuardianConfigBackup`].
    async fn guardian_config_backup(&self, auth: ApiAuth)
    -> FederationResult<GuardianConfigBackup>;

    /// Admin call that only takes `auth` and returns `()`.
    ///
    /// NOTE(review): presumably a credential check — confirm.
    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Admin call; per its name, restarts federation setup.
    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Submits a signed API announcement together with a `peer_id`.
    async fn submit_api_announcement(
        &self,
        peer_id: PeerId,
        announcement: SignedApiAnnouncement,
    ) -> FederationResult<()>;

    /// Fetches signed API announcements, keyed by peer id; takes a single
    /// `guardian` peer id and returns a single-peer result.
    async fn api_announcements(
        &self,
        guardian: PeerId,
    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;

    /// Admin call returning a signed announcement for `api_url`.
    async fn sign_api_announcement(
        &self,
        api_url: SafeUrl,
        auth: ApiAuth,
    ) -> FederationResult<SignedApiAnnouncement>;

    /// Admin shutdown call with an optional `session` number.
    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;

    /// Returns a version string for `peer_id` (single-peer result).
    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;

    /// Admin call returning [`BackupStatistics`].
    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;
}
588
589pub fn deserialize_outcome<R>(
590 outcome: &SerdeOutputOutcome,
591 module_decoder: &Decoder,
592) -> OutputOutcomeResult<R>
593where
594 R: OutputOutcome + MaybeSend,
595{
596 let dyn_outcome = outcome
597 .try_into_inner_known_module_kind(module_decoder)
598 .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
599
600 let source_instance = dyn_outcome.module_instance_id();
601
602 dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
603 let target_type = std::any::type_name::<R>();
604 OutputOutcomeError::ResponseDeserialization(anyhow!(
605 "Could not downcast output outcome with instance id {source_instance} to {target_type}"
606 ))
607 })
608}
609
/// Connector that opens a JSON-RPC client connection to each peer's API URL.
#[derive(Debug, Clone)]
pub struct WebsocketConnector {
    /// Configured API endpoint of each peer.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional API secret sent as credentials when connecting.
    api_secret: Option<String>,

    /// Per-peer URL overrides: when an entry exists for a peer, `connect`
    /// uses it instead of the URL in `peers`.
    pub connection_overrides: BTreeMap<PeerId, SafeUrl>,
}
622
623impl WebsocketConnector {
624 fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> anyhow::Result<Self> {
625 let mut s = Self::new_no_overrides(peers, api_secret);
626
627 for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
628 s = s.with_connection_override(k, v);
629 }
630
631 Ok(s)
632 }
633 pub fn with_connection_override(mut self, peer_id: PeerId, url: SafeUrl) -> Self {
634 self.connection_overrides.insert(peer_id, url);
635 self
636 }
637 pub fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
638 Self {
639 peers,
640 api_secret,
641 connection_overrides: BTreeMap::default(),
642 }
643 }
644}
645
#[async_trait]
impl IClientConnector for WebsocketConnector {
    /// Returns the ids of all configured peers.
    fn peers(&self) -> BTreeSet<PeerId> {
        self.peers.keys().copied().collect()
    }

    /// Opens a client connection to `peer_id`.
    ///
    /// The endpoint is the connection override for the peer if one exists,
    /// otherwise the configured URL. When an API secret is set it is sent as
    /// `fedimint:<secret>` credentials: as a Basic `Authorization` header on
    /// native targets, and embedded in the URL on wasm.
    ///
    /// # Errors
    ///
    /// - [`PeerError::InternalClientError`] for an unknown `peer_id` or when
    ///   building the client fails.
    /// - [`PeerError::InvalidEndpoint`] (wasm only) when the credentials
    ///   cannot be set on the URL.
    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
        let api_endpoint = match self.connection_overrides.get(&peer_id) {
            Some(url) => {
                trace!(target: LOG_NET_WS, %peer_id, "Using a connectivity override for connection");
                url
            }
            None => self.peers.get(&peer_id).ok_or_else(|| {
                PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}"))
            })?,
        };

        // Native: client builder with the webpki root certificates as trust
        // store. `mut` because headers may be added below.
        #[cfg(not(target_family = "wasm"))]
        let mut client = {
            let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
            let mut root_certs = RootCertStore::empty();
            root_certs.extend(webpki_roots);

            let tls_cfg = CustomCertStore::builder()
                .with_root_certificates(root_certs)
                .with_no_client_auth();

            WsClientBuilder::default()
                .max_concurrent_requests(u16::MAX as usize)
                .with_custom_cert_store(tls_cfg)
        };

        #[cfg(target_family = "wasm")]
        let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);

        if let Some(api_secret) = &self.api_secret {
            #[cfg(not(target_family = "wasm"))]
            {
                // Native: pass the credentials as a Basic auth header.
                let mut headers = HeaderMap::new();

                let auth = base64::engine::general_purpose::STANDARD
                    .encode(format!("fedimint:{api_secret}"));

                headers.insert(
                    "Authorization",
                    HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
                );

                client = client.set_headers(headers);
            }
            #[cfg(target_family = "wasm")]
            {
                // Wasm: put the credentials into the URL and connect right
                // away, returning from this branch.
                let mut url = api_endpoint.clone();
                url.set_username("fedimint")
                    .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid username")))?;
                url.set_password(Some(&api_secret))
                    .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid secret")))?;

                let client = client
                    .build(url.as_str())
                    .await
                    .map_err(|err| PeerError::InternalClientError(err.into()))?;

                return Ok(client.into_dyn());
            }
        }

        let client = client
            .build(api_endpoint.as_str())
            .await
            .map_err(|err| PeerError::InternalClientError(err.into()))?;

        Ok(client.into_dyn())
    }
}
725
/// Connector that reaches peers through a Tor client. Only compiled with the
/// `tor` feature on non-wasm targets.
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[derive(Debug, Clone)]
pub struct TorConnector {
    /// Configured API endpoint of each peer.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional API secret sent as Basic auth credentials when connecting.
    api_secret: Option<String>,
}
732
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
impl TorConnector {
    /// Creates a connector for the given peer endpoints and optional API
    /// secret. No connection is made here.
    pub fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
        Self { peers, api_secret }
    }
}
739
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[async_trait]
impl IClientConnector for TorConnector {
    /// Returns the ids of all configured peers.
    fn peers(&self) -> BTreeSet<PeerId> {
        self.peers.keys().copied().collect()
    }

    /// Opens a client connection to `peer_id` over a Tor stream.
    ///
    /// Steps: bootstrap a Tor client, open an anonymized stream to the
    /// endpoint's host and port, optionally wrap it in TLS (for `wss`), then
    /// build the websocket client on top of that stream.
    ///
    /// # Errors
    ///
    /// - [`PeerError::InternalClientError`] for an unknown `peer_id` or when
    ///   the Tor client cannot be bootstrapped.
    /// - [`PeerError::InvalidEndpoint`] for a missing host/port, an invalid
    ///   address, an unsupported scheme or an invalid TLS server name.
    /// - [`PeerError::Connection`] when the Tor stream, the TLS handshake or
    ///   the websocket client setup fails.
    #[allow(clippy::too_many_lines)]
    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
        let api_endpoint = self
            .peers
            .get(&peer_id)
            .ok_or_else(|| PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}")))?;

        // A new Tor client is bootstrapped on every call, with the default
        // configuration, and then turned into an isolated client.
        let tor_config = TorClientConfig::default();
        let tor_client = TorClient::create_bootstrapped(tor_config)
            .await
            .map_err(|err| PeerError::InternalClientError(err.into()))?
            .isolated_client();

        debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");

        // (host, port) pair taken from the endpoint URL.
        let addr = (
            api_endpoint
                .host_str()
                .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected host str")))?,
            api_endpoint
                .port_or_known_default()
                .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected port number")))?,
        );
        let tor_addr = TorAddr::from(addr).map_err(|e| {
            PeerError::InvalidEndpoint(anyhow!("Invalid endpoint addr: {addr:?}: {e:#}"))
        })?;

        // `tor_addr` is moved into the connect call below; the clone is kept
        // for logging afterwards.
        let tor_addr_clone = tor_addr.clone();

        debug!(
            ?tor_addr,
            ?addr,
            "Successfully created `TorAddr` for given address (i.e. host and port)"
        );

        let anonymized_stream = if api_endpoint.is_onion_address() {
            // Onion addresses need stream preferences that explicitly allow
            // connecting to onion services.
            let mut stream_prefs = arti_client::StreamPrefs::default();
            stream_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));

            let anonymized_stream = tor_client
                .connect_with_prefs(tor_addr, &stream_prefs)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
            );
            anonymized_stream
        } else {
            let anonymized_stream = tor_client
                .connect(tor_addr)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`"
            );
            anonymized_stream
        };

        // Only `wss` (TLS) and `ws` (plain) are accepted here.
        let is_tls = match api_endpoint.scheme() {
            "wss" => true,
            "ws" => false,
            unexpected_scheme => {
                return Err(PeerError::InvalidEndpoint(anyhow!(
                    "Unsupported scheme: {unexpected_scheme}"
                )));
            }
        };

        // For TLS, trust the webpki root certificates.
        let tls_connector = if is_tls {
            let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
            let mut root_certs = RootCertStore::empty();
            root_certs.extend(webpki_roots);

            let tls_config = TlsClientConfig::builder()
                .with_root_certificates(root_certs)
                .with_no_client_auth();
            let tls_connector = TlsConnector::from(Arc::new(tls_config));
            Some(tls_connector)
        } else {
            None
        };

        let mut ws_client_builder =
            WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);

        if let Some(api_secret) = &self.api_secret {
            // Pass the credentials as a Basic auth header.
            let mut headers = HeaderMap::new();

            let auth =
                base64::engine::general_purpose::STANDARD.encode(format!("fedimint:{api_secret}"));

            headers.insert(
                "Authorization",
                HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
            );

            ws_client_builder = ws_client_builder.set_headers(headers);
        }

        match tls_connector {
            // Plain websocket directly over the Tor stream.
            None => {
                let client = ws_client_builder
                    .build_with_stream(api_endpoint.as_str(), anonymized_stream)
                    .await
                    .map_err(|e| PeerError::Connection(e.into()))?;

                Ok(client.into_dyn())
            }
            // TLS over the Tor stream, then websocket over TLS.
            Some(tls_connector) => {
                let host = api_endpoint
                    .host_str()
                    .map(ToOwned::to_owned)
                    .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Invalid host str")))?;

                let server_name = rustls_pki_types::ServerName::try_from(host)
                    .map_err(|e| PeerError::InvalidEndpoint(e.into()))?;

                let anonymized_tls_stream = tls_connector
                    .connect(server_name, anonymized_stream)
                    .await
                    .map_err(|e| PeerError::Connection(e.into()))?;

                let client = ws_client_builder
                    .build_with_stream(api_endpoint.as_str(), anonymized_tls_stream)
                    .await
                    .map_err(|e| PeerError::Connection(e.into()))?;

                Ok(client.into_dyn())
            }
        }
    }
}
891
892fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> PeerError {
893 match jsonrpc_error {
894 JsonRpcClientError::Call(error_object) => {
895 let error = anyhow!(error_object.message().to_owned());
896 match ErrorCode::from(error_object.code()) {
897 ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
898 PeerError::InvalidRequest(error)
899 }
900 ErrorCode::MethodNotFound => PeerError::InvalidRpcId(error),
901 ErrorCode::InvalidParams => PeerError::InvalidRequest(error),
902 ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
903 PeerError::ServerError(error)
904 }
905 }
906 }
907 JsonRpcClientError::Transport(error) => PeerError::Transport(anyhow!(error)),
908 JsonRpcClientError::RestartNeeded(arc) => PeerError::Transport(anyhow!(arc)),
909 JsonRpcClientError::ParseError(error) => PeerError::InvalidResponse(anyhow!(error)),
910 JsonRpcClientError::InvalidSubscriptionId => {
911 PeerError::Transport(anyhow!("Invalid subscription id"))
912 }
913 JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
914 PeerError::InvalidRequest(anyhow!(invalid_request_id))
915 }
916 JsonRpcClientError::RequestTimeout => PeerError::Transport(anyhow!("Request timeout")),
917 JsonRpcClientError::Custom(e) => PeerError::Transport(anyhow!(e)),
918 JsonRpcClientError::HttpNotImplemented => {
919 PeerError::ServerError(anyhow!("Http not implemented"))
920 }
921 JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
922 PeerError::InvalidRequest(anyhow!(empty_batch_request))
923 }
924 JsonRpcClientError::RegisterMethod(register_method_error) => {
925 PeerError::InvalidResponse(anyhow!(register_method_error))
926 }
927 }
928}
929
930#[async_trait]
931impl IClientConnection for WsClient {
932 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
933 let method = match method {
934 ApiMethod::Core(method) => method,
935 ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
936 };
937
938 Ok(ClientT::request(self, &method, [request.to_json()])
939 .await
940 .map_err(jsonrpc_error_to_peer_error)?)
941 }
942
943 async fn await_disconnection(&self) {
944 self.on_disconnect().await;
945 }
946}
947
/// Shared, type-erased [`IClientConnector`].
pub type DynClientConnector = Arc<dyn IClientConnector>;
949
/// Knows a set of peers and how to open a connection to each of them.
#[async_trait]
pub trait IClientConnector: Send + Sync + 'static {
    /// Returns the ids of all peers this connector can connect to.
    fn peers(&self) -> BTreeSet<PeerId>;

    /// Opens a new connection to `peer`.
    async fn connect(&self, peer: PeerId) -> PeerResult<DynClientConnection>;

    /// Wraps `self` in an `Arc` to get a [`DynClientConnector`].
    fn into_dyn(self) -> DynClientConnector
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
965
/// Shared, type-erased [`IClientConnection`].
pub type DynClientConnection = Arc<dyn IClientConnection>;
967
/// An established connection to a single peer's API.
#[async_trait]
pub trait IClientConnection: Debug + Send + Sync + 'static {
    /// Sends `request` for `method` and returns the raw JSON response.
    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value>;

    /// Completes when the connection is lost.
    async fn await_disconnection(&self);

    /// Wraps `self` in an `Arc` to get a [`DynClientConnection`].
    fn into_dyn(self) -> DynClientConnection
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
981
/// [`IRawFederationApi`] implementation that sends requests through
/// [`ReconnectClientConnections`].
#[derive(Clone, Debug)]
pub struct ReconnectFederationApi {
    /// Ids of all peers known to the connector.
    peers: BTreeSet<PeerId>,
    /// Peer id returned by `self_peer()`, used for admin requests.
    admin_id: Option<PeerId>,
    /// When set, requests are sent as module requests for this module
    /// instance; otherwise as core requests.
    module_id: Option<ModuleInstanceId>,
    /// Per-peer connection handles used to send requests.
    connections: ReconnectClientConnections,
}
989
990impl ReconnectFederationApi {
991 fn new(connector: &DynClientConnector, admin_id: Option<PeerId>) -> Self {
992 Self {
993 peers: connector.peers(),
994 admin_id,
995 module_id: None,
996 connections: ReconnectClientConnections::new(connector),
997 }
998 }
999
1000 pub async fn new_admin(
1001 peer: PeerId,
1002 url: SafeUrl,
1003 api_secret: &Option<String>,
1004 ) -> anyhow::Result<Self> {
1005 Self::from_endpoints(once((peer, url)), api_secret, Some(peer)).await
1006 }
1007
1008 pub async fn from_endpoints(
1009 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
1010 api_secret: &Option<String>,
1011 admin_id: Option<PeerId>,
1012 ) -> anyhow::Result<Self> {
1013 let peers = peers.into_iter().collect::<BTreeMap<PeerId, SafeUrl>>();
1014
1015 let scheme = peers
1016 .values()
1017 .next()
1018 .expect("Federation api has been initialized with no peers")
1019 .scheme();
1020
1021 let connector = match scheme {
1022 "ws" | "wss" => WebsocketConnector::new(peers, api_secret.clone())?.into_dyn(),
1023 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1024 "tor" => TorConnector::new(peers, api_secret.clone()).into_dyn(),
1025 "iroh" => iroh::IrohConnector::new(peers).await?.into_dyn(),
1026 scheme => anyhow::bail!("Unsupported connector scheme: {scheme}"),
1027 };
1028
1029 Ok(ReconnectFederationApi::new(&connector, admin_id))
1030 }
1031}
1032
// Marker impl: allows `ReconnectFederationApi` to be used as a module API
// (it is converted into a `DynModuleApi` in `with_module`).
impl IModuleFederationApi for ReconnectFederationApi {}
1034
1035#[apply(async_trait_maybe_send!)]
1036impl IRawFederationApi for ReconnectFederationApi {
1037 fn all_peers(&self) -> &BTreeSet<PeerId> {
1038 &self.peers
1039 }
1040
1041 fn self_peer(&self) -> Option<PeerId> {
1042 self.admin_id
1043 }
1044
1045 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
1046 ReconnectFederationApi {
1047 peers: self.peers.clone(),
1048 admin_id: self.admin_id,
1049 module_id: Some(id),
1050 connections: self.connections.clone(),
1051 }
1052 .into()
1053 }
1054
1055 async fn request_raw(
1056 &self,
1057 peer_id: PeerId,
1058 method: &str,
1059 params: &ApiRequestErased,
1060 ) -> PeerResult<Value> {
1061 let method = match self.module_id {
1062 Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
1063 None => ApiMethod::Core(method.to_string()),
1064 };
1065
1066 self.connections
1067 .request(peer_id, method, params.clone())
1068 .await
1069 }
1070}
1071
/// One `ClientConnection` handle per peer, used to route requests to the
/// right peer.
#[derive(Clone, Debug)]
pub struct ReconnectClientConnections {
    /// Connection handle of each peer.
    connections: BTreeMap<PeerId, ClientConnection>,
}
1076
1077impl ReconnectClientConnections {
1078 pub fn new(connector: &DynClientConnector) -> Self {
1079 ReconnectClientConnections {
1080 connections: connector
1081 .peers()
1082 .into_iter()
1083 .map(|peer| (peer, ClientConnection::new(peer, connector.clone())))
1084 .collect(),
1085 }
1086 }
1087
1088 async fn request(
1089 &self,
1090 peer: PeerId,
1091 method: ApiMethod,
1092 request: ApiRequestErased,
1093 ) -> PeerResult<Value> {
1094 trace!(target: LOG_NET_API, %method, "Api request");
1095 let res = self
1096 .connections
1097 .get(&peer)
1098 .ok_or_else(|| PeerError::InvalidPeerId { peer_id: peer })?
1099 .connection()
1100 .await
1101 .context("Failed to connect to peer")
1102 .map_err(PeerError::Connection)?
1103 .request(method.clone(), request)
1104 .await;
1105
1106 trace!(target: LOG_NET_API, ?method, res_ok = res.is_ok(), "Api response");
1107
1108 res
1109 }
1110}
1111
/// Handle to the background task that maintains the connection to one peer.
#[derive(Clone, Debug)]
struct ClientConnection {
    /// Channel on which callers ask the task for the current connection; the
    /// task answers through the oneshot sender contained in each request.
    sender: async_channel::Sender<oneshot::Sender<DynClientConnection>>,
}
1116
impl ClientConnection {
    /// Spawns the background task that connects to `peer` on demand and
    /// returns a handle for requesting the connection.
    ///
    /// The task connects only once a request arrives. While connected it
    /// answers further requests with clones of the same connection. After a
    /// disconnect it waits for the next request before connecting again.
    /// After a failed attempt it sleeps according to the backoff before
    /// serving the next request.
    fn new(peer: PeerId, connector: DynClientConnector) -> ClientConnection {
        // Queue of pending connection requests (capacity 1024).
        let (sender, receiver) = bounded::<oneshot::Sender<DynClientConnection>>(1024);

        fedimint_core::task::spawn(
            "peer-api-connection",
            async move {
                let mut backoff = api_networking_backoff();

                // Ends when `recv` fails, i.e. the request channel is closed.
                while let Ok(sender) = receiver.recv().await {
                    let mut senders = vec![sender];

                    // Drain requests that are already queued so they all get
                    // the outcome of the same connection attempt.
                    while let Ok(sender) = receiver.try_recv() {
                        senders.push(sender);
                    }

                    match connector.connect(peer).await {
                        Ok(connection) => {
                            trace!(target: LOG_CLIENT_NET_API, "Connected to peer api");

                            // `send` fails if the requester went away; that
                            // is ignored.
                            for sender in senders {
                                sender.send(connection.clone()).ok();
                            }

                            // Serve new requests from the live connection
                            // until it disconnects or the channel closes.
                            loop {
                                tokio::select! {
                                    sender = receiver.recv() => {
                                        match sender.ok() {
                                            Some(sender) => sender.send(connection.clone()).ok(),
                                            None => break,
                                        };
                                    }
                                    () = connection.await_disconnection() => break,
                                }
                            }

                            trace!(target: LOG_CLIENT_NET_API, "Disconnected from peer api");

                            // A successful connection resets the backoff.
                            backoff = api_networking_backoff();
                        }
                        Err(e) => {
                            trace!(target: LOG_CLIENT_NET_API, "Failed to connect to peer api {e}");

                            // `senders` is dropped at the end of this
                            // iteration, so the waiting callers get `None`
                            // from `connection()`.
                            fedimint_core::task::sleep(
                                backoff.next().expect("No limit to the number of retries"),
                            )
                            .await;
                        }
                    }
                }

                trace!(target: LOG_CLIENT_NET_API, "Shutting down peer api connection task");
            }
            .instrument(trace_span!("peer-api-connection", ?peer)),
        );

        ClientConnection { sender }
    }

    /// Asks the background task for the current connection.
    ///
    /// Returns `None` when the task drops the request without answering,
    /// which happens when its connection attempt fails.
    ///
    /// # Panics
    ///
    /// Panics if the request channel to the background task is closed.
    async fn connection(&self) -> Option<DynClientConnection> {
        let (sender, receiver) = oneshot::channel();

        self.sender
            .send(sender)
            .await
            .expect("Api connection request channel closed unexpectedly");

        receiver.await.ok()
    }
}
1189
1190mod iroh {
1191 use std::collections::{BTreeMap, BTreeSet};
1192 use std::str::FromStr;
1193
1194 use anyhow::Context;
1195 use async_trait::async_trait;
1196 use fedimint_core::PeerId;
1197 use fedimint_core::envs::parse_kv_list_from_env;
1198 use fedimint_core::module::{
1199 ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
1200 };
1201 use fedimint_core::util::SafeUrl;
1202 use fedimint_logging::LOG_NET_IROH;
1203 use iroh::endpoint::Connection;
1204 use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
1205 use iroh_base::ticket::NodeTicket;
1206 use serde_json::Value;
1207 use tracing::{debug, trace, warn};
1208
1209 use super::{DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult};
1210
    /// Connector that reaches federation peers over iroh.
    #[derive(Debug, Clone)]
    pub struct IrohConnector {
        /// Iroh node id of every known peer, parsed from the host part of the
        /// peer's URL.
        node_ids: BTreeMap<PeerId, NodeId>,
        /// Local iroh endpoint used to open outgoing connections.
        endpoint: Endpoint,

        /// Explicit addresses to dial instead of the bare node id.
        ///
        /// When an entry exists for a peer's node id, `connect` dials the
        /// stored `NodeAddr` rather than the node id itself.
        pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
    }
1223
1224 impl IrohConnector {
1225 pub async fn new(peers: BTreeMap<PeerId, SafeUrl>) -> anyhow::Result<Self> {
1226 const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
1227 warn!(target: LOG_NET_IROH, "Iroh support is experimental");
1228 let mut s = Self::new_no_overrides(peers).await?;
1229
1230 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
1231 s = s.with_connection_override(k, v.into());
1232 }
1233
1234 Ok(s)
1235 }
1236
1237 pub async fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>) -> anyhow::Result<Self> {
1238 let node_ids = peers
1239 .into_iter()
1240 .map(|(peer, url)| {
1241 let host = url.host_str().context("Url is missing host")?;
1242
1243 let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
1244
1245 Ok((peer, node_id))
1246 })
1247 .collect::<anyhow::Result<BTreeMap<PeerId, NodeId>>>()?;
1248
1249 let builder = Endpoint::builder().discovery_n0();
1250 #[cfg(not(target_family = "wasm"))]
1251 let builder = builder.discovery_dht();
1252 let endpoint = builder.bind().await?;
1253 debug!(
1254 target: LOG_NET_IROH,
1255 node_id = %endpoint.node_id(),
1256 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
1257 "Iroh api client endpoint"
1258 );
1259
1260 Ok(Self {
1261 node_ids,
1262 endpoint,
1263 connection_overrides: BTreeMap::new(),
1264 })
1265 }
1266
1267 pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
1268 self.connection_overrides.insert(node, addr);
1269 self
1270 }
1271 }
1272
1273 #[async_trait]
1274 impl IClientConnector for IrohConnector {
1275 fn peers(&self) -> BTreeSet<PeerId> {
1276 self.node_ids.keys().copied().collect()
1277 }
1278
1279 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
1280 let node_id = *self
1281 .node_ids
1282 .get(&peer_id)
1283 .ok_or(PeerError::InvalidPeerId { peer_id })?;
1284
1285 let connection = match self.connection_overrides.get(&node_id) {
1286 Some(node_addr) => {
1287 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
1288 self.endpoint
1289 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
1290 .await
1291 }
1292 None => self.endpoint.connect(node_id, FEDIMINT_API_ALPN).await,
1293 }.map_err(PeerError::Connection)?;
1294
1295 Ok(connection.into_dyn())
1296 }
1297 }
1298
1299 #[async_trait]
1300 impl IClientConnection for Connection {
1301 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
1302 let json = serde_json::to_vec(&IrohApiRequest { method, request })
1303 .expect("Serialization to vec can't fail");
1304
1305 let (mut sink, mut stream) = self
1306 .open_bi()
1307 .await
1308 .map_err(|e| PeerError::Transport(e.into()))?;
1309
1310 sink.write_all(&json)
1311 .await
1312 .map_err(|e| PeerError::Transport(e.into()))?;
1313
1314 sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
1315
1316 let response = stream
1317 .read_to_end(1_000_000)
1318 .await
1319 .map_err(|e| PeerError::Transport(e.into()))?;
1320
1321 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
1323 .map_err(|e| PeerError::InvalidResponse(e.into()))?;
1324
1325 response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
1326 }
1327
1328 async fn await_disconnection(&self) {
1329 self.closed().await;
1330 }
1331 }
1332}
1333
/// Legacy, serde-serializable summary of a federation's status.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct LegacyFederationStatus {
    /// Session counter. NOTE(review): presumably the number of consensus
    /// sessions so far; confirm against the producer of this value.
    pub session_count: u64,
    /// Status entry for each peer, keyed by peer id.
    pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
    /// Number of peers counted as online.
    pub peers_online: u64,
    /// Number of peers counted as offline.
    pub peers_offline: u64,
    /// Number of peers counted as flagged. NOTE(review): the exact flagging
    /// criteria are not visible here.
    pub peers_flagged: u64,
    /// Scheduled shutdown, if any. NOTE(review): the unit of the value
    /// (session index vs. timestamp) is not visible here; confirm.
    pub scheduled_shutdown: Option<u64>,
}
1346
/// Legacy, serde-serializable status of a single peer.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LegacyPeerStatus {
    /// Last contribution of the peer, if any. NOTE(review): the unit of the
    /// value is not visible here; confirm against the producer.
    pub last_contribution: Option<u64>,
    /// Whether the peer-to-peer connection is currently up.
    pub connection_status: LegacyP2PConnectionStatus,
    /// Whether the peer is flagged. NOTE(review): the flagging criteria are
    /// not visible here.
    pub flagged: bool,
}
1355
/// Legacy peer-to-peer connection state.
///
/// Serialized in `snake_case`; the default value is
/// [`LegacyP2PConnectionStatus::Disconnected`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LegacyP2PConnectionStatus {
    /// No connection to the peer.
    #[default]
    Disconnected,
    /// Connected to the peer.
    Connected,
}
1363
/// Status payload combining the server's own status with an optional
/// federation-wide status.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct StatusResponse {
    /// Status of the server itself.
    pub server: ServerStatusLegacy,
    /// Federation status; `None` when no federation status is included.
    pub federation: Option<LegacyFederationStatus>,
}
1369
/// Serializable container for a guardian config backup.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GuardianConfigBackup {
    /// Raw backup bytes, (de)serialized through `fedimint_core::hex::serde`.
    /// NOTE(review): presumably a tar archive, going by the field name;
    /// confirm against the code that produces it.
    #[serde(with = "fedimint_core::hex::serde")]
    pub tar_archive_bytes: Vec<u8>,
}
1377
#[cfg(test)]
mod tests {
    use std::str::FromStr as _;

    use fedimint_core::config::FederationId;
    use fedimint_core::invite_code::InviteCode;

    use super::*;

    /// An invite code survives a round trip through its string form and
    /// through JSON, and its JSON form is that same string.
    #[test]
    fn converts_invite_code() {
        let invite = InviteCode::new(
            "ws://test1".parse().unwrap(),
            PeerId::from(1),
            FederationId::dummy(),
            Some("api_secret".into()),
        );

        // String round trip.
        let encoded = invite.to_string();
        let decoded = InviteCode::from_str(&encoded).expect("parses");
        assert_eq!(invite, decoded);

        // JSON round trip: the JSON value is a plain string equal to `encoded`.
        let json = serde_json::to_string(&invite).unwrap();
        let json_as_string: String = serde_json::from_str(&json).unwrap();
        assert_eq!(json_as_string, encoded);
        let decoded_from_json: InviteCode = serde_json::from_str(&json).unwrap();
        assert_eq!(decoded_from_json, decoded);
    }

    /// An invite code built with the essential number of guardians keeps the
    /// federation id and only the first `max_evil + 1` peers.
    #[test]
    fn creates_essential_guardians_invite_code() {
        let peer_to_url_map: BTreeMap<PeerId, SafeUrl> = BTreeMap::from([
            (PeerId::from(0), "ws://test1".parse().expect("URL fail")),
            (PeerId::from(1), "ws://test2".parse().expect("URL fail")),
            (PeerId::from(2), "ws://test3".parse().expect("URL fail")),
            (PeerId::from(3), "ws://test4".parse().expect("URL fail")),
        ]);
        let max_size = peer_to_url_map.to_num_peers().max_evil() + 1;

        let code =
            InviteCode::new_with_essential_num_guardians(&peer_to_url_map, FederationId::dummy());

        assert_eq!(FederationId::dummy(), code.federation_id());

        let expected_peers: BTreeMap<PeerId, SafeUrl> =
            peer_to_url_map.into_iter().take(max_size).collect();
        assert_eq!(expected_peers, code.peers());
    }
}