1mod error;
2pub mod global_api;
3pub mod net;
4
5use std::collections::{BTreeMap, BTreeSet, HashMap};
6use std::fmt::Debug;
7use std::future::pending;
8use std::pin::Pin;
9use std::result;
10use std::sync::Arc;
11
12use anyhow::{Context, anyhow};
13use bitcoin::secp256k1;
14pub use error::{FederationError, OutputOutcomeError};
15pub use fedimint_connectors::ServerResult;
16pub use fedimint_connectors::error::ServerError;
17use fedimint_connectors::{
18 ConnectionPool, ConnectorRegistry, DynGuaridianConnection, IGuardianConnection,
19};
20use fedimint_core::admin_client::{GuardianConfigBackup, ServerStatusLegacy, SetupStatus};
21use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
22use fedimint_core::core::backup::SignedBackupRequest;
23use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, ModuleKind, OutputOutcome};
24use fedimint_core::encoding::{Decodable, Encodable};
25use fedimint_core::invite_code::InviteCode;
26use fedimint_core::module::audit::AuditSummary;
27use fedimint_core::module::registry::ModuleDecoderRegistry;
28use fedimint_core::module::{
29 ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
30};
31use fedimint_core::net::api_announcement::SignedApiAnnouncement;
32use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
33use fedimint_core::task::{MaybeSend, MaybeSync};
34use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
35use fedimint_core::util::backoff_util::api_networking_backoff;
36use fedimint_core::util::{FmtCompact as _, SafeUrl};
37use fedimint_core::{
38 NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
39};
40use fedimint_logging::LOG_CLIENT_NET_API;
41use futures::stream::{BoxStream, FuturesUnordered};
42use futures::{Future, StreamExt};
43use global_api::with_cache::GlobalFederationApiWithCache;
44use jsonrpsee_core::DeserializeOwned;
45use serde::{Deserialize, Serialize};
46use serde_json::Value;
47use tokio::sync::watch;
48use tokio_stream::wrappers::WatchStream;
49use tracing::{debug, instrument, trace, warn};
50
51use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
52
53pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
54
55pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
56 ApiVersion { major: 0, minor: 1 };
57
58pub const VERSION_THAT_INTRODUCED_AWAIT_OUTPUTS_OUTCOMES: ApiVersion = ApiVersion::new(0, 8);
59pub type FederationResult<T> = Result<T, FederationError>;
60pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
61
62pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
63
/// One API version for the core plus one per module instance.
///
/// NOTE(review): presumably the set of versions negotiated between client and
/// federation — confirm against the code that constructs it.
#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
pub struct ApiVersionSet {
    /// Version of the core (non-module) API.
    pub core: ApiVersion,
    /// Version per module, keyed by module instance id.
    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
}
72
/// Lowest-level federation API abstraction: knows the set of peers and can
/// send a single raw JSON request to one of them.
///
/// Every implementor automatically gets the higher-level helpers of
/// [`FederationApiExt`] through its blanket impl.
#[apply(async_trait_maybe_send!)]
pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
    /// Ids of all peers this API can talk to.
    fn all_peers(&self) -> &BTreeSet<PeerId>;

    /// Peer id used for admin requests, if any.
    ///
    /// `FederationApiExt::request_admin` fails with "Admin peer_id not set"
    /// when this returns `None`.
    fn self_peer(&self) -> Option<PeerId>;

    /// Returns an API handle bound to the module instance `id`.
    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;

    /// Sends `method` with `params` to `peer_id` and returns the undecoded
    /// JSON response.
    async fn request_raw(
        &self,
        peer_id: PeerId,
        method: &str,
        params: &ApiRequestErased,
    ) -> ServerResult<Value>;

    /// Stream of per-peer connection flags.
    ///
    /// NOTE(review): in `FederationApi` the flag is `true` when the peer's URL
    /// is in the connection pool's active set; other implementors may differ.
    fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>>;

    /// Resolves once the underlying connections count as initialized.
    ///
    /// NOTE(review): the exact criterion is defined by the implementor
    /// (`FederationApi` delegates to its connection pool) — confirm there.
    async fn wait_for_initialized_connections(&self);
}
111
112#[apply(async_trait_maybe_send!)]
115pub trait FederationApiExt: IRawFederationApi {
116 async fn request_single_peer<Ret>(
117 &self,
118 method: String,
119 params: ApiRequestErased,
120 peer: PeerId,
121 ) -> ServerResult<Ret>
122 where
123 Ret: DeserializeOwned,
124 {
125 self.request_raw(peer, &method, ¶ms)
126 .await
127 .and_then(|v| {
128 serde_json::from_value(v)
129 .map_err(|e| ServerError::ResponseDeserialization(e.into()))
130 })
131 }
132
133 async fn request_single_peer_federation<FedRet>(
134 &self,
135 method: String,
136 params: ApiRequestErased,
137 peer_id: PeerId,
138 ) -> FederationResult<FedRet>
139 where
140 FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
141 {
142 self.request_raw(peer_id, &method, ¶ms)
143 .await
144 .and_then(|v| {
145 serde_json::from_value(v)
146 .map_err(|e| ServerError::ResponseDeserialization(e.into()))
147 })
148 .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
149 }
150
151 #[instrument(target = LOG_CLIENT_NET_API, skip_all, fields(method=method))]
154 async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
155 &self,
156 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
157 method: String,
158 params: ApiRequestErased,
159 ) -> FederationResult<FR> {
160 #[cfg(not(target_family = "wasm"))]
164 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
165 #[cfg(target_family = "wasm")]
166 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
167
168 for peer in self.all_peers() {
169 futures.push(Box::pin({
170 let method = &method;
171 let params = ¶ms;
172 async move {
173 let result = self
174 .request_single_peer(method.clone(), params.clone(), *peer)
175 .await;
176
177 (*peer, result)
178 }
179 }));
180 }
181
182 let mut peer_errors = BTreeMap::new();
183 let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
184
185 loop {
186 let (peer, result) = futures
187 .next()
188 .await
189 .expect("Query strategy ran out of peers to query without returning a result");
190
191 match result {
192 Ok(response) => match strategy.process(peer, response) {
193 QueryStep::Retry(peers) => {
194 for peer in peers {
195 futures.push(Box::pin({
196 let method = &method;
197 let params = ¶ms;
198 async move {
199 let result = self
200 .request_single_peer(method.clone(), params.clone(), peer)
201 .await;
202
203 (peer, result)
204 }
205 }));
206 }
207 }
208 QueryStep::Success(response) => return Ok(response),
209 QueryStep::Failure(e) => {
210 peer_errors.insert(peer, e);
211 }
212 QueryStep::Continue => {}
213 },
214 Err(e) => {
215 e.report_if_unusual(peer, "RequestWithStrategy");
216 peer_errors.insert(peer, e);
217 }
218 }
219
220 if peer_errors.len() == peer_error_threshold {
221 return Err(FederationError::peer_errors(
222 method.clone(),
223 params.params.clone(),
224 peer_errors,
225 ));
226 }
227 }
228 }
229
230 #[instrument(target = LOG_CLIENT_NET_API, level = "debug", skip(self, strategy))]
231 async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
232 &self,
233 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
234 method: String,
235 params: ApiRequestErased,
236 ) -> FR {
237 #[cfg(not(target_family = "wasm"))]
241 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
242 #[cfg(target_family = "wasm")]
243 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
244
245 for peer in self.all_peers() {
246 futures.push(Box::pin({
247 let method = &method;
248 let params = ¶ms;
249 async move {
250 let response = util::retry(
251 format!("api-request-{method}-{peer}"),
252 api_networking_backoff(),
253 || async {
254 self.request_single_peer(method.clone(), params.clone(), *peer)
255 .await
256 .inspect_err(|e| {
257 e.report_if_unusual(*peer, "QueryWithStrategyRetry");
258 })
259 .map_err(|e| anyhow!(e.to_string()))
260 },
261 )
262 .await
263 .expect("Number of retries has no limit");
264
265 (*peer, response)
266 }
267 }));
268 }
269
270 loop {
271 let (peer, response) = match futures.next().await {
272 Some(t) => t,
273 None => pending().await,
274 };
275
276 match strategy.process(peer, response) {
277 QueryStep::Retry(peers) => {
278 for peer in peers {
279 futures.push(Box::pin({
280 let method = &method;
281 let params = ¶ms;
282 async move {
283 let response = util::retry(
284 format!("api-request-{method}-{peer}"),
285 api_networking_backoff(),
286 || async {
287 self.request_single_peer(
288 method.clone(),
289 params.clone(),
290 peer,
291 )
292 .await
293 .inspect_err(|err| {
294 if err.is_unusual() {
295 debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
296 }
297 })
298 .map_err(|e| anyhow!(e.to_string()))
299 },
300 )
301 .await
302 .expect("Number of retries has no limit");
303
304 (peer, response)
305 }
306 }));
307 }
308 }
309 QueryStep::Success(response) => return response,
310 QueryStep::Failure(e) => {
311 warn!(target: LOG_CLIENT_NET_API, "Query strategy returned non-retryable failure for peer {peer}: {e}");
312 }
313 QueryStep::Continue => {}
314 }
315 }
316 }
317
318 async fn request_current_consensus<Ret>(
319 &self,
320 method: String,
321 params: ApiRequestErased,
322 ) -> FederationResult<Ret>
323 where
324 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
325 {
326 self.request_with_strategy(
327 ThresholdConsensus::new(self.all_peers().to_num_peers()),
328 method,
329 params,
330 )
331 .await
332 }
333
334 async fn request_current_consensus_retry<Ret>(
335 &self,
336 method: String,
337 params: ApiRequestErased,
338 ) -> Ret
339 where
340 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
341 {
342 self.request_with_strategy_retry(
343 ThresholdConsensus::new(self.all_peers().to_num_peers()),
344 method,
345 params,
346 )
347 .await
348 }
349
350 async fn request_admin<Ret>(
351 &self,
352 method: &str,
353 params: ApiRequestErased,
354 auth: ApiAuth,
355 ) -> FederationResult<Ret>
356 where
357 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
358 {
359 let Some(self_peer_id) = self.self_peer() else {
360 return Err(FederationError::general(
361 method,
362 params,
363 anyhow::format_err!("Admin peer_id not set"),
364 ));
365 };
366
367 self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
368 .await
369 }
370
371 async fn request_admin_no_auth<Ret>(
372 &self,
373 method: &str,
374 params: ApiRequestErased,
375 ) -> FederationResult<Ret>
376 where
377 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
378 {
379 let Some(self_peer_id) = self.self_peer() else {
380 return Err(FederationError::general(
381 method,
382 params,
383 anyhow::format_err!("Admin peer_id not set"),
384 ));
385 };
386
387 self.request_single_peer_federation(method.into(), params, self_peer_id)
388 .await
389 }
390}
391
392#[apply(async_trait_maybe_send!)]
393impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
394
/// Marker trait for raw federation APIs that may be used as a module API.
/// It adds no methods of its own.
pub trait IModuleFederationApi: IRawFederationApi {}

