fedimint_server/net/api/
announcement.rs1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use fedimint_api_client::api::DynGlobalApi;
5use fedimint_connectors::ConnectorRegistry;
6use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
7use fedimint_core::encoding::{Decodable, Encodable};
8use fedimint_core::net::api_announcement::{
9 ApiAnnouncement, SignedApiAnnouncement, override_api_urls,
10};
11use fedimint_core::task::{TaskGroup, sleep};
12use fedimint_core::util::{FmtCompact, SafeUrl};
13use fedimint_core::{PeerId, impl_db_lookup, impl_db_record, secp256k1};
14use fedimint_logging::LOG_NET_API;
15use futures::stream::StreamExt;
16use tokio::select;
17use tracing::debug;
18
19use crate::config::{ServerConfig, ServerConfigConsensus};
20use crate::db::DbKeyPrefix;
21
22#[derive(Clone, Debug, Encodable, Decodable)]
23pub struct ApiAnnouncementKey(pub PeerId);
24
25#[derive(Clone, Debug, Encodable, Decodable)]
26pub struct ApiAnnouncementPrefix;
27
28impl_db_record!(
29 key = ApiAnnouncementKey,
30 value = SignedApiAnnouncement,
31 db_prefix = DbKeyPrefix::ApiAnnouncements,
32 notify_on_modify = true,
33);
34impl_db_lookup!(
35 key = ApiAnnouncementKey,
36 query_prefix = ApiAnnouncementPrefix
37);
38
39pub async fn start_api_announcement_service(
40 db: &Database,
41 tg: &TaskGroup,
42 cfg: &ServerConfig,
43 api_secret: Option<String>,
44) -> anyhow::Result<()> {
45 const INITIAL_DEALY_SECONDS: u64 = 5;
46 const FAILURE_RETRY_SECONDS: u64 = 60;
47 const SUCCESS_RETRY_SECONDS: u64 = 600;
48
49 insert_signed_api_announcement_if_not_present(db, cfg).await;
50
51 let db = db.clone();
52 let api_client = DynGlobalApi::new(
54 ConnectorRegistry::build_from_server_env()?.bind().await?,
56 get_api_urls(&db, &cfg.consensus).await,
57 api_secret.as_deref(),
58 )?;
59
60 let our_peer_id = cfg.local.identity;
61 tg.spawn_cancellable("submit-api-url-announcement", async move {
62 sleep(Duration::from_secs(INITIAL_DEALY_SECONDS)).await;
65 loop {
66 let mut success = true;
67 let announcements = db.begin_transaction_nc()
68 .await
69 .find_by_prefix(&ApiAnnouncementPrefix)
70 .await
71 .map(|(peer_key, peer_announcement)| (peer_key.0, peer_announcement))
72 .collect::<Vec<(PeerId, SignedApiAnnouncement)>>()
73 .await;
74
75 for (peer, announcement) in announcements {
77 if let Err(err) = api_client
78 .submit_api_announcement(peer, announcement.clone())
79 .await {
80 debug!(target: LOG_NET_API, ?peer, err = %err.fmt_compact(), "Announcing API URL did not succeed for all peers, retrying in {FAILURE_RETRY_SECONDS} seconds");
81 success = false;
82 }
83 }
84
85 let our_announcement_key = ApiAnnouncementKey(our_peer_id);
87 let our_announcement = db
88 .begin_transaction_nc()
89 .await
90 .get_value(&our_announcement_key)
91 .await
92 .expect("Our announcement is always present");
93 let new_announcement = db.wait_key_check(
94 &our_announcement_key,
95 |new_announcement| {
96 new_announcement.and_then(
97 |new_announcement| (new_announcement.api_announcement.nonce != our_announcement.api_announcement.nonce).then_some(())
98 )
99 });
100
101 let auto_announcement_delay = if success {
102 Duration::from_secs(SUCCESS_RETRY_SECONDS)
103 } else {
104 Duration::from_secs(FAILURE_RETRY_SECONDS)
105 };
106
107 select! {
108 _ = new_announcement => {},
109 () = sleep(auto_announcement_delay) => {},
110 }
111 }
112 });
113
114 Ok(())
115}
116
117async fn insert_signed_api_announcement_if_not_present(db: &Database, cfg: &ServerConfig) {
120 let mut dbtx = db.begin_transaction().await;
121 if dbtx
122 .get_value(&ApiAnnouncementKey(cfg.local.identity))
123 .await
124 .is_some()
125 {
126 return;
127 }
128
129 let api_announcement = ApiAnnouncement::new(
130 cfg.consensus.api_endpoints()[&cfg.local.identity]
131 .url
132 .clone(),
133 0,
134 );
135 let ctx = secp256k1::Secp256k1::new();
136 let signed_announcement =
137 api_announcement.sign(&ctx, &cfg.private.broadcast_secret_key.keypair(&ctx));
138
139 dbtx.insert_entry(
140 &ApiAnnouncementKey(cfg.local.identity),
141 &signed_announcement,
142 )
143 .await;
144 dbtx.commit_tx().await;
145}
146
147pub async fn get_api_urls(db: &Database, cfg: &ServerConfigConsensus) -> BTreeMap<PeerId, SafeUrl> {
151 override_api_urls(
152 db,
153 cfg.api_endpoints()
154 .iter()
155 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
156 &ApiAnnouncementPrefix,
157 |key| key.0,
158 )
159 .await
160}