1mod error;
2pub mod global_api;
3mod iroh;
4pub mod net;
5
6use core::panic;
7use std::collections::{BTreeMap, BTreeSet, HashMap};
8use std::fmt::Debug;
9use std::iter::once;
10use std::pin::Pin;
11use std::result;
12use std::sync::Arc;
13
14use anyhow::{Context, anyhow};
15#[cfg(all(feature = "tor", not(target_family = "wasm")))]
16use arti_client::{TorAddr, TorClient, TorClientConfig};
17use async_channel::bounded;
18use async_trait::async_trait;
19use base64::Engine as _;
20use bitcoin::hashes::sha256;
21use bitcoin::secp256k1;
22pub use error::{FederationError, OutputOutcomeError, PeerError};
23use fedimint_core::admin_client::{
24 GuardianConfigBackup, PeerServerParamsLegacy, ServerStatusLegacy, SetupStatus,
25};
26use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
27use fedimint_core::core::backup::SignedBackupRequest;
28use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
29use fedimint_core::encoding::{Decodable, Encodable};
30use fedimint_core::envs::{
31 FM_IROH_DNS_ENV, FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env,
32};
33use fedimint_core::invite_code::InviteCode;
34use fedimint_core::module::audit::AuditSummary;
35use fedimint_core::module::registry::ModuleDecoderRegistry;
36use fedimint_core::module::{
37 ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
38};
39use fedimint_core::net::api_announcement::SignedApiAnnouncement;
40#[cfg(not(target_family = "wasm"))]
41use fedimint_core::rustls::install_crypto_provider;
42use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
43use fedimint_core::task::{MaybeSend, MaybeSync};
44use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
45use fedimint_core::util::backoff_util::api_networking_backoff;
46use fedimint_core::util::{FmtCompact as _, SafeUrl};
47use fedimint_core::{
48 NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
49};
50use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET_API, LOG_NET_WS};
51use futures::channel::oneshot;
52use futures::future::pending;
53use futures::stream::FuturesUnordered;
54use futures::{Future, StreamExt};
55use global_api::with_cache::GlobalFederationApiWithCache;
56use jsonrpsee_core::DeserializeOwned;
57use jsonrpsee_core::client::ClientT;
58pub use jsonrpsee_core::client::Error as JsonRpcClientError;
59use jsonrpsee_types::ErrorCode;
60#[cfg(target_family = "wasm")]
61use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
62#[cfg(not(target_family = "wasm"))]
63use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
64#[cfg(not(target_family = "wasm"))]
65use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
66use serde::{Deserialize, Serialize};
67use serde_json::Value;
68#[cfg(not(target_family = "wasm"))]
69use tokio_rustls::rustls::RootCertStore;
70#[cfg(all(feature = "tor", not(target_family = "wasm")))]
71use tokio_rustls::{TlsConnector, rustls::ClientConfig as TlsClientConfig};
72use tracing::{Instrument, debug, instrument, trace, trace_span, warn};
73
74use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
75
76pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
77
78pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
79 ApiVersion { major: 0, minor: 1 };
80
81pub type PeerResult<T> = Result<T, PeerError>;
82pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
83pub type FederationResult<T> = Result<T, FederationError>;
84pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
85
86pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
87
88#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
92pub struct ApiVersionSet {
93 pub core: ApiVersion,
94 pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
95}
96
97#[apply(async_trait_maybe_send!)]
99pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
100 fn all_peers(&self) -> &BTreeSet<PeerId>;
108
109 fn self_peer(&self) -> Option<PeerId>;
114
115 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;
116
117 async fn request_raw(
119 &self,
120 peer_id: PeerId,
121 method: &str,
122 params: &ApiRequestErased,
123 ) -> PeerResult<Value>;
124}
125
126#[apply(async_trait_maybe_send!)]
129pub trait FederationApiExt: IRawFederationApi {
130 async fn request_single_peer<Ret>(
131 &self,
132 method: String,
133 params: ApiRequestErased,
134 peer: PeerId,
135 ) -> PeerResult<Ret>
136 where
137 Ret: DeserializeOwned,
138 {
139 self.request_raw(peer, &method, ¶ms)
140 .await
141 .and_then(|v| {
142 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
143 })
144 }
145
146 async fn request_single_peer_federation<FedRet>(
147 &self,
148 method: String,
149 params: ApiRequestErased,
150 peer_id: PeerId,
151 ) -> FederationResult<FedRet>
152 where
153 FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
154 {
155 self.request_raw(peer_id, &method, ¶ms)
156 .await
157 .and_then(|v| {
158 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
159 })
160 .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
161 }
162
163 #[instrument(target = LOG_NET_API, skip_all, fields(method=method))]
166 async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
167 &self,
168 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
169 method: String,
170 params: ApiRequestErased,
171 ) -> FederationResult<FR> {
172 #[cfg(not(target_family = "wasm"))]
176 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
177 #[cfg(target_family = "wasm")]
178 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
179
180 for peer in self.all_peers() {
181 futures.push(Box::pin({
182 let method = &method;
183 let params = ¶ms;
184 async move {
185 let result = self
186 .request_single_peer(method.clone(), params.clone(), *peer)
187 .await;
188
189 (*peer, result)
190 }
191 }));
192 }
193
194 let mut peer_errors = BTreeMap::new();
195 let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
196
197 loop {
198 let (peer, result) = futures
199 .next()
200 .await
201 .expect("Query strategy ran out of peers to query without returning a result");
202
203 match result {
204 Ok(response) => match strategy.process(peer, response) {
205 QueryStep::Retry(peers) => {
206 for peer in peers {
207 futures.push(Box::pin({
208 let method = &method;
209 let params = ¶ms;
210 async move {
211 let result = self
212 .request_single_peer(method.clone(), params.clone(), peer)
213 .await;
214
215 (peer, result)
216 }
217 }));
218 }
219 }
220 QueryStep::Success(response) => return Ok(response),
221 QueryStep::Failure(e) => {
222 peer_errors.insert(peer, e);
223 }
224 QueryStep::Continue => {}
225 },
226 Err(e) => {
227 e.report_if_unusual(peer, "RequestWithStrategy");
228 peer_errors.insert(peer, e);
229 }
230 }
231
232 if peer_errors.len() == peer_error_threshold {
233 return Err(FederationError::peer_errors(
234 method.clone(),
235 params.params.clone(),
236 peer_errors,
237 ));
238 }
239 }
240 }
241
242 #[instrument(target = LOG_CLIENT_NET_API, level = "debug", skip(self, strategy))]
243 async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
244 &self,
245 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
246 method: String,
247 params: ApiRequestErased,
248 ) -> FR {
249 #[cfg(not(target_family = "wasm"))]
253 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
254 #[cfg(target_family = "wasm")]
255 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
256
257 for peer in self.all_peers() {
258 futures.push(Box::pin({
259 let method = &method;
260 let params = ¶ms;
261 async move {
262 let response = util::retry(
263 format!("api-request-{method}-{peer}"),
264 api_networking_backoff(),
265 || async {
266 self.request_single_peer(method.clone(), params.clone(), *peer)
267 .await
268 .inspect_err(|e| {
269 e.report_if_unusual(*peer, "QueryWithStrategyRetry");
270 })
271 .map_err(|e| anyhow!(e.to_string()))
272 },
273 )
274 .await
275 .expect("Number of retries has no limit");
276
277 (*peer, response)
278 }
279 }));
280 }
281
282 loop {
283 let (peer, response) = match futures.next().await {
284 Some(t) => t,
285 None => pending().await,
286 };
287
288 match strategy.process(peer, response) {
289 QueryStep::Retry(peers) => {
290 for peer in peers {
291 futures.push(Box::pin({
292 let method = &method;
293 let params = ¶ms;
294 async move {
295 let response = util::retry(
296 format!("api-request-{method}-{peer}"),
297 api_networking_backoff(),
298 || async {
299 self.request_single_peer(
300 method.clone(),
301 params.clone(),
302 peer,
303 )
304 .await
305 .inspect_err(|err| {
306 if err.is_unusual() {
307 debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
308 }
309 })
310 .map_err(|e| anyhow!(e.to_string()))
311 },
312 )
313 .await
314 .expect("Number of retries has no limit");
315
316 (peer, response)
317 }
318 }));
319 }
320 }
321 QueryStep::Success(response) => return response,
322 QueryStep::Failure(e) => {
323 warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
324 }
325 QueryStep::Continue => {}
326 }
327 }
328 }
329
330 async fn request_current_consensus<Ret>(
331 &self,
332 method: String,
333 params: ApiRequestErased,
334 ) -> FederationResult<Ret>
335 where
336 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
337 {
338 self.request_with_strategy(
339 ThresholdConsensus::new(self.all_peers().to_num_peers()),
340 method,
341 params,
342 )
343 .await
344 }
345
346 async fn request_current_consensus_retry<Ret>(
347 &self,
348 method: String,
349 params: ApiRequestErased,
350 ) -> Ret
351 where
352 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
353 {
354 self.request_with_strategy_retry(
355 ThresholdConsensus::new(self.all_peers().to_num_peers()),
356 method,
357 params,
358 )
359 .await
360 }
361
362 async fn request_admin<Ret>(
363 &self,
364 method: &str,
365 params: ApiRequestErased,
366 auth: ApiAuth,
367 ) -> FederationResult<Ret>
368 where
369 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
370 {
371 let Some(self_peer_id) = self.self_peer() else {
372 return Err(FederationError::general(
373 method,
374 params,
375 anyhow::format_err!("Admin peer_id not set"),
376 ));
377 };
378
379 self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
380 .await
381 }
382
383 async fn request_admin_no_auth<Ret>(
384 &self,
385 method: &str,
386 params: ApiRequestErased,
387 ) -> FederationResult<Ret>
388 where
389 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
390 {
391 let Some(self_peer_id) = self.self_peer() else {
392 return Err(FederationError::general(
393 method,
394 params,
395 anyhow::format_err!("Admin peer_id not set"),
396 ));
397 };
398
399 self.request_single_peer_federation(method.into(), params, self_peer_id)
400 .await
401 }
402}
403
404#[apply(async_trait_maybe_send!)]
405impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
406
407pub trait IModuleFederationApi: IRawFederationApi {}
409
410dyn_newtype_define! {
411 #[derive(Clone)]
412 pub DynModuleApi(Arc<IModuleFederationApi>)
413}
414
415dyn_newtype_define! {
416 #[derive(Clone)]
417 pub DynGlobalApi(Arc<IGlobalFederationApi>)
418}
419
420impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
421 fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
422 self.inner.as_ref()
423 }
424}
425
426impl DynGlobalApi {
427 pub async fn new_admin(
428 peer: PeerId,
429 url: SafeUrl,
430 api_secret: &Option<String>,
431 ) -> anyhow::Result<DynGlobalApi> {
432 Ok(GlobalFederationApiWithCache::new(
433 ReconnectFederationApi::from_endpoints(once((peer, url)), api_secret, Some(peer))
434 .await?,
435 )
436 .into())
437 }
438
439 pub async fn from_setup_endpoint(
443 url: SafeUrl,
444 api_secret: &Option<String>,
445 ) -> anyhow::Result<Self> {
446 Self::new_admin(PeerId::from(1024), url, api_secret).await
450 }
451
452 pub async fn from_endpoints(
453 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
454 api_secret: &Option<String>,
455 ) -> anyhow::Result<Self> {
456 Ok(GlobalFederationApiWithCache::new(
457 ReconnectFederationApi::from_endpoints(peers, api_secret, None).await?,
458 )
459 .into())
460 }
461}
462
463#[apply(async_trait_maybe_send!)]
465pub trait IGlobalFederationApi: IRawFederationApi {
466 async fn submit_transaction(
467 &self,
468 tx: Transaction,
469 ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;
470
471 async fn await_block(
472 &self,
473 block_index: u64,
474 decoders: &ModuleDecoderRegistry,
475 ) -> anyhow::Result<SessionOutcome>;
476
477 async fn get_session_status(
478 &self,
479 block_index: u64,
480 decoders: &ModuleDecoderRegistry,
481 core_api_version: ApiVersion,
482 broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
483 ) -> anyhow::Result<SessionStatus>;
484
485 async fn session_count(&self) -> FederationResult<u64>;
486
487 async fn await_transaction(&self, txid: TransactionId) -> TransactionId;
488
489 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;
490
491 async fn download_backup(
492 &self,
493 id: &secp256k1::PublicKey,
494 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;
495
496 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;
500
501 async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;
502
503 async fn set_local_params(
504 &self,
505 name: String,
506 federation_name: Option<String>,
507 auth: ApiAuth,
508 ) -> FederationResult<String>;
509
510 async fn add_peer_connection_info(
511 &self,
512 info: String,
513 auth: ApiAuth,
514 ) -> FederationResult<String>;
515
516 async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;
518
519 async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;
521
522 async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()>;
530
531 async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>>;
536
537 async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;
541
542 async fn get_verify_config_hash(
545 &self,
546 auth: ApiAuth,
547 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
548
549 async fn verified_configs(
552 &self,
553 auth: ApiAuth,
554 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
555
556 async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;
562
563 async fn status(&self) -> FederationResult<StatusResponse>;
565
566 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;
568
569 async fn guardian_config_backup(&self, auth: ApiAuth)
571 -> FederationResult<GuardianConfigBackup>;
572
573 async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;
575
576 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;
577
578 async fn submit_api_announcement(
580 &self,
581 peer_id: PeerId,
582 announcement: SignedApiAnnouncement,
583 ) -> FederationResult<()>;
584
585 async fn api_announcements(
586 &self,
587 guardian: PeerId,
588 ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;
589
590 async fn sign_api_announcement(
591 &self,
592 api_url: SafeUrl,
593 auth: ApiAuth,
594 ) -> FederationResult<SignedApiAnnouncement>;
595
596 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;
597
598 async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;
600
601 async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;
603
604 async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode>;
607}
608
609pub fn deserialize_outcome<R>(
610 outcome: &SerdeOutputOutcome,
611 module_decoder: &Decoder,
612) -> OutputOutcomeResult<R>
613where
614 R: OutputOutcome + MaybeSend,
615{
616 let dyn_outcome = outcome
617 .try_into_inner_known_module_kind(module_decoder)
618 .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
619
620 let source_instance = dyn_outcome.module_instance_id();
621
622 dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
623 let target_type = std::any::type_name::<R>();
624 OutputOutcomeError::ResponseDeserialization(anyhow!(
625 "Could not downcast output outcome with instance id {source_instance} to {target_type}"
626 ))
627 })
628}
629
630#[derive(Debug, Clone)]
631pub struct WebsocketConnector {
632 peers: BTreeMap<PeerId, SafeUrl>,
633 api_secret: Option<String>,
634
635 pub connection_overrides: BTreeMap<PeerId, SafeUrl>,
641}
642
643impl WebsocketConnector {
644 fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> anyhow::Result<Self> {
645 let mut s = Self::new_no_overrides(peers, api_secret);
646
647 for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
648 s = s.with_connection_override(k, v);
649 }
650
651 Ok(s)
652 }
653 pub fn with_connection_override(mut self, peer_id: PeerId, url: SafeUrl) -> Self {
654 self.connection_overrides.insert(peer_id, url);
655 self
656 }
657 pub fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
658 Self {
659 peers,
660 api_secret,
661 connection_overrides: BTreeMap::default(),
662 }
663 }
664}
665
666#[async_trait]
667impl IClientConnector for WebsocketConnector {
668 fn peers(&self) -> BTreeSet<PeerId> {
669 self.peers.keys().copied().collect()
670 }
671
672 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
673 let api_endpoint = match self.connection_overrides.get(&peer_id) {
674 Some(url) => {
675 trace!(target: LOG_NET_WS, %peer_id, "Using a connectivity override for connection");
676 url
677 }
678 None => self.peers.get(&peer_id).ok_or_else(|| {
679 PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}"))
680 })?,
681 };
682
683 #[cfg(not(target_family = "wasm"))]
684 let mut client = {
685 install_crypto_provider().await;
686 let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
687 let mut root_certs = RootCertStore::empty();
688 root_certs.extend(webpki_roots);
689
690 let tls_cfg = CustomCertStore::builder()
691 .with_root_certificates(root_certs)
692 .with_no_client_auth();
693
694 WsClientBuilder::default()
695 .max_concurrent_requests(u16::MAX as usize)
696 .with_custom_cert_store(tls_cfg)
697 };
698
699 #[cfg(target_family = "wasm")]
700 let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
701
702 if let Some(api_secret) = &self.api_secret {
703 #[cfg(not(target_family = "wasm"))]
704 {
705 let mut headers = HeaderMap::new();
708
709 let auth = base64::engine::general_purpose::STANDARD
710 .encode(format!("fedimint:{api_secret}"));
711
712 headers.insert(
713 "Authorization",
714 HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
715 );
716
717 client = client.set_headers(headers);
718 }
719 #[cfg(target_family = "wasm")]
720 {
721 let mut url = api_endpoint.clone();
724 url.set_username("fedimint")
725 .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid username")))?;
726 url.set_password(Some(&api_secret))
727 .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid secret")))?;
728
729 let client = client
730 .build(url.as_str())
731 .await
732 .map_err(|err| PeerError::InternalClientError(err.into()))?;
733
734 return Ok(client.into_dyn());
735 }
736 }
737
738 let client = client
739 .build(api_endpoint.as_str())
740 .await
741 .map_err(|err| PeerError::InternalClientError(err.into()))?;
742
743 Ok(client.into_dyn())
744 }
745}
746
/// Connector that reaches peers through the Tor network.
///
/// Only compiled with the `tor` feature on non-wasm targets.
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[derive(Debug, Clone)]
pub struct TorConnector {
    /// Endpoint of every known peer.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional API secret, sent as basic-auth credentials when connecting.
    api_secret: Option<String>,
}
753
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
impl TorConnector {
    /// Creates a connector for `peers`, authenticating with `api_secret`
    /// when one is given.
    pub fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
        TorConnector { peers, api_secret }
    }
}
760
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[async_trait]
impl IClientConnector for TorConnector {
    /// Ids of all peers this connector knows an endpoint for.
    fn peers(&self) -> BTreeSet<PeerId> {
        self.peers.keys().copied().collect()
    }

