fedimint_server/net/api/
announcement.rs

1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use fedimint_api_client::api::DynGlobalApi;
5use fedimint_connectors::ConnectorRegistry;
6use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
7use fedimint_core::encoding::{Decodable, Encodable};
8use fedimint_core::envs::is_running_in_test_env;
9use fedimint_core::net::api_announcement::{ApiAnnouncement, SignedApiAnnouncement};
10use fedimint_core::net::guardian_metadata::SignedGuardianMetadata;
11use fedimint_core::task::{TaskGroup, sleep};
12use fedimint_core::util::{FmtCompact, SafeUrl};
13use fedimint_core::{PeerId, impl_db_lookup, impl_db_record, secp256k1};
14use fedimint_logging::LOG_NET_API;
15use futures::future::join_all;
16use futures::stream::StreamExt;
17use tokio::select;
18use tracing::debug;
19
20use super::guardian_metadata::GuardianMetadataPrefix;
21use crate::config::{ServerConfig, ServerConfigConsensus};
22use crate::db::DbKeyPrefix;
23
24#[derive(Clone, Debug, Encodable, Decodable)]
25pub struct ApiAnnouncementKey(pub PeerId);
26
27#[derive(Clone, Debug, Encodable, Decodable)]
28pub struct ApiAnnouncementPrefix;
29
30impl_db_record!(
31    key = ApiAnnouncementKey,
32    value = SignedApiAnnouncement,
33    db_prefix = DbKeyPrefix::ApiAnnouncements,
34    notify_on_modify = true,
35);
36impl_db_lookup!(
37    key = ApiAnnouncementKey,
38    query_prefix = ApiAnnouncementPrefix
39);
40
41pub async fn start_api_announcement_service(
42    db: &Database,
43    tg: &TaskGroup,
44    cfg: &ServerConfig,
45    api_secret: Option<String>,
46) -> anyhow::Result<()> {
47    const INITIAL_DEALY_SECONDS: u64 = 5;
48    const FAILURE_RETRY_SECONDS: u64 = 60;
49    const SUCCESS_RETRY_SECONDS: u64 = 600;
50
51    let initial_delay = if insert_signed_api_announcement_if_not_present(db, cfg).await {
52        Duration::ZERO
53    } else {
54        Duration::from_secs(INITIAL_DEALY_SECONDS)
55    };
56
57    let db = db.clone();
58    // FIXME: (@leonardo) how should we handle the connector here ?
59    let api_client = DynGlobalApi::new(
60        // TODO: get from somewhere/unify?
61        ConnectorRegistry::build_from_server_env()?.bind().await?,
62        get_api_urls(&db, &cfg.consensus).await,
63        api_secret.as_deref(),
64    )?;
65
66    let our_peer_id = cfg.local.identity;
67    tg.spawn_cancellable("submit-api-url-announcement", async move {
68        // Give other servers some time to start up in case they were just restarted
69        // together
70        sleep(initial_delay).await;
71        loop {
72            let mut success = true;
73            let announcements = db.begin_transaction_nc()
74                .await
75                .find_by_prefix(&ApiAnnouncementPrefix)
76                .await
77                .map(|(peer_key, peer_announcement)| (peer_key.0, peer_announcement))
78                .collect::<Vec<(PeerId, SignedApiAnnouncement)>>()
79                .await;
80
81            // Submit all API announcements we know (including our own and other peers')
82            // to all federation members (in parallel). Each submit_api_announcement call
83            // broadcasts one announcement to all peers.
84            let results = join_all(announcements.iter().map(|(peer, announcement)| {
85                let api_client = &api_client;
86                async move {
87                    (*peer, api_client.submit_api_announcement(*peer, announcement.clone()).await)
88                }
89            }))
90            .await;
91
92            for (peer, result) in results {
93                if let Err(err) = result {
94                    debug!(target: LOG_NET_API, ?peer, err = %err.fmt_compact(), "Announcing API URL did not succeed for all peers, retrying in {FAILURE_RETRY_SECONDS} seconds");
95                    success = false;
96                }
97            }
98
99            // While we announce all peer API urls, we only want to immediately trigger in case
100            let our_announcement_key = ApiAnnouncementKey(our_peer_id);
101            let our_announcement = db
102                .begin_transaction_nc()
103                .await
104                .get_value(&our_announcement_key)
105                .await
106                .expect("Our announcement is always present");
107            let new_announcement = db.wait_key_check(
108                &our_announcement_key,
109                |new_announcement| {
110                    new_announcement.and_then(
111                        |new_announcement| (new_announcement.api_announcement.nonce != our_announcement.api_announcement.nonce).then_some(())
112                    )
113                });
114
115            let auto_announcement_delay = if success {
116                Duration::from_secs(SUCCESS_RETRY_SECONDS)
117            } else if is_running_in_test_env() {
118                Duration::from_secs(3)
119            } else {
120                Duration::from_secs(FAILURE_RETRY_SECONDS)
121            };
122
123            select! {
124                _ = new_announcement => {},
125                () = sleep(auto_announcement_delay) => {},
126            }
127        }
128    });
129
130    Ok(())
131}
132
133/// Checks if we already have a signed API endpoint announcement for our own
134/// identity in the database and creates one if not.
135///
136/// Return `true` fresh announcements were inserted because it was not present
137async fn insert_signed_api_announcement_if_not_present(db: &Database, cfg: &ServerConfig) -> bool {
138    let mut dbtx = db.begin_transaction().await;
139    if dbtx
140        .get_value(&ApiAnnouncementKey(cfg.local.identity))
141        .await
142        .is_some()
143    {
144        return false;
145    }
146
147    let api_announcement = ApiAnnouncement::new(
148        cfg.consensus.api_endpoints()[&cfg.local.identity]
149            .url
150            .clone(),
151        0,
152    );
153    let ctx = secp256k1::Secp256k1::new();
154    let signed_announcement =
155        api_announcement.sign(&ctx, &cfg.private.broadcast_secret_key.keypair(&ctx));
156
157    dbtx.insert_entry(
158        &ApiAnnouncementKey(cfg.local.identity),
159        &signed_announcement,
160    )
161    .await;
162    dbtx.commit_tx().await;
163
164    true
165}
166
167/// Returns a list of all peers and their respective API URLs taking into
168/// account guardian metadata and API announcements overwriting the URLs
169/// contained in the original configuration.
170///
171/// Priority order:
172/// 1. Guardian metadata (if available) - uses first URL from api_urls
173/// 2. API announcement (if available)
174/// 3. Configured URL (fallback)
175pub async fn get_api_urls(db: &Database, cfg: &ServerConfigConsensus) -> BTreeMap<PeerId, SafeUrl> {
176    let mut dbtx = db.begin_transaction_nc().await;
177
178    // Load guardian metadata for all peers
179    let guardian_metadata: BTreeMap<PeerId, SignedGuardianMetadata> = dbtx
180        .find_by_prefix(&GuardianMetadataPrefix)
181        .await
182        .map(|(key, metadata)| (key.0, metadata))
183        .collect()
184        .await;
185
186    // Load API announcements for all peers
187    let api_announcements: BTreeMap<PeerId, SignedApiAnnouncement> = dbtx
188        .find_by_prefix(&ApiAnnouncementPrefix)
189        .await
190        .map(|(key, announcement)| (key.0, announcement))
191        .collect()
192        .await;
193
194    // For each peer: prefer guardian metadata, then API announcement, then config
195    cfg.api_endpoints()
196        .iter()
197        .map(|(peer_id, peer_url)| {
198            let url = guardian_metadata
199                .get(peer_id)
200                .and_then(|m| m.guardian_metadata().api_urls.first().cloned())
201                .or_else(|| {
202                    api_announcements
203                        .get(peer_id)
204                        .map(|a| a.api_announcement.api_url.clone())
205                })
206                .unwrap_or_else(|| peer_url.url.clone());
207            (*peer_id, url)
208        })
209        .collect()
210}