1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, bail};
6use fedimint_api_client::api::DynGlobalApi;
7use fedimint_core::config::ClientConfig;
8use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
9use fedimint_core::encoding::{Decodable, Encodable};
10use fedimint_core::envs::is_running_in_test_env;
11use fedimint_core::net::api_announcement::SignedApiAnnouncement;
12use fedimint_core::net::guardian_metadata::SignedGuardianMetadata;
13use fedimint_core::runtime::{self, sleep};
14use fedimint_core::secp256k1::SECP256K1;
15use fedimint_core::util::backoff_util::custom_backoff;
16use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
17use fedimint_core::{NumPeersExt as _, PeerId, impl_db_lookup, impl_db_record};
18use fedimint_logging::LOG_CLIENT;
19use futures::stream::{FuturesUnordered, StreamExt as _};
20use tracing::debug;
21
22use crate::Client;
23use crate::db::DbKeyPrefix;
24use crate::guardian_metadata::GuardianMetadataPrefix;
25
/// Database key for the [`SignedApiAnnouncement`] stored for a single
/// guardian, identified by its [`PeerId`].
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct ApiAnnouncementKey(pub PeerId);
28
/// Prefix key used to query all stored [`ApiAnnouncementKey`] entries at
/// once.
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct ApiAnnouncementPrefix;
31
// Declares the database record `ApiAnnouncementKey -> SignedApiAnnouncement`
// under the `DbKeyPrefix::ApiUrlAnnouncement` prefix, with modification
// notifications turned off.
impl_db_record!(
    key = ApiAnnouncementKey,
    value = SignedApiAnnouncement,
    db_prefix = DbKeyPrefix::ApiUrlAnnouncement,
    notify_on_modify = false,
);
// Allows `ApiAnnouncementKey` records to be looked up via
// `ApiAnnouncementPrefix` (used with `find_by_prefix`).
impl_db_lookup!(
    key = ApiAnnouncementKey,
    query_prefix = ApiAnnouncementPrefix
);
42
43pub(crate) async fn run_api_announcement_refresh_task(client_inner: Arc<Client>) {
46 let guardian_pub_keys = client_inner.get_guardian_public_keys_blocking().await;
48 loop {
49 if let Err(err) = {
50 let api: &DynGlobalApi = &client_inner.api;
51 let results = fetch_api_announcements_from_at_least_num_of_peers(
52 1,
53 api,
54 &guardian_pub_keys,
55 if is_running_in_test_env() {
56 Duration::from_millis(1)
57 } else {
58 Duration::from_secs(30)
59 },
60 )
61 .await;
62 store_api_announcements_updates_from_peers(client_inner.db(), &results).await
63 } {
64 debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Refreshing api announcements failed");
65 }
66
67 let duration = if is_running_in_test_env() {
68 Duration::from_secs(1)
69 } else {
70 Duration::from_secs(3600)
72 };
73 sleep(duration).await;
74 }
75}
76
77pub(crate) async fn store_api_announcements_updates_from_peers(
78 db: &Database,
79 updates: &[BTreeMap<PeerId, SignedApiAnnouncement>],
80) -> Result<(), anyhow::Error> {
81 for announcements in updates {
82 store_api_announcement_updates(db, announcements).await;
83 }
84
85 Ok(())
86}
87
/// A set of signed API announcements keyed by the guardian each one belongs
/// to, e.g. the announcements reported by a single peer.
pub(crate) type PeersSignedApiAnnouncements = BTreeMap<PeerId, SignedApiAnnouncement>;
89
90pub(crate) async fn fetch_api_announcements_from_at_least_num_of_peers(
95 num_responses_required: usize,
96 api: &DynGlobalApi,
97 guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
98 extra_response_wait: Duration,
99) -> Vec<PeersSignedApiAnnouncements> {
100 let num_peers = guardian_pub_keys.to_num_peers();
101 let mut backoff = custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None);
104
105 async fn make_request(
107 delay: Duration,
108 peer_id: PeerId,
109 api: &DynGlobalApi,
110 guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
111 ) -> (PeerId, anyhow::Result<PeersSignedApiAnnouncements>) {
112 runtime::sleep(delay).await;
113
114 let result = async {
115 let announcements = api.api_announcements(peer_id).await.with_context(move || {
116 format!("Fetching API announcements from peer {peer_id} failed")
117 })?;
118
119 for (peer_id, announcement) in &announcements {
122 let Some(guardian_pub_key) = guardian_pub_keys.get(peer_id) else {
123 bail!("Guardian public key not found for peer {}", peer_id);
124 };
125
126 if !announcement.verify(SECP256K1, guardian_pub_key) {
127 bail!("Failed to verify announcement for peer {}", peer_id);
128 }
129 }
130 Ok(announcements)
131 }
132 .await;
133
134 (peer_id, result)
135 }
136
137 let mut requests = FuturesUnordered::new();
138
139 for peer_id in num_peers.peer_ids() {
140 requests.push(make_request(
141 Duration::ZERO,
142 peer_id,
143 api,
144 guardian_pub_keys,
145 ));
146 }
147
148 let mut responses = Vec::new();
149
150 loop {
151 let next_response = if responses.len() < num_responses_required {
152 requests.next().await
154 } else {
155 fedimint_core::runtime::timeout(extra_response_wait, requests.next())
159 .await
160 .ok()
161 .flatten()
162 };
163
164 let Some((peer_id, response)) = next_response else {
165 break;
166 };
167
168 match response {
169 Err(err) => {
170 debug!(
171 target: LOG_CLIENT,
172 %peer_id,
173 err = %err.fmt_compact_anyhow(),
174 "Failed to fetch API announcements from peer"
175 );
176 requests.push(make_request(
177 backoff.next().expect("Keeps retrying"),
178 peer_id,
179 api,
180 guardian_pub_keys,
181 ));
182 }
183 Ok(announcements) => {
184 responses.push(announcements);
185 }
186 }
187 }
188
189 responses
190}
191
192pub(crate) async fn store_api_announcement_updates(
193 db: &Database,
194 announcements: &BTreeMap<PeerId, SignedApiAnnouncement>,
195) {
196 db
197 .autocommit(
198 |dbtx, _|{
199 let announcements_inner = announcements.clone();
200 Box::pin(async move {
201 for (peer, new_announcement) in announcements_inner {
202 let replace_current_announcement = dbtx
203 .get_value(&ApiAnnouncementKey(peer))
204 .await.is_none_or(|current_announcement| {
205 current_announcement.api_announcement.nonce
206 < new_announcement.api_announcement.nonce
207 });
208 if replace_current_announcement {
209 debug!(target: LOG_CLIENT, ?peer, %new_announcement.api_announcement.api_url, "Updating API announcement");
210 dbtx.insert_entry(&ApiAnnouncementKey(peer), &new_announcement)
211 .await;
212 }
213 }
214
215 Result::<(), ()>::Ok(())
216 })},
217 None,
218 )
219 .await
220 .expect("Will never return an error");
221}
222
223pub async fn get_api_urls(db: &Database, cfg: &ClientConfig) -> BTreeMap<PeerId, SafeUrl> {
232 let mut dbtx = db.begin_transaction_nc().await;
233
234 let guardian_metadata: BTreeMap<PeerId, SignedGuardianMetadata> = dbtx
236 .find_by_prefix(&GuardianMetadataPrefix)
237 .await
238 .map(|(key, metadata)| (key.0, metadata))
239 .collect()
240 .await;
241
242 let api_announcements: BTreeMap<PeerId, SignedApiAnnouncement> = dbtx
244 .find_by_prefix(&ApiAnnouncementPrefix)
245 .await
246 .map(|(key, announcement)| (key.0, announcement))
247 .collect()
248 .await;
249
250 cfg.global
252 .api_endpoints
253 .iter()
254 .map(|(peer_id, peer_url)| {
255 let url = guardian_metadata
256 .get(peer_id)
257 .and_then(|m| m.guardian_metadata().api_urls.first().cloned())
258 .or_else(|| {
259 api_announcements
260 .get(peer_id)
261 .map(|a| a.api_announcement.api_url.clone())
262 })
263 .unwrap_or_else(|| peer_url.url.clone());
264 (*peer_id, url)
265 })
266 .collect()
267}