fedimint_client/
api_announcements.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, bail};
6use fedimint_api_client::api::DynGlobalApi;
7use fedimint_core::config::ClientConfig;
8use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
9use fedimint_core::encoding::{Decodable, Encodable};
10use fedimint_core::envs::is_running_in_test_env;
11use fedimint_core::net::api_announcement::SignedApiAnnouncement;
12use fedimint_core::net::guardian_metadata::SignedGuardianMetadata;
13use fedimint_core::runtime::{self, sleep};
14use fedimint_core::secp256k1::SECP256K1;
15use fedimint_core::util::backoff_util::custom_backoff;
16use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
17use fedimint_core::{NumPeersExt as _, PeerId, impl_db_lookup, impl_db_record};
18use fedimint_logging::LOG_CLIENT;
19use futures::stream::{FuturesUnordered, StreamExt as _};
20use tracing::debug;
21
22use crate::Client;
23use crate::db::DbKeyPrefix;
24use crate::guardian_metadata::GuardianMetadataPrefix;
25
26#[derive(Clone, Debug, Encodable, Decodable)]
27pub struct ApiAnnouncementKey(pub PeerId);
28
29#[derive(Clone, Debug, Encodable, Decodable)]
30pub struct ApiAnnouncementPrefix;
31
32impl_db_record!(
33    key = ApiAnnouncementKey,
34    value = SignedApiAnnouncement,
35    db_prefix = DbKeyPrefix::ApiUrlAnnouncement,
36    notify_on_modify = false,
37);
38impl_db_lookup!(
39    key = ApiAnnouncementKey,
40    query_prefix = ApiAnnouncementPrefix
41);
42
43/// Fetches API URL announcements from guardians, validates them and updates the
44/// DB if any new more upt to date ones are found.
45pub(crate) async fn run_api_announcement_refresh_task(client_inner: Arc<Client>) {
46    // Wait for the guardian keys to be available
47    let guardian_pub_keys = client_inner.get_guardian_public_keys_blocking().await;
48    loop {
49        if let Err(err) = {
50            let api: &DynGlobalApi = &client_inner.api;
51            let results = fetch_api_announcements_from_at_least_num_of_peers(
52                1,
53                api,
54                &guardian_pub_keys,
55                if is_running_in_test_env() {
56                    Duration::from_millis(1)
57                } else {
58                    Duration::from_secs(30)
59                },
60            )
61            .await;
62            store_api_announcements_updates_from_peers(client_inner.db(), &results).await
63        } {
64            debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Refreshing api announcements failed");
65        }
66
67        let duration = if is_running_in_test_env() {
68            Duration::from_secs(1)
69        } else {
70            // Check once an hour if there are new announcements
71            Duration::from_secs(3600)
72        };
73        sleep(duration).await;
74    }
75}
76
77pub(crate) async fn store_api_announcements_updates_from_peers(
78    db: &Database,
79    updates: &[BTreeMap<PeerId, SignedApiAnnouncement>],
80) -> Result<(), anyhow::Error> {
81    for announcements in updates {
82        store_api_announcement_updates(db, announcements).await;
83    }
84
85    Ok(())
86}
87
88pub(crate) type PeersSignedApiAnnouncements = BTreeMap<PeerId, SignedApiAnnouncement>;
89
90/// Fetch responses from at least `num_responses_required` of peers.
91///
92/// Will wait a little bit extra in hopes of collecting more than strictly
93/// needed responses.
94pub(crate) async fn fetch_api_announcements_from_at_least_num_of_peers(
95    num_responses_required: usize,
96    api: &DynGlobalApi,
97    guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
98    extra_response_wait: Duration,
99) -> Vec<PeersSignedApiAnnouncements> {
100    let num_peers = guardian_pub_keys.to_num_peers();
101    // Keep trying, initially somewhat aggressively, but after a while retry very
102    // slowly, because chances for response are getting lower and lower.
103    let mut backoff = custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None);
104
105    // Make a single request to a peer after a delay
106    async fn make_request(
107        delay: Duration,
108        peer_id: PeerId,
109        api: &DynGlobalApi,
110        guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
111    ) -> (PeerId, anyhow::Result<PeersSignedApiAnnouncements>) {
112        runtime::sleep(delay).await;
113
114        let result = async {
115            let announcements = api.api_announcements(peer_id).await.with_context(move || {
116                format!("Fetching API announcements from peer {peer_id} failed")
117            })?;
118
119            // If any of the announcements is invalid something is fishy with that
120            // guardian and we ignore all its responses
121            for (peer_id, announcement) in &announcements {
122                let Some(guardian_pub_key) = guardian_pub_keys.get(peer_id) else {
123                    bail!("Guardian public key not found for peer {}", peer_id);
124                };
125
126                if !announcement.verify(SECP256K1, guardian_pub_key) {
127                    bail!("Failed to verify announcement for peer {}", peer_id);
128                }
129            }
130            Ok(announcements)
131        }
132        .await;
133
134        (peer_id, result)
135    }
136
137    let mut requests = FuturesUnordered::new();
138
139    for peer_id in num_peers.peer_ids() {
140        requests.push(make_request(
141            Duration::ZERO,
142            peer_id,
143            api,
144            guardian_pub_keys,
145        ));
146    }
147
148    let mut responses = Vec::new();
149
150    loop {
151        let next_response = if responses.len() < num_responses_required {
152            // If we don't have enough responses yet, we wait
153            requests.next().await
154        } else {
155            // if we do have responses we need, we wait opportunistically just for a small
156            // duration if any other responses are ready anyway, just to not
157            // throw them away
158            fedimint_core::runtime::timeout(extra_response_wait, requests.next())
159                .await
160                .ok()
161                .flatten()
162        };
163
164        let Some((peer_id, response)) = next_response else {
165            break;
166        };
167
168        match response {
169            Err(err) => {
170                debug!(
171                    target: LOG_CLIENT,
172                    %peer_id,
173                    err = %err.fmt_compact_anyhow(),
174                    "Failed to fetch API announcements from peer"
175                );
176                requests.push(make_request(
177                    backoff.next().expect("Keeps retrying"),
178                    peer_id,
179                    api,
180                    guardian_pub_keys,
181                ));
182            }
183            Ok(announcements) => {
184                responses.push(announcements);
185            }
186        }
187    }
188
189    responses
190}
191
192pub(crate) async fn store_api_announcement_updates(
193    db: &Database,
194    announcements: &BTreeMap<PeerId, SignedApiAnnouncement>,
195) {
196    db
197        .autocommit(
198            |dbtx, _|{
199                let announcements_inner = announcements.clone();
200            Box::pin(async move {
201                for (peer, new_announcement) in announcements_inner {
202                    let replace_current_announcement = dbtx
203                        .get_value(&ApiAnnouncementKey(peer))
204                        .await.is_none_or(|current_announcement| {
205                            current_announcement.api_announcement.nonce
206                                < new_announcement.api_announcement.nonce
207                        });
208                    if replace_current_announcement {
209                        debug!(target: LOG_CLIENT, ?peer, %new_announcement.api_announcement.api_url, "Updating API announcement");
210                        dbtx.insert_entry(&ApiAnnouncementKey(peer), &new_announcement)
211                            .await;
212                    }
213                }
214
215                Result::<(), ()>::Ok(())
216            })},
217            None,
218        )
219        .await
220        .expect("Will never return an error");
221}
222
223/// Returns a list of all peers and their respective API URLs taking into
224/// account guardian metadata and API announcements overwriting the URLs
225/// contained in the original configuration.
226///
227/// Priority order:
228/// 1. Guardian metadata (if available) - uses first URL from api_urls
229/// 2. API announcement (if available)
230/// 3. Configured URL (fallback)
231pub async fn get_api_urls(db: &Database, cfg: &ClientConfig) -> BTreeMap<PeerId, SafeUrl> {
232    let mut dbtx = db.begin_transaction_nc().await;
233
234    // Load guardian metadata for all peers
235    let guardian_metadata: BTreeMap<PeerId, SignedGuardianMetadata> = dbtx
236        .find_by_prefix(&GuardianMetadataPrefix)
237        .await
238        .map(|(key, metadata)| (key.0, metadata))
239        .collect()
240        .await;
241
242    // Load API announcements for all peers
243    let api_announcements: BTreeMap<PeerId, SignedApiAnnouncement> = dbtx
244        .find_by_prefix(&ApiAnnouncementPrefix)
245        .await
246        .map(|(key, announcement)| (key.0, announcement))
247        .collect()
248        .await;
249
250    // For each peer: prefer guardian metadata, then API announcement, then config
251    cfg.global
252        .api_endpoints
253        .iter()
254        .map(|(peer_id, peer_url)| {
255            let url = guardian_metadata
256                .get(peer_id)
257                .and_then(|m| m.guardian_metadata().api_urls.first().cloned())
258                .or_else(|| {
259                    api_announcements
260                        .get(peer_id)
261                        .map(|a| a.api_announcement.api_url.clone())
262                })
263                .unwrap_or_else(|| peer_url.url.clone());
264            (*peer_id, url)
265        })
266        .collect()
267}