fedimint_server/net/api/
announcement.rs

1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use fedimint_api_client::api::DynGlobalApi;
5use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::net::api_announcement::{
8    ApiAnnouncement, SignedApiAnnouncement, override_api_urls,
9};
10use fedimint_core::task::{TaskGroup, sleep};
11use fedimint_core::util::SafeUrl;
12use fedimint_core::{PeerId, impl_db_lookup, impl_db_record, secp256k1};
13use fedimint_logging::LOG_NET_API;
14use futures::stream::StreamExt;
15use tokio::select;
16use tracing::debug;
17
18use crate::config::{ServerConfig, ServerConfigConsensus};
19use crate::db::DbKeyPrefix;
20
21#[derive(Clone, Debug, Encodable, Decodable)]
22pub struct ApiAnnouncementKey(pub PeerId);
23
24#[derive(Clone, Debug, Encodable, Decodable)]
25pub struct ApiAnnouncementPrefix;
26
27impl_db_record!(
28    key = ApiAnnouncementKey,
29    value = SignedApiAnnouncement,
30    db_prefix = DbKeyPrefix::ApiAnnouncements,
31    notify_on_modify = true,
32);
33impl_db_lookup!(
34    key = ApiAnnouncementKey,
35    query_prefix = ApiAnnouncementPrefix
36);
37
38pub async fn start_api_announcement_service(
39    db: &Database,
40    tg: &TaskGroup,
41    cfg: &ServerConfig,
42    api_secret: Option<String>,
43) -> anyhow::Result<()> {
44    const INITIAL_DEALY_SECONDS: u64 = 5;
45    const FAILURE_RETRY_SECONDS: u64 = 60;
46    const SUCCESS_RETRY_SECONDS: u64 = 600;
47
48    insert_signed_api_announcement_if_not_present(db, cfg).await;
49
50    let db = db.clone();
51    // FIXME: (@leonardo) how should we handle the connector here ?
52    let api_client =
53        DynGlobalApi::from_endpoints(get_api_urls(&db, &cfg.consensus).await, &api_secret).await?;
54
55    let our_peer_id = cfg.local.identity;
56    tg.spawn_cancellable("submit-api-url-announcement", async move {
57        // Give other servers some time to start up in case they were just restarted
58        // together
59        sleep(Duration::from_secs(INITIAL_DEALY_SECONDS)).await;
60        loop {
61            let mut success = true;
62            let announcements = db.begin_transaction_nc()
63                .await
64                .find_by_prefix(&ApiAnnouncementPrefix)
65                .await
66                .map(|(peer_key, peer_announcement)| (peer_key.0, peer_announcement))
67                .collect::<Vec<(PeerId, SignedApiAnnouncement)>>()
68                .await;
69
70            // Announce all peer API URLs we know, but at least our own
71            for (peer, announcement) in announcements {
72                if let Err(e) = api_client
73                    .submit_api_announcement(peer, announcement.clone())
74                    .await {
75                    debug!(target: LOG_NET_API, ?peer, ?e, "Announcing API URL did not succeed for all peers, retrying in {FAILURE_RETRY_SECONDS} seconds");
76                    success = false;
77                }
78            }
79
80            // While we announce all peer API urls, we only want to immediately trigger in case
81            let our_announcement_key = ApiAnnouncementKey(our_peer_id);
82            let our_announcement = db
83                .begin_transaction_nc()
84                .await
85                .get_value(&our_announcement_key)
86                .await
87                .expect("Our announcement is always present");
88            let new_announcement = db.wait_key_check(
89                &our_announcement_key,
90                |new_announcement| {
91                    new_announcement.and_then(
92                        |new_announcement| (new_announcement.api_announcement.nonce != our_announcement.api_announcement.nonce).then_some(())
93                    )
94                });
95
96            let auto_announcement_delay = if success {
97                Duration::from_secs(SUCCESS_RETRY_SECONDS)
98            } else {
99                Duration::from_secs(FAILURE_RETRY_SECONDS)
100            };
101
102            select! {
103                _ = new_announcement => {},
104                () = sleep(auto_announcement_delay) => {},
105            }
106        }
107    });
108
109    Ok(())
110}
111
112/// Checks if we already have a signed API endpoint announcement for our own
113/// identity in the database and creates one if not.
114async fn insert_signed_api_announcement_if_not_present(db: &Database, cfg: &ServerConfig) {
115    let mut dbtx = db.begin_transaction().await;
116    if dbtx
117        .get_value(&ApiAnnouncementKey(cfg.local.identity))
118        .await
119        .is_some()
120    {
121        return;
122    }
123
124    let api_announcement = ApiAnnouncement::new(
125        cfg.consensus.api_endpoints()[&cfg.local.identity]
126            .url
127            .clone(),
128        0,
129    );
130    let ctx = secp256k1::Secp256k1::new();
131    let signed_announcement =
132        api_announcement.sign(&ctx, &cfg.private.broadcast_secret_key.keypair(&ctx));
133
134    dbtx.insert_entry(
135        &ApiAnnouncementKey(cfg.local.identity),
136        &signed_announcement,
137    )
138    .await;
139    dbtx.commit_tx().await;
140}
141
142/// Returns a list of all peers and their respective API URLs taking into
143/// account announcements overwriting the URLs contained in the original
144/// configuration.
145pub async fn get_api_urls(db: &Database, cfg: &ServerConfigConsensus) -> BTreeMap<PeerId, SafeUrl> {
146    override_api_urls(
147        db,
148        cfg.api_endpoints()
149            .iter()
150            .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
151        &ApiAnnouncementPrefix,
152        |key| key.0,
153    )
154    .await
155}