fedimint_client/
guardian_metadata.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, bail};
6use fedimint_api_client::api::DynGlobalApi;
7use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
8use fedimint_core::encoding::{Decodable, Encodable};
9use fedimint_core::envs::is_running_in_test_env;
10use fedimint_core::net::guardian_metadata::SignedGuardianMetadata;
11use fedimint_core::runtime::{self, sleep};
12use fedimint_core::secp256k1::SECP256K1;
13use fedimint_core::util::backoff_util::custom_backoff;
14use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
15use fedimint_core::{NumPeersExt as _, PeerId, impl_db_lookup, impl_db_record};
16use fedimint_logging::LOG_CLIENT;
17use futures::stream::{FuturesUnordered, StreamExt as _};
18use tracing::debug;
19
20use crate::Client;
21use crate::db::DbKeyPrefix;
22
23#[derive(Clone, Debug, Encodable, Decodable)]
24pub struct GuardianMetadataKey(pub PeerId);
25
26#[derive(Clone, Debug, Encodable, Decodable)]
27pub struct GuardianMetadataPrefix;
28
// Declares the database record: `GuardianMetadataKey` maps to a
// `SignedGuardianMetadata` value under the `DbKeyPrefix::GuardianMetadata`
// prefix, with `notify_on_modify` turned off.
impl_db_record!(
    key = GuardianMetadataKey,
    value = SignedGuardianMetadata,
    db_prefix = DbKeyPrefix::GuardianMetadata,
    notify_on_modify = false,
);
// Declares `GuardianMetadataPrefix` as the query prefix for
// `GuardianMetadataKey` records.
impl_db_lookup!(
    key = GuardianMetadataKey,
    query_prefix = GuardianMetadataPrefix
);
39
40/// Fetches guardian metadata from guardians, validates them and updates the
41/// DB if any new more up to date ones are found.
42pub(crate) async fn run_guardian_metadata_refresh_task(client_inner: Arc<Client>) {
43    // Wait for the guardian keys to be available
44    let guardian_pub_keys = client_inner.get_guardian_public_keys_blocking().await;
45    loop {
46        if let Err(err) = {
47            let api: &DynGlobalApi = &client_inner.api;
48            let results = fetch_guardian_metadata_from_at_least_num_of_peers(
49                1,
50                api,
51                &guardian_pub_keys,
52                if is_running_in_test_env() {
53                    Duration::from_millis(1)
54                } else {
55                    Duration::from_secs(30)
56                },
57            )
58            .await;
59            store_guardian_metadata_updates_from_peers(
60                client_inner.db(),
61                &guardian_pub_keys,
62                &results,
63            )
64            .await
65        } {
66            debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Refreshing guardian metadata failed");
67        }
68
69        let duration = if is_running_in_test_env() {
70            Duration::from_secs(1)
71        } else {
72            // Check once an hour if there are new metadata
73            Duration::from_secs(3600)
74        };
75        sleep(duration).await;
76    }
77}
78
79pub(crate) async fn store_guardian_metadata_updates_from_peers(
80    db: &Database,
81    guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
82    updates: &[BTreeMap<PeerId, SignedGuardianMetadata>],
83) -> Result<(), anyhow::Error> {
84    for metadata_map in updates {
85        store_guardian_metadata_updates(db, guardian_pub_keys, metadata_map).await;
86    }
87
88    Ok(())
89}
90
/// Signed guardian metadata keyed by the [`PeerId`] it belongs to; this is
/// the shape of a single peer's response when fetching guardian metadata.
pub(crate) type PeersSignedGuardianMetadata = BTreeMap<PeerId, SignedGuardianMetadata>;
92
93/// Fetch responses from at least `num_responses_required` of peers.
94///
95/// Will wait a little bit extra in hopes of collecting more than strictly
96/// needed responses.
97pub(crate) async fn fetch_guardian_metadata_from_at_least_num_of_peers(
98    num_responses_required: usize,
99    api: &DynGlobalApi,
100    guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
101    extra_response_wait: Duration,
102) -> Vec<PeersSignedGuardianMetadata> {
103    let num_peers = guardian_pub_keys.to_num_peers();
104    // Keep trying, initially somewhat aggressively, but after a while retry very
105    // slowly, because chances for response are getting lower and lower.
106    let mut backoff = custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None);
107
108    // Make a single request to a peer after a delay
109    async fn make_request(
110        delay: Duration,
111        peer_id: PeerId,
112        api: &DynGlobalApi,
113        guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
114    ) -> (PeerId, anyhow::Result<PeersSignedGuardianMetadata>) {
115        runtime::sleep(delay).await;
116
117        let result = async {
118            let metadata_map = api.guardian_metadata(peer_id).await.with_context(move || {
119                format!("Fetching guardian metadata from peer {peer_id} failed")
120            })?;
121
122            // If any of the metadata is invalid something is fishy with that
123            // guardian and we ignore all its responses
124            for (peer_id, metadata) in &metadata_map {
125                let Some(guardian_pub_key) = guardian_pub_keys.get(peer_id) else {
126                    bail!("Guardian public key not found for peer {}", peer_id);
127                };
128
129                let now = fedimint_core::time::duration_since_epoch();
130                if let Err(e) = metadata.verify(SECP256K1, guardian_pub_key, now) {
131                    bail!("Failed to verify metadata for peer {}: {}", peer_id, e);
132                }
133            }
134            Ok(metadata_map)
135        }
136        .await;
137
138        (peer_id, result)
139    }
140
141    let mut requests = FuturesUnordered::new();
142
143    for peer_id in num_peers.peer_ids() {
144        requests.push(make_request(
145            Duration::ZERO,
146            peer_id,
147            api,
148            guardian_pub_keys,
149        ));
150    }
151
152    let mut responses = Vec::new();
153
154    loop {
155        let next_response = if responses.len() < num_responses_required {
156            // If we don't have enough responses yet, we wait
157            requests.next().await
158        } else {
159            // if we do have responses we need, we wait opportunistically just for a small
160            // duration if any other responses are ready anyway, just to not
161            // throw them away
162            fedimint_core::runtime::timeout(extra_response_wait, requests.next())
163                .await
164                .ok()
165                .flatten()
166        };
167
168        let Some((peer_id, response)) = next_response else {
169            break;
170        };
171
172        match response {
173            Err(err) => {
174                debug!(
175                    target: LOG_CLIENT,
176                    %peer_id,
177                    err = %err.fmt_compact_anyhow(),
178                    "Failed to fetch guardian metadata from peer"
179                );
180                requests.push(make_request(
181                    backoff.next().expect("Keeps retrying"),
182                    peer_id,
183                    api,
184                    guardian_pub_keys,
185                ));
186            }
187            Ok(metadata) => {
188                responses.push(metadata);
189            }
190        }
191    }
192
193    responses
194}
195
196pub(crate) async fn store_guardian_metadata_updates(
197    db: &Database,
198    guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
199    metadata_map: &BTreeMap<PeerId, SignedGuardianMetadata>,
200) {
201    let now = fedimint_core::time::duration_since_epoch();
202
203    db.autocommit(
204        |dbtx, _| {
205            let metadata_map_inner = metadata_map.clone();
206            let guardian_pub_keys_inner = guardian_pub_keys.clone();
207            Box::pin(async move {
208                for (peer, new_metadata) in metadata_map_inner {
209                    // Verify signature before storing
210                    let Some(guardian_pub_key) = guardian_pub_keys_inner.get(&peer) else {
211                        debug!(
212                            target: LOG_CLIENT,
213                            ?peer,
214                            "Skipping metadata update: guardian public key not found"
215                        );
216                        continue;
217                    };
218
219                    if let Err(e) = new_metadata.verify(SECP256K1, guardian_pub_key, now) {
220                        debug!(
221                            target: LOG_CLIENT,
222                            ?peer,
223                            err = %e.fmt_compact(),
224                            "Skipping metadata update: verification failed"
225                        );
226                        continue;
227                    }
228
229                    let replace_current_metadata = dbtx
230                        .get_value(&GuardianMetadataKey(peer))
231                        .await
232                        .is_none_or(|current_metadata| {
233                            // Replace if new metadata has a newer timestamp
234                            current_metadata.guardian_metadata().timestamp_secs
235                                < new_metadata.guardian_metadata().timestamp_secs
236                        });
237                    if replace_current_metadata {
238                        debug!(target: LOG_CLIENT, ?peer, "Updating guardian metadata");
239                        dbtx.insert_entry(&GuardianMetadataKey(peer), &new_metadata)
240                            .await;
241                    }
242                }
243
244                Result::<(), ()>::Ok(())
245            })
246        },
247        None,
248    )
249    .await
250    .expect("Will never return an error");
251}