fedimint_gateway_server/
federation_manager.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::SystemTime;
5
6use bitcoin::secp256k1::Keypair;
7use fedimint_client::ClientHandleArc;
8use fedimint_core::config::{FederationId, FederationIdPrefix, JsonClientConfig};
9use fedimint_core::db::{Committable, DatabaseTransaction, NonCommittable};
10use fedimint_core::invite_code::InviteCode;
11use fedimint_core::util::{FmtCompactAnyhow as _, Spanned};
12use fedimint_gateway_common::FederationInfo;
13use fedimint_gateway_server_db::GatewayDbtxNcExt as _;
14use fedimint_gw_client::GatewayClientModule;
15use fedimint_gwv2_client::GatewayClientModuleV2;
16use fedimint_logging::LOG_GATEWAY;
17use tracing::{info, warn};
18
19use crate::error::{AdminGatewayError, FederationNotConnected};
20use crate::{AdminResult, Registration};
21
/// The first index that the gateway will assign to a federation.
///
/// Note: This starts at 1 because LNv1 uses the `federation_index` as an SCID.
/// An SCID of 0 is considered invalid by LND's HTLC interceptor.
///
/// `FederationManager::new` seeds its index counter with this value, and
/// `FederationManager::pop_next_index` treats the value immediately below it
/// (computed with wrapping arithmetic) as the overflow marker.
const INITIAL_INDEX: u64 = 1;
26
// TODO: Add support for client lookup by payment hash (for LNv2).
/// Holds the federation clients known to the gateway, keyed by
/// `FederationId`, together with the index bookkeeping that maps a federation
/// index back to its federation.
#[derive(Debug)]
pub struct FederationManager {
    /// Map of `FederationId` -> `Client`. Used for efficient retrieval of the
    /// client while handling incoming HTLCs.
    clients: BTreeMap<FederationId, Spanned<fedimint_client::ClientHandleArc>>,

    /// Map of federation indices to `FederationId`. Used for efficient
    /// retrieval of the client while handling incoming HTLCs.
    /// Can be removed after LNv1 removal.
    index_to_federation: BTreeMap<u64, FederationId>,

