fedimint_api_client/api/global_api/
with_cache.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::hashes::sha256;
8use bitcoin::secp256k1;
9use fedimint_core::admin_client::{
10    GuardianConfigBackup, PeerServerParamsLegacy, SetLocalParamsRequest, SetupStatus,
11};
12use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
13use fedimint_core::core::ModuleInstanceId;
14use fedimint_core::core::backup::SignedBackupRequest;
15use fedimint_core::endpoint_constants::{
16    ADD_CONFIG_GEN_PEER_ENDPOINT, ADD_PEER_SETUP_CODE_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT,
17    AUDIT_ENDPOINT, AUTH_ENDPOINT, AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT,
18    BACKUP_ENDPOINT, BACKUP_STATISTICS_ENDPOINT, CONFIG_GEN_PEERS_ENDPOINT,
19    FEDIMINTD_VERSION_ENDPOINT, GET_SETUP_CODE_ENDPOINT, GUARDIAN_CONFIG_BACKUP_ENDPOINT,
20    INVITE_CODE_ENDPOINT, RECOVER_ENDPOINT, RESET_PEER_SETUP_CODES_ENDPOINT,
21    RESTART_FEDERATION_SETUP_ENDPOINT, SESSION_COUNT_ENDPOINT, SESSION_STATUS_ENDPOINT,
22    SESSION_STATUS_V2_ENDPOINT, SET_LOCAL_PARAMS_ENDPOINT, SET_PASSWORD_ENDPOINT,
23    SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT, SIGN_API_ANNOUNCEMENT_ENDPOINT,
24    START_CONSENSUS_ENDPOINT, START_DKG_ENDPOINT, STATUS_ENDPOINT,
25    SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT, VERIFIED_CONFIGS_ENDPOINT,
26    VERIFY_CONFIG_HASH_ENDPOINT,
27};
28use fedimint_core::invite_code::InviteCode;
29use fedimint_core::module::audit::AuditSummary;
30use fedimint_core::module::registry::ModuleDecoderRegistry;
31use fedimint_core::module::{
32    ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
33};
34use fedimint_core::net::api_announcement::{
35    SignedApiAnnouncement, SignedApiAnnouncementSubmission,
36};
37use fedimint_core::session_outcome::{
38    AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
39};
40use fedimint_core::task::{MaybeSend, MaybeSync};
41use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
42use fedimint_core::util::SafeUrl;
43use fedimint_core::{NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send};
44use fedimint_logging::LOG_CLIENT_NET_API;
45use futures::future::join_all;
46use itertools::Itertools;
47use rand::seq::SliceRandom;
48use serde_json::Value;
49use tokio::sync::OnceCell;
50use tracing::debug;
51
52use super::super::{DynModuleApi, IGlobalFederationApi, IRawFederationApi, StatusResponse};
53use crate::api::{
54    FederationApiExt, FederationError, FederationResult, PeerResult,
55    VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
56};
57use crate::query::FilterMapThreshold;
58
59/// Convenience extension trait used for wrapping [`IRawFederationApi`] in
60/// a [`GlobalFederationApiWithCache`]
61pub trait GlobalFederationApiWithCacheExt
62where
63    Self: Sized,
64{
65    fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
66}
67
68impl<T> GlobalFederationApiWithCacheExt for T
69where
70    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
71{
72    fn with_cache(self) -> GlobalFederationApiWithCache<T> {
73        GlobalFederationApiWithCache::new(self)
74    }
75}
76
77/// [`IGlobalFederationApi`] wrapping some `T: IRawFederationApi` and adding
78/// a tiny bit of caching.
79///
80/// Use [`GlobalFederationApiWithCacheExt::with_cache`] to create.
81#[derive(Debug)]
82pub struct GlobalFederationApiWithCache<T> {
83    pub(crate) inner: T,
84    /// Small LRU used as [`IGlobalFederationApi::await_block`] cache.
85    ///
86    /// This is mostly to avoid multiple client module recovery processes
87    /// re-requesting same blocks and putting burden on the federation.
88    ///
89    /// The LRU can be be fairly small, as if the modules are
90    /// (near-)bottlenecked on fetching blocks they will naturally
91    /// synchronize, or split into a handful of groups. And if they are not,
92    /// no LRU here is going to help them.
93    pub(crate) await_session_lru:
94        Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
95
96    /// Like [`Self::await_session_lru`], but for
97    /// [`IGlobalFederationApi::get_session_status`].
98    ///
99    /// In theory these two LRUs have the same content, but one is locked by
100    /// potentially long-blocking operation, while the other non-blocking one.
101    /// Given how tiny they are, it's not worth complicating things to unify
102    /// them.
103    pub(crate) get_session_status_lru:
104        Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
105}
106
107impl<T> GlobalFederationApiWithCache<T> {
108    pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
109        Self {
110            inner,
111            await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
112                NonZeroUsize::new(512).expect("is non-zero"),
113            ))),
114            get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
115                NonZeroUsize::new(512).expect("is non-zero"),
116            ))),
117        }
118    }
119}
120
121impl<T> GlobalFederationApiWithCache<T>
122where
123    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
124{
125    pub(crate) async fn await_block_raw(
126        &self,
127        block_index: u64,
128        decoders: &ModuleDecoderRegistry,
129    ) -> anyhow::Result<SessionOutcome> {
130        debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
131        self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
132            AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
133            ApiRequestErased::new(block_index),
134        )
135        .await?
136        .try_into_inner(decoders)
137        .map_err(|e| anyhow!(e.to_string()))
138    }
139
140    pub(crate) fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
141        let mut peers = self.all_peers().iter().copied().collect_vec();
142        peers.shuffle(&mut rand::thread_rng());
143        peers.into_iter()
144    }
145
146    pub(crate) async fn get_session_status_raw_v2(
147        &self,
148        block_index: u64,
149        broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
150        decoders: &ModuleDecoderRegistry,
151    ) -> anyhow::Result<SessionStatus> {
152        debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
153        let params = ApiRequestErased::new(block_index);
154        let mut last_error = None;
155        // fetch serially
156        for peer_id in self.select_peers_for_status() {
157            match self
158                .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
159                    SESSION_STATUS_V2_ENDPOINT.to_string(),
160                    params.clone(),
161                    peer_id,
162                )
163                .await
164                .map_err(anyhow::Error::from)
165                .and_then(|s| Ok(s.try_into_inner(decoders)?))
166            {
167                Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
168                    if signed_session_outcome.verify(broadcast_public_keys, block_index) {
169                        // early return
170                        return Ok(SessionStatus::Complete(
171                            signed_session_outcome.session_outcome,
172                        ));
173                    }
174                    last_error = Some(format_err!("Invalid signature"));
175                }
176                Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
177                    // no signature: use fallback method
178                    return self.get_session_status_raw(block_index, decoders).await;
179                }
180                Err(err) => {
181                    last_error = Some(err);
182                }
183            }
184            // if we loop then we must have last_error
185            assert!(last_error.is_some());
186        }
187        Err(last_error.expect("must have at least one peer"))
188    }
189
190    pub(crate) async fn get_session_status_raw(
191        &self,
192        block_index: u64,
193        decoders: &ModuleDecoderRegistry,
194    ) -> anyhow::Result<SessionStatus> {
195        debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
196        self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
197            SESSION_STATUS_ENDPOINT.to_string(),
198            ApiRequestErased::new(block_index),
199        )
200        .await?
201        .try_into_inner(&decoders.clone().with_fallback())
202        .map_err(|e| anyhow!(e))
203    }
204}
205
206#[apply(async_trait_maybe_send!)]
207impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
208where
209    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
210{
211    fn all_peers(&self) -> &BTreeSet<PeerId> {
212        self.inner.all_peers()
213    }
214
215    fn self_peer(&self) -> Option<PeerId> {
216        self.inner.self_peer()
217    }
218
219    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
220        self.inner.with_module(id)
221    }
222
223    /// Make request to a specific federation peer by `peer_id`
224    async fn request_raw(
225        &self,
226        peer_id: PeerId,
227        method: &str,
228        params: &ApiRequestErased,
229    ) -> PeerResult<Value> {
230        self.inner.request_raw(peer_id, method, params).await
231    }
232}
233
234#[apply(async_trait_maybe_send!)]
235impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
236where
237    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
238{
239    async fn await_block(
240        &self,
241        session_idx: u64,
242        decoders: &ModuleDecoderRegistry,
243    ) -> anyhow::Result<SessionOutcome> {
244        let mut lru_lock = self.await_session_lru.lock().await;
245
246        let entry_arc = lru_lock
247            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
248            .clone();
249
250        // we drop the lru lock so requests for other `session_idx` can work in parallel
251        drop(lru_lock);
252
253        entry_arc
254            .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
255            .await
256            .cloned()
257    }
258
259    async fn get_session_status(
260        &self,
261        session_idx: u64,
262        decoders: &ModuleDecoderRegistry,
263        core_api_version: ApiVersion,
264        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
265    ) -> anyhow::Result<SessionStatus> {
266        let mut lru_lock = self.get_session_status_lru.lock().await;
267
268        let entry_arc = lru_lock
269            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
270            .clone();
271
272        // we drop the lru lock so requests for other `session_idx` can work in parallel
273        drop(lru_lock);
274
275        enum NoCacheErr {
276            Initial,
277            Pending(Vec<AcceptedItem>),
278            Err(anyhow::Error),
279        }
280        match entry_arc
281            .get_or_try_init(|| async {
282                let session_status =
283                    if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
284                        self.get_session_status_raw(session_idx, decoders).await
285                    } else if let Some(broadcast_public_keys) = broadcast_public_keys {
286                        self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
287                            .await
288                    } else {
289                        self.get_session_status_raw(session_idx, decoders).await
290                    };
291                match session_status {
292                    Err(e) => Err(NoCacheErr::Err(e)),
293                    Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
294                    Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
295                    // only status we can cache (hance outer Ok)
296                    Ok(SessionStatus::Complete(s)) => Ok(s),
297                }
298            })
299            .await
300            .cloned()
301        {
302            Ok(s) => Ok(SessionStatus::Complete(s)),
303            Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
304            Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
305            Err(NoCacheErr::Err(e)) => Err(e),
306        }
307    }
308
309    async fn submit_transaction(
310        &self,
311        tx: Transaction,
312    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
313        self.request_current_consensus_retry(
314            SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
315            ApiRequestErased::new(SerdeTransaction::from(&tx)),
316        )
317        .await
318    }
319
320    async fn session_count(&self) -> FederationResult<u64> {
321        self.request_current_consensus(
322            SESSION_COUNT_ENDPOINT.to_owned(),
323            ApiRequestErased::default(),
324        )
325        .await
326    }
327
328    async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
329        self.request_current_consensus_retry(
330            AWAIT_TRANSACTION_ENDPOINT.to_owned(),
331            ApiRequestErased::new(txid),
332        )
333        .await
334    }
335
336    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
337        self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
338            .await
339    }
340
341    async fn download_backup(
342        &self,
343        id: &secp256k1::PublicKey,
344    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
345        self.request_with_strategy(
346            FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
347            RECOVER_ENDPOINT.to_owned(),
348            ApiRequestErased::new(id),
349        )
350        .await
351    }
352
353    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
354        self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
355            .await
356    }
357
358    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus> {
359        self.request_admin(SETUP_STATUS_ENDPOINT, ApiRequestErased::default(), auth)
360            .await
361    }
362
363    async fn set_local_params(
364        &self,
365        name: String,
366        federation_name: Option<String>,
367        disable_base_fees: Option<bool>,
368        auth: ApiAuth,
369    ) -> FederationResult<String> {
370        self.request_admin(
371            SET_LOCAL_PARAMS_ENDPOINT,
372            ApiRequestErased::new(SetLocalParamsRequest {
373                name,
374                federation_name,
375                disable_base_fees,
376            }),
377            auth,
378        )
379        .await
380    }
381
382    async fn add_peer_connection_info(
383        &self,
384        info: String,
385        auth: ApiAuth,
386    ) -> FederationResult<String> {
387        self.request_admin(
388            ADD_PEER_SETUP_CODE_ENDPOINT,
389            ApiRequestErased::new(info),
390            auth,
391        )
392        .await
393    }
394
395    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()> {
396        self.request_admin(
397            RESET_PEER_SETUP_CODES_ENDPOINT,
398            ApiRequestErased::default(),
399            auth,
400        )
401        .await
402    }
403
404    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>> {
405        self.request_admin(GET_SETUP_CODE_ENDPOINT, ApiRequestErased::default(), auth)
406            .await
407    }
408
409    async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()> {
410        self.request_admin_no_auth(ADD_CONFIG_GEN_PEER_ENDPOINT, ApiRequestErased::new(peer))
411            .await
412    }
413
414    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>> {
415        self.request_admin_no_auth(CONFIG_GEN_PEERS_ENDPOINT, ApiRequestErased::default())
416            .await
417    }
418
419    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
420        self.request_admin(START_DKG_ENDPOINT, ApiRequestErased::default(), auth)
421            .await
422    }
423
424    async fn get_verify_config_hash(
425        &self,
426        auth: ApiAuth,
427    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
428        self.request_admin(
429            VERIFY_CONFIG_HASH_ENDPOINT,
430            ApiRequestErased::default(),
431            auth,
432        )
433        .await
434    }
435
436    async fn verified_configs(
437        &self,
438        auth: ApiAuth,
439    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
440        self.request_admin(VERIFIED_CONFIGS_ENDPOINT, ApiRequestErased::default(), auth)
441            .await
442    }
443
444    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()> {
445        self.request_admin(START_CONSENSUS_ENDPOINT, ApiRequestErased::default(), auth)
446            .await
447    }
448
449    async fn status(&self) -> FederationResult<StatusResponse> {
450        self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
451            .await
452    }
453
454    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
455        self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
456            .await
457    }
458
459    async fn guardian_config_backup(
460        &self,
461        auth: ApiAuth,
462    ) -> FederationResult<GuardianConfigBackup> {
463        self.request_admin(
464            GUARDIAN_CONFIG_BACKUP_ENDPOINT,
465            ApiRequestErased::default(),
466            auth,
467        )
468        .await
469    }
470
471    async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
472        self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
473            .await
474    }
475
476    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
477        self.request_admin(
478            RESTART_FEDERATION_SETUP_ENDPOINT,
479            ApiRequestErased::default(),
480            auth,
481        )
482        .await
483    }
484
485    async fn submit_api_announcement(
486        &self,
487        announcement_peer_id: PeerId,
488        announcement: SignedApiAnnouncement,
489    ) -> FederationResult<()> {
490        let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
491            let announcement_inner = announcement.clone();
492            async move {
493                (
494                    peer_id,
495                    self.request_single_peer::<()>(
496                        SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
497                        ApiRequestErased::new(SignedApiAnnouncementSubmission {
498                            signed_api_announcement: announcement_inner,
499                            peer_id: announcement_peer_id,
500                        }),
501                        peer_id,
502                    )
503                    .await,
504                )
505            }
506        }))
507        .await
508        .into_iter()
509        .filter_map(|(peer_id, result)| match result {
510            Ok(()) => None,
511            Err(e) => Some((peer_id, e)),
512        })
513        .collect::<BTreeMap<_, _>>();
514
515        if peer_errors.is_empty() {
516            Ok(())
517        } else {
518            Err(FederationError {
519                method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
520                params: serde_json::to_value(announcement).expect("can be serialized"),
521                general: None,
522                peer_errors,
523            })
524        }
525    }
526
527    async fn api_announcements(
528        &self,
529        guardian: PeerId,
530    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
531        self.request_single_peer(
532            API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
533            ApiRequestErased::default(),
534            guardian,
535        )
536        .await
537    }
538
539    async fn sign_api_announcement(
540        &self,
541        api_url: SafeUrl,
542        auth: ApiAuth,
543    ) -> FederationResult<SignedApiAnnouncement> {
544        self.request_admin(
545            SIGN_API_ANNOUNCEMENT_ENDPOINT,
546            ApiRequestErased::new(api_url),
547            auth,
548        )
549        .await
550    }
551
552    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
553        self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
554            .await
555    }
556
557    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
558        self.request_admin(
559            BACKUP_STATISTICS_ENDPOINT,
560            ApiRequestErased::default(),
561            auth,
562        )
563        .await
564    }
565
566    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String> {
567        self.request_single_peer(
568            FEDIMINTD_VERSION_ENDPOINT.to_owned(),
569            ApiRequestErased::default(),
570            peer_id,
571        )
572        .await
573    }
574
575    async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode> {
576        self.request_single_peer(
577            INVITE_CODE_ENDPOINT.to_owned(),
578            ApiRequestErased::default(),
579            guardian,
580        )
581        .await
582    }
583}