1mod error;
2pub mod global_api;
3mod iroh;
4pub mod net;
5
6use core::panic;
7use std::collections::{BTreeMap, BTreeSet, HashMap};
8use std::fmt::Debug;
9use std::iter::once;
10use std::pin::Pin;
11use std::result;
12use std::sync::Arc;
13
14use anyhow::{Context, anyhow};
15#[cfg(all(feature = "tor", not(target_family = "wasm")))]
16use arti_client::{TorAddr, TorClient, TorClientConfig};
17use async_channel::bounded;
18use async_trait::async_trait;
19use base64::Engine as _;
20use bitcoin::hashes::sha256;
21use bitcoin::secp256k1;
22pub use error::{FederationError, OutputOutcomeError, PeerError};
23use fedimint_core::admin_client::{PeerServerParamsLegacy, ServerStatusLegacy, SetupStatus};
24use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
25use fedimint_core::core::backup::SignedBackupRequest;
26use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
27use fedimint_core::encoding::{Decodable, Encodable};
28use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
29use fedimint_core::invite_code::InviteCode;
30use fedimint_core::module::audit::AuditSummary;
31use fedimint_core::module::registry::ModuleDecoderRegistry;
32use fedimint_core::module::{
33 ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
34};
35use fedimint_core::net::api_announcement::SignedApiAnnouncement;
36use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
37use fedimint_core::task::{MaybeSend, MaybeSync};
38use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
39use fedimint_core::util::backoff_util::api_networking_backoff;
40use fedimint_core::util::{FmtCompact as _, SafeUrl};
41use fedimint_core::{
42 NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
43};
44use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET_API, LOG_NET_WS};
45use futures::channel::oneshot;
46use futures::future::pending;
47use futures::stream::FuturesUnordered;
48use futures::{Future, StreamExt};
49use global_api::with_cache::GlobalFederationApiWithCache;
50use jsonrpsee_core::DeserializeOwned;
51use jsonrpsee_core::client::ClientT;
52pub use jsonrpsee_core::client::Error as JsonRpcClientError;
53use jsonrpsee_types::ErrorCode;
54#[cfg(target_family = "wasm")]
55use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
56#[cfg(not(target_family = "wasm"))]
57use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
58#[cfg(not(target_family = "wasm"))]
59use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
60use serde::{Deserialize, Serialize};
61use serde_json::Value;
62use tokio::sync::OnceCell;
63#[cfg(not(target_family = "wasm"))]
64use tokio_rustls::rustls::RootCertStore;
65#[cfg(all(feature = "tor", not(target_family = "wasm")))]
66use tokio_rustls::{TlsConnector, rustls::ClientConfig as TlsClientConfig};
67use tracing::{Instrument, debug, instrument, trace, trace_span, warn};
68
69use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
70
71pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
72
73pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
74 ApiVersion { major: 0, minor: 1 };
75
76pub type PeerResult<T> = Result<T, PeerError>;
77pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
78pub type FederationResult<T> = Result<T, FederationError>;
79pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
80
81pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
82
83static INSTALL_CRYPTO: OnceCell<()> = OnceCell::const_new();
84
85#[cfg(not(target_family = "wasm"))]
86async fn install_crypto_provider() {
87 INSTALL_CRYPTO
88 .get_or_init(|| async {
89 tokio_rustls::rustls::crypto::ring::default_provider()
90 .install_default()
91 .expect("Failed to install crypto");
92 })
93 .await;
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
100pub struct ApiVersionSet {
101 pub core: ApiVersion,
102 pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
103}
104
/// Lowest-level interface for talking to the peers of a federation.
///
/// Implementors only need to know how to send one raw request to one peer;
/// the higher-level request helpers are layered on top by
/// [`FederationApiExt`].
#[apply(async_trait_maybe_send!)]
pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
    /// Ids of all peers this API can send requests to.
    fn all_peers(&self) -> &BTreeSet<PeerId>;

    /// Peer id used for admin requests, if one is configured.
    ///
    /// `FederationApiExt::request_admin*` fail with "Admin peer_id not set"
    /// when this returns `None`.
    fn self_peer(&self) -> Option<PeerId>;

    /// Returns an API handle bound to the module instance `id`.
    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;

    /// Sends `method` with `params` to the single peer `peer_id` and returns
    /// the raw JSON response.
    async fn request_raw(
        &self,
        peer_id: PeerId,
        method: &str,
        params: &ApiRequestErased,
    ) -> PeerResult<Value>;
}
133
134#[apply(async_trait_maybe_send!)]
137pub trait FederationApiExt: IRawFederationApi {
138 async fn request_single_peer<Ret>(
139 &self,
140 method: String,
141 params: ApiRequestErased,
142 peer: PeerId,
143 ) -> PeerResult<Ret>
144 where
145 Ret: DeserializeOwned,
146 {
147 self.request_raw(peer, &method, ¶ms)
148 .await
149 .and_then(|v| {
150 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
151 })
152 }
153
154 async fn request_single_peer_federation<FedRet>(
155 &self,
156 method: String,
157 params: ApiRequestErased,
158 peer_id: PeerId,
159 ) -> FederationResult<FedRet>
160 where
161 FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
162 {
163 self.request_raw(peer_id, &method, ¶ms)
164 .await
165 .and_then(|v| {
166 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
167 })
168 .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
169 }
170
171 #[instrument(target = LOG_NET_API, skip_all, fields(method=method))]
174 async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
175 &self,
176 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
177 method: String,
178 params: ApiRequestErased,
179 ) -> FederationResult<FR> {
180 #[cfg(not(target_family = "wasm"))]
184 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
185 #[cfg(target_family = "wasm")]
186 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
187
188 for peer in self.all_peers() {
189 futures.push(Box::pin({
190 let method = &method;
191 let params = ¶ms;
192 async move {
193 let result = self
194 .request_single_peer(method.clone(), params.clone(), *peer)
195 .await;
196
197 (*peer, result)
198 }
199 }));
200 }
201
202 let mut peer_errors = BTreeMap::new();
203 let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
204
205 loop {
206 let (peer, result) = futures
207 .next()
208 .await
209 .expect("Query strategy ran out of peers to query without returning a result");
210
211 match result {
212 Ok(response) => match strategy.process(peer, response) {
213 QueryStep::Retry(peers) => {
214 for peer in peers {
215 futures.push(Box::pin({
216 let method = &method;
217 let params = ¶ms;
218 async move {
219 let result = self
220 .request_single_peer(method.clone(), params.clone(), peer)
221 .await;
222
223 (peer, result)
224 }
225 }));
226 }
227 }
228 QueryStep::Success(response) => return Ok(response),
229 QueryStep::Failure(e) => {
230 peer_errors.insert(peer, e);
231 }
232 QueryStep::Continue => {}
233 },
234 Err(e) => {
235 e.report_if_unusual(peer, "RequestWithStrategy");
236 peer_errors.insert(peer, e);
237 }
238 }
239
240 if peer_errors.len() == peer_error_threshold {
241 return Err(FederationError::peer_errors(
242 method.clone(),
243 params.params.clone(),
244 peer_errors,
245 ));
246 }
247 }
248 }
249
250 async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
251 &self,
252 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
253 method: String,
254 params: ApiRequestErased,
255 ) -> FR {
256 #[cfg(not(target_family = "wasm"))]
260 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
261 #[cfg(target_family = "wasm")]
262 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
263
264 for peer in self.all_peers() {
265 futures.push(Box::pin({
266 let method = &method;
267 let params = ¶ms;
268 async move {
269 let response = util::retry(
270 format!("api-request-{method}-{peer}"),
271 api_networking_backoff(),
272 || async {
273 self.request_single_peer(method.clone(), params.clone(), *peer)
274 .await
275 .inspect_err(|e| {
276 e.report_if_unusual(*peer, "QueryWithStrategyRetry");
277 })
278 .map_err(|e| anyhow!(e.to_string()))
279 },
280 )
281 .await
282 .expect("Number of retries has no limit");
283
284 (*peer, response)
285 }
286 }));
287 }
288
289 loop {
290 let (peer, response) = match futures.next().await {
291 Some(t) => t,
292 None => pending().await,
293 };
294
295 match strategy.process(peer, response) {
296 QueryStep::Retry(peers) => {
297 for peer in peers {
298 futures.push(Box::pin({
299 let method = &method;
300 let params = ¶ms;
301 async move {
302 let response = util::retry(
303 format!("api-request-{method}-{peer}"),
304 api_networking_backoff(),
305 || async {
306 self.request_single_peer(
307 method.clone(),
308 params.clone(),
309 peer,
310 )
311 .await
312 .inspect_err(|err| {
313 if err.is_unusual() {
314 debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
315 }
316 })
317 .map_err(|e| anyhow!(e.to_string()))
318 },
319 )
320 .await
321 .expect("Number of retries has no limit");
322
323 (peer, response)
324 }
325 }));
326 }
327 }
328 QueryStep::Success(response) => return response,
329 QueryStep::Failure(e) => {
330 warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
331 }
332 QueryStep::Continue => {}
333 }
334 }
335 }
336
337 async fn request_current_consensus<Ret>(
338 &self,
339 method: String,
340 params: ApiRequestErased,
341 ) -> FederationResult<Ret>
342 where
343 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
344 {
345 self.request_with_strategy(
346 ThresholdConsensus::new(self.all_peers().to_num_peers()),
347 method,
348 params,
349 )
350 .await
351 }
352
353 async fn request_current_consensus_retry<Ret>(
354 &self,
355 method: String,
356 params: ApiRequestErased,
357 ) -> Ret
358 where
359 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
360 {
361 self.request_with_strategy_retry(
362 ThresholdConsensus::new(self.all_peers().to_num_peers()),
363 method,
364 params,
365 )
366 .await
367 }
368
369 async fn request_admin<Ret>(
370 &self,
371 method: &str,
372 params: ApiRequestErased,
373 auth: ApiAuth,
374 ) -> FederationResult<Ret>
375 where
376 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
377 {
378 let Some(self_peer_id) = self.self_peer() else {
379 return Err(FederationError::general(
380 method,
381 params,
382 anyhow::format_err!("Admin peer_id not set"),
383 ));
384 };
385
386 self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
387 .await
388 }
389
390 async fn request_admin_no_auth<Ret>(
391 &self,
392 method: &str,
393 params: ApiRequestErased,
394 ) -> FederationResult<Ret>
395 where
396 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
397 {
398 let Some(self_peer_id) = self.self_peer() else {
399 return Err(FederationError::general(
400 method,
401 params,
402 anyhow::format_err!("Admin peer_id not set"),
403 ));
404 };
405
406 self.request_single_peer_federation(method.into(), params, self_peer_id)
407 .await
408 }
409}
410
// Blanket implementation: every `IRawFederationApi` (including unsized trait
// objects) automatically provides the `FederationApiExt` helpers.
#[apply(async_trait_maybe_send!)]
impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
413
/// Marker trait for raw federation APIs that can be handed out as a
/// module-scoped API (see [`DynModuleApi`]). It adds no methods of its own.
pub trait IModuleFederationApi: IRawFederationApi {}
416
// `DynModuleApi`: cloneable newtype declared over `Arc<IModuleFederationApi>`.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynModuleApi(Arc<IModuleFederationApi>)
}
421
// `DynGlobalApi`: cloneable newtype declared over `Arc<IGlobalFederationApi>`.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynGlobalApi(Arc<IGlobalFederationApi>)
}
426
427impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
428 fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
429 self.inner.as_ref()
430 }
431}
432
433impl DynGlobalApi {
434 pub async fn new_admin(
435 peer: PeerId,
436 url: SafeUrl,
437 api_secret: &Option<String>,
438 ) -> anyhow::Result<DynGlobalApi> {
439 Ok(GlobalFederationApiWithCache::new(
440 ReconnectFederationApi::from_endpoints(once((peer, url)), api_secret, Some(peer))
441 .await?,
442 )
443 .into())
444 }
445
446 pub async fn from_setup_endpoint(
450 url: SafeUrl,
451 api_secret: &Option<String>,
452 ) -> anyhow::Result<Self> {
453 Self::new_admin(PeerId::from(1024), url, api_secret).await
457 }
458
459 pub async fn from_endpoints(
460 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
461 api_secret: &Option<String>,
462 ) -> anyhow::Result<Self> {
463 Ok(GlobalFederationApiWithCache::new(
464 ReconnectFederationApi::from_endpoints(peers, api_secret, None).await?,
465 )
466 .into())
467 }
468}
469
/// Typed interface of the federation-wide ("global") API.
///
/// Only the signatures live here; what each call does on the wire is defined
/// by the implementations, which are not part of this view. The per-method
/// notes below restate the signatures and are hedged where they go beyond
/// them.
#[apply(async_trait_maybe_send!)]
pub trait IGlobalFederationApi: IRawFederationApi {
    /// Submits `tx` and returns the encoded submission outcome.
    async fn submit_transaction(
        &self,
        tx: Transaction,
    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;

    /// Resolves to the [`SessionOutcome`] for `block_index`, using `decoders`
    /// for module data.
    async fn await_block(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
    ) -> anyhow::Result<SessionOutcome>;

    /// Fetches the [`SessionStatus`] for `block_index`.
    // NOTE(review): `core_api_version` presumably selects between the
    // session-status API variants named by the `VERSION_THAT_INTRODUCED_*`
    // constants — confirm against the implementation.
    async fn get_session_status(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
        core_api_version: ApiVersion,
        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
    ) -> anyhow::Result<SessionStatus>;

    /// Returns a session count as reported by the federation.
    async fn session_count(&self) -> FederationResult<u64>;

    /// Resolves to a [`TransactionId`] for `txid`; infallible by signature.
    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;

    /// Uploads a signed client backup request.
    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;

    /// Downloads the backup snapshot stored for `id`, one optional entry per
    /// peer.
    async fn download_backup(
        &self,
        id: &secp256k1::PublicKey,
    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;

    /// Admin call taking `auth`; presumably sets the guardian password.
    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the [`SetupStatus`] of the guardian.
    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;

    /// Sets local setup parameters (`name`, optional `federation_name`) and
    /// returns a string.
    async fn set_local_params(
        &self,
        name: String,
        federation_name: Option<String>,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Adds a peer's connection `info` and returns a string.
    async fn add_peer_connection_info(
        &self,
        info: String,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Admin call; by name, resets the peer setup codes.
    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the setup code, if there is one.
    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;

    /// Adds a config-generation peer (legacy parameter type).
    async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()>;

    /// Lists the config-generation peers (legacy parameter type).
    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>>;

    /// Admin call; by name, starts distributed key generation.
    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns one SHA-256 hash per peer for config verification.
    async fn get_verify_config_hash(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    /// Returns one SHA-256 hash per peer; by name, for verified configs.
    async fn verified_configs(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    /// Admin call; by name, starts consensus.
    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the server status together with the optional federation status.
    async fn status(&self) -> FederationResult<StatusResponse>;

    /// Returns an [`AuditSummary`].
    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;

    /// Downloads the guardian config backup archive.
    async fn guardian_config_backup(&self, auth: ApiAuth)
    -> FederationResult<GuardianConfigBackup>;

    /// Admin call returning `()`; presumably verifies that `auth` is accepted.
    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Admin call; by name, restarts the federation setup.
    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Submits a signed API `announcement` for `peer_id`.
    async fn submit_api_announcement(
        &self,
        peer_id: PeerId,
        announcement: SignedApiAnnouncement,
    ) -> FederationResult<()>;

    /// Fetches signed API announcements, keyed by peer, from the single
    /// `guardian`.
    async fn api_announcements(
        &self,
        guardian: PeerId,
    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;

    /// Asks the guardian to sign an API announcement for `api_url`.
    async fn sign_api_announcement(
        &self,
        api_url: SafeUrl,
        auth: ApiAuth,
    ) -> FederationResult<SignedApiAnnouncement>;

    /// Admin call taking an optional `session`; by name, shuts the guardian
    /// down.
    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;

    /// Returns a version string reported by the single peer `peer_id`.
    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;

    /// Returns [`BackupStatistics`].
    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;

    /// Fetches an [`InviteCode`] from the single `guardian`.
    async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode>;
}
615
616pub fn deserialize_outcome<R>(
617 outcome: &SerdeOutputOutcome,
618 module_decoder: &Decoder,
619) -> OutputOutcomeResult<R>
620where
621 R: OutputOutcome + MaybeSend,
622{
623 let dyn_outcome = outcome
624 .try_into_inner_known_module_kind(module_decoder)
625 .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
626
627 let source_instance = dyn_outcome.module_instance_id();
628
629 dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
630 let target_type = std::any::type_name::<R>();
631 OutputOutcomeError::ResponseDeserialization(anyhow!(
632 "Could not downcast output outcome with instance id {source_instance} to {target_type}"
633 ))
634 })
635}
636
/// Connector that reaches peers over websocket JSON-RPC.
#[derive(Debug, Clone)]
pub struct WebsocketConnector {
    /// API endpoint of each peer.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional API secret, sent as HTTP basic-auth credentials when
    /// connecting.
    api_secret: Option<String>,

    /// Per-peer URLs that take precedence over `peers` when connecting.
    pub connection_overrides: BTreeMap<PeerId, SafeUrl>,
}
649
650impl WebsocketConnector {
651 fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> anyhow::Result<Self> {
652 let mut s = Self::new_no_overrides(peers, api_secret);
653
654 for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
655 s = s.with_connection_override(k, v);
656 }
657
658 Ok(s)
659 }
660 pub fn with_connection_override(mut self, peer_id: PeerId, url: SafeUrl) -> Self {
661 self.connection_overrides.insert(peer_id, url);
662 self
663 }
664 pub fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
665 Self {
666 peers,
667 api_secret,
668 connection_overrides: BTreeMap::default(),
669 }
670 }
671}
672
673#[async_trait]
674impl IClientConnector for WebsocketConnector {
675 fn peers(&self) -> BTreeSet<PeerId> {
676 self.peers.keys().copied().collect()
677 }
678
679 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
680 let api_endpoint = match self.connection_overrides.get(&peer_id) {
681 Some(url) => {
682 trace!(target: LOG_NET_WS, %peer_id, "Using a connectivity override for connection");
683 url
684 }
685 None => self.peers.get(&peer_id).ok_or_else(|| {
686 PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}"))
687 })?,
688 };
689
690 #[cfg(not(target_family = "wasm"))]
691 let mut client = {
692 install_crypto_provider().await;
693 let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
694 let mut root_certs = RootCertStore::empty();
695 root_certs.extend(webpki_roots);
696
697 let tls_cfg = CustomCertStore::builder()
698 .with_root_certificates(root_certs)
699 .with_no_client_auth();
700
701 WsClientBuilder::default()
702 .max_concurrent_requests(u16::MAX as usize)
703 .with_custom_cert_store(tls_cfg)
704 };
705
706 #[cfg(target_family = "wasm")]
707 let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
708
709 if let Some(api_secret) = &self.api_secret {
710 #[cfg(not(target_family = "wasm"))]
711 {
712 let mut headers = HeaderMap::new();
715
716 let auth = base64::engine::general_purpose::STANDARD
717 .encode(format!("fedimint:{api_secret}"));
718
719 headers.insert(
720 "Authorization",
721 HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
722 );
723
724 client = client.set_headers(headers);
725 }
726 #[cfg(target_family = "wasm")]
727 {
728 let mut url = api_endpoint.clone();
731 url.set_username("fedimint")
732 .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid username")))?;
733 url.set_password(Some(&api_secret))
734 .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid secret")))?;
735
736 let client = client
737 .build(url.as_str())
738 .await
739 .map_err(|err| PeerError::InternalClientError(err.into()))?;
740
741 return Ok(client.into_dyn());
742 }
743 }
744
745 let client = client
746 .build(api_endpoint.as_str())
747 .await
748 .map_err(|err| PeerError::InternalClientError(err.into()))?;
749
750 Ok(client.into_dyn())
751 }
752}
753
/// Connector that reaches peers through the Tor network (native targets with
/// the `tor` feature only).
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[derive(Debug, Clone)]
pub struct TorConnector {
    /// API endpoint of each peer.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional API secret, sent as HTTP basic-auth credentials when
    /// connecting.
    api_secret: Option<String>,
}
760
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
impl TorConnector {
    /// Creates a Tor connector for the given peer endpoints and optional API
    /// secret.
    pub fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
        TorConnector { peers, api_secret }
    }
}
767
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[async_trait]
impl IClientConnector for TorConnector {
    /// Ids of all peers that have a configured endpoint.
    fn peers(&self) -> BTreeSet<PeerId> {
        self.peers.keys().copied().collect()
    }

