1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::secp256k1;
8use fedimint_connectors::ServerResult;
9use fedimint_core::admin_client::{GuardianConfigBackup, SetLocalParamsRequest, SetupStatus};
10use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
11use fedimint_core::core::backup::SignedBackupRequest;
12use fedimint_core::core::{ModuleInstanceId, ModuleKind};
13use fedimint_core::endpoint_constants::{
14 ADD_PEER_SETUP_CODE_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT, AUDIT_ENDPOINT, AUTH_ENDPOINT,
15 AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT, BACKUP_ENDPOINT,
16 BACKUP_STATISTICS_ENDPOINT, CHAIN_ID_ENDPOINT, CHANGE_PASSWORD_ENDPOINT,
17 FEDIMINTD_VERSION_ENDPOINT, GET_SETUP_CODE_ENDPOINT, GUARDIAN_CONFIG_BACKUP_ENDPOINT,
18 GUARDIAN_METADATA_ENDPOINT, INVITE_CODE_ENDPOINT, RECOVER_ENDPOINT,
19 RESET_PEER_SETUP_CODES_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT, SESSION_COUNT_ENDPOINT,
20 SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT, SET_LOCAL_PARAMS_ENDPOINT,
21 SET_PASSWORD_ENDPOINT, SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT,
22 SIGN_API_ANNOUNCEMENT_ENDPOINT, SIGN_GUARDIAN_METADATA_ENDPOINT, START_DKG_ENDPOINT,
23 STATUS_ENDPOINT, SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_GUARDIAN_METADATA_ENDPOINT,
24 SUBMIT_TRANSACTION_ENDPOINT,
25};
26use fedimint_core::invite_code::InviteCode;
27use fedimint_core::module::audit::AuditSummary;
28use fedimint_core::module::registry::ModuleDecoderRegistry;
29use fedimint_core::module::{
30 ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
31};
32use fedimint_core::net::api_announcement::{
33 SignedApiAnnouncement, SignedApiAnnouncementSubmission,
34};
35use fedimint_core::session_outcome::{
36 AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
37};
38use fedimint_core::task::{MaybeSend, MaybeSync};
39use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
40use fedimint_core::util::SafeUrl;
41use fedimint_core::{ChainId, NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send};
42use fedimint_logging::LOG_CLIENT_NET_API;
43use futures::future::join_all;
44use futures::stream::BoxStream;
45use itertools::Itertools;
46use rand::seq::SliceRandom;
47use serde_json::Value;
48use tokio::sync::OnceCell;
49use tracing::{debug, trace};
50
51use super::super::{DynModuleApi, IGlobalFederationApi, IRawFederationApi, StatusResponse};
52use crate::api::{
53 FederationApiExt, FederationError, FederationResult,
54 VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
55};
56use crate::query::FilterMapThreshold;
57
58pub trait GlobalFederationApiWithCacheExt
61where
62 Self: Sized,
63{
64 fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
65}
66
67impl<T> GlobalFederationApiWithCacheExt for T
68where
69 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
70{
71 fn with_cache(self) -> GlobalFederationApiWithCache<T> {
72 GlobalFederationApiWithCache::new(self)
73 }
74}
75
76#[derive(Debug)]
81pub struct GlobalFederationApiWithCache<T> {
82 pub(crate) inner: T,
83 pub(crate) await_session_lru:
93 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
94
95 pub(crate) get_session_status_lru:
103 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
104}
105
106impl<T> GlobalFederationApiWithCache<T> {
107 pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
108 Self {
109 inner,
110 await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
111 NonZeroUsize::new(512).expect("is non-zero"),
112 ))),
113 get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
114 NonZeroUsize::new(512).expect("is non-zero"),
115 ))),
116 }
117 }
118}
119
120impl<T> GlobalFederationApiWithCache<T>
121where
122 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
123{
124 pub(crate) async fn await_block_raw(
125 &self,
126 block_index: u64,
127 decoders: &ModuleDecoderRegistry,
128 ) -> anyhow::Result<SessionOutcome> {
129 if block_index % 100 == 0 {
130 debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
131 } else {
132 trace!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
133 }
134 self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
135 AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
136 ApiRequestErased::new(block_index),
137 )
138 .await?
139 .try_into_inner(decoders)
140 .map_err(|e| anyhow!(e.to_string()))
141 }
142
143 pub(crate) fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
144 let mut peers = self.all_peers().iter().copied().collect_vec();
145 peers.shuffle(&mut rand::thread_rng());
146 peers.into_iter()
147 }
148
149 pub(crate) async fn get_session_status_raw_v2(
150 &self,
151 block_index: u64,
152 broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
153 decoders: &ModuleDecoderRegistry,
154 ) -> anyhow::Result<SessionStatus> {
155 if block_index % 100 == 0 {
156 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
157 } else {
158 trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
159 }
160 let params = ApiRequestErased::new(block_index);
161 let mut last_error = None;
162 for peer_id in self.select_peers_for_status() {
164 match self
165 .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
166 SESSION_STATUS_V2_ENDPOINT.to_string(),
167 params.clone(),
168 peer_id,
169 )
170 .await
171 .map_err(anyhow::Error::from)
172 .and_then(|s| Ok(s.try_into_inner(decoders)?))
173 {
174 Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
175 if signed_session_outcome.verify(broadcast_public_keys, block_index) {
176 return Ok(SessionStatus::Complete(
178 signed_session_outcome.session_outcome,
179 ));
180 }
181 last_error = Some(format_err!("Invalid signature"));
182 }
183 Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
184 return self.get_session_status_raw(block_index, decoders).await;
186 }
187 Err(err) => {
188 last_error = Some(err);
189 }
190 }
191 assert!(last_error.is_some());
193 }
194 Err(last_error.expect("must have at least one peer"))
195 }
196
197 pub(crate) async fn get_session_status_raw(
198 &self,
199 block_index: u64,
200 decoders: &ModuleDecoderRegistry,
201 ) -> anyhow::Result<SessionStatus> {
202 if block_index % 100 == 0 {
203 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
204 } else {
205 trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
206 }
207 self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
208 SESSION_STATUS_ENDPOINT.to_string(),
209 ApiRequestErased::new(block_index),
210 )
211 .await?
212 .try_into_inner(&decoders.clone().with_fallback())
213 .map_err(|e| anyhow!(e))
214 }
215}
216
217#[apply(async_trait_maybe_send!)]
218impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
219where
220 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
221{
222 fn all_peers(&self) -> &BTreeSet<PeerId> {
223 self.inner.all_peers()
224 }
225
226 fn self_peer(&self) -> Option<PeerId> {
227 self.inner.self_peer()
228 }
229
230 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
231 self.inner.with_module(id)
232 }
233
234 async fn request_raw(
236 &self,
237 peer_id: PeerId,
238 method: &str,
239 params: &ApiRequestErased,
240 ) -> ServerResult<Value> {
241 self.inner.request_raw(peer_id, method, params).await
242 }
243
244 fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>> {
245 self.inner.connection_status_stream()
246 }
247
248 async fn wait_for_initialized_connections(&self) {
249 self.inner.wait_for_initialized_connections().await;
250 }
251}
252
253#[apply(async_trait_maybe_send!)]
254impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
255where
256 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
257{
258 async fn await_block(
259 &self,
260 session_idx: u64,
261 decoders: &ModuleDecoderRegistry,
262 ) -> anyhow::Result<SessionOutcome> {
263 let mut lru_lock = self.await_session_lru.lock().await;
264
265 let entry_arc = lru_lock
266 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
267 .clone();
268
269 drop(lru_lock);
271
272 entry_arc
273 .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
274 .await
275 .cloned()
276 }
277
278 async fn get_session_status(
279 &self,
280 session_idx: u64,
281 decoders: &ModuleDecoderRegistry,
282 core_api_version: ApiVersion,
283 broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
284 ) -> anyhow::Result<SessionStatus> {
285 let mut lru_lock = self.get_session_status_lru.lock().await;
286
287 let entry_arc = lru_lock
288 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
289 .clone();
290
291 drop(lru_lock);
293
294 enum NoCacheErr {
295 Initial,
296 Pending(Vec<AcceptedItem>),
297 Err(anyhow::Error),
298 }
299 match entry_arc
300 .get_or_try_init(|| async {
301 let session_status =
302 if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
303 self.get_session_status_raw(session_idx, decoders).await
304 } else if let Some(broadcast_public_keys) = broadcast_public_keys {
305 self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
306 .await
307 } else {
308 self.get_session_status_raw(session_idx, decoders).await
309 };
310 match session_status {
311 Err(e) => Err(NoCacheErr::Err(e)),
312 Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
313 Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
314 Ok(SessionStatus::Complete(s)) => Ok(s),
316 }
317 })
318 .await
319 .cloned()
320 {
321 Ok(s) => Ok(SessionStatus::Complete(s)),
322 Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
323 Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
324 Err(NoCacheErr::Err(e)) => Err(e),
325 }
326 }
327
328 async fn submit_transaction(
329 &self,
330 tx: Transaction,
331 ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
332 self.request_current_consensus_retry(
333 SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
334 ApiRequestErased::new(SerdeTransaction::from(&tx)),
335 )
336 .await
337 }
338
339 async fn session_count(&self) -> FederationResult<u64> {
340 self.request_current_consensus(
341 SESSION_COUNT_ENDPOINT.to_owned(),
342 ApiRequestErased::default(),
343 )
344 .await
345 }
346
347 async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
348 self.request_current_consensus_retry(
349 AWAIT_TRANSACTION_ENDPOINT.to_owned(),
350 ApiRequestErased::new(txid),
351 )
352 .await
353 }
354
355 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
356 self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
357 .await
358 }
359
360 async fn download_backup(
361 &self,
362 id: &secp256k1::PublicKey,
363 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
364 self.request_with_strategy(
365 FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
366 RECOVER_ENDPOINT.to_owned(),
367 ApiRequestErased::new(id),
368 )
369 .await
370 }
371
372 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
373 self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
374 .await
375 }
376
377 async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus> {
378 self.request_admin(SETUP_STATUS_ENDPOINT, ApiRequestErased::default(), auth)
379 .await
380 }
381
382 async fn set_local_params(
383 &self,
384 name: String,
385 federation_name: Option<String>,
386 disable_base_fees: Option<bool>,
387 enabled_modules: Option<BTreeSet<ModuleKind>>,
388 auth: ApiAuth,
389 ) -> FederationResult<String> {
390 self.request_admin(
391 SET_LOCAL_PARAMS_ENDPOINT,
392 ApiRequestErased::new(SetLocalParamsRequest {
393 name,
394 federation_name,
395 disable_base_fees,
396 enabled_modules,
397 }),
398 auth,
399 )
400 .await
401 }
402
403 async fn add_peer_connection_info(
404 &self,
405 info: String,
406 auth: ApiAuth,
407 ) -> FederationResult<String> {
408 self.request_admin(
409 ADD_PEER_SETUP_CODE_ENDPOINT,
410 ApiRequestErased::new(info),
411 auth,
412 )
413 .await
414 }
415
416 async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()> {
417 self.request_admin(
418 RESET_PEER_SETUP_CODES_ENDPOINT,
419 ApiRequestErased::default(),
420 auth,
421 )
422 .await
423 }
424
425 async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>> {
426 self.request_admin(GET_SETUP_CODE_ENDPOINT, ApiRequestErased::default(), auth)
427 .await
428 }
429
430 async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
431 self.request_admin(START_DKG_ENDPOINT, ApiRequestErased::default(), auth)
432 .await
433 }
434
435 async fn status(&self) -> FederationResult<StatusResponse> {
436 self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
437 .await
438 }
439
440 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
441 self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
442 .await
443 }
444
445 async fn guardian_config_backup(
446 &self,
447 auth: ApiAuth,
448 ) -> FederationResult<GuardianConfigBackup> {
449 self.request_admin(
450 GUARDIAN_CONFIG_BACKUP_ENDPOINT,
451 ApiRequestErased::default(),
452 auth,
453 )
454 .await
455 }
456
457 async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
458 self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
459 .await
460 }
461
462 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
463 self.request_admin(
464 RESTART_FEDERATION_SETUP_ENDPOINT,
465 ApiRequestErased::default(),
466 auth,
467 )
468 .await
469 }
470
471 async fn submit_api_announcement(
472 &self,
473 announcement_peer_id: PeerId,
474 announcement: SignedApiAnnouncement,
475 ) -> FederationResult<()> {
476 let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
477 let announcement_inner = announcement.clone();
478 async move {
479 (
480 peer_id,
481 self.request_single_peer::<()>(
482 SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
483 ApiRequestErased::new(SignedApiAnnouncementSubmission {
484 signed_api_announcement: announcement_inner,
485 peer_id: announcement_peer_id,
486 }),
487 peer_id,
488 )
489 .await,
490 )
491 }
492 }))
493 .await
494 .into_iter()
495 .filter_map(|(peer_id, result)| match result {
496 Ok(()) => None,
497 Err(e) => Some((peer_id, e)),
498 })
499 .collect::<BTreeMap<_, _>>();
500
501 if peer_errors.is_empty() {
502 Ok(())
503 } else {
504 Err(FederationError {
505 method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
506 params: serde_json::to_value(announcement).expect("can be serialized"),
507 general: None,
508 peer_errors,
509 })
510 }
511 }
512
513 async fn api_announcements(
514 &self,
515 guardian: PeerId,
516 ) -> ServerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
517 self.request_single_peer(
518 API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
519 ApiRequestErased::default(),
520 guardian,
521 )
522 .await
523 }
524
525 async fn sign_api_announcement(
526 &self,
527 api_url: SafeUrl,
528 auth: ApiAuth,
529 ) -> FederationResult<SignedApiAnnouncement> {
530 self.request_admin(
531 SIGN_API_ANNOUNCEMENT_ENDPOINT,
532 ApiRequestErased::new(api_url),
533 auth,
534 )
535 .await
536 }
537
538 async fn submit_guardian_metadata(
539 &self,
540 announcement_peer_id: PeerId,
541 metadata: fedimint_core::net::guardian_metadata::SignedGuardianMetadata,
542 ) -> FederationResult<()> {
543 use fedimint_core::net::guardian_metadata::SignedGuardianMetadataSubmission;
544 let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
545 let metadata_inner = metadata.clone();
546 async move {
547 (
548 peer_id,
549 self.request_single_peer::<()>(
550 SUBMIT_GUARDIAN_METADATA_ENDPOINT.into(),
551 ApiRequestErased::new(SignedGuardianMetadataSubmission {
552 signed_guardian_metadata: metadata_inner,
553 peer_id: announcement_peer_id,
554 }),
555 peer_id,
556 )
557 .await,
558 )
559 }
560 }))
561 .await
562 .into_iter()
563 .filter_map(|(peer_id, result)| match result {
564 Ok(()) => None,
565 Err(e) => Some((peer_id, e)),
566 })
567 .collect::<BTreeMap<_, _>>();
568
569 if peer_errors.is_empty() {
570 Ok(())
571 } else {
572 Err(FederationError {
573 method: SUBMIT_GUARDIAN_METADATA_ENDPOINT.to_string(),
574 params: serde_json::to_value(&metadata).expect("can be serialized"),
575 general: None,
576 peer_errors,
577 })
578 }
579 }
580
581 async fn guardian_metadata(
582 &self,
583 guardian: PeerId,
584 ) -> ServerResult<BTreeMap<PeerId, fedimint_core::net::guardian_metadata::SignedGuardianMetadata>>
585 {
586 self.request_single_peer(
587 GUARDIAN_METADATA_ENDPOINT.to_owned(),
588 ApiRequestErased::default(),
589 guardian,
590 )
591 .await
592 }
593
594 async fn sign_guardian_metadata(
595 &self,
596 metadata: fedimint_core::net::guardian_metadata::GuardianMetadata,
597 auth: ApiAuth,
598 ) -> FederationResult<fedimint_core::net::guardian_metadata::SignedGuardianMetadata> {
599 self.request_admin(
600 SIGN_GUARDIAN_METADATA_ENDPOINT,
601 ApiRequestErased::new(metadata),
602 auth,
603 )
604 .await
605 }
606
607 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
608 self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
609 .await
610 }
611
612 async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
613 self.request_admin(
614 BACKUP_STATISTICS_ENDPOINT,
615 ApiRequestErased::default(),
616 auth,
617 )
618 .await
619 }
620
621 async fn fedimintd_version(&self, peer_id: PeerId) -> ServerResult<String> {
622 self.request_single_peer(
623 FEDIMINTD_VERSION_ENDPOINT.to_owned(),
624 ApiRequestErased::default(),
625 peer_id,
626 )
627 .await
628 }
629
630 async fn get_invite_code(&self, guardian: PeerId) -> ServerResult<InviteCode> {
631 self.request_single_peer(
632 INVITE_CODE_ENDPOINT.to_owned(),
633 ApiRequestErased::default(),
634 guardian,
635 )
636 .await
637 }
638
639 async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()> {
640 self.request_admin(
641 CHANGE_PASSWORD_ENDPOINT,
642 ApiRequestErased::new(new_password),
643 auth,
644 )
645 .await
646 }
647
648 async fn chain_id(&self) -> FederationResult<ChainId> {
649 self.request_current_consensus(CHAIN_ID_ENDPOINT.to_owned(), ApiRequestErased::default())
650 .await
651 }
652}