fedimint_server/net/api/
announcement.rs1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use fedimint_api_client::api::DynGlobalApi;
5use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::net::api_announcement::{
8 ApiAnnouncement, SignedApiAnnouncement, override_api_urls,
9};
10use fedimint_core::task::{TaskGroup, sleep};
11use fedimint_core::util::{FmtCompact, SafeUrl};
12use fedimint_core::{PeerId, impl_db_lookup, impl_db_record, secp256k1};
13use fedimint_logging::LOG_NET_API;
14use futures::stream::StreamExt;
15use tokio::select;
16use tracing::debug;
17
18use crate::config::{ServerConfig, ServerConfigConsensus};
19use crate::db::DbKeyPrefix;
20
21#[derive(Clone, Debug, Encodable, Decodable)]
22pub struct ApiAnnouncementKey(pub PeerId);
23
24#[derive(Clone, Debug, Encodable, Decodable)]
25pub struct ApiAnnouncementPrefix;
26
27impl_db_record!(
28 key = ApiAnnouncementKey,
29 value = SignedApiAnnouncement,
30 db_prefix = DbKeyPrefix::ApiAnnouncements,
31 notify_on_modify = true,
32);
33impl_db_lookup!(
34 key = ApiAnnouncementKey,
35 query_prefix = ApiAnnouncementPrefix
36);
37
38pub async fn start_api_announcement_service(
39 db: &Database,
40 tg: &TaskGroup,
41 cfg: &ServerConfig,
42 api_secret: Option<String>,
43) -> anyhow::Result<()> {
44 const INITIAL_DEALY_SECONDS: u64 = 5;
45 const FAILURE_RETRY_SECONDS: u64 = 60;
46 const SUCCESS_RETRY_SECONDS: u64 = 600;
47
48 insert_signed_api_announcement_if_not_present(db, cfg).await;
49
50 let db = db.clone();
51 let api_client = DynGlobalApi::from_endpoints(
53 get_api_urls(&db, &cfg.consensus).await,
54 &api_secret,
55 true,
56 true,
57 )
58 .await?;
59
60 let our_peer_id = cfg.local.identity;
61 tg.spawn_cancellable("submit-api-url-announcement", async move {
62 sleep(Duration::from_secs(INITIAL_DEALY_SECONDS)).await;
65 loop {
66 let mut success = true;
67 let announcements = db.begin_transaction_nc()
68 .await
69 .find_by_prefix(&ApiAnnouncementPrefix)
70 .await
71 .map(|(peer_key, peer_announcement)| (peer_key.0, peer_announcement))
72 .collect::<Vec<(PeerId, SignedApiAnnouncement)>>()
73 .await;
74
75 for (peer, announcement) in announcements {
77 if let Err(err) = api_client
78 .submit_api_announcement(peer, announcement.clone())
79 .await {
80 debug!(target: LOG_NET_API, ?peer, err = %err.fmt_compact(), "Announcing API URL did not succeed for all peers, retrying in {FAILURE_RETRY_SECONDS} seconds");
81 success = false;
82 }
83 }
84
85 let our_announcement_key = ApiAnnouncementKey(our_peer_id);
87 let our_announcement = db
88 .begin_transaction_nc()
89 .await
90 .get_value(&our_announcement_key)
91 .await
92 .expect("Our announcement is always present");
93 let new_announcement = db.wait_key_check(
94 &our_announcement_key,
95 |new_announcement| {
96 new_announcement.and_then(
97 |new_announcement| (new_announcement.api_announcement.nonce != our_announcement.api_announcement.nonce).then_some(())
98 )
99 });
100
101 let auto_announcement_delay = if success {
102 Duration::from_secs(SUCCESS_RETRY_SECONDS)
103 } else {
104 Duration::from_secs(FAILURE_RETRY_SECONDS)
105 };
106
107 select! {
108 _ = new_announcement => {},
109 () = sleep(auto_announcement_delay) => {},
110 }
111 }
112 });
113
114 Ok(())
115}
116
117async fn insert_signed_api_announcement_if_not_present(db: &Database, cfg: &ServerConfig) {
120 let mut dbtx = db.begin_transaction().await;
121 if dbtx
122 .get_value(&ApiAnnouncementKey(cfg.local.identity))
123 .await
124 .is_some()
125 {
126 return;
127 }
128
129 let api_announcement = ApiAnnouncement::new(
130 cfg.consensus.api_endpoints()[&cfg.local.identity]
131 .url
132 .clone(),
133 0,
134 );
135 let ctx = secp256k1::Secp256k1::new();
136 let signed_announcement =
137 api_announcement.sign(&ctx, &cfg.private.broadcast_secret_key.keypair(&ctx));
138
139 dbtx.insert_entry(
140 &ApiAnnouncementKey(cfg.local.identity),
141 &signed_announcement,
142 )
143 .await;
144 dbtx.commit_tx().await;
145}
146
147pub async fn get_api_urls(db: &Database, cfg: &ServerConfigConsensus) -> BTreeMap<PeerId, SafeUrl> {
151 override_api_urls(
152 db,
153 cfg.api_endpoints()
154 .iter()
155 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
156 &ApiAnnouncementPrefix,
157 |key| key.0,
158 )
159 .await
160}