1mod error;
2pub mod global_api;
3mod iroh;
4pub mod net;
5
6use core::panic;
7use std::collections::{BTreeMap, BTreeSet, HashMap};
8use std::fmt::Debug;
9use std::iter::once;
10use std::pin::Pin;
11use std::result;
12use std::sync::Arc;
13
14use anyhow::{Context, anyhow};
15#[cfg(all(feature = "tor", not(target_family = "wasm")))]
16use arti_client::{TorAddr, TorClient, TorClientConfig};
17use async_channel::bounded;
18use async_trait::async_trait;
19use base64::Engine as _;
20use bitcoin::hashes::sha256;
21use bitcoin::secp256k1;
22pub use error::{FederationError, OutputOutcomeError, PeerError};
23use fedimint_core::admin_client::{PeerServerParamsLegacy, ServerStatusLegacy, SetupStatus};
24use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
25use fedimint_core::core::backup::SignedBackupRequest;
26use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
27use fedimint_core::encoding::{Decodable, Encodable};
28use fedimint_core::envs::{
29 FM_IROH_DNS_ENV, FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env,
30};
31use fedimint_core::invite_code::InviteCode;
32use fedimint_core::module::audit::AuditSummary;
33use fedimint_core::module::registry::ModuleDecoderRegistry;
34use fedimint_core::module::{
35 ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
36};
37use fedimint_core::net::api_announcement::SignedApiAnnouncement;
38use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
39use fedimint_core::task::{MaybeSend, MaybeSync};
40use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
41use fedimint_core::util::backoff_util::api_networking_backoff;
42use fedimint_core::util::{FmtCompact as _, SafeUrl};
43use fedimint_core::{
44 NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
45};
46use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET_API, LOG_NET_WS};
47use futures::channel::oneshot;
48use futures::future::pending;
49use futures::stream::FuturesUnordered;
50use futures::{Future, StreamExt};
51use global_api::with_cache::GlobalFederationApiWithCache;
52use jsonrpsee_core::DeserializeOwned;
53use jsonrpsee_core::client::ClientT;
54pub use jsonrpsee_core::client::Error as JsonRpcClientError;
55use jsonrpsee_types::ErrorCode;
56#[cfg(target_family = "wasm")]
57use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
58#[cfg(not(target_family = "wasm"))]
59use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
60#[cfg(not(target_family = "wasm"))]
61use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
62use serde::{Deserialize, Serialize};
63use serde_json::Value;
64#[cfg(not(target_family = "wasm"))]
65use tokio_rustls::rustls::RootCertStore;
66#[cfg(all(feature = "tor", not(target_family = "wasm")))]
67use tokio_rustls::{TlsConnector, rustls::ClientConfig as TlsClientConfig};
68use tracing::{Instrument, debug, instrument, trace, trace_span, warn};
69
70use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
71
72pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
73
74pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
75 ApiVersion { major: 0, minor: 1 };
76
77pub type PeerResult<T> = Result<T, PeerError>;
78pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
79pub type FederationResult<T> = Result<T, FederationError>;
80pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
81
82pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
83
84#[cfg(not(target_family = "wasm"))]
85fn install_crypto_provider() {
86 let _ = tokio_rustls::rustls::crypto::ring::default_provider().install_default();
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
93pub struct ApiVersionSet {
94 pub core: ApiVersion,
95 pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
96}
97
/// Lowest-level federation API abstraction: knows the set of peers and can
/// send a single raw JSON request to one of them.
///
/// The convenience helpers in `FederationApiExt` are built on top of
/// [`IRawFederationApi::request_raw`].
#[apply(async_trait_maybe_send!)]
pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
    /// Returns the ids of all peers this API can talk to.
    fn all_peers(&self) -> &BTreeSet<PeerId>;

    /// Returns the peer id used for admin requests, if one is set.
    ///
    /// `FederationApiExt::request_admin` fails with "Admin peer_id not set"
    /// when this returns `None`.
    fn self_peer(&self) -> Option<PeerId>;

    /// Returns an API handle scoped to the module instance `id`.
    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;

    /// Sends `method` with `params` to the peer `peer_id` and returns the raw
    /// JSON response, or the [`PeerError`] describing why the request failed.
    async fn request_raw(
        &self,
        peer_id: PeerId,
        method: &str,
        params: &ApiRequestErased,
    ) -> PeerResult<Value>;
}
126
127#[apply(async_trait_maybe_send!)]
130pub trait FederationApiExt: IRawFederationApi {
131 async fn request_single_peer<Ret>(
132 &self,
133 method: String,
134 params: ApiRequestErased,
135 peer: PeerId,
136 ) -> PeerResult<Ret>
137 where
138 Ret: DeserializeOwned,
139 {
140 self.request_raw(peer, &method, ¶ms)
141 .await
142 .and_then(|v| {
143 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
144 })
145 }
146
147 async fn request_single_peer_federation<FedRet>(
148 &self,
149 method: String,
150 params: ApiRequestErased,
151 peer_id: PeerId,
152 ) -> FederationResult<FedRet>
153 where
154 FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
155 {
156 self.request_raw(peer_id, &method, ¶ms)
157 .await
158 .and_then(|v| {
159 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
160 })
161 .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
162 }
163
164 #[instrument(target = LOG_NET_API, skip_all, fields(method=method))]
167 async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
168 &self,
169 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
170 method: String,
171 params: ApiRequestErased,
172 ) -> FederationResult<FR> {
173 #[cfg(not(target_family = "wasm"))]
177 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
178 #[cfg(target_family = "wasm")]
179 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
180
181 for peer in self.all_peers() {
182 futures.push(Box::pin({
183 let method = &method;
184 let params = ¶ms;
185 async move {
186 let result = self
187 .request_single_peer(method.clone(), params.clone(), *peer)
188 .await;
189
190 (*peer, result)
191 }
192 }));
193 }
194
195 let mut peer_errors = BTreeMap::new();
196 let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
197
198 loop {
199 let (peer, result) = futures
200 .next()
201 .await
202 .expect("Query strategy ran out of peers to query without returning a result");
203
204 match result {
205 Ok(response) => match strategy.process(peer, response) {
206 QueryStep::Retry(peers) => {
207 for peer in peers {
208 futures.push(Box::pin({
209 let method = &method;
210 let params = ¶ms;
211 async move {
212 let result = self
213 .request_single_peer(method.clone(), params.clone(), peer)
214 .await;
215
216 (peer, result)
217 }
218 }));
219 }
220 }
221 QueryStep::Success(response) => return Ok(response),
222 QueryStep::Failure(e) => {
223 peer_errors.insert(peer, e);
224 }
225 QueryStep::Continue => {}
226 },
227 Err(e) => {
228 e.report_if_unusual(peer, "RequestWithStrategy");
229 peer_errors.insert(peer, e);
230 }
231 }
232
233 if peer_errors.len() == peer_error_threshold {
234 return Err(FederationError::peer_errors(
235 method.clone(),
236 params.params.clone(),
237 peer_errors,
238 ));
239 }
240 }
241 }
242
243 async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
244 &self,
245 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
246 method: String,
247 params: ApiRequestErased,
248 ) -> FR {
249 #[cfg(not(target_family = "wasm"))]
253 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
254 #[cfg(target_family = "wasm")]
255 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
256
257 for peer in self.all_peers() {
258 futures.push(Box::pin({
259 let method = &method;
260 let params = ¶ms;
261 async move {
262 let response = util::retry(
263 format!("api-request-{method}-{peer}"),
264 api_networking_backoff(),
265 || async {
266 self.request_single_peer(method.clone(), params.clone(), *peer)
267 .await
268 .inspect_err(|e| {
269 e.report_if_unusual(*peer, "QueryWithStrategyRetry");
270 })
271 .map_err(|e| anyhow!(e.to_string()))
272 },
273 )
274 .await
275 .expect("Number of retries has no limit");
276
277 (*peer, response)
278 }
279 }));
280 }
281
282 loop {
283 let (peer, response) = match futures.next().await {
284 Some(t) => t,
285 None => pending().await,
286 };
287
288 match strategy.process(peer, response) {
289 QueryStep::Retry(peers) => {
290 for peer in peers {
291 futures.push(Box::pin({
292 let method = &method;
293 let params = ¶ms;
294 async move {
295 let response = util::retry(
296 format!("api-request-{method}-{peer}"),
297 api_networking_backoff(),
298 || async {
299 self.request_single_peer(
300 method.clone(),
301 params.clone(),
302 peer,
303 )
304 .await
305 .inspect_err(|err| {
306 if err.is_unusual() {
307 debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
308 }
309 })
310 .map_err(|e| anyhow!(e.to_string()))
311 },
312 )
313 .await
314 .expect("Number of retries has no limit");
315
316 (peer, response)
317 }
318 }));
319 }
320 }
321 QueryStep::Success(response) => return response,
322 QueryStep::Failure(e) => {
323 warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
324 }
325 QueryStep::Continue => {}
326 }
327 }
328 }
329
330 async fn request_current_consensus<Ret>(
331 &self,
332 method: String,
333 params: ApiRequestErased,
334 ) -> FederationResult<Ret>
335 where
336 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
337 {
338 self.request_with_strategy(
339 ThresholdConsensus::new(self.all_peers().to_num_peers()),
340 method,
341 params,
342 )
343 .await
344 }
345
346 async fn request_current_consensus_retry<Ret>(
347 &self,
348 method: String,
349 params: ApiRequestErased,
350 ) -> Ret
351 where
352 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
353 {
354 self.request_with_strategy_retry(
355 ThresholdConsensus::new(self.all_peers().to_num_peers()),
356 method,
357 params,
358 )
359 .await
360 }
361
362 async fn request_admin<Ret>(
363 &self,
364 method: &str,
365 params: ApiRequestErased,
366 auth: ApiAuth,
367 ) -> FederationResult<Ret>
368 where
369 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
370 {
371 let Some(self_peer_id) = self.self_peer() else {
372 return Err(FederationError::general(
373 method,
374 params,
375 anyhow::format_err!("Admin peer_id not set"),
376 ));
377 };
378
379 self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
380 .await
381 }
382
383 async fn request_admin_no_auth<Ret>(
384 &self,
385 method: &str,
386 params: ApiRequestErased,
387 ) -> FederationResult<Ret>
388 where
389 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
390 {
391 let Some(self_peer_id) = self.self_peer() else {
392 return Err(FederationError::general(
393 method,
394 params,
395 anyhow::format_err!("Admin peer_id not set"),
396 ));
397 };
398
399 self.request_single_peer_federation(method.into(), params, self_peer_id)
400 .await
401 }
402}
403
404#[apply(async_trait_maybe_send!)]
405impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
406
/// Marker trait for federation APIs handed out for a module; adds no methods
/// on top of [`IRawFederationApi`].
pub trait IModuleFederationApi: IRawFederationApi {}
409
// Cloneable newtype around a shared `IModuleFederationApi` trait object.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynModuleApi(Arc<IModuleFederationApi>)
}

