1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::hashes::sha256;
8use bitcoin::secp256k1;
9use fedimint_core::admin_client::{
10 GuardianConfigBackup, PeerServerParamsLegacy, SetLocalParamsRequest, SetupStatus,
11};
12use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
13use fedimint_core::core::ModuleInstanceId;
14use fedimint_core::core::backup::SignedBackupRequest;
15use fedimint_core::endpoint_constants::{
16 ADD_CONFIG_GEN_PEER_ENDPOINT, ADD_PEER_SETUP_CODE_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT,
17 AUDIT_ENDPOINT, AUTH_ENDPOINT, AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT,
18 BACKUP_ENDPOINT, BACKUP_STATISTICS_ENDPOINT, CHANGE_PASSWORD_ENDPOINT,
19 CONFIG_GEN_PEERS_ENDPOINT, FEDIMINTD_VERSION_ENDPOINT, GET_SETUP_CODE_ENDPOINT,
20 GUARDIAN_CONFIG_BACKUP_ENDPOINT, INVITE_CODE_ENDPOINT, RECOVER_ENDPOINT,
21 RESET_PEER_SETUP_CODES_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT, SESSION_COUNT_ENDPOINT,
22 SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT, SET_LOCAL_PARAMS_ENDPOINT,
23 SET_PASSWORD_ENDPOINT, SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT,
24 SIGN_API_ANNOUNCEMENT_ENDPOINT, START_CONSENSUS_ENDPOINT, START_DKG_ENDPOINT, STATUS_ENDPOINT,
25 SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT, VERIFIED_CONFIGS_ENDPOINT,
26 VERIFY_CONFIG_HASH_ENDPOINT,
27};
28use fedimint_core::invite_code::InviteCode;
29use fedimint_core::module::audit::AuditSummary;
30use fedimint_core::module::registry::ModuleDecoderRegistry;
31use fedimint_core::module::{
32 ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
33};
34use fedimint_core::net::api_announcement::{
35 SignedApiAnnouncement, SignedApiAnnouncementSubmission,
36};
37use fedimint_core::session_outcome::{
38 AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
39};
40use fedimint_core::task::{MaybeSend, MaybeSync};
41use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
42use fedimint_core::util::SafeUrl;
43use fedimint_core::{NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send};
44use fedimint_logging::LOG_CLIENT_NET_API;
45use futures::future::join_all;
46use itertools::Itertools;
47use rand::seq::SliceRandom;
48use serde_json::Value;
49use tokio::sync::OnceCell;
50use tracing::{debug, trace};
51
52use super::super::{DynModuleApi, IGlobalFederationApi, IRawFederationApi, StatusResponse};
53use crate::api::{
54 FederationApiExt, FederationError, FederationResult, PeerResult,
55 VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
56};
57use crate::query::FilterMapThreshold;
58
59pub trait GlobalFederationApiWithCacheExt
62where
63 Self: Sized,
64{
65 fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
66}
67
68impl<T> GlobalFederationApiWithCacheExt for T
69where
70 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
71{
72 fn with_cache(self) -> GlobalFederationApiWithCache<T> {
73 GlobalFederationApiWithCache::new(self)
74 }
75}
76
77#[derive(Debug)]
82pub struct GlobalFederationApiWithCache<T> {
83 pub(crate) inner: T,
84 pub(crate) await_session_lru:
94 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
95
96 pub(crate) get_session_status_lru:
104 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
105}
106
107impl<T> GlobalFederationApiWithCache<T> {
108 pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
109 Self {
110 inner,
111 await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
112 NonZeroUsize::new(512).expect("is non-zero"),
113 ))),
114 get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
115 NonZeroUsize::new(512).expect("is non-zero"),
116 ))),
117 }
118 }
119}
120
121impl<T> GlobalFederationApiWithCache<T>
122where
123 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
124{
125 pub(crate) async fn await_block_raw(
126 &self,
127 block_index: u64,
128 decoders: &ModuleDecoderRegistry,
129 ) -> anyhow::Result<SessionOutcome> {
130 if block_index % 100 == 0 {
131 debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
132 } else {
133 trace!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
134 }
135 self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
136 AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
137 ApiRequestErased::new(block_index),
138 )
139 .await?
140 .try_into_inner(decoders)
141 .map_err(|e| anyhow!(e.to_string()))
142 }
143
144 pub(crate) fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
145 let mut peers = self.all_peers().iter().copied().collect_vec();
146 peers.shuffle(&mut rand::thread_rng());
147 peers.into_iter()
148 }
149
150 pub(crate) async fn get_session_status_raw_v2(
151 &self,
152 block_index: u64,
153 broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
154 decoders: &ModuleDecoderRegistry,
155 ) -> anyhow::Result<SessionStatus> {
156 if block_index % 100 == 0 {
157 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
158 } else {
159 trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
160 }
161 let params = ApiRequestErased::new(block_index);
162 let mut last_error = None;
163 for peer_id in self.select_peers_for_status() {
165 match self
166 .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
167 SESSION_STATUS_V2_ENDPOINT.to_string(),
168 params.clone(),
169 peer_id,
170 )
171 .await
172 .map_err(anyhow::Error::from)
173 .and_then(|s| Ok(s.try_into_inner(decoders)?))
174 {
175 Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
176 if signed_session_outcome.verify(broadcast_public_keys, block_index) {
177 return Ok(SessionStatus::Complete(
179 signed_session_outcome.session_outcome,
180 ));
181 }
182 last_error = Some(format_err!("Invalid signature"));
183 }
184 Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
185 return self.get_session_status_raw(block_index, decoders).await;
187 }
188 Err(err) => {
189 last_error = Some(err);
190 }
191 }
192 assert!(last_error.is_some());
194 }
195 Err(last_error.expect("must have at least one peer"))
196 }
197
198 pub(crate) async fn get_session_status_raw(
199 &self,
200 block_index: u64,
201 decoders: &ModuleDecoderRegistry,
202 ) -> anyhow::Result<SessionStatus> {
203 if block_index % 100 == 0 {
204 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
205 } else {
206 trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
207 }
208 self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
209 SESSION_STATUS_ENDPOINT.to_string(),
210 ApiRequestErased::new(block_index),
211 )
212 .await?
213 .try_into_inner(&decoders.clone().with_fallback())
214 .map_err(|e| anyhow!(e))
215 }
216}
217
218#[apply(async_trait_maybe_send!)]
219impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
220where
221 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
222{
223 fn all_peers(&self) -> &BTreeSet<PeerId> {
224 self.inner.all_peers()
225 }
226
227 fn self_peer(&self) -> Option<PeerId> {
228 self.inner.self_peer()
229 }
230
231 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
232 self.inner.with_module(id)
233 }
234
235 async fn request_raw(
237 &self,
238 peer_id: PeerId,
239 method: &str,
240 params: &ApiRequestErased,
241 ) -> PeerResult<Value> {
242 self.inner.request_raw(peer_id, method, params).await
243 }
244}
245
246#[apply(async_trait_maybe_send!)]
247impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
248where
249 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
250{
251 async fn await_block(
252 &self,
253 session_idx: u64,
254 decoders: &ModuleDecoderRegistry,
255 ) -> anyhow::Result<SessionOutcome> {
256 let mut lru_lock = self.await_session_lru.lock().await;
257
258 let entry_arc = lru_lock
259 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
260 .clone();
261
262 drop(lru_lock);
264
265 entry_arc
266 .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
267 .await
268 .cloned()
269 }
270
271 async fn get_session_status(
272 &self,
273 session_idx: u64,
274 decoders: &ModuleDecoderRegistry,
275 core_api_version: ApiVersion,
276 broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
277 ) -> anyhow::Result<SessionStatus> {
278 let mut lru_lock = self.get_session_status_lru.lock().await;
279
280 let entry_arc = lru_lock
281 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
282 .clone();
283
284 drop(lru_lock);
286
287 enum NoCacheErr {
288 Initial,
289 Pending(Vec<AcceptedItem>),
290 Err(anyhow::Error),
291 }
292 match entry_arc
293 .get_or_try_init(|| async {
294 let session_status =
295 if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
296 self.get_session_status_raw(session_idx, decoders).await
297 } else if let Some(broadcast_public_keys) = broadcast_public_keys {
298 self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
299 .await
300 } else {
301 self.get_session_status_raw(session_idx, decoders).await
302 };
303 match session_status {
304 Err(e) => Err(NoCacheErr::Err(e)),
305 Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
306 Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
307 Ok(SessionStatus::Complete(s)) => Ok(s),
309 }
310 })
311 .await
312 .cloned()
313 {
314 Ok(s) => Ok(SessionStatus::Complete(s)),
315 Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
316 Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
317 Err(NoCacheErr::Err(e)) => Err(e),
318 }
319 }
320
321 async fn submit_transaction(
322 &self,
323 tx: Transaction,
324 ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
325 self.request_current_consensus_retry(
326 SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
327 ApiRequestErased::new(SerdeTransaction::from(&tx)),
328 )
329 .await
330 }
331
332 async fn session_count(&self) -> FederationResult<u64> {
333 self.request_current_consensus(
334 SESSION_COUNT_ENDPOINT.to_owned(),
335 ApiRequestErased::default(),
336 )
337 .await
338 }
339
340 async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
341 self.request_current_consensus_retry(
342 AWAIT_TRANSACTION_ENDPOINT.to_owned(),
343 ApiRequestErased::new(txid),
344 )
345 .await
346 }
347
348 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
349 self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
350 .await
351 }
352
353 async fn download_backup(
354 &self,
355 id: &secp256k1::PublicKey,
356 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
357 self.request_with_strategy(
358 FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
359 RECOVER_ENDPOINT.to_owned(),
360 ApiRequestErased::new(id),
361 )
362 .await
363 }
364
365 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
366 self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
367 .await
368 }
369
370 async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus> {
371 self.request_admin(SETUP_STATUS_ENDPOINT, ApiRequestErased::default(), auth)
372 .await
373 }
374
375 async fn set_local_params(
376 &self,
377 name: String,
378 federation_name: Option<String>,
379 disable_base_fees: Option<bool>,
380 auth: ApiAuth,
381 ) -> FederationResult<String> {
382 self.request_admin(
383 SET_LOCAL_PARAMS_ENDPOINT,
384 ApiRequestErased::new(SetLocalParamsRequest {
385 name,
386 federation_name,
387 disable_base_fees,
388 }),
389 auth,
390 )
391 .await
392 }
393
394 async fn add_peer_connection_info(
395 &self,
396 info: String,
397 auth: ApiAuth,
398 ) -> FederationResult<String> {
399 self.request_admin(
400 ADD_PEER_SETUP_CODE_ENDPOINT,
401 ApiRequestErased::new(info),
402 auth,
403 )
404 .await
405 }
406
407 async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()> {
408 self.request_admin(
409 RESET_PEER_SETUP_CODES_ENDPOINT,
410 ApiRequestErased::default(),
411 auth,
412 )
413 .await
414 }
415
416 async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>> {
417 self.request_admin(GET_SETUP_CODE_ENDPOINT, ApiRequestErased::default(), auth)
418 .await
419 }
420
421 async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()> {
422 self.request_admin_no_auth(ADD_CONFIG_GEN_PEER_ENDPOINT, ApiRequestErased::new(peer))
423 .await
424 }
425
426 async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>> {
427 self.request_admin_no_auth(CONFIG_GEN_PEERS_ENDPOINT, ApiRequestErased::default())
428 .await
429 }
430
431 async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
432 self.request_admin(START_DKG_ENDPOINT, ApiRequestErased::default(), auth)
433 .await
434 }
435
436 async fn get_verify_config_hash(
437 &self,
438 auth: ApiAuth,
439 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
440 self.request_admin(
441 VERIFY_CONFIG_HASH_ENDPOINT,
442 ApiRequestErased::default(),
443 auth,
444 )
445 .await
446 }
447
448 async fn verified_configs(
449 &self,
450 auth: ApiAuth,
451 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
452 self.request_admin(VERIFIED_CONFIGS_ENDPOINT, ApiRequestErased::default(), auth)
453 .await
454 }
455
456 async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()> {
457 self.request_admin(START_CONSENSUS_ENDPOINT, ApiRequestErased::default(), auth)
458 .await
459 }
460
461 async fn status(&self) -> FederationResult<StatusResponse> {
462 self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
463 .await
464 }
465
466 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
467 self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
468 .await
469 }
470
471 async fn guardian_config_backup(
472 &self,
473 auth: ApiAuth,
474 ) -> FederationResult<GuardianConfigBackup> {
475 self.request_admin(
476 GUARDIAN_CONFIG_BACKUP_ENDPOINT,
477 ApiRequestErased::default(),
478 auth,
479 )
480 .await
481 }
482
483 async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
484 self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
485 .await
486 }
487
488 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
489 self.request_admin(
490 RESTART_FEDERATION_SETUP_ENDPOINT,
491 ApiRequestErased::default(),
492 auth,
493 )
494 .await
495 }
496
497 async fn submit_api_announcement(
498 &self,
499 announcement_peer_id: PeerId,
500 announcement: SignedApiAnnouncement,
501 ) -> FederationResult<()> {
502 let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
503 let announcement_inner = announcement.clone();
504 async move {
505 (
506 peer_id,
507 self.request_single_peer::<()>(
508 SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
509 ApiRequestErased::new(SignedApiAnnouncementSubmission {
510 signed_api_announcement: announcement_inner,
511 peer_id: announcement_peer_id,
512 }),
513 peer_id,
514 )
515 .await,
516 )
517 }
518 }))
519 .await
520 .into_iter()
521 .filter_map(|(peer_id, result)| match result {
522 Ok(()) => None,
523 Err(e) => Some((peer_id, e)),
524 })
525 .collect::<BTreeMap<_, _>>();
526
527 if peer_errors.is_empty() {
528 Ok(())
529 } else {
530 Err(FederationError {
531 method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
532 params: serde_json::to_value(announcement).expect("can be serialized"),
533 general: None,
534 peer_errors,
535 })
536 }
537 }
538
539 async fn api_announcements(
540 &self,
541 guardian: PeerId,
542 ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
543 self.request_single_peer(
544 API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
545 ApiRequestErased::default(),
546 guardian,
547 )
548 .await
549 }
550
551 async fn sign_api_announcement(
552 &self,
553 api_url: SafeUrl,
554 auth: ApiAuth,
555 ) -> FederationResult<SignedApiAnnouncement> {
556 self.request_admin(
557 SIGN_API_ANNOUNCEMENT_ENDPOINT,
558 ApiRequestErased::new(api_url),
559 auth,
560 )
561 .await
562 }
563
564 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
565 self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
566 .await
567 }
568
569 async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
570 self.request_admin(
571 BACKUP_STATISTICS_ENDPOINT,
572 ApiRequestErased::default(),
573 auth,
574 )
575 .await
576 }
577
578 async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String> {
579 self.request_single_peer(
580 FEDIMINTD_VERSION_ENDPOINT.to_owned(),
581 ApiRequestErased::default(),
582 peer_id,
583 )
584 .await
585 }
586
587 async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode> {
588 self.request_single_peer(
589 INVITE_CODE_ENDPOINT.to_owned(),
590 ApiRequestErased::default(),
591 guardian,
592 )
593 .await
594 }
595
596 async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()> {
597 self.request_admin(
598 CHANGE_PASSWORD_ENDPOINT,
599 ApiRequestErased::new(new_password),
600 auth,
601 )
602 .await
603 }
604}