    /// Tracker for federation index assignments. When connecting a new
    /// federation, this value is incremented and assigned to the federation
    /// as the `federation_index`.
    next_index: AtomicU64,
}
44
45impl FederationManager {
46    pub fn new() -> Self {
47        Self {
48            clients: BTreeMap::new(),
49            index_to_federation: BTreeMap::new(),
50            next_index: AtomicU64::new(INITIAL_INDEX),
51        }
52    }
53
54    pub fn add_client(&mut self, index: u64, client: Spanned<fedimint_client::ClientHandleArc>) {
55        let federation_id = client.borrow().with_sync(|c| c.federation_id());
56        self.clients.insert(federation_id, client);
57        self.index_to_federation.insert(index, federation_id);
58    }
59
60    pub async fn leave_federation(
61        &mut self,
62        federation_id: FederationId,
63        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
64        registrations: Vec<&Registration>,
65    ) -> AdminResult<FederationInfo> {
66        let federation_info = self.federation_info(federation_id, dbtx).await?;
67
68        for registration in registrations {
69            self.unannounce_from_federation(federation_id, registration.keypair)
70                .await;
71        }
72
73        self.remove_client(federation_id).await?;
74
75        Ok(federation_info)
76    }
77
78    async fn remove_client(&mut self, federation_id: FederationId) -> AdminResult<()> {
79        let client = self
80            .clients
81            .remove(&federation_id)
82            .ok_or(FederationNotConnected {
83                federation_id_prefix: federation_id.to_prefix(),
84            })?
85            .into_value();
86
87        self.index_to_federation
88            .retain(|_, fid| *fid != federation_id);
89
90        match Arc::into_inner(client) {
91            Some(client) => {
92                client.shutdown().await;
93                Ok(())
94            }
95            _ => Err(AdminGatewayError::ClientRemovalError(format!(
96                "Federation client {federation_id} is not unique, failed to shutdown client"
97            ))),
98        }
99    }
100
101    /// Waits for ongoing incoming LNv1 and LNv2 payments to complete before
102    /// returning.
103    pub async fn wait_for_incoming_payments(&self) -> AdminResult<()> {
104        for client in self.clients.values() {
105            let active_operations = client.value().get_active_operations().await;
106            let operation_log = client.value().operation_log();
107            for op_id in active_operations {
108                let log_entry = operation_log.get_operation(op_id).await;
109                if let Some(entry) = log_entry {
110                    match entry.operation_module_kind() {
111                        "lnv2" => {
112                            let lnv2 =
113                                client.value().get_first_module::<GatewayClientModuleV2>()?;
114                            lnv2.await_completion(op_id).await;
115                        }
116                        "ln" => {
117                            let lnv1 = client.value().get_first_module::<GatewayClientModule>()?;
118                            lnv1.await_completion(op_id).await;
119                        }
120                        _ => {}
121                    }
122                }
123            }
124        }
125
126        info!(target: LOG_GATEWAY, "Finished waiting for incoming payments");
127        Ok(())
128    }
129
130    async fn unannounce_from_federation(
131        &self,
132        federation_id: FederationId,
133        gateway_keypair: Keypair,
134    ) {
135        if let Ok(client) = self
136            .clients
137            .get(&federation_id)
138            .ok_or(FederationNotConnected {
139                federation_id_prefix: federation_id.to_prefix(),
140            })
141            && let Ok(ln) = client.value().get_first_module::<GatewayClientModule>()
142        {
143            ln.remove_from_federation(gateway_keypair).await;
144        }
145    }
146
147    /// Iterates through all of the federations the gateway is registered with
148    /// and requests to remove the registration record.
149    pub async fn unannounce_from_all_federations(&self, gateway_keypair: Keypair) {
150        let removal_futures = self
151            .clients
152            .values()
153            .filter_map(|client| {
154                client
155                    .value()
156                    .get_first_module::<GatewayClientModule>()
157                    .ok()
158                    .map(|lnv1| async move {
159                        lnv1.remove_from_federation(gateway_keypair).await;
160                    })
161            })
162            .collect::<Vec<_>>();
163
164        futures::future::join_all(removal_futures).await;
165    }
166
167    pub fn get_client_for_index(&self, short_channel_id: u64) -> Option<Spanned<ClientHandleArc>> {
168        let federation_id = self.index_to_federation.get(&short_channel_id)?;
169        // TODO(tvolk131): Cloning the client here could cause issues with client
170        // shutdown (see `remove_client` above). Perhaps this function should take a
171        // lambda and pass it into `client.with_sync`.
172        match self.clients.get(federation_id).cloned() {
173            Some(client) => Some(client),
174            _ => {
175                panic!(
176                    "`FederationManager.index_to_federation` is out of sync with `FederationManager.clients`! This is a bug."
177                );
178            }
179        }
180    }
181
182    pub fn get_client_for_federation_id_prefix(
183        &self,
184        federation_id_prefix: FederationIdPrefix,
185    ) -> Option<Spanned<ClientHandleArc>> {
186        self.clients.iter().find_map(|(fid, client)| {
187            if fid.to_prefix() == federation_id_prefix {
188                Some(client.clone())
189            } else {
190                None
191            }
192        })
193    }
194
195    pub fn has_federation(&self, federation_id: FederationId) -> bool {
196        self.clients.contains_key(&federation_id)
197    }
198
    /// Returns a reference to the client stored for `federation_id`, or
    /// `None` when no client is stored under that id.
    pub fn client(&self, federation_id: &FederationId) -> Option<&Spanned<ClientHandleArc>> {
        self.clients.get(federation_id)
    }
202
203    pub async fn federation_info(
204        &self,
205        federation_id: FederationId,
206        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
207    ) -> std::result::Result<FederationInfo, FederationNotConnected> {
208        self.clients
209            .get(&federation_id)
210            .expect("`FederationManager.index_to_federation` is out of sync with `FederationManager.clients`! This is a bug.")
211            .borrow()
212            .with(|client| async move {
213                let balance_msat = client.get_balance_for_btc().await
214                    // If primary module is not available, we're not really connected yet
215                    .map_err(|_err| FederationNotConnected { federation_id_prefix: federation_id.to_prefix() })?;
216
217                let config = dbtx.load_federation_config(federation_id).await.ok_or(FederationNotConnected {
218                    federation_id_prefix: federation_id.to_prefix(),
219                })?;
220                let last_backup_time = dbtx.load_backup_record(federation_id).await.ok_or(FederationNotConnected {
221                    federation_id_prefix: federation_id.to_prefix(),
222                })?;
223
224                Ok(FederationInfo {
225                    federation_id,
226                    federation_name: self.federation_name(client).await,
227                    balance_msat,
228                    config,
229                    last_backup_time,
230                })
231            })
232            .await
233    }
234
235    pub async fn federation_name(&self, client: &ClientHandleArc) -> Option<String> {
236        let client_config = client.config().await;
237        let federation_name = client_config.global.federation_name();
238        federation_name.map(String::from)
239    }
240
241    pub async fn federation_info_all_federations(
242        &self,
243        mut dbtx: DatabaseTransaction<'_, NonCommittable>,
244    ) -> Vec<FederationInfo> {
245        let mut federation_infos = Vec::new();
246        for (federation_id, client) in &self.clients {
247            let balance_msat = match client
248                .borrow()
249                .with(|client| client.get_balance_for_btc())
250                .await
251            {
252                Ok(balance_msat) => balance_msat,
253                Err(err) => {
254                    warn!(
255                        target: LOG_GATEWAY,
256                        err = %err.fmt_compact_anyhow(),
257                        "Skipped Federation due to lack of primary module"
258                    );
259                    continue;
260                }
261            };
262
263            let config = dbtx.load_federation_config(*federation_id).await;
264            let last_backup_time = dbtx
265                .load_backup_record(*federation_id)
266                .await
267                .unwrap_or_default();
268            if let Some(config) = config {
269                federation_infos.push(FederationInfo {
270                    federation_id: *federation_id,
271                    federation_name: self.federation_name(client.value()).await,
272                    balance_msat,
273                    config,
274                    last_backup_time,
275                });
276            }
277        }
278        federation_infos
279    }
280
281    pub async fn get_federation_config(
282        &self,
283        federation_id: FederationId,
284    ) -> AdminResult<JsonClientConfig> {
285        let client = self
286            .clients
287            .get(&federation_id)
288            .ok_or(FederationNotConnected {
289                federation_id_prefix: federation_id.to_prefix(),
290            })?;
291        Ok(client
292            .borrow()
293            .with(|client| client.get_config_json())
294            .await)
295    }
296
297    pub async fn get_all_federation_configs(&self) -> BTreeMap<FederationId, JsonClientConfig> {
298        let mut federations = BTreeMap::new();
299        for (federation_id, client) in &self.clients {
300            federations.insert(
301                *federation_id,
302                client
303                    .borrow()
304                    .with(|client| client.get_config_json())
305                    .await,
306            );
307        }
308        federations
309    }
310
311    pub async fn backup_federation(
312        &self,
313        federation_id: &FederationId,
314        dbtx: &mut DatabaseTransaction<'_, Committable>,
315        now: SystemTime,
316    ) {
317        if let Some(client) = self.client(federation_id) {
318            let metadata: BTreeMap<String, String> = BTreeMap::new();
319            #[allow(deprecated)]
320            if client
321                .value()
322                .backup_to_federation(fedimint_client::backup::Metadata::from_json_serialized(
323                    metadata,
324                ))
325                .await
326                .is_ok()
327            {
328                dbtx.save_federation_backup_record(*federation_id, Some(now))
329                    .await;
330                info!(federation_id = %federation_id, "Successfully backed up federation");
331            }
332        }
333    }
334
335    pub async fn all_invite_codes(&self) -> BTreeMap<FederationId, Vec<InviteCode>> {
336        let mut invite_codes = BTreeMap::new();
337
338        for (federation_id, client) in &self.clients {
339            let peer_urls = client.value().get_peer_urls().await;
340
341            let fed_invite_codes = futures::future::join_all(
342                peer_urls
343                    .keys()
344                    .map(|peer_id| async move { client.value().invite_code(*peer_id).await }),
345            )
346            .await
347            .into_iter()
348            .flatten()
349            .collect();
350
351            invite_codes.insert(*federation_id, fed_invite_codes);
352        }
353
354        invite_codes
355    }
356
    // TODO(tvolk131): Set this value in the constructor.
    /// Overwrites the index counter so that the next call to
    /// `pop_next_index` starts from `next_index`.
    pub fn set_next_index(&self, next_index: u64) {
        self.next_index.store(next_index, Ordering::SeqCst);
    }
361
362    pub fn pop_next_index(&self) -> AdminResult<u64> {
363        let next_index = self.next_index.fetch_add(1, Ordering::Relaxed);
364
365        // Check for overflow.
366        if next_index == INITIAL_INDEX.wrapping_sub(1) {
367            return Err(AdminGatewayError::GatewayConfigurationError(
368                "Federation Index overflow".to_string(),
369            ));
370        }
371
372        Ok(next_index)
373    }
374}