fedimint_client/
api_announcements.rs
1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, bail};
6use fedimint_api_client::api::DynGlobalApi;
7use fedimint_core::config::ClientConfig;
8use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
9use fedimint_core::encoding::{Decodable, Encodable};
10use fedimint_core::envs::is_running_in_test_env;
11use fedimint_core::net::api_announcement::{SignedApiAnnouncement, override_api_urls};
12use fedimint_core::runtime::sleep;
13use fedimint_core::secp256k1::SECP256K1;
14use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
15use fedimint_core::{PeerId, impl_db_lookup, impl_db_record};
16use fedimint_logging::LOG_CLIENT;
17use futures::future::join_all;
18use tracing::{debug, warn};
19
20use crate::Client;
21use crate::db::DbKeyPrefix;
22
23#[derive(Clone, Debug, Encodable, Decodable)]
24pub struct ApiAnnouncementKey(pub PeerId);
25
26#[derive(Clone, Debug, Encodable, Decodable)]
27pub struct ApiAnnouncementPrefix;
28
29impl_db_record!(
30 key = ApiAnnouncementKey,
31 value = SignedApiAnnouncement,
32 db_prefix = DbKeyPrefix::ApiUrlAnnouncement,
33 notify_on_modify = false,
34);
35impl_db_lookup!(
36 key = ApiAnnouncementKey,
37 query_prefix = ApiAnnouncementPrefix
38);
39
40pub(crate) async fn run_api_announcement_sync(client_inner: Arc<Client>) {
43 let guardian_pub_keys = client_inner.get_guardian_public_keys_blocking().await;
45 loop {
46 if let Err(err) =
47 refresh_api_announcement_sync(&client_inner.api, client_inner.db(), &guardian_pub_keys)
48 .await
49 {
50 debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Refreshing api announcements failed");
51 }
52
53 let duration = if is_running_in_test_env() {
54 Duration::from_secs(1)
55 } else {
56 Duration::from_secs(3600)
58 };
59 sleep(duration).await;
60 }
61}
62
63pub(crate) async fn refresh_api_announcement_sync(
64 api: &DynGlobalApi,
65 db: &Database,
66 guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
67) -> anyhow::Result<()> {
68 let results = fetch_api_announcements_from_all_peers(api, guardian_pub_keys).await;
69
70 let mut some_success = false;
71
72 for (peer_id, result) in guardian_pub_keys.keys().zip(results) {
73 match result {
74 Ok(announcements) => {
75 store_api_announcements(db, announcements).await;
76 some_success |= true
77 }
78 Err(e) => {
79 warn!(target: LOG_CLIENT, %peer_id, ?e, "Failed to process API announcements");
80 }
81 }
82 }
83
84 if some_success {
85 Ok(())
86 } else {
87 bail!("Unable to get any api announcements");
88 }
89}
90
91async fn fetch_api_announcements_from_all_peers(
92 api: &DynGlobalApi,
93 guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
94) -> Vec<Result<BTreeMap<PeerId, SignedApiAnnouncement>, anyhow::Error>> {
95 join_all(api.all_peers().iter().map(|peer_id| async {
96 let peer_id = *peer_id;
97 let announcements = api.api_announcements(peer_id).await.with_context(move || {
98 format!("Fetching API announcements from peer {peer_id} failed")
99 })?;
100
101 for (peer_id, announcement) in &announcements {
104 let Some(guardian_pub_key) = guardian_pub_keys.get(peer_id) else {
105 bail!("Guardian public key not found for peer {}", peer_id);
106 };
107
108 if !announcement.verify(SECP256K1, guardian_pub_key) {
109 bail!("Failed to verify announcement for peer {}", peer_id);
110 }
111 }
112 Ok(announcements)
113 }))
114 .await
115}
116
117pub(crate) async fn store_api_announcements(
118 db: &Database,
119 announcements: BTreeMap<PeerId, SignedApiAnnouncement>,
120) {
121 db
122 .autocommit(
123 |dbtx, _|{
124 let announcements_inner = announcements.clone();
125 Box::pin(async move {
126 for (peer, new_announcement) in announcements_inner {
127 let replace_current_announcement = dbtx
128 .get_value(&ApiAnnouncementKey(peer))
129 .await.is_none_or(|current_announcement| {
130 current_announcement.api_announcement.nonce
131 < new_announcement.api_announcement.nonce
132 });
133 if replace_current_announcement {
134 debug!(target: LOG_CLIENT, ?peer, %new_announcement.api_announcement.api_url, "Updating API announcement");
135 dbtx.insert_entry(&ApiAnnouncementKey(peer), &new_announcement)
136 .await;
137 }
138 }
139
140 Result::<(), ()>::Ok(())
141 })},
142 None,
143 )
144 .await
145 .expect("Will never return an error");
146}
147
148pub async fn get_api_urls(db: &Database, cfg: &ClientConfig) -> BTreeMap<PeerId, SafeUrl> {
152 override_api_urls(
153 db,
154 cfg.global
155 .api_endpoints
156 .iter()
157 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
158 &ApiAnnouncementPrefix,
159 |key| key.0,
160 )
161 .await
162}