fedimint_api_client/api/global_api/
with_cache.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::hashes::sha256;
8use bitcoin::secp256k1;
9use fedimint_core::admin_client::{PeerServerParamsLegacy, SetLocalParamsRequest, SetupStatus};
10use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
11use fedimint_core::core::ModuleInstanceId;
12use fedimint_core::core::backup::SignedBackupRequest;
13use fedimint_core::endpoint_constants::{
14    ADD_CONFIG_GEN_PEER_ENDPOINT, ADD_PEER_SETUP_CODE_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT,
15    AUDIT_ENDPOINT, AUTH_ENDPOINT, AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT,
16    BACKUP_ENDPOINT, BACKUP_STATISTICS_ENDPOINT, CONFIG_GEN_PEERS_ENDPOINT,
17    FEDIMINTD_VERSION_ENDPOINT, GUARDIAN_CONFIG_BACKUP_ENDPOINT, RECOVER_ENDPOINT,
18    RESTART_FEDERATION_SETUP_ENDPOINT, SESSION_COUNT_ENDPOINT, SESSION_STATUS_ENDPOINT,
19    SESSION_STATUS_V2_ENDPOINT, SET_LOCAL_PARAMS_ENDPOINT, SET_PASSWORD_ENDPOINT,
20    SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT, SIGN_API_ANNOUNCEMENT_ENDPOINT,
21    START_CONSENSUS_ENDPOINT, START_DKG_ENDPOINT, STATUS_ENDPOINT,
22    SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT, VERIFIED_CONFIGS_ENDPOINT,
23    VERIFY_CONFIG_HASH_ENDPOINT,
24};
25use fedimint_core::module::audit::AuditSummary;
26use fedimint_core::module::registry::ModuleDecoderRegistry;
27use fedimint_core::module::{
28    ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
29};
30use fedimint_core::net::api_announcement::{
31    SignedApiAnnouncement, SignedApiAnnouncementSubmission,
32};
33use fedimint_core::session_outcome::{
34    AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
35};
36use fedimint_core::task::{MaybeSend, MaybeSync};
37use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
38use fedimint_core::util::SafeUrl;
39use fedimint_core::{NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send};
40use fedimint_logging::LOG_CLIENT_NET_API;
41use futures::future::join_all;
42use itertools::Itertools;
43use rand::seq::SliceRandom;
44use serde_json::Value;
45use tokio::sync::OnceCell;
46use tracing::debug;
47
48use super::super::{
49    DynModuleApi, GuardianConfigBackup, IGlobalFederationApi, IRawFederationApi, StatusResponse,
50};
51use crate::api::{
52    FederationApiExt, FederationError, FederationResult, PeerResult,
53    VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
54};
55use crate::query::FilterMapThreshold;
56
57/// Convenience extension trait used for wrapping [`IRawFederationApi`] in
58/// a [`GlobalFederationApiWithCache`]
59pub trait GlobalFederationApiWithCacheExt
60where
61    Self: Sized,
62{
63    fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
64}
65
66impl<T> GlobalFederationApiWithCacheExt for T
67where
68    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
69{
70    fn with_cache(self) -> GlobalFederationApiWithCache<T> {
71        GlobalFederationApiWithCache::new(self)
72    }
73}
74
75/// [`IGlobalFederationApi`] wrapping some `T: IRawFederationApi` and adding
76/// a tiny bit of caching.
77///
78/// Use [`GlobalFederationApiWithCacheExt::with_cache`] to create.
79#[derive(Debug)]
80pub struct GlobalFederationApiWithCache<T> {
81    pub(crate) inner: T,
82    /// Small LRU used as [`IGlobalFederationApi::await_block`] cache.
83    ///
84    /// This is mostly to avoid multiple client module recovery processes
85    /// re-requesting same blocks and putting burden on the federation.
86    ///
87    /// The LRU can be be fairly small, as if the modules are
88    /// (near-)bottlenecked on fetching blocks they will naturally
89    /// synchronize, or split into a handful of groups. And if they are not,
90    /// no LRU here is going to help them.
91    pub(crate) await_session_lru:
92        Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
93
94    /// Like [`Self::await_session_lru`], but for
95    /// [`IGlobalFederationApi::get_session_status`].
96    ///
97    /// In theory these two LRUs have the same content, but one is locked by
98    /// potentially long-blocking operation, while the other non-blocking one.
99    /// Given how tiny they are, it's not worth complicating things to unify
100    /// them.
101    pub(crate) get_session_status_lru:
102        Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
103}
104
105impl<T> GlobalFederationApiWithCache<T> {
106    pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
107        Self {
108            inner,
109            await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
110                NonZeroUsize::new(512).expect("is non-zero"),
111            ))),
112            get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
113                NonZeroUsize::new(512).expect("is non-zero"),
114            ))),
115        }
116    }
117}
118
119impl<T> GlobalFederationApiWithCache<T>
120where
121    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
122{
123    pub(crate) async fn await_block_raw(
124        &self,
125        block_index: u64,
126        decoders: &ModuleDecoderRegistry,
127    ) -> anyhow::Result<SessionOutcome> {
128        debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
129        self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
130            AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
131            ApiRequestErased::new(block_index),
132        )
133        .await?
134        .try_into_inner(decoders)
135        .map_err(|e| anyhow!(e.to_string()))
136    }
137
138    pub(crate) fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
139        let mut peers = self.all_peers().iter().copied().collect_vec();
140        peers.shuffle(&mut rand::thread_rng());
141        peers.into_iter()
142    }
143
144    pub(crate) async fn get_session_status_raw_v2(
145        &self,
146        block_index: u64,
147        broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
148        decoders: &ModuleDecoderRegistry,
149    ) -> anyhow::Result<SessionStatus> {
150        debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
151        let params = ApiRequestErased::new(block_index);
152        let mut last_error = None;
153        // fetch serially
154        for peer_id in self.select_peers_for_status() {
155            match self
156                .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
157                    SESSION_STATUS_V2_ENDPOINT.to_string(),
158                    params.clone(),
159                    peer_id,
160                )
161                .await
162                .map_err(anyhow::Error::from)
163                .and_then(|s| Ok(s.try_into_inner(decoders)?))
164            {
165                Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
166                    if signed_session_outcome.verify(broadcast_public_keys, block_index) {
167                        // early return
168                        return Ok(SessionStatus::Complete(
169                            signed_session_outcome.session_outcome,
170                        ));
171                    }
172                    last_error = Some(format_err!("Invalid signature"));
173                }
174                Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
175                    // no signature: use fallback method
176                    return self.get_session_status_raw(block_index, decoders).await;
177                }
178                Err(err) => {
179                    last_error = Some(err);
180                }
181            }
182            // if we loop then we must have last_error
183            assert!(last_error.is_some());
184        }
185        Err(last_error.expect("must have at least one peer"))
186    }
187
188    pub(crate) async fn get_session_status_raw(
189        &self,
190        block_index: u64,
191        decoders: &ModuleDecoderRegistry,
192    ) -> anyhow::Result<SessionStatus> {
193        debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
194        self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
195            SESSION_STATUS_ENDPOINT.to_string(),
196            ApiRequestErased::new(block_index),
197        )
198        .await?
199        .try_into_inner(&decoders.clone().with_fallback())
200        .map_err(|e| anyhow!(e))
201    }
202}
203
204#[apply(async_trait_maybe_send!)]
205impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
206where
207    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
208{
209    fn all_peers(&self) -> &BTreeSet<PeerId> {
210        self.inner.all_peers()
211    }
212
213    fn self_peer(&self) -> Option<PeerId> {
214        self.inner.self_peer()
215    }
216
217    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
218        self.inner.with_module(id)
219    }
220
221    /// Make request to a specific federation peer by `peer_id`
222    async fn request_raw(
223        &self,
224        peer_id: PeerId,
225        method: &str,
226        params: &ApiRequestErased,
227    ) -> PeerResult<Value> {
228        self.inner.request_raw(peer_id, method, params).await
229    }
230}
231
232#[apply(async_trait_maybe_send!)]
233impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
234where
235    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
236{
237    async fn await_block(
238        &self,
239        session_idx: u64,
240        decoders: &ModuleDecoderRegistry,
241    ) -> anyhow::Result<SessionOutcome> {
242        let mut lru_lock = self.await_session_lru.lock().await;
243
244        let entry_arc = lru_lock
245            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
246            .clone();
247
248        // we drop the lru lock so requests for other `session_idx` can work in parallel
249        drop(lru_lock);
250
251        entry_arc
252            .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
253            .await
254            .cloned()
255    }
256
257    async fn get_session_status(
258        &self,
259        session_idx: u64,
260        decoders: &ModuleDecoderRegistry,
261        core_api_version: ApiVersion,
262        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
263    ) -> anyhow::Result<SessionStatus> {
264        let mut lru_lock = self.get_session_status_lru.lock().await;
265
266        let entry_arc = lru_lock
267            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
268            .clone();
269
270        // we drop the lru lock so requests for other `session_idx` can work in parallel
271        drop(lru_lock);
272
273        enum NoCacheErr {
274            Initial,
275            Pending(Vec<AcceptedItem>),
276            Err(anyhow::Error),
277        }
278        match entry_arc
279            .get_or_try_init(|| async {
280                let session_status =
281                    if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
282                        self.get_session_status_raw(session_idx, decoders).await
283                    } else if let Some(broadcast_public_keys) = broadcast_public_keys {
284                        self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
285                            .await
286                    } else {
287                        self.get_session_status_raw(session_idx, decoders).await
288                    };
289                match session_status {
290                    Err(e) => Err(NoCacheErr::Err(e)),
291                    Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
292                    Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
293                    // only status we can cache (hance outer Ok)
294                    Ok(SessionStatus::Complete(s)) => Ok(s),
295                }
296            })
297            .await
298            .cloned()
299        {
300            Ok(s) => Ok(SessionStatus::Complete(s)),
301            Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
302            Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
303            Err(NoCacheErr::Err(e)) => Err(e),
304        }
305    }
306
307    async fn submit_transaction(
308        &self,
309        tx: Transaction,
310    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
311        self.request_current_consensus_retry(
312            SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
313            ApiRequestErased::new(SerdeTransaction::from(&tx)),
314        )
315        .await
316    }
317
318    async fn session_count(&self) -> FederationResult<u64> {
319        self.request_current_consensus(
320            SESSION_COUNT_ENDPOINT.to_owned(),
321            ApiRequestErased::default(),
322        )
323        .await
324    }
325
326    async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
327        self.request_current_consensus_retry(
328            AWAIT_TRANSACTION_ENDPOINT.to_owned(),
329            ApiRequestErased::new(txid),
330        )
331        .await
332    }
333
334    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
335        self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
336            .await
337    }
338
339    async fn download_backup(
340        &self,
341        id: &secp256k1::PublicKey,
342    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
343        self.request_with_strategy(
344            FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
345            RECOVER_ENDPOINT.to_owned(),
346            ApiRequestErased::new(id),
347        )
348        .await
349    }
350
351    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
352        self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
353            .await
354    }
355
356    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus> {
357        self.request_admin(SETUP_STATUS_ENDPOINT, ApiRequestErased::default(), auth)
358            .await
359    }
360
361    async fn set_local_params(
362        &self,
363        name: String,
364        federation_name: Option<String>,
365        auth: ApiAuth,
366    ) -> FederationResult<String> {
367        self.request_admin(
368            SET_LOCAL_PARAMS_ENDPOINT,
369            ApiRequestErased::new(SetLocalParamsRequest {
370                name,
371                federation_name,
372            }),
373            auth,
374        )
375        .await
376    }
377
378    async fn add_peer_connection_info(
379        &self,
380        info: String,
381        auth: ApiAuth,
382    ) -> FederationResult<String> {
383        self.request_admin(
384            ADD_PEER_SETUP_CODE_ENDPOINT,
385            ApiRequestErased::new(info),
386            auth,
387        )
388        .await
389    }
390
391    async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()> {
392        self.request_admin_no_auth(ADD_CONFIG_GEN_PEER_ENDPOINT, ApiRequestErased::new(peer))
393            .await
394    }
395
396    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>> {
397        self.request_admin_no_auth(CONFIG_GEN_PEERS_ENDPOINT, ApiRequestErased::default())
398            .await
399    }
400
401    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
402        self.request_admin(START_DKG_ENDPOINT, ApiRequestErased::default(), auth)
403            .await
404    }
405
406    async fn get_verify_config_hash(
407        &self,
408        auth: ApiAuth,
409    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
410        self.request_admin(
411            VERIFY_CONFIG_HASH_ENDPOINT,
412            ApiRequestErased::default(),
413            auth,
414        )
415        .await
416    }
417
418    async fn verified_configs(
419        &self,
420        auth: ApiAuth,
421    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
422        self.request_admin(VERIFIED_CONFIGS_ENDPOINT, ApiRequestErased::default(), auth)
423            .await
424    }
425
426    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()> {
427        self.request_admin(START_CONSENSUS_ENDPOINT, ApiRequestErased::default(), auth)
428            .await
429    }
430
431    async fn status(&self) -> FederationResult<StatusResponse> {
432        self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
433            .await
434    }
435
436    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
437        self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
438            .await
439    }
440
441    async fn guardian_config_backup(
442        &self,
443        auth: ApiAuth,
444    ) -> FederationResult<GuardianConfigBackup> {
445        self.request_admin(
446            GUARDIAN_CONFIG_BACKUP_ENDPOINT,
447            ApiRequestErased::default(),
448            auth,
449        )
450        .await
451    }
452
453    async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
454        self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
455            .await
456    }
457
458    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
459        self.request_admin(
460            RESTART_FEDERATION_SETUP_ENDPOINT,
461            ApiRequestErased::default(),
462            auth,
463        )
464        .await
465    }
466
467    async fn submit_api_announcement(
468        &self,
469        announcement_peer_id: PeerId,
470        announcement: SignedApiAnnouncement,
471    ) -> FederationResult<()> {
472        let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
473            let announcement_inner = announcement.clone();
474            async move {
475                (
476                    peer_id,
477                    self.request_single_peer::<()>(
478                        SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
479                        ApiRequestErased::new(SignedApiAnnouncementSubmission {
480                            signed_api_announcement: announcement_inner,
481                            peer_id: announcement_peer_id,
482                        }),
483                        peer_id,
484                    )
485                    .await,
486                )
487            }
488        }))
489        .await
490        .into_iter()
491        .filter_map(|(peer_id, result)| match result {
492            Ok(()) => None,
493            Err(e) => Some((peer_id, e)),
494        })
495        .collect::<BTreeMap<_, _>>();
496
497        if peer_errors.is_empty() {
498            Ok(())
499        } else {
500            Err(FederationError {
501                method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
502                params: serde_json::to_value(announcement).expect("can be serialized"),
503                general: None,
504                peer_errors,
505            })
506        }
507    }
508
509    async fn api_announcements(
510        &self,
511        guardian: PeerId,
512    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
513        self.request_single_peer(
514            API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
515            ApiRequestErased::default(),
516            guardian,
517        )
518        .await
519    }
520
521    async fn sign_api_announcement(
522        &self,
523        api_url: SafeUrl,
524        auth: ApiAuth,
525    ) -> FederationResult<SignedApiAnnouncement> {
526        self.request_admin(
527            SIGN_API_ANNOUNCEMENT_ENDPOINT,
528            ApiRequestErased::new(api_url),
529            auth,
530        )
531        .await
532    }
533
534    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
535        self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
536            .await
537    }
538
539    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
540        self.request_admin(
541            BACKUP_STATISTICS_ENDPOINT,
542            ApiRequestErased::default(),
543            auth,
544        )
545        .await
546    }
547
548    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String> {
549        self.request_single_peer(
550            FEDIMINTD_VERSION_ENDPOINT.to_owned(),
551            ApiRequestErased::default(),
552            peer_id,
553        )
554        .await
555    }
556}