    /// Opens a websocket JSON-RPC connection to `peer_id` over a Tor stream.
    ///
    /// Steps, in order: bootstrap a Tor client, connect a Tor data stream to
    /// the endpoint's host and port, optionally wrap it in TLS (`wss`), then
    /// build the websocket client on top of that stream.
    #[allow(clippy::too_many_lines)]
    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
        let api_endpoint = self
            .peers
            .get(&peer_id)
            .ok_or_else(|| PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}")))?;

        install_crypto_provider().await;

        // A fresh Tor client is bootstrapped on every call to `connect`.
        let tor_config = TorClientConfig::default();
        let tor_client = TorClient::create_bootstrapped(tor_config)
            .await
            .map_err(|err| PeerError::InternalClientError(err.into()))?
            .isolated_client();

        debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");

        // Host and port are taken from the endpoint URL; the port falls back
        // to the scheme's known default.
        let addr = (
            api_endpoint
                .host_str()
                .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected host str")))?,
            api_endpoint
                .port_or_known_default()
                .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected port number")))?,
        );
        let tor_addr = TorAddr::from(addr).map_err(|e| {
            PeerError::InvalidEndpoint(anyhow!("Invalid endpoint addr: {addr:?}: {e:#}"))
        })?;

        // `tor_addr` is moved into the connect call below; this copy is kept
        // only for logging.
        let tor_addr_clone = tor_addr.clone();