    /// Opens a websocket connection to `peer_id` over a Tor data stream.
    ///
    /// A fresh, isolated Tor client is bootstrapped for each call. Onion
    /// endpoints are connected with onion-service support explicitly enabled.
    /// For `wss` endpoints the stream is additionally wrapped in TLS using
    /// the bundled webpki roots.
    ///
    /// # Errors
    ///
    /// `InternalClientError` for an unknown peer or a failed Tor bootstrap,
    /// `InvalidEndpoint` for a bad host, port, address or scheme, and
    /// `Connection` for failures while establishing the streams or the
    /// websocket client.
    #[allow(clippy::too_many_lines)]
    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
        let Some(api_endpoint) = self.peers.get(&peer_id) else {
            return Err(PeerError::InternalClientError(anyhow!(
                "Invalid peer_id: {peer_id}"
            )));
        };

        let tor_client = TorClient::create_bootstrapped(TorClientConfig::default())
            .await
            .map_err(|err| PeerError::InternalClientError(err.into()))?
            .isolated_client();

        debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");

        let endpoint_host = api_endpoint
            .host_str()
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected host str")))?;
        let endpoint_port = api_endpoint
            .port_or_known_default()
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected port number")))?;
        let addr = (endpoint_host, endpoint_port);

        let tor_addr = TorAddr::from(addr).map_err(|e| {
            PeerError::InvalidEndpoint(anyhow!("Invalid endpoint addr: {addr:?}: {e:#}"))
        })?;

