fedimint_server/net/api/
announcement.rs1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use fedimint_api_client::api::DynGlobalApi;
5use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::net::api_announcement::{
8 ApiAnnouncement, SignedApiAnnouncement, override_api_urls,
9};
10use fedimint_core::task::{TaskGroup, sleep};
11use fedimint_core::util::SafeUrl;
12use fedimint_core::{PeerId, impl_db_lookup, impl_db_record, secp256k1};
13use fedimint_logging::LOG_NET_API;
14use tokio::select;
15use tracing::debug;
16
17use crate::config::{ServerConfig, ServerConfigConsensus};
18use crate::db::DbKeyPrefix;
19
20#[derive(Clone, Debug, Encodable, Decodable)]
21pub struct ApiAnnouncementKey(pub PeerId);
22
23#[derive(Clone, Debug, Encodable, Decodable)]
24pub struct ApiAnnouncementPrefix;
25
26impl_db_record!(
27 key = ApiAnnouncementKey,
28 value = SignedApiAnnouncement,
29 db_prefix = DbKeyPrefix::ApiAnnouncements,
30 notify_on_modify = true,
31);
32impl_db_lookup!(
33 key = ApiAnnouncementKey,
34 query_prefix = ApiAnnouncementPrefix
35);
36
37pub async fn start_api_announcement_service(
38 db: &Database,
39 tg: &TaskGroup,
40 cfg: &ServerConfig,
41 api_secret: Option<String>,
42) -> anyhow::Result<()> {
43 const INITIAL_DEALY_SECONDS: u64 = 5;
44 const FAILURE_RETRY_SECONDS: u64 = 60;
45 const SUCCESS_RETRY_SECONDS: u64 = 600;
46
47 insert_signed_api_announcement_if_not_present(db, cfg).await;
48
49 let db = db.clone();
50 let api_client =
52 DynGlobalApi::from_endpoints(get_api_urls(&db, &cfg.consensus).await, &api_secret).await?;
53
54 let our_peer_id = cfg.local.identity;
55 tg.spawn_cancellable("submit-api-url-announcement", async move {
56 sleep(Duration::from_secs(INITIAL_DEALY_SECONDS)).await;
59 loop {
60 let announcement = db.begin_transaction_nc()
61 .await
62 .get_value(&ApiAnnouncementKey(our_peer_id))
63 .await
64 .expect("Our own API announcement should be present in the database");
65
66 if let Err(e) = api_client
67 .submit_api_announcement(our_peer_id, announcement.clone())
68 .await {
69 debug!(target: LOG_NET_API, ?e, "Announcing our API URL did not succeed for all peers, retrying in {FAILURE_RETRY_SECONDS} seconds");
70 sleep(Duration::from_secs(FAILURE_RETRY_SECONDS)).await;
71 } else {
72 let our_announcement_key = ApiAnnouncementKey(our_peer_id);
73 let new_announcement = db.wait_key_check(
74 &our_announcement_key,
75 |new_announcement| {
76 new_announcement.and_then(
77 |new_announcement| (new_announcement.api_announcement.nonce != announcement.api_announcement.nonce).then_some(())
78 )
79 });
80
81 select! {
82 _ = new_announcement => {},
83 () = sleep(Duration::from_secs(SUCCESS_RETRY_SECONDS)) => {},
84 }
85 }
86 }
87 });
88
89 Ok(())
90}
91
92async fn insert_signed_api_announcement_if_not_present(db: &Database, cfg: &ServerConfig) {
95 let mut dbtx = db.begin_transaction().await;
96 if dbtx
97 .get_value(&ApiAnnouncementKey(cfg.local.identity))
98 .await
99 .is_some()
100 {
101 return;
102 }
103
104 let api_announcement = ApiAnnouncement::new(
105 cfg.consensus.api_endpoints()[&cfg.local.identity]
106 .url
107 .clone(),
108 0,
109 );
110 let ctx = secp256k1::Secp256k1::new();
111 let signed_announcement =
112 api_announcement.sign(&ctx, &cfg.private.broadcast_secret_key.keypair(&ctx));
113
114 dbtx.insert_entry(
115 &ApiAnnouncementKey(cfg.local.identity),
116 &signed_announcement,
117 )
118 .await;
119 dbtx.commit_tx().await;
120}
121
122pub async fn get_api_urls(db: &Database, cfg: &ServerConfigConsensus) -> BTreeMap<PeerId, SafeUrl> {
126 override_api_urls(
127 db,
128 cfg.api_endpoints()
129 .iter()
130 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
131 &ApiAnnouncementPrefix,
132 |key| key.0,
133 )
134 .await
135}