1mod error;
2pub mod global_api;
3mod iroh;
4pub mod net;
5
6use core::panic;
7use std::collections::{BTreeMap, BTreeSet, HashMap};
8use std::fmt::Debug;
9use std::iter::once;
10use std::pin::Pin;
11use std::result;
12use std::sync::Arc;
13
14use anyhow::{Context, anyhow};
15#[cfg(all(feature = "tor", not(target_family = "wasm")))]
16use arti_client::{TorAddr, TorClient, TorClientConfig};
17use async_channel::bounded;
18use async_trait::async_trait;
19use base64::Engine as _;
20use bitcoin::hashes::sha256;
21use bitcoin::secp256k1;
22pub use error::{FederationError, OutputOutcomeError, PeerError};
23use fedimint_core::admin_client::{
24 GuardianConfigBackup, PeerServerParamsLegacy, ServerStatusLegacy, SetupStatus,
25};
26use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
27use fedimint_core::core::backup::SignedBackupRequest;
28use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
29use fedimint_core::encoding::{Decodable, Encodable};
30use fedimint_core::envs::{
31 FM_IROH_DNS_ENV, FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env,
32};
33use fedimint_core::invite_code::InviteCode;
34use fedimint_core::module::audit::AuditSummary;
35use fedimint_core::module::registry::ModuleDecoderRegistry;
36use fedimint_core::module::{
37 ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
38};
39use fedimint_core::net::api_announcement::SignedApiAnnouncement;
40#[cfg(not(target_family = "wasm"))]
41use fedimint_core::rustls::install_crypto_provider;
42use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
43use fedimint_core::task::{MaybeSend, MaybeSync};
44use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
45use fedimint_core::util::backoff_util::api_networking_backoff;
46use fedimint_core::util::{FmtCompact as _, SafeUrl};
47use fedimint_core::{
48 NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
49};
50use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET_API, LOG_NET_WS};
51use futures::channel::oneshot;
52use futures::future::pending;
53use futures::stream::FuturesUnordered;
54use futures::{Future, StreamExt};
55use global_api::with_cache::GlobalFederationApiWithCache;
56use jsonrpsee_core::DeserializeOwned;
57use jsonrpsee_core::client::ClientT;
58pub use jsonrpsee_core::client::Error as JsonRpcClientError;
59use jsonrpsee_types::ErrorCode;
60#[cfg(target_family = "wasm")]
61use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
62#[cfg(not(target_family = "wasm"))]
63use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
64#[cfg(not(target_family = "wasm"))]
65use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
66use serde::{Deserialize, Serialize};
67use serde_json::Value;
68#[cfg(not(target_family = "wasm"))]
69use tokio_rustls::rustls::RootCertStore;
70#[cfg(all(feature = "tor", not(target_family = "wasm")))]
71use tokio_rustls::{TlsConnector, rustls::ClientConfig as TlsClientConfig};
72use tracing::{Instrument, debug, instrument, trace, trace_span, warn};
73
74use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
75
76pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
77
78pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
79 ApiVersion { major: 0, minor: 1 };
80
81pub type PeerResult<T> = Result<T, PeerError>;
82pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
83pub type FederationResult<T> = Result<T, FederationError>;
84pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
85
86pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
87
88#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
92pub struct ApiVersionSet {
93 pub core: ApiVersion,
94 pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
95}
96
97#[apply(async_trait_maybe_send!)]
99pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
100 fn all_peers(&self) -> &BTreeSet<PeerId>;
108
109 fn self_peer(&self) -> Option<PeerId>;
114
115 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;
116
117 async fn request_raw(
119 &self,
120 peer_id: PeerId,
121 method: &str,
122 params: &ApiRequestErased,
123 ) -> PeerResult<Value>;
124}
125
126#[apply(async_trait_maybe_send!)]
129pub trait FederationApiExt: IRawFederationApi {
130 async fn request_single_peer<Ret>(
131 &self,
132 method: String,
133 params: ApiRequestErased,
134 peer: PeerId,
135 ) -> PeerResult<Ret>
136 where
137 Ret: DeserializeOwned,
138 {
139 self.request_raw(peer, &method, ¶ms)
140 .await
141 .and_then(|v| {
142 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
143 })
144 }
145
146 async fn request_single_peer_federation<FedRet>(
147 &self,
148 method: String,
149 params: ApiRequestErased,
150 peer_id: PeerId,
151 ) -> FederationResult<FedRet>
152 where
153 FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
154 {
155 self.request_raw(peer_id, &method, ¶ms)
156 .await
157 .and_then(|v| {
158 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
159 })
160 .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
161 }
162
163 #[instrument(target = LOG_NET_API, skip_all, fields(method=method))]
166 async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
167 &self,
168 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
169 method: String,
170 params: ApiRequestErased,
171 ) -> FederationResult<FR> {
172 #[cfg(not(target_family = "wasm"))]
176 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
177 #[cfg(target_family = "wasm")]
178 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
179
180 for peer in self.all_peers() {
181 futures.push(Box::pin({
182 let method = &method;
183 let params = ¶ms;
184 async move {
185 let result = self
186 .request_single_peer(method.clone(), params.clone(), *peer)
187 .await;
188
189 (*peer, result)
190 }
191 }));
192 }
193
194 let mut peer_errors = BTreeMap::new();
195 let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
196
197 loop {
198 let (peer, result) = futures
199 .next()
200 .await
201 .expect("Query strategy ran out of peers to query without returning a result");
202
203 match result {
204 Ok(response) => match strategy.process(peer, response) {
205 QueryStep::Retry(peers) => {
206 for peer in peers {
207 futures.push(Box::pin({
208 let method = &method;
209 let params = ¶ms;
210 async move {
211 let result = self
212 .request_single_peer(method.clone(), params.clone(), peer)
213 .await;
214
215 (peer, result)
216 }
217 }));
218 }
219 }
220 QueryStep::Success(response) => return Ok(response),
221 QueryStep::Failure(e) => {
222 peer_errors.insert(peer, e);
223 }
224 QueryStep::Continue => {}
225 },
226 Err(e) => {
227 e.report_if_unusual(peer, "RequestWithStrategy");
228 peer_errors.insert(peer, e);
229 }
230 }
231
232 if peer_errors.len() == peer_error_threshold {
233 return Err(FederationError::peer_errors(
234 method.clone(),
235 params.params.clone(),
236 peer_errors,
237 ));
238 }
239 }
240 }
241
242 #[instrument(target = LOG_CLIENT_NET_API, level = "debug", skip(self, strategy))]
243 async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
244 &self,
245 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
246 method: String,
247 params: ApiRequestErased,
248 ) -> FR {
249 #[cfg(not(target_family = "wasm"))]
253 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
254 #[cfg(target_family = "wasm")]
255 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
256
257 for peer in self.all_peers() {
258 futures.push(Box::pin({
259 let method = &method;
260 let params = ¶ms;
261 async move {
262 let response = util::retry(
263 format!("api-request-{method}-{peer}"),
264 api_networking_backoff(),
265 || async {
266 self.request_single_peer(method.clone(), params.clone(), *peer)
267 .await
268 .inspect_err(|e| {
269 e.report_if_unusual(*peer, "QueryWithStrategyRetry");
270 })
271 .map_err(|e| anyhow!(e.to_string()))
272 },
273 )
274 .await
275 .expect("Number of retries has no limit");
276
277 (*peer, response)
278 }
279 }));
280 }
281
282 loop {
283 let (peer, response) = match futures.next().await {
284 Some(t) => t,
285 None => pending().await,
286 };
287
288 match strategy.process(peer, response) {
289 QueryStep::Retry(peers) => {
290 for peer in peers {
291 futures.push(Box::pin({
292 let method = &method;
293 let params = ¶ms;
294 async move {
295 let response = util::retry(
296 format!("api-request-{method}-{peer}"),
297 api_networking_backoff(),
298 || async {
299 self.request_single_peer(
300 method.clone(),
301 params.clone(),
302 peer,
303 )
304 .await
305 .inspect_err(|err| {
306 if err.is_unusual() {
307 debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
308 }
309 })
310 .map_err(|e| anyhow!(e.to_string()))
311 },
312 )
313 .await
314 .expect("Number of retries has no limit");
315
316 (peer, response)
317 }
318 }));
319 }
320 }
321 QueryStep::Success(response) => return response,
322 QueryStep::Failure(e) => {
323 warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
324 }
325 QueryStep::Continue => {}
326 }
327 }
328 }
329
330 async fn request_current_consensus<Ret>(
331 &self,
332 method: String,
333 params: ApiRequestErased,
334 ) -> FederationResult<Ret>
335 where
336 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
337 {
338 self.request_with_strategy(
339 ThresholdConsensus::new(self.all_peers().to_num_peers()),
340 method,
341 params,
342 )
343 .await
344 }
345
346 async fn request_current_consensus_retry<Ret>(
347 &self,
348 method: String,
349 params: ApiRequestErased,
350 ) -> Ret
351 where
352 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
353 {
354 self.request_with_strategy_retry(
355 ThresholdConsensus::new(self.all_peers().to_num_peers()),
356 method,
357 params,
358 )
359 .await
360 }
361
362 async fn request_admin<Ret>(
363 &self,
364 method: &str,
365 params: ApiRequestErased,
366 auth: ApiAuth,
367 ) -> FederationResult<Ret>
368 where
369 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
370 {
371 let Some(self_peer_id) = self.self_peer() else {
372 return Err(FederationError::general(
373 method,
374 params,
375 anyhow::format_err!("Admin peer_id not set"),
376 ));
377 };
378
379 self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
380 .await
381 }
382
383 async fn request_admin_no_auth<Ret>(
384 &self,
385 method: &str,
386 params: ApiRequestErased,
387 ) -> FederationResult<Ret>
388 where
389 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
390 {
391 let Some(self_peer_id) = self.self_peer() else {
392 return Err(FederationError::general(
393 method,
394 params,
395 anyhow::format_err!("Admin peer_id not set"),
396 ));
397 };
398
399 self.request_single_peer_federation(method.into(), params, self_peer_id)
400 .await
401 }
402}
403
404#[apply(async_trait_maybe_send!)]
405impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
406
407pub trait IModuleFederationApi: IRawFederationApi {}
409
410dyn_newtype_define! {
411 #[derive(Clone)]
412 pub DynModuleApi(Arc<IModuleFederationApi>)
413}
414
415dyn_newtype_define! {
416 #[derive(Clone)]
417 pub DynGlobalApi(Arc<IGlobalFederationApi>)
418}
419
420impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
421 fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
422 self.inner.as_ref()
423 }
424}
425
426impl DynGlobalApi {
427 pub async fn new_admin(
428 peer: PeerId,
429 url: SafeUrl,
430 api_secret: &Option<String>,
431 ) -> anyhow::Result<DynGlobalApi> {
432 Ok(GlobalFederationApiWithCache::new(
433 ReconnectFederationApi::from_endpoints(once((peer, url)), api_secret, Some(peer))
434 .await?,
435 )
436 .into())
437 }
438
439 pub async fn from_setup_endpoint(
443 url: SafeUrl,
444 api_secret: &Option<String>,
445 ) -> anyhow::Result<Self> {
446 Self::new_admin(PeerId::from(1024), url, api_secret).await
450 }
451
452 pub async fn from_endpoints(
453 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
454 api_secret: &Option<String>,
455 ) -> anyhow::Result<Self> {
456 Ok(GlobalFederationApiWithCache::new(
457 ReconnectFederationApi::from_endpoints(peers, api_secret, None).await?,
458 )
459 .into())
460 }
461}
462
463#[apply(async_trait_maybe_send!)]
465pub trait IGlobalFederationApi: IRawFederationApi {
466 async fn submit_transaction(
467 &self,
468 tx: Transaction,
469 ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;
470
471 async fn await_block(
472 &self,
473 block_index: u64,
474 decoders: &ModuleDecoderRegistry,
475 ) -> anyhow::Result<SessionOutcome>;
476
477 async fn get_session_status(
478 &self,
479 block_index: u64,
480 decoders: &ModuleDecoderRegistry,
481 core_api_version: ApiVersion,
482 broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
483 ) -> anyhow::Result<SessionStatus>;
484
485 async fn session_count(&self) -> FederationResult<u64>;
486
487 async fn await_transaction(&self, txid: TransactionId) -> TransactionId;
488
489 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;
490
491 async fn download_backup(
492 &self,
493 id: &secp256k1::PublicKey,
494 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;
495
496 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;
500
501 async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;
502
503 async fn set_local_params(
504 &self,
505 name: String,
506 federation_name: Option<String>,
507 disable_base_fees: Option<bool>,
508 auth: ApiAuth,
509 ) -> FederationResult<String>;
510
511 async fn add_peer_connection_info(
512 &self,
513 info: String,
514 auth: ApiAuth,
515 ) -> FederationResult<String>;
516
517 async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;
519
520 async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;
522
523 async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()>;
531
532 async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>>;
537
538 async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;
542
543 async fn get_verify_config_hash(
546 &self,
547 auth: ApiAuth,
548 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
549
550 async fn verified_configs(
553 &self,
554 auth: ApiAuth,
555 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
556
557 async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;
563
564 async fn status(&self) -> FederationResult<StatusResponse>;
566
567 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;
569
570 async fn guardian_config_backup(&self, auth: ApiAuth)
572 -> FederationResult<GuardianConfigBackup>;
573
574 async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;
576
577 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;
578
579 async fn submit_api_announcement(
581 &self,
582 peer_id: PeerId,
583 announcement: SignedApiAnnouncement,
584 ) -> FederationResult<()>;
585
586 async fn api_announcements(
587 &self,
588 guardian: PeerId,
589 ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;
590
591 async fn sign_api_announcement(
592 &self,
593 api_url: SafeUrl,
594 auth: ApiAuth,
595 ) -> FederationResult<SignedApiAnnouncement>;
596
597 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;
598
599 async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;
601
602 async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;
604
605 async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode>;
608}
609
610pub fn deserialize_outcome<R>(
611 outcome: &SerdeOutputOutcome,
612 module_decoder: &Decoder,
613) -> OutputOutcomeResult<R>
614where
615 R: OutputOutcome + MaybeSend,
616{
617 let dyn_outcome = outcome
618 .try_into_inner_known_module_kind(module_decoder)
619 .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
620
621 let source_instance = dyn_outcome.module_instance_id();
622
623 dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
624 let target_type = std::any::type_name::<R>();
625 OutputOutcomeError::ResponseDeserialization(anyhow!(
626 "Could not downcast output outcome with instance id {source_instance} to {target_type}"
627 ))
628 })
629}
630
631#[derive(Debug, Clone)]
632pub struct WebsocketConnector {
633 peers: BTreeMap<PeerId, SafeUrl>,
634 api_secret: Option<String>,
635
636 pub connection_overrides: BTreeMap<PeerId, SafeUrl>,
642}
643
644impl WebsocketConnector {
645 fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> anyhow::Result<Self> {
646 let mut s = Self::new_no_overrides(peers, api_secret);
647
648 for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
649 s = s.with_connection_override(k, v);
650 }
651
652 Ok(s)
653 }
654 pub fn with_connection_override(mut self, peer_id: PeerId, url: SafeUrl) -> Self {
655 self.connection_overrides.insert(peer_id, url);
656 self
657 }
658 pub fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
659 Self {
660 peers,
661 api_secret,
662 connection_overrides: BTreeMap::default(),
663 }
664 }
665}
666
667#[async_trait]
668impl IClientConnector for WebsocketConnector {
669 fn peers(&self) -> BTreeSet<PeerId> {
670 self.peers.keys().copied().collect()
671 }
672
673 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
674 let api_endpoint = match self.connection_overrides.get(&peer_id) {
675 Some(url) => {
676 trace!(target: LOG_NET_WS, %peer_id, "Using a connectivity override for connection");
677 url
678 }
679 None => self.peers.get(&peer_id).ok_or_else(|| {
680 PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}"))
681 })?,
682 };
683
684 #[cfg(not(target_family = "wasm"))]
685 let mut client = {
686 install_crypto_provider().await;
687 let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
688 let mut root_certs = RootCertStore::empty();
689 root_certs.extend(webpki_roots);
690
691 let tls_cfg = CustomCertStore::builder()
692 .with_root_certificates(root_certs)
693 .with_no_client_auth();
694
695 WsClientBuilder::default()
696 .max_concurrent_requests(u16::MAX as usize)
697 .with_custom_cert_store(tls_cfg)
698 };
699
700 #[cfg(target_family = "wasm")]
701 let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
702
703 if let Some(api_secret) = &self.api_secret {
704 #[cfg(not(target_family = "wasm"))]
705 {
706 let mut headers = HeaderMap::new();
709
710 let auth = base64::engine::general_purpose::STANDARD
711 .encode(format!("fedimint:{api_secret}"));
712
713 headers.insert(
714 "Authorization",
715 HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
716 );
717
718 client = client.set_headers(headers);
719 }
720 #[cfg(target_family = "wasm")]
721 {
722 let mut url = api_endpoint.clone();
725 url.set_username("fedimint")
726 .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid username")))?;
727 url.set_password(Some(&api_secret))
728 .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid secret")))?;
729
730 let client = client
731 .build(url.as_str())
732 .await
733 .map_err(|err| PeerError::InternalClientError(err.into()))?;
734
735 return Ok(client.into_dyn());
736 }
737 }
738
739 let client = client
740 .build(api_endpoint.as_str())
741 .await
742 .map_err(|err| PeerError::InternalClientError(err.into()))?;
743
744 Ok(client.into_dyn())
745 }
746}
747
/// Connector that reaches federation peers through a Tor client.
///
/// Only compiled with the `tor` feature on non-wasm targets.
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[derive(Debug, Clone)]
pub struct TorConnector {
    /// Endpoint of each known peer.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional API secret sent as basic-auth credentials when connecting.
    api_secret: Option<String>,
}
754
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
impl TorConnector {
    /// Creates a connector for the given peer endpoints and optional API
    /// secret. No connection is opened here.
    pub fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
        TorConnector { peers, api_secret }
    }
}
761
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[async_trait]
impl IClientConnector for TorConnector {
    /// Ids of all peers with a configured endpoint.
    fn peers(&self) -> BTreeSet<PeerId> {
        self.peers.keys().copied().collect::<BTreeSet<_>>()
    }

