1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::SystemTime;
5
6use bitcoin::secp256k1::Keypair;
7use fedimint_client::ClientHandleArc;
8use fedimint_core::config::{FederationId, FederationIdPrefix, JsonClientConfig};
9use fedimint_core::db::{Committable, DatabaseTransaction, NonCommittable};
10use fedimint_core::invite_code::InviteCode;
11use fedimint_core::util::{FmtCompactAnyhow as _, Spanned};
12use fedimint_gateway_common::FederationInfo;
13use fedimint_gateway_server_db::GatewayDbtxNcExt as _;
14use fedimint_gw_client::GatewayClientModule;
15use fedimint_gwv2_client::GatewayClientModuleV2;
16use fedimint_logging::LOG_GATEWAY;
17use tracing::{info, warn};
18
19use crate::error::{AdminGatewayError, FederationNotConnected};
20use crate::{AdminResult, Registration};
21
/// Starting value of the federation index counter: `FederationManager::new`
/// seeds `next_index` with it, and `pop_next_index` treats the value directly
/// below it as the overflow marker.
const INITIAL_INDEX: u64 = 1;
26
/// Holds the federation clients known to this process together with the
/// numeric index assigned to each federation.
#[derive(Debug)]
pub struct FederationManager {
    /// Client handle for every federation currently held, keyed by federation id.
    clients: BTreeMap<FederationId, Spanned<fedimint_client::ClientHandleArc>>,

    /// Lookup from a numeric index to the federation registered under it by
    /// `add_client`. `get_client_for_index` treats an entry here without a
    /// matching `clients` entry as a bug.
    index_to_federation: BTreeMap<u64, FederationId>,

