fedimint_server/net/api/
announcement.rs

1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use fedimint_api_client::api::DynGlobalApi;
5use fedimint_connectors::ConnectorRegistry;
6use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
7use fedimint_core::encoding::{Decodable, Encodable};
8use fedimint_core::net::api_announcement::{
9    ApiAnnouncement, SignedApiAnnouncement, override_api_urls,
10};
11use fedimint_core::task::{TaskGroup, sleep};
12use fedimint_core::util::{FmtCompact, SafeUrl};
13use fedimint_core::{PeerId, impl_db_lookup, impl_db_record, secp256k1};
14use fedimint_logging::LOG_NET_API;
15use futures::stream::StreamExt;
16use tokio::select;
17use tracing::debug;
18
19use crate::config::{ServerConfig, ServerConfigConsensus};
20use crate::db::DbKeyPrefix;
21
/// Database key under which the signed API announcement of a single peer is
/// stored; the wrapped [`PeerId`] identifies the announcing peer.
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct ApiAnnouncementKey(pub PeerId);
24
/// Query prefix for [`ApiAnnouncementKey`] records, used to iterate over the
/// stored announcements of all peers.
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct ApiAnnouncementPrefix;
27
// Maps `ApiAnnouncementKey` to `SignedApiAnnouncement` values under the
// `ApiAnnouncements` key prefix.
// NOTE(review): `notify_on_modify = true` is presumably what allows
// `Database::wait_key_check` to be woken when an announcement changes —
// confirm against the `impl_db_record!` documentation.
impl_db_record!(
    key = ApiAnnouncementKey,
    value = SignedApiAnnouncement,
    db_prefix = DbKeyPrefix::ApiAnnouncements,
    notify_on_modify = true,
);
// Registers `ApiAnnouncementPrefix` as the query prefix for
// `ApiAnnouncementKey`, so all announcements can be fetched with
// `find_by_prefix`.
impl_db_lookup!(
    key = ApiAnnouncementKey,
    query_prefix = ApiAnnouncementPrefix
);
38
39pub async fn start_api_announcement_service(
40    db: &Database,
41    tg: &TaskGroup,
42    cfg: &ServerConfig,
43    api_secret: Option<String>,
44) -> anyhow::Result<()> {
45    const INITIAL_DEALY_SECONDS: u64 = 5;
46    const FAILURE_RETRY_SECONDS: u64 = 60;
47    const SUCCESS_RETRY_SECONDS: u64 = 600;
48
49    insert_signed_api_announcement_if_not_present(db, cfg).await;
50
51    let db = db.clone();
52    // FIXME: (@leonardo) how should we handle the connector here ?
53    let api_client = DynGlobalApi::new(
54        // TODO: get from somewhere/unify?
55        ConnectorRegistry::build_from_server_env()?.bind().await?,
56        get_api_urls(&db, &cfg.consensus).await,
57        api_secret.as_deref(),
58    )?;
59
60    let our_peer_id = cfg.local.identity;
61    tg.spawn_cancellable("submit-api-url-announcement", async move {
62        // Give other servers some time to start up in case they were just restarted
63        // together
64        sleep(Duration::from_secs(INITIAL_DEALY_SECONDS)).await;
65        loop {
66            let mut success = true;
67            let announcements = db.begin_transaction_nc()
68                .await
69                .find_by_prefix(&ApiAnnouncementPrefix)
70                .await
71                .map(|(peer_key, peer_announcement)| (peer_key.0, peer_announcement))
72                .collect::<Vec<(PeerId, SignedApiAnnouncement)>>()
73                .await;
74
75            // Announce all peer API URLs we know, but at least our own
76            for (peer, announcement) in announcements {
77                if let Err(err) = api_client
78                    .submit_api_announcement(peer, announcement.clone())
79                    .await {
80                    debug!(target: LOG_NET_API, ?peer, err = %err.fmt_compact(), "Announcing API URL did not succeed for all peers, retrying in {FAILURE_RETRY_SECONDS} seconds");
81                    success = false;
82                }
83            }
84
85            // While we announce all peer API urls, we only want to immediately trigger in case
86            let our_announcement_key = ApiAnnouncementKey(our_peer_id);
87            let our_announcement = db
88                .begin_transaction_nc()
89                .await
90                .get_value(&our_announcement_key)
91                .await
92                .expect("Our announcement is always present");
93            let new_announcement = db.wait_key_check(
94                &our_announcement_key,
95                |new_announcement| {
96                    new_announcement.and_then(
97                        |new_announcement| (new_announcement.api_announcement.nonce != our_announcement.api_announcement.nonce).then_some(())
98                    )
99                });
100
101            let auto_announcement_delay = if success {
102                Duration::from_secs(SUCCESS_RETRY_SECONDS)
103            } else {
104                Duration::from_secs(FAILURE_RETRY_SECONDS)
105            };
106
107            select! {
108                _ = new_announcement => {},
109                () = sleep(auto_announcement_delay) => {},
110            }
111        }
112    });
113
114    Ok(())
115}
116
117/// Checks if we already have a signed API endpoint announcement for our own
118/// identity in the database and creates one if not.
119async fn insert_signed_api_announcement_if_not_present(db: &Database, cfg: &ServerConfig) {
120    let mut dbtx = db.begin_transaction().await;
121    if dbtx
122        .get_value(&ApiAnnouncementKey(cfg.local.identity))
123        .await
124        .is_some()
125    {
126        return;
127    }
128
129    let api_announcement = ApiAnnouncement::new(
130        cfg.consensus.api_endpoints()[&cfg.local.identity]
131            .url
132            .clone(),
133        0,
134    );
135    let ctx = secp256k1::Secp256k1::new();
136    let signed_announcement =
137        api_announcement.sign(&ctx, &cfg.private.broadcast_secret_key.keypair(&ctx));
138
139    dbtx.insert_entry(
140        &ApiAnnouncementKey(cfg.local.identity),
141        &signed_announcement,
142    )
143    .await;
144    dbtx.commit_tx().await;
145}
146
147/// Returns a list of all peers and their respective API URLs taking into
148/// account announcements overwriting the URLs contained in the original
149/// configuration.
150pub async fn get_api_urls(db: &Database, cfg: &ServerConfigConsensus) -> BTreeMap<PeerId, SafeUrl> {
151    override_api_urls(
152        db,
153        cfg.api_endpoints()
154            .iter()
155            .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
156        &ApiAnnouncementPrefix,
157        |key| key.0,
158    )
159    .await
160}