    /// Bootstraps a fresh isolated Tor client, opens an anonymized stream to
    /// the endpoint of `peer_id` (wrapped in TLS for `wss`), and builds a
    /// websocket JSON-RPC client on top of it.
    ///
    /// # Errors
    ///
    /// * [`PeerError::InternalClientError`] for an unknown peer or a failed
    ///   Tor bootstrap.
    /// * [`PeerError::InvalidEndpoint`] for a missing host/port, an invalid
    ///   address, an invalid TLS server name, or a scheme other than
    ///   `ws`/`wss`.
    /// * [`PeerError::Connection`] if the Tor stream, the TLS handshake or
    ///   the websocket handshake fails.
    #[allow(clippy::too_many_lines)]
    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
        let Some(api_endpoint) = self.peers.get(&peer_id) else {
            return Err(PeerError::InternalClientError(anyhow!(
                "Invalid peer_id: {peer_id}"
            )));
        };

        let tor_client = TorClient::create_bootstrapped(TorClientConfig::default())
            .await
            .map_err(|err| PeerError::InternalClientError(err.into()))?
            .isolated_client();

        debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");

        let host = api_endpoint
            .host_str()
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected host str")))?;
        let port = api_endpoint
            .port_or_known_default()
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected port number")))?;
        let addr = (host, port);

