fedimint_api_client/api/
mod.rs

1mod error;
2pub mod global_api;
3pub mod iroh;
4pub mod net;
5#[cfg(all(feature = "tor", not(target_family = "wasm")))]
6pub mod tor;
7pub mod ws;
8
9use core::fmt;
10use std::collections::{BTreeMap, BTreeSet, HashMap};
11use std::fmt::Debug;
12use std::future::pending;
13use std::pin::Pin;
14use std::result;
15use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::{Context, anyhow, bail};
19use async_trait::async_trait;
20use bitcoin::hashes::sha256;
21use bitcoin::secp256k1;
22pub use error::{FederationError, OutputOutcomeError, PeerError};
23use fedimint_core::admin_client::{
24    GuardianConfigBackup, PeerServerParamsLegacy, ServerStatusLegacy, SetupStatus,
25};
26use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
27use fedimint_core::core::backup::SignedBackupRequest;
28use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
29use fedimint_core::encoding::{Decodable, Encodable};
30use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
31use fedimint_core::invite_code::InviteCode;
32use fedimint_core::module::audit::AuditSummary;
33use fedimint_core::module::registry::ModuleDecoderRegistry;
34use fedimint_core::module::{
35    ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
36};
37use fedimint_core::net::api_announcement::SignedApiAnnouncement;
38use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
39use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
40use fedimint_core::task::{MaybeSend, MaybeSync};
41use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
42use fedimint_core::util::backoff_util::{FibonacciBackoff, api_networking_backoff, custom_backoff};
43use fedimint_core::util::{FmtCompact as _, SafeUrl};
44use fedimint_core::{
45    NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
46};
47use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET, LOG_NET_API};
48use futures::stream::FuturesUnordered;
49use futures::{Future, StreamExt};
50use global_api::with_cache::GlobalFederationApiWithCache;
51use jsonrpsee_core::DeserializeOwned;
52#[cfg(target_family = "wasm")]
53use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
54use serde::{Deserialize, Serialize};
55use serde_json::Value;
56use tokio::sync::OnceCell;
57use tracing::{debug, instrument, trace, warn};
58
59use crate::api;
60use crate::api::ws::WebsocketConnector;
61use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
62
/// Core API version that, going by the constant's name, introduced the `v2`
/// variant of the session status endpoint.
pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);

/// Core API version that, going by the constant's name, introduced the
/// session status endpoint.
pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
    ApiVersion { major: 0, minor: 1 };

/// Result of a request made to a single federation peer
pub type PeerResult<T> = Result<T, PeerError>;
/// Result of a request made to the federation
pub type FederationResult<T> = Result<T, FederationError>;
/// Module-encoded, dynamically typed output outcome
pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;