// Cloneable newtype around a shared `IGlobalFederationApi` trait object.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynGlobalApi(Arc<IGlobalFederationApi>)
}
419
420impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
421 fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
422 self.inner.as_ref()
423 }
424}
425
426impl DynGlobalApi {
427 pub async fn new_admin(
428 peer: PeerId,
429 url: SafeUrl,
430 api_secret: &Option<String>,
431 ) -> anyhow::Result<DynGlobalApi> {
432 Ok(GlobalFederationApiWithCache::new(
433 ReconnectFederationApi::from_endpoints(once((peer, url)), api_secret, Some(peer))
434 .await?,
435 )
436 .into())
437 }
438
439 pub async fn from_setup_endpoint(
443 url: SafeUrl,
444 api_secret: &Option<String>,
445 ) -> anyhow::Result<Self> {
446 Self::new_admin(PeerId::from(1024), url, api_secret).await
450 }
451
452 pub async fn from_endpoints(
453 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
454 api_secret: &Option<String>,
455 ) -> anyhow::Result<Self> {
456 Ok(GlobalFederationApiWithCache::new(
457 ReconnectFederationApi::from_endpoints(peers, api_secret, None).await?,
458 )
459 .into())
460 }
461}
462
/// The federation-wide (non-module) API surface.
///
/// Only signatures are declared here; the behavioral notes below are derived
/// from names and types and are hedged where the semantics are not visible in
/// this file.
#[apply(async_trait_maybe_send!)]
pub trait IGlobalFederationApi: IRawFederationApi {
    /// Submits `tx` and returns the module-encoded submission outcome.
    async fn submit_transaction(
        &self,
        tx: Transaction,
    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;

    /// Returns the [`SessionOutcome`] for `block_index`, decoded with
    /// `decoders` (presumably waiting until it is available — confirm).
    async fn await_block(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
    ) -> anyhow::Result<SessionOutcome>;

    /// Returns the [`SessionStatus`] for `block_index`.
    ///
    /// `core_api_version` presumably selects which endpoint variant is used,
    /// and `broadcast_public_keys` presumably enables signature checking of
    /// the response — verify against the implementation.
    async fn get_session_status(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
        core_api_version: ApiVersion,
        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
    ) -> anyhow::Result<SessionStatus>;

    /// Returns the federation's session count.
    async fn session_count(&self) -> FederationResult<u64>;

    /// Resolves to a [`TransactionId`] for `txid` (presumably once the
    /// transaction has been accepted — confirm).
    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;

    /// Uploads the signed backup `request`.
    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;

    /// Downloads the backup stored under `id`, returning each peer's answer
    /// (`None` for a peer that returned no snapshot).
    async fn download_backup(
        &self,
        id: &secp256k1::PublicKey,
    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;

    /// Admin/setup call: sets the guardian password given by `auth`
    /// (semantics inferred from the name — confirm).
    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Admin/setup call: returns the guardian's [`SetupStatus`].
    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;

    /// Admin/setup call: sets the local guardian `name` and an optional
    /// `federation_name`; returns a string (presumably a setup code — confirm).
    async fn set_local_params(
        &self,
        name: String,
        federation_name: Option<String>,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Admin/setup call: registers another peer's connection `info`; returns
    /// a string whose meaning is not visible here.
    async fn add_peer_connection_info(
        &self,
        info: String,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Admin/setup call: resets the peer setup codes.
    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Admin/setup call: returns the guardian's setup code, if any.
    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;

    /// Legacy config-generation call: adds `peer` to the config-gen peers.
    async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()>;

    /// Legacy config-generation call: lists the config-gen peers.
    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>>;

    /// Admin call: starts distributed key generation (DKG).
    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Admin call: returns one config verification hash per peer.
    async fn get_verify_config_hash(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    /// Admin call: returns a hash per peer (presumably for peers whose config
    /// has been verified — confirm).
    async fn verified_configs(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    /// Admin call: starts consensus.
    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the server and federation [`StatusResponse`].
    async fn status(&self) -> FederationResult<StatusResponse>;

    /// Admin call: returns the federation's [`AuditSummary`].
    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;

    /// Admin call: downloads the guardian's config backup archive.
    async fn guardian_config_backup(&self, auth: ApiAuth)
    -> FederationResult<GuardianConfigBackup>;

    /// Admin call: succeeds if `auth` is accepted (inferred from the name and
    /// the unit return type — confirm).
    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Admin call: restarts the federation setup.
    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Submits a signed API `announcement` for `peer_id`.
    async fn submit_api_announcement(
        &self,
        peer_id: PeerId,
        announcement: SignedApiAnnouncement,
    ) -> FederationResult<()>;

    /// Asks the single peer `guardian` for the API announcements it knows,
    /// keyed by the announcing peer.
    async fn api_announcements(
        &self,
        guardian: PeerId,
    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;

    /// Admin call: has the guardian sign an announcement for `api_url`.
    async fn sign_api_announcement(
        &self,
        api_url: SafeUrl,
        auth: ApiAuth,
    ) -> FederationResult<SignedApiAnnouncement>;

    /// Admin call: requests a shutdown; `session` presumably names the session
    /// after which to shut down — confirm.
    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;

    /// Asks the single peer `peer_id` for its `fedimintd` version string.
    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;

    /// Admin call: returns [`BackupStatistics`].
    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;

    /// Asks the single peer `guardian` for an [`InviteCode`].
    async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode>;
}
608
609pub fn deserialize_outcome<R>(
610 outcome: &SerdeOutputOutcome,
611 module_decoder: &Decoder,
612) -> OutputOutcomeResult<R>
613where
614 R: OutputOutcome + MaybeSend,
615{
616 let dyn_outcome = outcome
617 .try_into_inner_known_module_kind(module_decoder)
618 .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
619
620 let source_instance = dyn_outcome.module_instance_id();
621
622 dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
623 let target_type = std::any::type_name::<R>();
624 OutputOutcomeError::ResponseDeserialization(anyhow!(
625 "Could not downcast output outcome with instance id {source_instance} to {target_type}"
626 ))
627 })
628}
629
630#[derive(Debug, Clone)]
631pub struct WebsocketConnector {
632 peers: BTreeMap<PeerId, SafeUrl>,
633 api_secret: Option<String>,
634
635 pub connection_overrides: BTreeMap<PeerId, SafeUrl>,
641}
642
643impl WebsocketConnector {
644 fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> anyhow::Result<Self> {
645 let mut s = Self::new_no_overrides(peers, api_secret);
646
647 for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
648 s = s.with_connection_override(k, v);
649 }
650
651 Ok(s)
652 }
653 pub fn with_connection_override(mut self, peer_id: PeerId, url: SafeUrl) -> Self {
654 self.connection_overrides.insert(peer_id, url);
655 self
656 }
657 pub fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
658 Self {
659 peers,
660 api_secret,
661 connection_overrides: BTreeMap::default(),
662 }
663 }
664}
665
666#[async_trait]
667impl IClientConnector for WebsocketConnector {
668 fn peers(&self) -> BTreeSet<PeerId> {
669 self.peers.keys().copied().collect()
670 }
671
672 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
673 let api_endpoint = match self.connection_overrides.get(&peer_id) {
674 Some(url) => {
675 trace!(target: LOG_NET_WS, %peer_id, "Using a connectivity override for connection");
676 url
677 }
678 None => self.peers.get(&peer_id).ok_or_else(|| {
679 PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}"))
680 })?,
681 };
682
683 #[cfg(not(target_family = "wasm"))]
684 let mut client = {
685 install_crypto_provider();
686 let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
687 let mut root_certs = RootCertStore::empty();
688 root_certs.extend(webpki_roots);
689
690 let tls_cfg = CustomCertStore::builder()
691 .with_root_certificates(root_certs)
692 .with_no_client_auth();
693
694 WsClientBuilder::default()
695 .max_concurrent_requests(u16::MAX as usize)
696 .with_custom_cert_store(tls_cfg)
697 };
698
699 #[cfg(target_family = "wasm")]
700 let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
701
702 if let Some(api_secret) = &self.api_secret {
703 #[cfg(not(target_family = "wasm"))]
704 {
705 let mut headers = HeaderMap::new();
708
709 let auth = base64::engine::general_purpose::STANDARD
710 .encode(format!("fedimint:{api_secret}"));
711
712 headers.insert(
713 "Authorization",
714 HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
715 );
716
717 client = client.set_headers(headers);
718 }
719 #[cfg(target_family = "wasm")]
720 {
721 let mut url = api_endpoint.clone();
724 url.set_username("fedimint")
725 .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid username")))?;
726 url.set_password(Some(&api_secret))
727 .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid secret")))?;
728
729 let client = client
730 .build(url.as_str())
731 .await
732 .map_err(|err| PeerError::InternalClientError(err.into()))?;
733
734 return Ok(client.into_dyn());
735 }
736 }
737
738 let client = client
739 .build(api_endpoint.as_str())
740 .await
741 .map_err(|err| PeerError::InternalClientError(err.into()))?;
742
743 Ok(client.into_dyn())
744 }
745}
746
/// Connector that reaches each peer's API through the Tor network.
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[derive(Clone, Debug)]
pub struct TorConnector {
    /// API endpoint of every peer.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional shared secret sent as a basic-auth header.
    api_secret: Option<String>,
}

#[cfg(all(feature = "tor", not(target_family = "wasm")))]
impl TorConnector {
    /// Creates a connector for the given peer endpoints.
    pub fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
        TorConnector { peers, api_secret }
    }
}
760
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[async_trait]
impl IClientConnector for TorConnector {
    /// Ids of all peers with a configured endpoint.
    fn peers(&self) -> BTreeSet<PeerId> {
        self.peers.iter().map(|(peer_id, _)| *peer_id).collect()
    }

