1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::SystemTime;
5
6use bitcoin::secp256k1::Keypair;
7use fedimint_client::ClientHandleArc;
8use fedimint_core::config::{FederationId, FederationIdPrefix, JsonClientConfig};
9use fedimint_core::db::{Committable, DatabaseTransaction, NonCommittable};
10use fedimint_core::util::{FmtCompactAnyhow as _, Spanned};
11use fedimint_gateway_common::FederationInfo;
12use fedimint_gateway_server_db::GatewayDbtxNcExt as _;
13use fedimint_gw_client::GatewayClientModule;
14use fedimint_gwv2_client::GatewayClientModuleV2;
15use fedimint_logging::LOG_GATEWAY;
16use tracing::{info, warn};
17
18use crate::AdminResult;
19use crate::error::{AdminGatewayError, FederationNotConnected};
20
/// Value the federation index counter (`FederationManager::next_index`) starts from.
const INITIAL_INDEX: u64 = 1;
25
26#[derive(Debug)]
28pub struct FederationManager {
29 clients: BTreeMap<FederationId, Spanned<fedimint_client::ClientHandleArc>>,
32
33 index_to_federation: BTreeMap<u64, FederationId>,
37
38 next_index: AtomicU64,
42}
43
44impl FederationManager {
45 pub fn new() -> Self {
46 Self {
47 clients: BTreeMap::new(),
48 index_to_federation: BTreeMap::new(),
49 next_index: AtomicU64::new(INITIAL_INDEX),
50 }
51 }
52
53 pub fn add_client(&mut self, index: u64, client: Spanned<fedimint_client::ClientHandleArc>) {
54 let federation_id = client.borrow().with_sync(|c| c.federation_id());
55 self.clients.insert(federation_id, client);
56 self.index_to_federation.insert(index, federation_id);
57 }
58
59 pub async fn leave_federation(
60 &mut self,
61 federation_id: FederationId,
62 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
63 ) -> AdminResult<FederationInfo> {
64 let federation_info = self.federation_info(federation_id, dbtx).await?;
65
66 let gateway_keypair = dbtx.load_gateway_keypair_assert_exists().await;
67
68 self.unannounce_from_federation(federation_id, gateway_keypair)
69 .await;
70
71 self.remove_client(federation_id).await?;
72
73 Ok(federation_info)
74 }
75
76 async fn remove_client(&mut self, federation_id: FederationId) -> AdminResult<()> {
77 let client = self
78 .clients
79 .remove(&federation_id)
80 .ok_or(FederationNotConnected {
81 federation_id_prefix: federation_id.to_prefix(),
82 })?
83 .into_value();
84
85 self.index_to_federation
86 .retain(|_, fid| *fid != federation_id);
87
88 match Arc::into_inner(client) {
89 Some(client) => {
90 client.shutdown().await;
91 Ok(())
92 }
93 _ => Err(AdminGatewayError::ClientRemovalError(format!(
94 "Federation client {federation_id} is not unique, failed to shutdown client"
95 ))),
96 }
97 }
98
99 pub async fn wait_for_incoming_payments(&self) -> AdminResult<()> {
102 for client in self.clients.values() {
103 let active_operations = client.value().get_active_operations().await;
104 let operation_log = client.value().operation_log();
105 for op_id in active_operations {
106 let log_entry = operation_log.get_operation(op_id).await;
107 if let Some(entry) = log_entry {
108 match entry.operation_module_kind() {
109 "lnv2" => {
110 let lnv2 =
111 client.value().get_first_module::<GatewayClientModuleV2>()?;
112 lnv2.await_completion(op_id).await;
113 }
114 "ln" => {
115 let lnv1 = client.value().get_first_module::<GatewayClientModule>()?;
116 lnv1.await_completion(op_id).await;
117 }
118 _ => {}
119 }
120 }
121 }
122 }
123
124 info!(target: LOG_GATEWAY, "Finished waiting for incoming payments");
125 Ok(())
126 }
127
128 async fn unannounce_from_federation(
129 &self,
130 federation_id: FederationId,
131 gateway_keypair: Keypair,
132 ) {
133 if let Ok(client) = self
134 .clients
135 .get(&federation_id)
136 .ok_or(FederationNotConnected {
137 federation_id_prefix: federation_id.to_prefix(),
138 })
139 && let Ok(ln) = client.value().get_first_module::<GatewayClientModule>()
140 {
141 ln.remove_from_federation(gateway_keypair).await;
142 }
143 }
144
145 pub async fn unannounce_from_all_federations(&self, gateway_keypair: Keypair) {
148 let removal_futures = self
149 .clients
150 .values()
151 .map(|client| async {
152 client
153 .value()
154 .get_first_module::<GatewayClientModule>()
155 .expect("Must have client module")
156 .remove_from_federation(gateway_keypair)
157 .await;
158 })
159 .collect::<Vec<_>>();
160
161 futures::future::join_all(removal_futures).await;
162 }
163
164 pub fn get_client_for_index(&self, short_channel_id: u64) -> Option<Spanned<ClientHandleArc>> {
165 let federation_id = self.index_to_federation.get(&short_channel_id)?;
166 match self.clients.get(federation_id).cloned() {
170 Some(client) => Some(client),
171 _ => {
172 panic!(
173 "`FederationManager.index_to_federation` is out of sync with `FederationManager.clients`! This is a bug."
174 );
175 }
176 }
177 }
178
179 pub fn get_client_for_federation_id_prefix(
180 &self,
181 federation_id_prefix: FederationIdPrefix,
182 ) -> Option<Spanned<ClientHandleArc>> {
183 self.clients.iter().find_map(|(fid, client)| {
184 if fid.to_prefix() == federation_id_prefix {
185 Some(client.clone())
186 } else {
187 None
188 }
189 })
190 }
191
/// Returns `true` if a client is registered for `federation_id`.
pub fn has_federation(&self, federation_id: FederationId) -> bool {
    self.clients.contains_key(&federation_id)
}
195
/// Returns a reference to the client registered for `federation_id`, or `None`
/// if there is none.
pub fn client(&self, federation_id: &FederationId) -> Option<&Spanned<ClientHandleArc>> {
    self.clients.get(federation_id)
}
199
200 pub async fn federation_info(
201 &self,
202 federation_id: FederationId,
203 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
204 ) -> std::result::Result<FederationInfo, FederationNotConnected> {
205 self.clients
206 .get(&federation_id)
207 .expect("`FederationManager.index_to_federation` is out of sync with `FederationManager.clients`! This is a bug.")
208 .borrow()
209 .with(|client| async move {
210 let balance_msat = client.get_balance_for_btc().await
211 .map_err(|_err| FederationNotConnected { federation_id_prefix: federation_id.to_prefix() })?;
213
214 let config = dbtx.load_federation_config(federation_id).await.ok_or(FederationNotConnected {
215 federation_id_prefix: federation_id.to_prefix(),
216 })?;
217 let last_backup_time = dbtx.load_backup_record(federation_id).await.ok_or(FederationNotConnected {
218 federation_id_prefix: federation_id.to_prefix(),
219 })?;
220
221 Ok(FederationInfo {
222 federation_id,
223 federation_name: self.federation_name(client).await,
224 balance_msat,
225 config,
226 last_backup_time,
227 })
228 })
229 .await
230 }
231
232 pub async fn federation_name(&self, client: &ClientHandleArc) -> Option<String> {
233 let client_config = client.config().await;
234 let federation_name = client_config.global.federation_name();
235 federation_name.map(String::from)
236 }
237
238 pub async fn federation_info_all_federations(
239 &self,
240 mut dbtx: DatabaseTransaction<'_, NonCommittable>,
241 ) -> Vec<FederationInfo> {
242 let mut federation_infos = Vec::new();
243 for (federation_id, client) in &self.clients {
244 let balance_msat = match client
245 .borrow()
246 .with(|client| client.get_balance_for_btc())
247 .await
248 {
249 Ok(balance_msat) => balance_msat,
250 Err(err) => {
251 warn!(
252 target: LOG_GATEWAY,
253 err = %err.fmt_compact_anyhow(),
254 "Skipped Federation due to lack of primary module"
255 );
256 continue;
257 }
258 };
259
260 let config = dbtx.load_federation_config(*federation_id).await;
261 let last_backup_time = dbtx
262 .load_backup_record(*federation_id)
263 .await
264 .unwrap_or_default();
265 if let Some(config) = config {
266 federation_infos.push(FederationInfo {
267 federation_id: *federation_id,
268 federation_name: self.federation_name(client.value()).await,
269 balance_msat,
270 config,
271 last_backup_time,
272 });
273 }
274 }
275 federation_infos
276 }
277
278 pub async fn get_federation_config(
279 &self,
280 federation_id: FederationId,
281 ) -> AdminResult<JsonClientConfig> {
282 let client = self
283 .clients
284 .get(&federation_id)
285 .ok_or(FederationNotConnected {
286 federation_id_prefix: federation_id.to_prefix(),
287 })?;
288 Ok(client
289 .borrow()
290 .with(|client| client.get_config_json())
291 .await)
292 }
293
294 pub async fn get_all_federation_configs(&self) -> BTreeMap<FederationId, JsonClientConfig> {
295 let mut federations = BTreeMap::new();
296 for (federation_id, client) in &self.clients {
297 federations.insert(
298 *federation_id,
299 client
300 .borrow()
301 .with(|client| client.get_config_json())
302 .await,
303 );
304 }
305 federations
306 }
307
308 pub async fn backup_federation(
309 &self,
310 federation_id: &FederationId,
311 dbtx: &mut DatabaseTransaction<'_, Committable>,
312 now: SystemTime,
313 ) {
314 if let Some(client) = self.client(federation_id) {
315 let metadata: BTreeMap<String, String> = BTreeMap::new();
316 if client
317 .value()
318 .backup_to_federation(fedimint_client::backup::Metadata::from_json_serialized(
319 metadata,
320 ))
321 .await
322 .is_ok()
323 {
324 dbtx.save_federation_backup_record(*federation_id, Some(now))
325 .await;
326 info!(federation_id = %federation_id, "Successfully backed up federation");
327 }
328 }
329 }
330
/// Overwrites the index counter so that `next_index` is the next value stored in it.
pub fn set_next_index(&self, next_index: u64) {
    self.next_index.store(next_index, Ordering::SeqCst);
}
335
336 pub fn pop_next_index(&self) -> AdminResult<u64> {
337 let next_index = self.next_index.fetch_add(1, Ordering::Relaxed);
338
339 if next_index == INITIAL_INDEX.wrapping_sub(1) {
341 return Err(AdminGatewayError::GatewayConfigurationError(
342 "Federation Index overflow".to_string(),
343 ));
344 }
345
346 Ok(next_index)
347 }
348}