fedimint_server/net/api/
announcement.rs

1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use fedimint_api_client::api::DynGlobalApi;
5use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::net::api_announcement::{
8    ApiAnnouncement, SignedApiAnnouncement, override_api_urls,
9};
10use fedimint_core::task::{TaskGroup, sleep};
11use fedimint_core::util::SafeUrl;
12use fedimint_core::{PeerId, impl_db_lookup, impl_db_record, secp256k1};
13use fedimint_logging::LOG_NET_API;
14use tokio::select;
15use tracing::debug;
16
17use crate::config::{ServerConfig, ServerConfigConsensus};
18use crate::db::DbKeyPrefix;
19
20#[derive(Clone, Debug, Encodable, Decodable)]
21pub struct ApiAnnouncementKey(pub PeerId);
22
23#[derive(Clone, Debug, Encodable, Decodable)]
24pub struct ApiAnnouncementPrefix;
25
26impl_db_record!(
27    key = ApiAnnouncementKey,
28    value = SignedApiAnnouncement,
29    db_prefix = DbKeyPrefix::ApiAnnouncements,
30    notify_on_modify = true,
31);
32impl_db_lookup!(
33    key = ApiAnnouncementKey,
34    query_prefix = ApiAnnouncementPrefix
35);
36
37pub async fn start_api_announcement_service(
38    db: &Database,
39    tg: &TaskGroup,
40    cfg: &ServerConfig,
41    api_secret: Option<String>,
42) -> anyhow::Result<()> {
43    const INITIAL_DEALY_SECONDS: u64 = 5;
44    const FAILURE_RETRY_SECONDS: u64 = 60;
45    const SUCCESS_RETRY_SECONDS: u64 = 600;
46
47    insert_signed_api_announcement_if_not_present(db, cfg).await;
48
49    let db = db.clone();
50    // FIXME: (@leonardo) how should we handle the connector here ?
51    let api_client =
52        DynGlobalApi::from_endpoints(get_api_urls(&db, &cfg.consensus).await, &api_secret).await?;
53
54    let our_peer_id = cfg.local.identity;
55    tg.spawn_cancellable("submit-api-url-announcement", async move {
56        // Give other servers some time to start up in case they were just restarted
57        // together
58        sleep(Duration::from_secs(INITIAL_DEALY_SECONDS)).await;
59        loop {
60            let announcement = db.begin_transaction_nc()
61                .await
62                .get_value(&ApiAnnouncementKey(our_peer_id))
63                .await
64                .expect("Our own API announcement should be present in the database");
65
66            if let Err(e) = api_client
67                .submit_api_announcement(our_peer_id, announcement.clone())
68                .await {
69                debug!(target: LOG_NET_API, ?e, "Announcing our API URL did not succeed for all peers, retrying in {FAILURE_RETRY_SECONDS} seconds");
70                sleep(Duration::from_secs(FAILURE_RETRY_SECONDS)).await;
71            } else {
72                let our_announcement_key = ApiAnnouncementKey(our_peer_id);
73                let new_announcement = db.wait_key_check(
74                    &our_announcement_key,
75                    |new_announcement| {
76                        new_announcement.and_then(
77                            |new_announcement| (new_announcement.api_announcement.nonce != announcement.api_announcement.nonce).then_some(())
78                        )
79                    });
80
81                select! {
82                    _ = new_announcement => {},
83                    () = sleep(Duration::from_secs(SUCCESS_RETRY_SECONDS)) => {},
84                }
85            }
86        }
87    });
88
89    Ok(())
90}
91
92/// Checks if we already have a signed API endpoint announcement for our own
93/// identity in the database and creates one if not.
94async fn insert_signed_api_announcement_if_not_present(db: &Database, cfg: &ServerConfig) {
95    let mut dbtx = db.begin_transaction().await;
96    if dbtx
97        .get_value(&ApiAnnouncementKey(cfg.local.identity))
98        .await
99        .is_some()
100    {
101        return;
102    }
103
104    let api_announcement = ApiAnnouncement::new(
105        cfg.consensus.api_endpoints()[&cfg.local.identity]
106            .url
107            .clone(),
108        0,
109    );
110    let ctx = secp256k1::Secp256k1::new();
111    let signed_announcement =
112        api_announcement.sign(&ctx, &cfg.private.broadcast_secret_key.keypair(&ctx));
113
114    dbtx.insert_entry(
115        &ApiAnnouncementKey(cfg.local.identity),
116        &signed_announcement,
117    )
118    .await;
119    dbtx.commit_tx().await;
120}
121
122/// Returns a list of all peers and their respective API URLs taking into
123/// account announcements overwriting the URLs contained in the original
124/// configuration.
125pub async fn get_api_urls(db: &Database, cfg: &ServerConfigConsensus) -> BTreeMap<PeerId, SafeUrl> {
126    override_api_urls(
127        db,
128        cfg.api_endpoints()
129            .iter()
130            .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
131        &ApiAnnouncementPrefix,
132        |key| key.0,
133    )
134    .await
135}