fedimint_server/net/api/
announcement.rs

1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use fedimint_api_client::api::{ConnectorRegistry, DynGlobalApi};
5use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::net::api_announcement::{
8    ApiAnnouncement, SignedApiAnnouncement, override_api_urls,
9};
10use fedimint_core::task::{TaskGroup, sleep};
11use fedimint_core::util::{FmtCompact, SafeUrl};
12use fedimint_core::{PeerId, impl_db_lookup, impl_db_record, secp256k1};
13use fedimint_logging::LOG_NET_API;
14use futures::stream::StreamExt;
15use tokio::select;
16use tracing::debug;
17
18use crate::config::{ServerConfig, ServerConfigConsensus};
19use crate::db::DbKeyPrefix;
20
/// Database key for a single peer's API announcement.
///
/// Wraps the [`PeerId`] of the peer the announcement belongs to; the value
/// stored under this key is a [`SignedApiAnnouncement`].
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct ApiAnnouncementKey(pub PeerId);
23
/// Prefix key used to query all [`ApiAnnouncementKey`] entries at once, i.e.
/// the announcements of every peer we have one stored for.
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct ApiAnnouncementPrefix;
26
// Declares the database record `ApiAnnouncementKey -> SignedApiAnnouncement`
// under the `ApiAnnouncements` key prefix.
impl_db_record!(
    key = ApiAnnouncementKey,
    value = SignedApiAnnouncement,
    db_prefix = DbKeyPrefix::ApiAnnouncements,
    // NOTE(review): presumably this is what lets `Database::wait_key_check`
    // observe changes to these keys (used by the announcement task in this
    // file) - confirm against the macro's documentation.
    notify_on_modify = true,
);
// Allows looking up all `ApiAnnouncementKey` records via
// `ApiAnnouncementPrefix` (used with `find_by_prefix` in this file).
impl_db_lookup!(
    key = ApiAnnouncementKey,
    query_prefix = ApiAnnouncementPrefix
);
37
38pub async fn start_api_announcement_service(
39    db: &Database,
40    tg: &TaskGroup,
41    cfg: &ServerConfig,
42    api_secret: Option<String>,
43) -> anyhow::Result<()> {
44    const INITIAL_DEALY_SECONDS: u64 = 5;
45    const FAILURE_RETRY_SECONDS: u64 = 60;
46    const SUCCESS_RETRY_SECONDS: u64 = 600;
47
48    insert_signed_api_announcement_if_not_present(db, cfg).await;
49
50    let db = db.clone();
51    // FIXME: (@leonardo) how should we handle the connector here ?
52    let api_client = DynGlobalApi::new(
53        // TODO: get from somewhere/unify?
54        ConnectorRegistry::build_from_server_env()?.bind().await?,
55        get_api_urls(&db, &cfg.consensus).await,
56        api_secret.as_deref(),
57    )?;
58
59    let our_peer_id = cfg.local.identity;
60    tg.spawn_cancellable("submit-api-url-announcement", async move {
61        // Give other servers some time to start up in case they were just restarted
62        // together
63        sleep(Duration::from_secs(INITIAL_DEALY_SECONDS)).await;
64        loop {
65            let mut success = true;
66            let announcements = db.begin_transaction_nc()
67                .await
68                .find_by_prefix(&ApiAnnouncementPrefix)
69                .await
70                .map(|(peer_key, peer_announcement)| (peer_key.0, peer_announcement))
71                .collect::<Vec<(PeerId, SignedApiAnnouncement)>>()
72                .await;
73
74            // Announce all peer API URLs we know, but at least our own
75            for (peer, announcement) in announcements {
76                if let Err(err) = api_client
77                    .submit_api_announcement(peer, announcement.clone())
78                    .await {
79                    debug!(target: LOG_NET_API, ?peer, err = %err.fmt_compact(), "Announcing API URL did not succeed for all peers, retrying in {FAILURE_RETRY_SECONDS} seconds");
80                    success = false;
81                }
82            }
83
84            // While we announce all peer API urls, we only want to immediately trigger in case
85            let our_announcement_key = ApiAnnouncementKey(our_peer_id);
86            let our_announcement = db
87                .begin_transaction_nc()
88                .await
89                .get_value(&our_announcement_key)
90                .await
91                .expect("Our announcement is always present");
92            let new_announcement = db.wait_key_check(
93                &our_announcement_key,
94                |new_announcement| {
95                    new_announcement.and_then(
96                        |new_announcement| (new_announcement.api_announcement.nonce != our_announcement.api_announcement.nonce).then_some(())
97                    )
98                });
99
100            let auto_announcement_delay = if success {
101                Duration::from_secs(SUCCESS_RETRY_SECONDS)
102            } else {
103                Duration::from_secs(FAILURE_RETRY_SECONDS)
104            };
105
106            select! {
107                _ = new_announcement => {},
108                () = sleep(auto_announcement_delay) => {},
109            }
110        }
111    });
112
113    Ok(())
114}
115
116/// Checks if we already have a signed API endpoint announcement for our own
117/// identity in the database and creates one if not.
118async fn insert_signed_api_announcement_if_not_present(db: &Database, cfg: &ServerConfig) {
119    let mut dbtx = db.begin_transaction().await;
120    if dbtx
121        .get_value(&ApiAnnouncementKey(cfg.local.identity))
122        .await
123        .is_some()
124    {
125        return;
126    }
127
128    let api_announcement = ApiAnnouncement::new(
129        cfg.consensus.api_endpoints()[&cfg.local.identity]
130            .url
131            .clone(),
132        0,
133    );
134    let ctx = secp256k1::Secp256k1::new();
135    let signed_announcement =
136        api_announcement.sign(&ctx, &cfg.private.broadcast_secret_key.keypair(&ctx));
137
138    dbtx.insert_entry(
139        &ApiAnnouncementKey(cfg.local.identity),
140        &signed_announcement,
141    )
142    .await;
143    dbtx.commit_tx().await;
144}
145
146/// Returns a list of all peers and their respective API URLs taking into
147/// account announcements overwriting the URLs contained in the original
148/// configuration.
149pub async fn get_api_urls(db: &Database, cfg: &ServerConfigConsensus) -> BTreeMap<PeerId, SafeUrl> {
150    override_api_urls(
151        db,
152        cfg.api_endpoints()
153            .iter()
154            .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
155        &ApiAnnouncementPrefix,
156        |key| key.0,
157    )
158    .await
159}