fedimint_server/net/api/
announcement.rs

1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use fedimint_api_client::api::DynGlobalApi;
5use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::net::api_announcement::{
8    ApiAnnouncement, SignedApiAnnouncement, override_api_urls,
9};
10use fedimint_core::task::{TaskGroup, sleep};
11use fedimint_core::util::{FmtCompact, SafeUrl};
12use fedimint_core::{PeerId, impl_db_lookup, impl_db_record, secp256k1};
13use fedimint_logging::LOG_NET_API;
14use futures::stream::StreamExt;
15use tokio::select;
16use tracing::debug;
17
18use crate::config::{ServerConfig, ServerConfigConsensus};
19use crate::db::DbKeyPrefix;
20
/// Database key under which the [`SignedApiAnnouncement`] of a single peer is
/// stored. The wrapped [`PeerId`] identifies the peer the announcement
/// belongs to.
21#[derive(Clone, Debug, Encodable, Decodable)]
22pub struct ApiAnnouncementKey(pub PeerId);
23
/// Query prefix used with `find_by_prefix` to iterate over all stored
/// [`ApiAnnouncementKey`] entries, i.e. the API announcements of all peers
/// currently held in the database.
24#[derive(Clone, Debug, Encodable, Decodable)]
25pub struct ApiAnnouncementPrefix;
26
// Declares `ApiAnnouncementKey` as a database record: entries live under the
// `DbKeyPrefix::ApiAnnouncements` prefix and carry a `SignedApiAnnouncement`
// as their value.
// NOTE(review): `notify_on_modify = true` is presumably what makes it possible
// to wait for changes of this key via `Database::wait_key_check` (as done in
// `start_api_announcement_service`) — confirm against the macro's docs.
27impl_db_record!(
28    key = ApiAnnouncementKey,
29    value = SignedApiAnnouncement,
30    db_prefix = DbKeyPrefix::ApiAnnouncements,
31    notify_on_modify = true,
32);
// Declares `ApiAnnouncementPrefix` as the query prefix for
// `ApiAnnouncementKey`, so prefix lookups such as
// `find_by_prefix(&ApiAnnouncementPrefix)` yield
// `(ApiAnnouncementKey, SignedApiAnnouncement)` pairs.
33impl_db_lookup!(
34    key = ApiAnnouncementKey,
35    query_prefix = ApiAnnouncementPrefix
36);
37
38pub async fn start_api_announcement_service(
39    db: &Database,
40    tg: &TaskGroup,
41    cfg: &ServerConfig,
42    api_secret: Option<String>,
43) -> anyhow::Result<()> {
44    const INITIAL_DEALY_SECONDS: u64 = 5;
45    const FAILURE_RETRY_SECONDS: u64 = 60;
46    const SUCCESS_RETRY_SECONDS: u64 = 600;
47
48    insert_signed_api_announcement_if_not_present(db, cfg).await;
49
50    let db = db.clone();
51    // FIXME: (@leonardo) how should we handle the connector here ?
52    let api_client = DynGlobalApi::from_endpoints(
53        get_api_urls(&db, &cfg.consensus).await,
54        &api_secret,
55        true,
56        true,
57    )
58    .await?;
59
60    let our_peer_id = cfg.local.identity;
61    tg.spawn_cancellable("submit-api-url-announcement", async move {
62        // Give other servers some time to start up in case they were just restarted
63        // together
64        sleep(Duration::from_secs(INITIAL_DEALY_SECONDS)).await;
65        loop {
66            let mut success = true;
67            let announcements = db.begin_transaction_nc()
68                .await
69                .find_by_prefix(&ApiAnnouncementPrefix)
70                .await
71                .map(|(peer_key, peer_announcement)| (peer_key.0, peer_announcement))
72                .collect::<Vec<(PeerId, SignedApiAnnouncement)>>()
73                .await;
74
75            // Announce all peer API URLs we know, but at least our own
76            for (peer, announcement) in announcements {
77                if let Err(err) = api_client
78                    .submit_api_announcement(peer, announcement.clone())
79                    .await {
80                    debug!(target: LOG_NET_API, ?peer, err = %err.fmt_compact(), "Announcing API URL did not succeed for all peers, retrying in {FAILURE_RETRY_SECONDS} seconds");
81                    success = false;
82                }
83            }
84
85            // While we announce all peer API urls, we only want to immediately trigger in case
86            let our_announcement_key = ApiAnnouncementKey(our_peer_id);
87            let our_announcement = db
88                .begin_transaction_nc()
89                .await
90                .get_value(&our_announcement_key)
91                .await
92                .expect("Our announcement is always present");
93            let new_announcement = db.wait_key_check(
94                &our_announcement_key,
95                |new_announcement| {
96                    new_announcement.and_then(
97                        |new_announcement| (new_announcement.api_announcement.nonce != our_announcement.api_announcement.nonce).then_some(())
98                    )
99                });
100
101            let auto_announcement_delay = if success {
102                Duration::from_secs(SUCCESS_RETRY_SECONDS)
103            } else {
104                Duration::from_secs(FAILURE_RETRY_SECONDS)
105            };
106
107            select! {
108                _ = new_announcement => {},
109                () = sleep(auto_announcement_delay) => {},
110            }
111        }
112    });
113
114    Ok(())
115}
116
117/// Checks if we already have a signed API endpoint announcement for our own
118/// identity in the database and creates one if not.
119async fn insert_signed_api_announcement_if_not_present(db: &Database, cfg: &ServerConfig) {
120    let mut dbtx = db.begin_transaction().await;
121    if dbtx
122        .get_value(&ApiAnnouncementKey(cfg.local.identity))
123        .await
124        .is_some()
125    {
126        return;
127    }
128
129    let api_announcement = ApiAnnouncement::new(
130        cfg.consensus.api_endpoints()[&cfg.local.identity]
131            .url
132            .clone(),
133        0,
134    );
135    let ctx = secp256k1::Secp256k1::new();
136    let signed_announcement =
137        api_announcement.sign(&ctx, &cfg.private.broadcast_secret_key.keypair(&ctx));
138
139    dbtx.insert_entry(
140        &ApiAnnouncementKey(cfg.local.identity),
141        &signed_announcement,
142    )
143    .await;
144    dbtx.commit_tx().await;
145}
146
147/// Returns a list of all peers and their respective API URLs taking into
148/// account announcements overwriting the URLs contained in the original
149/// configuration.
150pub async fn get_api_urls(db: &Database, cfg: &ServerConfigConsensus) -> BTreeMap<PeerId, SafeUrl> {
151    override_api_urls(
152        db,
153        cfg.api_endpoints()
154            .iter()
155            .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
156        &ApiAnnouncementPrefix,
157        |key| key.0,
158    )
159    .await
160}