        let tor_addr = match TorAddr::from(addr) {
            Ok(tor_addr) => tor_addr,
            Err(e) => {
                return Err(PeerError::InvalidEndpoint(anyhow!(
                    "Invalid endpoint addr: {addr:?}: {e:#}"
                )));
            }
        };

        // `tor_addr` is moved into the connect call below; keep a copy for
        // the log line emitted afterwards.
        let tor_addr_clone = tor_addr.clone();

        debug!(
            ?tor_addr,
            ?addr,
            "Successfully created `TorAddr` for given address (i.e. host and port)"
        );

        let anonymized_stream = if api_endpoint.is_onion_address() {
            // Onion services have to be enabled explicitly in the stream
            // preferences.
            let mut stream_prefs = arti_client::StreamPrefs::default();
            stream_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));

            let stream = tor_client
                .connect_with_prefs(tor_addr, &stream_prefs)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
            );
            stream
        } else {
            let stream = tor_client
                .connect(tor_addr)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`"
            );
            stream
        };

        // `wss` gets a TLS connector backed by the webpki roots, `ws` is
        // plain, anything else is rejected.
        let tls_connector = match api_endpoint.scheme() {
            "wss" => {
                let mut root_certs = RootCertStore::empty();
                root_certs.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());

                let tls_config = TlsClientConfig::builder()
                    .with_root_certificates(root_certs)
                    .with_no_client_auth();

                Some(TlsConnector::from(Arc::new(tls_config)))
            }
            "ws" => None,
            unexpected_scheme => {
                return Err(PeerError::InvalidEndpoint(anyhow!(
                    "Unsupported scheme: {unexpected_scheme}"
                )));
            }
        };

        let mut ws_client_builder =
            WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);

        if let Some(api_secret) = &self.api_secret {
            let auth =
                base64::engine::general_purpose::STANDARD.encode(format!("fedimint:{api_secret}"));
            let header_value =
                HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail");

            let mut headers = HeaderMap::new();
            headers.insert("Authorization", header_value);

            ws_client_builder = ws_client_builder.set_headers(headers);
        }

        if let Some(tls_connector) = tls_connector {
            let host = api_endpoint
                .host_str()
                .map(ToOwned::to_owned)
                .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Invalid host str")))?;

            let server_name = rustls_pki_types::ServerName::try_from(host)
                .map_err(|e| PeerError::InvalidEndpoint(e.into()))?;

            let anonymized_tls_stream = tls_connector
                .connect(server_name, anonymized_stream)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            let client = ws_client_builder
                .build_with_stream(api_endpoint.as_str(), anonymized_tls_stream)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            Ok(client.into_dyn())
        } else {
            let client = ws_client_builder
                .build_with_stream(api_endpoint.as_str(), anonymized_stream)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            Ok(client.into_dyn())
        }
    }
}
913
914fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> PeerError {
915 match jsonrpc_error {
916 JsonRpcClientError::Call(error_object) => {
917 let error = anyhow!(error_object.message().to_owned());
918 match ErrorCode::from(error_object.code()) {
919 ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
920 PeerError::InvalidRequest(error)
921 }
922 ErrorCode::MethodNotFound => PeerError::InvalidRpcId(error),
923 ErrorCode::InvalidParams => PeerError::InvalidRequest(error),
924 ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
925 PeerError::ServerError(error)
926 }
927 }
928 }
929 JsonRpcClientError::Transport(error) => PeerError::Transport(anyhow!(error)),
930 JsonRpcClientError::RestartNeeded(arc) => PeerError::Transport(anyhow!(arc)),
931 JsonRpcClientError::ParseError(error) => PeerError::InvalidResponse(anyhow!(error)),
932 JsonRpcClientError::InvalidSubscriptionId => {
933 PeerError::Transport(anyhow!("Invalid subscription id"))
934 }
935 JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
936 PeerError::InvalidRequest(anyhow!(invalid_request_id))
937 }
938 JsonRpcClientError::RequestTimeout => PeerError::Transport(anyhow!("Request timeout")),
939 JsonRpcClientError::Custom(e) => PeerError::Transport(anyhow!(e)),
940 JsonRpcClientError::HttpNotImplemented => {
941 PeerError::ServerError(anyhow!("Http not implemented"))
942 }
943 JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
944 PeerError::InvalidRequest(anyhow!(empty_batch_request))
945 }
946 JsonRpcClientError::RegisterMethod(register_method_error) => {
947 PeerError::InvalidResponse(anyhow!(register_method_error))
948 }
949 }
950}
951
952#[async_trait]
953impl IClientConnection for WsClient {
954 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
955 let method = match method {
956 ApiMethod::Core(method) => method,
957 ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
958 };
959
960 Ok(ClientT::request(self, &method, [request.to_json()])
961 .await
962 .map_err(jsonrpc_error_to_peer_error)?)
963 }
964
965 async fn await_disconnection(&self) {
966 self.on_disconnect().await;
967 }
968}
969
970pub type DynClientConnector = Arc<dyn IClientConnector>;
971
972#[async_trait]
975pub trait IClientConnector: Send + Sync + 'static {
976 fn peers(&self) -> BTreeSet<PeerId>;
977
978 async fn connect(&self, peer: PeerId) -> PeerResult<DynClientConnection>;
979
980 fn into_dyn(self) -> DynClientConnector
981 where
982 Self: Sized,
983 {
984 Arc::new(self)
985 }
986}
987
988pub type DynClientConnection = Arc<dyn IClientConnection>;
989
990#[async_trait]
991pub trait IClientConnection: Debug + Send + Sync + 'static {
992 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value>;
993
994 async fn await_disconnection(&self);
995
996 fn into_dyn(self) -> DynClientConnection
997 where
998 Self: Sized,
999 {
1000 Arc::new(self)
1001 }
1002}
1003
1004#[derive(Clone, Debug)]
1005pub struct ReconnectFederationApi {
1006 peers: BTreeSet<PeerId>,
1007 admin_id: Option<PeerId>,
1008 module_id: Option<ModuleInstanceId>,
1009 connections: ReconnectClientConnections,
1010}
1011
1012impl ReconnectFederationApi {
1013 fn new(connector: &DynClientConnector, admin_id: Option<PeerId>) -> Self {
1014 Self {
1015 peers: connector.peers(),
1016 admin_id,
1017 module_id: None,
1018 connections: ReconnectClientConnections::new(connector),
1019 }
1020 }
1021
1022 pub async fn new_admin(
1023 peer: PeerId,
1024 url: SafeUrl,
1025 api_secret: &Option<String>,
1026 ) -> anyhow::Result<Self> {
1027 Self::from_endpoints(once((peer, url)), api_secret, Some(peer)).await
1028 }
1029
1030 pub async fn from_endpoints(
1031 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
1032 api_secret: &Option<String>,
1033 admin_id: Option<PeerId>,
1034 ) -> anyhow::Result<Self> {
1035 let peers = peers.into_iter().collect::<BTreeMap<PeerId, SafeUrl>>();
1036
1037 let scheme = peers
1038 .values()
1039 .next()
1040 .expect("Federation api has been initialized with no peers")
1041 .scheme();
1042
1043 let connector = match scheme {
1044 "ws" | "wss" => WebsocketConnector::new(peers, api_secret.clone())?.into_dyn(),
1045 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1046 "tor" => TorConnector::new(peers, api_secret.clone()).into_dyn(),
1047 "iroh" => {
1048 let iroh_dns = std::env::var(FM_IROH_DNS_ENV)
1049 .ok()
1050 .and_then(|dns| dns.parse().ok());
1051 iroh::IrohConnector::new(peers, iroh_dns).await?.into_dyn()
1052 }
1053 scheme => anyhow::bail!("Unsupported connector scheme: {scheme}"),
1054 };
1055
1056 Ok(ReconnectFederationApi::new(&connector, admin_id))
1057 }
1058}
1059
1060impl IModuleFederationApi for ReconnectFederationApi {}
1061
1062#[apply(async_trait_maybe_send!)]
1063impl IRawFederationApi for ReconnectFederationApi {
1064 fn all_peers(&self) -> &BTreeSet<PeerId> {
1065 &self.peers
1066 }
1067
1068 fn self_peer(&self) -> Option<PeerId> {
1069 self.admin_id
1070 }
1071
1072 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
1073 ReconnectFederationApi {
1074 peers: self.peers.clone(),
1075 admin_id: self.admin_id,
1076 module_id: Some(id),
1077 connections: self.connections.clone(),
1078 }
1079 .into()
1080 }
1081
1082 #[instrument(
1083 target = LOG_NET_API,
1084 skip_all,
1085 fields(
1086 peer_id = %peer_id,
1087 method = %method,
1088 params = %params.params,
1089 )
1090 )]
1091 async fn request_raw(
1092 &self,
1093 peer_id: PeerId,
1094 method: &str,
1095 params: &ApiRequestErased,
1096 ) -> PeerResult<Value> {
1097 let method = match self.module_id {
1098 Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
1099 None => ApiMethod::Core(method.to_string()),
1100 };
1101
1102 self.connections
1103 .request(peer_id, method, params.clone())
1104 .await
1105 }
1106}
1107
1108#[derive(Clone, Debug)]
1109pub struct ReconnectClientConnections {
1110 connections: BTreeMap<PeerId, ClientConnection>,
1111}
1112
1113impl ReconnectClientConnections {
1114 pub fn new(connector: &DynClientConnector) -> Self {
1115 ReconnectClientConnections {
1116 connections: connector
1117 .peers()
1118 .into_iter()
1119 .map(|peer| (peer, ClientConnection::new(peer, connector.clone())))
1120 .collect(),
1121 }
1122 }
1123
1124 async fn request(
1125 &self,
1126 peer: PeerId,
1127 method: ApiMethod,
1128 request: ApiRequestErased,
1129 ) -> PeerResult<Value> {
1130 trace!(target: LOG_NET_API, %method, "Api request");
1131 let res = self
1132 .connections
1133 .get(&peer)
1134 .ok_or_else(|| PeerError::InvalidPeerId { peer_id: peer })?
1135 .connection()
1136 .await
1137 .context("Failed to connect to peer")
1138 .map_err(PeerError::Connection)?
1139 .request(method.clone(), request)
1140 .await;
1141
1142 trace!(target: LOG_NET_API, ?method, res_ok = res.is_ok(), "Api response");
1143
1144 res
1145 }
1146}
1147
1148#[derive(Clone, Debug)]
1149struct ClientConnection {
1150 sender: async_channel::Sender<oneshot::Sender<DynClientConnection>>,
1151}
1152
1153impl ClientConnection {
1154 fn new(peer: PeerId, connector: DynClientConnector) -> ClientConnection {
1155 let (sender, receiver) = bounded::<oneshot::Sender<DynClientConnection>>(1024);
1156
1157 fedimint_core::task::spawn(
1158 "peer-api-connection",
1159 async move {
1160 let mut backoff = api_networking_backoff();
1161
1162 while let Ok(sender) = receiver.recv().await {
1163 let mut senders = vec![sender];
1164
1165 while let Ok(sender) = receiver.try_recv() {
1168 senders.push(sender);
1169 }
1170
1171 match connector.connect(peer).await {
1172 Ok(connection) => {
1173 trace!(target: LOG_CLIENT_NET_API, "Connected to peer api");
1174
1175 for sender in senders {
1176 sender.send(connection.clone()).ok();
1177 }
1178
1179 loop {
1180 tokio::select! {
1181 sender = receiver.recv() => {
1182 match sender.ok() {
1183 Some(sender) => sender.send(connection.clone()).ok(),
1184 None => break,
1185 };
1186 }
1187 () = connection.await_disconnection() => break,
1188 }
1189 }
1190
1191 trace!(target: LOG_CLIENT_NET_API, "Disconnected from peer api");
1192
1193 backoff = api_networking_backoff();
1194 }
1195 Err(e) => {
1196 trace!(target: LOG_CLIENT_NET_API, "Failed to connect to peer api {e}");
1197
1198 fedimint_core::task::sleep(
1199 backoff.next().expect("No limit to the number of retries"),
1200 )
1201 .await;
1202 }
1203 }
1204 }
1205
1206 trace!(target: LOG_CLIENT_NET_API, "Shutting down peer api connection task");
1207 }
1208 .instrument(trace_span!("peer-api-connection", ?peer)),
1209 );
1210
1211 ClientConnection { sender }
1212 }
1213
1214 async fn connection(&self) -> Option<DynClientConnection> {
1215 let (sender, receiver) = oneshot::channel();
1216
1217 self.sender
1218 .send(sender)
1219 .await
1220 .inspect_err(|err| {
1221 warn!(
1222 target: LOG_CLIENT_NET_API,
1223 err = %err.fmt_compact(),
1224 "Api connection request channel closed unexpectedly"
1225 );
1226 })
1227 .ok()?;
1228
1229 receiver.await.ok()
1230 }
1231}
1232
1233#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1235pub struct LegacyFederationStatus {
1236 pub session_count: u64,
1237 pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
1238 pub peers_online: u64,
1239 pub peers_offline: u64,
1240 pub peers_flagged: u64,
1243 pub scheduled_shutdown: Option<u64>,
1244}
1245
1246#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1247pub struct LegacyPeerStatus {
1248 pub last_contribution: Option<u64>,
1249 pub connection_status: LegacyP2PConnectionStatus,
1250 pub flagged: bool,
1253}
1254
1255#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1256#[serde(rename_all = "snake_case")]
1257pub enum LegacyP2PConnectionStatus {
1258 #[default]
1259 Disconnected,
1260 Connected,
1261}
1262
1263#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
1264pub struct StatusResponse {
1265 pub server: ServerStatusLegacy,
1266 pub federation: Option<LegacyFederationStatus>,
1267}
1268
1269#[cfg(test)]
1270mod tests;