fedimint_client/
api_announcements.rs1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, bail};
6use fedimint_core::config::ClientConfig;
7use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
8use fedimint_core::encoding::{Decodable, Encodable};
9use fedimint_core::net::api_announcement::{SignedApiAnnouncement, override_api_urls};
10use fedimint_core::runtime::sleep;
11use fedimint_core::secp256k1::SECP256K1;
12use fedimint_core::util::{SafeUrl, backoff_util, retry};
13use fedimint_core::{PeerId, impl_db_lookup, impl_db_record};
14use fedimint_logging::LOG_CLIENT;
15use futures::future::join_all;
16use tracing::{debug, warn};
17
18use crate::Client;
19use crate::db::DbKeyPrefix;
20
21#[derive(Clone, Debug, Encodable, Decodable)]
22pub struct ApiAnnouncementKey(pub PeerId);
23
24#[derive(Clone, Debug, Encodable, Decodable)]
25pub struct ApiAnnouncementPrefix;
26
// Registers `ApiAnnouncementKey -> SignedApiAnnouncement` as a database record
// stored under the `DbKeyPrefix::ApiUrlAnnouncement` prefix.
// NOTE(review): `notify_on_modify = false` presumably disables change
// notifications for this record type -- confirm against the macro definition.
impl_db_record!(
    key = ApiAnnouncementKey,
    value = SignedApiAnnouncement,
    db_prefix = DbKeyPrefix::ApiUrlAnnouncement,
    notify_on_modify = false,
);
// Associates `ApiAnnouncementPrefix` with `ApiAnnouncementKey` as its query
// prefix, so stored announcements can be looked up by prefix.
impl_db_lookup!(
    key = ApiAnnouncementKey,
    query_prefix = ApiAnnouncementPrefix
);
37
38pub async fn run_api_announcement_sync(client_inner: Arc<Client>) {
41 let guardian_pub_keys = client_inner.get_guardian_public_keys_blocking().await;
43 loop {
44 let results = join_all(client_inner.api.all_peers().iter()
45 .map(|peer_id| async {
46 let peer_id = *peer_id;
47 let announcements =
48
49 retry(
50 "Fetch api announcement (sync)",
51 backoff_util::aggressive_backoff(),
52 || async {
53 client_inner
54 .api
55 .api_announcements(peer_id)
56 .await
57 .with_context(move || format!("Fetching API announcements from peer {peer_id} failed"))
58 },
59 )
60 .await?;
61
62 for (peer_id, announcement) in &announcements {
65 let Some(guardian_pub_key) = guardian_pub_keys.get(peer_id) else {
66 bail!("Guardian public key not found for peer {}", peer_id);
67 };
68
69 if !announcement.verify(SECP256K1, guardian_pub_key) {
70 bail!("Failed to verify announcement for peer {}", peer_id);
71 }
72 }
73
74 client_inner
75 .db()
76 .autocommit(
77 |dbtx, _|{
78 let announcements_inner = announcements.clone();
79 Box::pin(async move {
80 for (peer, new_announcement) in announcements_inner {
81 let replace_current_announcement = dbtx
82 .get_value(&ApiAnnouncementKey(peer))
83 .await.is_none_or(|current_announcement| {
84 current_announcement.api_announcement.nonce
85 < new_announcement.api_announcement.nonce
86 });
87 if replace_current_announcement {
88 debug!(target: LOG_CLIENT, ?peer, %new_announcement.api_announcement.api_url, "Updating API announcement");
89 dbtx.insert_entry(&ApiAnnouncementKey(peer), &new_announcement)
90 .await;
91 }
92 }
93
94 Result::<(), ()>::Ok(())
95 })},
96 None,
97 )
98 .await
99 .expect("Will never return an error");
100
101 Ok(())
102 })).await;
103
104 for (peer_id, result) in guardian_pub_keys.keys().zip(results) {
105 if let Err(e) = result {
106 warn!(target: LOG_CLIENT, %peer_id, ?e, "Failed to process API announcements");
107 }
108 }
109
110 sleep(Duration::from_secs(3600)).await;
112 }
113}
114
115pub async fn get_api_urls(db: &Database, cfg: &ClientConfig) -> BTreeMap<PeerId, SafeUrl> {
119 override_api_urls(
120 db,
121 cfg.global
122 .api_endpoints
123 .iter()
124 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
125 &ApiAnnouncementPrefix,
126 |key| key.0,
127 )
128 .await
129}