1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::secp256k1;
8use fedimint_connectors::{DynGuaridianConnection, PeerStatus, ServerResult};
9use fedimint_core::admin_client::{GuardianConfigBackup, SetLocalParamsRequest, SetupStatus};
10use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
11use fedimint_core::core::backup::SignedBackupRequest;
12use fedimint_core::core::{ModuleInstanceId, ModuleKind};
13use fedimint_core::endpoint_constants::{
14 ADD_PEER_SETUP_CODE_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT, AUDIT_ENDPOINT, AUTH_ENDPOINT,
15 AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT, BACKUP_ENDPOINT,
16 BACKUP_STATISTICS_ENDPOINT, CHAIN_ID_ENDPOINT, CHANGE_PASSWORD_ENDPOINT,
17 FEDIMINTD_VERSION_ENDPOINT, GET_SETUP_CODE_ENDPOINT, GUARDIAN_CONFIG_BACKUP_ENDPOINT,
18 GUARDIAN_METADATA_ENDPOINT, INVITE_CODE_ENDPOINT, RECOVER_ENDPOINT,
19 RESET_PEER_SETUP_CODES_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT, SESSION_COUNT_ENDPOINT,
20 SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT, SET_LOCAL_PARAMS_ENDPOINT,
21 SET_PASSWORD_ENDPOINT, SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT,
22 SIGN_API_ANNOUNCEMENT_ENDPOINT, SIGN_GUARDIAN_METADATA_ENDPOINT, START_DKG_ENDPOINT,
23 STATUS_ENDPOINT, SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_GUARDIAN_METADATA_ENDPOINT,
24 SUBMIT_TRANSACTION_ENDPOINT,
25};
26use fedimint_core::invite_code::InviteCode;
27use fedimint_core::module::audit::AuditSummary;
28use fedimint_core::module::registry::ModuleDecoderRegistry;
29use fedimint_core::module::{
30 ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
31};
32use fedimint_core::net::api_announcement::{
33 SignedApiAnnouncement, SignedApiAnnouncementSubmission,
34};
35use fedimint_core::session_outcome::{
36 AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
37};
38use fedimint_core::task::{MaybeSend, MaybeSync};
39use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
40use fedimint_core::util::SafeUrl;
41use fedimint_core::{ChainId, NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send};
42use fedimint_logging::LOG_CLIENT_NET_API;
43use futures::future::join_all;
44use futures::stream::BoxStream;
45use itertools::Itertools;
46use rand::seq::SliceRandom;
47use serde_json::Value;
48use tokio::sync::OnceCell;
49use tracing::{debug, trace};
50
51use super::super::{DynModuleApi, IGlobalFederationApi, IRawFederationApi, StatusResponse};
52use crate::api::{
53 FederationApiExt, FederationError, FederationResult,
54 VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
55};
56use crate::query::FilterMapThreshold;
57
58pub trait GlobalFederationApiWithCacheExt
61where
62 Self: Sized,
63{
64 fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
65}
66
67impl<T> GlobalFederationApiWithCacheExt for T
68where
69 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
70{
71 fn with_cache(self) -> GlobalFederationApiWithCache<T> {
72 GlobalFederationApiWithCache::new(self)
73 }
74}
75
76#[derive(Debug)]
81pub struct GlobalFederationApiWithCache<T> {
82 pub(crate) inner: T,
83 pub(crate) await_session_lru:
93 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
94
95 pub(crate) get_session_status_lru:
103 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
104}
105
106impl<T> GlobalFederationApiWithCache<T> {
107 pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
108 Self {
109 inner,
110 await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
111 NonZeroUsize::new(512).expect("is non-zero"),
112 ))),
113 get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
114 NonZeroUsize::new(512).expect("is non-zero"),
115 ))),
116 }
117 }
118}
119
120impl<T> GlobalFederationApiWithCache<T>
121where
122 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
123{
124 pub(crate) async fn await_block_raw(
125 &self,
126 block_index: u64,
127 decoders: &ModuleDecoderRegistry,
128 ) -> anyhow::Result<SessionOutcome> {
129 if block_index.is_multiple_of(100) {
130 debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
131 } else {
132 trace!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
133 }
134 self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
135 AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
136 ApiRequestErased::new(block_index),
137 )
138 .await?
139 .try_into_inner(decoders)
140 .map_err(|e| anyhow!(e.to_string()))
141 }
142
143 pub(crate) fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
144 let mut peers = self.all_peers().iter().copied().collect_vec();
145 peers.shuffle(&mut rand::thread_rng());
146 peers.into_iter()
147 }
148
149 pub(crate) async fn get_session_status_raw_v2(
150 &self,
151 block_index: u64,
152 broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
153 decoders: &ModuleDecoderRegistry,
154 ) -> anyhow::Result<SessionStatus> {
155 if block_index.is_multiple_of(100) {
156 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
157 } else {
158 trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
159 }
160 let params = ApiRequestErased::new(block_index);
161 let mut last_error = None;
162 for peer_id in self.select_peers_for_status() {
164 match self
165 .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
166 SESSION_STATUS_V2_ENDPOINT.to_string(),
167 params.clone(),
168 peer_id,
169 )
170 .await
171 .map_err(anyhow::Error::from)
172 .and_then(|s| Ok(s.try_into_inner(decoders)?))
173 {
174 Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
175 if signed_session_outcome.verify(broadcast_public_keys, block_index) {
176 return Ok(SessionStatus::Complete(
178 signed_session_outcome.session_outcome,
179 ));
180 }
181 last_error = Some(format_err!("Invalid signature"));
182 }
183 Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
184 return self.get_session_status_raw(block_index, decoders).await;
186 }
187 Err(err) => {
188 last_error = Some(err);
189 }
190 }
191 assert!(last_error.is_some());
193 }
194 Err(last_error.expect("must have at least one peer"))
195 }
196
197 pub(crate) async fn get_session_status_raw(
198 &self,
199 block_index: u64,
200 decoders: &ModuleDecoderRegistry,
201 ) -> anyhow::Result<SessionStatus> {
202 if block_index.is_multiple_of(100) {
203 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
204 } else {
205 trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
206 }
207 self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
208 SESSION_STATUS_ENDPOINT.to_string(),
209 ApiRequestErased::new(block_index),
210 )
211 .await?
212 .try_into_inner(&decoders.clone().with_fallback())
213 .map_err(|e| anyhow!(e))
214 }
215}
216
217#[apply(async_trait_maybe_send!)]
218impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
219where
220 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
221{
222 fn all_peers(&self) -> &BTreeSet<PeerId> {
223 self.inner.all_peers()
224 }
225
226 fn self_peer(&self) -> Option<PeerId> {
227 self.inner.self_peer()
228 }
229
230 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
231 self.inner.with_module(id)
232 }
233
234 async fn request_raw(
236 &self,
237 peer_id: PeerId,
238 method: &str,
239 params: &ApiRequestErased,
240 ) -> ServerResult<Value> {
241 self.inner.request_raw(peer_id, method, params).await
242 }
243
244 fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, PeerStatus>> {
245 self.inner.connection_status_stream()
246 }
247
248 async fn wait_for_initialized_connections(&self) {
249 self.inner.wait_for_initialized_connections().await;
250 }
251
252 async fn get_peer_connection(&self, peer_id: PeerId) -> ServerResult<DynGuaridianConnection> {
253 self.inner.get_peer_connection(peer_id).await
254 }
255}
256
257#[apply(async_trait_maybe_send!)]
258impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
259where
260 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
261{
262 async fn await_block(
263 &self,
264 session_idx: u64,
265 decoders: &ModuleDecoderRegistry,
266 ) -> anyhow::Result<SessionOutcome> {
267 let mut lru_lock = self.await_session_lru.lock().await;
268
269 let entry_arc = lru_lock
270 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
271 .clone();
272
273 drop(lru_lock);
275
276 entry_arc
277 .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
278 .await
279 .cloned()
280 }
281
282 async fn get_session_status(
283 &self,
284 session_idx: u64,
285 decoders: &ModuleDecoderRegistry,
286 core_api_version: ApiVersion,
287 broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
288 ) -> anyhow::Result<SessionStatus> {
289 enum NoCacheErr {
290 Initial,
291 Pending(Vec<AcceptedItem>),
292 Err(anyhow::Error),
293 }
294
295 let mut lru_lock = self.get_session_status_lru.lock().await;
296
297 let entry_arc = lru_lock
298 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
299 .clone();
300
301 drop(lru_lock);
303
304 match entry_arc
305 .get_or_try_init(|| async {
306 let session_status =
307 if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
308 self.get_session_status_raw(session_idx, decoders).await
309 } else if let Some(broadcast_public_keys) = broadcast_public_keys {
310 self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
311 .await
312 } else {
313 self.get_session_status_raw(session_idx, decoders).await
314 };
315 match session_status {
316 Err(e) => Err(NoCacheErr::Err(e)),
317 Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
318 Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
319 Ok(SessionStatus::Complete(s)) => Ok(s),
321 }
322 })
323 .await
324 .cloned()
325 {
326 Ok(s) => Ok(SessionStatus::Complete(s)),
327 Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
328 Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
329 Err(NoCacheErr::Err(e)) => Err(e),
330 }
331 }
332
333 async fn submit_transaction(
334 &self,
335 tx: Transaction,
336 ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
337 self.request_current_consensus_retry(
338 SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
339 ApiRequestErased::new(SerdeTransaction::from(&tx)),
340 )
341 .await
342 }
343
344 async fn session_count(&self) -> FederationResult<u64> {
345 self.request_current_consensus(
346 SESSION_COUNT_ENDPOINT.to_owned(),
347 ApiRequestErased::default(),
348 )
349 .await
350 }
351
352 async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
353 self.request_current_consensus_retry(
354 AWAIT_TRANSACTION_ENDPOINT.to_owned(),
355 ApiRequestErased::new(txid),
356 )
357 .await
358 }
359
360 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
361 self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
362 .await
363 }
364
365 async fn download_backup(
366 &self,
367 id: &secp256k1::PublicKey,
368 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
369 self.request_with_strategy(
370 FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
371 RECOVER_ENDPOINT.to_owned(),
372 ApiRequestErased::new(id),
373 )
374 .await
375 }
376
377 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
378 self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
379 .await
380 }
381
382 async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus> {
383 self.request_admin(SETUP_STATUS_ENDPOINT, ApiRequestErased::default(), auth)
384 .await
385 }
386
387 async fn set_local_params(
388 &self,
389 name: String,
390 federation_name: Option<String>,
391 disable_base_fees: Option<bool>,
392 enabled_modules: Option<BTreeSet<ModuleKind>>,
393 federation_size: Option<u32>,
394 auth: ApiAuth,
395 ) -> FederationResult<String> {
396 self.request_admin(
397 SET_LOCAL_PARAMS_ENDPOINT,
398 ApiRequestErased::new(SetLocalParamsRequest {
399 name,
400 federation_name,
401 disable_base_fees,
402 enabled_modules,
403 federation_size,
404 }),
405 auth,
406 )
407 .await
408 }
409
410 async fn add_peer_connection_info(
411 &self,
412 info: String,
413 auth: ApiAuth,
414 ) -> FederationResult<String> {
415 self.request_admin(
416 ADD_PEER_SETUP_CODE_ENDPOINT,
417 ApiRequestErased::new(info),
418 auth,
419 )
420 .await
421 }
422
423 async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()> {
424 self.request_admin(
425 RESET_PEER_SETUP_CODES_ENDPOINT,
426 ApiRequestErased::default(),
427 auth,
428 )
429 .await
430 }
431
432 async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>> {
433 self.request_admin(GET_SETUP_CODE_ENDPOINT, ApiRequestErased::default(), auth)
434 .await
435 }
436
437 async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
438 self.request_admin(START_DKG_ENDPOINT, ApiRequestErased::default(), auth)
439 .await
440 }
441
442 async fn status(&self) -> FederationResult<StatusResponse> {
443 self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
444 .await
445 }
446
447 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
448 self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
449 .await
450 }
451
452 async fn guardian_config_backup(
453 &self,
454 auth: ApiAuth,
455 ) -> FederationResult<GuardianConfigBackup> {
456 self.request_admin(
457 GUARDIAN_CONFIG_BACKUP_ENDPOINT,
458 ApiRequestErased::default(),
459 auth,
460 )
461 .await
462 }
463
464 async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
465 self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
466 .await
467 }
468
469 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
470 self.request_admin(
471 RESTART_FEDERATION_SETUP_ENDPOINT,
472 ApiRequestErased::default(),
473 auth,
474 )
475 .await
476 }
477
478 async fn submit_api_announcement(
479 &self,
480 announcement_peer_id: PeerId,
481 announcement: SignedApiAnnouncement,
482 ) -> FederationResult<()> {
483 let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
484 let announcement_inner = announcement.clone();
485 async move {
486 (
487 peer_id,
488 self.request_single_peer::<()>(
489 SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
490 ApiRequestErased::new(SignedApiAnnouncementSubmission {
491 signed_api_announcement: announcement_inner,
492 peer_id: announcement_peer_id,
493 }),
494 peer_id,
495 )
496 .await,
497 )
498 }
499 }))
500 .await
501 .into_iter()
502 .filter_map(|(peer_id, result)| match result {
503 Ok(()) => None,
504 Err(e) => Some((peer_id, e)),
505 })
506 .collect::<BTreeMap<_, _>>();
507
508 if peer_errors.is_empty() {
509 Ok(())
510 } else {
511 Err(FederationError {
512 method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
513 params: serde_json::to_value(announcement).expect("can be serialized"),
514 general: None,
515 peer_errors,
516 })
517 }
518 }
519
520 async fn api_announcements(
521 &self,
522 guardian: PeerId,
523 ) -> ServerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
524 self.request_single_peer(
525 API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
526 ApiRequestErased::default(),
527 guardian,
528 )
529 .await
530 }
531
532 async fn sign_api_announcement(
533 &self,
534 api_url: SafeUrl,
535 auth: ApiAuth,
536 ) -> FederationResult<SignedApiAnnouncement> {
537 self.request_admin(
538 SIGN_API_ANNOUNCEMENT_ENDPOINT,
539 ApiRequestErased::new(api_url),
540 auth,
541 )
542 .await
543 }
544
545 async fn submit_guardian_metadata(
546 &self,
547 announcement_peer_id: PeerId,
548 metadata: fedimint_core::net::guardian_metadata::SignedGuardianMetadata,
549 ) -> FederationResult<()> {
550 use fedimint_core::net::guardian_metadata::SignedGuardianMetadataSubmission;
551 let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
552 let metadata_inner = metadata.clone();
553 async move {
554 (
555 peer_id,
556 self.request_single_peer::<()>(
557 SUBMIT_GUARDIAN_METADATA_ENDPOINT.into(),
558 ApiRequestErased::new(SignedGuardianMetadataSubmission {
559 signed_guardian_metadata: metadata_inner,
560 peer_id: announcement_peer_id,
561 }),
562 peer_id,
563 )
564 .await,
565 )
566 }
567 }))
568 .await
569 .into_iter()
570 .filter_map(|(peer_id, result)| match result {
571 Ok(()) => None,
572 Err(e) => Some((peer_id, e)),
573 })
574 .collect::<BTreeMap<_, _>>();
575
576 if peer_errors.is_empty() {
577 Ok(())
578 } else {
579 Err(FederationError {
580 method: SUBMIT_GUARDIAN_METADATA_ENDPOINT.to_string(),
581 params: serde_json::to_value(&metadata).expect("can be serialized"),
582 general: None,
583 peer_errors,
584 })
585 }
586 }
587
588 async fn guardian_metadata(
589 &self,
590 guardian: PeerId,
591 ) -> ServerResult<BTreeMap<PeerId, fedimint_core::net::guardian_metadata::SignedGuardianMetadata>>
592 {
593 self.request_single_peer(
594 GUARDIAN_METADATA_ENDPOINT.to_owned(),
595 ApiRequestErased::default(),
596 guardian,
597 )
598 .await
599 }
600
601 async fn sign_guardian_metadata(
602 &self,
603 metadata: fedimint_core::net::guardian_metadata::GuardianMetadata,
604 auth: ApiAuth,
605 ) -> FederationResult<fedimint_core::net::guardian_metadata::SignedGuardianMetadata> {
606 self.request_admin(
607 SIGN_GUARDIAN_METADATA_ENDPOINT,
608 ApiRequestErased::new(metadata),
609 auth,
610 )
611 .await
612 }
613
614 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
615 self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
616 .await
617 }
618
619 async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
620 self.request_admin(
621 BACKUP_STATISTICS_ENDPOINT,
622 ApiRequestErased::default(),
623 auth,
624 )
625 .await
626 }
627
628 async fn fedimintd_version(&self, peer_id: PeerId) -> ServerResult<String> {
629 self.request_single_peer(
630 FEDIMINTD_VERSION_ENDPOINT.to_owned(),
631 ApiRequestErased::default(),
632 peer_id,
633 )
634 .await
635 }
636
637 async fn get_invite_code(&self, guardian: PeerId) -> ServerResult<InviteCode> {
638 self.request_single_peer(
639 INVITE_CODE_ENDPOINT.to_owned(),
640 ApiRequestErased::default(),
641 guardian,
642 )
643 .await
644 }
645
646 async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()> {
647 self.request_admin(
648 CHANGE_PASSWORD_ENDPOINT,
649 ApiRequestErased::new(new_password),
650 auth,
651 )
652 .await
653 }
654
655 async fn chain_id(&self) -> FederationResult<ChainId> {
656 self.request_current_consensus(CHAIN_ID_ENDPOINT.to_owned(), ApiRequestErased::default())
657 .await
658 }
659}