1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::secp256k1;
8use fedimint_connectors::ServerResult;
9use fedimint_core::admin_client::{GuardianConfigBackup, SetLocalParamsRequest, SetupStatus};
10use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
11use fedimint_core::core::backup::SignedBackupRequest;
12use fedimint_core::core::{ModuleInstanceId, ModuleKind};
13use fedimint_core::endpoint_constants::{
14 ADD_PEER_SETUP_CODE_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT, AUDIT_ENDPOINT, AUTH_ENDPOINT,
15 AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT, BACKUP_ENDPOINT,
16 BACKUP_STATISTICS_ENDPOINT, CHANGE_PASSWORD_ENDPOINT, FEDIMINTD_VERSION_ENDPOINT,
17 GET_SETUP_CODE_ENDPOINT, GUARDIAN_CONFIG_BACKUP_ENDPOINT, INVITE_CODE_ENDPOINT,
18 RECOVER_ENDPOINT, RESET_PEER_SETUP_CODES_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT,
19 SESSION_COUNT_ENDPOINT, SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT,
20 SET_LOCAL_PARAMS_ENDPOINT, SET_PASSWORD_ENDPOINT, SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT,
21 SIGN_API_ANNOUNCEMENT_ENDPOINT, START_DKG_ENDPOINT, STATUS_ENDPOINT,
22 SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT,
23};
24use fedimint_core::invite_code::InviteCode;
25use fedimint_core::module::audit::AuditSummary;
26use fedimint_core::module::registry::ModuleDecoderRegistry;
27use fedimint_core::module::{
28 ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
29};
30use fedimint_core::net::api_announcement::{
31 SignedApiAnnouncement, SignedApiAnnouncementSubmission,
32};
33use fedimint_core::session_outcome::{
34 AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
35};
36use fedimint_core::task::{MaybeSend, MaybeSync};
37use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
38use fedimint_core::util::SafeUrl;
39use fedimint_core::{NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send};
40use fedimint_logging::LOG_CLIENT_NET_API;
41use futures::future::join_all;
42use futures::stream::BoxStream;
43use itertools::Itertools;
44use rand::seq::SliceRandom;
45use serde_json::Value;
46use tokio::sync::OnceCell;
47use tracing::{debug, trace};
48
49use super::super::{DynModuleApi, IGlobalFederationApi, IRawFederationApi, StatusResponse};
50use crate::api::{
51 FederationApiExt, FederationError, FederationResult,
52 VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
53};
54use crate::query::FilterMapThreshold;
55
56pub trait GlobalFederationApiWithCacheExt
59where
60 Self: Sized,
61{
62 fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
63}
64
65impl<T> GlobalFederationApiWithCacheExt for T
66where
67 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
68{
69 fn with_cache(self) -> GlobalFederationApiWithCache<T> {
70 GlobalFederationApiWithCache::new(self)
71 }
72}
73
74#[derive(Debug)]
79pub struct GlobalFederationApiWithCache<T> {
80 pub(crate) inner: T,
81 pub(crate) await_session_lru:
91 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
92
93 pub(crate) get_session_status_lru:
101 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
102}
103
104impl<T> GlobalFederationApiWithCache<T> {
105 pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
106 Self {
107 inner,
108 await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
109 NonZeroUsize::new(512).expect("is non-zero"),
110 ))),
111 get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
112 NonZeroUsize::new(512).expect("is non-zero"),
113 ))),
114 }
115 }
116}
117
118impl<T> GlobalFederationApiWithCache<T>
119where
120 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
121{
122 pub(crate) async fn await_block_raw(
123 &self,
124 block_index: u64,
125 decoders: &ModuleDecoderRegistry,
126 ) -> anyhow::Result<SessionOutcome> {
127 if block_index % 100 == 0 {
128 debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
129 } else {
130 trace!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
131 }
132 self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
133 AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
134 ApiRequestErased::new(block_index),
135 )
136 .await?
137 .try_into_inner(decoders)
138 .map_err(|e| anyhow!(e.to_string()))
139 }
140
141 pub(crate) fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
142 let mut peers = self.all_peers().iter().copied().collect_vec();
143 peers.shuffle(&mut rand::thread_rng());
144 peers.into_iter()
145 }
146
147 pub(crate) async fn get_session_status_raw_v2(
148 &self,
149 block_index: u64,
150 broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
151 decoders: &ModuleDecoderRegistry,
152 ) -> anyhow::Result<SessionStatus> {
153 if block_index % 100 == 0 {
154 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
155 } else {
156 trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
157 }
158 let params = ApiRequestErased::new(block_index);
159 let mut last_error = None;
160 for peer_id in self.select_peers_for_status() {
162 match self
163 .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
164 SESSION_STATUS_V2_ENDPOINT.to_string(),
165 params.clone(),
166 peer_id,
167 )
168 .await
169 .map_err(anyhow::Error::from)
170 .and_then(|s| Ok(s.try_into_inner(decoders)?))
171 {
172 Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
173 if signed_session_outcome.verify(broadcast_public_keys, block_index) {
174 return Ok(SessionStatus::Complete(
176 signed_session_outcome.session_outcome,
177 ));
178 }
179 last_error = Some(format_err!("Invalid signature"));
180 }
181 Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
182 return self.get_session_status_raw(block_index, decoders).await;
184 }
185 Err(err) => {
186 last_error = Some(err);
187 }
188 }
189 assert!(last_error.is_some());
191 }
192 Err(last_error.expect("must have at least one peer"))
193 }
194
195 pub(crate) async fn get_session_status_raw(
196 &self,
197 block_index: u64,
198 decoders: &ModuleDecoderRegistry,
199 ) -> anyhow::Result<SessionStatus> {
200 if block_index % 100 == 0 {
201 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
202 } else {
203 trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
204 }
205 self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
206 SESSION_STATUS_ENDPOINT.to_string(),
207 ApiRequestErased::new(block_index),
208 )
209 .await?
210 .try_into_inner(&decoders.clone().with_fallback())
211 .map_err(|e| anyhow!(e))
212 }
213}
214
215#[apply(async_trait_maybe_send!)]
216impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
217where
218 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
219{
220 fn all_peers(&self) -> &BTreeSet<PeerId> {
221 self.inner.all_peers()
222 }
223
224 fn self_peer(&self) -> Option<PeerId> {
225 self.inner.self_peer()
226 }
227
228 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
229 self.inner.with_module(id)
230 }
231
232 async fn request_raw(
234 &self,
235 peer_id: PeerId,
236 method: &str,
237 params: &ApiRequestErased,
238 ) -> ServerResult<Value> {
239 self.inner.request_raw(peer_id, method, params).await
240 }
241
242 fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>> {
243 self.inner.connection_status_stream()
244 }
245
246 async fn wait_for_initialized_connections(&self) {
247 self.inner.wait_for_initialized_connections().await;
248 }
249}
250
251#[apply(async_trait_maybe_send!)]
252impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
253where
254 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
255{
256 async fn await_block(
257 &self,
258 session_idx: u64,
259 decoders: &ModuleDecoderRegistry,
260 ) -> anyhow::Result<SessionOutcome> {
261 let mut lru_lock = self.await_session_lru.lock().await;
262
263 let entry_arc = lru_lock
264 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
265 .clone();
266
267 drop(lru_lock);
269
270 entry_arc
271 .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
272 .await
273 .cloned()
274 }
275
276 async fn get_session_status(
277 &self,
278 session_idx: u64,
279 decoders: &ModuleDecoderRegistry,
280 core_api_version: ApiVersion,
281 broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
282 ) -> anyhow::Result<SessionStatus> {
283 let mut lru_lock = self.get_session_status_lru.lock().await;
284
285 let entry_arc = lru_lock
286 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
287 .clone();
288
289 drop(lru_lock);
291
292 enum NoCacheErr {
293 Initial,
294 Pending(Vec<AcceptedItem>),
295 Err(anyhow::Error),
296 }
297 match entry_arc
298 .get_or_try_init(|| async {
299 let session_status =
300 if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
301 self.get_session_status_raw(session_idx, decoders).await
302 } else if let Some(broadcast_public_keys) = broadcast_public_keys {
303 self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
304 .await
305 } else {
306 self.get_session_status_raw(session_idx, decoders).await
307 };
308 match session_status {
309 Err(e) => Err(NoCacheErr::Err(e)),
310 Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
311 Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
312 Ok(SessionStatus::Complete(s)) => Ok(s),
314 }
315 })
316 .await
317 .cloned()
318 {
319 Ok(s) => Ok(SessionStatus::Complete(s)),
320 Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
321 Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
322 Err(NoCacheErr::Err(e)) => Err(e),
323 }
324 }
325
326 async fn submit_transaction(
327 &self,
328 tx: Transaction,
329 ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
330 self.request_current_consensus_retry(
331 SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
332 ApiRequestErased::new(SerdeTransaction::from(&tx)),
333 )
334 .await
335 }
336
337 async fn session_count(&self) -> FederationResult<u64> {
338 self.request_current_consensus(
339 SESSION_COUNT_ENDPOINT.to_owned(),
340 ApiRequestErased::default(),
341 )
342 .await
343 }
344
345 async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
346 self.request_current_consensus_retry(
347 AWAIT_TRANSACTION_ENDPOINT.to_owned(),
348 ApiRequestErased::new(txid),
349 )
350 .await
351 }
352
353 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
354 self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
355 .await
356 }
357
358 async fn download_backup(
359 &self,
360 id: &secp256k1::PublicKey,
361 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
362 self.request_with_strategy(
363 FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
364 RECOVER_ENDPOINT.to_owned(),
365 ApiRequestErased::new(id),
366 )
367 .await
368 }
369
370 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
371 self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
372 .await
373 }
374
375 async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus> {
376 self.request_admin(SETUP_STATUS_ENDPOINT, ApiRequestErased::default(), auth)
377 .await
378 }
379
380 async fn set_local_params(
381 &self,
382 name: String,
383 federation_name: Option<String>,
384 disable_base_fees: Option<bool>,
385 enabled_modules: Option<BTreeSet<ModuleKind>>,
386 auth: ApiAuth,
387 ) -> FederationResult<String> {
388 self.request_admin(
389 SET_LOCAL_PARAMS_ENDPOINT,
390 ApiRequestErased::new(SetLocalParamsRequest {
391 name,
392 federation_name,
393 disable_base_fees,
394 enabled_modules,
395 }),
396 auth,
397 )
398 .await
399 }
400
401 async fn add_peer_connection_info(
402 &self,
403 info: String,
404 auth: ApiAuth,
405 ) -> FederationResult<String> {
406 self.request_admin(
407 ADD_PEER_SETUP_CODE_ENDPOINT,
408 ApiRequestErased::new(info),
409 auth,
410 )
411 .await
412 }
413
414 async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()> {
415 self.request_admin(
416 RESET_PEER_SETUP_CODES_ENDPOINT,
417 ApiRequestErased::default(),
418 auth,
419 )
420 .await
421 }
422
423 async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>> {
424 self.request_admin(GET_SETUP_CODE_ENDPOINT, ApiRequestErased::default(), auth)
425 .await
426 }
427
428 async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
429 self.request_admin(START_DKG_ENDPOINT, ApiRequestErased::default(), auth)
430 .await
431 }
432
433 async fn status(&self) -> FederationResult<StatusResponse> {
434 self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
435 .await
436 }
437
438 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
439 self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
440 .await
441 }
442
443 async fn guardian_config_backup(
444 &self,
445 auth: ApiAuth,
446 ) -> FederationResult<GuardianConfigBackup> {
447 self.request_admin(
448 GUARDIAN_CONFIG_BACKUP_ENDPOINT,
449 ApiRequestErased::default(),
450 auth,
451 )
452 .await
453 }
454
455 async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
456 self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
457 .await
458 }
459
460 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
461 self.request_admin(
462 RESTART_FEDERATION_SETUP_ENDPOINT,
463 ApiRequestErased::default(),
464 auth,
465 )
466 .await
467 }
468
469 async fn submit_api_announcement(
470 &self,
471 announcement_peer_id: PeerId,
472 announcement: SignedApiAnnouncement,
473 ) -> FederationResult<()> {
474 let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
475 let announcement_inner = announcement.clone();
476 async move {
477 (
478 peer_id,
479 self.request_single_peer::<()>(
480 SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
481 ApiRequestErased::new(SignedApiAnnouncementSubmission {
482 signed_api_announcement: announcement_inner,
483 peer_id: announcement_peer_id,
484 }),
485 peer_id,
486 )
487 .await,
488 )
489 }
490 }))
491 .await
492 .into_iter()
493 .filter_map(|(peer_id, result)| match result {
494 Ok(()) => None,
495 Err(e) => Some((peer_id, e)),
496 })
497 .collect::<BTreeMap<_, _>>();
498
499 if peer_errors.is_empty() {
500 Ok(())
501 } else {
502 Err(FederationError {
503 method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
504 params: serde_json::to_value(announcement).expect("can be serialized"),
505 general: None,
506 peer_errors,
507 })
508 }
509 }
510
511 async fn api_announcements(
512 &self,
513 guardian: PeerId,
514 ) -> ServerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
515 self.request_single_peer(
516 API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
517 ApiRequestErased::default(),
518 guardian,
519 )
520 .await
521 }
522
523 async fn sign_api_announcement(
524 &self,
525 api_url: SafeUrl,
526 auth: ApiAuth,
527 ) -> FederationResult<SignedApiAnnouncement> {
528 self.request_admin(
529 SIGN_API_ANNOUNCEMENT_ENDPOINT,
530 ApiRequestErased::new(api_url),
531 auth,
532 )
533 .await
534 }
535
536 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
537 self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
538 .await
539 }
540
541 async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
542 self.request_admin(
543 BACKUP_STATISTICS_ENDPOINT,
544 ApiRequestErased::default(),
545 auth,
546 )
547 .await
548 }
549
550 async fn fedimintd_version(&self, peer_id: PeerId) -> ServerResult<String> {
551 self.request_single_peer(
552 FEDIMINTD_VERSION_ENDPOINT.to_owned(),
553 ApiRequestErased::default(),
554 peer_id,
555 )
556 .await
557 }
558
559 async fn get_invite_code(&self, guardian: PeerId) -> ServerResult<InviteCode> {
560 self.request_single_peer(
561 INVITE_CODE_ENDPOINT.to_owned(),
562 ApiRequestErased::default(),
563 guardian,
564 )
565 .await
566 }
567
568 async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()> {
569 self.request_admin(
570 CHANGE_PASSWORD_ENDPOINT,
571 ApiRequestErased::new(new_password),
572 auth,
573 )
574 .await
575 }
576}