fedimint_gateway_server/
federation_manager.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::SystemTime;
5
6use bitcoin::secp256k1::Keypair;
7use fedimint_client::ClientHandleArc;
8use fedimint_core::config::{FederationId, FederationIdPrefix, JsonClientConfig};
9use fedimint_core::db::{Committable, DatabaseTransaction, NonCommittable};
10use fedimint_core::util::{FmtCompactAnyhow as _, Spanned};
11use fedimint_gateway_common::FederationInfo;
12use fedimint_gateway_server_db::GatewayDbtxNcExt as _;
13use fedimint_gw_client::GatewayClientModule;
14use fedimint_gwv2_client::GatewayClientModuleV2;
15use fedimint_logging::LOG_GATEWAY;
16use tracing::{info, warn};
17
18use crate::error::{AdminGatewayError, FederationNotConnected};
19use crate::{AdminResult, Registration};
20
21/// The first index that the gateway will assign to a federation.
22/// Note: This starts at 1 because LNv1 uses the `federation_index` as an SCID.
23/// An SCID of 0 is considered invalid by LND's HTLC interceptor.
24const INITIAL_INDEX: u64 = 1;
25
26// TODO: Add support for client lookup by payment hash (for LNv2).
27#[derive(Debug)]
28pub struct FederationManager {
29    /// Map of `FederationId` -> `Client`. Used for efficient retrieval of the
30    /// client while handling incoming HTLCs.
31    clients: BTreeMap<FederationId, Spanned<fedimint_client::ClientHandleArc>>,
32
33    /// Map of federation indices to `FederationId`. Use for efficient retrieval
34    /// of the client while handling incoming HTLCs.
35    /// Can be removed after LNv1 removal.
36    index_to_federation: BTreeMap<u64, FederationId>,
37
38    /// Tracker for federation index assignments. When connecting a new
39    /// federation, this value is incremented and assigned to the federation
40    /// as the `federation_index`
41    next_index: AtomicU64,
42}
43
44impl FederationManager {
45    pub fn new() -> Self {
46        Self {
47            clients: BTreeMap::new(),
48            index_to_federation: BTreeMap::new(),
49            next_index: AtomicU64::new(INITIAL_INDEX),
50        }
51    }
52
53    pub fn add_client(&mut self, index: u64, client: Spanned<fedimint_client::ClientHandleArc>) {
54        let federation_id = client.borrow().with_sync(|c| c.federation_id());
55        self.clients.insert(federation_id, client);
56        self.index_to_federation.insert(index, federation_id);
57    }
58
59    pub async fn leave_federation(
60        &mut self,
61        federation_id: FederationId,
62        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
63        registrations: Vec<&Registration>,
64    ) -> AdminResult<FederationInfo> {
65        let federation_info = self.federation_info(federation_id, dbtx).await?;
66
67        for registration in registrations {
68            self.unannounce_from_federation(federation_id, registration.keypair)
69                .await;
70        }
71
72        self.remove_client(federation_id).await?;
73
74        Ok(federation_info)
75    }
76
77    async fn remove_client(&mut self, federation_id: FederationId) -> AdminResult<()> {
78        let client = self
79            .clients
80            .remove(&federation_id)
81            .ok_or(FederationNotConnected {
82                federation_id_prefix: federation_id.to_prefix(),
83            })?
84            .into_value();
85
86        self.index_to_federation
87            .retain(|_, fid| *fid != federation_id);
88
89        match Arc::into_inner(client) {
90            Some(client) => {
91                client.shutdown().await;
92                Ok(())
93            }
94            _ => Err(AdminGatewayError::ClientRemovalError(format!(
95                "Federation client {federation_id} is not unique, failed to shutdown client"
96            ))),
97        }
98    }
99
100    /// Waits for ongoing incoming LNv1 and LNv2 payments to complete before
101    /// returning.
102    pub async fn wait_for_incoming_payments(&self) -> AdminResult<()> {
103        for client in self.clients.values() {
104            let active_operations = client.value().get_active_operations().await;
105            let operation_log = client.value().operation_log();
106            for op_id in active_operations {
107                let log_entry = operation_log.get_operation(op_id).await;
108                if let Some(entry) = log_entry {
109                    match entry.operation_module_kind() {
110                        "lnv2" => {
111                            let lnv2 =
112                                client.value().get_first_module::<GatewayClientModuleV2>()?;
113                            lnv2.await_completion(op_id).await;
114                        }
115                        "ln" => {
116                            let lnv1 = client.value().get_first_module::<GatewayClientModule>()?;
117                            lnv1.await_completion(op_id).await;
118                        }
119                        _ => {}
120                    }
121                }
122            }
123        }
124
125        info!(target: LOG_GATEWAY, "Finished waiting for incoming payments");
126        Ok(())
127    }
128
129    async fn unannounce_from_federation(
130        &self,
131        federation_id: FederationId,
132        gateway_keypair: Keypair,
133    ) {
134        if let Ok(client) = self
135            .clients
136            .get(&federation_id)
137            .ok_or(FederationNotConnected {
138                federation_id_prefix: federation_id.to_prefix(),
139            })
140            && let Ok(ln) = client.value().get_first_module::<GatewayClientModule>()
141        {
142            ln.remove_from_federation(gateway_keypair).await;
143        }
144    }
145
146    /// Iterates through all of the federations the gateway is registered with
147    /// and requests to remove the registration record.
148    pub async fn unannounce_from_all_federations(&self, gateway_keypair: Keypair) {
149        let removal_futures = self
150            .clients
151            .values()
152            .filter_map(|client| {
153                client
154                    .value()
155                    .get_first_module::<GatewayClientModule>()
156                    .ok()
157                    .map(|lnv1| async move {
158                        lnv1.remove_from_federation(gateway_keypair).await;
159                    })
160            })
161            .collect::<Vec<_>>();
162
163        futures::future::join_all(removal_futures).await;
164    }
165
166    pub fn get_client_for_index(&self, short_channel_id: u64) -> Option<Spanned<ClientHandleArc>> {
167        let federation_id = self.index_to_federation.get(&short_channel_id)?;
168        // TODO(tvolk131): Cloning the client here could cause issues with client
169        // shutdown (see `remove_client` above). Perhaps this function should take a
170        // lambda and pass it into `client.with_sync`.
171        match self.clients.get(federation_id).cloned() {
172            Some(client) => Some(client),
173            _ => {
174                panic!(
175                    "`FederationManager.index_to_federation` is out of sync with `FederationManager.clients`! This is a bug."
176                );
177            }
178        }
179    }
180
181    pub fn get_client_for_federation_id_prefix(
182        &self,
183        federation_id_prefix: FederationIdPrefix,
184    ) -> Option<Spanned<ClientHandleArc>> {
185        self.clients.iter().find_map(|(fid, client)| {
186            if fid.to_prefix() == federation_id_prefix {
187                Some(client.clone())
188            } else {
189                None
190            }
191        })
192    }
193
194    pub fn has_federation(&self, federation_id: FederationId) -> bool {
195        self.clients.contains_key(&federation_id)
196    }
197
198    pub fn client(&self, federation_id: &FederationId) -> Option<&Spanned<ClientHandleArc>> {
199        self.clients.get(federation_id)
200    }
201
202    pub async fn federation_info(
203        &self,
204        federation_id: FederationId,
205        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
206    ) -> std::result::Result<FederationInfo, FederationNotConnected> {
207        self.clients
208            .get(&federation_id)
209            .expect("`FederationManager.index_to_federation` is out of sync with `FederationManager.clients`! This is a bug.")
210            .borrow()
211            .with(|client| async move {
212                let balance_msat = client.get_balance_for_btc().await
213                    // If primary module is not available, we're not really connected yet
214                    .map_err(|_err| FederationNotConnected { federation_id_prefix: federation_id.to_prefix() })?;
215
216                let config = dbtx.load_federation_config(federation_id).await.ok_or(FederationNotConnected {
217                    federation_id_prefix: federation_id.to_prefix(),
218                })?;
219                let last_backup_time = dbtx.load_backup_record(federation_id).await.ok_or(FederationNotConnected {
220                    federation_id_prefix: federation_id.to_prefix(),
221                })?;
222
223                Ok(FederationInfo {
224                    federation_id,
225                    federation_name: self.federation_name(client).await,
226                    balance_msat,
227                    config,
228                    last_backup_time,
229                })
230            })
231            .await
232    }
233
234    pub async fn federation_name(&self, client: &ClientHandleArc) -> Option<String> {
235        let client_config = client.config().await;
236        let federation_name = client_config.global.federation_name();
237        federation_name.map(String::from)
238    }
239
240    pub async fn federation_info_all_federations(
241        &self,
242        mut dbtx: DatabaseTransaction<'_, NonCommittable>,
243    ) -> Vec<FederationInfo> {
244        let mut federation_infos = Vec::new();
245        for (federation_id, client) in &self.clients {
246            let balance_msat = match client
247                .borrow()
248                .with(|client| client.get_balance_for_btc())
249                .await
250            {
251                Ok(balance_msat) => balance_msat,
252                Err(err) => {
253                    warn!(
254                        target: LOG_GATEWAY,
255                        err = %err.fmt_compact_anyhow(),
256                        "Skipped Federation due to lack of primary module"
257                    );
258                    continue;
259                }
260            };
261
262            let config = dbtx.load_federation_config(*federation_id).await;
263            let last_backup_time = dbtx
264                .load_backup_record(*federation_id)
265                .await
266                .unwrap_or_default();
267            if let Some(config) = config {
268                federation_infos.push(FederationInfo {
269                    federation_id: *federation_id,
270                    federation_name: self.federation_name(client.value()).await,
271                    balance_msat,
272                    config,
273                    last_backup_time,
274                });
275            }
276        }
277        federation_infos
278    }
279
280    pub async fn get_federation_config(
281        &self,
282        federation_id: FederationId,
283    ) -> AdminResult<JsonClientConfig> {
284        let client = self
285            .clients
286            .get(&federation_id)
287            .ok_or(FederationNotConnected {
288                federation_id_prefix: federation_id.to_prefix(),
289            })?;
290        Ok(client
291            .borrow()
292            .with(|client| client.get_config_json())
293            .await)
294    }
295
296    pub async fn get_all_federation_configs(&self) -> BTreeMap<FederationId, JsonClientConfig> {
297        let mut federations = BTreeMap::new();
298        for (federation_id, client) in &self.clients {
299            federations.insert(
300                *federation_id,
301                client
302                    .borrow()
303                    .with(|client| client.get_config_json())
304                    .await,
305            );
306        }
307        federations
308    }
309
310    pub async fn backup_federation(
311        &self,
312        federation_id: &FederationId,
313        dbtx: &mut DatabaseTransaction<'_, Committable>,
314        now: SystemTime,
315    ) {
316        if let Some(client) = self.client(federation_id) {
317            let metadata: BTreeMap<String, String> = BTreeMap::new();
318            if client
319                .value()
320                .backup_to_federation(fedimint_client::backup::Metadata::from_json_serialized(
321                    metadata,
322                ))
323                .await
324                .is_ok()
325            {
326                dbtx.save_federation_backup_record(*federation_id, Some(now))
327                    .await;
328                info!(federation_id = %federation_id, "Successfully backed up federation");
329            }
330        }
331    }
332
333    // TODO(tvolk131): Set this value in the constructor.
334    pub fn set_next_index(&self, next_index: u64) {
335        self.next_index.store(next_index, Ordering::SeqCst);
336    }
337
338    pub fn pop_next_index(&self) -> AdminResult<u64> {
339        let next_index = self.next_index.fetch_add(1, Ordering::Relaxed);
340
341        // Check for overflow.
342        if next_index == INITIAL_INDEX.wrapping_sub(1) {
343            return Err(AdminGatewayError::GatewayConfigurationError(
344                "Federation Index overflow".to_string(),
345            ));
346        }
347
348        Ok(next_index)
349    }
350}