    /// Opens a websocket JSON-RPC connection to `peer_id` over a Tor stream.
    ///
    /// A fresh Tor client is bootstrapped for every call. Only `ws` and `wss`
    /// endpoint schemes are accepted; for `wss` the Tor stream is wrapped in
    /// TLS before the websocket handshake.
    #[allow(clippy::too_many_lines)]
    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
        let Some(api_endpoint) = self.peers.get(&peer_id) else {
            return Err(PeerError::InternalClientError(anyhow!(
                "Invalid peer_id: {peer_id}"
            )));
        };

        install_crypto_provider();

        let tor_client = TorClient::create_bootstrapped(TorClientConfig::default())
            .await
            .map_err(|err| PeerError::InternalClientError(err.into()))?
            .isolated_client();

        debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");

        let endpoint_host = api_endpoint
            .host_str()
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected host str")))?;
        let endpoint_port = api_endpoint
            .port_or_known_default()
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected port number")))?;
        let addr = (endpoint_host, endpoint_port);

        let tor_addr = TorAddr::from(addr).map_err(|e| {
            PeerError::InvalidEndpoint(anyhow!("Invalid endpoint addr: {addr:?}: {e:#}"))
        })?;

        // `tor_addr` is consumed by the connect call; keep a copy for logging.
        let tor_addr_clone = tor_addr.clone();

