1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::SystemTime;
5
6use bitcoin::secp256k1::Keypair;
7use fedimint_client::ClientHandleArc;
8use fedimint_core::config::{FederationId, FederationIdPrefix, JsonClientConfig};
9use fedimint_core::db::{Committable, DatabaseTransaction, NonCommittable};
10use fedimint_core::util::{FmtCompactAnyhow as _, Spanned};
11use fedimint_gateway_common::FederationInfo;
12use fedimint_gateway_server_db::GatewayDbtxNcExt as _;
13use fedimint_gw_client::GatewayClientModule;
14use fedimint_gwv2_client::GatewayClientModuleV2;
15use fedimint_logging::LOG_GATEWAY;
16use tracing::{info, warn};
17
18use crate::error::{AdminGatewayError, FederationNotConnected};
19use crate::{AdminResult, Registration};
20
21const INITIAL_INDEX: u64 = 1;
25
26#[derive(Debug)]
28pub struct FederationManager {
29 clients: BTreeMap<FederationId, Spanned<fedimint_client::ClientHandleArc>>,
32
33 index_to_federation: BTreeMap<u64, FederationId>,
37
38 next_index: AtomicU64,
42}
43
44impl FederationManager {
45 pub fn new() -> Self {
46 Self {
47 clients: BTreeMap::new(),
48 index_to_federation: BTreeMap::new(),
49 next_index: AtomicU64::new(INITIAL_INDEX),
50 }
51 }
52
53 pub fn add_client(&mut self, index: u64, client: Spanned<fedimint_client::ClientHandleArc>) {
54 let federation_id = client.borrow().with_sync(|c| c.federation_id());
55 self.clients.insert(federation_id, client);
56 self.index_to_federation.insert(index, federation_id);
57 }
58
59 pub async fn leave_federation(
60 &mut self,
61 federation_id: FederationId,
62 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
63 registrations: Vec<&Registration>,
64 ) -> AdminResult<FederationInfo> {
65 let federation_info = self.federation_info(federation_id, dbtx).await?;
66
67 for registration in registrations {
68 self.unannounce_from_federation(federation_id, registration.keypair)
69 .await;
70 }
71
72 self.remove_client(federation_id).await?;
73
74 Ok(federation_info)
75 }
76
77 async fn remove_client(&mut self, federation_id: FederationId) -> AdminResult<()> {
78 let client = self
79 .clients
80 .remove(&federation_id)
81 .ok_or(FederationNotConnected {
82 federation_id_prefix: federation_id.to_prefix(),
83 })?
84 .into_value();
85
86 self.index_to_federation
87 .retain(|_, fid| *fid != federation_id);
88
89 match Arc::into_inner(client) {
90 Some(client) => {
91 client.shutdown().await;
92 Ok(())
93 }
94 _ => Err(AdminGatewayError::ClientRemovalError(format!(
95 "Federation client {federation_id} is not unique, failed to shutdown client"
96 ))),
97 }
98 }
99
100 pub async fn wait_for_incoming_payments(&self) -> AdminResult<()> {
103 for client in self.clients.values() {
104 let active_operations = client.value().get_active_operations().await;
105 let operation_log = client.value().operation_log();
106 for op_id in active_operations {
107 let log_entry = operation_log.get_operation(op_id).await;
108 if let Some(entry) = log_entry {
109 match entry.operation_module_kind() {
110 "lnv2" => {
111 let lnv2 =
112 client.value().get_first_module::<GatewayClientModuleV2>()?;
113 lnv2.await_completion(op_id).await;
114 }
115 "ln" => {
116 let lnv1 = client.value().get_first_module::<GatewayClientModule>()?;
117 lnv1.await_completion(op_id).await;
118 }
119 _ => {}
120 }
121 }
122 }
123 }
124
125 info!(target: LOG_GATEWAY, "Finished waiting for incoming payments");
126 Ok(())
127 }
128
129 async fn unannounce_from_federation(
130 &self,
131 federation_id: FederationId,
132 gateway_keypair: Keypair,
133 ) {
134 if let Ok(client) = self
135 .clients
136 .get(&federation_id)
137 .ok_or(FederationNotConnected {
138 federation_id_prefix: federation_id.to_prefix(),
139 })
140 && let Ok(ln) = client.value().get_first_module::<GatewayClientModule>()
141 {
142 ln.remove_from_federation(gateway_keypair).await;
143 }
144 }
145
146 pub async fn unannounce_from_all_federations(&self, gateway_keypair: Keypair) {
149 let removal_futures = self
150 .clients
151 .values()
152 .filter_map(|client| {
153 client
154 .value()
155 .get_first_module::<GatewayClientModule>()
156 .ok()
157 .map(|lnv1| async move {
158 lnv1.remove_from_federation(gateway_keypair).await;
159 })
160 })
161 .collect::<Vec<_>>();
162
163 futures::future::join_all(removal_futures).await;
164 }
165
166 pub fn get_client_for_index(&self, short_channel_id: u64) -> Option<Spanned<ClientHandleArc>> {
167 let federation_id = self.index_to_federation.get(&short_channel_id)?;
168 match self.clients.get(federation_id).cloned() {
172 Some(client) => Some(client),
173 _ => {
174 panic!(
175 "`FederationManager.index_to_federation` is out of sync with `FederationManager.clients`! This is a bug."
176 );
177 }
178 }
179 }
180
181 pub fn get_client_for_federation_id_prefix(
182 &self,
183 federation_id_prefix: FederationIdPrefix,
184 ) -> Option<Spanned<ClientHandleArc>> {
185 self.clients.iter().find_map(|(fid, client)| {
186 if fid.to_prefix() == federation_id_prefix {
187 Some(client.clone())
188 } else {
189 None
190 }
191 })
192 }
193
194 pub fn has_federation(&self, federation_id: FederationId) -> bool {
195 self.clients.contains_key(&federation_id)
196 }
197
198 pub fn client(&self, federation_id: &FederationId) -> Option<&Spanned<ClientHandleArc>> {
199 self.clients.get(federation_id)
200 }
201
202 pub async fn federation_info(
203 &self,
204 federation_id: FederationId,
205 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
206 ) -> std::result::Result<FederationInfo, FederationNotConnected> {
207 self.clients
208 .get(&federation_id)
209 .expect("`FederationManager.index_to_federation` is out of sync with `FederationManager.clients`! This is a bug.")
210 .borrow()
211 .with(|client| async move {
212 let balance_msat = client.get_balance_for_btc().await
213 .map_err(|_err| FederationNotConnected { federation_id_prefix: federation_id.to_prefix() })?;
215
216 let config = dbtx.load_federation_config(federation_id).await.ok_or(FederationNotConnected {
217 federation_id_prefix: federation_id.to_prefix(),
218 })?;
219 let last_backup_time = dbtx.load_backup_record(federation_id).await.ok_or(FederationNotConnected {
220 federation_id_prefix: federation_id.to_prefix(),
221 })?;
222
223 Ok(FederationInfo {
224 federation_id,
225 federation_name: self.federation_name(client).await,
226 balance_msat,
227 config,
228 last_backup_time,
229 })
230 })
231 .await
232 }
233
234 pub async fn federation_name(&self, client: &ClientHandleArc) -> Option<String> {
235 let client_config = client.config().await;
236 let federation_name = client_config.global.federation_name();
237 federation_name.map(String::from)
238 }
239
240 pub async fn federation_info_all_federations(
241 &self,
242 mut dbtx: DatabaseTransaction<'_, NonCommittable>,
243 ) -> Vec<FederationInfo> {
244 let mut federation_infos = Vec::new();
245 for (federation_id, client) in &self.clients {
246 let balance_msat = match client
247 .borrow()
248 .with(|client| client.get_balance_for_btc())
249 .await
250 {
251 Ok(balance_msat) => balance_msat,
252 Err(err) => {
253 warn!(
254 target: LOG_GATEWAY,
255 err = %err.fmt_compact_anyhow(),
256 "Skipped Federation due to lack of primary module"
257 );
258 continue;
259 }
260 };
261
262 let config = dbtx.load_federation_config(*federation_id).await;
263 let last_backup_time = dbtx
264 .load_backup_record(*federation_id)
265 .await
266 .unwrap_or_default();
267 if let Some(config) = config {
268 federation_infos.push(FederationInfo {
269 federation_id: *federation_id,
270 federation_name: self.federation_name(client.value()).await,
271 balance_msat,
272 config,
273 last_backup_time,
274 });
275 }
276 }
277 federation_infos
278 }
279
280 pub async fn get_federation_config(
281 &self,
282 federation_id: FederationId,
283 ) -> AdminResult<JsonClientConfig> {
284 let client = self
285 .clients
286 .get(&federation_id)
287 .ok_or(FederationNotConnected {
288 federation_id_prefix: federation_id.to_prefix(),
289 })?;
290 Ok(client
291 .borrow()
292 .with(|client| client.get_config_json())
293 .await)
294 }
295
296 pub async fn get_all_federation_configs(&self) -> BTreeMap<FederationId, JsonClientConfig> {
297 let mut federations = BTreeMap::new();
298 for (federation_id, client) in &self.clients {
299 federations.insert(
300 *federation_id,
301 client
302 .borrow()
303 .with(|client| client.get_config_json())
304 .await,
305 );
306 }
307 federations
308 }
309
310 pub async fn backup_federation(
311 &self,
312 federation_id: &FederationId,
313 dbtx: &mut DatabaseTransaction<'_, Committable>,
314 now: SystemTime,
315 ) {
316 if let Some(client) = self.client(federation_id) {
317 let metadata: BTreeMap<String, String> = BTreeMap::new();
318 if client
319 .value()
320 .backup_to_federation(fedimint_client::backup::Metadata::from_json_serialized(
321 metadata,
322 ))
323 .await
324 .is_ok()
325 {
326 dbtx.save_federation_backup_record(*federation_id, Some(now))
327 .await;
328 info!(federation_id = %federation_id, "Successfully backed up federation");
329 }
330 }
331 }
332
333 pub fn set_next_index(&self, next_index: u64) {
335 self.next_index.store(next_index, Ordering::SeqCst);
336 }
337
338 pub fn pop_next_index(&self) -> AdminResult<u64> {
339 let next_index = self.next_index.fetch_add(1, Ordering::Relaxed);
340
341 if next_index == INITIAL_INDEX.wrapping_sub(1) {
343 return Err(AdminGatewayError::GatewayConfigurationError(
344 "Federation Index overflow".to_string(),
345 ));
346 }
347
348 Ok(next_index)
349 }
350}