fedimint_api_client/api/global_api/
with_cache.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::secp256k1;
8use fedimint_connectors::ServerResult;
9use fedimint_core::admin_client::{GuardianConfigBackup, SetLocalParamsRequest, SetupStatus};
10use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
11use fedimint_core::core::backup::SignedBackupRequest;
12use fedimint_core::core::{ModuleInstanceId, ModuleKind};
13use fedimint_core::endpoint_constants::{
14    ADD_PEER_SETUP_CODE_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT, AUDIT_ENDPOINT, AUTH_ENDPOINT,
15    AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT, BACKUP_ENDPOINT,
16    BACKUP_STATISTICS_ENDPOINT, CHANGE_PASSWORD_ENDPOINT, FEDIMINTD_VERSION_ENDPOINT,
17    GET_SETUP_CODE_ENDPOINT, GUARDIAN_CONFIG_BACKUP_ENDPOINT, INVITE_CODE_ENDPOINT,
18    RECOVER_ENDPOINT, RESET_PEER_SETUP_CODES_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT,
19    SESSION_COUNT_ENDPOINT, SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT,
20    SET_LOCAL_PARAMS_ENDPOINT, SET_PASSWORD_ENDPOINT, SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT,
21    SIGN_API_ANNOUNCEMENT_ENDPOINT, START_DKG_ENDPOINT, STATUS_ENDPOINT,
22    SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT,
23};
24use fedimint_core::invite_code::InviteCode;
25use fedimint_core::module::audit::AuditSummary;
26use fedimint_core::module::registry::ModuleDecoderRegistry;
27use fedimint_core::module::{
28    ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
29};
30use fedimint_core::net::api_announcement::{
31    SignedApiAnnouncement, SignedApiAnnouncementSubmission,
32};
33use fedimint_core::session_outcome::{
34    AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
35};
36use fedimint_core::task::{MaybeSend, MaybeSync};
37use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
38use fedimint_core::util::SafeUrl;
39use fedimint_core::{NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send};
40use fedimint_logging::LOG_CLIENT_NET_API;
41use futures::future::join_all;
42use futures::stream::BoxStream;
43use itertools::Itertools;
44use rand::seq::SliceRandom;
45use serde_json::Value;
46use tokio::sync::OnceCell;
47use tracing::{debug, trace};
48
49use super::super::{DynModuleApi, IGlobalFederationApi, IRawFederationApi, StatusResponse};
50use crate::api::{
51    FederationApiExt, FederationError, FederationResult,
52    VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
53};
54use crate::query::FilterMapThreshold;
55
56/// Convenience extension trait used for wrapping [`IRawFederationApi`] in
57/// a [`GlobalFederationApiWithCache`]
58pub trait GlobalFederationApiWithCacheExt
59where
60    Self: Sized,
61{
62    fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
63}
64
65impl<T> GlobalFederationApiWithCacheExt for T
66where
67    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
68{
69    fn with_cache(self) -> GlobalFederationApiWithCache<T> {
70        GlobalFederationApiWithCache::new(self)
71    }
72}
73
74/// [`IGlobalFederationApi`] wrapping some `T: IRawFederationApi` and adding
75/// a tiny bit of caching.
76///
77/// Use [`GlobalFederationApiWithCacheExt::with_cache`] to create.
78#[derive(Debug)]
79pub struct GlobalFederationApiWithCache<T> {
80    pub(crate) inner: T,
81    /// Small LRU used as [`IGlobalFederationApi::await_block`] cache.
82    ///
83    /// This is mostly to avoid multiple client module recovery processes
84    /// re-requesting same blocks and putting burden on the federation.
85    ///
86    /// The LRU can be be fairly small, as if the modules are
87    /// (near-)bottlenecked on fetching blocks they will naturally
88    /// synchronize, or split into a handful of groups. And if they are not,
89    /// no LRU here is going to help them.
90    pub(crate) await_session_lru:
91        Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
92
93    /// Like [`Self::await_session_lru`], but for
94    /// [`IGlobalFederationApi::get_session_status`].
95    ///
96    /// In theory these two LRUs have the same content, but one is locked by
97    /// potentially long-blocking operation, while the other non-blocking one.
98    /// Given how tiny they are, it's not worth complicating things to unify
99    /// them.
100    pub(crate) get_session_status_lru:
101        Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
102}
103
104impl<T> GlobalFederationApiWithCache<T> {
105    pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
106        Self {
107            inner,
108            await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
109                NonZeroUsize::new(512).expect("is non-zero"),
110            ))),
111            get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
112                NonZeroUsize::new(512).expect("is non-zero"),
113            ))),
114        }
115    }
116}
117
118impl<T> GlobalFederationApiWithCache<T>
119where
120    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
121{
122    pub(crate) async fn await_block_raw(
123        &self,
124        block_index: u64,
125        decoders: &ModuleDecoderRegistry,
126    ) -> anyhow::Result<SessionOutcome> {
127        if block_index % 100 == 0 {
128            debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
129        } else {
130            trace!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
131        }
132        self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
133            AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
134            ApiRequestErased::new(block_index),
135        )
136        .await?
137        .try_into_inner(decoders)
138        .map_err(|e| anyhow!(e.to_string()))
139    }
140
141    pub(crate) fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
142        let mut peers = self.all_peers().iter().copied().collect_vec();
143        peers.shuffle(&mut rand::thread_rng());
144        peers.into_iter()
145    }
146
147    pub(crate) async fn get_session_status_raw_v2(
148        &self,
149        block_index: u64,
150        broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
151        decoders: &ModuleDecoderRegistry,
152    ) -> anyhow::Result<SessionStatus> {
153        if block_index % 100 == 0 {
154            debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
155        } else {
156            trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
157        }
158        let params = ApiRequestErased::new(block_index);
159        let mut last_error = None;
160        // fetch serially
161        for peer_id in self.select_peers_for_status() {
162            match self
163                .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
164                    SESSION_STATUS_V2_ENDPOINT.to_string(),
165                    params.clone(),
166                    peer_id,
167                )
168                .await
169                .map_err(anyhow::Error::from)
170                .and_then(|s| Ok(s.try_into_inner(decoders)?))
171            {
172                Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
173                    if signed_session_outcome.verify(broadcast_public_keys, block_index) {
174                        // early return
175                        return Ok(SessionStatus::Complete(
176                            signed_session_outcome.session_outcome,
177                        ));
178                    }
179                    last_error = Some(format_err!("Invalid signature"));
180                }
181                Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
182                    // no signature: use fallback method
183                    return self.get_session_status_raw(block_index, decoders).await;
184                }
185                Err(err) => {
186                    last_error = Some(err);
187                }
188            }
189            // if we loop then we must have last_error
190            assert!(last_error.is_some());
191        }
192        Err(last_error.expect("must have at least one peer"))
193    }
194
195    pub(crate) async fn get_session_status_raw(
196        &self,
197        block_index: u64,
198        decoders: &ModuleDecoderRegistry,
199    ) -> anyhow::Result<SessionStatus> {
200        if block_index % 100 == 0 {
201            debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
202        } else {
203            trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
204        }
205        self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
206            SESSION_STATUS_ENDPOINT.to_string(),
207            ApiRequestErased::new(block_index),
208        )
209        .await?
210        .try_into_inner(&decoders.clone().with_fallback())
211        .map_err(|e| anyhow!(e))
212    }
213}
214
215#[apply(async_trait_maybe_send!)]
216impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
217where
218    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
219{
220    fn all_peers(&self) -> &BTreeSet<PeerId> {
221        self.inner.all_peers()
222    }
223
224    fn self_peer(&self) -> Option<PeerId> {
225        self.inner.self_peer()
226    }
227
228    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
229        self.inner.with_module(id)
230    }
231
232    /// Make request to a specific federation peer by `peer_id`
233    async fn request_raw(
234        &self,
235        peer_id: PeerId,
236        method: &str,
237        params: &ApiRequestErased,
238    ) -> ServerResult<Value> {
239        self.inner.request_raw(peer_id, method, params).await
240    }
241
242    fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>> {
243        self.inner.connection_status_stream()
244    }
245
246    async fn wait_for_initialized_connections(&self) {
247        self.inner.wait_for_initialized_connections().await;
248    }
249}
250
251#[apply(async_trait_maybe_send!)]
252impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
253where
254    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
255{
256    async fn await_block(
257        &self,
258        session_idx: u64,
259        decoders: &ModuleDecoderRegistry,
260    ) -> anyhow::Result<SessionOutcome> {
261        let mut lru_lock = self.await_session_lru.lock().await;
262
263        let entry_arc = lru_lock
264            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
265            .clone();
266
267        // we drop the lru lock so requests for other `session_idx` can work in parallel
268        drop(lru_lock);
269
270        entry_arc
271            .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
272            .await
273            .cloned()
274    }
275
276    async fn get_session_status(
277        &self,
278        session_idx: u64,
279        decoders: &ModuleDecoderRegistry,
280        core_api_version: ApiVersion,
281        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
282    ) -> anyhow::Result<SessionStatus> {
283        let mut lru_lock = self.get_session_status_lru.lock().await;
284
285        let entry_arc = lru_lock
286            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
287            .clone();
288
289        // we drop the lru lock so requests for other `session_idx` can work in parallel
290        drop(lru_lock);
291
292        enum NoCacheErr {
293            Initial,
294            Pending(Vec<AcceptedItem>),
295            Err(anyhow::Error),
296        }
297        match entry_arc
298            .get_or_try_init(|| async {
299                let session_status =
300                    if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
301                        self.get_session_status_raw(session_idx, decoders).await
302                    } else if let Some(broadcast_public_keys) = broadcast_public_keys {
303                        self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
304                            .await
305                    } else {
306                        self.get_session_status_raw(session_idx, decoders).await
307                    };
308                match session_status {
309                    Err(e) => Err(NoCacheErr::Err(e)),
310                    Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
311                    Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
312                    // only status we can cache (hance outer Ok)
313                    Ok(SessionStatus::Complete(s)) => Ok(s),
314                }
315            })
316            .await
317            .cloned()
318        {
319            Ok(s) => Ok(SessionStatus::Complete(s)),
320            Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
321            Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
322            Err(NoCacheErr::Err(e)) => Err(e),
323        }
324    }
325
326    async fn submit_transaction(
327        &self,
328        tx: Transaction,
329    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
330        self.request_current_consensus_retry(
331            SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
332            ApiRequestErased::new(SerdeTransaction::from(&tx)),
333        )
334        .await
335    }
336
337    async fn session_count(&self) -> FederationResult<u64> {
338        self.request_current_consensus(
339            SESSION_COUNT_ENDPOINT.to_owned(),
340            ApiRequestErased::default(),
341        )
342        .await
343    }
344
345    async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
346        self.request_current_consensus_retry(
347            AWAIT_TRANSACTION_ENDPOINT.to_owned(),
348            ApiRequestErased::new(txid),
349        )
350        .await
351    }
352
353    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
354        self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
355            .await
356    }
357
358    async fn download_backup(
359        &self,
360        id: &secp256k1::PublicKey,
361    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
362        self.request_with_strategy(
363            FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
364            RECOVER_ENDPOINT.to_owned(),
365            ApiRequestErased::new(id),
366        )
367        .await
368    }
369
370    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
371        self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
372            .await
373    }
374
375    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus> {
376        self.request_admin(SETUP_STATUS_ENDPOINT, ApiRequestErased::default(), auth)
377            .await
378    }
379
380    async fn set_local_params(
381        &self,
382        name: String,
383        federation_name: Option<String>,
384        disable_base_fees: Option<bool>,
385        enabled_modules: Option<BTreeSet<ModuleKind>>,
386        auth: ApiAuth,
387    ) -> FederationResult<String> {
388        self.request_admin(
389            SET_LOCAL_PARAMS_ENDPOINT,
390            ApiRequestErased::new(SetLocalParamsRequest {
391                name,
392                federation_name,
393                disable_base_fees,
394                enabled_modules,
395            }),
396            auth,
397        )
398        .await
399    }
400
401    async fn add_peer_connection_info(
402        &self,
403        info: String,
404        auth: ApiAuth,
405    ) -> FederationResult<String> {
406        self.request_admin(
407            ADD_PEER_SETUP_CODE_ENDPOINT,
408            ApiRequestErased::new(info),
409            auth,
410        )
411        .await
412    }
413
414    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()> {
415        self.request_admin(
416            RESET_PEER_SETUP_CODES_ENDPOINT,
417            ApiRequestErased::default(),
418            auth,
419        )
420        .await
421    }
422
423    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>> {
424        self.request_admin(GET_SETUP_CODE_ENDPOINT, ApiRequestErased::default(), auth)
425            .await
426    }
427
428    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
429        self.request_admin(START_DKG_ENDPOINT, ApiRequestErased::default(), auth)
430            .await
431    }
432
433    async fn status(&self) -> FederationResult<StatusResponse> {
434        self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
435            .await
436    }
437
438    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
439        self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
440            .await
441    }
442
443    async fn guardian_config_backup(
444        &self,
445        auth: ApiAuth,
446    ) -> FederationResult<GuardianConfigBackup> {
447        self.request_admin(
448            GUARDIAN_CONFIG_BACKUP_ENDPOINT,
449            ApiRequestErased::default(),
450            auth,
451        )
452        .await
453    }
454
455    async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
456        self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
457            .await
458    }
459
460    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
461        self.request_admin(
462            RESTART_FEDERATION_SETUP_ENDPOINT,
463            ApiRequestErased::default(),
464            auth,
465        )
466        .await
467    }
468
469    async fn submit_api_announcement(
470        &self,
471        announcement_peer_id: PeerId,
472        announcement: SignedApiAnnouncement,
473    ) -> FederationResult<()> {
474        let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
475            let announcement_inner = announcement.clone();
476            async move {
477                (
478                    peer_id,
479                    self.request_single_peer::<()>(
480                        SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
481                        ApiRequestErased::new(SignedApiAnnouncementSubmission {
482                            signed_api_announcement: announcement_inner,
483                            peer_id: announcement_peer_id,
484                        }),
485                        peer_id,
486                    )
487                    .await,
488                )
489            }
490        }))
491        .await
492        .into_iter()
493        .filter_map(|(peer_id, result)| match result {
494            Ok(()) => None,
495            Err(e) => Some((peer_id, e)),
496        })
497        .collect::<BTreeMap<_, _>>();
498
499        if peer_errors.is_empty() {
500            Ok(())
501        } else {
502            Err(FederationError {
503                method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
504                params: serde_json::to_value(announcement).expect("can be serialized"),
505                general: None,
506                peer_errors,
507            })
508        }
509    }
510
511    async fn api_announcements(
512        &self,
513        guardian: PeerId,
514    ) -> ServerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
515        self.request_single_peer(
516            API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
517            ApiRequestErased::default(),
518            guardian,
519        )
520        .await
521    }
522
523    async fn sign_api_announcement(
524        &self,
525        api_url: SafeUrl,
526        auth: ApiAuth,
527    ) -> FederationResult<SignedApiAnnouncement> {
528        self.request_admin(
529            SIGN_API_ANNOUNCEMENT_ENDPOINT,
530            ApiRequestErased::new(api_url),
531            auth,
532        )
533        .await
534    }
535
536    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
537        self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
538            .await
539    }
540
541    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
542        self.request_admin(
543            BACKUP_STATISTICS_ENDPOINT,
544            ApiRequestErased::default(),
545            auth,
546        )
547        .await
548    }
549
550    async fn fedimintd_version(&self, peer_id: PeerId) -> ServerResult<String> {
551        self.request_single_peer(
552            FEDIMINTD_VERSION_ENDPOINT.to_owned(),
553            ApiRequestErased::default(),
554            peer_id,
555        )
556        .await
557    }
558
559    async fn get_invite_code(&self, guardian: PeerId) -> ServerResult<InviteCode> {
560        self.request_single_peer(
561            INVITE_CODE_ENDPOINT.to_owned(),
562            ApiRequestErased::default(),
563            guardian,
564        )
565        .await
566    }
567
568    async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()> {
569        self.request_admin(
570            CHANGE_PASSWORD_ENDPOINT,
571            ApiRequestErased::new(new_password),
572            auth,
573        )
574        .await
575    }
576}