        debug!(
            ?tor_addr,
            ?addr,
            "Successfully created `TorAddr` for given address (i.e. host and port)"
        );

        let is_onion = api_endpoint.is_onion_address();

        let connect_result = if is_onion {
            // Onion services have to be enabled explicitly on the stream preferences.
            let mut stream_prefs = arti_client::StreamPrefs::default();
            stream_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));

            tor_client
                .connect_with_prefs(tor_addr, &stream_prefs)
                .await
        } else {
            tor_client.connect(tor_addr).await
        };

        let anonymized_stream = connect_result.map_err(|e| PeerError::Connection(e.into()))?;

        if is_onion {
            debug!(
                ?tor_addr_clone,
                "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
            );
        } else {
            debug!(
                ?tor_addr_clone,
                "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`"
            );
        }

        // `wss` needs a TLS layer on top of the Tor stream, `ws` does not.
        let tls_connector = match api_endpoint.scheme() {
            "wss" => {
                let mut root_certs = RootCertStore::empty();
                root_certs.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());

                let tls_config = TlsClientConfig::builder()
                    .with_root_certificates(root_certs)
                    .with_no_client_auth();

                Some(TlsConnector::from(Arc::new(tls_config)))
            }
            "ws" => None,
            unexpected_scheme => {
                return Err(PeerError::InvalidEndpoint(anyhow!(
                    "Unsupported scheme: {unexpected_scheme}"
                )));
            }
        };

        let mut ws_client_builder =
            WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);

        if let Some(api_secret) = &self.api_secret {
            let auth =
                base64::engine::general_purpose::STANDARD.encode(format!("fedimint:{api_secret}"));

            let mut headers = HeaderMap::new();
            headers.insert(
                "Authorization",
                HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
            );

            ws_client_builder = ws_client_builder.set_headers(headers);
        }

        let Some(tls_connector) = tls_connector else {
            // Plain `ws`: run the websocket directly over the Tor stream.
            let client = ws_client_builder
                .build_with_stream(api_endpoint.as_str(), anonymized_stream)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            return Ok(client.into_dyn());
        };

        let host = api_endpoint
            .host_str()
            .map(ToOwned::to_owned)
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Invalid host str")))?;

        let server_name = rustls_pki_types::ServerName::try_from(host)
            .map_err(|e| PeerError::InvalidEndpoint(e.into()))?;

        let anonymized_tls_stream = tls_connector
            .connect(server_name, anonymized_stream)
            .await
            .map_err(|e| PeerError::Connection(e.into()))?;

        let client = ws_client_builder
            .build_with_stream(api_endpoint.as_str(), anonymized_tls_stream)
            .await
            .map_err(|e| PeerError::Connection(e.into()))?;

        Ok(client.into_dyn())
    }
}
914
915fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> PeerError {
916 match jsonrpc_error {
917 JsonRpcClientError::Call(error_object) => {
918 let error = anyhow!(error_object.message().to_owned());
919 match ErrorCode::from(error_object.code()) {
920 ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
921 PeerError::InvalidRequest(error)
922 }
923 ErrorCode::MethodNotFound => PeerError::InvalidRpcId(error),
924 ErrorCode::InvalidParams => PeerError::InvalidRequest(error),
925 ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
926 PeerError::ServerError(error)
927 }
928 }
929 }
930 JsonRpcClientError::Transport(error) => PeerError::Transport(anyhow!(error)),
931 JsonRpcClientError::RestartNeeded(arc) => PeerError::Transport(anyhow!(arc)),
932 JsonRpcClientError::ParseError(error) => PeerError::InvalidResponse(anyhow!(error)),
933 JsonRpcClientError::InvalidSubscriptionId => {
934 PeerError::Transport(anyhow!("Invalid subscription id"))
935 }
936 JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
937 PeerError::InvalidRequest(anyhow!(invalid_request_id))
938 }
939 JsonRpcClientError::RequestTimeout => PeerError::Transport(anyhow!("Request timeout")),
940 JsonRpcClientError::Custom(e) => PeerError::Transport(anyhow!(e)),
941 JsonRpcClientError::HttpNotImplemented => {
942 PeerError::ServerError(anyhow!("Http not implemented"))
943 }
944 JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
945 PeerError::InvalidRequest(anyhow!(empty_batch_request))
946 }
947 JsonRpcClientError::RegisterMethod(register_method_error) => {
948 PeerError::InvalidResponse(anyhow!(register_method_error))
949 }
950 }
951}
952
953#[async_trait]
954impl IClientConnection for WsClient {
955 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
956 let method = match method {
957 ApiMethod::Core(method) => method,
958 ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
959 };
960
961 Ok(ClientT::request(self, &method, [request.to_json()])
962 .await
963 .map_err(jsonrpc_error_to_peer_error)?)
964 }
965
966 async fn await_disconnection(&self) {
967 self.on_disconnect().await;
968 }
969}
970
971pub type DynClientConnector = Arc<dyn IClientConnector>;
972
973#[async_trait]
976pub trait IClientConnector: Send + Sync + 'static {
977 fn peers(&self) -> BTreeSet<PeerId>;
978
979 async fn connect(&self, peer: PeerId) -> PeerResult<DynClientConnection>;
980
981 fn into_dyn(self) -> DynClientConnector
982 where
983 Self: Sized,
984 {
985 Arc::new(self)
986 }
987}
988
989pub type DynClientConnection = Arc<dyn IClientConnection>;
990
991#[async_trait]
992pub trait IClientConnection: Debug + Send + Sync + 'static {
993 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value>;
994
995 async fn await_disconnection(&self);
996
997 fn into_dyn(self) -> DynClientConnection
998 where
999 Self: Sized,
1000 {
1001 Arc::new(self)
1002 }
1003}
1004
1005#[derive(Clone, Debug)]
1006pub struct ReconnectFederationApi {
1007 peers: BTreeSet<PeerId>,
1008 admin_id: Option<PeerId>,
1009 module_id: Option<ModuleInstanceId>,
1010 connections: ReconnectClientConnections,
1011}
1012
1013impl ReconnectFederationApi {
1014 fn new(connector: &DynClientConnector, admin_id: Option<PeerId>) -> Self {
1015 Self {
1016 peers: connector.peers(),
1017 admin_id,
1018 module_id: None,
1019 connections: ReconnectClientConnections::new(connector),
1020 }
1021 }
1022
1023 pub async fn new_admin(
1024 peer: PeerId,
1025 url: SafeUrl,
1026 api_secret: &Option<String>,
1027 ) -> anyhow::Result<Self> {
1028 Self::from_endpoints(once((peer, url)), api_secret, Some(peer)).await
1029 }
1030
1031 pub async fn from_endpoints(
1032 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
1033 api_secret: &Option<String>,
1034 admin_id: Option<PeerId>,
1035 ) -> anyhow::Result<Self> {
1036 let peers = peers.into_iter().collect::<BTreeMap<PeerId, SafeUrl>>();
1037
1038 let scheme = peers
1039 .values()
1040 .next()
1041 .expect("Federation api has been initialized with no peers")
1042 .scheme();
1043
1044 let connector = match scheme {
1045 "ws" | "wss" => WebsocketConnector::new(peers, api_secret.clone())?.into_dyn(),
1046 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1047 "tor" => TorConnector::new(peers, api_secret.clone()).into_dyn(),
1048 "iroh" => {
1049 let iroh_dns = std::env::var(FM_IROH_DNS_ENV)
1050 .ok()
1051 .and_then(|dns| dns.parse().ok());
1052 iroh::IrohConnector::new(peers, iroh_dns).await?.into_dyn()
1053 }
1054 scheme => anyhow::bail!("Unsupported connector scheme: {scheme}"),
1055 };
1056
1057 Ok(ReconnectFederationApi::new(&connector, admin_id))
1058 }
1059}
1060
// Marker impl: lets a `ReconnectFederationApi` be used as a module API.
impl IModuleFederationApi for ReconnectFederationApi {}
1062
1063#[apply(async_trait_maybe_send!)]
1064impl IRawFederationApi for ReconnectFederationApi {
1065 fn all_peers(&self) -> &BTreeSet<PeerId> {
1066 &self.peers
1067 }
1068
1069 fn self_peer(&self) -> Option<PeerId> {
1070 self.admin_id
1071 }
1072
1073 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
1074 ReconnectFederationApi {
1075 peers: self.peers.clone(),
1076 admin_id: self.admin_id,
1077 module_id: Some(id),
1078 connections: self.connections.clone(),
1079 }
1080 .into()
1081 }
1082
1083 #[instrument(
1084 target = LOG_NET_API,
1085 skip_all,
1086 fields(
1087 peer_id = %peer_id,
1088 method = %method,
1089 params = %params.params,
1090 )
1091 )]
1092 async fn request_raw(
1093 &self,
1094 peer_id: PeerId,
1095 method: &str,
1096 params: &ApiRequestErased,
1097 ) -> PeerResult<Value> {
1098 let method = match self.module_id {
1099 Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
1100 None => ApiMethod::Core(method.to_string()),
1101 };
1102
1103 self.connections
1104 .request(peer_id, method, params.clone())
1105 .await
1106 }
1107}
1108
1109#[derive(Clone, Debug)]
1110pub struct ReconnectClientConnections {
1111 connections: BTreeMap<PeerId, ClientConnection>,
1112}
1113
1114impl ReconnectClientConnections {
1115 pub fn new(connector: &DynClientConnector) -> Self {
1116 ReconnectClientConnections {
1117 connections: connector
1118 .peers()
1119 .into_iter()
1120 .map(|peer| (peer, ClientConnection::new(peer, connector.clone())))
1121 .collect(),
1122 }
1123 }
1124
1125 async fn request(
1126 &self,
1127 peer: PeerId,
1128 method: ApiMethod,
1129 request: ApiRequestErased,
1130 ) -> PeerResult<Value> {
1131 trace!(target: LOG_NET_API, %method, "Api request");
1132 let res = self
1133 .connections
1134 .get(&peer)
1135 .ok_or_else(|| PeerError::InvalidPeerId { peer_id: peer })?
1136 .connection()
1137 .await
1138 .context("Failed to connect to peer")
1139 .map_err(PeerError::Connection)?
1140 .request(method.clone(), request)
1141 .await;
1142
1143 trace!(target: LOG_NET_API, ?method, res_ok = res.is_ok(), "Api response");
1144
1145 res
1146 }
1147}
1148
1149#[derive(Clone, Debug)]
1150struct ClientConnection {
1151 sender: async_channel::Sender<oneshot::Sender<DynClientConnection>>,
1152}
1153
1154impl ClientConnection {
1155 fn new(peer: PeerId, connector: DynClientConnector) -> ClientConnection {
1156 let (sender, receiver) = bounded::<oneshot::Sender<DynClientConnection>>(1024);
1157
1158 fedimint_core::task::spawn(
1159 "peer-api-connection",
1160 async move {
1161 let mut backoff = api_networking_backoff();
1162
1163 while let Ok(sender) = receiver.recv().await {
1164 let mut senders = vec![sender];
1165
1166 while let Ok(sender) = receiver.try_recv() {
1169 senders.push(sender);
1170 }
1171
1172 match connector.connect(peer).await {
1173 Ok(connection) => {
1174 trace!(target: LOG_CLIENT_NET_API, "Connected to peer api");
1175
1176 for sender in senders {
1177 sender.send(connection.clone()).ok();
1178 }
1179
1180 loop {
1181 tokio::select! {
1182 sender = receiver.recv() => {
1183 match sender.ok() {
1184 Some(sender) => sender.send(connection.clone()).ok(),
1185 None => break,
1186 };
1187 }
1188 () = connection.await_disconnection() => break,
1189 }
1190 }
1191
1192 trace!(target: LOG_CLIENT_NET_API, "Disconnected from peer api");
1193
1194 backoff = api_networking_backoff();
1195 }
1196 Err(e) => {
1197 trace!(target: LOG_CLIENT_NET_API, "Failed to connect to peer api {e}");
1198
1199 fedimint_core::task::sleep(
1200 backoff.next().expect("No limit to the number of retries"),
1201 )
1202 .await;
1203 }
1204 }
1205 }
1206
1207 trace!(target: LOG_CLIENT_NET_API, "Shutting down peer api connection task");
1208 }
1209 .instrument(trace_span!("peer-api-connection", ?peer)),
1210 );
1211
1212 ClientConnection { sender }
1213 }
1214
1215 async fn connection(&self) -> Option<DynClientConnection> {
1216 let (sender, receiver) = oneshot::channel();
1217
1218 self.sender
1219 .send(sender)
1220 .await
1221 .inspect_err(|err| {
1222 warn!(
1223 target: LOG_CLIENT_NET_API,
1224 err = %err.fmt_compact(),
1225 "Api connection request channel closed unexpectedly"
1226 );
1227 })
1228 .ok()?;
1229
1230 receiver.await.ok()
1231 }
1232}
1233
1234#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1236pub struct LegacyFederationStatus {
1237 pub session_count: u64,
1238 pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
1239 pub peers_online: u64,
1240 pub peers_offline: u64,
1241 pub peers_flagged: u64,
1244 pub scheduled_shutdown: Option<u64>,
1245}
1246
1247#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1248pub struct LegacyPeerStatus {
1249 pub last_contribution: Option<u64>,
1250 pub connection_status: LegacyP2PConnectionStatus,
1251 pub flagged: bool,
1254}
1255
1256#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1257#[serde(rename_all = "snake_case")]
1258pub enum LegacyP2PConnectionStatus {
1259 #[default]
1260 Disconnected,
1261 Connected,
1262}
1263
1264#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
1265pub struct StatusResponse {
1266 pub server: ServerStatusLegacy,
1267 pub federation: Option<LegacyFederationStatus>,
1268}
1269
1270#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1273pub struct GuardianConfigBackup {
1274 #[serde(with = "fedimint_core::hex::serde")]
1275 pub tar_archive_bytes: Vec<u8>,
1276}
1277
1278#[cfg(test)]
1279mod tests;