fedimint_server/net/api/
announcement.rs1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use fedimint_api_client::api::{ConnectorRegistry, DynGlobalApi};
5use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::net::api_announcement::{
8 ApiAnnouncement, SignedApiAnnouncement, override_api_urls,
9};
10use fedimint_core::task::{TaskGroup, sleep};
11use fedimint_core::util::{FmtCompact, SafeUrl};
12use fedimint_core::{PeerId, impl_db_lookup, impl_db_record, secp256k1};
13use fedimint_logging::LOG_NET_API;
14use futures::stream::StreamExt;
15use tokio::select;
16use tracing::debug;
17
18use crate::config::{ServerConfig, ServerConfigConsensus};
19use crate::db::DbKeyPrefix;
20
21#[derive(Clone, Debug, Encodable, Decodable)]
22pub struct ApiAnnouncementKey(pub PeerId);
23
24#[derive(Clone, Debug, Encodable, Decodable)]
25pub struct ApiAnnouncementPrefix;
26
27impl_db_record!(
28 key = ApiAnnouncementKey,
29 value = SignedApiAnnouncement,
30 db_prefix = DbKeyPrefix::ApiAnnouncements,
31 notify_on_modify = true,
32);
33impl_db_lookup!(
34 key = ApiAnnouncementKey,
35 query_prefix = ApiAnnouncementPrefix
36);
37
38pub async fn start_api_announcement_service(
39 db: &Database,
40 tg: &TaskGroup,
41 cfg: &ServerConfig,
42 api_secret: Option<String>,
43) -> anyhow::Result<()> {
44 const INITIAL_DEALY_SECONDS: u64 = 5;
45 const FAILURE_RETRY_SECONDS: u64 = 60;
46 const SUCCESS_RETRY_SECONDS: u64 = 600;
47
48 insert_signed_api_announcement_if_not_present(db, cfg).await;
49
50 let db = db.clone();
51 let api_client = DynGlobalApi::new(
53 ConnectorRegistry::build_from_server_env()?.bind().await?,
55 get_api_urls(&db, &cfg.consensus).await,
56 api_secret.as_deref(),
57 )?;
58
59 let our_peer_id = cfg.local.identity;
60 tg.spawn_cancellable("submit-api-url-announcement", async move {
61 sleep(Duration::from_secs(INITIAL_DEALY_SECONDS)).await;
64 loop {
65 let mut success = true;
66 let announcements = db.begin_transaction_nc()
67 .await
68 .find_by_prefix(&ApiAnnouncementPrefix)
69 .await
70 .map(|(peer_key, peer_announcement)| (peer_key.0, peer_announcement))
71 .collect::<Vec<(PeerId, SignedApiAnnouncement)>>()
72 .await;
73
74 for (peer, announcement) in announcements {
76 if let Err(err) = api_client
77 .submit_api_announcement(peer, announcement.clone())
78 .await {
79 debug!(target: LOG_NET_API, ?peer, err = %err.fmt_compact(), "Announcing API URL did not succeed for all peers, retrying in {FAILURE_RETRY_SECONDS} seconds");
80 success = false;
81 }
82 }
83
84 let our_announcement_key = ApiAnnouncementKey(our_peer_id);
86 let our_announcement = db
87 .begin_transaction_nc()
88 .await
89 .get_value(&our_announcement_key)
90 .await
91 .expect("Our announcement is always present");
92 let new_announcement = db.wait_key_check(
93 &our_announcement_key,
94 |new_announcement| {
95 new_announcement.and_then(
96 |new_announcement| (new_announcement.api_announcement.nonce != our_announcement.api_announcement.nonce).then_some(())
97 )
98 });
99
100 let auto_announcement_delay = if success {
101 Duration::from_secs(SUCCESS_RETRY_SECONDS)
102 } else {
103 Duration::from_secs(FAILURE_RETRY_SECONDS)
104 };
105
106 select! {
107 _ = new_announcement => {},
108 () = sleep(auto_announcement_delay) => {},
109 }
110 }
111 });
112
113 Ok(())
114}
115
116async fn insert_signed_api_announcement_if_not_present(db: &Database, cfg: &ServerConfig) {
119 let mut dbtx = db.begin_transaction().await;
120 if dbtx
121 .get_value(&ApiAnnouncementKey(cfg.local.identity))
122 .await
123 .is_some()
124 {
125 return;
126 }
127
128 let api_announcement = ApiAnnouncement::new(
129 cfg.consensus.api_endpoints()[&cfg.local.identity]
130 .url
131 .clone(),
132 0,
133 );
134 let ctx = secp256k1::Secp256k1::new();
135 let signed_announcement =
136 api_announcement.sign(&ctx, &cfg.private.broadcast_secret_key.keypair(&ctx));
137
138 dbtx.insert_entry(
139 &ApiAnnouncementKey(cfg.local.identity),
140 &signed_announcement,
141 )
142 .await;
143 dbtx.commit_tx().await;
144}
145
146pub async fn get_api_urls(db: &Database, cfg: &ServerConfigConsensus) -> BTreeMap<PeerId, SafeUrl> {
150 override_api_urls(
151 db,
152 cfg.api_endpoints()
153 .iter()
154 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
155 &ApiAnnouncementPrefix,
156 |key| key.0,
157 )
158 .await
159}