fedimint_client/
api_announcements.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, bail};
6use fedimint_api_client::api::DynGlobalApi;
7use fedimint_core::config::ClientConfig;
8use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
9use fedimint_core::encoding::{Decodable, Encodable};
10use fedimint_core::envs::is_running_in_test_env;
11use fedimint_core::net::api_announcement::{SignedApiAnnouncement, override_api_urls};
12use fedimint_core::runtime::{self, sleep};
13use fedimint_core::secp256k1::SECP256K1;
14use fedimint_core::util::backoff_util::custom_backoff;
15use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
16use fedimint_core::{NumPeersExt as _, PeerId, impl_db_lookup, impl_db_record};
17use fedimint_logging::LOG_CLIENT;
18use futures::stream::{FuturesUnordered, StreamExt as _};
19use tracing::debug;
20
21use crate::Client;
22use crate::db::DbKeyPrefix;
23
24#[derive(Clone, Debug, Encodable, Decodable)]
25pub struct ApiAnnouncementKey(pub PeerId);
26
27#[derive(Clone, Debug, Encodable, Decodable)]
28pub struct ApiAnnouncementPrefix;
29
30impl_db_record!(
31    key = ApiAnnouncementKey,
32    value = SignedApiAnnouncement,
33    db_prefix = DbKeyPrefix::ApiUrlAnnouncement,
34    notify_on_modify = false,
35);
36impl_db_lookup!(
37    key = ApiAnnouncementKey,
38    query_prefix = ApiAnnouncementPrefix
39);
40
41/// Fetches API URL announcements from guardians, validates them and updates the
42/// DB if any new more upt to date ones are found.
43pub(crate) async fn run_api_announcement_refresh_task(client_inner: Arc<Client>) {
44    // Wait for the guardian keys to be available
45    let guardian_pub_keys = client_inner.get_guardian_public_keys_blocking().await;
46    loop {
47        if let Err(err) = {
48            let api: &DynGlobalApi = &client_inner.api;
49            let results = fetch_api_announcements_from_at_least_num_of_peers(
50                1,
51                api,
52                &guardian_pub_keys,
53                if is_running_in_test_env() {
54                    Duration::from_millis(1)
55                } else {
56                    Duration::from_secs(30)
57                },
58            )
59            .await;
60            store_api_announcements_updates_from_peers(client_inner.db(), &results).await
61        } {
62            debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Refreshing api announcements failed");
63        }
64
65        let duration = if is_running_in_test_env() {
66            Duration::from_secs(1)
67        } else {
68            // Check once an hour if there are new announcements
69            Duration::from_secs(3600)
70        };
71        sleep(duration).await;
72    }
73}
74
75pub(crate) async fn store_api_announcements_updates_from_peers(
76    db: &Database,
77    updates: &[BTreeMap<PeerId, SignedApiAnnouncement>],
78) -> Result<(), anyhow::Error> {
79    for announcements in updates {
80        store_api_announcement_updates(db, announcements).await;
81    }
82
83    Ok(())
84}
85
86pub(crate) type PeersSignedApiAnnouncements = BTreeMap<PeerId, SignedApiAnnouncement>;
87
88/// Fetch responses from at least `num_responses_required` of peers.
89///
90/// Will wait a little bit extra in hopes of collecting more than strictly
91/// needed responses.
92pub(crate) async fn fetch_api_announcements_from_at_least_num_of_peers(
93    num_responses_required: usize,
94    api: &DynGlobalApi,
95    guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
96    extra_response_wait: Duration,
97) -> Vec<PeersSignedApiAnnouncements> {
98    let num_peers = guardian_pub_keys.to_num_peers();
99    // Keep trying, initially somewhat aggressively, but after a while retry very
100    // slowly, because chances for response are getting lower and lower.
101    let mut backoff = custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None);
102
103    // Make a single request to a peer after a delay
104    async fn make_request(
105        delay: Duration,
106        peer_id: PeerId,
107        api: &DynGlobalApi,
108        guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
109    ) -> (PeerId, anyhow::Result<PeersSignedApiAnnouncements>) {
110        runtime::sleep(delay).await;
111
112        let result = async {
113            let announcements = api.api_announcements(peer_id).await.with_context(move || {
114                format!("Fetching API announcements from peer {peer_id} failed")
115            })?;
116
117            // If any of the announcements is invalid something is fishy with that
118            // guardian and we ignore all its responses
119            for (peer_id, announcement) in &announcements {
120                let Some(guardian_pub_key) = guardian_pub_keys.get(peer_id) else {
121                    bail!("Guardian public key not found for peer {}", peer_id);
122                };
123
124                if !announcement.verify(SECP256K1, guardian_pub_key) {
125                    bail!("Failed to verify announcement for peer {}", peer_id);
126                }
127            }
128            Ok(announcements)
129        }
130        .await;
131
132        (peer_id, result)
133    }
134
135    let mut requests = FuturesUnordered::new();
136
137    for peer_id in num_peers.peer_ids() {
138        requests.push(make_request(
139            Duration::ZERO,
140            peer_id,
141            api,
142            guardian_pub_keys,
143        ));
144    }
145
146    let mut responses = Vec::new();
147
148    loop {
149        let next_response = if responses.len() < num_responses_required {
150            // If we don't have enough responses yet, we wait
151            requests.next().await
152        } else {
153            // if we do have responses we need, we wait opportunistically just for a small
154            // duration if any other responses are ready anyway, just to not
155            // throw them away
156            fedimint_core::runtime::timeout(extra_response_wait, requests.next())
157                .await
158                .ok()
159                .flatten()
160        };
161
162        let Some((peer_id, response)) = next_response else {
163            break;
164        };
165
166        match response {
167            Err(err) => {
168                debug!(
169                    target: LOG_CLIENT,
170                    %peer_id,
171                    err = %err.fmt_compact_anyhow(),
172                    "Failed to fetch API announcements from peer"
173                );
174                requests.push(make_request(
175                    backoff.next().expect("Keeps retrying"),
176                    peer_id,
177                    api,
178                    guardian_pub_keys,
179                ));
180            }
181            Ok(announcements) => {
182                responses.push(announcements);
183            }
184        }
185    }
186
187    responses
188}
189
190pub(crate) async fn store_api_announcement_updates(
191    db: &Database,
192    announcements: &BTreeMap<PeerId, SignedApiAnnouncement>,
193) {
194    db
195        .autocommit(
196            |dbtx, _|{
197                let announcements_inner = announcements.clone();
198            Box::pin(async move {
199                for (peer, new_announcement) in announcements_inner {
200                    let replace_current_announcement = dbtx
201                        .get_value(&ApiAnnouncementKey(peer))
202                        .await.is_none_or(|current_announcement| {
203                            current_announcement.api_announcement.nonce
204                                < new_announcement.api_announcement.nonce
205                        });
206                    if replace_current_announcement {
207                        debug!(target: LOG_CLIENT, ?peer, %new_announcement.api_announcement.api_url, "Updating API announcement");
208                        dbtx.insert_entry(&ApiAnnouncementKey(peer), &new_announcement)
209                            .await;
210                    }
211                }
212
213                Result::<(), ()>::Ok(())
214            })},
215            None,
216        )
217        .await
218        .expect("Will never return an error");
219}
220
221/// Returns a list of all peers and their respective API URLs taking into
222/// account announcements overwriting the URLs contained in the original
223/// configuration.
224pub async fn get_api_urls(db: &Database, cfg: &ClientConfig) -> BTreeMap<PeerId, SafeUrl> {
225    override_api_urls(
226        db,
227        cfg.global
228            .api_endpoints
229            .iter()
230            .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
231        &ApiAnnouncementPrefix,
232        |key| key.0,
233    )
234    .await
235}