fedimint_api_client/api/global_api/
with_cache.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::secp256k1;
8use fedimint_connectors::ServerResult;
9use fedimint_core::admin_client::{GuardianConfigBackup, SetLocalParamsRequest, SetupStatus};
10use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
11use fedimint_core::core::ModuleInstanceId;
12use fedimint_core::core::backup::SignedBackupRequest;
13use fedimint_core::endpoint_constants::{
14    ADD_PEER_SETUP_CODE_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT, AUDIT_ENDPOINT, AUTH_ENDPOINT,
15    AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT, BACKUP_ENDPOINT,
16    BACKUP_STATISTICS_ENDPOINT, CHANGE_PASSWORD_ENDPOINT, FEDIMINTD_VERSION_ENDPOINT,
17    GET_SETUP_CODE_ENDPOINT, GUARDIAN_CONFIG_BACKUP_ENDPOINT, INVITE_CODE_ENDPOINT,
18    RECOVER_ENDPOINT, RESET_PEER_SETUP_CODES_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT,
19    SESSION_COUNT_ENDPOINT, SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT,
20    SET_LOCAL_PARAMS_ENDPOINT, SET_PASSWORD_ENDPOINT, SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT,
21    SIGN_API_ANNOUNCEMENT_ENDPOINT, START_DKG_ENDPOINT, STATUS_ENDPOINT,
22    SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT,
23};
24use fedimint_core::invite_code::InviteCode;
25use fedimint_core::module::audit::AuditSummary;
26use fedimint_core::module::registry::ModuleDecoderRegistry;
27use fedimint_core::module::{
28    ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
29};
30use fedimint_core::net::api_announcement::{
31    SignedApiAnnouncement, SignedApiAnnouncementSubmission,
32};
33use fedimint_core::session_outcome::{
34    AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
35};
36use fedimint_core::task::{MaybeSend, MaybeSync};
37use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
38use fedimint_core::util::SafeUrl;
39use fedimint_core::{NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send};
40use fedimint_logging::LOG_CLIENT_NET_API;
41use futures::future::join_all;
42use itertools::Itertools;
43use rand::seq::SliceRandom;
44use serde_json::Value;
45use tokio::sync::OnceCell;
46use tracing::{debug, trace};
47
48use super::super::{DynModuleApi, IGlobalFederationApi, IRawFederationApi, StatusResponse};
49use crate::api::{
50    FederationApiExt, FederationError, FederationResult,
51    VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
52};
53use crate::query::FilterMapThreshold;
54
55/// Convenience extension trait used for wrapping [`IRawFederationApi`] in
56/// a [`GlobalFederationApiWithCache`]
57pub trait GlobalFederationApiWithCacheExt
58where
59    Self: Sized,
60{
61    fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
62}
63
64impl<T> GlobalFederationApiWithCacheExt for T
65where
66    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
67{
68    fn with_cache(self) -> GlobalFederationApiWithCache<T> {
69        GlobalFederationApiWithCache::new(self)
70    }
71}
72
73/// [`IGlobalFederationApi`] wrapping some `T: IRawFederationApi` and adding
74/// a tiny bit of caching.
75///
76/// Use [`GlobalFederationApiWithCacheExt::with_cache`] to create.
77#[derive(Debug)]
78pub struct GlobalFederationApiWithCache<T> {
79    pub(crate) inner: T,
80    /// Small LRU used as [`IGlobalFederationApi::await_block`] cache.
81    ///
82    /// This is mostly to avoid multiple client module recovery processes
83    /// re-requesting same blocks and putting burden on the federation.
84    ///
85    /// The LRU can be be fairly small, as if the modules are
86    /// (near-)bottlenecked on fetching blocks they will naturally
87    /// synchronize, or split into a handful of groups. And if they are not,
88    /// no LRU here is going to help them.
89    pub(crate) await_session_lru:
90        Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
91
92    /// Like [`Self::await_session_lru`], but for
93    /// [`IGlobalFederationApi::get_session_status`].
94    ///
95    /// In theory these two LRUs have the same content, but one is locked by
96    /// potentially long-blocking operation, while the other non-blocking one.
97    /// Given how tiny they are, it's not worth complicating things to unify
98    /// them.
99    pub(crate) get_session_status_lru:
100        Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
101}
102
103impl<T> GlobalFederationApiWithCache<T> {
104    pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
105        Self {
106            inner,
107            await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
108                NonZeroUsize::new(512).expect("is non-zero"),
109            ))),
110            get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
111                NonZeroUsize::new(512).expect("is non-zero"),
112            ))),
113        }
114    }
115}
116
117impl<T> GlobalFederationApiWithCache<T>
118where
119    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
120{
121    pub(crate) async fn await_block_raw(
122        &self,
123        block_index: u64,
124        decoders: &ModuleDecoderRegistry,
125    ) -> anyhow::Result<SessionOutcome> {
126        if block_index % 100 == 0 {
127            debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
128        } else {
129            trace!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
130        }
131        self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
132            AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
133            ApiRequestErased::new(block_index),
134        )
135        .await?
136        .try_into_inner(decoders)
137        .map_err(|e| anyhow!(e.to_string()))
138    }
139
140    pub(crate) fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
141        let mut peers = self.all_peers().iter().copied().collect_vec();
142        peers.shuffle(&mut rand::thread_rng());
143        peers.into_iter()
144    }
145
146    pub(crate) async fn get_session_status_raw_v2(
147        &self,
148        block_index: u64,
149        broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
150        decoders: &ModuleDecoderRegistry,
151    ) -> anyhow::Result<SessionStatus> {
152        if block_index % 100 == 0 {
153            debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
154        } else {
155            trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
156        }
157        let params = ApiRequestErased::new(block_index);
158        let mut last_error = None;
159        // fetch serially
160        for peer_id in self.select_peers_for_status() {
161            match self
162                .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
163                    SESSION_STATUS_V2_ENDPOINT.to_string(),
164                    params.clone(),
165                    peer_id,
166                )
167                .await
168                .map_err(anyhow::Error::from)
169                .and_then(|s| Ok(s.try_into_inner(decoders)?))
170            {
171                Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
172                    if signed_session_outcome.verify(broadcast_public_keys, block_index) {
173                        // early return
174                        return Ok(SessionStatus::Complete(
175                            signed_session_outcome.session_outcome,
176                        ));
177                    }
178                    last_error = Some(format_err!("Invalid signature"));
179                }
180                Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
181                    // no signature: use fallback method
182                    return self.get_session_status_raw(block_index, decoders).await;
183                }
184                Err(err) => {
185                    last_error = Some(err);
186                }
187            }
188            // if we loop then we must have last_error
189            assert!(last_error.is_some());
190        }
191        Err(last_error.expect("must have at least one peer"))
192    }
193
194    pub(crate) async fn get_session_status_raw(
195        &self,
196        block_index: u64,
197        decoders: &ModuleDecoderRegistry,
198    ) -> anyhow::Result<SessionStatus> {
199        if block_index % 100 == 0 {
200            debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
201        } else {
202            trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
203        }
204        self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
205            SESSION_STATUS_ENDPOINT.to_string(),
206            ApiRequestErased::new(block_index),
207        )
208        .await?
209        .try_into_inner(&decoders.clone().with_fallback())
210        .map_err(|e| anyhow!(e))
211    }
212}
213
214#[apply(async_trait_maybe_send!)]
215impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
216where
217    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
218{
219    fn all_peers(&self) -> &BTreeSet<PeerId> {
220        self.inner.all_peers()
221    }
222
223    fn self_peer(&self) -> Option<PeerId> {
224        self.inner.self_peer()
225    }
226
227    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
228        self.inner.with_module(id)
229    }
230
231    /// Make request to a specific federation peer by `peer_id`
232    async fn request_raw(
233        &self,
234        peer_id: PeerId,
235        method: &str,
236        params: &ApiRequestErased,
237    ) -> ServerResult<Value> {
238        self.inner.request_raw(peer_id, method, params).await
239    }
240}
241
242#[apply(async_trait_maybe_send!)]
243impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
244where
245    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
246{
247    async fn await_block(
248        &self,
249        session_idx: u64,
250        decoders: &ModuleDecoderRegistry,
251    ) -> anyhow::Result<SessionOutcome> {
252        let mut lru_lock = self.await_session_lru.lock().await;
253
254        let entry_arc = lru_lock
255            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
256            .clone();
257
258        // we drop the lru lock so requests for other `session_idx` can work in parallel
259        drop(lru_lock);
260
261        entry_arc
262            .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
263            .await
264            .cloned()
265    }
266
267    async fn get_session_status(
268        &self,
269        session_idx: u64,
270        decoders: &ModuleDecoderRegistry,
271        core_api_version: ApiVersion,
272        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
273    ) -> anyhow::Result<SessionStatus> {
274        let mut lru_lock = self.get_session_status_lru.lock().await;
275
276        let entry_arc = lru_lock
277            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
278            .clone();
279
280        // we drop the lru lock so requests for other `session_idx` can work in parallel
281        drop(lru_lock);
282
283        enum NoCacheErr {
284            Initial,
285            Pending(Vec<AcceptedItem>),
286            Err(anyhow::Error),
287        }
288        match entry_arc
289            .get_or_try_init(|| async {
290                let session_status =
291                    if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
292                        self.get_session_status_raw(session_idx, decoders).await
293                    } else if let Some(broadcast_public_keys) = broadcast_public_keys {
294                        self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
295                            .await
296                    } else {
297                        self.get_session_status_raw(session_idx, decoders).await
298                    };
299                match session_status {
300                    Err(e) => Err(NoCacheErr::Err(e)),
301                    Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
302                    Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
303                    // only status we can cache (hance outer Ok)
304                    Ok(SessionStatus::Complete(s)) => Ok(s),
305                }
306            })
307            .await
308            .cloned()
309        {
310            Ok(s) => Ok(SessionStatus::Complete(s)),
311            Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
312            Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
313            Err(NoCacheErr::Err(e)) => Err(e),
314        }
315    }
316
317    async fn submit_transaction(
318        &self,
319        tx: Transaction,
320    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
321        self.request_current_consensus_retry(
322            SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
323            ApiRequestErased::new(SerdeTransaction::from(&tx)),
324        )
325        .await
326    }
327
328    async fn session_count(&self) -> FederationResult<u64> {
329        self.request_current_consensus(
330            SESSION_COUNT_ENDPOINT.to_owned(),
331            ApiRequestErased::default(),
332        )
333        .await
334    }
335
336    async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
337        self.request_current_consensus_retry(
338            AWAIT_TRANSACTION_ENDPOINT.to_owned(),
339            ApiRequestErased::new(txid),
340        )
341        .await
342    }
343
344    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
345        self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
346            .await
347    }
348
349    async fn download_backup(
350        &self,
351        id: &secp256k1::PublicKey,
352    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
353        self.request_with_strategy(
354            FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
355            RECOVER_ENDPOINT.to_owned(),
356            ApiRequestErased::new(id),
357        )
358        .await
359    }
360
361    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
362        self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
363            .await
364    }
365
366    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus> {
367        self.request_admin(SETUP_STATUS_ENDPOINT, ApiRequestErased::default(), auth)
368            .await
369    }
370
371    async fn set_local_params(
372        &self,
373        name: String,
374        federation_name: Option<String>,
375        disable_base_fees: Option<bool>,
376        auth: ApiAuth,
377    ) -> FederationResult<String> {
378        self.request_admin(
379            SET_LOCAL_PARAMS_ENDPOINT,
380            ApiRequestErased::new(SetLocalParamsRequest {
381                name,
382                federation_name,
383                disable_base_fees,
384            }),
385            auth,
386        )
387        .await
388    }
389
390    async fn add_peer_connection_info(
391        &self,
392        info: String,
393        auth: ApiAuth,
394    ) -> FederationResult<String> {
395        self.request_admin(
396            ADD_PEER_SETUP_CODE_ENDPOINT,
397            ApiRequestErased::new(info),
398            auth,
399        )
400        .await
401    }
402
403    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()> {
404        self.request_admin(
405            RESET_PEER_SETUP_CODES_ENDPOINT,
406            ApiRequestErased::default(),
407            auth,
408        )
409        .await
410    }
411
412    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>> {
413        self.request_admin(GET_SETUP_CODE_ENDPOINT, ApiRequestErased::default(), auth)
414            .await
415    }
416
417    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
418        self.request_admin(START_DKG_ENDPOINT, ApiRequestErased::default(), auth)
419            .await
420    }
421
422    async fn status(&self) -> FederationResult<StatusResponse> {
423        self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
424            .await
425    }
426
427    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
428        self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
429            .await
430    }
431
432    async fn guardian_config_backup(
433        &self,
434        auth: ApiAuth,
435    ) -> FederationResult<GuardianConfigBackup> {
436        self.request_admin(
437            GUARDIAN_CONFIG_BACKUP_ENDPOINT,
438            ApiRequestErased::default(),
439            auth,
440        )
441        .await
442    }
443
444    async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
445        self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
446            .await
447    }
448
449    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
450        self.request_admin(
451            RESTART_FEDERATION_SETUP_ENDPOINT,
452            ApiRequestErased::default(),
453            auth,
454        )
455        .await
456    }
457
458    async fn submit_api_announcement(
459        &self,
460        announcement_peer_id: PeerId,
461        announcement: SignedApiAnnouncement,
462    ) -> FederationResult<()> {
463        let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
464            let announcement_inner = announcement.clone();
465            async move {
466                (
467                    peer_id,
468                    self.request_single_peer::<()>(
469                        SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
470                        ApiRequestErased::new(SignedApiAnnouncementSubmission {
471                            signed_api_announcement: announcement_inner,
472                            peer_id: announcement_peer_id,
473                        }),
474                        peer_id,
475                    )
476                    .await,
477                )
478            }
479        }))
480        .await
481        .into_iter()
482        .filter_map(|(peer_id, result)| match result {
483            Ok(()) => None,
484            Err(e) => Some((peer_id, e)),
485        })
486        .collect::<BTreeMap<_, _>>();
487
488        if peer_errors.is_empty() {
489            Ok(())
490        } else {
491            Err(FederationError {
492                method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
493                params: serde_json::to_value(announcement).expect("can be serialized"),
494                general: None,
495                peer_errors,
496            })
497        }
498    }
499
500    async fn api_announcements(
501        &self,
502        guardian: PeerId,
503    ) -> ServerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
504        self.request_single_peer(
505            API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
506            ApiRequestErased::default(),
507            guardian,
508        )
509        .await
510    }
511
512    async fn sign_api_announcement(
513        &self,
514        api_url: SafeUrl,
515        auth: ApiAuth,
516    ) -> FederationResult<SignedApiAnnouncement> {
517        self.request_admin(
518            SIGN_API_ANNOUNCEMENT_ENDPOINT,
519            ApiRequestErased::new(api_url),
520            auth,
521        )
522        .await
523    }
524
525    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
526        self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
527            .await
528    }
529
530    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
531        self.request_admin(
532            BACKUP_STATISTICS_ENDPOINT,
533            ApiRequestErased::default(),
534            auth,
535        )
536        .await
537    }
538
539    async fn fedimintd_version(&self, peer_id: PeerId) -> ServerResult<String> {
540        self.request_single_peer(
541            FEDIMINTD_VERSION_ENDPOINT.to_owned(),
542            ApiRequestErased::default(),
543            peer_id,
544        )
545        .await
546    }
547
548    async fn get_invite_code(&self, guardian: PeerId) -> ServerResult<InviteCode> {
549        self.request_single_peer(
550            INVITE_CODE_ENDPOINT.to_owned(),
551            ApiRequestErased::default(),
552            guardian,
553        )
554        .await
555    }
556
557    async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()> {
558        self.request_admin(
559            CHANGE_PASSWORD_ENDPOINT,
560            ApiRequestErased::new(new_password),
561            auth,
562        )
563        .await
564    }
565}