    /// Counter from which `pop_next_index` hands out the next index.
    next_index: AtomicU64,
}
44
45impl FederationManager {
46 pub fn new() -> Self {
47 Self {
48 clients: BTreeMap::new(),
49 index_to_federation: BTreeMap::new(),
50 next_index: AtomicU64::new(INITIAL_INDEX),
51 }
52 }
53
54 pub fn add_client(&mut self, index: u64, client: Spanned<fedimint_client::ClientHandleArc>) {
55 let federation_id = client.borrow().with_sync(|c| c.federation_id());
56 self.clients.insert(federation_id, client);
57 self.index_to_federation.insert(index, federation_id);
58 }
59
60 pub async fn leave_federation(
61 &mut self,
62 federation_id: FederationId,
63 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
64 registrations: Vec<&Registration>,
65 ) -> AdminResult<FederationInfo> {
66 let federation_info = self.federation_info(federation_id, dbtx).await?;
67
68 for registration in registrations {
69 self.unannounce_from_federation(federation_id, registration.keypair)
70 .await;
71 }
72
73 self.remove_client(federation_id).await?;
74
75 Ok(federation_info)
76 }
77
78 async fn remove_client(&mut self, federation_id: FederationId) -> AdminResult<()> {
79 let client = self
80 .clients
81 .remove(&federation_id)
82 .ok_or(FederationNotConnected {
83 federation_id_prefix: federation_id.to_prefix(),
84 })?
85 .into_value();
86
87 self.index_to_federation
88 .retain(|_, fid| *fid != federation_id);
89
90 match Arc::into_inner(client) {
91 Some(client) => {
92 client.shutdown().await;
93 Ok(())
94 }
95 _ => Err(AdminGatewayError::ClientRemovalError(format!(
96 "Federation client {federation_id} is not unique, failed to shutdown client"
97 ))),
98 }
99 }
100
101 pub async fn wait_for_incoming_payments(&self) -> AdminResult<()> {
104 for client in self.clients.values() {
105 let active_operations = client.value().get_active_operations().await;
106 let operation_log = client.value().operation_log();
107 for op_id in active_operations {
108 let log_entry = operation_log.get_operation(op_id).await;
109 if let Some(entry) = log_entry {
110 match entry.operation_module_kind() {
111 "lnv2" => {
112 let lnv2 =
113 client.value().get_first_module::<GatewayClientModuleV2>()?;
114 lnv2.await_completion(op_id).await;
115 }
116 "ln" => {
117 let lnv1 = client.value().get_first_module::<GatewayClientModule>()?;
118 lnv1.await_completion(op_id).await;
119 }
120 _ => {}
121 }
122 }
123 }
124 }
125
126 info!(target: LOG_GATEWAY, "Finished waiting for incoming payments");
127 Ok(())
128 }
129
130 async fn unannounce_from_federation(
131 &self,
132 federation_id: FederationId,
133 gateway_keypair: Keypair,
134 ) {
135 if let Ok(client) = self
136 .clients
137 .get(&federation_id)
138 .ok_or(FederationNotConnected {
139 federation_id_prefix: federation_id.to_prefix(),
140 })
141 && let Ok(ln) = client.value().get_first_module::<GatewayClientModule>()
142 {
143 ln.remove_from_federation(gateway_keypair).await;
144 }
145 }
146
147 pub async fn unannounce_from_all_federations(&self, gateway_keypair: Keypair) {
150 let removal_futures = self
151 .clients
152 .values()
153 .filter_map(|client| {
154 client
155 .value()
156 .get_first_module::<GatewayClientModule>()
157 .ok()
158 .map(|lnv1| async move {
159 lnv1.remove_from_federation(gateway_keypair).await;
160 })
161 })
162 .collect::<Vec<_>>();
163
164 futures::future::join_all(removal_futures).await;
165 }
166
167 pub fn get_client_for_index(&self, short_channel_id: u64) -> Option<Spanned<ClientHandleArc>> {
168 let federation_id = self.index_to_federation.get(&short_channel_id)?;
169 match self.clients.get(federation_id).cloned() {
173 Some(client) => Some(client),
174 _ => {
175 panic!(
176 "`FederationManager.index_to_federation` is out of sync with `FederationManager.clients`! This is a bug."
177 );
178 }
179 }
180 }
181
182 pub fn get_client_for_federation_id_prefix(
183 &self,
184 federation_id_prefix: FederationIdPrefix,
185 ) -> Option<Spanned<ClientHandleArc>> {
186 self.clients.iter().find_map(|(fid, client)| {
187 if fid.to_prefix() == federation_id_prefix {
188 Some(client.clone())
189 } else {
190 None
191 }
192 })
193 }
194
195 pub fn has_federation(&self, federation_id: FederationId) -> bool {
196 self.clients.contains_key(&federation_id)
197 }
198
199 pub fn client(&self, federation_id: &FederationId) -> Option<&Spanned<ClientHandleArc>> {
200 self.clients.get(federation_id)
201 }
202
203 pub async fn federation_info(
204 &self,
205 federation_id: FederationId,
206 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
207 ) -> std::result::Result<FederationInfo, FederationNotConnected> {
208 self.clients
209 .get(&federation_id)
210 .expect("`FederationManager.index_to_federation` is out of sync with `FederationManager.clients`! This is a bug.")
211 .borrow()
212 .with(|client| async move {
213 let balance_msat = client.get_balance_for_btc().await
214 .map_err(|_err| FederationNotConnected { federation_id_prefix: federation_id.to_prefix() })?;
216
217 let config = dbtx.load_federation_config(federation_id).await.ok_or(FederationNotConnected {
218 federation_id_prefix: federation_id.to_prefix(),
219 })?;
220 let last_backup_time = dbtx.load_backup_record(federation_id).await.ok_or(FederationNotConnected {
221 federation_id_prefix: federation_id.to_prefix(),
222 })?;
223
224 Ok(FederationInfo {
225 federation_id,
226 federation_name: self.federation_name(client).await,
227 balance_msat,
228 config,
229 last_backup_time,
230 })
231 })
232 .await
233 }
234
235 pub async fn federation_name(&self, client: &ClientHandleArc) -> Option<String> {
236 let client_config = client.config().await;
237 let federation_name = client_config.global.federation_name();
238 federation_name.map(String::from)
239 }
240
241 pub async fn federation_info_all_federations(
242 &self,
243 mut dbtx: DatabaseTransaction<'_, NonCommittable>,
244 ) -> Vec<FederationInfo> {
245 let mut federation_infos = Vec::new();
246 for (federation_id, client) in &self.clients {
247 let balance_msat = match client
248 .borrow()
249 .with(|client| client.get_balance_for_btc())
250 .await
251 {
252 Ok(balance_msat) => balance_msat,
253 Err(err) => {
254 warn!(
255 target: LOG_GATEWAY,
256 err = %err.fmt_compact_anyhow(),
257 "Skipped Federation due to lack of primary module"
258 );
259 continue;
260 }
261 };
262
263 let config = dbtx.load_federation_config(*federation_id).await;
264 let last_backup_time = dbtx
265 .load_backup_record(*federation_id)
266 .await
267 .unwrap_or_default();
268 if let Some(config) = config {
269 federation_infos.push(FederationInfo {
270 federation_id: *federation_id,
271 federation_name: self.federation_name(client.value()).await,
272 balance_msat,
273 config,
274 last_backup_time,
275 });
276 }
277 }
278 federation_infos
279 }
280
281 pub async fn get_federation_config(
282 &self,
283 federation_id: FederationId,
284 ) -> AdminResult<JsonClientConfig> {
285 let client = self
286 .clients
287 .get(&federation_id)
288 .ok_or(FederationNotConnected {
289 federation_id_prefix: federation_id.to_prefix(),
290 })?;
291 Ok(client
292 .borrow()
293 .with(|client| client.get_config_json())
294 .await)
295 }
296
297 pub async fn get_all_federation_configs(&self) -> BTreeMap<FederationId, JsonClientConfig> {
298 let mut federations = BTreeMap::new();
299 for (federation_id, client) in &self.clients {
300 federations.insert(
301 *federation_id,
302 client
303 .borrow()
304 .with(|client| client.get_config_json())
305 .await,
306 );
307 }
308 federations
309 }
310
311 pub async fn backup_federation(
312 &self,
313 federation_id: &FederationId,
314 dbtx: &mut DatabaseTransaction<'_, Committable>,
315 now: SystemTime,
316 ) {
317 if let Some(client) = self.client(federation_id) {
318 let metadata: BTreeMap<String, String> = BTreeMap::new();
319 #[allow(deprecated)]
320 if client
321 .value()
322 .backup_to_federation(fedimint_client::backup::Metadata::from_json_serialized(
323 metadata,
324 ))
325 .await
326 .is_ok()
327 {
328 dbtx.save_federation_backup_record(*federation_id, Some(now))
329 .await;
330 info!(federation_id = %federation_id, "Successfully backed up federation");
331 }
332 }
333 }
334
335 pub async fn all_invite_codes(&self) -> BTreeMap<FederationId, Vec<InviteCode>> {
336 let mut invite_codes = BTreeMap::new();
337
338 for (federation_id, client) in &self.clients {
339 let peer_urls = client.value().get_peer_urls().await;
340
341 let fed_invite_codes = futures::future::join_all(
342 peer_urls
343 .keys()
344 .map(|peer_id| async move { client.value().invite_code(*peer_id).await }),
345 )
346 .await
347 .into_iter()
348 .flatten()
349 .collect();
350
351 invite_codes.insert(*federation_id, fed_invite_codes);
352 }
353
354 invite_codes
355 }
356
357 pub fn set_next_index(&self, next_index: u64) {
359 self.next_index.store(next_index, Ordering::SeqCst);
360 }
361
362 pub fn pop_next_index(&self) -> AdminResult<u64> {
363 let next_index = self.next_index.fetch_add(1, Ordering::Relaxed);
364
365 if next_index == INITIAL_INDEX.wrapping_sub(1) {
367 return Err(AdminGatewayError::GatewayConfigurationError(
368 "Federation Index overflow".to_string(),
369 ));
370 }
371
372 Ok(next_index)
373 }
374}