1mod error;
2pub mod global_api;
3pub mod iroh;
4pub mod net;
5#[cfg(all(feature = "tor", not(target_family = "wasm")))]
6pub mod tor;
7pub mod ws;
8
9use core::fmt;
10use std::collections::{BTreeMap, BTreeSet, HashMap};
11use std::fmt::Debug;
12use std::future::pending;
13use std::pin::Pin;
14use std::result;
15use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::{Context, anyhow, bail};
19use async_trait::async_trait;
20use bitcoin::hashes::sha256;
21use bitcoin::secp256k1;
22pub use error::{FederationError, OutputOutcomeError, PeerError};
23use fedimint_core::admin_client::{
24 GuardianConfigBackup, PeerServerParamsLegacy, ServerStatusLegacy, SetupStatus,
25};
26use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
27use fedimint_core::core::backup::SignedBackupRequest;
28use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
29use fedimint_core::encoding::{Decodable, Encodable};
30use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
31use fedimint_core::invite_code::InviteCode;
32use fedimint_core::module::audit::AuditSummary;
33use fedimint_core::module::registry::ModuleDecoderRegistry;
34use fedimint_core::module::{
35 ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
36};
37use fedimint_core::net::api_announcement::SignedApiAnnouncement;
38use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
39use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
40use fedimint_core::task::{MaybeSend, MaybeSync};
41use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
42use fedimint_core::util::backoff_util::{FibonacciBackoff, api_networking_backoff, custom_backoff};
43use fedimint_core::util::{FmtCompact as _, SafeUrl};
44use fedimint_core::{
45 NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
46};
47use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET, LOG_NET_API};
48use futures::stream::FuturesUnordered;
49use futures::{Future, StreamExt};
50use global_api::with_cache::GlobalFederationApiWithCache;
51use jsonrpsee_core::DeserializeOwned;
52#[cfg(target_family = "wasm")]
53use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
54use serde::{Deserialize, Serialize};
55use serde_json::Value;
56use tokio::sync::OnceCell;
57use tracing::{debug, instrument, trace, warn};
58
59use crate::api;
60use crate::api::ws::WebsocketConnector;
61use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
62
63pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
64
65pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
66 ApiVersion { major: 0, minor: 1 };
67
68pub type PeerResult<T> = Result<T, PeerError>;
69pub type FederationResult<T> = Result<T, FederationError>;
70pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
71
72pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
73
74#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
78pub struct ApiVersionSet {
79 pub core: ApiVersion,
80 pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
81}
82
83#[apply(async_trait_maybe_send!)]
85pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
86 fn all_peers(&self) -> &BTreeSet<PeerId>;
94
95 fn self_peer(&self) -> Option<PeerId>;
100
101 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;
102
103 async fn request_raw(
105 &self,
106 peer_id: PeerId,
107 method: &str,
108 params: &ApiRequestErased,
109 ) -> PeerResult<Value>;
110}
111
112#[apply(async_trait_maybe_send!)]
115pub trait FederationApiExt: IRawFederationApi {
116 async fn request_single_peer<Ret>(
117 &self,
118 method: String,
119 params: ApiRequestErased,
120 peer: PeerId,
121 ) -> PeerResult<Ret>
122 where
123 Ret: DeserializeOwned,
124 {
125 self.request_raw(peer, &method, ¶ms)
126 .await
127 .and_then(|v| {
128 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
129 })
130 }
131
132 async fn request_single_peer_federation<FedRet>(
133 &self,
134 method: String,
135 params: ApiRequestErased,
136 peer_id: PeerId,
137 ) -> FederationResult<FedRet>
138 where
139 FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
140 {
141 self.request_raw(peer_id, &method, ¶ms)
142 .await
143 .and_then(|v| {
144 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
145 })
146 .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
147 }
148
149 #[instrument(target = LOG_NET_API, skip_all, fields(method=method))]
152 async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
153 &self,
154 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
155 method: String,
156 params: ApiRequestErased,
157 ) -> FederationResult<FR> {
158 #[cfg(not(target_family = "wasm"))]
162 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
163 #[cfg(target_family = "wasm")]
164 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
165
166 for peer in self.all_peers() {
167 futures.push(Box::pin({
168 let method = &method;
169 let params = ¶ms;
170 async move {
171 let result = self
172 .request_single_peer(method.clone(), params.clone(), *peer)
173 .await;
174
175 (*peer, result)
176 }
177 }));
178 }
179
180 let mut peer_errors = BTreeMap::new();
181 let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
182
183 loop {
184 let (peer, result) = futures
185 .next()
186 .await
187 .expect("Query strategy ran out of peers to query without returning a result");
188
189 match result {
190 Ok(response) => match strategy.process(peer, response) {
191 QueryStep::Retry(peers) => {
192 for peer in peers {
193 futures.push(Box::pin({
194 let method = &method;
195 let params = ¶ms;
196 async move {
197 let result = self
198 .request_single_peer(method.clone(), params.clone(), peer)
199 .await;
200
201 (peer, result)
202 }
203 }));
204 }
205 }
206 QueryStep::Success(response) => return Ok(response),
207 QueryStep::Failure(e) => {
208 peer_errors.insert(peer, e);
209 }
210 QueryStep::Continue => {}
211 },
212 Err(e) => {
213 e.report_if_unusual(peer, "RequestWithStrategy");
214 peer_errors.insert(peer, e);
215 }
216 }
217
218 if peer_errors.len() == peer_error_threshold {
219 return Err(FederationError::peer_errors(
220 method.clone(),
221 params.params.clone(),
222 peer_errors,
223 ));
224 }
225 }
226 }
227
228 #[instrument(target = LOG_CLIENT_NET_API, level = "debug", skip(self, strategy))]
229 async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
230 &self,
231 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
232 method: String,
233 params: ApiRequestErased,
234 ) -> FR {
235 #[cfg(not(target_family = "wasm"))]
239 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
240 #[cfg(target_family = "wasm")]
241 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
242
243 for peer in self.all_peers() {
244 futures.push(Box::pin({
245 let method = &method;
246 let params = ¶ms;
247 async move {
248 let response = util::retry(
249 format!("api-request-{method}-{peer}"),
250 api_networking_backoff(),
251 || async {
252 self.request_single_peer(method.clone(), params.clone(), *peer)
253 .await
254 .inspect_err(|e| {
255 e.report_if_unusual(*peer, "QueryWithStrategyRetry");
256 })
257 .map_err(|e| anyhow!(e.to_string()))
258 },
259 )
260 .await
261 .expect("Number of retries has no limit");
262
263 (*peer, response)
264 }
265 }));
266 }
267
268 loop {
269 let (peer, response) = match futures.next().await {
270 Some(t) => t,
271 None => pending().await,
272 };
273
274 match strategy.process(peer, response) {
275 QueryStep::Retry(peers) => {
276 for peer in peers {
277 futures.push(Box::pin({
278 let method = &method;
279 let params = ¶ms;
280 async move {
281 let response = util::retry(
282 format!("api-request-{method}-{peer}"),
283 api_networking_backoff(),
284 || async {
285 self.request_single_peer(
286 method.clone(),
287 params.clone(),
288 peer,
289 )
290 .await
291 .inspect_err(|err| {
292 if err.is_unusual() {
293 debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
294 }
295 })
296 .map_err(|e| anyhow!(e.to_string()))
297 },
298 )
299 .await
300 .expect("Number of retries has no limit");
301
302 (peer, response)
303 }
304 }));
305 }
306 }
307 QueryStep::Success(response) => return response,
308 QueryStep::Failure(e) => {
309 warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
310 }
311 QueryStep::Continue => {}
312 }
313 }
314 }
315
316 async fn request_current_consensus<Ret>(
317 &self,
318 method: String,
319 params: ApiRequestErased,
320 ) -> FederationResult<Ret>
321 where
322 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
323 {
324 self.request_with_strategy(
325 ThresholdConsensus::new(self.all_peers().to_num_peers()),
326 method,
327 params,
328 )
329 .await
330 }
331
332 async fn request_current_consensus_retry<Ret>(
333 &self,
334 method: String,
335 params: ApiRequestErased,
336 ) -> Ret
337 where
338 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
339 {
340 self.request_with_strategy_retry(
341 ThresholdConsensus::new(self.all_peers().to_num_peers()),
342 method,
343 params,
344 )
345 .await
346 }
347
348 async fn request_admin<Ret>(
349 &self,
350 method: &str,
351 params: ApiRequestErased,
352 auth: ApiAuth,
353 ) -> FederationResult<Ret>
354 where
355 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
356 {
357 let Some(self_peer_id) = self.self_peer() else {
358 return Err(FederationError::general(
359 method,
360 params,
361 anyhow::format_err!("Admin peer_id not set"),
362 ));
363 };
364
365 self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
366 .await
367 }
368
369 async fn request_admin_no_auth<Ret>(
370 &self,
371 method: &str,
372 params: ApiRequestErased,
373 ) -> FederationResult<Ret>
374 where
375 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
376 {
377 let Some(self_peer_id) = self.self_peer() else {
378 return Err(FederationError::general(
379 method,
380 params,
381 anyhow::format_err!("Admin peer_id not set"),
382 ));
383 };
384
385 self.request_single_peer_federation(method.into(), params, self_peer_id)
386 .await
387 }
388}
389
390#[apply(async_trait_maybe_send!)]
391impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
392
393pub trait IModuleFederationApi: IRawFederationApi {}
395
396dyn_newtype_define! {
397 #[derive(Clone)]
398 pub DynModuleApi(Arc<IModuleFederationApi>)
399}
400
401dyn_newtype_define! {
402 #[derive(Clone)]
403 pub DynGlobalApi(Arc<IGlobalFederationApi>)
404}
405
406impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
407 fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
408 self.inner.as_ref()
409 }
410}
411
412impl DynGlobalApi {
413 pub fn new(
414 connectors: ConnectorRegistry,
415 peers: BTreeMap<PeerId, SafeUrl>,
416 api_secret: Option<&str>,
417 ) -> anyhow::Result<Self> {
418 Ok(GlobalFederationApiWithCache::new(FederationApi::new(
419 connectors, peers, None, api_secret,
420 ))
421 .into())
422 }
423 pub fn new_admin(
424 connectors: ConnectorRegistry,
425 peer: PeerId,
426 url: SafeUrl,
427 api_secret: Option<&str>,
428 ) -> anyhow::Result<DynGlobalApi> {
429 Ok(GlobalFederationApiWithCache::new(FederationApi::new(
430 connectors,
431 [(peer, url)].into(),
432 Some(peer),
433 api_secret,
434 ))
435 .into())
436 }
437
438 pub fn new_admin_setup(connectors: ConnectorRegistry, url: SafeUrl) -> anyhow::Result<Self> {
439 Self::new_admin(
442 connectors,
443 PeerId::from(1024),
444 url,
445 None,
447 )
448 }
449}
450
451#[apply(async_trait_maybe_send!)]
453pub trait IGlobalFederationApi: IRawFederationApi {
454 async fn submit_transaction(
455 &self,
456 tx: Transaction,
457 ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;
458
459 async fn await_block(
460 &self,
461 block_index: u64,
462 decoders: &ModuleDecoderRegistry,
463 ) -> anyhow::Result<SessionOutcome>;
464
465 async fn get_session_status(
466 &self,
467 block_index: u64,
468 decoders: &ModuleDecoderRegistry,
469 core_api_version: ApiVersion,
470 broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
471 ) -> anyhow::Result<SessionStatus>;
472
473 async fn session_count(&self) -> FederationResult<u64>;
474
475 async fn await_transaction(&self, txid: TransactionId) -> TransactionId;
476
477 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;
478
479 async fn download_backup(
480 &self,
481 id: &secp256k1::PublicKey,
482 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;
483
484 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;
488
489 async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;
490
491 async fn set_local_params(
492 &self,
493 name: String,
494 federation_name: Option<String>,
495 disable_base_fees: Option<bool>,
496 auth: ApiAuth,
497 ) -> FederationResult<String>;
498
499 async fn add_peer_connection_info(
500 &self,
501 info: String,
502 auth: ApiAuth,
503 ) -> FederationResult<String>;
504
505 async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;
507
508 async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;
510
511 async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()>;
519
520 async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>>;
525
526 async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;
530
531 async fn get_verify_config_hash(
534 &self,
535 auth: ApiAuth,
536 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
537
538 async fn verified_configs(
541 &self,
542 auth: ApiAuth,
543 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
544
545 async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;
551
552 async fn status(&self) -> FederationResult<StatusResponse>;
554
555 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;
557
558 async fn guardian_config_backup(&self, auth: ApiAuth)
560 -> FederationResult<GuardianConfigBackup>;
561
562 async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;
564
565 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;
566
567 async fn submit_api_announcement(
569 &self,
570 peer_id: PeerId,
571 announcement: SignedApiAnnouncement,
572 ) -> FederationResult<()>;
573
574 async fn api_announcements(
575 &self,
576 guardian: PeerId,
577 ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;
578
579 async fn sign_api_announcement(
580 &self,
581 api_url: SafeUrl,
582 auth: ApiAuth,
583 ) -> FederationResult<SignedApiAnnouncement>;
584
585 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;
586
587 async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;
589
590 async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;
592
593 async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode>;
596
597 async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()>;
600}
601
602pub fn deserialize_outcome<R>(
603 outcome: &SerdeOutputOutcome,
604 module_decoder: &Decoder,
605) -> OutputOutcomeResult<R>
606where
607 R: OutputOutcome + MaybeSend,
608{
609 let dyn_outcome = outcome
610 .try_into_inner_known_module_kind(module_decoder)
611 .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
612
613 let source_instance = dyn_outcome.module_instance_id();
614
615 dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
616 let target_type = std::any::type_name::<R>();
617 OutputOutcomeError::ResponseDeserialization(anyhow!(
618 "Could not downcast output outcome with instance id {source_instance} to {target_type}"
619 ))
620 })
621}
622
623#[allow(clippy::struct_excessive_bools)] pub struct ConnectorRegistryBuilder {
629 connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
630
631 iroh_enable: bool,
633 iroh_dns: Option<SafeUrl>,
635 iroh_next: bool,
637 iroh_pkarr_dht: bool,
639
640 ws_enable: bool,
642 ws_force_tor: bool,
643}
644
645impl ConnectorRegistryBuilder {
646 #[allow(clippy::unused_async)] pub async fn bind(self) -> anyhow::Result<ConnectorRegistry> {
648 let mut inner: BTreeMap<String, JitTryAnyhow<DynConnector>> = BTreeMap::new();
649
650 if self.iroh_enable {
651 let iroh_connector = JitTryAnyhow::new_try(move || async move {
652 Ok(Arc::new(
653 api::iroh::IrohConnector::new(
654 self.iroh_dns,
655 self.iroh_pkarr_dht,
656 self.iroh_next,
657 )
658 .await?,
659 ) as DynConnector)
660 });
661 inner.insert("iroh".to_string(), iroh_connector);
662 }
663
664 if self.ws_enable {
665 match self.ws_force_tor {
666 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
667 true => {
668 use crate::api::tor::TorConnector;
669
670 let tor_connector = JitTry::new_try(move || async move {
671 Ok(Arc::new(TorConnector::bootstrap().await?) as DynConnector)
672 });
673 inner.insert("wss".into(), tor_connector.clone());
674 inner.insert("ws".into(), tor_connector);
675 }
676
677 false => {
678 let websocket_connector = JitTry::new_try(move || async move {
679 Ok(Arc::new(WebsocketConnector::new()) as DynConnector)
680 });
681 inner.insert("wss".into(), websocket_connector.clone());
682 inner.insert("ws".into(), websocket_connector);
683 }
684 #[allow(unreachable_patterns)]
685 _ => bail!("Tor requested, but not support not compiled in"),
686 }
687 }
688
689 Ok(ConnectorRegistry {
690 connectors: inner,
691 connection_overrides: self.connection_overrides,
692 })
693 }
694
695 pub fn iroh_pkarr_dht(self, enable: bool) -> Self {
696 Self {
697 iroh_pkarr_dht: enable,
698 ..self
699 }
700 }
701
702 pub fn iroh_next(self, enable: bool) -> Self {
703 Self {
704 iroh_next: enable,
705 ..self
706 }
707 }
708
709 pub fn ws_force_tor(self, enable: bool) -> Self {
710 Self {
711 ws_force_tor: enable,
712 ..self
713 }
714 }
715
716 pub fn set_iroh_dns(self, url: SafeUrl) -> Self {
717 Self {
718 iroh_dns: Some(url),
719 ..self
720 }
721 }
722
723 pub fn with_env_var_overrides(mut self) -> anyhow::Result<Self> {
725 for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
727 self = self.with_connection_override(k, v);
728 }
729
730 Ok(Self { ..self })
731 }
732 pub fn with_connection_override(
733 mut self,
734 original_url: SafeUrl,
735 replacement_url: SafeUrl,
736 ) -> Self {
737 self.connection_overrides
738 .insert(original_url, replacement_url);
739 self
740 }
741}
742
743#[derive(Clone, Debug)]
756pub struct ConnectorRegistry {
757 connectors: BTreeMap<String, JitTryAnyhow<DynConnector>>,
758
759 connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
764}
765
766impl ConnectorRegistry {
767 pub fn build_from_client_defaults() -> ConnectorRegistryBuilder {
772 ConnectorRegistryBuilder {
773 iroh_enable: true,
774 iroh_dns: None,
775 iroh_pkarr_dht: false,
776 iroh_next: true,
777 ws_enable: true,
778 ws_force_tor: false,
779
780 connection_overrides: BTreeMap::default(),
781 }
782 }
783
784 pub fn build_from_server_defaults() -> ConnectorRegistryBuilder {
787 ConnectorRegistryBuilder {
788 iroh_enable: true,
789 iroh_dns: None,
790 iroh_pkarr_dht: true,
791 iroh_next: true,
792 ws_enable: true,
793 ws_force_tor: false,
794
795 connection_overrides: BTreeMap::default(),
796 }
797 }
798
799 pub fn build_from_testing_defaults() -> ConnectorRegistryBuilder {
802 ConnectorRegistryBuilder {
803 iroh_enable: true,
804 iroh_dns: None,
805 iroh_pkarr_dht: false,
806 iroh_next: false,
807 ws_enable: true,
808 ws_force_tor: false,
809
810 connection_overrides: BTreeMap::default(),
811 }
812 }
813
814 pub fn build_from_client_env() -> anyhow::Result<ConnectorRegistryBuilder> {
817 let builder = Self::build_from_client_defaults().with_env_var_overrides()?;
818 Ok(builder)
819 }
820
821 pub fn build_from_server_env() -> anyhow::Result<ConnectorRegistryBuilder> {
824 let builder = Self::build_from_server_defaults().with_env_var_overrides()?;
825 Ok(builder)
826 }
827
828 pub fn build_from_testing_env() -> anyhow::Result<ConnectorRegistryBuilder> {
831 let builder = Self::build_from_testing_defaults().with_env_var_overrides()?;
832 Ok(builder)
833 }
834
835 pub async fn connect_guardian(
840 &self,
841 url: &SafeUrl,
842 api_secret: Option<&str>,
843 ) -> PeerResult<DynGuaridianConnection> {
844 let url = match self.connection_overrides.get(url) {
845 Some(replacement) => {
846 trace!(
847 target: LOG_NET,
848 original_url = %url,
849 replacement_url = %replacement,
850 "Using a connectivity override for connection"
851 );
852
853 replacement
854 }
855 None => url,
856 };
857
858 let Some(connector) = self.connectors.get(url.scheme()) else {
859 return Err(PeerError::InvalidEndpoint(anyhow!(
860 "Unsupported scheme: {}; missing endpoint handler",
861 url.scheme()
862 )));
863 };
864
865 connector
866 .get_try()
867 .await
868 .map_err(|e| {
869 PeerError::Transport(anyhow!(
870 "Connector failed to initialize: {}",
871 e.fmt_compact()
872 ))
873 })?
874 .connect_guardian(url, api_secret)
875 .await
876 }
877}
878pub type DynConnector = Arc<dyn Connector>;
879
880#[async_trait]
881pub trait Connector: Send + Sync + 'static + fmt::Debug {
882 async fn connect_guardian(
883 &self,
884 url: &SafeUrl,
885 api_secret: Option<&str>,
886 ) -> PeerResult<DynGuaridianConnection>;
887}
888
889pub type DynGuaridianConnection = Arc<dyn IGuardianConnection>;
891
892#[async_trait]
894pub trait IGuardianConnection: Debug + Send + Sync + 'static {
895 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value>;
896
897 fn is_connected(&self) -> bool;
898
899 async fn await_disconnection(&self);
900
901 fn into_dyn(self) -> DynGuaridianConnection
902 where
903 Self: Sized,
904 {
905 Arc::new(self)
906 }
907}
908
909#[derive(Clone, Debug)]
920pub struct FederationApi {
921 connectors: ConnectorRegistry,
923 peers: BTreeMap<PeerId, SafeUrl>,
925 peers_keys: BTreeSet<PeerId>,
927 admin_id: Option<PeerId>,
929 module_id: Option<ModuleInstanceId>,
931
932 api_secret: Option<String>,
933
934 #[allow(clippy::type_complexity)]
940 connections: Arc<tokio::sync::Mutex<HashMap<SafeUrl, Arc<ConnectionState>>>>,
941}
942
943#[derive(Debug)]
946struct ConnectionStateInner {
947 fresh: bool,
948 backoff: FibonacciBackoff,
949}
950
951#[derive(Debug)]
952struct ConnectionState {
953 connection: tokio::sync::OnceCell<DynGuaridianConnection>,
955 inner: std::sync::Mutex<ConnectionStateInner>,
959}
960
961impl ConnectionState {
962 fn new_initial() -> Self {
964 Self {
965 connection: OnceCell::new(),
966 inner: std::sync::Mutex::new(ConnectionStateInner {
967 fresh: true,
968 backoff: custom_backoff(
969 Duration::from_millis(5),
971 Duration::from_secs(30),
972 None,
973 ),
974 }),
975 }
976 }
977
978 fn new_reconnecting() -> Self {
981 Self {
982 connection: OnceCell::new(),
983 inner: std::sync::Mutex::new(ConnectionStateInner {
984 fresh: false,
986 backoff: custom_backoff(
987 Duration::from_millis(500),
989 Duration::from_secs(30),
990 None,
991 ),
992 }),
993 }
994 }
995
996 fn pre_reconnect_delay(&self) -> Duration {
999 let mut backoff_locked = self.inner.lock().expect("Locking failed");
1000 let fresh = backoff_locked.fresh;
1001
1002 backoff_locked.fresh = false;
1003
1004 if fresh {
1005 Duration::default()
1006 } else {
1007 backoff_locked.backoff.next().expect("Keeps retrying")
1008 }
1009 }
1010}
1011impl FederationApi {
1012 pub fn new(
1013 connectors: ConnectorRegistry,
1014 peers: BTreeMap<PeerId, SafeUrl>,
1015 admin_peer_id: Option<PeerId>,
1016 api_secret: Option<&str>,
1017 ) -> Self {
1018 Self {
1019 connections: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
1020 peers_keys: peers.keys().copied().collect(),
1021 peers,
1022 admin_id: admin_peer_id,
1023 module_id: None,
1024 connectors,
1025 api_secret: api_secret.map(ToOwned::to_owned),
1026 }
1027 }
1028
1029 async fn get_or_create_connection(
1030 &self,
1031 url: &SafeUrl,
1032 api_secret: Option<&str>,
1033 ) -> PeerResult<DynGuaridianConnection> {
1034 let mut pool_locked = self.connections.lock().await;
1035
1036 let pool_entry_arc = pool_locked
1037 .entry(url.to_owned())
1038 .and_modify(|entry_arc| {
1039 if let Some(existing_conn) = entry_arc.connection.get()
1045 && !existing_conn.is_connected(){
1046 trace!(target: LOG_NET_API, %url, "Existing connection is disconnected, removing from pool");
1047 *entry_arc = Arc::new(ConnectionState::new_reconnecting());
1048 }
1049 })
1050 .or_insert_with(|| Arc::new(ConnectionState::new_initial()))
1051 .clone();
1052
1053 drop(pool_locked);
1055
1056 let conn = pool_entry_arc
1057 .connection
1058 .get_or_try_init(|| async {
1066 let retry_delay = pool_entry_arc.pre_reconnect_delay();
1067 fedimint_core::runtime::sleep(retry_delay).await;
1068
1069 let conn = self.connectors.connect_guardian(url, api_secret).await?;
1070
1071 Ok(conn)
1072 })
1073 .await?;
1074
1075 trace!(target: LOG_NET_API, %url, "Using websocket connection");
1076 Ok(conn.clone())
1077 }
1078
1079 async fn request(
1080 &self,
1081 peer: PeerId,
1082 method: ApiMethod,
1083 request: ApiRequestErased,
1084 ) -> PeerResult<Value> {
1085 trace!(target: LOG_NET_API, %peer, %method, "Api request");
1086 let url = self
1087 .peers
1088 .get(&peer)
1089 .ok_or_else(|| PeerError::InvalidPeerId { peer_id: peer })?;
1090 let conn = self
1091 .get_or_create_connection(url, self.api_secret.as_deref())
1092 .await
1093 .context("Failed to connect to peer")
1094 .map_err(PeerError::Connection)?;
1095 let res = conn.request(method.clone(), request).await;
1096
1097 trace!(target: LOG_NET_API, ?method, res_ok = res.is_ok(), "Api response");
1098
1099 res
1100 }
1101}
1102
1103impl IModuleFederationApi for FederationApi {}
1104
1105#[apply(async_trait_maybe_send!)]
1106impl IRawFederationApi for FederationApi {
1107 fn all_peers(&self) -> &BTreeSet<PeerId> {
1108 &self.peers_keys
1109 }
1110
1111 fn self_peer(&self) -> Option<PeerId> {
1112 self.admin_id
1113 }
1114
1115 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
1116 FederationApi {
1117 api_secret: self.api_secret.clone(),
1118 connections: self.connections.clone(),
1119 connectors: self.connectors.clone(),
1120 peers: self.peers.clone(),
1121 peers_keys: self.peers_keys.clone(),
1122 admin_id: self.admin_id,
1123 module_id: Some(id),
1124 }
1125 .into()
1126 }
1127
1128 #[instrument(
1129 target = LOG_NET_API,
1130 skip_all,
1131 fields(
1132 peer_id = %peer_id,
1133 method = %method,
1134 params = %params.params,
1135 )
1136 )]
1137 async fn request_raw(
1138 &self,
1139 peer_id: PeerId,
1140 method: &str,
1141 params: &ApiRequestErased,
1142 ) -> PeerResult<Value> {
1143 let method = match self.module_id {
1144 Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
1145 None => ApiMethod::Core(method.to_string()),
1146 };
1147
1148 self.request(peer_id, method, params.clone()).await
1149 }
1150}
1151
1152#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1154pub struct LegacyFederationStatus {
1155 pub session_count: u64,
1156 pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
1157 pub peers_online: u64,
1158 pub peers_offline: u64,
1159 pub peers_flagged: u64,
1162 pub scheduled_shutdown: Option<u64>,
1163}
1164
1165#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1166pub struct LegacyPeerStatus {
1167 pub last_contribution: Option<u64>,
1168 pub connection_status: LegacyP2PConnectionStatus,
1169 pub flagged: bool,
1172}
1173
1174#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1175#[serde(rename_all = "snake_case")]
1176pub enum LegacyP2PConnectionStatus {
1177 #[default]
1178 Disconnected,
1179 Connected,
1180}
1181
1182#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
1183pub struct StatusResponse {
1184 pub server: ServerStatusLegacy,
1185 pub federation: Option<LegacyFederationStatus>,
1186}
1187
1188#[cfg(test)]
1189mod tests;