1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::secp256k1;
8use fedimint_connectors::ServerResult;
9use fedimint_core::admin_client::{GuardianConfigBackup, SetLocalParamsRequest, SetupStatus};
10use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
11use fedimint_core::core::ModuleInstanceId;
12use fedimint_core::core::backup::SignedBackupRequest;
13use fedimint_core::endpoint_constants::{
14 ADD_PEER_SETUP_CODE_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT, AUDIT_ENDPOINT, AUTH_ENDPOINT,
15 AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT, BACKUP_ENDPOINT,
16 BACKUP_STATISTICS_ENDPOINT, CHANGE_PASSWORD_ENDPOINT, FEDIMINTD_VERSION_ENDPOINT,
17 GET_SETUP_CODE_ENDPOINT, GUARDIAN_CONFIG_BACKUP_ENDPOINT, INVITE_CODE_ENDPOINT,
18 RECOVER_ENDPOINT, RESET_PEER_SETUP_CODES_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT,
19 SESSION_COUNT_ENDPOINT, SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT,
20 SET_LOCAL_PARAMS_ENDPOINT, SET_PASSWORD_ENDPOINT, SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT,
21 SIGN_API_ANNOUNCEMENT_ENDPOINT, START_DKG_ENDPOINT, STATUS_ENDPOINT,
22 SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT,
23};
24use fedimint_core::invite_code::InviteCode;
25use fedimint_core::module::audit::AuditSummary;
26use fedimint_core::module::registry::ModuleDecoderRegistry;
27use fedimint_core::module::{
28 ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
29};
30use fedimint_core::net::api_announcement::{
31 SignedApiAnnouncement, SignedApiAnnouncementSubmission,
32};
33use fedimint_core::session_outcome::{
34 AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
35};
36use fedimint_core::task::{MaybeSend, MaybeSync};
37use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
38use fedimint_core::util::SafeUrl;
39use fedimint_core::{NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send};
40use fedimint_logging::LOG_CLIENT_NET_API;
41use futures::future::join_all;
42use itertools::Itertools;
43use rand::seq::SliceRandom;
44use serde_json::Value;
45use tokio::sync::OnceCell;
46use tracing::{debug, trace};
47
48use super::super::{DynModuleApi, IGlobalFederationApi, IRawFederationApi, StatusResponse};
49use crate::api::{
50 FederationApiExt, FederationError, FederationResult,
51 VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
52};
53use crate::query::FilterMapThreshold;
54
55pub trait GlobalFederationApiWithCacheExt
58where
59 Self: Sized,
60{
61 fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
62}
63
64impl<T> GlobalFederationApiWithCacheExt for T
65where
66 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
67{
68 fn with_cache(self) -> GlobalFederationApiWithCache<T> {
69 GlobalFederationApiWithCache::new(self)
70 }
71}
72
73#[derive(Debug)]
78pub struct GlobalFederationApiWithCache<T> {
79 pub(crate) inner: T,
80 pub(crate) await_session_lru:
90 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
91
92 pub(crate) get_session_status_lru:
100 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
101}
102
103impl<T> GlobalFederationApiWithCache<T> {
104 pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
105 Self {
106 inner,
107 await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
108 NonZeroUsize::new(512).expect("is non-zero"),
109 ))),
110 get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
111 NonZeroUsize::new(512).expect("is non-zero"),
112 ))),
113 }
114 }
115}
116
117impl<T> GlobalFederationApiWithCache<T>
118where
119 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
120{
121 pub(crate) async fn await_block_raw(
122 &self,
123 block_index: u64,
124 decoders: &ModuleDecoderRegistry,
125 ) -> anyhow::Result<SessionOutcome> {
126 if block_index % 100 == 0 {
127 debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
128 } else {
129 trace!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
130 }
131 self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
132 AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
133 ApiRequestErased::new(block_index),
134 )
135 .await?
136 .try_into_inner(decoders)
137 .map_err(|e| anyhow!(e.to_string()))
138 }
139
140 pub(crate) fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
141 let mut peers = self.all_peers().iter().copied().collect_vec();
142 peers.shuffle(&mut rand::thread_rng());
143 peers.into_iter()
144 }
145
146 pub(crate) async fn get_session_status_raw_v2(
147 &self,
148 block_index: u64,
149 broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
150 decoders: &ModuleDecoderRegistry,
151 ) -> anyhow::Result<SessionStatus> {
152 if block_index % 100 == 0 {
153 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
154 } else {
155 trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
156 }
157 let params = ApiRequestErased::new(block_index);
158 let mut last_error = None;
159 for peer_id in self.select_peers_for_status() {
161 match self
162 .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
163 SESSION_STATUS_V2_ENDPOINT.to_string(),
164 params.clone(),
165 peer_id,
166 )
167 .await
168 .map_err(anyhow::Error::from)
169 .and_then(|s| Ok(s.try_into_inner(decoders)?))
170 {
171 Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
172 if signed_session_outcome.verify(broadcast_public_keys, block_index) {
173 return Ok(SessionStatus::Complete(
175 signed_session_outcome.session_outcome,
176 ));
177 }
178 last_error = Some(format_err!("Invalid signature"));
179 }
180 Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
181 return self.get_session_status_raw(block_index, decoders).await;
183 }
184 Err(err) => {
185 last_error = Some(err);
186 }
187 }
188 assert!(last_error.is_some());
190 }
191 Err(last_error.expect("must have at least one peer"))
192 }
193
194 pub(crate) async fn get_session_status_raw(
195 &self,
196 block_index: u64,
197 decoders: &ModuleDecoderRegistry,
198 ) -> anyhow::Result<SessionStatus> {
199 if block_index % 100 == 0 {
200 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
201 } else {
202 trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
203 }
204 self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
205 SESSION_STATUS_ENDPOINT.to_string(),
206 ApiRequestErased::new(block_index),
207 )
208 .await?
209 .try_into_inner(&decoders.clone().with_fallback())
210 .map_err(|e| anyhow!(e))
211 }
212}
213
214#[apply(async_trait_maybe_send!)]
215impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
216where
217 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
218{
219 fn all_peers(&self) -> &BTreeSet<PeerId> {
220 self.inner.all_peers()
221 }
222
223 fn self_peer(&self) -> Option<PeerId> {
224 self.inner.self_peer()
225 }
226
227 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
228 self.inner.with_module(id)
229 }
230
231 async fn request_raw(
233 &self,
234 peer_id: PeerId,
235 method: &str,
236 params: &ApiRequestErased,
237 ) -> ServerResult<Value> {
238 self.inner.request_raw(peer_id, method, params).await
239 }
240}
241
242#[apply(async_trait_maybe_send!)]
243impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
244where
245 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
246{
247 async fn await_block(
248 &self,
249 session_idx: u64,
250 decoders: &ModuleDecoderRegistry,
251 ) -> anyhow::Result<SessionOutcome> {
252 let mut lru_lock = self.await_session_lru.lock().await;
253
254 let entry_arc = lru_lock
255 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
256 .clone();
257
258 drop(lru_lock);
260
261 entry_arc
262 .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
263 .await
264 .cloned()
265 }
266
267 async fn get_session_status(
268 &self,
269 session_idx: u64,
270 decoders: &ModuleDecoderRegistry,
271 core_api_version: ApiVersion,
272 broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
273 ) -> anyhow::Result<SessionStatus> {
274 let mut lru_lock = self.get_session_status_lru.lock().await;
275
276 let entry_arc = lru_lock
277 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
278 .clone();
279
280 drop(lru_lock);
282
283 enum NoCacheErr {
284 Initial,
285 Pending(Vec<AcceptedItem>),
286 Err(anyhow::Error),
287 }
288 match entry_arc
289 .get_or_try_init(|| async {
290 let session_status =
291 if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
292 self.get_session_status_raw(session_idx, decoders).await
293 } else if let Some(broadcast_public_keys) = broadcast_public_keys {
294 self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
295 .await
296 } else {
297 self.get_session_status_raw(session_idx, decoders).await
298 };
299 match session_status {
300 Err(e) => Err(NoCacheErr::Err(e)),
301 Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
302 Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
303 Ok(SessionStatus::Complete(s)) => Ok(s),
305 }
306 })
307 .await
308 .cloned()
309 {
310 Ok(s) => Ok(SessionStatus::Complete(s)),
311 Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
312 Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
313 Err(NoCacheErr::Err(e)) => Err(e),
314 }
315 }
316
317 async fn submit_transaction(
318 &self,
319 tx: Transaction,
320 ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
321 self.request_current_consensus_retry(
322 SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
323 ApiRequestErased::new(SerdeTransaction::from(&tx)),
324 )
325 .await
326 }
327
328 async fn session_count(&self) -> FederationResult<u64> {
329 self.request_current_consensus(
330 SESSION_COUNT_ENDPOINT.to_owned(),
331 ApiRequestErased::default(),
332 )
333 .await
334 }
335
336 async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
337 self.request_current_consensus_retry(
338 AWAIT_TRANSACTION_ENDPOINT.to_owned(),
339 ApiRequestErased::new(txid),
340 )
341 .await
342 }
343
344 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
345 self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
346 .await
347 }
348
349 async fn download_backup(
350 &self,
351 id: &secp256k1::PublicKey,
352 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
353 self.request_with_strategy(
354 FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
355 RECOVER_ENDPOINT.to_owned(),
356 ApiRequestErased::new(id),
357 )
358 .await
359 }
360
361 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
362 self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
363 .await
364 }
365
366 async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus> {
367 self.request_admin(SETUP_STATUS_ENDPOINT, ApiRequestErased::default(), auth)
368 .await
369 }
370
371 async fn set_local_params(
372 &self,
373 name: String,
374 federation_name: Option<String>,
375 disable_base_fees: Option<bool>,
376 auth: ApiAuth,
377 ) -> FederationResult<String> {
378 self.request_admin(
379 SET_LOCAL_PARAMS_ENDPOINT,
380 ApiRequestErased::new(SetLocalParamsRequest {
381 name,
382 federation_name,
383 disable_base_fees,
384 }),
385 auth,
386 )
387 .await
388 }
389
390 async fn add_peer_connection_info(
391 &self,
392 info: String,
393 auth: ApiAuth,
394 ) -> FederationResult<String> {
395 self.request_admin(
396 ADD_PEER_SETUP_CODE_ENDPOINT,
397 ApiRequestErased::new(info),
398 auth,
399 )
400 .await
401 }
402
403 async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()> {
404 self.request_admin(
405 RESET_PEER_SETUP_CODES_ENDPOINT,
406 ApiRequestErased::default(),
407 auth,
408 )
409 .await
410 }
411
412 async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>> {
413 self.request_admin(GET_SETUP_CODE_ENDPOINT, ApiRequestErased::default(), auth)
414 .await
415 }
416
417 async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
418 self.request_admin(START_DKG_ENDPOINT, ApiRequestErased::default(), auth)
419 .await
420 }
421
422 async fn status(&self) -> FederationResult<StatusResponse> {
423 self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
424 .await
425 }
426
427 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
428 self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
429 .await
430 }
431
432 async fn guardian_config_backup(
433 &self,
434 auth: ApiAuth,
435 ) -> FederationResult<GuardianConfigBackup> {
436 self.request_admin(
437 GUARDIAN_CONFIG_BACKUP_ENDPOINT,
438 ApiRequestErased::default(),
439 auth,
440 )
441 .await
442 }
443
444 async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
445 self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
446 .await
447 }
448
449 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
450 self.request_admin(
451 RESTART_FEDERATION_SETUP_ENDPOINT,
452 ApiRequestErased::default(),
453 auth,
454 )
455 .await
456 }
457
458 async fn submit_api_announcement(
459 &self,
460 announcement_peer_id: PeerId,
461 announcement: SignedApiAnnouncement,
462 ) -> FederationResult<()> {
463 let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
464 let announcement_inner = announcement.clone();
465 async move {
466 (
467 peer_id,
468 self.request_single_peer::<()>(
469 SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
470 ApiRequestErased::new(SignedApiAnnouncementSubmission {
471 signed_api_announcement: announcement_inner,
472 peer_id: announcement_peer_id,
473 }),
474 peer_id,
475 )
476 .await,
477 )
478 }
479 }))
480 .await
481 .into_iter()
482 .filter_map(|(peer_id, result)| match result {
483 Ok(()) => None,
484 Err(e) => Some((peer_id, e)),
485 })
486 .collect::<BTreeMap<_, _>>();
487
488 if peer_errors.is_empty() {
489 Ok(())
490 } else {
491 Err(FederationError {
492 method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
493 params: serde_json::to_value(announcement).expect("can be serialized"),
494 general: None,
495 peer_errors,
496 })
497 }
498 }
499
500 async fn api_announcements(
501 &self,
502 guardian: PeerId,
503 ) -> ServerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
504 self.request_single_peer(
505 API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
506 ApiRequestErased::default(),
507 guardian,
508 )
509 .await
510 }
511
512 async fn sign_api_announcement(
513 &self,
514 api_url: SafeUrl,
515 auth: ApiAuth,
516 ) -> FederationResult<SignedApiAnnouncement> {
517 self.request_admin(
518 SIGN_API_ANNOUNCEMENT_ENDPOINT,
519 ApiRequestErased::new(api_url),
520 auth,
521 )
522 .await
523 }
524
525 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
526 self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
527 .await
528 }
529
530 async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
531 self.request_admin(
532 BACKUP_STATISTICS_ENDPOINT,
533 ApiRequestErased::default(),
534 auth,
535 )
536 .await
537 }
538
539 async fn fedimintd_version(&self, peer_id: PeerId) -> ServerResult<String> {
540 self.request_single_peer(
541 FEDIMINTD_VERSION_ENDPOINT.to_owned(),
542 ApiRequestErased::default(),
543 peer_id,
544 )
545 .await
546 }
547
548 async fn get_invite_code(&self, guardian: PeerId) -> ServerResult<InviteCode> {
549 self.request_single_peer(
550 INVITE_CODE_ENDPOINT.to_owned(),
551 ApiRequestErased::default(),
552 guardian,
553 )
554 .await
555 }
556
557 async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()> {
558 self.request_admin(
559 CHANGE_PASSWORD_ENDPOINT,
560 ApiRequestErased::new(new_password),
561 auth,
562 )
563 .await
564 }
565}