fedimint_api_client/api/global_api/
with_cache.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::hashes::sha256;
8use bitcoin::secp256k1;
9use fedimint_connectors::ServerResult;
10use fedimint_core::admin_client::{
11    GuardianConfigBackup, PeerServerParamsLegacy, SetLocalParamsRequest, SetupStatus,
12};
13use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
14use fedimint_core::core::ModuleInstanceId;
15use fedimint_core::core::backup::SignedBackupRequest;
16use fedimint_core::endpoint_constants::{
17    ADD_CONFIG_GEN_PEER_ENDPOINT, ADD_PEER_SETUP_CODE_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT,
18    AUDIT_ENDPOINT, AUTH_ENDPOINT, AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT,
19    BACKUP_ENDPOINT, BACKUP_STATISTICS_ENDPOINT, CHANGE_PASSWORD_ENDPOINT,
20    CONFIG_GEN_PEERS_ENDPOINT, FEDIMINTD_VERSION_ENDPOINT, GET_SETUP_CODE_ENDPOINT,
21    GUARDIAN_CONFIG_BACKUP_ENDPOINT, INVITE_CODE_ENDPOINT, RECOVER_ENDPOINT,
22    RESET_PEER_SETUP_CODES_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT, SESSION_COUNT_ENDPOINT,
23    SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT, SET_LOCAL_PARAMS_ENDPOINT,
24    SET_PASSWORD_ENDPOINT, SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT,
25    SIGN_API_ANNOUNCEMENT_ENDPOINT, START_CONSENSUS_ENDPOINT, START_DKG_ENDPOINT, STATUS_ENDPOINT,
26    SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT, VERIFIED_CONFIGS_ENDPOINT,
27    VERIFY_CONFIG_HASH_ENDPOINT,
28};
29use fedimint_core::invite_code::InviteCode;
30use fedimint_core::module::audit::AuditSummary;
31use fedimint_core::module::registry::ModuleDecoderRegistry;
32use fedimint_core::module::{
33    ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
34};
35use fedimint_core::net::api_announcement::{
36    SignedApiAnnouncement, SignedApiAnnouncementSubmission,
37};
38use fedimint_core::session_outcome::{
39    AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
40};
41use fedimint_core::task::{MaybeSend, MaybeSync};
42use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
43use fedimint_core::util::SafeUrl;
44use fedimint_core::{NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send};
45use fedimint_logging::LOG_CLIENT_NET_API;
46use futures::future::join_all;
47use itertools::Itertools;
48use rand::seq::SliceRandom;
49use serde_json::Value;
50use tokio::sync::OnceCell;
51use tracing::{debug, trace};
52
53use super::super::{DynModuleApi, IGlobalFederationApi, IRawFederationApi, StatusResponse};
54use crate::api::{
55    FederationApiExt, FederationError, FederationResult,
56    VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
57};
58use crate::query::FilterMapThreshold;
59
60/// Convenience extension trait used for wrapping [`IRawFederationApi`] in
61/// a [`GlobalFederationApiWithCache`]
62pub trait GlobalFederationApiWithCacheExt
63where
64    Self: Sized,
65{
66    fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
67}
68
69impl<T> GlobalFederationApiWithCacheExt for T
70where
71    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
72{
73    fn with_cache(self) -> GlobalFederationApiWithCache<T> {
74        GlobalFederationApiWithCache::new(self)
75    }
76}
77
78/// [`IGlobalFederationApi`] wrapping some `T: IRawFederationApi` and adding
79/// a tiny bit of caching.
80///
81/// Use [`GlobalFederationApiWithCacheExt::with_cache`] to create.
82#[derive(Debug)]
83pub struct GlobalFederationApiWithCache<T> {
84    pub(crate) inner: T,
85    /// Small LRU used as [`IGlobalFederationApi::await_block`] cache.
86    ///
87    /// This is mostly to avoid multiple client module recovery processes
88    /// re-requesting same blocks and putting burden on the federation.
89    ///
90    /// The LRU can be be fairly small, as if the modules are
91    /// (near-)bottlenecked on fetching blocks they will naturally
92    /// synchronize, or split into a handful of groups. And if they are not,
93    /// no LRU here is going to help them.
94    pub(crate) await_session_lru:
95        Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
96
97    /// Like [`Self::await_session_lru`], but for
98    /// [`IGlobalFederationApi::get_session_status`].
99    ///
100    /// In theory these two LRUs have the same content, but one is locked by
101    /// potentially long-blocking operation, while the other non-blocking one.
102    /// Given how tiny they are, it's not worth complicating things to unify
103    /// them.
104    pub(crate) get_session_status_lru:
105        Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
106}
107
108impl<T> GlobalFederationApiWithCache<T> {
109    pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
110        Self {
111            inner,
112            await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
113                NonZeroUsize::new(512).expect("is non-zero"),
114            ))),
115            get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
116                NonZeroUsize::new(512).expect("is non-zero"),
117            ))),
118        }
119    }
120}
121
122impl<T> GlobalFederationApiWithCache<T>
123where
124    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
125{
126    pub(crate) async fn await_block_raw(
127        &self,
128        block_index: u64,
129        decoders: &ModuleDecoderRegistry,
130    ) -> anyhow::Result<SessionOutcome> {
131        if block_index % 100 == 0 {
132            debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
133        } else {
134            trace!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
135        }
136        self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
137            AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
138            ApiRequestErased::new(block_index),
139        )
140        .await?
141        .try_into_inner(decoders)
142        .map_err(|e| anyhow!(e.to_string()))
143    }
144
145    pub(crate) fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
146        let mut peers = self.all_peers().iter().copied().collect_vec();
147        peers.shuffle(&mut rand::thread_rng());
148        peers.into_iter()
149    }
150
151    pub(crate) async fn get_session_status_raw_v2(
152        &self,
153        block_index: u64,
154        broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
155        decoders: &ModuleDecoderRegistry,
156    ) -> anyhow::Result<SessionStatus> {
157        if block_index % 100 == 0 {
158            debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
159        } else {
160            trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
161        }
162        let params = ApiRequestErased::new(block_index);
163        let mut last_error = None;
164        // fetch serially
165        for peer_id in self.select_peers_for_status() {
166            match self
167                .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
168                    SESSION_STATUS_V2_ENDPOINT.to_string(),
169                    params.clone(),
170                    peer_id,
171                )
172                .await
173                .map_err(anyhow::Error::from)
174                .and_then(|s| Ok(s.try_into_inner(decoders)?))
175            {
176                Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
177                    if signed_session_outcome.verify(broadcast_public_keys, block_index) {
178                        // early return
179                        return Ok(SessionStatus::Complete(
180                            signed_session_outcome.session_outcome,
181                        ));
182                    }
183                    last_error = Some(format_err!("Invalid signature"));
184                }
185                Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
186                    // no signature: use fallback method
187                    return self.get_session_status_raw(block_index, decoders).await;
188                }
189                Err(err) => {
190                    last_error = Some(err);
191                }
192            }
193            // if we loop then we must have last_error
194            assert!(last_error.is_some());
195        }
196        Err(last_error.expect("must have at least one peer"))
197    }
198
199    pub(crate) async fn get_session_status_raw(
200        &self,
201        block_index: u64,
202        decoders: &ModuleDecoderRegistry,
203    ) -> anyhow::Result<SessionStatus> {
204        if block_index % 100 == 0 {
205            debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
206        } else {
207            trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
208        }
209        self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
210            SESSION_STATUS_ENDPOINT.to_string(),
211            ApiRequestErased::new(block_index),
212        )
213        .await?
214        .try_into_inner(&decoders.clone().with_fallback())
215        .map_err(|e| anyhow!(e))
216    }
217}
218
219#[apply(async_trait_maybe_send!)]
220impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
221where
222    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
223{
224    fn all_peers(&self) -> &BTreeSet<PeerId> {
225        self.inner.all_peers()
226    }
227
228    fn self_peer(&self) -> Option<PeerId> {
229        self.inner.self_peer()
230    }
231
232    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
233        self.inner.with_module(id)
234    }
235
236    /// Make request to a specific federation peer by `peer_id`
237    async fn request_raw(
238        &self,
239        peer_id: PeerId,
240        method: &str,
241        params: &ApiRequestErased,
242    ) -> ServerResult<Value> {
243        self.inner.request_raw(peer_id, method, params).await
244    }
245}
246
247#[apply(async_trait_maybe_send!)]
248impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
249where
250    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
251{
252    async fn await_block(
253        &self,
254        session_idx: u64,
255        decoders: &ModuleDecoderRegistry,
256    ) -> anyhow::Result<SessionOutcome> {
257        let mut lru_lock = self.await_session_lru.lock().await;
258
259        let entry_arc = lru_lock
260            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
261            .clone();
262
263        // we drop the lru lock so requests for other `session_idx` can work in parallel
264        drop(lru_lock);
265
266        entry_arc
267            .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
268            .await
269            .cloned()
270    }
271
272    async fn get_session_status(
273        &self,
274        session_idx: u64,
275        decoders: &ModuleDecoderRegistry,
276        core_api_version: ApiVersion,
277        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
278    ) -> anyhow::Result<SessionStatus> {
279        let mut lru_lock = self.get_session_status_lru.lock().await;
280
281        let entry_arc = lru_lock
282            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
283            .clone();
284
285        // we drop the lru lock so requests for other `session_idx` can work in parallel
286        drop(lru_lock);
287
288        enum NoCacheErr {
289            Initial,
290            Pending(Vec<AcceptedItem>),
291            Err(anyhow::Error),
292        }
293        match entry_arc
294            .get_or_try_init(|| async {
295                let session_status =
296                    if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
297                        self.get_session_status_raw(session_idx, decoders).await
298                    } else if let Some(broadcast_public_keys) = broadcast_public_keys {
299                        self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
300                            .await
301                    } else {
302                        self.get_session_status_raw(session_idx, decoders).await
303                    };
304                match session_status {
305                    Err(e) => Err(NoCacheErr::Err(e)),
306                    Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
307                    Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
308                    // only status we can cache (hance outer Ok)
309                    Ok(SessionStatus::Complete(s)) => Ok(s),
310                }
311            })
312            .await
313            .cloned()
314        {
315            Ok(s) => Ok(SessionStatus::Complete(s)),
316            Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
317            Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
318            Err(NoCacheErr::Err(e)) => Err(e),
319        }
320    }
321
322    async fn submit_transaction(
323        &self,
324        tx: Transaction,
325    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
326        self.request_current_consensus_retry(
327            SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
328            ApiRequestErased::new(SerdeTransaction::from(&tx)),
329        )
330        .await
331    }
332
333    async fn session_count(&self) -> FederationResult<u64> {
334        self.request_current_consensus(
335            SESSION_COUNT_ENDPOINT.to_owned(),
336            ApiRequestErased::default(),
337        )
338        .await
339    }
340
341    async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
342        self.request_current_consensus_retry(
343            AWAIT_TRANSACTION_ENDPOINT.to_owned(),
344            ApiRequestErased::new(txid),
345        )
346        .await
347    }
348
349    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
350        self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
351            .await
352    }
353
354    async fn download_backup(
355        &self,
356        id: &secp256k1::PublicKey,
357    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
358        self.request_with_strategy(
359            FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
360            RECOVER_ENDPOINT.to_owned(),
361            ApiRequestErased::new(id),
362        )
363        .await
364    }
365
366    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
367        self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
368            .await
369    }
370
371    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus> {
372        self.request_admin(SETUP_STATUS_ENDPOINT, ApiRequestErased::default(), auth)
373            .await
374    }
375
376    async fn set_local_params(
377        &self,
378        name: String,
379        federation_name: Option<String>,
380        disable_base_fees: Option<bool>,
381        auth: ApiAuth,
382    ) -> FederationResult<String> {
383        self.request_admin(
384            SET_LOCAL_PARAMS_ENDPOINT,
385            ApiRequestErased::new(SetLocalParamsRequest {
386                name,
387                federation_name,
388                disable_base_fees,
389            }),
390            auth,
391        )
392        .await
393    }
394
395    async fn add_peer_connection_info(
396        &self,
397        info: String,
398        auth: ApiAuth,
399    ) -> FederationResult<String> {
400        self.request_admin(
401            ADD_PEER_SETUP_CODE_ENDPOINT,
402            ApiRequestErased::new(info),
403            auth,
404        )
405        .await
406    }
407
408    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()> {
409        self.request_admin(
410            RESET_PEER_SETUP_CODES_ENDPOINT,
411            ApiRequestErased::default(),
412            auth,
413        )
414        .await
415    }
416
417    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>> {
418        self.request_admin(GET_SETUP_CODE_ENDPOINT, ApiRequestErased::default(), auth)
419            .await
420    }
421
422    async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()> {
423        self.request_admin_no_auth(ADD_CONFIG_GEN_PEER_ENDPOINT, ApiRequestErased::new(peer))
424            .await
425    }
426
427    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>> {
428        self.request_admin_no_auth(CONFIG_GEN_PEERS_ENDPOINT, ApiRequestErased::default())
429            .await
430    }
431
432    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
433        self.request_admin(START_DKG_ENDPOINT, ApiRequestErased::default(), auth)
434            .await
435    }
436
437    async fn get_verify_config_hash(
438        &self,
439        auth: ApiAuth,
440    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
441        self.request_admin(
442            VERIFY_CONFIG_HASH_ENDPOINT,
443            ApiRequestErased::default(),
444            auth,
445        )
446        .await
447    }
448
449    async fn verified_configs(
450        &self,
451        auth: ApiAuth,
452    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
453        self.request_admin(VERIFIED_CONFIGS_ENDPOINT, ApiRequestErased::default(), auth)
454            .await
455    }
456
457    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()> {
458        self.request_admin(START_CONSENSUS_ENDPOINT, ApiRequestErased::default(), auth)
459            .await
460    }
461
462    async fn status(&self) -> FederationResult<StatusResponse> {
463        self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
464            .await
465    }
466
467    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
468        self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
469            .await
470    }
471
472    async fn guardian_config_backup(
473        &self,
474        auth: ApiAuth,
475    ) -> FederationResult<GuardianConfigBackup> {
476        self.request_admin(
477            GUARDIAN_CONFIG_BACKUP_ENDPOINT,
478            ApiRequestErased::default(),
479            auth,
480        )
481        .await
482    }
483
484    async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
485        self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
486            .await
487    }
488
489    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
490        self.request_admin(
491            RESTART_FEDERATION_SETUP_ENDPOINT,
492            ApiRequestErased::default(),
493            auth,
494        )
495        .await
496    }
497
498    async fn submit_api_announcement(
499        &self,
500        announcement_peer_id: PeerId,
501        announcement: SignedApiAnnouncement,
502    ) -> FederationResult<()> {
503        let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
504            let announcement_inner = announcement.clone();
505            async move {
506                (
507                    peer_id,
508                    self.request_single_peer::<()>(
509                        SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
510                        ApiRequestErased::new(SignedApiAnnouncementSubmission {
511                            signed_api_announcement: announcement_inner,
512                            peer_id: announcement_peer_id,
513                        }),
514                        peer_id,
515                    )
516                    .await,
517                )
518            }
519        }))
520        .await
521        .into_iter()
522        .filter_map(|(peer_id, result)| match result {
523            Ok(()) => None,
524            Err(e) => Some((peer_id, e)),
525        })
526        .collect::<BTreeMap<_, _>>();
527
528        if peer_errors.is_empty() {
529            Ok(())
530        } else {
531            Err(FederationError {
532                method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
533                params: serde_json::to_value(announcement).expect("can be serialized"),
534                general: None,
535                peer_errors,
536            })
537        }
538    }
539
540    async fn api_announcements(
541        &self,
542        guardian: PeerId,
543    ) -> ServerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
544        self.request_single_peer(
545            API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
546            ApiRequestErased::default(),
547            guardian,
548        )
549        .await
550    }
551
552    async fn sign_api_announcement(
553        &self,
554        api_url: SafeUrl,
555        auth: ApiAuth,
556    ) -> FederationResult<SignedApiAnnouncement> {
557        self.request_admin(
558            SIGN_API_ANNOUNCEMENT_ENDPOINT,
559            ApiRequestErased::new(api_url),
560            auth,
561        )
562        .await
563    }
564
565    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
566        self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
567            .await
568    }
569
570    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
571        self.request_admin(
572            BACKUP_STATISTICS_ENDPOINT,
573            ApiRequestErased::default(),
574            auth,
575        )
576        .await
577    }
578
579    async fn fedimintd_version(&self, peer_id: PeerId) -> ServerResult<String> {
580        self.request_single_peer(
581            FEDIMINTD_VERSION_ENDPOINT.to_owned(),
582            ApiRequestErased::default(),
583            peer_id,
584        )
585        .await
586    }
587
588    async fn get_invite_code(&self, guardian: PeerId) -> ServerResult<InviteCode> {
589        self.request_single_peer(
590            INVITE_CODE_ENDPOINT.to_owned(),
591            ApiRequestErased::default(),
592            guardian,
593        )
594        .await
595    }
596
597    async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()> {
598        self.request_admin(
599            CHANGE_PASSWORD_ENDPOINT,
600            ApiRequestErased::new(new_password),
601            auth,
602        )
603        .await
604    }
605}