fedimint_server/net/api/
announcement.rs
1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use fedimint_api_client::api::DynGlobalApi;
5use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::net::api_announcement::{
8 ApiAnnouncement, SignedApiAnnouncement, override_api_urls,
9};
10use fedimint_core::task::{TaskGroup, sleep};
11use fedimint_core::util::SafeUrl;
12use fedimint_core::{PeerId, impl_db_lookup, impl_db_record, secp256k1};
13use fedimint_logging::LOG_NET_API;
14use futures::stream::StreamExt;
15use tokio::select;
16use tracing::debug;
17
18use crate::config::{ServerConfig, ServerConfigConsensus};
19use crate::db::DbKeyPrefix;
20
21#[derive(Clone, Debug, Encodable, Decodable)]
22pub struct ApiAnnouncementKey(pub PeerId);
23
24#[derive(Clone, Debug, Encodable, Decodable)]
25pub struct ApiAnnouncementPrefix;
26
27impl_db_record!(
28 key = ApiAnnouncementKey,
29 value = SignedApiAnnouncement,
30 db_prefix = DbKeyPrefix::ApiAnnouncements,
31 notify_on_modify = true,
32);
33impl_db_lookup!(
34 key = ApiAnnouncementKey,
35 query_prefix = ApiAnnouncementPrefix
36);
37
38pub async fn start_api_announcement_service(
39 db: &Database,
40 tg: &TaskGroup,
41 cfg: &ServerConfig,
42 api_secret: Option<String>,
43) -> anyhow::Result<()> {
44 const INITIAL_DEALY_SECONDS: u64 = 5;
45 const FAILURE_RETRY_SECONDS: u64 = 60;
46 const SUCCESS_RETRY_SECONDS: u64 = 600;
47
48 insert_signed_api_announcement_if_not_present(db, cfg).await;
49
50 let db = db.clone();
51 let api_client =
53 DynGlobalApi::from_endpoints(get_api_urls(&db, &cfg.consensus).await, &api_secret).await?;
54
55 let our_peer_id = cfg.local.identity;
56 tg.spawn_cancellable("submit-api-url-announcement", async move {
57 sleep(Duration::from_secs(INITIAL_DEALY_SECONDS)).await;
60 loop {
61 let mut success = true;
62 let announcements = db.begin_transaction_nc()
63 .await
64 .find_by_prefix(&ApiAnnouncementPrefix)
65 .await
66 .map(|(peer_key, peer_announcement)| (peer_key.0, peer_announcement))
67 .collect::<Vec<(PeerId, SignedApiAnnouncement)>>()
68 .await;
69
70 for (peer, announcement) in announcements {
72 if let Err(e) = api_client
73 .submit_api_announcement(peer, announcement.clone())
74 .await {
75 debug!(target: LOG_NET_API, ?peer, ?e, "Announcing API URL did not succeed for all peers, retrying in {FAILURE_RETRY_SECONDS} seconds");
76 success = false;
77 }
78 }
79
80 let our_announcement_key = ApiAnnouncementKey(our_peer_id);
82 let our_announcement = db
83 .begin_transaction_nc()
84 .await
85 .get_value(&our_announcement_key)
86 .await
87 .expect("Our announcement is always present");
88 let new_announcement = db.wait_key_check(
89 &our_announcement_key,
90 |new_announcement| {
91 new_announcement.and_then(
92 |new_announcement| (new_announcement.api_announcement.nonce != our_announcement.api_announcement.nonce).then_some(())
93 )
94 });
95
96 let auto_announcement_delay = if success {
97 Duration::from_secs(SUCCESS_RETRY_SECONDS)
98 } else {
99 Duration::from_secs(FAILURE_RETRY_SECONDS)
100 };
101
102 select! {
103 _ = new_announcement => {},
104 () = sleep(auto_announcement_delay) => {},
105 }
106 }
107 });
108
109 Ok(())
110}
111
112async fn insert_signed_api_announcement_if_not_present(db: &Database, cfg: &ServerConfig) {
115 let mut dbtx = db.begin_transaction().await;
116 if dbtx
117 .get_value(&ApiAnnouncementKey(cfg.local.identity))
118 .await
119 .is_some()
120 {
121 return;
122 }
123
124 let api_announcement = ApiAnnouncement::new(
125 cfg.consensus.api_endpoints()[&cfg.local.identity]
126 .url
127 .clone(),
128 0,
129 );
130 let ctx = secp256k1::Secp256k1::new();
131 let signed_announcement =
132 api_announcement.sign(&ctx, &cfg.private.broadcast_secret_key.keypair(&ctx));
133
134 dbtx.insert_entry(
135 &ApiAnnouncementKey(cfg.local.identity),
136 &signed_announcement,
137 )
138 .await;
139 dbtx.commit_tx().await;
140}
141
142pub async fn get_api_urls(db: &Database, cfg: &ServerConfigConsensus) -> BTreeMap<PeerId, SafeUrl> {
146 override_api_urls(
147 db,
148 cfg.api_endpoints()
149 .iter()
150 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
151 &ApiAnnouncementPrefix,
152 |key| key.0,
153 )
154 .await
155}