1mod error;
2pub mod global_api;
3
4use std::collections::{BTreeMap, BTreeSet, HashMap};
5use std::fmt::Debug;
6use std::future::pending;
7use std::pin::Pin;
8use std::result;
9use std::sync::Arc;
10
11use anyhow::{Context, anyhow};
12use bitcoin::secp256k1;
13pub use error::{FederationError, OutputOutcomeError};
14pub use fedimint_connectors::ServerResult;
15pub use fedimint_connectors::error::ServerError;
16use fedimint_connectors::{
17 ConnectionPool, ConnectorRegistry, DynGuaridianConnection, IGuardianConnection,
18};
19use fedimint_core::admin_client::{GuardianConfigBackup, ServerStatusLegacy, SetupStatus};
20use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
21use fedimint_core::core::backup::SignedBackupRequest;
22use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, ModuleKind, OutputOutcome};
23use fedimint_core::encoding::{Decodable, Encodable};
24use fedimint_core::invite_code::InviteCode;
25use fedimint_core::module::audit::AuditSummary;
26use fedimint_core::module::registry::ModuleDecoderRegistry;
27use fedimint_core::module::{
28 ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
29};
30use fedimint_core::net::api_announcement::SignedApiAnnouncement;
31use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
32use fedimint_core::task::{MaybeSend, MaybeSync};
33use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
34use fedimint_core::util::backoff_util::api_networking_backoff;
35use fedimint_core::util::{FmtCompact as _, SafeUrl};
36use fedimint_core::{
37 NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
38};
39use fedimint_logging::LOG_CLIENT_NET_API;
40use futures::stream::{BoxStream, FuturesUnordered};
41use futures::{Future, StreamExt};
42use global_api::with_cache::GlobalFederationApiWithCache;
43use jsonrpsee_core::DeserializeOwned;
44use serde::{Deserialize, Serialize};
45use serde_json::Value;
46use tokio::sync::watch;
47use tokio_stream::wrappers::WatchStream;
48use tracing::{debug, instrument, trace, warn};
49
50use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
51
52pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
53
54pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
55 ApiVersion { major: 0, minor: 1 };
56
57pub const VERSION_THAT_INTRODUCED_AWAIT_OUTPUTS_OUTCOMES: ApiVersion = ApiVersion::new(0, 8);
58pub type FederationResult<T> = Result<T, FederationError>;
59pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
60
61pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
62
63#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
67pub struct ApiVersionSet {
68 pub core: ApiVersion,
69 pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
70}
71
72#[apply(async_trait_maybe_send!)]
74pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
75 fn all_peers(&self) -> &BTreeSet<PeerId>;
83
84 fn self_peer(&self) -> Option<PeerId>;
89
90 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;
91
92 async fn request_raw(
94 &self,
95 peer_id: PeerId,
96 method: &str,
97 params: &ApiRequestErased,
98 ) -> ServerResult<Value>;
99
100 fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>>;
104 async fn wait_for_initialized_connections(&self);
109}
110
111#[apply(async_trait_maybe_send!)]
114pub trait FederationApiExt: IRawFederationApi {
115 async fn request_single_peer<Ret>(
116 &self,
117 method: String,
118 params: ApiRequestErased,
119 peer: PeerId,
120 ) -> ServerResult<Ret>
121 where
122 Ret: DeserializeOwned,
123 {
124 self.request_raw(peer, &method, ¶ms)
125 .await
126 .and_then(|v| {
127 serde_json::from_value(v)
128 .map_err(|e| ServerError::ResponseDeserialization(e.into()))
129 })
130 }
131
132 async fn request_single_peer_federation<FedRet>(
133 &self,
134 method: String,
135 params: ApiRequestErased,
136 peer_id: PeerId,
137 ) -> FederationResult<FedRet>
138 where
139 FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
140 {
141 self.request_raw(peer_id, &method, ¶ms)
142 .await
143 .and_then(|v| {
144 serde_json::from_value(v)
145 .map_err(|e| ServerError::ResponseDeserialization(e.into()))
146 })
147 .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
148 }
149
150 #[instrument(target = LOG_CLIENT_NET_API, skip_all, fields(method=method))]
153 async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
154 &self,
155 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
156 method: String,
157 params: ApiRequestErased,
158 ) -> FederationResult<FR> {
159 #[cfg(not(target_family = "wasm"))]
163 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
164 #[cfg(target_family = "wasm")]
165 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
166
167 for peer in self.all_peers() {
168 futures.push(Box::pin({
169 let method = &method;
170 let params = ¶ms;
171 async move {
172 let result = self
173 .request_single_peer(method.clone(), params.clone(), *peer)
174 .await;
175
176 (*peer, result)
177 }
178 }));
179 }
180
181 let mut peer_errors = BTreeMap::new();
182 let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
183
184 loop {
185 let (peer, result) = futures
186 .next()
187 .await
188 .expect("Query strategy ran out of peers to query without returning a result");
189
190 match result {
191 Ok(response) => match strategy.process(peer, response) {
192 QueryStep::Retry(peers) => {
193 for peer in peers {
194 futures.push(Box::pin({
195 let method = &method;
196 let params = ¶ms;
197 async move {
198 let result = self
199 .request_single_peer(method.clone(), params.clone(), peer)
200 .await;
201
202 (peer, result)
203 }
204 }));
205 }
206 }
207 QueryStep::Success(response) => return Ok(response),
208 QueryStep::Failure(e) => {
209 peer_errors.insert(peer, e);
210 }
211 QueryStep::Continue => {}
212 },
213 Err(e) => {
214 e.report_if_unusual(peer, "RequestWithStrategy");
215 peer_errors.insert(peer, e);
216 }
217 }
218
219 if peer_errors.len() == peer_error_threshold {
220 return Err(FederationError::peer_errors(
221 method.clone(),
222 params.params.clone(),
223 peer_errors,
224 ));
225 }
226 }
227 }
228
229 #[instrument(target = LOG_CLIENT_NET_API, level = "debug", skip(self, strategy))]
230 async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
231 &self,
232 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
233 method: String,
234 params: ApiRequestErased,
235 ) -> FR {
236 #[cfg(not(target_family = "wasm"))]
240 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
241 #[cfg(target_family = "wasm")]
242 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
243
244 for peer in self.all_peers() {
245 futures.push(Box::pin({
246 let method = &method;
247 let params = ¶ms;
248 async move {
249 let response = util::retry(
250 format!("api-request-{method}-{peer}"),
251 api_networking_backoff(),
252 || async {
253 self.request_single_peer(method.clone(), params.clone(), *peer)
254 .await
255 .inspect_err(|e| {
256 e.report_if_unusual(*peer, "QueryWithStrategyRetry");
257 })
258 .map_err(|e| anyhow!(e.to_string()))
259 },
260 )
261 .await
262 .expect("Number of retries has no limit");
263
264 (*peer, response)
265 }
266 }));
267 }
268
269 loop {
270 let (peer, response) = match futures.next().await {
271 Some(t) => t,
272 None => pending().await,
273 };
274
275 match strategy.process(peer, response) {
276 QueryStep::Retry(peers) => {
277 for peer in peers {
278 futures.push(Box::pin({
279 let method = &method;
280 let params = ¶ms;
281 async move {
282 let response = util::retry(
283 format!("api-request-{method}-{peer}"),
284 api_networking_backoff(),
285 || async {
286 self.request_single_peer(
287 method.clone(),
288 params.clone(),
289 peer,
290 )
291 .await
292 .inspect_err(|err| {
293 if err.is_unusual() {
294 debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
295 }
296 })
297 .map_err(|e| anyhow!(e.to_string()))
298 },
299 )
300 .await
301 .expect("Number of retries has no limit");
302
303 (peer, response)
304 }
305 }));
306 }
307 }
308 QueryStep::Success(response) => return response,
309 QueryStep::Failure(e) => {
310 warn!(target: LOG_CLIENT_NET_API, "Query strategy returned non-retryable failure for peer {peer}: {e}");
311 }
312 QueryStep::Continue => {}
313 }
314 }
315 }
316
317 async fn request_current_consensus<Ret>(
318 &self,
319 method: String,
320 params: ApiRequestErased,
321 ) -> FederationResult<Ret>
322 where
323 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
324 {
325 self.request_with_strategy(
326 ThresholdConsensus::new(self.all_peers().to_num_peers()),
327 method,
328 params,
329 )
330 .await
331 }
332
333 async fn request_current_consensus_retry<Ret>(
334 &self,
335 method: String,
336 params: ApiRequestErased,
337 ) -> Ret
338 where
339 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
340 {
341 self.request_with_strategy_retry(
342 ThresholdConsensus::new(self.all_peers().to_num_peers()),
343 method,
344 params,
345 )
346 .await
347 }
348
349 async fn request_admin<Ret>(
350 &self,
351 method: &str,
352 params: ApiRequestErased,
353 auth: ApiAuth,
354 ) -> FederationResult<Ret>
355 where
356 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
357 {
358 let Some(self_peer_id) = self.self_peer() else {
359 return Err(FederationError::general(
360 method,
361 params,
362 anyhow::format_err!("Admin peer_id not set"),
363 ));
364 };
365
366 self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
367 .await
368 }
369
370 async fn request_admin_no_auth<Ret>(
371 &self,
372 method: &str,
373 params: ApiRequestErased,
374 ) -> FederationResult<Ret>
375 where
376 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
377 {
378 let Some(self_peer_id) = self.self_peer() else {
379 return Err(FederationError::general(
380 method,
381 params,
382 anyhow::format_err!("Admin peer_id not set"),
383 ));
384 };
385
386 self.request_single_peer_federation(method.into(), params, self_peer_id)
387 .await
388 }
389}
390
391#[apply(async_trait_maybe_send!)]
392impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
393
394pub trait IModuleFederationApi: IRawFederationApi {}
396
397dyn_newtype_define! {
398 #[derive(Clone)]
399 pub DynModuleApi(Arc<IModuleFederationApi>)
400}
401
402dyn_newtype_define! {
403 #[derive(Clone)]
404 pub DynGlobalApi(Arc<IGlobalFederationApi>)
405}
406
407impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
408 fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
409 self.inner.as_ref()
410 }
411}
412
413impl DynGlobalApi {
414 pub fn new(
415 connectors: ConnectorRegistry,
416 peers: BTreeMap<PeerId, SafeUrl>,
417 api_secret: Option<&str>,
418 ) -> anyhow::Result<Self> {
419 Ok(GlobalFederationApiWithCache::new(FederationApi::new(
420 connectors, peers, None, api_secret,
421 ))
422 .into())
423 }
424 pub fn new_admin(
425 connectors: ConnectorRegistry,
426 peer: PeerId,
427 url: SafeUrl,
428 api_secret: Option<&str>,
429 ) -> anyhow::Result<DynGlobalApi> {
430 Ok(GlobalFederationApiWithCache::new(FederationApi::new(
431 connectors,
432 [(peer, url)].into(),
433 Some(peer),
434 api_secret,
435 ))
436 .into())
437 }
438
439 pub fn new_admin_setup(connectors: ConnectorRegistry, url: SafeUrl) -> anyhow::Result<Self> {
440 Self::new_admin(
443 connectors,
444 PeerId::from(1024),
445 url,
446 None,
448 )
449 }
450}
451
452#[apply(async_trait_maybe_send!)]
454pub trait IGlobalFederationApi: IRawFederationApi {
455 async fn submit_transaction(
456 &self,
457 tx: Transaction,
458 ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;
459
460 async fn await_block(
461 &self,
462 block_index: u64,
463 decoders: &ModuleDecoderRegistry,
464 ) -> anyhow::Result<SessionOutcome>;
465
466 async fn get_session_status(
467 &self,
468 block_index: u64,
469 decoders: &ModuleDecoderRegistry,
470 core_api_version: ApiVersion,
471 broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
472 ) -> anyhow::Result<SessionStatus>;
473
474 async fn session_count(&self) -> FederationResult<u64>;
475
476 async fn await_transaction(&self, txid: TransactionId) -> TransactionId;
477
478 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;
479
480 async fn download_backup(
481 &self,
482 id: &secp256k1::PublicKey,
483 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;
484
485 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;
489
490 async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;
491
492 async fn set_local_params(
493 &self,
494 name: String,
495 federation_name: Option<String>,
496 disable_base_fees: Option<bool>,
497 enabled_modules: Option<BTreeSet<ModuleKind>>,
498 auth: ApiAuth,
499 ) -> FederationResult<String>;
500
501 async fn add_peer_connection_info(
502 &self,
503 info: String,
504 auth: ApiAuth,
505 ) -> FederationResult<String>;
506
507 async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;
509
510 async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;
512
513 async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;
517
518 async fn status(&self) -> FederationResult<StatusResponse>;
520
521 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;
523
524 async fn guardian_config_backup(&self, auth: ApiAuth)
526 -> FederationResult<GuardianConfigBackup>;
527
528 async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;
530
531 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;
532
533 async fn submit_api_announcement(
535 &self,
536 peer_id: PeerId,
537 announcement: SignedApiAnnouncement,
538 ) -> FederationResult<()>;
539
540 async fn api_announcements(
541 &self,
542 guardian: PeerId,
543 ) -> ServerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;
544
545 async fn sign_api_announcement(
546 &self,
547 api_url: SafeUrl,
548 auth: ApiAuth,
549 ) -> FederationResult<SignedApiAnnouncement>;
550
551 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;
552
553 async fn fedimintd_version(&self, peer_id: PeerId) -> ServerResult<String>;
555
556 async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;
558
559 async fn get_invite_code(&self, guardian: PeerId) -> ServerResult<InviteCode>;
562
563 async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()>;
566}
567
568pub fn deserialize_outcome<R>(
569 outcome: &SerdeOutputOutcome,
570 module_decoder: &Decoder,
571) -> OutputOutcomeResult<R>
572where
573 R: OutputOutcome + MaybeSend,
574{
575 let dyn_outcome = outcome
576 .try_into_inner_known_module_kind(module_decoder)
577 .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
578
579 let source_instance = dyn_outcome.module_instance_id();
580
581 dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
582 let target_type = std::any::type_name::<R>();
583 OutputOutcomeError::ResponseDeserialization(anyhow!(
584 "Could not downcast output outcome with instance id {source_instance} to {target_type}"
585 ))
586 })
587}
588
589#[derive(Clone, Debug)]
600pub struct FederationApi {
601 peers: BTreeMap<PeerId, SafeUrl>,
603 peers_keys: BTreeSet<PeerId>,
605 admin_id: Option<PeerId>,
607 module_id: Option<ModuleInstanceId>,
609 api_secret: Option<String>,
611 connection_pool: ConnectionPool<dyn IGuardianConnection>,
613}
614
615impl FederationApi {
616 pub fn new(
617 connectors: ConnectorRegistry,
618 peers: BTreeMap<PeerId, SafeUrl>,
619 admin_peer_id: Option<PeerId>,
620 api_secret: Option<&str>,
621 ) -> Self {
622 Self {
623 peers_keys: peers.keys().copied().collect(),
624 peers,
625 admin_id: admin_peer_id,
626 module_id: None,
627 api_secret: api_secret.map(ToOwned::to_owned),
628 connection_pool: ConnectionPool::new(connectors),
629 }
630 }
631
632 async fn get_or_create_connection(
633 &self,
634 url: &SafeUrl,
635 api_secret: Option<&str>,
636 ) -> ServerResult<DynGuaridianConnection> {
637 self.connection_pool
638 .get_or_create_connection(url, api_secret, |url, api_secret, connectors| async move {
639 let conn = connectors
640 .connect_guardian(&url, api_secret.as_deref())
641 .await?;
642 Ok(conn)
643 })
644 .await
645 }
646
647 async fn request(
648 &self,
649 peer: PeerId,
650 method: ApiMethod,
651 request: ApiRequestErased,
652 ) -> ServerResult<Value> {
653 trace!(target: LOG_CLIENT_NET_API, %peer, %method, "Api request");
654 let url = self
655 .peers
656 .get(&peer)
657 .ok_or_else(|| ServerError::InvalidPeerId { peer_id: peer })?;
658 let conn = self
659 .get_or_create_connection(url, self.api_secret.as_deref())
660 .await
661 .context("Failed to connect to peer")
662 .map_err(ServerError::Connection)?;
663 let res = conn.request(method.clone(), request).await;
664
665 trace!(target: LOG_CLIENT_NET_API, ?method, res_ok = res.is_ok(), "Api response");
666
667 res
668 }
669
670 pub fn get_active_connection_receiver(&self) -> watch::Receiver<BTreeSet<SafeUrl>> {
674 self.connection_pool.get_active_connection_receiver()
675 }
676}
677
678impl IModuleFederationApi for FederationApi {}
679
680#[apply(async_trait_maybe_send!)]
681impl IRawFederationApi for FederationApi {
682 fn all_peers(&self) -> &BTreeSet<PeerId> {
683 &self.peers_keys
684 }
685
686 fn self_peer(&self) -> Option<PeerId> {
687 self.admin_id
688 }
689
690 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
691 FederationApi {
692 api_secret: self.api_secret.clone(),
693 peers: self.peers.clone(),
694 peers_keys: self.peers_keys.clone(),
695 admin_id: self.admin_id,
696 module_id: Some(id),
697 connection_pool: self.connection_pool.clone(),
698 }
699 .into()
700 }
701
702 #[instrument(
703 target = LOG_CLIENT_NET_API,
704 skip_all,
705 fields(
706 peer_id = %peer_id,
707 method = %method,
708 params = %params.params,
709 )
710 )]
711 async fn request_raw(
712 &self,
713 peer_id: PeerId,
714 method: &str,
715 params: &ApiRequestErased,
716 ) -> ServerResult<Value> {
717 let method = match self.module_id {
718 Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
719 None => ApiMethod::Core(method.to_string()),
720 };
721
722 self.request(peer_id, method, params.clone()).await
723 }
724
725 fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>> {
726 let peers = self.peers.clone();
727
728 WatchStream::new(self.connection_pool.get_active_connection_receiver())
729 .map(move |active_urls| {
730 peers
731 .iter()
732 .map(|(peer_id, url)| (*peer_id, active_urls.contains(url)))
733 .collect()
734 })
735 .boxed()
736 }
737 async fn wait_for_initialized_connections(&self) {
738 self.connection_pool
739 .wait_for_initialized_connections()
740 .await;
741 }
742}
743
744#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
746pub struct LegacyFederationStatus {
747 pub session_count: u64,
748 pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
749 pub peers_online: u64,
750 pub peers_offline: u64,
751 pub peers_flagged: u64,
754 pub scheduled_shutdown: Option<u64>,
755}
756
757#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
758pub struct LegacyPeerStatus {
759 pub last_contribution: Option<u64>,
760 pub connection_status: LegacyP2PConnectionStatus,
761 pub flagged: bool,
764}
765
766#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
767#[serde(rename_all = "snake_case")]
768pub enum LegacyP2PConnectionStatus {
769 #[default]
770 Disconnected,
771 Connected,
772}
773
774#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
775pub struct StatusResponse {
776 pub server: ServerStatusLegacy,
777 pub federation: Option<LegacyFederationStatus>,
778}
779
780#[cfg(test)]
781mod tests;