1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::SystemTime;
5
6use bitcoin::secp256k1::Keypair;
7use fedimint_client::ClientHandleArc;
8use fedimint_core::config::{FederationId, FederationIdPrefix, JsonClientConfig};
9use fedimint_core::db::{Committable, DatabaseTransaction, NonCommittable};
10use fedimint_core::invite_code::InviteCode;
11use fedimint_core::util::{FmtCompactAnyhow as _, Spanned};
12use fedimint_core::{PeerId, TieredCounts};
13use fedimint_gateway_common::FederationInfo;
14use fedimint_gateway_server_db::GatewayDbtxNcExt as _;
15use fedimint_gw_client::GatewayClientModule;
16use fedimint_gwv2_client::GatewayClientModuleV2;
17use fedimint_logging::LOG_GATEWAY;
18use fedimint_mint_client::MintClientModule;
19use tracing::{info, warn};
20
21use crate::error::{AdminGatewayError, FederationNotConnected};
22use crate::{AdminResult, Registration};
23
24const INITIAL_INDEX: u64 = 1;
28
29#[derive(Debug)]
31pub struct FederationManager {
32 clients: BTreeMap<FederationId, Spanned<fedimint_client::ClientHandleArc>>,
35
36 index_to_federation: BTreeMap<u64, FederationId>,
40
41 next_index: AtomicU64,
45}
46
47impl FederationManager {
48 pub fn new() -> Self {
49 Self {
50 clients: BTreeMap::new(),
51 index_to_federation: BTreeMap::new(),
52 next_index: AtomicU64::new(INITIAL_INDEX),
53 }
54 }
55
56 pub fn add_client(&mut self, index: u64, client: Spanned<fedimint_client::ClientHandleArc>) {
57 let federation_id = client.borrow().with_sync(|c| c.federation_id());
58 self.clients.insert(federation_id, client);
59 self.index_to_federation.insert(index, federation_id);
60 }
61
62 pub async fn leave_federation(
63 &mut self,
64 federation_id: FederationId,
65 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
66 registrations: Vec<&Registration>,
67 ) -> AdminResult<FederationInfo> {
68 let federation_info = self.federation_info(federation_id, dbtx).await?;
69
70 for registration in registrations {
71 self.unannounce_from_federation(federation_id, registration.keypair)
72 .await;
73 }
74
75 self.remove_client(federation_id).await?;
76
77 Ok(federation_info)
78 }
79
80 async fn remove_client(&mut self, federation_id: FederationId) -> AdminResult<()> {
81 let client = self
82 .clients
83 .remove(&federation_id)
84 .ok_or(FederationNotConnected {
85 federation_id_prefix: federation_id.to_prefix(),
86 })?
87 .into_value();
88
89 self.index_to_federation
90 .retain(|_, fid| *fid != federation_id);
91
92 match Arc::into_inner(client) {
93 Some(client) => {
94 client.shutdown().await;
95 Ok(())
96 }
97 _ => Err(AdminGatewayError::ClientRemovalError(format!(
98 "Federation client {federation_id} is not unique, failed to shutdown client"
99 ))),
100 }
101 }
102
103 pub async fn wait_for_incoming_payments(&self) -> AdminResult<()> {
106 for client in self.clients.values() {
107 let active_operations = client.value().get_active_operations().await;
108 let operation_log = client.value().operation_log();
109 for op_id in active_operations {
110 let log_entry = operation_log.get_operation(op_id).await;
111 if let Some(entry) = log_entry {
112 match entry.operation_module_kind() {
113 "lnv2" => {
114 let lnv2 =
115 client.value().get_first_module::<GatewayClientModuleV2>()?;
116 lnv2.await_completion(op_id).await;
117 }
118 "ln" => {
119 let lnv1 = client.value().get_first_module::<GatewayClientModule>()?;
120 lnv1.await_completion(op_id).await;
121 }
122 _ => {}
123 }
124 }
125 }
126 }
127
128 info!(target: LOG_GATEWAY, "Finished waiting for incoming payments");
129 Ok(())
130 }
131
132 async fn unannounce_from_federation(
133 &self,
134 federation_id: FederationId,
135 gateway_keypair: Keypair,
136 ) {
137 if let Ok(client) = self
138 .clients
139 .get(&federation_id)
140 .ok_or(FederationNotConnected {
141 federation_id_prefix: federation_id.to_prefix(),
142 })
143 && let Ok(ln) = client.value().get_first_module::<GatewayClientModule>()
144 {
145 ln.remove_from_federation(gateway_keypair).await;
146 }
147 }
148
149 pub async fn unannounce_from_all_federations(&self, gateway_keypair: Keypair) {
152 let removal_futures = self
153 .clients
154 .values()
155 .filter_map(|client| {
156 client
157 .value()
158 .get_first_module::<GatewayClientModule>()
159 .ok()
160 .map(|lnv1| async move {
161 lnv1.remove_from_federation(gateway_keypair).await;
162 })
163 })
164 .collect::<Vec<_>>();
165
166 futures::future::join_all(removal_futures).await;
167 }
168
169 pub fn get_client_for_index(&self, short_channel_id: u64) -> Option<Spanned<ClientHandleArc>> {
170 let federation_id = self.index_to_federation.get(&short_channel_id)?;
171 match self.clients.get(federation_id).cloned() {
175 Some(client) => Some(client),
176 _ => {
177 panic!(
178 "`FederationManager.index_to_federation` is out of sync with `FederationManager.clients`! This is a bug."
179 );
180 }
181 }
182 }
183
184 pub fn get_client_for_federation_id_prefix(
185 &self,
186 federation_id_prefix: FederationIdPrefix,
187 ) -> Option<Spanned<ClientHandleArc>> {
188 self.clients.iter().find_map(|(fid, client)| {
189 if fid.to_prefix() == federation_id_prefix {
190 Some(client.clone())
191 } else {
192 None
193 }
194 })
195 }
196
197 pub fn has_federation(&self, federation_id: FederationId) -> bool {
198 self.clients.contains_key(&federation_id)
199 }
200
201 pub fn client(&self, federation_id: &FederationId) -> Option<&Spanned<ClientHandleArc>> {
202 self.clients.get(federation_id)
203 }
204
205 pub async fn federation_info(
206 &self,
207 federation_id: FederationId,
208 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
209 ) -> std::result::Result<FederationInfo, FederationNotConnected> {
210 self.clients
211 .get(&federation_id)
212 .ok_or(FederationNotConnected {
213 federation_id_prefix: federation_id.to_prefix(),
214 })?
215 .borrow()
216 .with(|client| async move {
217 let balance_msat = client
218 .get_balance_for_btc()
219 .await
220 .map_err(|_err| FederationNotConnected {
222 federation_id_prefix: federation_id.to_prefix(),
223 })?;
224
225 let config = dbtx.load_federation_config(federation_id).await.ok_or(
226 FederationNotConnected {
227 federation_id_prefix: federation_id.to_prefix(),
228 },
229 )?;
230 let last_backup_time =
231 dbtx.load_backup_record(federation_id)
232 .await
233 .ok_or(FederationNotConnected {
234 federation_id_prefix: federation_id.to_prefix(),
235 })?;
236
237 Ok(FederationInfo {
238 federation_id,
239 federation_name: self.federation_name(client).await,
240 balance_msat,
241 config,
242 last_backup_time,
243 })
244 })
245 .await
246 }
247
248 pub async fn federation_name(&self, client: &ClientHandleArc) -> Option<String> {
249 let client_config = client.config().await;
250 let federation_name = client_config.global.federation_name();
251 federation_name.map(String::from)
252 }
253
254 pub async fn federation_info_all_federations(
255 &self,
256 mut dbtx: DatabaseTransaction<'_, NonCommittable>,
257 ) -> Vec<FederationInfo> {
258 let mut federation_infos = Vec::new();
259 for (federation_id, client) in &self.clients {
260 let balance_msat = match client
261 .borrow()
262 .with(|client| client.get_balance_for_btc())
263 .await
264 {
265 Ok(balance_msat) => balance_msat,
266 Err(err) => {
267 warn!(
268 target: LOG_GATEWAY,
269 err = %err.fmt_compact_anyhow(),
270 "Skipped Federation due to lack of primary module"
271 );
272 continue;
273 }
274 };
275
276 let config = dbtx.load_federation_config(*federation_id).await;
277 let last_backup_time = dbtx
278 .load_backup_record(*federation_id)
279 .await
280 .unwrap_or_default();
281 if let Some(config) = config {
282 federation_infos.push(FederationInfo {
283 federation_id: *federation_id,
284 federation_name: self.federation_name(client.value()).await,
285 balance_msat,
286 config,
287 last_backup_time,
288 });
289 }
290 }
291 federation_infos
292 }
293
294 pub async fn get_federation_config(
295 &self,
296 federation_id: FederationId,
297 ) -> AdminResult<JsonClientConfig> {
298 let client = self
299 .clients
300 .get(&federation_id)
301 .ok_or(FederationNotConnected {
302 federation_id_prefix: federation_id.to_prefix(),
303 })?;
304 Ok(client
305 .borrow()
306 .with(|client| client.get_config_json())
307 .await)
308 }
309
310 pub async fn get_all_federation_configs(&self) -> BTreeMap<FederationId, JsonClientConfig> {
311 let mut federations = BTreeMap::new();
312 for (federation_id, client) in &self.clients {
313 federations.insert(
314 *federation_id,
315 client
316 .borrow()
317 .with(|client| client.get_config_json())
318 .await,
319 );
320 }
321 federations
322 }
323
324 pub async fn backup_federation(
325 &self,
326 federation_id: &FederationId,
327 dbtx: &mut DatabaseTransaction<'_, Committable>,
328 now: SystemTime,
329 ) {
330 if let Some(client) = self.client(federation_id) {
331 let metadata: BTreeMap<String, String> = BTreeMap::new();
332 #[allow(deprecated)]
333 if client
334 .value()
335 .backup_to_federation(fedimint_client::backup::Metadata::from_json_serialized(
336 metadata,
337 ))
338 .await
339 .is_ok()
340 {
341 dbtx.save_federation_backup_record(*federation_id, Some(now))
342 .await;
343 info!(federation_id = %federation_id, "Successfully backed up federation");
344 }
345 }
346 }
347
348 pub async fn all_invite_codes(
349 &self,
350 ) -> BTreeMap<FederationId, BTreeMap<PeerId, (String, InviteCode)>> {
351 let mut invite_codes = BTreeMap::new();
352
353 for (federation_id, client) in &self.clients {
354 let config = client.value().config().await;
355 let api_endpoints = &config.global.api_endpoints;
356
357 let mut fed_invite_codes = BTreeMap::new();
358 for (peer_id, peer_url) in api_endpoints {
359 if let Some(code) = client.value().invite_code(*peer_id).await {
360 fed_invite_codes.insert(*peer_id, (peer_url.name.clone(), code));
361 }
362 }
363
364 invite_codes.insert(*federation_id, fed_invite_codes);
365 }
366
367 invite_codes
368 }
369
370 pub async fn get_note_summary(
371 &self,
372 federation_id: &FederationId,
373 ) -> AdminResult<TieredCounts> {
374 let client = self.client(federation_id).ok_or(FederationNotConnected {
375 federation_id_prefix: federation_id.to_prefix(),
376 })?;
377 let mint = client.value().get_first_module::<MintClientModule>()?;
378 let mut dbtx = mint.client_ctx.module_db().begin_transaction_nc().await;
379 let counts = mint.get_note_counts_by_denomination(&mut dbtx).await;
380 info!(target: LOG_GATEWAY, ?counts, "Note counts");
381 Ok(counts)
382 }
383
384 pub fn set_next_index(&self, next_index: u64) {
386 self.next_index.store(next_index, Ordering::SeqCst);
387 }
388
389 pub fn pop_next_index(&self) -> AdminResult<u64> {
390 let next_index = self.next_index.fetch_add(1, Ordering::Relaxed);
391
392 if next_index == INITIAL_INDEX.wrapping_sub(1) {
394 return Err(AdminGatewayError::GatewayConfigurationError(
395 "Federation Index overflow".to_string(),
396 ));
397 }
398
399 Ok(next_index)
400 }
401}