// Cloneable newtype around `Arc<dyn IModuleFederationApi>`; this is what
// `IRawFederationApi::with_module` returns.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynModuleApi(Arc<IModuleFederationApi>)
}
402
// Cloneable newtype around `Arc<dyn IGlobalFederationApi>`; constructed via
// the `DynGlobalApi::new*` functions.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynGlobalApi(Arc<IGlobalFederationApi>)
}
407
/// Borrows the wrapped trait object directly.
impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
        self.inner.as_ref()
    }
}
413
414impl DynGlobalApi {
415 pub fn new(
416 connectors: ConnectorRegistry,
417 peers: BTreeMap<PeerId, SafeUrl>,
418 api_secret: Option<&str>,
419 ) -> anyhow::Result<Self> {
420 Ok(GlobalFederationApiWithCache::new(FederationApi::new(
421 connectors, peers, None, api_secret,
422 ))
423 .into())
424 }
425 pub fn new_admin(
426 connectors: ConnectorRegistry,
427 peer: PeerId,
428 url: SafeUrl,
429 api_secret: Option<&str>,
430 ) -> anyhow::Result<DynGlobalApi> {
431 Ok(GlobalFederationApiWithCache::new(FederationApi::new(
432 connectors,
433 [(peer, url)].into(),
434 Some(peer),
435 api_secret,
436 ))
437 .into())
438 }
439
440 pub fn new_admin_setup(connectors: ConnectorRegistry, url: SafeUrl) -> anyhow::Result<Self> {
441 Self::new_admin(
444 connectors,
445 PeerId::from(1024),
446 url,
447 None,
449 )
450 }
451}
452
/// Typed, non-module federation endpoints layered on [`IRawFederationApi`].
///
/// Only signatures are declared here; the behaviour of each call is defined
/// by the implementor. Methods returning [`FederationResult`] report failures
/// as a `FederationError`, those returning [`ServerResult`] take an explicit
/// peer and report a single server's error.
///
/// NOTE(review): methods taking `auth: ApiAuth` are presumably guardian/admin
/// endpoints — confirm against the implementation.
#[apply(async_trait_maybe_send!)]
pub trait IGlobalFederationApi: IRawFederationApi {
    /// Submits `tx` and returns the still-encoded submission outcome.
    async fn submit_transaction(
        &self,
        tx: Transaction,
    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;

    /// Returns the session outcome for `block_index`, decoded with `decoders`.
    async fn await_block(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
    ) -> anyhow::Result<SessionOutcome>;

    /// Returns the session status for `block_index`.
    ///
    /// NOTE(review): `core_api_version` presumably selects between endpoint
    /// revisions and `broadcast_public_keys` enables signature checks — confirm.
    async fn get_session_status(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
        core_api_version: ApiVersion,
        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
    ) -> anyhow::Result<SessionStatus>;

    /// Returns the federation's session count.
    async fn session_count(&self) -> FederationResult<u64>;

    /// Resolves to a transaction id for `txid`; infallible by signature.
    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;

    /// Uploads a signed client backup request.
    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;

    /// Fetches the backup for `id` per peer; `None` where a peer has none.
    async fn download_backup(
        &self,
        id: &secp256k1::PublicKey,
    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;

    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;

    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;

    async fn set_local_params(
        &self,
        name: String,
        federation_name: Option<String>,
        disable_base_fees: Option<bool>,
        enabled_modules: Option<BTreeSet<ModuleKind>>,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    async fn add_peer_connection_info(
        &self,
        info: String,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;

    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;

    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the server status plus optional federation status.
    async fn status(&self) -> FederationResult<StatusResponse>;

    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;

    async fn guardian_config_backup(&self, auth: ApiAuth)
    -> FederationResult<GuardianConfigBackup>;

    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;

    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Submits a signed API announcement associated with `peer_id`.
    async fn submit_api_announcement(
        &self,
        peer_id: PeerId,
        announcement: SignedApiAnnouncement,
    ) -> FederationResult<()>;

    /// Returns the API announcements as reported by the single peer `guardian`.
    async fn api_announcements(
        &self,
        guardian: PeerId,
    ) -> ServerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;

    async fn sign_api_announcement(
        &self,
        api_url: SafeUrl,
        auth: ApiAuth,
    ) -> FederationResult<SignedApiAnnouncement>;

    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the version string reported by the single peer `peer_id`.
    async fn fedimintd_version(&self, peer_id: PeerId) -> ServerResult<String>;

    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;

    /// Returns an invite code as reported by the single peer `guardian`.
    async fn get_invite_code(&self, guardian: PeerId) -> ServerResult<InviteCode>;

    async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()>;
}
568
569pub fn deserialize_outcome<R>(
570 outcome: &SerdeOutputOutcome,
571 module_decoder: &Decoder,
572) -> OutputOutcomeResult<R>
573where
574 R: OutputOutcome + MaybeSend,
575{
576 let dyn_outcome = outcome
577 .try_into_inner_known_module_kind(module_decoder)
578 .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
579
580 let source_instance = dyn_outcome.module_instance_id();
581
582 dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
583 let target_type = std::any::type_name::<R>();
584 OutputOutcomeError::ResponseDeserialization(anyhow!(
585 "Could not downcast output outcome with instance id {source_instance} to {target_type}"
586 ))
587 })
588}
589
/// Concrete [`IRawFederationApi`] that reaches guardians through a shared
/// connection pool.
#[derive(Clone, Debug)]
pub struct FederationApi {
    /// API URL of every known peer; consulted on each request.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Key set of `peers`, stored so `all_peers` can return a reference.
    peers_keys: BTreeSet<PeerId>,
    /// Peer returned by `self_peer`, i.e. the target of admin requests.
    admin_id: Option<PeerId>,
    /// `Some(id)` routes requests as `ApiMethod::Module(id, ..)`,
    /// `None` as `ApiMethod::Core(..)`.
    module_id: Option<ModuleInstanceId>,
    /// Optional secret forwarded when a guardian connection is created.
    api_secret: Option<String>,
    /// Pool of guardian connections; cloned along with the API in `with_module`.
    connection_pool: ConnectionPool<dyn IGuardianConnection>,
}
615
616impl FederationApi {
617 pub fn new(
618 connectors: ConnectorRegistry,
619 peers: BTreeMap<PeerId, SafeUrl>,
620 admin_peer_id: Option<PeerId>,
621 api_secret: Option<&str>,
622 ) -> Self {
623 Self {
624 peers_keys: peers.keys().copied().collect(),
625 peers,
626 admin_id: admin_peer_id,
627 module_id: None,
628 api_secret: api_secret.map(ToOwned::to_owned),
629 connection_pool: ConnectionPool::new(connectors),
630 }
631 }
632
633 async fn get_or_create_connection(
634 &self,
635 url: &SafeUrl,
636 api_secret: Option<&str>,
637 ) -> ServerResult<DynGuaridianConnection> {
638 self.connection_pool
639 .get_or_create_connection(url, api_secret, |url, api_secret, connectors| async move {
640 let conn = connectors
641 .connect_guardian(&url, api_secret.as_deref())
642 .await?;
643 Ok(conn)
644 })
645 .await
646 }
647
648 async fn request(
649 &self,
650 peer: PeerId,
651 method: ApiMethod,
652 request: ApiRequestErased,
653 ) -> ServerResult<Value> {
654 trace!(target: LOG_CLIENT_NET_API, %peer, %method, "Api request");
655 let url = self
656 .peers
657 .get(&peer)
658 .ok_or_else(|| ServerError::InvalidPeerId { peer_id: peer })?;
659 let conn = self
660 .get_or_create_connection(url, self.api_secret.as_deref())
661 .await
662 .context("Failed to connect to peer")
663 .map_err(ServerError::Connection)?;
664 let res = conn.request(method.clone(), request).await;
665
666 trace!(target: LOG_CLIENT_NET_API, ?method, res_ok = res.is_ok(), "Api response");
667
668 res
669 }
670
671 pub fn get_active_connection_receiver(&self) -> watch::Receiver<BTreeSet<SafeUrl>> {
675 self.connection_pool.get_active_connection_receiver()
676 }
677}
678
// Marker impl; `with_module` converts a `FederationApi` into a `DynModuleApi`,
// which wraps an `IModuleFederationApi` trait object.
impl IModuleFederationApi for FederationApi {}
680
681#[apply(async_trait_maybe_send!)]
682impl IRawFederationApi for FederationApi {
683 fn all_peers(&self) -> &BTreeSet<PeerId> {
684 &self.peers_keys
685 }
686
687 fn self_peer(&self) -> Option<PeerId> {
688 self.admin_id
689 }
690
691 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
692 FederationApi {
693 api_secret: self.api_secret.clone(),
694 peers: self.peers.clone(),
695 peers_keys: self.peers_keys.clone(),
696 admin_id: self.admin_id,
697 module_id: Some(id),
698 connection_pool: self.connection_pool.clone(),
699 }
700 .into()
701 }
702
703 #[instrument(
704 target = LOG_CLIENT_NET_API,
705 skip_all,
706 fields(
707 peer_id = %peer_id,
708 method = %method,
709 params = %params.params,
710 )
711 )]
712 async fn request_raw(
713 &self,
714 peer_id: PeerId,
715 method: &str,
716 params: &ApiRequestErased,
717 ) -> ServerResult<Value> {
718 let method = match self.module_id {
719 Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
720 None => ApiMethod::Core(method.to_string()),
721 };
722
723 self.request(peer_id, method, params.clone()).await
724 }
725
726 fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>> {
727 let peers = self.peers.clone();
728
729 WatchStream::new(self.connection_pool.get_active_connection_receiver())
730 .map(move |active_urls| {
731 peers
732 .iter()
733 .map(|(peer_id, url)| (*peer_id, active_urls.contains(url)))
734 .collect()
735 })
736 .boxed()
737 }
738 async fn wait_for_initialized_connections(&self) {
739 self.connection_pool
740 .wait_for_initialized_connections()
741 .await;
742 }
743}
744
/// Federation-wide status in the legacy response shape.
///
/// Field names are part of the serde wire format.
/// NOTE(review): the meaning of each counter is inferred from its name only —
/// confirm against the server that produces this value.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct LegacyFederationStatus {
    pub session_count: u64,
    /// Status entry per peer.
    pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
    pub peers_online: u64,
    pub peers_offline: u64,
    pub peers_flagged: u64,
    /// NOTE(review): presumably the session at which a shutdown is scheduled.
    pub scheduled_shutdown: Option<u64>,
}
757
/// Per-peer entry of [`LegacyFederationStatus::status_by_peer`].
///
/// NOTE(review): field semantics are inferred from names only — confirm.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LegacyPeerStatus {
    pub last_contribution: Option<u64>,
    pub connection_status: LegacyP2PConnectionStatus,
    pub flagged: bool,
}
766
/// Connection state of a peer, serialized in `snake_case`
/// (`"disconnected"` / `"connected"`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LegacyP2PConnectionStatus {
    /// The default state.
    #[default]
    Disconnected,
    Connected,
}
774
/// Return type of [`IGlobalFederationApi::status`].
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct StatusResponse {
    /// Status of the server itself.
    pub server: ServerStatusLegacy,
    /// Federation-wide status; may be absent.
    pub federation: Option<LegacyFederationStatus>,
}
780
781#[cfg(test)]
782mod tests;