// fedimint_gateway_server/federation_manager.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::SystemTime;
5
6use bitcoin::secp256k1::Keypair;
7use fedimint_client::ClientHandleArc;
8use fedimint_core::config::{FederationId, FederationIdPrefix, JsonClientConfig};
9use fedimint_core::db::{Committable, DatabaseTransaction, NonCommittable};
10use fedimint_core::util::{FmtCompactAnyhow as _, Spanned};
11use fedimint_gateway_common::FederationInfo;
12use fedimint_gateway_server_db::GatewayDbtxNcExt as _;
13use fedimint_gw_client::GatewayClientModule;
14use fedimint_gwv2_client::GatewayClientModuleV2;
15use fedimint_logging::LOG_GATEWAY;
16use tracing::{info, warn};
17
18use crate::AdminResult;
19use crate::error::{AdminGatewayError, FederationNotConnected};
20
/// The first index that the gateway will assign to a federation.
///
/// Note: This starts at 1 because LNv1 uses the `federation_index` as an SCID.
/// An SCID of 0 is considered invalid by LND's HTLC interceptor.
const INITIAL_INDEX: u64 = 1;
25
26// TODO: Add support for client lookup by payment hash (for LNv2).
27#[derive(Debug)]
28pub struct FederationManager {
29    /// Map of `FederationId` -> `Client`. Used for efficient retrieval of the
30    /// client while handling incoming HTLCs.
31    clients: BTreeMap<FederationId, Spanned<fedimint_client::ClientHandleArc>>,
32
33    /// Map of federation indices to `FederationId`. Use for efficient retrieval
34    /// of the client while handling incoming HTLCs.
35    /// Can be removed after LNv1 removal.
36    index_to_federation: BTreeMap<u64, FederationId>,
37
38    /// Tracker for federation index assignments. When connecting a new
39    /// federation, this value is incremented and assigned to the federation
40    /// as the `federation_index`
41    next_index: AtomicU64,
42}
43
44impl FederationManager {
45    pub fn new() -> Self {
46        Self {
47            clients: BTreeMap::new(),
48            index_to_federation: BTreeMap::new(),
49            next_index: AtomicU64::new(INITIAL_INDEX),
50        }
51    }
52
53    pub fn add_client(&mut self, index: u64, client: Spanned<fedimint_client::ClientHandleArc>) {
54        let federation_id = client.borrow().with_sync(|c| c.federation_id());
55        self.clients.insert(federation_id, client);
56        self.index_to_federation.insert(index, federation_id);
57    }
58
59    pub async fn leave_federation(
60        &mut self,
61        federation_id: FederationId,
62        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
63    ) -> AdminResult<FederationInfo> {
64        let federation_info = self.federation_info(federation_id, dbtx).await?;
65
66        let gateway_keypair = dbtx.load_gateway_keypair_assert_exists().await;
67
68        self.unannounce_from_federation(federation_id, gateway_keypair)
69            .await;
70
71        self.remove_client(federation_id).await?;
72
73        Ok(federation_info)
74    }
75
76    async fn remove_client(&mut self, federation_id: FederationId) -> AdminResult<()> {
77        let client = self
78            .clients
79            .remove(&federation_id)
80            .ok_or(FederationNotConnected {
81                federation_id_prefix: federation_id.to_prefix(),
82            })?
83            .into_value();
84
85        self.index_to_federation
86            .retain(|_, fid| *fid != federation_id);
87
88        match Arc::into_inner(client) {
89            Some(client) => {
90                client.shutdown().await;
91                Ok(())
92            }
93            _ => Err(AdminGatewayError::ClientRemovalError(format!(
94                "Federation client {federation_id} is not unique, failed to shutdown client"
95            ))),
96        }
97    }
98
99    /// Waits for ongoing incoming LNv1 and LNv2 payments to complete before
100    /// returning.
101    pub async fn wait_for_incoming_payments(&self) -> AdminResult<()> {
102        for client in self.clients.values() {
103            let active_operations = client.value().get_active_operations().await;
104            let operation_log = client.value().operation_log();
105            for op_id in active_operations {
106                let log_entry = operation_log.get_operation(op_id).await;
107                if let Some(entry) = log_entry {
108                    match entry.operation_module_kind() {
109                        "lnv2" => {
110                            let lnv2 =
111                                client.value().get_first_module::<GatewayClientModuleV2>()?;
112                            lnv2.await_completion(op_id).await;
113                        }
114                        "ln" => {
115                            let lnv1 = client.value().get_first_module::<GatewayClientModule>()?;
116                            lnv1.await_completion(op_id).await;
117                        }
118                        _ => {}
119                    }
120                }
121            }
122        }
123
124        info!(target: LOG_GATEWAY, "Finished waiting for incoming payments");
125        Ok(())
126    }
127
128    async fn unannounce_from_federation(
129        &self,
130        federation_id: FederationId,
131        gateway_keypair: Keypair,
132    ) {
133        if let Ok(client) = self
134            .clients
135            .get(&federation_id)
136            .ok_or(FederationNotConnected {
137                federation_id_prefix: federation_id.to_prefix(),
138            })
139            && let Ok(ln) = client.value().get_first_module::<GatewayClientModule>()
140        {
141            ln.remove_from_federation(gateway_keypair).await;
142        }
143    }
144
145    /// Iterates through all of the federations the gateway is registered with
146    /// and requests to remove the registration record.
147    pub async fn unannounce_from_all_federations(&self, gateway_keypair: Keypair) {
148        let removal_futures = self
149            .clients
150            .values()
151            .map(|client| async {
152                client
153                    .value()
154                    .get_first_module::<GatewayClientModule>()
155                    .expect("Must have client module")
156                    .remove_from_federation(gateway_keypair)
157                    .await;
158            })
159            .collect::<Vec<_>>();
160
161        futures::future::join_all(removal_futures).await;
162    }
163
164    pub fn get_client_for_index(&self, short_channel_id: u64) -> Option<Spanned<ClientHandleArc>> {
165        let federation_id = self.index_to_federation.get(&short_channel_id)?;
166        // TODO(tvolk131): Cloning the client here could cause issues with client
167        // shutdown (see `remove_client` above). Perhaps this function should take a
168        // lambda and pass it into `client.with_sync`.
169        match self.clients.get(federation_id).cloned() {
170            Some(client) => Some(client),
171            _ => {
172                panic!(
173                    "`FederationManager.index_to_federation` is out of sync with `FederationManager.clients`! This is a bug."
174                );
175            }
176        }
177    }
178
179    pub fn get_client_for_federation_id_prefix(
180        &self,
181        federation_id_prefix: FederationIdPrefix,
182    ) -> Option<Spanned<ClientHandleArc>> {
183        self.clients.iter().find_map(|(fid, client)| {
184            if fid.to_prefix() == federation_id_prefix {
185                Some(client.clone())
186            } else {
187                None
188            }
189        })
190    }
191
192    pub fn has_federation(&self, federation_id: FederationId) -> bool {
193        self.clients.contains_key(&federation_id)
194    }
195
196    pub fn client(&self, federation_id: &FederationId) -> Option<&Spanned<ClientHandleArc>> {
197        self.clients.get(federation_id)
198    }
199
200    pub async fn federation_info(
201        &self,
202        federation_id: FederationId,
203        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
204    ) -> std::result::Result<FederationInfo, FederationNotConnected> {
205        self.clients
206            .get(&federation_id)
207            .expect("`FederationManager.index_to_federation` is out of sync with `FederationManager.clients`! This is a bug.")
208            .borrow()
209            .with(|client| async move {
210                let balance_msat = client.get_balance_for_btc().await
211                    // If primary module is not available, we're not really connected yet
212                    .map_err(|_err| FederationNotConnected { federation_id_prefix: federation_id.to_prefix() })?;
213
214                let config = dbtx.load_federation_config(federation_id).await.ok_or(FederationNotConnected {
215                    federation_id_prefix: federation_id.to_prefix(),
216                })?;
217                let last_backup_time = dbtx.load_backup_record(federation_id).await.ok_or(FederationNotConnected {
218                    federation_id_prefix: federation_id.to_prefix(),
219                })?;
220
221                Ok(FederationInfo {
222                    federation_id,
223                    federation_name: self.federation_name(client).await,
224                    balance_msat,
225                    config,
226                    last_backup_time,
227                })
228            })
229            .await
230    }
231
232    pub async fn federation_name(&self, client: &ClientHandleArc) -> Option<String> {
233        let client_config = client.config().await;
234        let federation_name = client_config.global.federation_name();
235        federation_name.map(String::from)
236    }
237
238    pub async fn federation_info_all_federations(
239        &self,
240        mut dbtx: DatabaseTransaction<'_, NonCommittable>,
241    ) -> Vec<FederationInfo> {
242        let mut federation_infos = Vec::new();
243        for (federation_id, client) in &self.clients {
244            let balance_msat = match client
245                .borrow()
246                .with(|client| client.get_balance_for_btc())
247                .await
248            {
249                Ok(balance_msat) => balance_msat,
250                Err(err) => {
251                    warn!(
252                        target: LOG_GATEWAY,
253                        err = %err.fmt_compact_anyhow(),
254                        "Skipped Federation due to lack of primary module"
255                    );
256                    continue;
257                }
258            };
259
260            let config = dbtx.load_federation_config(*federation_id).await;
261            let last_backup_time = dbtx
262                .load_backup_record(*federation_id)
263                .await
264                .unwrap_or_default();
265            if let Some(config) = config {
266                federation_infos.push(FederationInfo {
267                    federation_id: *federation_id,
268                    federation_name: self.federation_name(client.value()).await,
269                    balance_msat,
270                    config,
271                    last_backup_time,
272                });
273            }
274        }
275        federation_infos
276    }
277
278    pub async fn get_federation_config(
279        &self,
280        federation_id: FederationId,
281    ) -> AdminResult<JsonClientConfig> {
282        let client = self
283            .clients
284            .get(&federation_id)
285            .ok_or(FederationNotConnected {
286                federation_id_prefix: federation_id.to_prefix(),
287            })?;
288        Ok(client
289            .borrow()
290            .with(|client| client.get_config_json())
291            .await)
292    }
293
294    pub async fn get_all_federation_configs(&self) -> BTreeMap<FederationId, JsonClientConfig> {
295        let mut federations = BTreeMap::new();
296        for (federation_id, client) in &self.clients {
297            federations.insert(
298                *federation_id,
299                client
300                    .borrow()
301                    .with(|client| client.get_config_json())
302                    .await,
303            );
304        }
305        federations
306    }
307
308    pub async fn backup_federation(
309        &self,
310        federation_id: &FederationId,
311        dbtx: &mut DatabaseTransaction<'_, Committable>,
312        now: SystemTime,
313    ) {
314        if let Some(client) = self.client(federation_id) {
315            let metadata: BTreeMap<String, String> = BTreeMap::new();
316            if client
317                .value()
318                .backup_to_federation(fedimint_client::backup::Metadata::from_json_serialized(
319                    metadata,
320                ))
321                .await
322                .is_ok()
323            {
324                dbtx.save_federation_backup_record(*federation_id, Some(now))
325                    .await;
326                info!(federation_id = %federation_id, "Successfully backed up federation");
327            }
328        }
329    }
330
331    // TODO(tvolk131): Set this value in the constructor.
332    pub fn set_next_index(&self, next_index: u64) {
333        self.next_index.store(next_index, Ordering::SeqCst);
334    }
335
336    pub fn pop_next_index(&self) -> AdminResult<u64> {
337        let next_index = self.next_index.fetch_add(1, Ordering::Relaxed);
338
339        // Check for overflow.
340        if next_index == INITIAL_INDEX.wrapping_sub(1) {
341            return Err(AdminGatewayError::GatewayConfigurationError(
342                "Federation Index overflow".to_string(),
343            ));
344        }
345
346        Ok(next_index)
347    }
348}