fedimint_server/net/api/
guardian_metadata.rs

1use std::time::{Duration, UNIX_EPOCH};
2
3use fedimint_api_client::api::DynGlobalApi;
4use fedimint_connectors::ConnectorRegistry;
5use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::envs::is_running_in_test_env;
8use fedimint_core::net::guardian_metadata::{GuardianMetadata, SignedGuardianMetadata};
9use fedimint_core::task::{TaskGroup, sleep};
10use fedimint_core::util::FmtCompact;
11use fedimint_core::{PeerId, impl_db_lookup, impl_db_record, secp256k1};
12use fedimint_logging::LOG_NET_API;
13use futures::future::join_all;
14use futures::stream::StreamExt;
15use tokio::select;
16use tracing::{debug, info};
17
18use crate::config::ServerConfig;
19use crate::db::DbKeyPrefix;
20
21#[derive(Clone, Debug, Encodable, Decodable)]
22pub struct GuardianMetadataKey(pub PeerId);
23
24#[derive(Clone, Debug, Encodable, Decodable)]
25pub struct GuardianMetadataPrefix;
26
// Registers `GuardianMetadataKey` -> `SignedGuardianMetadata` as a database
// record stored under `DbKeyPrefix::GuardianMetadata`.
// NOTE(review): `notify_on_modify = true` is presumably what allows
// `Database::wait_key_check` to be woken when such a record changes — confirm
// against the `impl_db_record!` documentation.
27impl_db_record!(
28    key = GuardianMetadataKey,
29    value = SignedGuardianMetadata,
30    db_prefix = DbKeyPrefix::GuardianMetadata,
31    notify_on_modify = true,
32);
// Makes `GuardianMetadataPrefix` usable as a query prefix that yields
// `GuardianMetadataKey` records.
33impl_db_lookup!(
34    key = GuardianMetadataKey,
35    query_prefix = GuardianMetadataPrefix
36);
37
38pub async fn start_guardian_metadata_service(
39    db: &Database,
40    tg: &TaskGroup,
41    cfg: &ServerConfig,
42    api_secret: Option<String>,
43) -> anyhow::Result<()> {
44    const INITIAL_DELAY_SECONDS: u64 = 5;
45    const FAILURE_RETRY_SECONDS: u64 = 60;
46    const SUCCESS_RETRY_SECONDS: u64 = 600;
47
48    let initial_delay = if insert_signed_guardian_metadata_if_not_present(db, cfg).await {
49        Duration::ZERO
50    } else {
51        Duration::from_secs(INITIAL_DELAY_SECONDS)
52    };
53
54    let db = db.clone();
55    let api_client = DynGlobalApi::new(
56        ConnectorRegistry::build_from_server_env()?.bind().await?,
57        super::announcement::get_api_urls(&db, &cfg.consensus).await,
58        api_secret.as_deref(),
59    )?;
60
61    let our_peer_id = cfg.local.identity;
62    tg.spawn_cancellable("submit-guardian-metadata", async move {
63        // Give other servers some time to start up in case they were just restarted together
64        sleep(initial_delay).await;
65        loop {
66            let mut success = true;
67            let metadata_list = db
68                .begin_transaction_nc()
69                .await
70                .find_by_prefix(&GuardianMetadataPrefix)
71                .await
72                .map(|(peer_key, peer_metadata)| (peer_key.0, peer_metadata))
73                .collect::<Vec<(PeerId, SignedGuardianMetadata)>>()
74                .await;
75
76            info!(
77                target: LOG_NET_API,
78                len = %metadata_list.len(),
79                "Submitting guardian metadata"
80            );
81            // Submit all metadata we know (including our own and other peers') to all
82            // federation members (in parallel). Each submit_guardian_metadata call
83            // broadcasts one piece of metadata to all peers.
84            let results = join_all(metadata_list.iter().map(|(peer, metadata)| {
85                let api_client = &api_client;
86                async move {
87                    (*peer, api_client.submit_guardian_metadata(*peer, metadata.clone()).await)
88                }
89            }))
90            .await;
91
92            info!(
93                target: LOG_NET_API,
94                len = %metadata_list.len(),
95                "Done"
96            );
97            for (peer, result) in results {
98                if let Err(err) = result {
99                    debug!(target: LOG_NET_API, ?peer, err = %err.fmt_compact(), "Submitting guardian metadata did not succeed for all peers, retrying in {FAILURE_RETRY_SECONDS} seconds");
100                    success = false;
101                }
102            }
103
104            // While we announce all peer metadata, we only want to immediately trigger in case ours changes
105            let our_metadata_key = GuardianMetadataKey(our_peer_id);
106            let our_metadata = db
107                .begin_transaction_nc()
108                .await
109                .get_value(&our_metadata_key)
110                .await
111                .expect("Our guardian metadata is always present");
112
113            let new_metadata = db.wait_key_check(&our_metadata_key, |new_metadata| {
114                new_metadata.and_then(|new_metadata| {
115                    (new_metadata.tagged_hash() != our_metadata.tagged_hash()).then_some(())
116                })
117            });
118
119
120            let auto_announcement_delay = if success {
121                Duration::from_secs(SUCCESS_RETRY_SECONDS)
122            } else if is_running_in_test_env() {
123                Duration::from_secs(3)
124            } else {
125                Duration::from_secs(FAILURE_RETRY_SECONDS)
126            };
127
128            select! {
129                _ = new_metadata => {},
130                () = sleep(auto_announcement_delay) => {},
131            }
132        }
133    });
134
135    Ok(())
136}
137
138/// Checks if we already have a signed guardian metadata for our own identity
139/// in the database and creates one if not.
140///
141/// Return `true` fresh metadata was inserted because it was not present
142async fn insert_signed_guardian_metadata_if_not_present(db: &Database, cfg: &ServerConfig) -> bool {
143    let mut dbtx = db.begin_transaction().await;
144    if dbtx
145        .get_value(&GuardianMetadataKey(cfg.local.identity))
146        .await
147        .is_some()
148    {
149        return false;
150    }
151
152    let timestamp_secs = fedimint_core::time::now()
153        .duration_since(UNIX_EPOCH)
154        .expect("System time should be after UNIX_EPOCH")
155        .as_secs();
156
157    let guardian_metadata = GuardianMetadata::new(
158        cfg.consensus
159            .api_endpoints()
160            .get(&cfg.local.identity)
161            .map(|endpoint| vec![endpoint.url.clone()])
162            .unwrap_or_default(),
163        // TODO: Get the actual pkarr_id_z32 from config or generate it
164        String::new(),
165        timestamp_secs,
166    );
167    let ctx = secp256k1::Secp256k1::new();
168    let signed_metadata =
169        guardian_metadata.sign(&ctx, &cfg.private.broadcast_secret_key.keypair(&ctx));
170
171    dbtx.insert_entry(&GuardianMetadataKey(cfg.local.identity), &signed_metadata)
172        .await;
173    dbtx.commit_tx().await;
174
175    true
176}