1mod error;
2pub mod global_api;
3pub mod net;
4
5use std::collections::{BTreeMap, BTreeSet, HashMap};
6use std::fmt::Debug;
7use std::future::pending;
8use std::pin::Pin;
9use std::result;
10use std::sync::Arc;
11
12use anyhow::{Context, anyhow};
13use bitcoin::secp256k1;
14pub use error::{FederationError, OutputOutcomeError};
15pub use fedimint_connectors::ServerResult;
16pub use fedimint_connectors::error::ServerError;
17use fedimint_connectors::{
18 ConnectionPool, ConnectorRegistry, DynGuaridianConnection, IGuardianConnection,
19};
20use fedimint_core::admin_client::{GuardianConfigBackup, ServerStatusLegacy, SetupStatus};
21use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
22use fedimint_core::core::backup::SignedBackupRequest;
23use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
24use fedimint_core::encoding::{Decodable, Encodable};
25use fedimint_core::invite_code::InviteCode;
26use fedimint_core::module::audit::AuditSummary;
27use fedimint_core::module::registry::ModuleDecoderRegistry;
28use fedimint_core::module::{
29 ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
30};
31use fedimint_core::net::api_announcement::SignedApiAnnouncement;
32use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
33use fedimint_core::task::{MaybeSend, MaybeSync};
34use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
35use fedimint_core::util::backoff_util::api_networking_backoff;
36use fedimint_core::util::{FmtCompact as _, SafeUrl};
37use fedimint_core::{
38 NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
39};
40use fedimint_logging::LOG_CLIENT_NET_API;
41use futures::stream::FuturesUnordered;
42use futures::{Future, StreamExt};
43use global_api::with_cache::GlobalFederationApiWithCache;
44use jsonrpsee_core::DeserializeOwned;
45use serde::{Deserialize, Serialize};
46use serde_json::Value;
47use tracing::{debug, instrument, trace, warn};
48
49use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
50
51pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
52
53pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
54 ApiVersion { major: 0, minor: 1 };
55
56pub const VERSION_THAT_INTRODUCED_AWAIT_OUTPUTS_OUTCOMES: ApiVersion = ApiVersion::new(0, 8);
57pub type FederationResult<T> = Result<T, FederationError>;
58pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
59
60pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
61
62#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
66pub struct ApiVersionSet {
67 pub core: ApiVersion,
68 pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
69}
70
71#[apply(async_trait_maybe_send!)]
73pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
74 fn all_peers(&self) -> &BTreeSet<PeerId>;
82
83 fn self_peer(&self) -> Option<PeerId>;
88
89 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;
90
91 async fn request_raw(
93 &self,
94 peer_id: PeerId,
95 method: &str,
96 params: &ApiRequestErased,
97 ) -> ServerResult<Value>;
98}
99
100#[apply(async_trait_maybe_send!)]
103pub trait FederationApiExt: IRawFederationApi {
104 async fn request_single_peer<Ret>(
105 &self,
106 method: String,
107 params: ApiRequestErased,
108 peer: PeerId,
109 ) -> ServerResult<Ret>
110 where
111 Ret: DeserializeOwned,
112 {
113 self.request_raw(peer, &method, ¶ms)
114 .await
115 .and_then(|v| {
116 serde_json::from_value(v)
117 .map_err(|e| ServerError::ResponseDeserialization(e.into()))
118 })
119 }
120
121 async fn request_single_peer_federation<FedRet>(
122 &self,
123 method: String,
124 params: ApiRequestErased,
125 peer_id: PeerId,
126 ) -> FederationResult<FedRet>
127 where
128 FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
129 {
130 self.request_raw(peer_id, &method, ¶ms)
131 .await
132 .and_then(|v| {
133 serde_json::from_value(v)
134 .map_err(|e| ServerError::ResponseDeserialization(e.into()))
135 })
136 .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
137 }
138
139 #[instrument(target = LOG_CLIENT_NET_API, skip_all, fields(method=method))]
142 async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
143 &self,
144 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
145 method: String,
146 params: ApiRequestErased,
147 ) -> FederationResult<FR> {
148 #[cfg(not(target_family = "wasm"))]
152 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
153 #[cfg(target_family = "wasm")]
154 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
155
156 for peer in self.all_peers() {
157 futures.push(Box::pin({
158 let method = &method;
159 let params = ¶ms;
160 async move {
161 let result = self
162 .request_single_peer(method.clone(), params.clone(), *peer)
163 .await;
164
165 (*peer, result)
166 }
167 }));
168 }
169
170 let mut peer_errors = BTreeMap::new();
171 let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
172
173 loop {
174 let (peer, result) = futures
175 .next()
176 .await
177 .expect("Query strategy ran out of peers to query without returning a result");
178
179 match result {
180 Ok(response) => match strategy.process(peer, response) {
181 QueryStep::Retry(peers) => {
182 for peer in peers {
183 futures.push(Box::pin({
184 let method = &method;
185 let params = ¶ms;
186 async move {
187 let result = self
188 .request_single_peer(method.clone(), params.clone(), peer)
189 .await;
190
191 (peer, result)
192 }
193 }));
194 }
195 }
196 QueryStep::Success(response) => return Ok(response),
197 QueryStep::Failure(e) => {
198 peer_errors.insert(peer, e);
199 }
200 QueryStep::Continue => {}
201 },
202 Err(e) => {
203 e.report_if_unusual(peer, "RequestWithStrategy");
204 peer_errors.insert(peer, e);
205 }
206 }
207
208 if peer_errors.len() == peer_error_threshold {
209 return Err(FederationError::peer_errors(
210 method.clone(),
211 params.params.clone(),
212 peer_errors,
213 ));
214 }
215 }
216 }
217
218 #[instrument(target = LOG_CLIENT_NET_API, level = "debug", skip(self, strategy))]
219 async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
220 &self,
221 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
222 method: String,
223 params: ApiRequestErased,
224 ) -> FR {
225 #[cfg(not(target_family = "wasm"))]
229 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
230 #[cfg(target_family = "wasm")]
231 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
232
233 for peer in self.all_peers() {
234 futures.push(Box::pin({
235 let method = &method;
236 let params = ¶ms;
237 async move {
238 let response = util::retry(
239 format!("api-request-{method}-{peer}"),
240 api_networking_backoff(),
241 || async {
242 self.request_single_peer(method.clone(), params.clone(), *peer)
243 .await
244 .inspect_err(|e| {
245 e.report_if_unusual(*peer, "QueryWithStrategyRetry");
246 })
247 .map_err(|e| anyhow!(e.to_string()))
248 },
249 )
250 .await
251 .expect("Number of retries has no limit");
252
253 (*peer, response)
254 }
255 }));
256 }
257
258 loop {
259 let (peer, response) = match futures.next().await {
260 Some(t) => t,
261 None => pending().await,
262 };
263
264 match strategy.process(peer, response) {
265 QueryStep::Retry(peers) => {
266 for peer in peers {
267 futures.push(Box::pin({
268 let method = &method;
269 let params = ¶ms;
270 async move {
271 let response = util::retry(
272 format!("api-request-{method}-{peer}"),
273 api_networking_backoff(),
274 || async {
275 self.request_single_peer(
276 method.clone(),
277 params.clone(),
278 peer,
279 )
280 .await
281 .inspect_err(|err| {
282 if err.is_unusual() {
283 debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
284 }
285 })
286 .map_err(|e| anyhow!(e.to_string()))
287 },
288 )
289 .await
290 .expect("Number of retries has no limit");
291
292 (peer, response)
293 }
294 }));
295 }
296 }
297 QueryStep::Success(response) => return response,
298 QueryStep::Failure(e) => {
299 warn!(target: LOG_CLIENT_NET_API, "Query strategy returned non-retryable failure for peer {peer}: {e}");
300 }
301 QueryStep::Continue => {}
302 }
303 }
304 }
305
306 async fn request_current_consensus<Ret>(
307 &self,
308 method: String,
309 params: ApiRequestErased,
310 ) -> FederationResult<Ret>
311 where
312 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
313 {
314 self.request_with_strategy(
315 ThresholdConsensus::new(self.all_peers().to_num_peers()),
316 method,
317 params,
318 )
319 .await
320 }
321
322 async fn request_current_consensus_retry<Ret>(
323 &self,
324 method: String,
325 params: ApiRequestErased,
326 ) -> Ret
327 where
328 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
329 {
330 self.request_with_strategy_retry(
331 ThresholdConsensus::new(self.all_peers().to_num_peers()),
332 method,
333 params,
334 )
335 .await
336 }
337
338 async fn request_admin<Ret>(
339 &self,
340 method: &str,
341 params: ApiRequestErased,
342 auth: ApiAuth,
343 ) -> FederationResult<Ret>
344 where
345 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
346 {
347 let Some(self_peer_id) = self.self_peer() else {
348 return Err(FederationError::general(
349 method,
350 params,
351 anyhow::format_err!("Admin peer_id not set"),
352 ));
353 };
354
355 self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
356 .await
357 }
358
359 async fn request_admin_no_auth<Ret>(
360 &self,
361 method: &str,
362 params: ApiRequestErased,
363 ) -> FederationResult<Ret>
364 where
365 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
366 {
367 let Some(self_peer_id) = self.self_peer() else {
368 return Err(FederationError::general(
369 method,
370 params,
371 anyhow::format_err!("Admin peer_id not set"),
372 ));
373 };
374
375 self.request_single_peer_federation(method.into(), params, self_peer_id)
376 .await
377 }
378}
379
380#[apply(async_trait_maybe_send!)]
381impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
382
383pub trait IModuleFederationApi: IRawFederationApi {}
385
386dyn_newtype_define! {
387 #[derive(Clone)]
388 pub DynModuleApi(Arc<IModuleFederationApi>)
389}
390
391dyn_newtype_define! {
392 #[derive(Clone)]
393 pub DynGlobalApi(Arc<IGlobalFederationApi>)
394}
395
396impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
397 fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
398 self.inner.as_ref()
399 }
400}
401
402impl DynGlobalApi {
403 pub fn new(
404 connectors: ConnectorRegistry,
405 peers: BTreeMap<PeerId, SafeUrl>,
406 api_secret: Option<&str>,
407 ) -> anyhow::Result<Self> {
408 Ok(GlobalFederationApiWithCache::new(FederationApi::new(
409 connectors, peers, None, api_secret,
410 ))
411 .into())
412 }
413 pub fn new_admin(
414 connectors: ConnectorRegistry,
415 peer: PeerId,
416 url: SafeUrl,
417 api_secret: Option<&str>,
418 ) -> anyhow::Result<DynGlobalApi> {
419 Ok(GlobalFederationApiWithCache::new(FederationApi::new(
420 connectors,
421 [(peer, url)].into(),
422 Some(peer),
423 api_secret,
424 ))
425 .into())
426 }
427
428 pub fn new_admin_setup(connectors: ConnectorRegistry, url: SafeUrl) -> anyhow::Result<Self> {
429 Self::new_admin(
432 connectors,
433 PeerId::from(1024),
434 url,
435 None,
437 )
438 }
439}
440
441#[apply(async_trait_maybe_send!)]
443pub trait IGlobalFederationApi: IRawFederationApi {
444 async fn submit_transaction(
445 &self,
446 tx: Transaction,
447 ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;
448
449 async fn await_block(
450 &self,
451 block_index: u64,
452 decoders: &ModuleDecoderRegistry,
453 ) -> anyhow::Result<SessionOutcome>;
454
455 async fn get_session_status(
456 &self,
457 block_index: u64,
458 decoders: &ModuleDecoderRegistry,
459 core_api_version: ApiVersion,
460 broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
461 ) -> anyhow::Result<SessionStatus>;
462
463 async fn session_count(&self) -> FederationResult<u64>;
464
465 async fn await_transaction(&self, txid: TransactionId) -> TransactionId;
466
467 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;
468
469 async fn download_backup(
470 &self,
471 id: &secp256k1::PublicKey,
472 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;
473
474 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;
478
479 async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;
480
481 async fn set_local_params(
482 &self,
483 name: String,
484 federation_name: Option<String>,
485 disable_base_fees: Option<bool>,
486 auth: ApiAuth,
487 ) -> FederationResult<String>;
488
489 async fn add_peer_connection_info(
490 &self,
491 info: String,
492 auth: ApiAuth,
493 ) -> FederationResult<String>;
494
495 async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;
497
498 async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;
500
501 async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;
505
506 async fn status(&self) -> FederationResult<StatusResponse>;
508
509 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;
511
512 async fn guardian_config_backup(&self, auth: ApiAuth)
514 -> FederationResult<GuardianConfigBackup>;
515
516 async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;
518
519 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;
520
521 async fn submit_api_announcement(
523 &self,
524 peer_id: PeerId,
525 announcement: SignedApiAnnouncement,
526 ) -> FederationResult<()>;
527
528 async fn api_announcements(
529 &self,
530 guardian: PeerId,
531 ) -> ServerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;
532
533 async fn sign_api_announcement(
534 &self,
535 api_url: SafeUrl,
536 auth: ApiAuth,
537 ) -> FederationResult<SignedApiAnnouncement>;
538
539 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;
540
541 async fn fedimintd_version(&self, peer_id: PeerId) -> ServerResult<String>;
543
544 async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;
546
547 async fn get_invite_code(&self, guardian: PeerId) -> ServerResult<InviteCode>;
550
551 async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()>;
554}
555
556pub fn deserialize_outcome<R>(
557 outcome: &SerdeOutputOutcome,
558 module_decoder: &Decoder,
559) -> OutputOutcomeResult<R>
560where
561 R: OutputOutcome + MaybeSend,
562{
563 let dyn_outcome = outcome
564 .try_into_inner_known_module_kind(module_decoder)
565 .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
566
567 let source_instance = dyn_outcome.module_instance_id();
568
569 dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
570 let target_type = std::any::type_name::<R>();
571 OutputOutcomeError::ResponseDeserialization(anyhow!(
572 "Could not downcast output outcome with instance id {source_instance} to {target_type}"
573 ))
574 })
575}
576
577#[derive(Clone, Debug)]
588pub struct FederationApi {
589 peers: BTreeMap<PeerId, SafeUrl>,
591 peers_keys: BTreeSet<PeerId>,
593 admin_id: Option<PeerId>,
595 module_id: Option<ModuleInstanceId>,
597 api_secret: Option<String>,
599 connection_pool: ConnectionPool<dyn IGuardianConnection>,
601}
602
603impl FederationApi {
604 pub fn new(
605 connectors: ConnectorRegistry,
606 peers: BTreeMap<PeerId, SafeUrl>,
607 admin_peer_id: Option<PeerId>,
608 api_secret: Option<&str>,
609 ) -> Self {
610 Self {
611 peers_keys: peers.keys().copied().collect(),
612 peers,
613 admin_id: admin_peer_id,
614 module_id: None,
615 api_secret: api_secret.map(ToOwned::to_owned),
616 connection_pool: ConnectionPool::new(connectors),
617 }
618 }
619
620 async fn get_or_create_connection(
621 &self,
622 url: &SafeUrl,
623 api_secret: Option<&str>,
624 ) -> ServerResult<DynGuaridianConnection> {
625 self.connection_pool
626 .get_or_create_connection(url, api_secret, |url, api_secret, connectors| async move {
627 let conn = connectors
628 .connect_guardian(&url, api_secret.as_deref())
629 .await?;
630 Ok(conn)
631 })
632 .await
633 }
634
635 async fn request(
636 &self,
637 peer: PeerId,
638 method: ApiMethod,
639 request: ApiRequestErased,
640 ) -> ServerResult<Value> {
641 trace!(target: LOG_CLIENT_NET_API, %peer, %method, "Api request");
642 let url = self
643 .peers
644 .get(&peer)
645 .ok_or_else(|| ServerError::InvalidPeerId { peer_id: peer })?;
646 let conn = self
647 .get_or_create_connection(url, self.api_secret.as_deref())
648 .await
649 .context("Failed to connect to peer")
650 .map_err(ServerError::Connection)?;
651 let res = conn.request(method.clone(), request).await;
652
653 trace!(target: LOG_CLIENT_NET_API, ?method, res_ok = res.is_ok(), "Api response");
654
655 res
656 }
657}
658
659impl IModuleFederationApi for FederationApi {}
660
661#[apply(async_trait_maybe_send!)]
662impl IRawFederationApi for FederationApi {
663 fn all_peers(&self) -> &BTreeSet<PeerId> {
664 &self.peers_keys
665 }
666
667 fn self_peer(&self) -> Option<PeerId> {
668 self.admin_id
669 }
670
671 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
672 FederationApi {
673 api_secret: self.api_secret.clone(),
674 peers: self.peers.clone(),
675 peers_keys: self.peers_keys.clone(),
676 admin_id: self.admin_id,
677 module_id: Some(id),
678 connection_pool: self.connection_pool.clone(),
679 }
680 .into()
681 }
682
683 #[instrument(
684 target = LOG_CLIENT_NET_API,
685 skip_all,
686 fields(
687 peer_id = %peer_id,
688 method = %method,
689 params = %params.params,
690 )
691 )]
692 async fn request_raw(
693 &self,
694 peer_id: PeerId,
695 method: &str,
696 params: &ApiRequestErased,
697 ) -> ServerResult<Value> {
698 let method = match self.module_id {
699 Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
700 None => ApiMethod::Core(method.to_string()),
701 };
702
703 self.request(peer_id, method, params.clone()).await
704 }
705}
706
707#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
709pub struct LegacyFederationStatus {
710 pub session_count: u64,
711 pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
712 pub peers_online: u64,
713 pub peers_offline: u64,
714 pub peers_flagged: u64,
717 pub scheduled_shutdown: Option<u64>,
718}
719
720#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
721pub struct LegacyPeerStatus {
722 pub last_contribution: Option<u64>,
723 pub connection_status: LegacyP2PConnectionStatus,
724 pub flagged: bool,
727}
728
729#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
730#[serde(rename_all = "snake_case")]
731pub enum LegacyP2PConnectionStatus {
732 #[default]
733 Disconnected,
734 Connected,
735}
736
737#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
738pub struct StatusResponse {
739 pub server: ServerStatusLegacy,
740 pub federation: Option<LegacyFederationStatus>,
741}
742
743#[cfg(test)]
744mod tests;