        debug!(
            ?tor_addr,
            ?addr,
            "Successfully created `TorAddr` for given address (i.e. host and port)"
        );

        let anonymized_stream = if api_endpoint.is_onion_address() {
            // Onion services must be explicitly enabled in the stream
            // preferences.
            let mut stream_prefs = arti_client::StreamPrefs::default();
            stream_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));

            let anonymized_stream = tor_client
                .connect_with_prefs(tor_addr, &stream_prefs)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
            );
            anonymized_stream
        } else {
            let anonymized_stream = tor_client
                .connect(tor_addr)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`"
            );
            anonymized_stream
        };

        // NOTE(review): the scheme is validated only after the Tor stream has
        // been established. Also, `ReconnectFederationApi::from_endpoints`
        // picks this connector for the `tor` scheme while only `ws`/`wss` are
        // accepted here — confirm how endpoints are expected to look.
        let is_tls = match api_endpoint.scheme() {
            "wss" => true,
            "ws" => false,
            unexpected_scheme => {
                return Err(PeerError::InvalidEndpoint(anyhow!(
                    "Unsupported scheme: {unexpected_scheme}"
                )));
            }
        };

        // A TLS connector trusting the bundled webpki roots, for `wss` only.
        let tls_connector = if is_tls {
            let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
            let mut root_certs = RootCertStore::empty();
            root_certs.extend(webpki_roots);

            let tls_config = TlsClientConfig::builder()
                .with_root_certificates(root_certs)
                .with_no_client_auth();
            let tls_connector = TlsConnector::from(Arc::new(tls_config));
            Some(tls_connector)
        } else {
            None
        };

        let mut ws_client_builder =
            WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);

        if let Some(api_secret) = &self.api_secret {
            // Send the API secret as HTTP basic-auth credentials.
            let mut headers = HeaderMap::new();

            let auth =
                base64::engine::general_purpose::STANDARD.encode(format!("fedimint:{api_secret}"));

            headers.insert(
                "Authorization",
                HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
            );

            ws_client_builder = ws_client_builder.set_headers(headers);
        }

        match tls_connector {
            // Plain websocket directly over the Tor stream.
            None => {
                let client = ws_client_builder
                    .build_with_stream(api_endpoint.as_str(), anonymized_stream)
                    .await
                    .map_err(|e| PeerError::Connection(e.into()))?;

                Ok(client.into_dyn())
            }
            // TLS handshake over the Tor stream first, then websocket over
            // TLS.
            Some(tls_connector) => {
                let host = api_endpoint
                    .host_str()
                    .map(ToOwned::to_owned)
                    .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Invalid host str")))?;

                let server_name = rustls_pki_types::ServerName::try_from(host)
                    .map_err(|e| PeerError::InvalidEndpoint(e.into()))?;

                let anonymized_tls_stream = tls_connector
                    .connect(server_name, anonymized_stream)
                    .await
                    .map_err(|e| PeerError::Connection(e.into()))?;

                let client = ws_client_builder
                    .build_with_stream(api_endpoint.as_str(), anonymized_tls_stream)
                    .await
                    .map_err(|e| PeerError::Connection(e.into()))?;

                Ok(client.into_dyn())
            }
        }
    }
}
921
922fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> PeerError {
923 match jsonrpc_error {
924 JsonRpcClientError::Call(error_object) => {
925 let error = anyhow!(error_object.message().to_owned());
926 match ErrorCode::from(error_object.code()) {
927 ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
928 PeerError::InvalidRequest(error)
929 }
930 ErrorCode::MethodNotFound => PeerError::InvalidRpcId(error),
931 ErrorCode::InvalidParams => PeerError::InvalidRequest(error),
932 ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
933 PeerError::ServerError(error)
934 }
935 }
936 }
937 JsonRpcClientError::Transport(error) => PeerError::Transport(anyhow!(error)),
938 JsonRpcClientError::RestartNeeded(arc) => PeerError::Transport(anyhow!(arc)),
939 JsonRpcClientError::ParseError(error) => PeerError::InvalidResponse(anyhow!(error)),
940 JsonRpcClientError::InvalidSubscriptionId => {
941 PeerError::Transport(anyhow!("Invalid subscription id"))
942 }
943 JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
944 PeerError::InvalidRequest(anyhow!(invalid_request_id))
945 }
946 JsonRpcClientError::RequestTimeout => PeerError::Transport(anyhow!("Request timeout")),
947 JsonRpcClientError::Custom(e) => PeerError::Transport(anyhow!(e)),
948 JsonRpcClientError::HttpNotImplemented => {
949 PeerError::ServerError(anyhow!("Http not implemented"))
950 }
951 JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
952 PeerError::InvalidRequest(anyhow!(empty_batch_request))
953 }
954 JsonRpcClientError::RegisterMethod(register_method_error) => {
955 PeerError::InvalidResponse(anyhow!(register_method_error))
956 }
957 }
958}
959
960#[async_trait]
961impl IClientConnection for WsClient {
962 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
963 let method = match method {
964 ApiMethod::Core(method) => method,
965 ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
966 };
967
968 Ok(ClientT::request(self, &method, [request.to_json()])
969 .await
970 .map_err(jsonrpc_error_to_peer_error)?)
971 }
972
973 async fn await_disconnection(&self) {
974 self.on_disconnect().await;
975 }
976}
977
/// Shared, type-erased [`IClientConnector`].
pub type DynClientConnector = Arc<dyn IClientConnector>;

/// Knows the set of peers and how to open a connection to each of them.
#[async_trait]
pub trait IClientConnector: Send + Sync + 'static {
    /// Ids of all peers this connector can connect to.
    fn peers(&self) -> BTreeSet<PeerId>;

    /// Opens a new connection to `peer`.
    async fn connect(&self, peer: PeerId) -> PeerResult<DynClientConnection>;

    /// Wraps the connector in an `Arc` to obtain a [`DynClientConnector`].
    fn into_dyn(self) -> DynClientConnector
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
995
/// Shared, type-erased [`IClientConnection`].
pub type DynClientConnection = Arc<dyn IClientConnection>;

/// An established connection to a single peer.
#[async_trait]
pub trait IClientConnection: Debug + Send + Sync + 'static {
    /// Sends `request` for `method` over this connection and returns the raw
    /// JSON response.
    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value>;

    /// Resolves when the connection is lost.
    async fn await_disconnection(&self);

    /// Wraps the connection in an `Arc` to obtain a [`DynClientConnection`].
    fn into_dyn(self) -> DynClientConnection
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
1011
/// Raw federation API backed by one [`ClientConnection`] per peer.
#[derive(Clone, Debug)]
pub struct ReconnectFederationApi {
    /// Ids of all peers known to the connector.
    peers: BTreeSet<PeerId>,
    /// Peer used for admin requests, if any (returned by `self_peer`).
    admin_id: Option<PeerId>,
    /// When set, requests are sent as module requests for this instance;
    /// otherwise as core requests.
    module_id: Option<ModuleInstanceId>,
    /// Per-peer connection handles.
    connections: ReconnectClientConnections,
}
1019
1020impl ReconnectFederationApi {
1021 fn new(connector: &DynClientConnector, admin_id: Option<PeerId>) -> Self {
1022 Self {
1023 peers: connector.peers(),
1024 admin_id,
1025 module_id: None,
1026 connections: ReconnectClientConnections::new(connector),
1027 }
1028 }
1029
1030 pub async fn new_admin(
1031 peer: PeerId,
1032 url: SafeUrl,
1033 api_secret: &Option<String>,
1034 ) -> anyhow::Result<Self> {
1035 Self::from_endpoints(once((peer, url)), api_secret, Some(peer)).await
1036 }
1037
1038 pub async fn from_endpoints(
1039 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
1040 api_secret: &Option<String>,
1041 admin_id: Option<PeerId>,
1042 ) -> anyhow::Result<Self> {
1043 let peers = peers.into_iter().collect::<BTreeMap<PeerId, SafeUrl>>();
1044
1045 let scheme = peers
1046 .values()
1047 .next()
1048 .expect("Federation api has been initialized with no peers")
1049 .scheme();
1050
1051 let connector = match scheme {
1052 "ws" | "wss" => WebsocketConnector::new(peers, api_secret.clone())?.into_dyn(),
1053 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1054 "tor" => TorConnector::new(peers, api_secret.clone()).into_dyn(),
1055 "iroh" => iroh::IrohConnector::new(peers).await?.into_dyn(),
1056 scheme => anyhow::bail!("Unsupported connector scheme: {scheme}"),
1057 };
1058
1059 Ok(ReconnectFederationApi::new(&connector, admin_id))
1060 }
1061}
1062
// Marker impl: lets `with_module` hand out this type as a `DynModuleApi`.
impl IModuleFederationApi for ReconnectFederationApi {}
1064
1065#[apply(async_trait_maybe_send!)]
1066impl IRawFederationApi for ReconnectFederationApi {
1067 fn all_peers(&self) -> &BTreeSet<PeerId> {
1068 &self.peers
1069 }
1070
1071 fn self_peer(&self) -> Option<PeerId> {
1072 self.admin_id
1073 }
1074
1075 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
1076 ReconnectFederationApi {
1077 peers: self.peers.clone(),
1078 admin_id: self.admin_id,
1079 module_id: Some(id),
1080 connections: self.connections.clone(),
1081 }
1082 .into()
1083 }
1084
1085 #[instrument(
1086 target = LOG_NET_API,
1087 skip_all,
1088 fields(
1089 peer_id = %peer_id,
1090 method = %method,
1091 params = %params.params,
1092 )
1093 )]
1094 async fn request_raw(
1095 &self,
1096 peer_id: PeerId,
1097 method: &str,
1098 params: &ApiRequestErased,
1099 ) -> PeerResult<Value> {
1100 let method = match self.module_id {
1101 Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
1102 None => ApiMethod::Core(method.to_string()),
1103 };
1104
1105 self.connections
1106 .request(peer_id, method, params.clone())
1107 .await
1108 }
1109}
1110
/// One [`ClientConnection`] handle per peer.
#[derive(Clone, Debug)]
pub struct ReconnectClientConnections {
    /// Connection handle of each peer known to the connector.
    connections: BTreeMap<PeerId, ClientConnection>,
}
1115
1116impl ReconnectClientConnections {
1117 pub fn new(connector: &DynClientConnector) -> Self {
1118 ReconnectClientConnections {
1119 connections: connector
1120 .peers()
1121 .into_iter()
1122 .map(|peer| (peer, ClientConnection::new(peer, connector.clone())))
1123 .collect(),
1124 }
1125 }
1126
1127 async fn request(
1128 &self,
1129 peer: PeerId,
1130 method: ApiMethod,
1131 request: ApiRequestErased,
1132 ) -> PeerResult<Value> {
1133 trace!(target: LOG_NET_API, %method, "Api request");
1134 let res = self
1135 .connections
1136 .get(&peer)
1137 .ok_or_else(|| PeerError::InvalidPeerId { peer_id: peer })?
1138 .connection()
1139 .await
1140 .context("Failed to connect to peer")
1141 .map_err(PeerError::Connection)?
1142 .request(method.clone(), request)
1143 .await;
1144
1145 trace!(target: LOG_NET_API, ?method, res_ok = res.is_ok(), "Api response");
1146
1147 res
1148 }
1149}
1150
/// Handle to the background task that owns the connection to one peer.
#[derive(Clone, Debug)]
struct ClientConnection {
    /// Channel for asking the task for a connection; each message carries the
    /// oneshot sender on which the connection is delivered.
    sender: async_channel::Sender<oneshot::Sender<DynClientConnection>>,
}
1155
impl ClientConnection {
    /// Spawns the background task that connects to `peer` on demand and
    /// returns a handle to it.
    ///
    /// The task connects only when a connection is requested. While a
    /// connection is alive it is handed to every requester; after a
    /// disconnect the next request triggers a new connection attempt. The
    /// task ends once the request channel is closed.
    fn new(peer: PeerId, connector: DynClientConnector) -> ClientConnection {
        let (sender, receiver) = bounded::<oneshot::Sender<DynClientConnection>>(1024);

        fedimint_core::task::spawn(
            "peer-api-connection",
            async move {
                let mut backoff = api_networking_backoff();

                // Wait for the first requester before trying to connect.
                while let Ok(sender) = receiver.recv().await {
                    let mut senders = vec![sender];

                    // Collect every request already queued so they are all
                    // served by the same connection attempt.
                    while let Ok(sender) = receiver.try_recv() {
                        senders.push(sender);
                    }

                    match connector.connect(peer).await {
                        Ok(connection) => {
                            trace!(target: LOG_CLIENT_NET_API, "Connected to peer api");

                            // A send fails only if the requester went away,
                            // which is fine to ignore.
                            for sender in senders {
                                sender.send(connection.clone()).ok();
                            }

                            // Serve new requests from the live connection
                            // until it drops or the request channel closes.
                            loop {
                                tokio::select! {
                                    sender = receiver.recv() => {
                                        match sender.ok() {
                                            Some(sender) => sender.send(connection.clone()).ok(),
                                            None => break,
                                        };
                                    }
                                    () = connection.await_disconnection() => break,
                                }
                            }

                            trace!(target: LOG_CLIENT_NET_API, "Disconnected from peer api");

                            // A successful connection resets the backoff.
                            backoff = api_networking_backoff();
                        }
                        Err(e) => {
                            trace!(target: LOG_CLIENT_NET_API, "Failed to connect to peer api {e}");

                            // `senders` is still alive here and is dropped
                            // only at the end of this iteration, i.e. after
                            // the sleep; the waiting requesters then observe
                            // a cancelled oneshot.
                            fedimint_core::task::sleep(
                                backoff.next().expect("No limit to the number of retries"),
                            )
                            .await;
                        }
                    }
                }

                trace!(target: LOG_CLIENT_NET_API, "Shutting down peer api connection task");
            }
            .instrument(trace_span!("peer-api-connection", ?peer)),
        );

        ClientConnection { sender }
    }

    /// Asks the background task for a connection.
    ///
    /// Returns `None` if the request channel is closed (logged as a warning)
    /// or if the task drops the request without answering, which is what
    /// happens after a failed connection attempt.
    async fn connection(&self) -> Option<DynClientConnection> {
        let (sender, receiver) = oneshot::channel();

        self.sender
            .send(sender)
            .await
            .inspect_err(|err| {
                warn!(
                    target: LOG_CLIENT_NET_API,
                    err = %err.fmt_compact(),
                    "Api connection request channel closed unexpectedly"
                );
            })
            .ok()?;

        receiver.await.ok()
    }
}
1235
/// Federation-wide status as carried in [`StatusResponse`] (legacy format).
// NOTE(review): field meanings below are inferred from the field names only —
// confirm against the server side.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct LegacyFederationStatus {
    /// Session count.
    pub session_count: u64,
    /// Status of each peer.
    pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
    /// Number of peers counted as online.
    pub peers_online: u64,
    /// Number of peers counted as offline.
    pub peers_offline: u64,
    /// Number of peers counted as flagged.
    pub peers_flagged: u64,
    /// Scheduled shutdown value, if any; presumably a session index.
    pub scheduled_shutdown: Option<u64>,
}
1248
/// Status of a single peer (legacy format).
// NOTE(review): field meanings are inferred from the field names only —
// confirm against the server side.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LegacyPeerStatus {
    /// Last contribution of the peer, if any.
    pub last_contribution: Option<u64>,
    /// Whether the peer is currently connected.
    pub connection_status: LegacyP2PConnectionStatus,
    /// Whether the peer is flagged.
    pub flagged: bool,
}
1257
/// Connection state of a peer; serialized in `snake_case` and defaulting to
/// [`LegacyP2PConnectionStatus::Disconnected`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LegacyP2PConnectionStatus {
    #[default]
    Disconnected,
    Connected,
}
1265
/// Response type of [`IGlobalFederationApi::status`].
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct StatusResponse {
    /// Status of the server itself.
    pub server: ServerStatusLegacy,
    /// Federation status, when available.
    pub federation: Option<LegacyFederationStatus>,
}
1271
/// Response type of [`IGlobalFederationApi::guardian_config_backup`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GuardianConfigBackup {
    /// Raw bytes of the backup archive (a tar archive, going by the field
    /// name); serialized as a hex string.
    #[serde(with = "fedimint_core::hex::serde")]
    pub tar_archive_bytes: Vec<u8>,
}
1279
1280#[cfg(test)]
1281mod tests;