/// Result of decoding an output outcome, see [`deserialize_outcome`]
pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
73
/// Set of api versions for each component (core + modules)
///
/// E.g. result of federated common api versions discovery.
#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
pub struct ApiVersionSet {
    /// Api version of the core (non-module) component
    pub core: ApiVersion,
    /// Api version of each module, keyed by its module instance id
    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
}
82
/// An API (module or global) that can query a federation
///
/// This trait only knows how to send one raw request to one peer;
/// federation-wide helpers are layered on top of it by [`FederationApiExt`].
#[apply(async_trait_maybe_send!)]
pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
    /// List of all federation peers for the purpose of iterating each peer
    /// in the federation.
    ///
    /// The underlying implementation is responsible for knowing how many
    /// peers there are and the `PeerId` of each. The caller of this interface
    /// most probably has some idea as well, but passing this set across every
    /// API call to the federation would be inconvenient.
    fn all_peers(&self) -> &BTreeSet<PeerId>;

    /// `PeerId` of the Guardian node, if set
    ///
    /// This is for using Client in a "Admin" mode, making authenticated
    /// calls to own `fedimintd` instance.
    fn self_peer(&self) -> Option<PeerId>;

    /// Returns a [`DynModuleApi`] for the module instance `id`
    ///
    /// NOTE(review): presumably requests made through the returned API are
    /// scoped to that module instance; confirm against the implementations.
    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;

    /// Make request to a specific federation peer by `peer_id`
    ///
    /// On success returns the peer's response as a raw, not yet deserialized
    /// JSON [`Value`].
    async fn request_raw(
        &self,
        peer_id: PeerId,
        method: &str,
        params: &ApiRequestErased,
    ) -> PeerResult<Value>;
}
111
/// An extension trait for making federation-wide API calls on top of
/// [`IRawFederationApi`].
114#[apply(async_trait_maybe_send!)]
115pub trait FederationApiExt: IRawFederationApi {
116    async fn request_single_peer<Ret>(
117        &self,
118        method: String,
119        params: ApiRequestErased,
120        peer: PeerId,
121    ) -> PeerResult<Ret>
122    where
123        Ret: DeserializeOwned,
124    {
125        self.request_raw(peer, &method, &params)
126            .await
127            .and_then(|v| {
128                serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
129            })
130    }
131
132    async fn request_single_peer_federation<FedRet>(
133        &self,
134        method: String,
135        params: ApiRequestErased,
136        peer_id: PeerId,
137    ) -> FederationResult<FedRet>
138    where
139        FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
140    {
141        self.request_raw(peer_id, &method, &params)
142            .await
143            .and_then(|v| {
144                serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
145            })
146            .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
147    }
148
149    /// Make an aggregate request to federation, using `strategy` to logically
150    /// merge the responses.
151    #[instrument(target = LOG_NET_API, skip_all, fields(method=method))]
152    async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
153        &self,
154        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
155        method: String,
156        params: ApiRequestErased,
157    ) -> FederationResult<FR> {
158        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
159        // completed results from it and we don't do any `await`s when
160        // processing them, it should be totally OK.
161        #[cfg(not(target_family = "wasm"))]
162        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
163        #[cfg(target_family = "wasm")]
164        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
165
166        for peer in self.all_peers() {
167            futures.push(Box::pin({
168                let method = &method;
169                let params = &params;
170                async move {
171                    let result = self
172                        .request_single_peer(method.clone(), params.clone(), *peer)
173                        .await;
174
175                    (*peer, result)
176                }
177            }));
178        }
179
180        let mut peer_errors = BTreeMap::new();
181        let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
182
183        loop {
184            let (peer, result) = futures
185                .next()
186                .await
187                .expect("Query strategy ran out of peers to query without returning a result");
188
189            match result {
190                Ok(response) => match strategy.process(peer, response) {
191                    QueryStep::Retry(peers) => {
192                        for peer in peers {
193                            futures.push(Box::pin({
194                                let method = &method;
195                                let params = &params;
196                                async move {
197                                    let result = self
198                                        .request_single_peer(method.clone(), params.clone(), peer)
199                                        .await;
200
201                                    (peer, result)
202                                }
203                            }));
204                        }
205                    }
206                    QueryStep::Success(response) => return Ok(response),
207                    QueryStep::Failure(e) => {
208                        peer_errors.insert(peer, e);
209                    }
210                    QueryStep::Continue => {}
211                },
212                Err(e) => {
213                    e.report_if_unusual(peer, "RequestWithStrategy");
214                    peer_errors.insert(peer, e);
215                }
216            }
217
218            if peer_errors.len() == peer_error_threshold {
219                return Err(FederationError::peer_errors(
220                    method.clone(),
221                    params.params.clone(),
222                    peer_errors,
223                ));
224            }
225        }
226    }
227
228    #[instrument(target = LOG_CLIENT_NET_API, level = "debug", skip(self, strategy))]
229    async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
230        &self,
231        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
232        method: String,
233        params: ApiRequestErased,
234    ) -> FR {
235        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
236        // completed results from it and we don't do any `await`s when
237        // processing them, it should be totally OK.
238        #[cfg(not(target_family = "wasm"))]
239        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
240        #[cfg(target_family = "wasm")]
241        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
242
243        for peer in self.all_peers() {
244            futures.push(Box::pin({
245                let method = &method;
246                let params = &params;
247                async move {
248                    let response = util::retry(
249                        format!("api-request-{method}-{peer}"),
250                        api_networking_backoff(),
251                        || async {
252                            self.request_single_peer(method.clone(), params.clone(), *peer)
253                                .await
254                                .inspect_err(|e| {
255                                    e.report_if_unusual(*peer, "QueryWithStrategyRetry");
256                                })
257                                .map_err(|e| anyhow!(e.to_string()))
258                        },
259                    )
260                    .await
261                    .expect("Number of retries has no limit");
262
263                    (*peer, response)
264                }
265            }));
266        }
267
268        loop {
269            let (peer, response) = match futures.next().await {
270                Some(t) => t,
271                None => pending().await,
272            };
273
274            match strategy.process(peer, response) {
275                QueryStep::Retry(peers) => {
276                    for peer in peers {
277                        futures.push(Box::pin({
278                            let method = &method;
279                            let params = &params;
280                            async move {
281                                let response = util::retry(
282                                    format!("api-request-{method}-{peer}"),
283                                    api_networking_backoff(),
284                                    || async {
285                                        self.request_single_peer(
286                                            method.clone(),
287                                            params.clone(),
288                                            peer,
289                                        )
290                                        .await
291                                        .inspect_err(|err| {
292                                            if err.is_unusual() {
293                                                debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
294                                            }
295                                        })
296                                        .map_err(|e| anyhow!(e.to_string()))
297                                    },
298                                )
299                                .await
300                                .expect("Number of retries has no limit");
301
302                                (peer, response)
303                            }
304                        }));
305                    }
306                }
307                QueryStep::Success(response) => return response,
308                QueryStep::Failure(e) => {
309                    warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
310                }
311                QueryStep::Continue => {}
312            }
313        }
314    }
315
316    async fn request_current_consensus<Ret>(
317        &self,
318        method: String,
319        params: ApiRequestErased,
320    ) -> FederationResult<Ret>
321    where
322        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
323    {
324        self.request_with_strategy(
325            ThresholdConsensus::new(self.all_peers().to_num_peers()),
326            method,
327            params,
328        )
329        .await
330    }
331
332    async fn request_current_consensus_retry<Ret>(
333        &self,
334        method: String,
335        params: ApiRequestErased,
336    ) -> Ret
337    where
338        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
339    {
340        self.request_with_strategy_retry(
341            ThresholdConsensus::new(self.all_peers().to_num_peers()),
342            method,
343            params,
344        )
345        .await
346    }
347
348    async fn request_admin<Ret>(
349        &self,
350        method: &str,
351        params: ApiRequestErased,
352        auth: ApiAuth,
353    ) -> FederationResult<Ret>
354    where
355        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
356    {
357        let Some(self_peer_id) = self.self_peer() else {
358            return Err(FederationError::general(
359                method,
360                params,
361                anyhow::format_err!("Admin peer_id not set"),
362            ));
363        };
364
365        self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
366            .await
367    }
368
369    async fn request_admin_no_auth<Ret>(
370        &self,
371        method: &str,
372        params: ApiRequestErased,
373    ) -> FederationResult<Ret>
374    where
375        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
376    {
377        let Some(self_peer_id) = self.self_peer() else {
378            return Err(FederationError::general(
379                method,
380                params,
381                anyhow::format_err!("Admin peer_id not set"),
382            ));
383        };
384
385        self.request_single_peer_federation(method.into(), params, self_peer_id)
386            .await
387    }
388}
389
// Blanket implementation: every `IRawFederationApi`, including unsized types
// such as trait objects, gets the `FederationApiExt` helpers.
#[apply(async_trait_maybe_send!)]
impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
392
/// Trait marker for the module (non-global) endpoints
///
/// Adds no methods of its own on top of [`IRawFederationApi`].
pub trait IModuleFederationApi: IRawFederationApi {}
395
// Defines `DynModuleApi`, a clonable newtype around
// `Arc<IModuleFederationApi>` (type-erased module API handle).
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynModuleApi(Arc<IModuleFederationApi>)
}
400
// Defines `DynGlobalApi`, a clonable newtype around
// `Arc<IGlobalFederationApi>` (type-erased global API handle).
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynGlobalApi(Arc<IGlobalFederationApi>)
}
405
406impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
407    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
408        self.inner.as_ref()
409    }
410}
411
412impl DynGlobalApi {
413    pub fn new(
414        connectors: ConnectorRegistry,
415        peers: BTreeMap<PeerId, SafeUrl>,
416        api_secret: Option<&str>,
417    ) -> anyhow::Result<Self> {
418        Ok(GlobalFederationApiWithCache::new(FederationApi::new(
419            connectors, peers, None, api_secret,
420        ))
421        .into())
422    }
423    pub fn new_admin(
424        connectors: ConnectorRegistry,
425        peer: PeerId,
426        url: SafeUrl,
427        api_secret: Option<&str>,
428    ) -> anyhow::Result<DynGlobalApi> {
429        Ok(GlobalFederationApiWithCache::new(FederationApi::new(
430            connectors,
431            [(peer, url)].into(),
432            Some(peer),
433            api_secret,
434        ))
435        .into())
436    }
437
438    pub fn new_admin_setup(connectors: ConnectorRegistry, url: SafeUrl) -> anyhow::Result<Self> {
439        // PeerIds are used only for informational purposes, but just in case, make a
440        // big number so it stands out
441        Self::new_admin(
442            connectors,
443            PeerId::from(1024),
444            url,
445            // Setup does not have api secrets yet
446            None,
447        )
448    }
449}
450
/// The API for the global (non-module) endpoints
///
/// Methods taking an [`ApiAuth`] are the authenticated (guardian/admin)
/// calls; the remaining ones carry no credentials in their signature.
#[apply(async_trait_maybe_send!)]
pub trait IGlobalFederationApi: IRawFederationApi {
    /// Submit `tx` to the federation, returning the module-encoded
    /// submission outcome
    ///
    /// NOTE(review): the signature has no error channel, so presumably the
    /// implementation retries until it gets an answer; confirm.
    async fn submit_transaction(
        &self,
        tx: Transaction,
    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;

    /// Fetch the [`SessionOutcome`] for `block_index`, decoding it with
    /// `decoders`
    ///
    /// NOTE(review): the name suggests this waits until that session is
    /// complete; confirm against the implementation.
    async fn await_block(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
    ) -> anyhow::Result<SessionOutcome>;

    /// Fetch the [`SessionStatus`] for `block_index`, decoding it with
    /// `decoders`
    ///
    /// NOTE(review): `core_api_version` presumably selects which endpoint
    /// variant is used and `broadcast_public_keys`, if given, is presumably
    /// used to verify the response; neither is visible here.
    async fn get_session_status(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
        core_api_version: ApiVersion,
        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
    ) -> anyhow::Result<SessionStatus>;

    /// Returns the session count reported by the federation
    async fn session_count(&self) -> FederationResult<u64>;

    /// Wait on the transaction `txid`, returning a transaction id
    ///
    /// NOTE(review): presumably resolves once the federation has processed
    /// the transaction; confirm against the implementation.
    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;

    /// Upload the signed client backup `request` to the federation
    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;

    /// Download the client backup stored under `id`, per peer
    ///
    /// NOTE(review): a `None` entry presumably means that peer has no backup
    /// for `id`; confirm.
    async fn download_backup(
        &self,
        id: &secp256k1::PublicKey,
    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;

    /// Sets the password used to decrypt the configs and authenticate
    ///
    /// Must be called first before any other calls to the API
    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the [`SetupStatus`] of the guardian
    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;

    /// Set the local setup parameters of the guardian
    ///
    /// NOTE(review): the returned `String` is presumably the setup code (see
    /// `get_setup_code`); confirm.
    async fn set_local_params(
        &self,
        name: String,
        federation_name: Option<String>,
        disable_base_fees: Option<bool>,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Add the connection info of another peer during setup
    ///
    /// NOTE(review): `info` is presumably the other peer's setup code and
    /// the returned `String` that peer's name; confirm.
    async fn add_peer_connection_info(
        &self,
        info: String,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Reset the peer setup codes during the federation setup process
    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the setup code if `set_local_params` was already called
    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;

    /// During config gen, used for an API-to-API call that adds a peer's server
    /// connection info to the leader.
    ///
    /// Note this call will fail until the leader has their API running and has
    /// `set_server_connections` so clients should retry.
    ///
    /// This call is not authenticated because it's guardian-to-guardian
    async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()>;

    /// During config gen, gets all the server connections we've received from
    /// peers using `add_config_gen_peer`
    ///
    /// Could be called on the leader, so it's not authenticated
    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>>;

    /// Runs DKG, can only be called once after configs have been generated in
    /// `get_consensus_config_gen_params`.  If DKG fails this returns a 500
    /// error and config gen must be restarted.
    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;

    /// After DKG, returns the hash of the consensus config tweaked with our id.
    /// We need to share this with all other peers to complete verification.
    async fn get_verify_config_hash(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    /// Updates local state and notify leader that we have verified configs.
    /// This allows for a synchronization point, before we start consensus.
    async fn verified_configs(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    /// Reads the configs from the disk, starts the consensus server, and shuts
    /// down the config gen API to start the Fedimint API
    ///
    /// Clients may receive an error due to forced shutdown, should call the
    /// `server_status` to see if consensus has started.
    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the status of the server
    async fn status(&self) -> FederationResult<StatusResponse>;

    /// Show an audit across all modules
    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;

    /// Download the guardian config to back it up
    async fn guardian_config_backup(&self, auth: ApiAuth)
    -> FederationResult<GuardianConfigBackup>;

    /// Check auth credentials
    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Restart the federation setup (authenticated)
    ///
    /// NOTE(review): what state is discarded by the restart is not visible
    /// here; confirm against the server side.
    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Publish our signed API announcement to other guardians
    async fn submit_api_announcement(
        &self,
        peer_id: PeerId,
        announcement: SignedApiAnnouncement,
    ) -> FederationResult<()>;

    /// Fetch the signed API announcements known to `guardian`, keyed by the
    /// announcing peer
    async fn api_announcements(
        &self,
        guardian: PeerId,
    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;

    /// Have our guardian sign an API announcement for `api_url`
    async fn sign_api_announcement(
        &self,
        api_url: SafeUrl,
        auth: ApiAuth,
    ) -> FederationResult<SignedApiAnnouncement>;

    /// Request a shutdown of our guardian
    ///
    /// NOTE(review): `session`, if given, is presumably the session after
    /// which to shut down; confirm.
    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the fedimintd version a peer is running
    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;

    /// Fetch the backup statistics from the federation (admin endpoint)
    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;

    /// Get the invite code for the federation guardian.
    /// For instance, useful after DKG
    async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode>;

    /// Change the password used to encrypt the configs and for guardian
    /// authentication
    async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()>;
}
601
602pub fn deserialize_outcome<R>(
603    outcome: &SerdeOutputOutcome,
604    module_decoder: &Decoder,
605) -> OutputOutcomeResult<R>
606where
607    R: OutputOutcome + MaybeSend,
608{
609    let dyn_outcome = outcome
610        .try_into_inner_known_module_kind(module_decoder)
611        .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
612
613    let source_instance = dyn_outcome.module_instance_id();
614
615    dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
616        let target_type = std::any::type_name::<R>();
617        OutputOutcomeError::ResponseDeserialization(anyhow!(
618            "Could not downcast output outcome with instance id {source_instance} to {target_type}"
619        ))
620    })
621}
622
/// Builder for [`ConnectorRegistry`]
///
/// See [`ConnectorRegistry::build_from_client_env`] and similar
/// to create.
#[allow(clippy::struct_excessive_bools)] // Shut up, Clippy
pub struct ConnectorRegistryBuilder {
    /// Url overrides (original url -> replacement url) handed over to the
    /// built [`ConnectorRegistry`]
    connection_overrides: BTreeMap<SafeUrl, SafeUrl>,

    /// Enable Iroh endpoints at all?
    iroh_enable: bool,
    /// Override the Iroh DNS server to use
    iroh_dns: Option<SafeUrl>,
    /// Should start the "next/unstable" Iroh stack
    iroh_next: bool,
    /// Enable Pkarr DHT discovery
    iroh_pkarr_dht: bool,

    /// Enable Websocket API handling at all?
    ws_enable: bool,
    /// Handle `ws`/`wss` urls with the Tor connector instead of the plain
    /// websocket one
    ws_force_tor: bool,
}
644
645impl ConnectorRegistryBuilder {
646    #[allow(clippy::unused_async)] // Leave room for async in the future
647    pub async fn bind(self) -> anyhow::Result<ConnectorRegistry> {
648        let mut inner: BTreeMap<String, JitTryAnyhow<DynConnector>> = BTreeMap::new();
649
650        if self.iroh_enable {
651            let iroh_connector = JitTryAnyhow::new_try(move || async move {
652                Ok(Arc::new(
653                    api::iroh::IrohConnector::new(
654                        self.iroh_dns,
655                        self.iroh_pkarr_dht,
656                        self.iroh_next,
657                    )
658                    .await?,
659                ) as DynConnector)
660            });
661            inner.insert("iroh".to_string(), iroh_connector);
662        }
663
664        if self.ws_enable {
665            match self.ws_force_tor {
666                #[cfg(all(feature = "tor", not(target_family = "wasm")))]
667                true => {
668                    use crate::api::tor::TorConnector;
669
670                    let tor_connector = JitTry::new_try(move || async move {
671                        Ok(Arc::new(TorConnector::bootstrap().await?) as DynConnector)
672                    });
673                    inner.insert("wss".into(), tor_connector.clone());
674                    inner.insert("ws".into(), tor_connector);
675                }
676
677                false => {
678                    let websocket_connector = JitTry::new_try(move || async move {
679                        Ok(Arc::new(WebsocketConnector::new()) as DynConnector)
680                    });
681                    inner.insert("wss".into(), websocket_connector.clone());
682                    inner.insert("ws".into(), websocket_connector);
683                }
684                #[allow(unreachable_patterns)]
685                _ => bail!("Tor requested, but not support not compiled in"),
686            }
687        }
688
689        Ok(ConnectorRegistry {
690            connectors: inner,
691            connection_overrides: self.connection_overrides,
692        })
693    }
694
695    pub fn iroh_pkarr_dht(self, enable: bool) -> Self {
696        Self {
697            iroh_pkarr_dht: enable,
698            ..self
699        }
700    }
701
702    pub fn iroh_next(self, enable: bool) -> Self {
703        Self {
704            iroh_next: enable,
705            ..self
706        }
707    }
708
709    pub fn ws_force_tor(self, enable: bool) -> Self {
710        Self {
711            ws_force_tor: enable,
712            ..self
713        }
714    }
715
716    pub fn set_iroh_dns(self, url: SafeUrl) -> Self {
717        Self {
718            iroh_dns: Some(url),
719            ..self
720        }
721    }
722
723    /// Apply overrides from env variables
724    pub fn with_env_var_overrides(mut self) -> anyhow::Result<Self> {
725        // TODO: read rest of the env
726        for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
727            self = self.with_connection_override(k, v);
728        }
729
730        Ok(Self { ..self })
731    }
732    pub fn with_connection_override(
733        mut self,
734        original_url: SafeUrl,
735        replacement_url: SafeUrl,
736    ) -> Self {
737        self.connection_overrides
738            .insert(original_url, replacement_url);
739        self
740    }
741}
742
/// A set of available connectivity protocols a client can use to make
/// network API requests (typically to federation).
///
/// Maps from connection URL scheme to [`Connector`] to use to connect to it.
///
/// See [`ConnectorRegistry::build_from_client_env`] and similar
/// to create.
///
/// [`ConnectorRegistry::connect_guardian`] is the main entry point for making
/// mixed-networking stack connection.
///
/// Responsibilities:
///
/// * applying `connection_overrides` to the requested url,
/// * picking the connector registered for the url's scheme,
/// * obtaining the initialized connector (via `get_try`) before connecting.
#[derive(Clone, Debug)]
pub struct ConnectorRegistry {
    /// Connectors keyed by the url scheme they handle
    connectors: BTreeMap<String, JitTryAnyhow<DynConnector>>,

    /// List of overrides to use when attempting to connect to given url
    ///
    /// This is useful for testing, or forcing non-default network
    /// connectivity.
    connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
}
765
766impl ConnectorRegistry {
767    /// Create a builder with recommended defaults intended for client-side
768    /// usage
769    ///
770    /// In particular mobile devices are considered.
771    pub fn build_from_client_defaults() -> ConnectorRegistryBuilder {
772        ConnectorRegistryBuilder {
773            iroh_enable: true,
774            iroh_dns: None,
775            iroh_pkarr_dht: false,
776            iroh_next: true,
777            ws_enable: true,
778            ws_force_tor: false,
779
780            connection_overrides: BTreeMap::default(),
781        }
782    }
783
784    /// Create a builder with recommended defaults intended for the server-side
785    /// usage
786    pub fn build_from_server_defaults() -> ConnectorRegistryBuilder {
787        ConnectorRegistryBuilder {
788            iroh_enable: true,
789            iroh_dns: None,
790            iroh_pkarr_dht: true,
791            iroh_next: true,
792            ws_enable: true,
793            ws_force_tor: false,
794
795            connection_overrides: BTreeMap::default(),
796        }
797    }
798
799    /// Create a builder with recommended defaults intended for testing
800    /// usage
801    pub fn build_from_testing_defaults() -> ConnectorRegistryBuilder {
802        ConnectorRegistryBuilder {
803            iroh_enable: true,
804            iroh_dns: None,
805            iroh_pkarr_dht: false,
806            iroh_next: false,
807            ws_enable: true,
808            ws_force_tor: false,
809
810            connection_overrides: BTreeMap::default(),
811        }
812    }
813
814    /// Like [`Self::build_from_client_defaults`] build will apply
815    /// environment-provided overrides.
816    pub fn build_from_client_env() -> anyhow::Result<ConnectorRegistryBuilder> {
817        let builder = Self::build_from_client_defaults().with_env_var_overrides()?;
818        Ok(builder)
819    }
820
821    /// Like [`Self::build_from_server_defaults`] build will apply
822    /// environment-provided overrides.
823    pub fn build_from_server_env() -> anyhow::Result<ConnectorRegistryBuilder> {
824        let builder = Self::build_from_server_defaults().with_env_var_overrides()?;
825        Ok(builder)
826    }
827
828    /// Like [`Self::build_from_testing_defaults`] build will apply
829    /// environment-provided overrides.
830    pub fn build_from_testing_env() -> anyhow::Result<ConnectorRegistryBuilder> {
831        let builder = Self::build_from_testing_defaults().with_env_var_overrides()?;
832        Ok(builder)
833    }
834
835    /// Connect to a given `url` using matching [`Connector`]
836    ///
837    /// This is the main function consumed by the downstream use for making
838    /// connection.
839    pub async fn connect_guardian(
840        &self,
841        url: &SafeUrl,
842        api_secret: Option<&str>,
843    ) -> PeerResult<DynGuaridianConnection> {
844        let url = match self.connection_overrides.get(url) {
845            Some(replacement) => {
846                trace!(
847                    target: LOG_NET,
848                    original_url = %url,
849                    replacement_url = %replacement,
850                    "Using a connectivity override for connection"
851                );
852
853                replacement
854            }
855            None => url,
856        };
857
858        let Some(connector) = self.connectors.get(url.scheme()) else {
859            return Err(PeerError::InvalidEndpoint(anyhow!(
860                "Unsupported scheme: {}; missing endpoint handler",
861                url.scheme()
862            )));
863        };
864
865        connector
866            .get_try()
867            .await
868            .map_err(|e| {
869                PeerError::Transport(anyhow!(
870                    "Connector failed to initialize: {}",
871                    e.fmt_compact()
872                ))
873            })?
874            .connect_guardian(url, api_secret)
875            .await
876    }
877}
/// A shared, type-erased [`Connector`]
pub type DynConnector = Arc<dyn Connector>;
879
/// Something able to establish outgoing connections to guardians
///
/// `ConnectorRegistry::connect_guardian` selects an implementation based on
/// the scheme of the URL being connected to.
#[async_trait]
pub trait Connector: Send + Sync + 'static + fmt::Debug {
    /// Establish a connection to the guardian reachable at `url`, optionally
    /// using `api_secret`
    async fn connect_guardian(
        &self,
        url: &SafeUrl,
        api_secret: Option<&str>,
    ) -> PeerResult<DynGuaridianConnection>;
}
888
/// A connection from api client to a federation guardian (type erased)
///
/// Note: the identifier is spelled `Guaridian` (sic); it is referenced under
/// this spelling throughout this file.
pub type DynGuaridianConnection = Arc<dyn IGuardianConnection>;
891
/// A connection from api client to a federation guardian
#[async_trait]
pub trait IGuardianConnection: Debug + Send + Sync + 'static {
    /// Make an API call to `method` with the given `request` over this
    /// connection and return the response value
    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value>;

    /// Whether the connection is still connected
    ///
    /// The connection pool in [`FederationApi`] uses this to detect pooled
    /// connections that got disconnected and need to be re-established.
    fn is_connected(&self) -> bool;

    /// NOTE(review): presumably resolves once the connection is no longer
    /// connected; exact semantics are up to the implementation - confirm
    /// against implementors.
    async fn await_disconnection(&self);

    /// Convert into a type-erased [`DynGuaridianConnection`] by wrapping
    /// `self` in an [`Arc`]
    fn into_dyn(self) -> DynGuaridianConnection
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
908
/// Federation API client
///
/// The core underlying object used to make API requests to a federation.
///
/// It holds a `connectors` handle used to actually make outgoing connections
/// to given URLs, and knows which peers there are and what URLs to connect to
/// to reach them.
// TODO: As it currently stands, it somewhat mixes the role of connecting to
// "peers" with general-purpose outgoing connections. Not a big deal, but might
// need a refactor in the future.
#[derive(Clone, Debug)]
pub struct FederationApi {
    /// Available connectors with which we can make connections
    connectors: ConnectorRegistry,
    /// Map of known URLs to use to connect to peers
    peers: BTreeMap<PeerId, SafeUrl>,
    /// List of peer ids, redundant to avoid collecting all the time
    peers_keys: BTreeSet<PeerId>,
    /// Our own [`PeerId`] to use when making admin apis
    admin_id: Option<PeerId>,
    /// Set when this API is used to communicate with a module
    module_id: Option<ModuleInstanceId>,

    /// Optional API secret, passed to the connector whenever a new connection
    /// is being established
    api_secret: Option<String>,

    /// Connection pool
    ///
    /// Every entry in this map will be created on demand and correspond to a
    /// single outgoing connection to a certain URL that is in the process
    /// of being established, or that we have already established.
    ///
    /// The pool is behind an [`Arc`], so clones of this struct share it.
    #[allow(clippy::type_complexity)]
    connections: Arc<tokio::sync::Mutex<HashMap<SafeUrl, Arc<ConnectionState>>>>,
}
942
/// Inner part of [`ConnectionState`] preserving state between attempts to
/// initialize [`ConnectionState::connection`]
#[derive(Debug)]
struct ConnectionStateInner {
    /// `true` until the first connection attempt is recorded; while set, the
    /// next attempt is made without any delay
    fresh: bool,
    /// Source of the delays to wait before every non-fresh connection attempt
    backoff: FibonacciBackoff,
}
950
/// State of a single entry in the connection pool of [`FederationApi`]
///
/// Tracks one outgoing connection (established or being established),
/// together with the retry state used while trying to establish it.
#[derive(Debug)]
struct ConnectionState {
    /// Connection we are trying to or already established
    connection: tokio::sync::OnceCell<DynGuaridianConnection>,
    /// State that technically is protected every time by
    /// the serialization of `OnceCell::get_or_try_init`, but
    /// for Rust purposes needs to be locked.
    inner: std::sync::Mutex<ConnectionStateInner>,
}
960
961impl ConnectionState {
962    /// Create a new connection state for a first time connection
963    fn new_initial() -> Self {
964        Self {
965            connection: OnceCell::new(),
966            inner: std::sync::Mutex::new(ConnectionStateInner {
967                fresh: true,
968                backoff: custom_backoff(
969                    // First time connections start quick
970                    Duration::from_millis(5),
971                    Duration::from_secs(30),
972                    None,
973                ),
974            }),
975        }
976    }
977
978    /// Create a new connection state for a connection that already failed, and
979    /// is being reset
980    fn new_reconnecting() -> Self {
981        Self {
982            connection: OnceCell::new(),
983            inner: std::sync::Mutex::new(ConnectionStateInner {
984                // set the attempts to 1, indicating that
985                fresh: false,
986                backoff: custom_backoff(
987                    // Connections after a disconnect start with some minimum delay
988                    Duration::from_millis(500),
989                    Duration::from_secs(30),
990                    None,
991                ),
992            }),
993        }
994    }
995
996    /// Record the fact that an attempt to connect is being made, and return
997    /// time the caller should wait.
998    fn pre_reconnect_delay(&self) -> Duration {
999        let mut backoff_locked = self.inner.lock().expect("Locking failed");
1000        let fresh = backoff_locked.fresh;
1001
1002        backoff_locked.fresh = false;
1003
1004        if fresh {
1005            Duration::default()
1006        } else {
1007            backoff_locked.backoff.next().expect("Keeps retrying")
1008        }
1009    }
1010}
1011impl FederationApi {
1012    pub fn new(
1013        connectors: ConnectorRegistry,
1014        peers: BTreeMap<PeerId, SafeUrl>,
1015        admin_peer_id: Option<PeerId>,
1016        api_secret: Option<&str>,
1017    ) -> Self {
1018        Self {
1019            connections: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
1020            peers_keys: peers.keys().copied().collect(),
1021            peers,
1022            admin_id: admin_peer_id,
1023            module_id: None,
1024            connectors,
1025            api_secret: api_secret.map(ToOwned::to_owned),
1026        }
1027    }
1028
1029    async fn get_or_create_connection(
1030        &self,
1031        url: &SafeUrl,
1032        api_secret: Option<&str>,
1033    ) -> PeerResult<DynGuaridianConnection> {
1034        let mut pool_locked = self.connections.lock().await;
1035
1036        let pool_entry_arc = pool_locked
1037            .entry(url.to_owned())
1038                        .and_modify(|entry_arc| {
1039                // Check if existing connection is disconnected and reset the whole entry.
1040                //
1041                // This resets the state (like connectivity backoff), which is what we want.
1042                // Since the (`OnceCell`) was already initialized, it means connection was successfully
1043                // before, and disconnected afterwards.
1044                if let Some(existing_conn) = entry_arc.connection.get()
1045                    && !existing_conn.is_connected(){
1046                        trace!(target: LOG_NET_API, %url, "Existing connection is disconnected, removing from pool");
1047                        *entry_arc = Arc::new(ConnectionState::new_reconnecting());
1048                    }
1049            })
1050            .or_insert_with(|| Arc::new(ConnectionState::new_initial()))
1051            .clone();
1052
1053        // Drop the pool lock so other connections can work in parallel
1054        drop(pool_locked);
1055
1056        let conn = pool_entry_arc
1057            .connection
1058            // This serializes all the connection attempts. If one attempt to connect (including
1059            // waiting for the reconnect backoff) succeeds, all waiting ones will use it. If it
1060            // fails, any already pending/next will attempt it right afterwards.
1061            // Nit: if multiple calls are trying to connect to the same host that is offline, it
1062            // will take some of them multiples of maximum retry delay to actually return with
1063            // an error. This should be fine in practice and hard to avoid without a lot of
1064            // complexity.
1065            .get_or_try_init(|| async {
1066                let retry_delay = pool_entry_arc.pre_reconnect_delay();
1067                fedimint_core::runtime::sleep(retry_delay).await;
1068
1069                let conn = self.connectors.connect_guardian(url, api_secret).await?;
1070
1071                Ok(conn)
1072            })
1073            .await?;
1074
1075        trace!(target: LOG_NET_API, %url, "Using websocket connection");
1076        Ok(conn.clone())
1077    }
1078
1079    async fn request(
1080        &self,
1081        peer: PeerId,
1082        method: ApiMethod,
1083        request: ApiRequestErased,
1084    ) -> PeerResult<Value> {
1085        trace!(target: LOG_NET_API, %peer, %method, "Api request");
1086        let url = self
1087            .peers
1088            .get(&peer)
1089            .ok_or_else(|| PeerError::InvalidPeerId { peer_id: peer })?;
1090        let conn = self
1091            .get_or_create_connection(url, self.api_secret.as_deref())
1092            .await
1093            .context("Failed to connect to peer")
1094            .map_err(PeerError::Connection)?;
1095        let res = conn.request(method.clone(), request).await;
1096
1097        trace!(target: LOG_NET_API, ?method, res_ok = res.is_ok(), "Api response");
1098
1099        res
1100    }
1101}
1102
// Empty impl: `IModuleFederationApi` requires no methods to be implemented
// here.
impl IModuleFederationApi for FederationApi {}
1104
1105#[apply(async_trait_maybe_send!)]
1106impl IRawFederationApi for FederationApi {
1107    fn all_peers(&self) -> &BTreeSet<PeerId> {
1108        &self.peers_keys
1109    }
1110
1111    fn self_peer(&self) -> Option<PeerId> {
1112        self.admin_id
1113    }
1114
1115    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
1116        FederationApi {
1117            api_secret: self.api_secret.clone(),
1118            connections: self.connections.clone(),
1119            connectors: self.connectors.clone(),
1120            peers: self.peers.clone(),
1121            peers_keys: self.peers_keys.clone(),
1122            admin_id: self.admin_id,
1123            module_id: Some(id),
1124        }
1125        .into()
1126    }
1127
1128    #[instrument(
1129        target = LOG_NET_API,
1130        skip_all,
1131        fields(
1132            peer_id = %peer_id,
1133            method = %method,
1134            params = %params.params,
1135        )
1136    )]
1137    async fn request_raw(
1138        &self,
1139        peer_id: PeerId,
1140        method: &str,
1141        params: &ApiRequestErased,
1142    ) -> PeerResult<Value> {
1143        let method = match self.module_id {
1144            Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
1145            None => ApiMethod::Core(method.to_string()),
1146        };
1147
1148        self.request(peer_id, method, params.clone()).await
1149    }
1150}
1151
/// The status of a server, including how it views its peers
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct LegacyFederationStatus {
    // NOTE(review): presumably the number of consensus sessions so far - confirm
    pub session_count: u64,
    /// Status of each peer, keyed by its id
    pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
    // NOTE(review): presumably the number of peers currently connected - confirm
    pub peers_online: u64,
    // NOTE(review): presumably the number of peers currently disconnected - confirm
    pub peers_offline: u64,
    /// This should always be 0 if everything is okay, so a monitoring tool
    /// should generate an alert if this is not the case.
    pub peers_flagged: u64,
    // NOTE(review): meaning/unit of the value is not visible here - confirm
    pub scheduled_shutdown: Option<u64>,
}
1164
/// Status of a single peer, as stored in
/// [`LegacyFederationStatus::status_by_peer`]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LegacyPeerStatus {
    // NOTE(review): presumably identifies the peer's most recent consensus
    // contribution, `None` if there was none - confirm
    pub last_contribution: Option<u64>,
    /// Whether the peer is connected or not
    pub connection_status: LegacyP2PConnectionStatus,
    /// Indicates that this peer needs attention from the operator since
    /// it has not contributed to the consensus in a long time
    pub flagged: bool,
}
1173
/// Connection status of a peer
///
/// Serialized using `snake_case` variant names; the default is
/// [`Self::Disconnected`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LegacyP2PConnectionStatus {
    #[default]
    Disconnected,
    Connected,
}
1181
/// Status of a server, optionally together with its view of the federation
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct StatusResponse {
    /// Status of the server itself
    pub server: ServerStatusLegacy,
    /// Status of the federation, if available
    pub federation: Option<LegacyFederationStatus>,
}
1187
// Tests live in the `tests` submodule, compiled only for test builds
#[cfg(test)]
mod tests;