fedimint_client/
api_announcements.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, bail};
6use fedimint_api_client::api::DynGlobalApi;
7use fedimint_core::config::ClientConfig;
8use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
9use fedimint_core::encoding::{Decodable, Encodable};
10use fedimint_core::envs::is_running_in_test_env;
11use fedimint_core::net::api_announcement::{SignedApiAnnouncement, override_api_urls};
12use fedimint_core::runtime::sleep;
13use fedimint_core::secp256k1::SECP256K1;
14use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
15use fedimint_core::{PeerId, impl_db_lookup, impl_db_record};
16use fedimint_logging::LOG_CLIENT;
17use futures::future::join_all;
18use tracing::{debug, warn};
19
20use crate::Client;
21use crate::db::DbKeyPrefix;
22
23#[derive(Clone, Debug, Encodable, Decodable)]
24pub struct ApiAnnouncementKey(pub PeerId);
25
26#[derive(Clone, Debug, Encodable, Decodable)]
27pub struct ApiAnnouncementPrefix;
28
29impl_db_record!(
30    key = ApiAnnouncementKey,
31    value = SignedApiAnnouncement,
32    db_prefix = DbKeyPrefix::ApiUrlAnnouncement,
33    notify_on_modify = false,
34);
35impl_db_lookup!(
36    key = ApiAnnouncementKey,
37    query_prefix = ApiAnnouncementPrefix
38);
39
40/// Fetches API URL announcements from guardians, validates them and updates the
41/// DB if any new more upt to date ones are found.
42pub(crate) async fn run_api_announcement_sync(client_inner: Arc<Client>) {
43    // Wait for the guardian keys to be available
44    let guardian_pub_keys = client_inner.get_guardian_public_keys_blocking().await;
45    loop {
46        if let Err(err) =
47            refresh_api_announcement_sync(&client_inner.api, client_inner.db(), &guardian_pub_keys)
48                .await
49        {
50            debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Refreshing api announcements failed");
51        }
52
53        let duration = if is_running_in_test_env() {
54            Duration::from_secs(1)
55        } else {
56            // Check once an hour if there are new announcements
57            Duration::from_secs(3600)
58        };
59        sleep(duration).await;
60    }
61}
62
63pub(crate) async fn refresh_api_announcement_sync(
64    api: &DynGlobalApi,
65    db: &Database,
66    guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
67) -> anyhow::Result<()> {
68    let results = fetch_api_announcements_from_all_peers(api, guardian_pub_keys).await;
69
70    let mut some_success = false;
71
72    for (peer_id, result) in guardian_pub_keys.keys().zip(results) {
73        match result {
74            Ok(announcements) => {
75                store_api_announcements(db, announcements).await;
76                some_success |= true
77            }
78            Err(e) => {
79                warn!(target: LOG_CLIENT, %peer_id, ?e, "Failed to process API announcements");
80            }
81        }
82    }
83
84    if some_success {
85        Ok(())
86    } else {
87        bail!("Unable to get any api announcements");
88    }
89}
90
91async fn fetch_api_announcements_from_all_peers(
92    api: &DynGlobalApi,
93    guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
94) -> Vec<Result<BTreeMap<PeerId, SignedApiAnnouncement>, anyhow::Error>> {
95    join_all(api.all_peers().iter().map(|peer_id| async {
96        let peer_id = *peer_id;
97        let announcements = api.api_announcements(peer_id).await.with_context(move || {
98            format!("Fetching API announcements from peer {peer_id} failed")
99        })?;
100
101        // If any of the announcements is invalid something is fishy with that
102        // guardian and we ignore all its responses
103        for (peer_id, announcement) in &announcements {
104            let Some(guardian_pub_key) = guardian_pub_keys.get(peer_id) else {
105                bail!("Guardian public key not found for peer {}", peer_id);
106            };
107
108            if !announcement.verify(SECP256K1, guardian_pub_key) {
109                bail!("Failed to verify announcement for peer {}", peer_id);
110            }
111        }
112        Ok(announcements)
113    }))
114    .await
115}
116
117pub(crate) async fn store_api_announcements(
118    db: &Database,
119    announcements: BTreeMap<PeerId, SignedApiAnnouncement>,
120) {
121    db
122        .autocommit(
123            |dbtx, _|{
124                let announcements_inner = announcements.clone();
125            Box::pin(async move {
126                for (peer, new_announcement) in announcements_inner {
127                    let replace_current_announcement = dbtx
128                        .get_value(&ApiAnnouncementKey(peer))
129                        .await.is_none_or(|current_announcement| {
130                            current_announcement.api_announcement.nonce
131                                < new_announcement.api_announcement.nonce
132                        });
133                    if replace_current_announcement {
134                        debug!(target: LOG_CLIENT, ?peer, %new_announcement.api_announcement.api_url, "Updating API announcement");
135                        dbtx.insert_entry(&ApiAnnouncementKey(peer), &new_announcement)
136                            .await;
137                    }
138                }
139
140                Result::<(), ()>::Ok(())
141            })},
142            None,
143        )
144        .await
145        .expect("Will never return an error");
146}
147
148/// Returns a list of all peers and their respective API URLs taking into
149/// account announcements overwriting the URLs contained in the original
150/// configuration.
151pub async fn get_api_urls(db: &Database, cfg: &ClientConfig) -> BTreeMap<PeerId, SafeUrl> {
152    override_api_urls(
153        db,
154        cfg.global
155            .api_endpoints
156            .iter()
157            .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
158        &ApiAnnouncementPrefix,
159        |key| key.0,
160    )
161    .await
162}