1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::hashes::sha256;
8use bitcoin::secp256k1;
9use fedimint_connectors::ServerResult;
10use fedimint_core::admin_client::{
11 GuardianConfigBackup, PeerServerParamsLegacy, SetLocalParamsRequest, SetupStatus,
12};
13use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
14use fedimint_core::core::ModuleInstanceId;
15use fedimint_core::core::backup::SignedBackupRequest;
16use fedimint_core::endpoint_constants::{
17 ADD_CONFIG_GEN_PEER_ENDPOINT, ADD_PEER_SETUP_CODE_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT,
18 AUDIT_ENDPOINT, AUTH_ENDPOINT, AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT,
19 BACKUP_ENDPOINT, BACKUP_STATISTICS_ENDPOINT, CHANGE_PASSWORD_ENDPOINT,
20 CONFIG_GEN_PEERS_ENDPOINT, FEDIMINTD_VERSION_ENDPOINT, GET_SETUP_CODE_ENDPOINT,
21 GUARDIAN_CONFIG_BACKUP_ENDPOINT, INVITE_CODE_ENDPOINT, RECOVER_ENDPOINT,
22 RESET_PEER_SETUP_CODES_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT, SESSION_COUNT_ENDPOINT,
23 SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT, SET_LOCAL_PARAMS_ENDPOINT,
24 SET_PASSWORD_ENDPOINT, SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT,
25 SIGN_API_ANNOUNCEMENT_ENDPOINT, START_CONSENSUS_ENDPOINT, START_DKG_ENDPOINT, STATUS_ENDPOINT,
26 SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT, VERIFIED_CONFIGS_ENDPOINT,
27 VERIFY_CONFIG_HASH_ENDPOINT,
28};
29use fedimint_core::invite_code::InviteCode;
30use fedimint_core::module::audit::AuditSummary;
31use fedimint_core::module::registry::ModuleDecoderRegistry;
32use fedimint_core::module::{
33 ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
34};
35use fedimint_core::net::api_announcement::{
36 SignedApiAnnouncement, SignedApiAnnouncementSubmission,
37};
38use fedimint_core::session_outcome::{
39 AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
40};
41use fedimint_core::task::{MaybeSend, MaybeSync};
42use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
43use fedimint_core::util::SafeUrl;
44use fedimint_core::{NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send};
45use fedimint_logging::LOG_CLIENT_NET_API;
46use futures::future::join_all;
47use itertools::Itertools;
48use rand::seq::SliceRandom;
49use serde_json::Value;
50use tokio::sync::OnceCell;
51use tracing::{debug, trace};
52
53use super::super::{DynModuleApi, IGlobalFederationApi, IRawFederationApi, StatusResponse};
54use crate::api::{
55 FederationApiExt, FederationError, FederationResult,
56 VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
57};
58use crate::query::FilterMapThreshold;
59
60pub trait GlobalFederationApiWithCacheExt
63where
64 Self: Sized,
65{
66 fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
67}
68
69impl<T> GlobalFederationApiWithCacheExt for T
70where
71 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
72{
73 fn with_cache(self) -> GlobalFederationApiWithCache<T> {
74 GlobalFederationApiWithCache::new(self)
75 }
76}
77
78#[derive(Debug)]
83pub struct GlobalFederationApiWithCache<T> {
84 pub(crate) inner: T,
85 pub(crate) await_session_lru:
95 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
96
97 pub(crate) get_session_status_lru:
105 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
106}
107
108impl<T> GlobalFederationApiWithCache<T> {
109 pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
110 Self {
111 inner,
112 await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
113 NonZeroUsize::new(512).expect("is non-zero"),
114 ))),
115 get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
116 NonZeroUsize::new(512).expect("is non-zero"),
117 ))),
118 }
119 }
120}
121
122impl<T> GlobalFederationApiWithCache<T>
123where
124 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
125{
126 pub(crate) async fn await_block_raw(
127 &self,
128 block_index: u64,
129 decoders: &ModuleDecoderRegistry,
130 ) -> anyhow::Result<SessionOutcome> {
131 if block_index % 100 == 0 {
132 debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
133 } else {
134 trace!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
135 }
136 self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
137 AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
138 ApiRequestErased::new(block_index),
139 )
140 .await?
141 .try_into_inner(decoders)
142 .map_err(|e| anyhow!(e.to_string()))
143 }
144
145 pub(crate) fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
146 let mut peers = self.all_peers().iter().copied().collect_vec();
147 peers.shuffle(&mut rand::thread_rng());
148 peers.into_iter()
149 }
150
151 pub(crate) async fn get_session_status_raw_v2(
152 &self,
153 block_index: u64,
154 broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
155 decoders: &ModuleDecoderRegistry,
156 ) -> anyhow::Result<SessionStatus> {
157 if block_index % 100 == 0 {
158 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
159 } else {
160 trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
161 }
162 let params = ApiRequestErased::new(block_index);
163 let mut last_error = None;
164 for peer_id in self.select_peers_for_status() {
166 match self
167 .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
168 SESSION_STATUS_V2_ENDPOINT.to_string(),
169 params.clone(),
170 peer_id,
171 )
172 .await
173 .map_err(anyhow::Error::from)
174 .and_then(|s| Ok(s.try_into_inner(decoders)?))
175 {
176 Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
177 if signed_session_outcome.verify(broadcast_public_keys, block_index) {
178 return Ok(SessionStatus::Complete(
180 signed_session_outcome.session_outcome,
181 ));
182 }
183 last_error = Some(format_err!("Invalid signature"));
184 }
185 Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
186 return self.get_session_status_raw(block_index, decoders).await;
188 }
189 Err(err) => {
190 last_error = Some(err);
191 }
192 }
193 assert!(last_error.is_some());
195 }
196 Err(last_error.expect("must have at least one peer"))
197 }
198
199 pub(crate) async fn get_session_status_raw(
200 &self,
201 block_index: u64,
202 decoders: &ModuleDecoderRegistry,
203 ) -> anyhow::Result<SessionStatus> {
204 if block_index % 100 == 0 {
205 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
206 } else {
207 trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
208 }
209 self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
210 SESSION_STATUS_ENDPOINT.to_string(),
211 ApiRequestErased::new(block_index),
212 )
213 .await?
214 .try_into_inner(&decoders.clone().with_fallback())
215 .map_err(|e| anyhow!(e))
216 }
217}
218
219#[apply(async_trait_maybe_send!)]
220impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
221where
222 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
223{
224 fn all_peers(&self) -> &BTreeSet<PeerId> {
225 self.inner.all_peers()
226 }
227
228 fn self_peer(&self) -> Option<PeerId> {
229 self.inner.self_peer()
230 }
231
232 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
233 self.inner.with_module(id)
234 }
235
236 async fn request_raw(
238 &self,
239 peer_id: PeerId,
240 method: &str,
241 params: &ApiRequestErased,
242 ) -> ServerResult<Value> {
243 self.inner.request_raw(peer_id, method, params).await
244 }
245}
246
247#[apply(async_trait_maybe_send!)]
248impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
249where
250 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
251{
252 async fn await_block(
253 &self,
254 session_idx: u64,
255 decoders: &ModuleDecoderRegistry,
256 ) -> anyhow::Result<SessionOutcome> {
257 let mut lru_lock = self.await_session_lru.lock().await;
258
259 let entry_arc = lru_lock
260 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
261 .clone();
262
263 drop(lru_lock);
265
266 entry_arc
267 .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
268 .await
269 .cloned()
270 }
271
272 async fn get_session_status(
273 &self,
274 session_idx: u64,
275 decoders: &ModuleDecoderRegistry,
276 core_api_version: ApiVersion,
277 broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
278 ) -> anyhow::Result<SessionStatus> {
279 let mut lru_lock = self.get_session_status_lru.lock().await;
280
281 let entry_arc = lru_lock
282 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
283 .clone();
284
285 drop(lru_lock);
287
288 enum NoCacheErr {
289 Initial,
290 Pending(Vec<AcceptedItem>),
291 Err(anyhow::Error),
292 }
293 match entry_arc
294 .get_or_try_init(|| async {
295 let session_status =
296 if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
297 self.get_session_status_raw(session_idx, decoders).await
298 } else if let Some(broadcast_public_keys) = broadcast_public_keys {
299 self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
300 .await
301 } else {
302 self.get_session_status_raw(session_idx, decoders).await
303 };
304 match session_status {
305 Err(e) => Err(NoCacheErr::Err(e)),
306 Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
307 Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
308 Ok(SessionStatus::Complete(s)) => Ok(s),
310 }
311 })
312 .await
313 .cloned()
314 {
315 Ok(s) => Ok(SessionStatus::Complete(s)),
316 Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
317 Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
318 Err(NoCacheErr::Err(e)) => Err(e),
319 }
320 }
321
322 async fn submit_transaction(
323 &self,
324 tx: Transaction,
325 ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
326 self.request_current_consensus_retry(
327 SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
328 ApiRequestErased::new(SerdeTransaction::from(&tx)),
329 )
330 .await
331 }
332
333 async fn session_count(&self) -> FederationResult<u64> {
334 self.request_current_consensus(
335 SESSION_COUNT_ENDPOINT.to_owned(),
336 ApiRequestErased::default(),
337 )
338 .await
339 }
340
341 async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
342 self.request_current_consensus_retry(
343 AWAIT_TRANSACTION_ENDPOINT.to_owned(),
344 ApiRequestErased::new(txid),
345 )
346 .await
347 }
348
349 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
350 self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
351 .await
352 }
353
354 async fn download_backup(
355 &self,
356 id: &secp256k1::PublicKey,
357 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
358 self.request_with_strategy(
359 FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
360 RECOVER_ENDPOINT.to_owned(),
361 ApiRequestErased::new(id),
362 )
363 .await
364 }
365
366 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
367 self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
368 .await
369 }
370
371 async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus> {
372 self.request_admin(SETUP_STATUS_ENDPOINT, ApiRequestErased::default(), auth)
373 .await
374 }
375
376 async fn set_local_params(
377 &self,
378 name: String,
379 federation_name: Option<String>,
380 disable_base_fees: Option<bool>,
381 auth: ApiAuth,
382 ) -> FederationResult<String> {
383 self.request_admin(
384 SET_LOCAL_PARAMS_ENDPOINT,
385 ApiRequestErased::new(SetLocalParamsRequest {
386 name,
387 federation_name,
388 disable_base_fees,
389 }),
390 auth,
391 )
392 .await
393 }
394
395 async fn add_peer_connection_info(
396 &self,
397 info: String,
398 auth: ApiAuth,
399 ) -> FederationResult<String> {
400 self.request_admin(
401 ADD_PEER_SETUP_CODE_ENDPOINT,
402 ApiRequestErased::new(info),
403 auth,
404 )
405 .await
406 }
407
408 async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()> {
409 self.request_admin(
410 RESET_PEER_SETUP_CODES_ENDPOINT,
411 ApiRequestErased::default(),
412 auth,
413 )
414 .await
415 }
416
417 async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>> {
418 self.request_admin(GET_SETUP_CODE_ENDPOINT, ApiRequestErased::default(), auth)
419 .await
420 }
421
422 async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()> {
423 self.request_admin_no_auth(ADD_CONFIG_GEN_PEER_ENDPOINT, ApiRequestErased::new(peer))
424 .await
425 }
426
427 async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>> {
428 self.request_admin_no_auth(CONFIG_GEN_PEERS_ENDPOINT, ApiRequestErased::default())
429 .await
430 }
431
432 async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
433 self.request_admin(START_DKG_ENDPOINT, ApiRequestErased::default(), auth)
434 .await
435 }
436
437 async fn get_verify_config_hash(
438 &self,
439 auth: ApiAuth,
440 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
441 self.request_admin(
442 VERIFY_CONFIG_HASH_ENDPOINT,
443 ApiRequestErased::default(),
444 auth,
445 )
446 .await
447 }
448
449 async fn verified_configs(
450 &self,
451 auth: ApiAuth,
452 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
453 self.request_admin(VERIFIED_CONFIGS_ENDPOINT, ApiRequestErased::default(), auth)
454 .await
455 }
456
457 async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()> {
458 self.request_admin(START_CONSENSUS_ENDPOINT, ApiRequestErased::default(), auth)
459 .await
460 }
461
462 async fn status(&self) -> FederationResult<StatusResponse> {
463 self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
464 .await
465 }
466
467 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
468 self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
469 .await
470 }
471
472 async fn guardian_config_backup(
473 &self,
474 auth: ApiAuth,
475 ) -> FederationResult<GuardianConfigBackup> {
476 self.request_admin(
477 GUARDIAN_CONFIG_BACKUP_ENDPOINT,
478 ApiRequestErased::default(),
479 auth,
480 )
481 .await
482 }
483
484 async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
485 self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
486 .await
487 }
488
489 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
490 self.request_admin(
491 RESTART_FEDERATION_SETUP_ENDPOINT,
492 ApiRequestErased::default(),
493 auth,
494 )
495 .await
496 }
497
498 async fn submit_api_announcement(
499 &self,
500 announcement_peer_id: PeerId,
501 announcement: SignedApiAnnouncement,
502 ) -> FederationResult<()> {
503 let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
504 let announcement_inner = announcement.clone();
505 async move {
506 (
507 peer_id,
508 self.request_single_peer::<()>(
509 SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
510 ApiRequestErased::new(SignedApiAnnouncementSubmission {
511 signed_api_announcement: announcement_inner,
512 peer_id: announcement_peer_id,
513 }),
514 peer_id,
515 )
516 .await,
517 )
518 }
519 }))
520 .await
521 .into_iter()
522 .filter_map(|(peer_id, result)| match result {
523 Ok(()) => None,
524 Err(e) => Some((peer_id, e)),
525 })
526 .collect::<BTreeMap<_, _>>();
527
528 if peer_errors.is_empty() {
529 Ok(())
530 } else {
531 Err(FederationError {
532 method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
533 params: serde_json::to_value(announcement).expect("can be serialized"),
534 general: None,
535 peer_errors,
536 })
537 }
538 }
539
540 async fn api_announcements(
541 &self,
542 guardian: PeerId,
543 ) -> ServerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
544 self.request_single_peer(
545 API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
546 ApiRequestErased::default(),
547 guardian,
548 )
549 .await
550 }
551
552 async fn sign_api_announcement(
553 &self,
554 api_url: SafeUrl,
555 auth: ApiAuth,
556 ) -> FederationResult<SignedApiAnnouncement> {
557 self.request_admin(
558 SIGN_API_ANNOUNCEMENT_ENDPOINT,
559 ApiRequestErased::new(api_url),
560 auth,
561 )
562 .await
563 }
564
565 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
566 self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
567 .await
568 }
569
570 async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
571 self.request_admin(
572 BACKUP_STATISTICS_ENDPOINT,
573 ApiRequestErased::default(),
574 auth,
575 )
576 .await
577 }
578
579 async fn fedimintd_version(&self, peer_id: PeerId) -> ServerResult<String> {
580 self.request_single_peer(
581 FEDIMINTD_VERSION_ENDPOINT.to_owned(),
582 ApiRequestErased::default(),
583 peer_id,
584 )
585 .await
586 }
587
588 async fn get_invite_code(&self, guardian: PeerId) -> ServerResult<InviteCode> {
589 self.request_single_peer(
590 INVITE_CODE_ENDPOINT.to_owned(),
591 ApiRequestErased::default(),
592 guardian,
593 )
594 .await
595 }
596
597 async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()> {
598 self.request_admin(
599 CHANGE_PASSWORD_ENDPOINT,
600 ApiRequestErased::new(new_password),
601 auth,
602 )
603 .await
604 }
605}