1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, bail};
6use fedimint_api_client::api::DynGlobalApi;
7use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
8use fedimint_core::encoding::{Decodable, Encodable};
9use fedimint_core::envs::is_running_in_test_env;
10use fedimint_core::net::guardian_metadata::SignedGuardianMetadata;
11use fedimint_core::runtime::{self, sleep};
12use fedimint_core::secp256k1::SECP256K1;
13use fedimint_core::util::backoff_util::custom_backoff;
14use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
15use fedimint_core::{NumPeersExt as _, PeerId, impl_db_lookup, impl_db_record};
16use fedimint_logging::LOG_CLIENT;
17use futures::stream::{FuturesUnordered, StreamExt as _};
18use tracing::debug;
19
20use crate::Client;
21use crate::db::DbKeyPrefix;
22
23#[derive(Clone, Debug, Encodable, Decodable)]
24pub struct GuardianMetadataKey(pub PeerId);
25
26#[derive(Clone, Debug, Encodable, Decodable)]
27pub struct GuardianMetadataPrefix;
28
29impl_db_record!(
30 key = GuardianMetadataKey,
31 value = SignedGuardianMetadata,
32 db_prefix = DbKeyPrefix::GuardianMetadata,
33 notify_on_modify = false,
34);
35impl_db_lookup!(
36 key = GuardianMetadataKey,
37 query_prefix = GuardianMetadataPrefix
38);
39
40pub(crate) async fn run_guardian_metadata_refresh_task(client_inner: Arc<Client>) {
43 let guardian_pub_keys = client_inner.get_guardian_public_keys_blocking().await;
45 loop {
46 if let Err(err) = {
47 let api: &DynGlobalApi = &client_inner.api;
48 let results = fetch_guardian_metadata_from_at_least_num_of_peers(
49 1,
50 api,
51 &guardian_pub_keys,
52 if is_running_in_test_env() {
53 Duration::from_millis(1)
54 } else {
55 Duration::from_secs(30)
56 },
57 )
58 .await;
59 store_guardian_metadata_updates_from_peers(
60 client_inner.db(),
61 &guardian_pub_keys,
62 &results,
63 )
64 .await
65 } {
66 debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Refreshing guardian metadata failed");
67 }
68
69 let duration = if is_running_in_test_env() {
70 Duration::from_secs(1)
71 } else {
72 Duration::from_secs(3600)
74 };
75 sleep(duration).await;
76 }
77}
78
79pub(crate) async fn store_guardian_metadata_updates_from_peers(
80 db: &Database,
81 guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
82 updates: &[BTreeMap<PeerId, SignedGuardianMetadata>],
83) -> Result<(), anyhow::Error> {
84 for metadata_map in updates {
85 store_guardian_metadata_updates(db, guardian_pub_keys, metadata_map).await;
86 }
87
88 Ok(())
89}
90
91pub(crate) type PeersSignedGuardianMetadata = BTreeMap<PeerId, SignedGuardianMetadata>;
92
93pub(crate) async fn fetch_guardian_metadata_from_at_least_num_of_peers(
98 num_responses_required: usize,
99 api: &DynGlobalApi,
100 guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
101 extra_response_wait: Duration,
102) -> Vec<PeersSignedGuardianMetadata> {
103 let num_peers = guardian_pub_keys.to_num_peers();
104 let mut backoff = custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None);
107
108 async fn make_request(
110 delay: Duration,
111 peer_id: PeerId,
112 api: &DynGlobalApi,
113 guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
114 ) -> (PeerId, anyhow::Result<PeersSignedGuardianMetadata>) {
115 runtime::sleep(delay).await;
116
117 let result = async {
118 let metadata_map = api.guardian_metadata(peer_id).await.with_context(move || {
119 format!("Fetching guardian metadata from peer {peer_id} failed")
120 })?;
121
122 for (peer_id, metadata) in &metadata_map {
125 let Some(guardian_pub_key) = guardian_pub_keys.get(peer_id) else {
126 bail!("Guardian public key not found for peer {}", peer_id);
127 };
128
129 let now = fedimint_core::time::duration_since_epoch();
130 if let Err(e) = metadata.verify(SECP256K1, guardian_pub_key, now) {
131 bail!("Failed to verify metadata for peer {}: {}", peer_id, e);
132 }
133 }
134 Ok(metadata_map)
135 }
136 .await;
137
138 (peer_id, result)
139 }
140
141 let mut requests = FuturesUnordered::new();
142
143 for peer_id in num_peers.peer_ids() {
144 requests.push(make_request(
145 Duration::ZERO,
146 peer_id,
147 api,
148 guardian_pub_keys,
149 ));
150 }
151
152 let mut responses = Vec::new();
153
154 loop {
155 let next_response = if responses.len() < num_responses_required {
156 requests.next().await
158 } else {
159 fedimint_core::runtime::timeout(extra_response_wait, requests.next())
163 .await
164 .ok()
165 .flatten()
166 };
167
168 let Some((peer_id, response)) = next_response else {
169 break;
170 };
171
172 match response {
173 Err(err) => {
174 debug!(
175 target: LOG_CLIENT,
176 %peer_id,
177 err = %err.fmt_compact_anyhow(),
178 "Failed to fetch guardian metadata from peer"
179 );
180 requests.push(make_request(
181 backoff.next().expect("Keeps retrying"),
182 peer_id,
183 api,
184 guardian_pub_keys,
185 ));
186 }
187 Ok(metadata) => {
188 responses.push(metadata);
189 }
190 }
191 }
192
193 responses
194}
195
196pub(crate) async fn store_guardian_metadata_updates(
197 db: &Database,
198 guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
199 metadata_map: &BTreeMap<PeerId, SignedGuardianMetadata>,
200) {
201 let now = fedimint_core::time::duration_since_epoch();
202
203 db.autocommit(
204 |dbtx, _| {
205 let metadata_map_inner = metadata_map.clone();
206 let guardian_pub_keys_inner = guardian_pub_keys.clone();
207 Box::pin(async move {
208 for (peer, new_metadata) in metadata_map_inner {
209 let Some(guardian_pub_key) = guardian_pub_keys_inner.get(&peer) else {
211 debug!(
212 target: LOG_CLIENT,
213 ?peer,
214 "Skipping metadata update: guardian public key not found"
215 );
216 continue;
217 };
218
219 if let Err(e) = new_metadata.verify(SECP256K1, guardian_pub_key, now) {
220 debug!(
221 target: LOG_CLIENT,
222 ?peer,
223 err = %e.fmt_compact(),
224 "Skipping metadata update: verification failed"
225 );
226 continue;
227 }
228
229 let replace_current_metadata = dbtx
230 .get_value(&GuardianMetadataKey(peer))
231 .await
232 .is_none_or(|current_metadata| {
233 current_metadata.guardian_metadata().timestamp_secs
235 < new_metadata.guardian_metadata().timestamp_secs
236 });
237 if replace_current_metadata {
238 debug!(target: LOG_CLIENT, ?peer, "Updating guardian metadata");
239 dbtx.insert_entry(&GuardianMetadataKey(peer), &new_metadata)
240 .await;
241 }
242 }
243
244 Result::<(), ()>::Ok(())
245 })
246 },
247 None,
248 )
249 .await
250 .expect("Will never return an error");
251}