1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::secp256k1;
8use fedimint_connectors::{DynGuaridianConnection, ServerResult};
9use fedimint_core::admin_client::{GuardianConfigBackup, SetLocalParamsRequest, SetupStatus};
10use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
11use fedimint_core::core::backup::SignedBackupRequest;
12use fedimint_core::core::{ModuleInstanceId, ModuleKind};
13use fedimint_core::endpoint_constants::{
14 ADD_PEER_SETUP_CODE_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT, AUDIT_ENDPOINT, AUTH_ENDPOINT,
15 AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT, BACKUP_ENDPOINT,
16 BACKUP_STATISTICS_ENDPOINT, CHAIN_ID_ENDPOINT, CHANGE_PASSWORD_ENDPOINT,
17 FEDIMINTD_VERSION_ENDPOINT, GET_SETUP_CODE_ENDPOINT, GUARDIAN_CONFIG_BACKUP_ENDPOINT,
18 GUARDIAN_METADATA_ENDPOINT, INVITE_CODE_ENDPOINT, RECOVER_ENDPOINT,
19 RESET_PEER_SETUP_CODES_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT, SESSION_COUNT_ENDPOINT,
20 SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT, SET_LOCAL_PARAMS_ENDPOINT,
21 SET_PASSWORD_ENDPOINT, SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT,
22 SIGN_API_ANNOUNCEMENT_ENDPOINT, SIGN_GUARDIAN_METADATA_ENDPOINT, START_DKG_ENDPOINT,
23 STATUS_ENDPOINT, SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_GUARDIAN_METADATA_ENDPOINT,
24 SUBMIT_TRANSACTION_ENDPOINT,
25};
26use fedimint_core::invite_code::InviteCode;
27use fedimint_core::module::audit::AuditSummary;
28use fedimint_core::module::registry::ModuleDecoderRegistry;
29use fedimint_core::module::{
30 ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
31};
32use fedimint_core::net::api_announcement::{
33 SignedApiAnnouncement, SignedApiAnnouncementSubmission,
34};
35use fedimint_core::session_outcome::{
36 AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
37};
38use fedimint_core::task::{MaybeSend, MaybeSync};
39use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
40use fedimint_core::util::SafeUrl;
41use fedimint_core::{ChainId, NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send};
42use fedimint_logging::LOG_CLIENT_NET_API;
43use futures::future::join_all;
44use futures::stream::BoxStream;
45use itertools::Itertools;
46use rand::seq::SliceRandom;
47use serde_json::Value;
48use tokio::sync::OnceCell;
49use tracing::{debug, trace};
50
51use super::super::{DynModuleApi, IGlobalFederationApi, IRawFederationApi, StatusResponse};
52use crate::api::{
53 FederationApiExt, FederationError, FederationResult,
54 VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
55};
56use crate::query::FilterMapThreshold;
57
58pub trait GlobalFederationApiWithCacheExt
61where
62 Self: Sized,
63{
64 fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
65}
66
67impl<T> GlobalFederationApiWithCacheExt for T
68where
69 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
70{
71 fn with_cache(self) -> GlobalFederationApiWithCache<T> {
72 GlobalFederationApiWithCache::new(self)
73 }
74}
75
76#[derive(Debug)]
81pub struct GlobalFederationApiWithCache<T> {
82 pub(crate) inner: T,
83 pub(crate) await_session_lru:
93 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
94
95 pub(crate) get_session_status_lru:
103 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
104}
105
106impl<T> GlobalFederationApiWithCache<T> {
107 pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
108 Self {
109 inner,
110 await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
111 NonZeroUsize::new(512).expect("is non-zero"),
112 ))),
113 get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
114 NonZeroUsize::new(512).expect("is non-zero"),
115 ))),
116 }
117 }
118}
119
120impl<T> GlobalFederationApiWithCache<T>
121where
122 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
123{
124 pub(crate) async fn await_block_raw(
125 &self,
126 block_index: u64,
127 decoders: &ModuleDecoderRegistry,
128 ) -> anyhow::Result<SessionOutcome> {
129 if block_index.is_multiple_of(100) {
130 debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
131 } else {
132 trace!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
133 }
134 self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
135 AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
136 ApiRequestErased::new(block_index),
137 )
138 .await?
139 .try_into_inner(decoders)
140 .map_err(|e| anyhow!(e.to_string()))
141 }
142
143 pub(crate) fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
144 let mut peers = self.all_peers().iter().copied().collect_vec();
145 peers.shuffle(&mut rand::thread_rng());
146 peers.into_iter()
147 }
148
149 pub(crate) async fn get_session_status_raw_v2(
150 &self,
151 block_index: u64,
152 broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
153 decoders: &ModuleDecoderRegistry,
154 ) -> anyhow::Result<SessionStatus> {
155 if block_index.is_multiple_of(100) {
156 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
157 } else {
158 trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
159 }
160 let params = ApiRequestErased::new(block_index);
161 let mut last_error = None;
162 for peer_id in self.select_peers_for_status() {
164 match self
165 .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
166 SESSION_STATUS_V2_ENDPOINT.to_string(),
167 params.clone(),
168 peer_id,
169 )
170 .await
171 .map_err(anyhow::Error::from)
172 .and_then(|s| Ok(s.try_into_inner(decoders)?))
173 {
174 Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
175 if signed_session_outcome.verify(broadcast_public_keys, block_index) {
176 return Ok(SessionStatus::Complete(
178 signed_session_outcome.session_outcome,
179 ));
180 }
181 last_error = Some(format_err!("Invalid signature"));
182 }
183 Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
184 return self.get_session_status_raw(block_index, decoders).await;
186 }
187 Err(err) => {
188 last_error = Some(err);
189 }
190 }
191 assert!(last_error.is_some());
193 }
194 Err(last_error.expect("must have at least one peer"))
195 }
196
197 pub(crate) async fn get_session_status_raw(
198 &self,
199 block_index: u64,
200 decoders: &ModuleDecoderRegistry,
201 ) -> anyhow::Result<SessionStatus> {
202 if block_index.is_multiple_of(100) {
203 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
204 } else {
205 trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
206 }
207 self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
208 SESSION_STATUS_ENDPOINT.to_string(),
209 ApiRequestErased::new(block_index),
210 )
211 .await?
212 .try_into_inner(&decoders.clone().with_fallback())
213 .map_err(|e| anyhow!(e))
214 }
215}
216
217#[apply(async_trait_maybe_send!)]
218impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
219where
220 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
221{
222 fn all_peers(&self) -> &BTreeSet<PeerId> {
223 self.inner.all_peers()
224 }
225
226 fn self_peer(&self) -> Option<PeerId> {
227 self.inner.self_peer()
228 }
229
230 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
231 self.inner.with_module(id)
232 }
233
234 async fn request_raw(
236 &self,
237 peer_id: PeerId,
238 method: &str,
239 params: &ApiRequestErased,
240 ) -> ServerResult<Value> {
241 self.inner.request_raw(peer_id, method, params).await
242 }
243
244 fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>> {
245 self.inner.connection_status_stream()
246 }
247
248 async fn wait_for_initialized_connections(&self) {
249 self.inner.wait_for_initialized_connections().await;
250 }
251
252 async fn get_peer_connection(&self, peer_id: PeerId) -> ServerResult<DynGuaridianConnection> {
253 self.inner.get_peer_connection(peer_id).await
254 }
255}
256
257#[apply(async_trait_maybe_send!)]
258impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
259where
260 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
261{
262 async fn await_block(
263 &self,
264 session_idx: u64,
265 decoders: &ModuleDecoderRegistry,
266 ) -> anyhow::Result<SessionOutcome> {
267 let mut lru_lock = self.await_session_lru.lock().await;
268
269 let entry_arc = lru_lock
270 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
271 .clone();
272
273 drop(lru_lock);
275
276 entry_arc
277 .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
278 .await
279 .cloned()
280 }
281
282 async fn get_session_status(
283 &self,
284 session_idx: u64,
285 decoders: &ModuleDecoderRegistry,
286 core_api_version: ApiVersion,
287 broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
288 ) -> anyhow::Result<SessionStatus> {
289 let mut lru_lock = self.get_session_status_lru.lock().await;
290
291 let entry_arc = lru_lock
292 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
293 .clone();
294
295 drop(lru_lock);
297
298 enum NoCacheErr {
299 Initial,
300 Pending(Vec<AcceptedItem>),
301 Err(anyhow::Error),
302 }
303 match entry_arc
304 .get_or_try_init(|| async {
305 let session_status =
306 if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
307 self.get_session_status_raw(session_idx, decoders).await
308 } else if let Some(broadcast_public_keys) = broadcast_public_keys {
309 self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
310 .await
311 } else {
312 self.get_session_status_raw(session_idx, decoders).await
313 };
314 match session_status {
315 Err(e) => Err(NoCacheErr::Err(e)),
316 Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
317 Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
318 Ok(SessionStatus::Complete(s)) => Ok(s),
320 }
321 })
322 .await
323 .cloned()
324 {
325 Ok(s) => Ok(SessionStatus::Complete(s)),
326 Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
327 Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
328 Err(NoCacheErr::Err(e)) => Err(e),
329 }
330 }
331
332 async fn submit_transaction(
333 &self,
334 tx: Transaction,
335 ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
336 self.request_current_consensus_retry(
337 SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
338 ApiRequestErased::new(SerdeTransaction::from(&tx)),
339 )
340 .await
341 }
342
343 async fn session_count(&self) -> FederationResult<u64> {
344 self.request_current_consensus(
345 SESSION_COUNT_ENDPOINT.to_owned(),
346 ApiRequestErased::default(),
347 )
348 .await
349 }
350
351 async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
352 self.request_current_consensus_retry(
353 AWAIT_TRANSACTION_ENDPOINT.to_owned(),
354 ApiRequestErased::new(txid),
355 )
356 .await
357 }
358
359 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
360 self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
361 .await
362 }
363
364 async fn download_backup(
365 &self,
366 id: &secp256k1::PublicKey,
367 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
368 self.request_with_strategy(
369 FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
370 RECOVER_ENDPOINT.to_owned(),
371 ApiRequestErased::new(id),
372 )
373 .await
374 }
375
376 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
377 self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
378 .await
379 }
380
381 async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus> {
382 self.request_admin(SETUP_STATUS_ENDPOINT, ApiRequestErased::default(), auth)
383 .await
384 }
385
386 async fn set_local_params(
387 &self,
388 name: String,
389 federation_name: Option<String>,
390 disable_base_fees: Option<bool>,
391 enabled_modules: Option<BTreeSet<ModuleKind>>,
392 federation_size: Option<u32>,
393 auth: ApiAuth,
394 ) -> FederationResult<String> {
395 self.request_admin(
396 SET_LOCAL_PARAMS_ENDPOINT,
397 ApiRequestErased::new(SetLocalParamsRequest {
398 name,
399 federation_name,
400 disable_base_fees,
401 enabled_modules,
402 federation_size,
403 }),
404 auth,
405 )
406 .await
407 }
408
409 async fn add_peer_connection_info(
410 &self,
411 info: String,
412 auth: ApiAuth,
413 ) -> FederationResult<String> {
414 self.request_admin(
415 ADD_PEER_SETUP_CODE_ENDPOINT,
416 ApiRequestErased::new(info),
417 auth,
418 )
419 .await
420 }
421
422 async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()> {
423 self.request_admin(
424 RESET_PEER_SETUP_CODES_ENDPOINT,
425 ApiRequestErased::default(),
426 auth,
427 )
428 .await
429 }
430
431 async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>> {
432 self.request_admin(GET_SETUP_CODE_ENDPOINT, ApiRequestErased::default(), auth)
433 .await
434 }
435
436 async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
437 self.request_admin(START_DKG_ENDPOINT, ApiRequestErased::default(), auth)
438 .await
439 }
440
441 async fn status(&self) -> FederationResult<StatusResponse> {
442 self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
443 .await
444 }
445
446 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
447 self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
448 .await
449 }
450
451 async fn guardian_config_backup(
452 &self,
453 auth: ApiAuth,
454 ) -> FederationResult<GuardianConfigBackup> {
455 self.request_admin(
456 GUARDIAN_CONFIG_BACKUP_ENDPOINT,
457 ApiRequestErased::default(),
458 auth,
459 )
460 .await
461 }
462
463 async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
464 self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
465 .await
466 }
467
468 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
469 self.request_admin(
470 RESTART_FEDERATION_SETUP_ENDPOINT,
471 ApiRequestErased::default(),
472 auth,
473 )
474 .await
475 }
476
477 async fn submit_api_announcement(
478 &self,
479 announcement_peer_id: PeerId,
480 announcement: SignedApiAnnouncement,
481 ) -> FederationResult<()> {
482 let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
483 let announcement_inner = announcement.clone();
484 async move {
485 (
486 peer_id,
487 self.request_single_peer::<()>(
488 SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
489 ApiRequestErased::new(SignedApiAnnouncementSubmission {
490 signed_api_announcement: announcement_inner,
491 peer_id: announcement_peer_id,
492 }),
493 peer_id,
494 )
495 .await,
496 )
497 }
498 }))
499 .await
500 .into_iter()
501 .filter_map(|(peer_id, result)| match result {
502 Ok(()) => None,
503 Err(e) => Some((peer_id, e)),
504 })
505 .collect::<BTreeMap<_, _>>();
506
507 if peer_errors.is_empty() {
508 Ok(())
509 } else {
510 Err(FederationError {
511 method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
512 params: serde_json::to_value(announcement).expect("can be serialized"),
513 general: None,
514 peer_errors,
515 })
516 }
517 }
518
519 async fn api_announcements(
520 &self,
521 guardian: PeerId,
522 ) -> ServerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
523 self.request_single_peer(
524 API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
525 ApiRequestErased::default(),
526 guardian,
527 )
528 .await
529 }
530
531 async fn sign_api_announcement(
532 &self,
533 api_url: SafeUrl,
534 auth: ApiAuth,
535 ) -> FederationResult<SignedApiAnnouncement> {
536 self.request_admin(
537 SIGN_API_ANNOUNCEMENT_ENDPOINT,
538 ApiRequestErased::new(api_url),
539 auth,
540 )
541 .await
542 }
543
544 async fn submit_guardian_metadata(
545 &self,
546 announcement_peer_id: PeerId,
547 metadata: fedimint_core::net::guardian_metadata::SignedGuardianMetadata,
548 ) -> FederationResult<()> {
549 use fedimint_core::net::guardian_metadata::SignedGuardianMetadataSubmission;
550 let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
551 let metadata_inner = metadata.clone();
552 async move {
553 (
554 peer_id,
555 self.request_single_peer::<()>(
556 SUBMIT_GUARDIAN_METADATA_ENDPOINT.into(),
557 ApiRequestErased::new(SignedGuardianMetadataSubmission {
558 signed_guardian_metadata: metadata_inner,
559 peer_id: announcement_peer_id,
560 }),
561 peer_id,
562 )
563 .await,
564 )
565 }
566 }))
567 .await
568 .into_iter()
569 .filter_map(|(peer_id, result)| match result {
570 Ok(()) => None,
571 Err(e) => Some((peer_id, e)),
572 })
573 .collect::<BTreeMap<_, _>>();
574
575 if peer_errors.is_empty() {
576 Ok(())
577 } else {
578 Err(FederationError {
579 method: SUBMIT_GUARDIAN_METADATA_ENDPOINT.to_string(),
580 params: serde_json::to_value(&metadata).expect("can be serialized"),
581 general: None,
582 peer_errors,
583 })
584 }
585 }
586
587 async fn guardian_metadata(
588 &self,
589 guardian: PeerId,
590 ) -> ServerResult<BTreeMap<PeerId, fedimint_core::net::guardian_metadata::SignedGuardianMetadata>>
591 {
592 self.request_single_peer(
593 GUARDIAN_METADATA_ENDPOINT.to_owned(),
594 ApiRequestErased::default(),
595 guardian,
596 )
597 .await
598 }
599
600 async fn sign_guardian_metadata(
601 &self,
602 metadata: fedimint_core::net::guardian_metadata::GuardianMetadata,
603 auth: ApiAuth,
604 ) -> FederationResult<fedimint_core::net::guardian_metadata::SignedGuardianMetadata> {
605 self.request_admin(
606 SIGN_GUARDIAN_METADATA_ENDPOINT,
607 ApiRequestErased::new(metadata),
608 auth,
609 )
610 .await
611 }
612
613 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
614 self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
615 .await
616 }
617
618 async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
619 self.request_admin(
620 BACKUP_STATISTICS_ENDPOINT,
621 ApiRequestErased::default(),
622 auth,
623 )
624 .await
625 }
626
627 async fn fedimintd_version(&self, peer_id: PeerId) -> ServerResult<String> {
628 self.request_single_peer(
629 FEDIMINTD_VERSION_ENDPOINT.to_owned(),
630 ApiRequestErased::default(),
631 peer_id,
632 )
633 .await
634 }
635
636 async fn get_invite_code(&self, guardian: PeerId) -> ServerResult<InviteCode> {
637 self.request_single_peer(
638 INVITE_CODE_ENDPOINT.to_owned(),
639 ApiRequestErased::default(),
640 guardian,
641 )
642 .await
643 }
644
645 async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()> {
646 self.request_admin(
647 CHANGE_PASSWORD_ENDPOINT,
648 ApiRequestErased::new(new_password),
649 auth,
650 )
651 .await
652 }
653
654 async fn chain_id(&self) -> FederationResult<ChainId> {
655 self.request_current_consensus(CHAIN_ID_ENDPOINT.to_owned(), ApiRequestErased::default())
656 .await
657 }
658}