        // `tor_addr` is moved into the connect call below; keep a copy for
        // the log line that follows it.
        let tor_addr_clone = tor_addr.clone();

        debug!(
            ?tor_addr,
            ?addr,
            "Successfully created `TorAddr` for given address (i.e. host and port)"
        );

        let anonymized_stream = if api_endpoint.is_onion_address() {
            let mut stream_prefs = arti_client::StreamPrefs::default();
            stream_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));

            let onion_stream = tor_client
                .connect_with_prefs(tor_addr, &stream_prefs)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
            );
            onion_stream
        } else {
            let clearnet_stream = tor_client
                .connect(tor_addr)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`"
            );
            clearnet_stream
        };

        let is_tls = match api_endpoint.scheme() {
            "wss" => true,
            "ws" => false,
            unexpected_scheme => {
                return Err(PeerError::InvalidEndpoint(anyhow!(
                    "Unsupported scheme: {unexpected_scheme}"
                )));
            }
        };

        // A TLS connector is only built for `wss` endpoints.
        let tls_connector = is_tls.then(|| {
            let mut root_certs = RootCertStore::empty();
            root_certs.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());

            let tls_config = TlsClientConfig::builder()
                .with_root_certificates(root_certs)
                .with_no_client_auth();

            TlsConnector::from(Arc::new(tls_config))
        });

        let mut ws_client_builder =
            WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);

        if let Some(api_secret) = &self.api_secret {
            let auth =
                base64::engine::general_purpose::STANDARD.encode(format!("fedimint:{api_secret}"));

            let mut headers = HeaderMap::new();
            headers.insert(
                "Authorization",
                HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
            );

            ws_client_builder = ws_client_builder.set_headers(headers);
        }

        let Some(tls_connector) = tls_connector else {
            // Plain `ws`: run the websocket client directly on the Tor stream.
            let client = ws_client_builder
                .build_with_stream(api_endpoint.as_str(), anonymized_stream)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            return Ok(client.into_dyn());
        };

        let host = api_endpoint
            .host_str()
            .map(ToOwned::to_owned)
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Invalid host str")))?;

        let server_name = rustls_pki_types::ServerName::try_from(host)
            .map_err(|e| PeerError::InvalidEndpoint(e.into()))?;

        let anonymized_tls_stream = tls_connector
            .connect(server_name, anonymized_stream)
            .await
            .map_err(|e| PeerError::Connection(e.into()))?;

        let client = ws_client_builder
            .build_with_stream(api_endpoint.as_str(), anonymized_tls_stream)
            .await
            .map_err(|e| PeerError::Connection(e.into()))?;

        Ok(client.into_dyn())
    }
}
912
913fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> PeerError {
914 match jsonrpc_error {
915 JsonRpcClientError::Call(error_object) => {
916 let error = anyhow!(error_object.message().to_owned());
917 match ErrorCode::from(error_object.code()) {
918 ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
919 PeerError::InvalidRequest(error)
920 }
921 ErrorCode::MethodNotFound => PeerError::InvalidRpcId(error),
922 ErrorCode::InvalidParams => PeerError::InvalidRequest(error),
923 ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
924 PeerError::ServerError(error)
925 }
926 }
927 }
928 JsonRpcClientError::Transport(error) => PeerError::Transport(anyhow!(error)),
929 JsonRpcClientError::RestartNeeded(arc) => PeerError::Transport(anyhow!(arc)),
930 JsonRpcClientError::ParseError(error) => PeerError::InvalidResponse(anyhow!(error)),
931 JsonRpcClientError::InvalidSubscriptionId => {
932 PeerError::Transport(anyhow!("Invalid subscription id"))
933 }
934 JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
935 PeerError::InvalidRequest(anyhow!(invalid_request_id))
936 }
937 JsonRpcClientError::RequestTimeout => PeerError::Transport(anyhow!("Request timeout")),
938 JsonRpcClientError::Custom(e) => PeerError::Transport(anyhow!(e)),
939 JsonRpcClientError::HttpNotImplemented => {
940 PeerError::ServerError(anyhow!("Http not implemented"))
941 }
942 JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
943 PeerError::InvalidRequest(anyhow!(empty_batch_request))
944 }
945 JsonRpcClientError::RegisterMethod(register_method_error) => {
946 PeerError::InvalidResponse(anyhow!(register_method_error))
947 }
948 }
949}
950
951#[async_trait]
952impl IClientConnection for WsClient {
953 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
954 let method = match method {
955 ApiMethod::Core(method) => method,
956 ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
957 };
958
959 Ok(ClientT::request(self, &method, [request.to_json()])
960 .await
961 .map_err(jsonrpc_error_to_peer_error)?)
962 }
963
964 async fn await_disconnection(&self) {
965 self.on_disconnect().await;
966 }
967}
968
969pub type DynClientConnector = Arc<dyn IClientConnector>;
970
971#[async_trait]
974pub trait IClientConnector: Send + Sync + 'static {
975 fn peers(&self) -> BTreeSet<PeerId>;
976
977 async fn connect(&self, peer: PeerId) -> PeerResult<DynClientConnection>;
978
979 fn into_dyn(self) -> DynClientConnector
980 where
981 Self: Sized,
982 {
983 Arc::new(self)
984 }
985}
986
987pub type DynClientConnection = Arc<dyn IClientConnection>;
988
989#[async_trait]
990pub trait IClientConnection: Debug + Send + Sync + 'static {
991 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value>;
992
993 async fn await_disconnection(&self);
994
995 fn into_dyn(self) -> DynClientConnection
996 where
997 Self: Sized,
998 {
999 Arc::new(self)
1000 }
1001}
1002
1003#[derive(Clone, Debug)]
1004pub struct ReconnectFederationApi {
1005 peers: BTreeSet<PeerId>,
1006 admin_id: Option<PeerId>,
1007 module_id: Option<ModuleInstanceId>,
1008 connections: ReconnectClientConnections,
1009}
1010
1011impl ReconnectFederationApi {
1012 fn new(connector: &DynClientConnector, admin_id: Option<PeerId>) -> Self {
1013 Self {
1014 peers: connector.peers(),
1015 admin_id,
1016 module_id: None,
1017 connections: ReconnectClientConnections::new(connector),
1018 }
1019 }
1020
1021 pub async fn new_admin(
1022 peer: PeerId,
1023 url: SafeUrl,
1024 api_secret: &Option<String>,
1025 ) -> anyhow::Result<Self> {
1026 Self::from_endpoints(once((peer, url)), api_secret, Some(peer)).await
1027 }
1028
1029 pub async fn from_endpoints(
1030 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
1031 api_secret: &Option<String>,
1032 admin_id: Option<PeerId>,
1033 ) -> anyhow::Result<Self> {
1034 let peers = peers.into_iter().collect::<BTreeMap<PeerId, SafeUrl>>();
1035
1036 let scheme = peers
1037 .values()
1038 .next()
1039 .expect("Federation api has been initialized with no peers")
1040 .scheme();
1041
1042 let connector = match scheme {
1043 "ws" | "wss" => WebsocketConnector::new(peers, api_secret.clone())?.into_dyn(),
1044 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1045 "tor" => TorConnector::new(peers, api_secret.clone()).into_dyn(),
1046 "iroh" => {
1047 let iroh_dns = std::env::var(FM_IROH_DNS_ENV)
1048 .ok()
1049 .and_then(|dns| dns.parse().ok());
1050 iroh::IrohConnector::new(peers, iroh_dns).await?.into_dyn()
1051 }
1052 scheme => anyhow::bail!("Unsupported connector scheme: {scheme}"),
1053 };
1054
1055 Ok(ReconnectFederationApi::new(&connector, admin_id))
1056 }
1057}
1058
1059impl IModuleFederationApi for ReconnectFederationApi {}
1060
1061#[apply(async_trait_maybe_send!)]
1062impl IRawFederationApi for ReconnectFederationApi {
1063 fn all_peers(&self) -> &BTreeSet<PeerId> {
1064 &self.peers
1065 }
1066
1067 fn self_peer(&self) -> Option<PeerId> {
1068 self.admin_id
1069 }
1070
1071 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
1072 ReconnectFederationApi {
1073 peers: self.peers.clone(),
1074 admin_id: self.admin_id,
1075 module_id: Some(id),
1076 connections: self.connections.clone(),
1077 }
1078 .into()
1079 }
1080
1081 #[instrument(
1082 target = LOG_NET_API,
1083 skip_all,
1084 fields(
1085 peer_id = %peer_id,
1086 method = %method,
1087 params = %params.params,
1088 )
1089 )]
1090 async fn request_raw(
1091 &self,
1092 peer_id: PeerId,
1093 method: &str,
1094 params: &ApiRequestErased,
1095 ) -> PeerResult<Value> {
1096 let method = match self.module_id {
1097 Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
1098 None => ApiMethod::Core(method.to_string()),
1099 };
1100
1101 self.connections
1102 .request(peer_id, method, params.clone())
1103 .await
1104 }
1105}
1106
1107#[derive(Clone, Debug)]
1108pub struct ReconnectClientConnections {
1109 connections: BTreeMap<PeerId, ClientConnection>,
1110}
1111
1112impl ReconnectClientConnections {
1113 pub fn new(connector: &DynClientConnector) -> Self {
1114 ReconnectClientConnections {
1115 connections: connector
1116 .peers()
1117 .into_iter()
1118 .map(|peer| (peer, ClientConnection::new(peer, connector.clone())))
1119 .collect(),
1120 }
1121 }
1122
1123 async fn request(
1124 &self,
1125 peer: PeerId,
1126 method: ApiMethod,
1127 request: ApiRequestErased,
1128 ) -> PeerResult<Value> {
1129 trace!(target: LOG_NET_API, %method, "Api request");
1130 let res = self
1131 .connections
1132 .get(&peer)
1133 .ok_or_else(|| PeerError::InvalidPeerId { peer_id: peer })?
1134 .connection()
1135 .await
1136 .context("Failed to connect to peer")
1137 .map_err(PeerError::Connection)?
1138 .request(method.clone(), request)
1139 .await;
1140
1141 trace!(target: LOG_NET_API, ?method, res_ok = res.is_ok(), "Api response");
1142
1143 res
1144 }
1145}
1146
1147#[derive(Clone, Debug)]
1148struct ClientConnection {
1149 sender: async_channel::Sender<oneshot::Sender<DynClientConnection>>,
1150}
1151
1152impl ClientConnection {
1153 fn new(peer: PeerId, connector: DynClientConnector) -> ClientConnection {
1154 let (sender, receiver) = bounded::<oneshot::Sender<DynClientConnection>>(1024);
1155
1156 fedimint_core::task::spawn(
1157 "peer-api-connection",
1158 async move {
1159 let mut backoff = api_networking_backoff();
1160
1161 while let Ok(sender) = receiver.recv().await {
1162 let mut senders = vec![sender];
1163
1164 while let Ok(sender) = receiver.try_recv() {
1167 senders.push(sender);
1168 }
1169
1170 match connector.connect(peer).await {
1171 Ok(connection) => {
1172 trace!(target: LOG_CLIENT_NET_API, "Connected to peer api");
1173
1174 for sender in senders {
1175 sender.send(connection.clone()).ok();
1176 }
1177
1178 loop {
1179 tokio::select! {
1180 sender = receiver.recv() => {
1181 match sender.ok() {
1182 Some(sender) => sender.send(connection.clone()).ok(),
1183 None => break,
1184 };
1185 }
1186 () = connection.await_disconnection() => break,
1187 }
1188 }
1189
1190 trace!(target: LOG_CLIENT_NET_API, "Disconnected from peer api");
1191
1192 backoff = api_networking_backoff();
1193 }
1194 Err(e) => {
1195 trace!(target: LOG_CLIENT_NET_API, "Failed to connect to peer api {e}");
1196
1197 fedimint_core::task::sleep(
1198 backoff.next().expect("No limit to the number of retries"),
1199 )
1200 .await;
1201 }
1202 }
1203 }
1204
1205 trace!(target: LOG_CLIENT_NET_API, "Shutting down peer api connection task");
1206 }
1207 .instrument(trace_span!("peer-api-connection", ?peer)),
1208 );
1209
1210 ClientConnection { sender }
1211 }
1212
1213 async fn connection(&self) -> Option<DynClientConnection> {
1214 let (sender, receiver) = oneshot::channel();
1215
1216 self.sender
1217 .send(sender)
1218 .await
1219 .inspect_err(|err| {
1220 warn!(
1221 target: LOG_CLIENT_NET_API,
1222 err = %err.fmt_compact(),
1223 "Api connection request channel closed unexpectedly"
1224 );
1225 })
1226 .ok()?;
1227
1228 receiver.await.ok()
1229 }
1230}
1231
1232#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1234pub struct LegacyFederationStatus {
1235 pub session_count: u64,
1236 pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
1237 pub peers_online: u64,
1238 pub peers_offline: u64,
1239 pub peers_flagged: u64,
1242 pub scheduled_shutdown: Option<u64>,
1243}
1244
1245#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1246pub struct LegacyPeerStatus {
1247 pub last_contribution: Option<u64>,
1248 pub connection_status: LegacyP2PConnectionStatus,
1249 pub flagged: bool,
1252}
1253
1254#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1255#[serde(rename_all = "snake_case")]
1256pub enum LegacyP2PConnectionStatus {
1257 #[default]
1258 Disconnected,
1259 Connected,
1260}
1261
1262#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
1263pub struct StatusResponse {
1264 pub server: ServerStatusLegacy,
1265 pub federation: Option<LegacyFederationStatus>,
1266}
1267
1268#[cfg(test)]
1269mod tests;