1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::hashes::sha256;
8use bitcoin::secp256k1;
9use fedimint_core::admin_client::{
10 GuardianConfigBackup, PeerServerParamsLegacy, SetLocalParamsRequest, SetupStatus,
11};
12use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
13use fedimint_core::core::ModuleInstanceId;
14use fedimint_core::core::backup::SignedBackupRequest;
15use fedimint_core::endpoint_constants::{
16 ADD_CONFIG_GEN_PEER_ENDPOINT, ADD_PEER_SETUP_CODE_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT,
17 AUDIT_ENDPOINT, AUTH_ENDPOINT, AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT,
18 BACKUP_ENDPOINT, BACKUP_STATISTICS_ENDPOINT, CONFIG_GEN_PEERS_ENDPOINT,
19 FEDIMINTD_VERSION_ENDPOINT, GET_SETUP_CODE_ENDPOINT, GUARDIAN_CONFIG_BACKUP_ENDPOINT,
20 INVITE_CODE_ENDPOINT, RECOVER_ENDPOINT, RESET_PEER_SETUP_CODES_ENDPOINT,
21 RESTART_FEDERATION_SETUP_ENDPOINT, SESSION_COUNT_ENDPOINT, SESSION_STATUS_ENDPOINT,
22 SESSION_STATUS_V2_ENDPOINT, SET_LOCAL_PARAMS_ENDPOINT, SET_PASSWORD_ENDPOINT,
23 SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT, SIGN_API_ANNOUNCEMENT_ENDPOINT,
24 START_CONSENSUS_ENDPOINT, START_DKG_ENDPOINT, STATUS_ENDPOINT,
25 SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT, VERIFIED_CONFIGS_ENDPOINT,
26 VERIFY_CONFIG_HASH_ENDPOINT,
27};
28use fedimint_core::invite_code::InviteCode;
29use fedimint_core::module::audit::AuditSummary;
30use fedimint_core::module::registry::ModuleDecoderRegistry;
31use fedimint_core::module::{
32 ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
33};
34use fedimint_core::net::api_announcement::{
35 SignedApiAnnouncement, SignedApiAnnouncementSubmission,
36};
37use fedimint_core::session_outcome::{
38 AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
39};
40use fedimint_core::task::{MaybeSend, MaybeSync};
41use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
42use fedimint_core::util::SafeUrl;
43use fedimint_core::{NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send};
44use fedimint_logging::LOG_CLIENT_NET_API;
45use futures::future::join_all;
46use itertools::Itertools;
47use rand::seq::SliceRandom;
48use serde_json::Value;
49use tokio::sync::OnceCell;
50use tracing::debug;
51
52use super::super::{DynModuleApi, IGlobalFederationApi, IRawFederationApi, StatusResponse};
53use crate::api::{
54 FederationApiExt, FederationError, FederationResult, PeerResult,
55 VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
56};
57use crate::query::FilterMapThreshold;
58
59pub trait GlobalFederationApiWithCacheExt
62where
63 Self: Sized,
64{
65 fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
66}
67
68impl<T> GlobalFederationApiWithCacheExt for T
69where
70 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
71{
72 fn with_cache(self) -> GlobalFederationApiWithCache<T> {
73 GlobalFederationApiWithCache::new(self)
74 }
75}
76
77#[derive(Debug)]
82pub struct GlobalFederationApiWithCache<T> {
83 pub(crate) inner: T,
84 pub(crate) await_session_lru:
94 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
95
96 pub(crate) get_session_status_lru:
104 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
105}
106
107impl<T> GlobalFederationApiWithCache<T> {
108 pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
109 Self {
110 inner,
111 await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
112 NonZeroUsize::new(512).expect("is non-zero"),
113 ))),
114 get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
115 NonZeroUsize::new(512).expect("is non-zero"),
116 ))),
117 }
118 }
119}
120
121impl<T> GlobalFederationApiWithCache<T>
122where
123 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
124{
125 pub(crate) async fn await_block_raw(
126 &self,
127 block_index: u64,
128 decoders: &ModuleDecoderRegistry,
129 ) -> anyhow::Result<SessionOutcome> {
130 debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
131 self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
132 AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
133 ApiRequestErased::new(block_index),
134 )
135 .await?
136 .try_into_inner(decoders)
137 .map_err(|e| anyhow!(e.to_string()))
138 }
139
140 pub(crate) fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
141 let mut peers = self.all_peers().iter().copied().collect_vec();
142 peers.shuffle(&mut rand::thread_rng());
143 peers.into_iter()
144 }
145
146 pub(crate) async fn get_session_status_raw_v2(
147 &self,
148 block_index: u64,
149 broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
150 decoders: &ModuleDecoderRegistry,
151 ) -> anyhow::Result<SessionStatus> {
152 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
153 let params = ApiRequestErased::new(block_index);
154 let mut last_error = None;
155 for peer_id in self.select_peers_for_status() {
157 match self
158 .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
159 SESSION_STATUS_V2_ENDPOINT.to_string(),
160 params.clone(),
161 peer_id,
162 )
163 .await
164 .map_err(anyhow::Error::from)
165 .and_then(|s| Ok(s.try_into_inner(decoders)?))
166 {
167 Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
168 if signed_session_outcome.verify(broadcast_public_keys, block_index) {
169 return Ok(SessionStatus::Complete(
171 signed_session_outcome.session_outcome,
172 ));
173 }
174 last_error = Some(format_err!("Invalid signature"));
175 }
176 Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
177 return self.get_session_status_raw(block_index, decoders).await;
179 }
180 Err(err) => {
181 last_error = Some(err);
182 }
183 }
184 assert!(last_error.is_some());
186 }
187 Err(last_error.expect("must have at least one peer"))
188 }
189
190 pub(crate) async fn get_session_status_raw(
191 &self,
192 block_index: u64,
193 decoders: &ModuleDecoderRegistry,
194 ) -> anyhow::Result<SessionStatus> {
195 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
196 self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
197 SESSION_STATUS_ENDPOINT.to_string(),
198 ApiRequestErased::new(block_index),
199 )
200 .await?
201 .try_into_inner(&decoders.clone().with_fallback())
202 .map_err(|e| anyhow!(e))
203 }
204}
205
206#[apply(async_trait_maybe_send!)]
207impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
208where
209 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
210{
211 fn all_peers(&self) -> &BTreeSet<PeerId> {
212 self.inner.all_peers()
213 }
214
215 fn self_peer(&self) -> Option<PeerId> {
216 self.inner.self_peer()
217 }
218
219 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
220 self.inner.with_module(id)
221 }
222
223 async fn request_raw(
225 &self,
226 peer_id: PeerId,
227 method: &str,
228 params: &ApiRequestErased,
229 ) -> PeerResult<Value> {
230 self.inner.request_raw(peer_id, method, params).await
231 }
232}
233
234#[apply(async_trait_maybe_send!)]
235impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
236where
237 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
238{
239 async fn await_block(
240 &self,
241 session_idx: u64,
242 decoders: &ModuleDecoderRegistry,
243 ) -> anyhow::Result<SessionOutcome> {
244 let mut lru_lock = self.await_session_lru.lock().await;
245
246 let entry_arc = lru_lock
247 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
248 .clone();
249
250 drop(lru_lock);
252
253 entry_arc
254 .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
255 .await
256 .cloned()
257 }
258
259 async fn get_session_status(
260 &self,
261 session_idx: u64,
262 decoders: &ModuleDecoderRegistry,
263 core_api_version: ApiVersion,
264 broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
265 ) -> anyhow::Result<SessionStatus> {
266 let mut lru_lock = self.get_session_status_lru.lock().await;
267
268 let entry_arc = lru_lock
269 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
270 .clone();
271
272 drop(lru_lock);
274
275 enum NoCacheErr {
276 Initial,
277 Pending(Vec<AcceptedItem>),
278 Err(anyhow::Error),
279 }
280 match entry_arc
281 .get_or_try_init(|| async {
282 let session_status =
283 if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
284 self.get_session_status_raw(session_idx, decoders).await
285 } else if let Some(broadcast_public_keys) = broadcast_public_keys {
286 self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
287 .await
288 } else {
289 self.get_session_status_raw(session_idx, decoders).await
290 };
291 match session_status {
292 Err(e) => Err(NoCacheErr::Err(e)),
293 Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
294 Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
295 Ok(SessionStatus::Complete(s)) => Ok(s),
297 }
298 })
299 .await
300 .cloned()
301 {
302 Ok(s) => Ok(SessionStatus::Complete(s)),
303 Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
304 Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
305 Err(NoCacheErr::Err(e)) => Err(e),
306 }
307 }
308
309 async fn submit_transaction(
310 &self,
311 tx: Transaction,
312 ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
313 self.request_current_consensus_retry(
314 SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
315 ApiRequestErased::new(SerdeTransaction::from(&tx)),
316 )
317 .await
318 }
319
320 async fn session_count(&self) -> FederationResult<u64> {
321 self.request_current_consensus(
322 SESSION_COUNT_ENDPOINT.to_owned(),
323 ApiRequestErased::default(),
324 )
325 .await
326 }
327
328 async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
329 self.request_current_consensus_retry(
330 AWAIT_TRANSACTION_ENDPOINT.to_owned(),
331 ApiRequestErased::new(txid),
332 )
333 .await
334 }
335
336 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
337 self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
338 .await
339 }
340
341 async fn download_backup(
342 &self,
343 id: &secp256k1::PublicKey,
344 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
345 self.request_with_strategy(
346 FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
347 RECOVER_ENDPOINT.to_owned(),
348 ApiRequestErased::new(id),
349 )
350 .await
351 }
352
353 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
354 self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
355 .await
356 }
357
358 async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus> {
359 self.request_admin(SETUP_STATUS_ENDPOINT, ApiRequestErased::default(), auth)
360 .await
361 }
362
363 async fn set_local_params(
364 &self,
365 name: String,
366 federation_name: Option<String>,
367 disable_base_fees: Option<bool>,
368 auth: ApiAuth,
369 ) -> FederationResult<String> {
370 self.request_admin(
371 SET_LOCAL_PARAMS_ENDPOINT,
372 ApiRequestErased::new(SetLocalParamsRequest {
373 name,
374 federation_name,
375 disable_base_fees,
376 }),
377 auth,
378 )
379 .await
380 }
381
382 async fn add_peer_connection_info(
383 &self,
384 info: String,
385 auth: ApiAuth,
386 ) -> FederationResult<String> {
387 self.request_admin(
388 ADD_PEER_SETUP_CODE_ENDPOINT,
389 ApiRequestErased::new(info),
390 auth,
391 )
392 .await
393 }
394
395 async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()> {
396 self.request_admin(
397 RESET_PEER_SETUP_CODES_ENDPOINT,
398 ApiRequestErased::default(),
399 auth,
400 )
401 .await
402 }
403
404 async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>> {
405 self.request_admin(GET_SETUP_CODE_ENDPOINT, ApiRequestErased::default(), auth)
406 .await
407 }
408
409 async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()> {
410 self.request_admin_no_auth(ADD_CONFIG_GEN_PEER_ENDPOINT, ApiRequestErased::new(peer))
411 .await
412 }
413
414 async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>> {
415 self.request_admin_no_auth(CONFIG_GEN_PEERS_ENDPOINT, ApiRequestErased::default())
416 .await
417 }
418
419 async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
420 self.request_admin(START_DKG_ENDPOINT, ApiRequestErased::default(), auth)
421 .await
422 }
423
424 async fn get_verify_config_hash(
425 &self,
426 auth: ApiAuth,
427 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
428 self.request_admin(
429 VERIFY_CONFIG_HASH_ENDPOINT,
430 ApiRequestErased::default(),
431 auth,
432 )
433 .await
434 }
435
436 async fn verified_configs(
437 &self,
438 auth: ApiAuth,
439 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
440 self.request_admin(VERIFIED_CONFIGS_ENDPOINT, ApiRequestErased::default(), auth)
441 .await
442 }
443
444 async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()> {
445 self.request_admin(START_CONSENSUS_ENDPOINT, ApiRequestErased::default(), auth)
446 .await
447 }
448
449 async fn status(&self) -> FederationResult<StatusResponse> {
450 self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
451 .await
452 }
453
454 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
455 self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
456 .await
457 }
458
459 async fn guardian_config_backup(
460 &self,
461 auth: ApiAuth,
462 ) -> FederationResult<GuardianConfigBackup> {
463 self.request_admin(
464 GUARDIAN_CONFIG_BACKUP_ENDPOINT,
465 ApiRequestErased::default(),
466 auth,
467 )
468 .await
469 }
470
471 async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
472 self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
473 .await
474 }
475
476 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
477 self.request_admin(
478 RESTART_FEDERATION_SETUP_ENDPOINT,
479 ApiRequestErased::default(),
480 auth,
481 )
482 .await
483 }
484
485 async fn submit_api_announcement(
486 &self,
487 announcement_peer_id: PeerId,
488 announcement: SignedApiAnnouncement,
489 ) -> FederationResult<()> {
490 let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
491 let announcement_inner = announcement.clone();
492 async move {
493 (
494 peer_id,
495 self.request_single_peer::<()>(
496 SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
497 ApiRequestErased::new(SignedApiAnnouncementSubmission {
498 signed_api_announcement: announcement_inner,
499 peer_id: announcement_peer_id,
500 }),
501 peer_id,
502 )
503 .await,
504 )
505 }
506 }))
507 .await
508 .into_iter()
509 .filter_map(|(peer_id, result)| match result {
510 Ok(()) => None,
511 Err(e) => Some((peer_id, e)),
512 })
513 .collect::<BTreeMap<_, _>>();
514
515 if peer_errors.is_empty() {
516 Ok(())
517 } else {
518 Err(FederationError {
519 method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
520 params: serde_json::to_value(announcement).expect("can be serialized"),
521 general: None,
522 peer_errors,
523 })
524 }
525 }
526
527 async fn api_announcements(
528 &self,
529 guardian: PeerId,
530 ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
531 self.request_single_peer(
532 API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
533 ApiRequestErased::default(),
534 guardian,
535 )
536 .await
537 }
538
539 async fn sign_api_announcement(
540 &self,
541 api_url: SafeUrl,
542 auth: ApiAuth,
543 ) -> FederationResult<SignedApiAnnouncement> {
544 self.request_admin(
545 SIGN_API_ANNOUNCEMENT_ENDPOINT,
546 ApiRequestErased::new(api_url),
547 auth,
548 )
549 .await
550 }
551
552 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
553 self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
554 .await
555 }
556
557 async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
558 self.request_admin(
559 BACKUP_STATISTICS_ENDPOINT,
560 ApiRequestErased::default(),
561 auth,
562 )
563 .await
564 }
565
566 async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String> {
567 self.request_single_peer(
568 FEDIMINTD_VERSION_ENDPOINT.to_owned(),
569 ApiRequestErased::default(),
570 peer_id,
571 )
572 .await
573 }
574
575 async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode> {
576 self.request_single_peer(
577 INVITE_CODE_ENDPOINT.to_owned(),
578 ApiRequestErased::default(),
579 guardian,
580 )
581 .await
582 }
583}