fedimint_server/net/api/
announcement.rs1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use fedimint_api_client::api::DynGlobalApi;
5use fedimint_connectors::ConnectorRegistry;
6use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
7use fedimint_core::encoding::{Decodable, Encodable};
8use fedimint_core::envs::is_running_in_test_env;
9use fedimint_core::net::api_announcement::{ApiAnnouncement, SignedApiAnnouncement};
10use fedimint_core::net::guardian_metadata::SignedGuardianMetadata;
11use fedimint_core::task::{TaskGroup, sleep};
12use fedimint_core::util::{FmtCompact, SafeUrl};
13use fedimint_core::{PeerId, impl_db_lookup, impl_db_record, secp256k1};
14use fedimint_logging::LOG_NET_API;
15use futures::future::join_all;
16use futures::stream::StreamExt;
17use tokio::select;
18use tracing::debug;
19
20use super::guardian_metadata::GuardianMetadataPrefix;
21use crate::config::{ServerConfig, ServerConfigConsensus};
22use crate::db::DbKeyPrefix;
23
/// Database key under which a peer's [`SignedApiAnnouncement`] is stored.
///
/// The wrapped [`PeerId`] identifies the peer the announcement belongs to.
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct ApiAnnouncementKey(pub PeerId);
26
/// Query prefix registered for [`ApiAnnouncementKey`]; used with
/// `find_by_prefix` in this file to iterate over every stored announcement.
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct ApiAnnouncementPrefix;
29
// Declares the database record `ApiAnnouncementKey -> SignedApiAnnouncement`
// under the `DbKeyPrefix::ApiAnnouncements` prefix.
// NOTE(review): `notify_on_modify = true` is presumably what lets
// `Database::wait_key_check` (used by the announcement service below) wake up
// when the entry changes — confirm against the macro definition.
impl_db_record!(
    key = ApiAnnouncementKey,
    value = SignedApiAnnouncement,
    db_prefix = DbKeyPrefix::ApiAnnouncements,
    notify_on_modify = true,
);
// Registers `ApiAnnouncementPrefix` as a query prefix for `ApiAnnouncementKey`,
// which is what allows `find_by_prefix(&ApiAnnouncementPrefix)` in this file.
impl_db_lookup!(
    key = ApiAnnouncementKey,
    query_prefix = ApiAnnouncementPrefix
);
40
41pub async fn start_api_announcement_service(
42 db: &Database,
43 tg: &TaskGroup,
44 cfg: &ServerConfig,
45 api_secret: Option<String>,
46) -> anyhow::Result<()> {
47 const INITIAL_DEALY_SECONDS: u64 = 5;
48 const FAILURE_RETRY_SECONDS: u64 = 60;
49 const SUCCESS_RETRY_SECONDS: u64 = 600;
50
51 let initial_delay = if insert_signed_api_announcement_if_not_present(db, cfg).await {
52 Duration::ZERO
53 } else {
54 Duration::from_secs(INITIAL_DEALY_SECONDS)
55 };
56
57 let db = db.clone();
58 let api_client = DynGlobalApi::new(
60 ConnectorRegistry::build_from_server_env()?.bind().await?,
62 get_api_urls(&db, &cfg.consensus).await,
63 api_secret.as_deref(),
64 )?;
65
66 let our_peer_id = cfg.local.identity;
67 tg.spawn_cancellable("submit-api-url-announcement", async move {
68 sleep(initial_delay).await;
71 loop {
72 let mut success = true;
73 let announcements = db.begin_transaction_nc()
74 .await
75 .find_by_prefix(&ApiAnnouncementPrefix)
76 .await
77 .map(|(peer_key, peer_announcement)| (peer_key.0, peer_announcement))
78 .collect::<Vec<(PeerId, SignedApiAnnouncement)>>()
79 .await;
80
81 let results = join_all(announcements.iter().map(|(peer, announcement)| {
85 let api_client = &api_client;
86 async move {
87 (*peer, api_client.submit_api_announcement(*peer, announcement.clone()).await)
88 }
89 }))
90 .await;
91
92 for (peer, result) in results {
93 if let Err(err) = result {
94 debug!(target: LOG_NET_API, ?peer, err = %err.fmt_compact(), "Announcing API URL did not succeed for all peers, retrying in {FAILURE_RETRY_SECONDS} seconds");
95 success = false;
96 }
97 }
98
99 let our_announcement_key = ApiAnnouncementKey(our_peer_id);
101 let our_announcement = db
102 .begin_transaction_nc()
103 .await
104 .get_value(&our_announcement_key)
105 .await
106 .expect("Our announcement is always present");
107 let new_announcement = db.wait_key_check(
108 &our_announcement_key,
109 |new_announcement| {
110 new_announcement.and_then(
111 |new_announcement| (new_announcement.api_announcement.nonce != our_announcement.api_announcement.nonce).then_some(())
112 )
113 });
114
115 let auto_announcement_delay = if success {
116 Duration::from_secs(SUCCESS_RETRY_SECONDS)
117 } else if is_running_in_test_env() {
118 Duration::from_secs(3)
119 } else {
120 Duration::from_secs(FAILURE_RETRY_SECONDS)
121 };
122
123 select! {
124 _ = new_announcement => {},
125 () = sleep(auto_announcement_delay) => {},
126 }
127 }
128 });
129
130 Ok(())
131}
132
133async fn insert_signed_api_announcement_if_not_present(db: &Database, cfg: &ServerConfig) -> bool {
138 let mut dbtx = db.begin_transaction().await;
139 if dbtx
140 .get_value(&ApiAnnouncementKey(cfg.local.identity))
141 .await
142 .is_some()
143 {
144 return false;
145 }
146
147 let api_announcement = ApiAnnouncement::new(
148 cfg.consensus.api_endpoints()[&cfg.local.identity]
149 .url
150 .clone(),
151 0,
152 );
153 let ctx = secp256k1::Secp256k1::new();
154 let signed_announcement =
155 api_announcement.sign(&ctx, &cfg.private.broadcast_secret_key.keypair(&ctx));
156
157 dbtx.insert_entry(
158 &ApiAnnouncementKey(cfg.local.identity),
159 &signed_announcement,
160 )
161 .await;
162 dbtx.commit_tx().await;
163
164 true
165}
166
167pub async fn get_api_urls(db: &Database, cfg: &ServerConfigConsensus) -> BTreeMap<PeerId, SafeUrl> {
176 let mut dbtx = db.begin_transaction_nc().await;
177
178 let guardian_metadata: BTreeMap<PeerId, SignedGuardianMetadata> = dbtx
180 .find_by_prefix(&GuardianMetadataPrefix)
181 .await
182 .map(|(key, metadata)| (key.0, metadata))
183 .collect()
184 .await;
185
186 let api_announcements: BTreeMap<PeerId, SignedApiAnnouncement> = dbtx
188 .find_by_prefix(&ApiAnnouncementPrefix)
189 .await
190 .map(|(key, announcement)| (key.0, announcement))
191 .collect()
192 .await;
193
194 cfg.api_endpoints()
196 .iter()
197 .map(|(peer_id, peer_url)| {
198 let url = guardian_metadata
199 .get(peer_id)
200 .and_then(|m| m.guardian_metadata().api_urls.first().cloned())
201 .or_else(|| {
202 api_announcements
203 .get(peer_id)
204 .map(|a| a.api_announcement.api_url.clone())
205 })
206 .unwrap_or_else(|| peer_url.url.clone());
207 (*peer_id, url)
208 })
209 .collect()
210}