fedimint_api_client/api/global_api/
with_cache.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::hashes::sha256;
8use bitcoin::secp256k1;
9use fedimint_core::admin_client::{PeerServerParamsLegacy, SetLocalParamsRequest, SetupStatus};
10use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
11use fedimint_core::core::ModuleInstanceId;
12use fedimint_core::core::backup::SignedBackupRequest;
13use fedimint_core::endpoint_constants::{
14    ADD_CONFIG_GEN_PEER_ENDPOINT, ADD_PEER_SETUP_CODE_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT,
15    AUDIT_ENDPOINT, AUTH_ENDPOINT, AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT,
16    BACKUP_ENDPOINT, BACKUP_STATISTICS_ENDPOINT, CONFIG_GEN_PEERS_ENDPOINT,
17    FEDIMINTD_VERSION_ENDPOINT, GUARDIAN_CONFIG_BACKUP_ENDPOINT, RECOVER_ENDPOINT,
18    RESET_PEER_SETUP_CODES_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT, SESSION_COUNT_ENDPOINT,
19    SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT, SET_LOCAL_PARAMS_ENDPOINT,
20    SET_PASSWORD_ENDPOINT, SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT,
21    SIGN_API_ANNOUNCEMENT_ENDPOINT, START_CONSENSUS_ENDPOINT, START_DKG_ENDPOINT, STATUS_ENDPOINT,
22    SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT, VERIFIED_CONFIGS_ENDPOINT,
23    VERIFY_CONFIG_HASH_ENDPOINT,
24};
25use fedimint_core::module::audit::AuditSummary;
26use fedimint_core::module::registry::ModuleDecoderRegistry;
27use fedimint_core::module::{
28    ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
29};
30use fedimint_core::net::api_announcement::{
31    SignedApiAnnouncement, SignedApiAnnouncementSubmission,
32};
33use fedimint_core::session_outcome::{
34    AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
35};
36use fedimint_core::task::{MaybeSend, MaybeSync};
37use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
38use fedimint_core::util::SafeUrl;
39use fedimint_core::{NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send};
40use fedimint_logging::LOG_CLIENT_NET_API;
41use futures::future::join_all;
42use itertools::Itertools;
43use rand::seq::SliceRandom;
44use serde_json::Value;
45use tokio::sync::OnceCell;
46use tracing::debug;
47
48use super::super::{
49    DynModuleApi, GuardianConfigBackup, IGlobalFederationApi, IRawFederationApi, StatusResponse,
50};
51use crate::api::{
52    FederationApiExt, FederationError, FederationResult, PeerResult,
53    VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
54};
55use crate::query::FilterMapThreshold;
56
57/// Convenience extension trait used for wrapping [`IRawFederationApi`] in
58/// a [`GlobalFederationApiWithCache`]
59pub trait GlobalFederationApiWithCacheExt
60where
61    Self: Sized,
62{
63    fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
64}
65
66impl<T> GlobalFederationApiWithCacheExt for T
67where
68    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
69{
70    fn with_cache(self) -> GlobalFederationApiWithCache<T> {
71        GlobalFederationApiWithCache::new(self)
72    }
73}
74
75/// [`IGlobalFederationApi`] wrapping some `T: IRawFederationApi` and adding
76/// a tiny bit of caching.
77///
78/// Use [`GlobalFederationApiWithCacheExt::with_cache`] to create.
79#[derive(Debug)]
80pub struct GlobalFederationApiWithCache<T> {
81    pub(crate) inner: T,
82    /// Small LRU used as [`IGlobalFederationApi::await_block`] cache.
83    ///
84    /// This is mostly to avoid multiple client module recovery processes
85    /// re-requesting same blocks and putting burden on the federation.
86    ///
87    /// The LRU can be be fairly small, as if the modules are
88    /// (near-)bottlenecked on fetching blocks they will naturally
89    /// synchronize, or split into a handful of groups. And if they are not,
90    /// no LRU here is going to help them.
91    pub(crate) await_session_lru:
92        Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
93
94    /// Like [`Self::await_session_lru`], but for
95    /// [`IGlobalFederationApi::get_session_status`].
96    ///
97    /// In theory these two LRUs have the same content, but one is locked by
98    /// potentially long-blocking operation, while the other non-blocking one.
99    /// Given how tiny they are, it's not worth complicating things to unify
100    /// them.
101    pub(crate) get_session_status_lru:
102        Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
103}
104
105impl<T> GlobalFederationApiWithCache<T> {
106    pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
107        Self {
108            inner,
109            await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
110                NonZeroUsize::new(512).expect("is non-zero"),
111            ))),
112            get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
113                NonZeroUsize::new(512).expect("is non-zero"),
114            ))),
115        }
116    }
117}
118
119impl<T> GlobalFederationApiWithCache<T>
120where
121    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
122{
123    pub(crate) async fn await_block_raw(
124        &self,
125        block_index: u64,
126        decoders: &ModuleDecoderRegistry,
127    ) -> anyhow::Result<SessionOutcome> {
128        debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
129        self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
130            AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
131            ApiRequestErased::new(block_index),
132        )
133        .await?
134        .try_into_inner(decoders)
135        .map_err(|e| anyhow!(e.to_string()))
136    }
137
138    pub(crate) fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
139        let mut peers = self.all_peers().iter().copied().collect_vec();
140        peers.shuffle(&mut rand::thread_rng());
141        peers.into_iter()
142    }
143
144    pub(crate) async fn get_session_status_raw_v2(
145        &self,
146        block_index: u64,
147        broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
148        decoders: &ModuleDecoderRegistry,
149    ) -> anyhow::Result<SessionStatus> {
150        debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
151        let params = ApiRequestErased::new(block_index);
152        let mut last_error = None;
153        // fetch serially
154        for peer_id in self.select_peers_for_status() {
155            match self
156                .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
157                    SESSION_STATUS_V2_ENDPOINT.to_string(),
158                    params.clone(),
159                    peer_id,
160                )
161                .await
162                .map_err(anyhow::Error::from)
163                .and_then(|s| Ok(s.try_into_inner(decoders)?))
164            {
165                Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
166                    if signed_session_outcome.verify(broadcast_public_keys, block_index) {
167                        // early return
168                        return Ok(SessionStatus::Complete(
169                            signed_session_outcome.session_outcome,
170                        ));
171                    }
172                    last_error = Some(format_err!("Invalid signature"));
173                }
174                Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
175                    // no signature: use fallback method
176                    return self.get_session_status_raw(block_index, decoders).await;
177                }
178                Err(err) => {
179                    last_error = Some(err);
180                }
181            }
182            // if we loop then we must have last_error
183            assert!(last_error.is_some());
184        }
185        Err(last_error.expect("must have at least one peer"))
186    }
187
188    pub(crate) async fn get_session_status_raw(
189        &self,
190        block_index: u64,
191        decoders: &ModuleDecoderRegistry,
192    ) -> anyhow::Result<SessionStatus> {
193        debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
194        self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
195            SESSION_STATUS_ENDPOINT.to_string(),
196            ApiRequestErased::new(block_index),
197        )
198        .await?
199        .try_into_inner(&decoders.clone().with_fallback())
200        .map_err(|e| anyhow!(e))
201    }
202}
203
204#[apply(async_trait_maybe_send!)]
205impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
206where
207    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
208{
209    fn all_peers(&self) -> &BTreeSet<PeerId> {
210        self.inner.all_peers()
211    }
212
213    fn self_peer(&self) -> Option<PeerId> {
214        self.inner.self_peer()
215    }
216
217    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
218        self.inner.with_module(id)
219    }
220
221    /// Make request to a specific federation peer by `peer_id`
222    async fn request_raw(
223        &self,
224        peer_id: PeerId,
225        method: &str,
226        params: &ApiRequestErased,
227    ) -> PeerResult<Value> {
228        self.inner.request_raw(peer_id, method, params).await
229    }
230}
231
232#[apply(async_trait_maybe_send!)]
233impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
234where
235    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
236{
237    async fn await_block(
238        &self,
239        session_idx: u64,
240        decoders: &ModuleDecoderRegistry,
241    ) -> anyhow::Result<SessionOutcome> {
242        let mut lru_lock = self.await_session_lru.lock().await;
243
244        let entry_arc = lru_lock
245            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
246            .clone();
247
248        // we drop the lru lock so requests for other `session_idx` can work in parallel
249        drop(lru_lock);
250
251        entry_arc
252            .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
253            .await
254            .cloned()
255    }
256
257    async fn get_session_status(
258        &self,
259        session_idx: u64,
260        decoders: &ModuleDecoderRegistry,
261        core_api_version: ApiVersion,
262        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
263    ) -> anyhow::Result<SessionStatus> {
264        let mut lru_lock = self.get_session_status_lru.lock().await;
265
266        let entry_arc = lru_lock
267            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
268            .clone();
269
270        // we drop the lru lock so requests for other `session_idx` can work in parallel
271        drop(lru_lock);
272
273        enum NoCacheErr {
274            Initial,
275            Pending(Vec<AcceptedItem>),
276            Err(anyhow::Error),
277        }
278        match entry_arc
279            .get_or_try_init(|| async {
280                let session_status =
281                    if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
282                        self.get_session_status_raw(session_idx, decoders).await
283                    } else if let Some(broadcast_public_keys) = broadcast_public_keys {
284                        self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
285                            .await
286                    } else {
287                        self.get_session_status_raw(session_idx, decoders).await
288                    };
289                match session_status {
290                    Err(e) => Err(NoCacheErr::Err(e)),
291                    Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
292                    Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
293                    // only status we can cache (hance outer Ok)
294                    Ok(SessionStatus::Complete(s)) => Ok(s),
295                }
296            })
297            .await
298            .cloned()
299        {
300            Ok(s) => Ok(SessionStatus::Complete(s)),
301            Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
302            Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
303            Err(NoCacheErr::Err(e)) => Err(e),
304        }
305    }
306
307    async fn submit_transaction(
308        &self,
309        tx: Transaction,
310    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
311        self.request_current_consensus_retry(
312            SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
313            ApiRequestErased::new(SerdeTransaction::from(&tx)),
314        )
315        .await
316    }
317
318    async fn session_count(&self) -> FederationResult<u64> {
319        self.request_current_consensus(
320            SESSION_COUNT_ENDPOINT.to_owned(),
321            ApiRequestErased::default(),
322        )
323        .await
324    }
325
326    async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
327        self.request_current_consensus_retry(
328            AWAIT_TRANSACTION_ENDPOINT.to_owned(),
329            ApiRequestErased::new(txid),
330        )
331        .await
332    }
333
334    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
335        self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
336            .await
337    }
338
339    async fn download_backup(
340        &self,
341        id: &secp256k1::PublicKey,
342    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
343        self.request_with_strategy(
344            FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
345            RECOVER_ENDPOINT.to_owned(),
346            ApiRequestErased::new(id),
347        )
348        .await
349    }
350
351    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
352        self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
353            .await
354    }
355
356    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus> {
357        self.request_admin(SETUP_STATUS_ENDPOINT, ApiRequestErased::default(), auth)
358            .await
359    }
360
361    async fn set_local_params(
362        &self,
363        name: String,
364        federation_name: Option<String>,
365        auth: ApiAuth,
366    ) -> FederationResult<String> {
367        self.request_admin(
368            SET_LOCAL_PARAMS_ENDPOINT,
369            ApiRequestErased::new(SetLocalParamsRequest {
370                name,
371                federation_name,
372            }),
373            auth,
374        )
375        .await
376    }
377
378    async fn add_peer_connection_info(
379        &self,
380        info: String,
381        auth: ApiAuth,
382    ) -> FederationResult<String> {
383        self.request_admin(
384            ADD_PEER_SETUP_CODE_ENDPOINT,
385            ApiRequestErased::new(info),
386            auth,
387        )
388        .await
389    }
390
391    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()> {
392        self.request_admin(
393            RESET_PEER_SETUP_CODES_ENDPOINT,
394            ApiRequestErased::default(),
395            auth,
396        )
397        .await
398    }
399
400    async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()> {
401        self.request_admin_no_auth(ADD_CONFIG_GEN_PEER_ENDPOINT, ApiRequestErased::new(peer))
402            .await
403    }
404
405    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>> {
406        self.request_admin_no_auth(CONFIG_GEN_PEERS_ENDPOINT, ApiRequestErased::default())
407            .await
408    }
409
410    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
411        self.request_admin(START_DKG_ENDPOINT, ApiRequestErased::default(), auth)
412            .await
413    }
414
415    async fn get_verify_config_hash(
416        &self,
417        auth: ApiAuth,
418    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
419        self.request_admin(
420            VERIFY_CONFIG_HASH_ENDPOINT,
421            ApiRequestErased::default(),
422            auth,
423        )
424        .await
425    }
426
427    async fn verified_configs(
428        &self,
429        auth: ApiAuth,
430    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
431        self.request_admin(VERIFIED_CONFIGS_ENDPOINT, ApiRequestErased::default(), auth)
432            .await
433    }
434
435    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()> {
436        self.request_admin(START_CONSENSUS_ENDPOINT, ApiRequestErased::default(), auth)
437            .await
438    }
439
440    async fn status(&self) -> FederationResult<StatusResponse> {
441        self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
442            .await
443    }
444
445    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
446        self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
447            .await
448    }
449
450    async fn guardian_config_backup(
451        &self,
452        auth: ApiAuth,
453    ) -> FederationResult<GuardianConfigBackup> {
454        self.request_admin(
455            GUARDIAN_CONFIG_BACKUP_ENDPOINT,
456            ApiRequestErased::default(),
457            auth,
458        )
459        .await
460    }
461
462    async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
463        self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
464            .await
465    }
466
467    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
468        self.request_admin(
469            RESTART_FEDERATION_SETUP_ENDPOINT,
470            ApiRequestErased::default(),
471            auth,
472        )
473        .await
474    }
475
476    async fn submit_api_announcement(
477        &self,
478        announcement_peer_id: PeerId,
479        announcement: SignedApiAnnouncement,
480    ) -> FederationResult<()> {
481        let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
482            let announcement_inner = announcement.clone();
483            async move {
484                (
485                    peer_id,
486                    self.request_single_peer::<()>(
487                        SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
488                        ApiRequestErased::new(SignedApiAnnouncementSubmission {
489                            signed_api_announcement: announcement_inner,
490                            peer_id: announcement_peer_id,
491                        }),
492                        peer_id,
493                    )
494                    .await,
495                )
496            }
497        }))
498        .await
499        .into_iter()
500        .filter_map(|(peer_id, result)| match result {
501            Ok(()) => None,
502            Err(e) => Some((peer_id, e)),
503        })
504        .collect::<BTreeMap<_, _>>();
505
506        if peer_errors.is_empty() {
507            Ok(())
508        } else {
509            Err(FederationError {
510                method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
511                params: serde_json::to_value(announcement).expect("can be serialized"),
512                general: None,
513                peer_errors,
514            })
515        }
516    }
517
518    async fn api_announcements(
519        &self,
520        guardian: PeerId,
521    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
522        self.request_single_peer(
523            API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
524            ApiRequestErased::default(),
525            guardian,
526        )
527        .await
528    }
529
530    async fn sign_api_announcement(
531        &self,
532        api_url: SafeUrl,
533        auth: ApiAuth,
534    ) -> FederationResult<SignedApiAnnouncement> {
535        self.request_admin(
536            SIGN_API_ANNOUNCEMENT_ENDPOINT,
537            ApiRequestErased::new(api_url),
538            auth,
539        )
540        .await
541    }
542
543    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
544        self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
545            .await
546    }
547
548    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
549        self.request_admin(
550            BACKUP_STATISTICS_ENDPOINT,
551            ApiRequestErased::default(),
552            auth,
553        )
554        .await
555    }
556
557    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String> {
558        self.request_single_peer(
559            FEDIMINTD_VERSION_ENDPOINT.to_owned(),
560            ApiRequestErased::default(),
561            peer_id,
562        )
563        .await
564    }
565}