fedimint_client/
api_announcements.rs1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, bail};
6use fedimint_api_client::api::DynGlobalApi;
7use fedimint_core::config::ClientConfig;
8use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
9use fedimint_core::encoding::{Decodable, Encodable};
10use fedimint_core::envs::is_running_in_test_env;
11use fedimint_core::net::api_announcement::{SignedApiAnnouncement, override_api_urls};
12use fedimint_core::runtime::{self, sleep};
13use fedimint_core::secp256k1::SECP256K1;
14use fedimint_core::util::backoff_util::custom_backoff;
15use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
16use fedimint_core::{NumPeersExt as _, PeerId, impl_db_lookup, impl_db_record};
17use fedimint_logging::LOG_CLIENT;
18use futures::stream::{FuturesUnordered, StreamExt as _};
19use tracing::debug;
20
21use crate::Client;
22use crate::db::DbKeyPrefix;
23
24#[derive(Clone, Debug, Encodable, Decodable)]
25pub struct ApiAnnouncementKey(pub PeerId);
26
27#[derive(Clone, Debug, Encodable, Decodable)]
28pub struct ApiAnnouncementPrefix;
29
30impl_db_record!(
31 key = ApiAnnouncementKey,
32 value = SignedApiAnnouncement,
33 db_prefix = DbKeyPrefix::ApiUrlAnnouncement,
34 notify_on_modify = false,
35);
36impl_db_lookup!(
37 key = ApiAnnouncementKey,
38 query_prefix = ApiAnnouncementPrefix
39);
40
41pub(crate) async fn run_api_announcement_refresh_task(client_inner: Arc<Client>) {
44 let guardian_pub_keys = client_inner.get_guardian_public_keys_blocking().await;
46 loop {
47 if let Err(err) = {
48 let api: &DynGlobalApi = &client_inner.api;
49 let results = fetch_api_announcements_from_at_least_num_of_peers(
50 1,
51 api,
52 &guardian_pub_keys,
53 if is_running_in_test_env() {
54 Duration::from_millis(1)
55 } else {
56 Duration::from_secs(30)
57 },
58 )
59 .await;
60 store_api_announcements_updates_from_peers(client_inner.db(), &results).await
61 } {
62 debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Refreshing api announcements failed");
63 }
64
65 let duration = if is_running_in_test_env() {
66 Duration::from_secs(1)
67 } else {
68 Duration::from_secs(3600)
70 };
71 sleep(duration).await;
72 }
73}
74
75pub(crate) async fn store_api_announcements_updates_from_peers(
76 db: &Database,
77 updates: &[BTreeMap<PeerId, SignedApiAnnouncement>],
78) -> Result<(), anyhow::Error> {
79 for announcements in updates {
80 store_api_announcement_updates(db, announcements).await;
81 }
82
83 Ok(())
84}
85
86pub(crate) type PeersSignedApiAnnouncements = BTreeMap<PeerId, SignedApiAnnouncement>;
87
88pub(crate) async fn fetch_api_announcements_from_at_least_num_of_peers(
93 num_responses_required: usize,
94 api: &DynGlobalApi,
95 guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
96 extra_response_wait: Duration,
97) -> Vec<PeersSignedApiAnnouncements> {
98 let num_peers = guardian_pub_keys.to_num_peers();
99 let mut backoff = custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None);
102
103 async fn make_request(
105 delay: Duration,
106 peer_id: PeerId,
107 api: &DynGlobalApi,
108 guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
109 ) -> (PeerId, anyhow::Result<PeersSignedApiAnnouncements>) {
110 runtime::sleep(delay).await;
111
112 let result = async {
113 let announcements = api.api_announcements(peer_id).await.with_context(move || {
114 format!("Fetching API announcements from peer {peer_id} failed")
115 })?;
116
117 for (peer_id, announcement) in &announcements {
120 let Some(guardian_pub_key) = guardian_pub_keys.get(peer_id) else {
121 bail!("Guardian public key not found for peer {}", peer_id);
122 };
123
124 if !announcement.verify(SECP256K1, guardian_pub_key) {
125 bail!("Failed to verify announcement for peer {}", peer_id);
126 }
127 }
128 Ok(announcements)
129 }
130 .await;
131
132 (peer_id, result)
133 }
134
135 let mut requests = FuturesUnordered::new();
136
137 for peer_id in num_peers.peer_ids() {
138 requests.push(make_request(
139 Duration::ZERO,
140 peer_id,
141 api,
142 guardian_pub_keys,
143 ));
144 }
145
146 let mut responses = Vec::new();
147
148 loop {
149 let next_response = if responses.len() < num_responses_required {
150 requests.next().await
152 } else {
153 fedimint_core::runtime::timeout(extra_response_wait, requests.next())
157 .await
158 .ok()
159 .flatten()
160 };
161
162 let Some((peer_id, response)) = next_response else {
163 break;
164 };
165
166 match response {
167 Err(err) => {
168 debug!(
169 target: LOG_CLIENT,
170 %peer_id,
171 err = %err.fmt_compact_anyhow(),
172 "Failed to fetch API announcements from peer"
173 );
174 requests.push(make_request(
175 backoff.next().expect("Keeps retrying"),
176 peer_id,
177 api,
178 guardian_pub_keys,
179 ));
180 }
181 Ok(announcements) => {
182 responses.push(announcements);
183 }
184 }
185 }
186
187 responses
188}
189
190pub(crate) async fn store_api_announcement_updates(
191 db: &Database,
192 announcements: &BTreeMap<PeerId, SignedApiAnnouncement>,
193) {
194 db
195 .autocommit(
196 |dbtx, _|{
197 let announcements_inner = announcements.clone();
198 Box::pin(async move {
199 for (peer, new_announcement) in announcements_inner {
200 let replace_current_announcement = dbtx
201 .get_value(&ApiAnnouncementKey(peer))
202 .await.is_none_or(|current_announcement| {
203 current_announcement.api_announcement.nonce
204 < new_announcement.api_announcement.nonce
205 });
206 if replace_current_announcement {
207 debug!(target: LOG_CLIENT, ?peer, %new_announcement.api_announcement.api_url, "Updating API announcement");
208 dbtx.insert_entry(&ApiAnnouncementKey(peer), &new_announcement)
209 .await;
210 }
211 }
212
213 Result::<(), ()>::Ok(())
214 })},
215 None,
216 )
217 .await
218 .expect("Will never return an error");
219}
220
221pub async fn get_api_urls(db: &Database, cfg: &ClientConfig) -> BTreeMap<PeerId, SafeUrl> {
225 override_api_urls(
226 db,
227 cfg.global
228 .api_endpoints
229 .iter()
230 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
231 &ApiAnnouncementPrefix,
232 |key| key.0,
233 )
234 .await
235}