Skip to main content

fedimint_gateway_server/
federation_manager.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::SystemTime;
5
6use bitcoin::secp256k1::Keypair;
7use fedimint_client::ClientHandleArc;
8use fedimint_core::config::{FederationId, FederationIdPrefix, JsonClientConfig};
9use fedimint_core::db::{Committable, DatabaseTransaction, NonCommittable};
10use fedimint_core::invite_code::InviteCode;
11use fedimint_core::util::{FmtCompactAnyhow as _, Spanned};
12use fedimint_core::{PeerId, TieredCounts};
13use fedimint_gateway_common::FederationInfo;
14use fedimint_gateway_server_db::GatewayDbtxNcExt as _;
15use fedimint_gw_client::GatewayClientModule;
16use fedimint_gwv2_client::GatewayClientModuleV2;
17use fedimint_logging::LOG_GATEWAY;
18use fedimint_mint_client::MintClientModule;
19use tracing::{info, warn};
20
21use crate::error::{AdminGatewayError, FederationNotConnected};
22use crate::{AdminResult, Registration};
23
/// The first index that the gateway will assign to a federation.
///
/// Note: This starts at 1 because LNv1 uses the `federation_index` as an SCID.
/// An SCID of 0 is considered invalid by LND's HTLC interceptor.
///
/// `FederationManager::new` seeds its index counter with this value.
const INITIAL_INDEX: u64 = 1;
28
29// TODO: Add support for client lookup by payment hash (for LNv2).
30#[derive(Debug)]
31pub struct FederationManager {
32    /// Map of `FederationId` -> `Client`. Used for efficient retrieval of the
33    /// client while handling incoming HTLCs.
34    clients: BTreeMap<FederationId, Spanned<fedimint_client::ClientHandleArc>>,
35
36    /// Map of federation indices to `FederationId`. Use for efficient retrieval
37    /// of the client while handling incoming HTLCs.
38    /// Can be removed after LNv1 removal.
39    index_to_federation: BTreeMap<u64, FederationId>,
40
41    /// Tracker for federation index assignments. When connecting a new
42    /// federation, this value is incremented and assigned to the federation
43    /// as the `federation_index`
44    next_index: AtomicU64,
45}
46
47impl FederationManager {
48    pub fn new() -> Self {
49        Self {
50            clients: BTreeMap::new(),
51            index_to_federation: BTreeMap::new(),
52            next_index: AtomicU64::new(INITIAL_INDEX),
53        }
54    }
55
56    pub fn add_client(&mut self, index: u64, client: Spanned<fedimint_client::ClientHandleArc>) {
57        let federation_id = client.borrow().with_sync(|c| c.federation_id());
58        self.clients.insert(federation_id, client);
59        self.index_to_federation.insert(index, federation_id);
60    }
61
62    pub async fn leave_federation(
63        &mut self,
64        federation_id: FederationId,
65        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
66        registrations: Vec<&Registration>,
67    ) -> AdminResult<FederationInfo> {
68        let federation_info = self.federation_info(federation_id, dbtx).await?;
69
70        for registration in registrations {
71            self.unannounce_from_federation(federation_id, registration.keypair)
72                .await;
73        }
74
75        self.remove_client(federation_id).await?;
76
77        Ok(federation_info)
78    }
79
80    async fn remove_client(&mut self, federation_id: FederationId) -> AdminResult<()> {
81        let client = self
82            .clients
83            .remove(&federation_id)
84            .ok_or(FederationNotConnected {
85                federation_id_prefix: federation_id.to_prefix(),
86            })?
87            .into_value();
88
89        self.index_to_federation
90            .retain(|_, fid| *fid != federation_id);
91
92        match Arc::into_inner(client) {
93            Some(client) => {
94                client.shutdown().await;
95                Ok(())
96            }
97            _ => Err(AdminGatewayError::ClientRemovalError(format!(
98                "Federation client {federation_id} is not unique, failed to shutdown client"
99            ))),
100        }
101    }
102
103    /// Waits for ongoing incoming LNv1 and LNv2 payments to complete before
104    /// returning.
105    pub async fn wait_for_incoming_payments(&self) -> AdminResult<()> {
106        for client in self.clients.values() {
107            let active_operations = client.value().get_active_operations().await;
108            let operation_log = client.value().operation_log();
109            for op_id in active_operations {
110                let log_entry = operation_log.get_operation(op_id).await;
111                if let Some(entry) = log_entry {
112                    match entry.operation_module_kind() {
113                        "lnv2" => {
114                            let lnv2 =
115                                client.value().get_first_module::<GatewayClientModuleV2>()?;
116                            lnv2.await_completion(op_id).await;
117                        }
118                        "ln" => {
119                            let lnv1 = client.value().get_first_module::<GatewayClientModule>()?;
120                            lnv1.await_completion(op_id).await;
121                        }
122                        _ => {}
123                    }
124                }
125            }
126        }
127
128        info!(target: LOG_GATEWAY, "Finished waiting for incoming payments");
129        Ok(())
130    }
131
132    async fn unannounce_from_federation(
133        &self,
134        federation_id: FederationId,
135        gateway_keypair: Keypair,
136    ) {
137        if let Ok(client) = self
138            .clients
139            .get(&federation_id)
140            .ok_or(FederationNotConnected {
141                federation_id_prefix: federation_id.to_prefix(),
142            })
143            && let Ok(ln) = client.value().get_first_module::<GatewayClientModule>()
144        {
145            ln.remove_from_federation(gateway_keypair).await;
146        }
147    }
148
149    /// Iterates through all of the federations the gateway is registered with
150    /// and requests to remove the registration record.
151    pub async fn unannounce_from_all_federations(&self, gateway_keypair: Keypair) {
152        let removal_futures = self
153            .clients
154            .values()
155            .filter_map(|client| {
156                client
157                    .value()
158                    .get_first_module::<GatewayClientModule>()
159                    .ok()
160                    .map(|lnv1| async move {
161                        lnv1.remove_from_federation(gateway_keypair).await;
162                    })
163            })
164            .collect::<Vec<_>>();
165
166        futures::future::join_all(removal_futures).await;
167    }
168
169    pub fn get_client_for_index(&self, short_channel_id: u64) -> Option<Spanned<ClientHandleArc>> {
170        let federation_id = self.index_to_federation.get(&short_channel_id)?;
171        // TODO(tvolk131): Cloning the client here could cause issues with client
172        // shutdown (see `remove_client` above). Perhaps this function should take a
173        // lambda and pass it into `client.with_sync`.
174        match self.clients.get(federation_id).cloned() {
175            Some(client) => Some(client),
176            _ => {
177                panic!(
178                    "`FederationManager.index_to_federation` is out of sync with `FederationManager.clients`! This is a bug."
179                );
180            }
181        }
182    }
183
184    pub fn get_client_for_federation_id_prefix(
185        &self,
186        federation_id_prefix: FederationIdPrefix,
187    ) -> Option<Spanned<ClientHandleArc>> {
188        self.clients.iter().find_map(|(fid, client)| {
189            if fid.to_prefix() == federation_id_prefix {
190                Some(client.clone())
191            } else {
192                None
193            }
194        })
195    }
196
197    pub fn has_federation(&self, federation_id: FederationId) -> bool {
198        self.clients.contains_key(&federation_id)
199    }
200
201    pub fn client(&self, federation_id: &FederationId) -> Option<&Spanned<ClientHandleArc>> {
202        self.clients.get(federation_id)
203    }
204
205    pub async fn federation_info(
206        &self,
207        federation_id: FederationId,
208        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
209    ) -> std::result::Result<FederationInfo, FederationNotConnected> {
210        self.clients
211            .get(&federation_id)
212            .ok_or(FederationNotConnected {
213                federation_id_prefix: federation_id.to_prefix(),
214            })?
215            .borrow()
216            .with(|client| async move {
217                let balance_msat = client
218                    .get_balance_for_btc()
219                    .await
220                    // If primary module is not available, we're not really connected yet
221                    .map_err(|_err| FederationNotConnected {
222                        federation_id_prefix: federation_id.to_prefix(),
223                    })?;
224
225                let config = dbtx.load_federation_config(federation_id).await.ok_or(
226                    FederationNotConnected {
227                        federation_id_prefix: federation_id.to_prefix(),
228                    },
229                )?;
230                let last_backup_time =
231                    dbtx.load_backup_record(federation_id)
232                        .await
233                        .ok_or(FederationNotConnected {
234                            federation_id_prefix: federation_id.to_prefix(),
235                        })?;
236
237                Ok(FederationInfo {
238                    federation_id,
239                    federation_name: self.federation_name(client).await,
240                    balance_msat,
241                    config,
242                    last_backup_time,
243                })
244            })
245            .await
246    }
247
248    pub async fn federation_name(&self, client: &ClientHandleArc) -> Option<String> {
249        let client_config = client.config().await;
250        let federation_name = client_config.global.federation_name();
251        federation_name.map(String::from)
252    }
253
254    pub async fn federation_info_all_federations(
255        &self,
256        mut dbtx: DatabaseTransaction<'_, NonCommittable>,
257    ) -> Vec<FederationInfo> {
258        let mut federation_infos = Vec::new();
259        for (federation_id, client) in &self.clients {
260            let balance_msat = match client
261                .borrow()
262                .with(|client| client.get_balance_for_btc())
263                .await
264            {
265                Ok(balance_msat) => balance_msat,
266                Err(err) => {
267                    warn!(
268                        target: LOG_GATEWAY,
269                        err = %err.fmt_compact_anyhow(),
270                        "Skipped Federation due to lack of primary module"
271                    );
272                    continue;
273                }
274            };
275
276            let config = dbtx.load_federation_config(*federation_id).await;
277            let last_backup_time = dbtx
278                .load_backup_record(*federation_id)
279                .await
280                .unwrap_or_default();
281            if let Some(config) = config {
282                federation_infos.push(FederationInfo {
283                    federation_id: *federation_id,
284                    federation_name: self.federation_name(client.value()).await,
285                    balance_msat,
286                    config,
287                    last_backup_time,
288                });
289            }
290        }
291        federation_infos
292    }
293
294    pub async fn get_federation_config(
295        &self,
296        federation_id: FederationId,
297    ) -> AdminResult<JsonClientConfig> {
298        let client = self
299            .clients
300            .get(&federation_id)
301            .ok_or(FederationNotConnected {
302                federation_id_prefix: federation_id.to_prefix(),
303            })?;
304        Ok(client
305            .borrow()
306            .with(|client| client.get_config_json())
307            .await)
308    }
309
310    pub async fn get_all_federation_configs(&self) -> BTreeMap<FederationId, JsonClientConfig> {
311        let mut federations = BTreeMap::new();
312        for (federation_id, client) in &self.clients {
313            federations.insert(
314                *federation_id,
315                client
316                    .borrow()
317                    .with(|client| client.get_config_json())
318                    .await,
319            );
320        }
321        federations
322    }
323
324    pub async fn backup_federation(
325        &self,
326        federation_id: &FederationId,
327        dbtx: &mut DatabaseTransaction<'_, Committable>,
328        now: SystemTime,
329    ) {
330        if let Some(client) = self.client(federation_id) {
331            let metadata: BTreeMap<String, String> = BTreeMap::new();
332            #[allow(deprecated)]
333            if client
334                .value()
335                .backup_to_federation(fedimint_client::backup::Metadata::from_json_serialized(
336                    metadata,
337                ))
338                .await
339                .is_ok()
340            {
341                dbtx.save_federation_backup_record(*federation_id, Some(now))
342                    .await;
343                info!(federation_id = %federation_id, "Successfully backed up federation");
344            }
345        }
346    }
347
348    pub async fn all_invite_codes(
349        &self,
350    ) -> BTreeMap<FederationId, BTreeMap<PeerId, (String, InviteCode)>> {
351        let mut invite_codes = BTreeMap::new();
352
353        for (federation_id, client) in &self.clients {
354            let config = client.value().config().await;
355            let api_endpoints = &config.global.api_endpoints;
356
357            let mut fed_invite_codes = BTreeMap::new();
358            for (peer_id, peer_url) in api_endpoints {
359                if let Some(code) = client.value().invite_code(*peer_id).await {
360                    fed_invite_codes.insert(*peer_id, (peer_url.name.clone(), code));
361                }
362            }
363
364            invite_codes.insert(*federation_id, fed_invite_codes);
365        }
366
367        invite_codes
368    }
369
370    pub async fn get_note_summary(
371        &self,
372        federation_id: &FederationId,
373    ) -> AdminResult<TieredCounts> {
374        let client = self.client(federation_id).ok_or(FederationNotConnected {
375            federation_id_prefix: federation_id.to_prefix(),
376        })?;
377        let mint = client.value().get_first_module::<MintClientModule>()?;
378        let mut dbtx = mint.client_ctx.module_db().begin_transaction_nc().await;
379        let counts = mint.get_note_counts_by_denomination(&mut dbtx).await;
380        info!(target: LOG_GATEWAY, ?counts, "Note counts");
381        Ok(counts)
382    }
383
384    // TODO(tvolk131): Set this value in the constructor.
385    pub fn set_next_index(&self, next_index: u64) {
386        self.next_index.store(next_index, Ordering::SeqCst);
387    }
388
389    pub fn pop_next_index(&self) -> AdminResult<u64> {
390        let next_index = self.next_index.fetch_add(1, Ordering::Relaxed);
391
392        // Check for overflow.
393        if next_index == INITIAL_INDEX.wrapping_sub(1) {
394            return Err(AdminGatewayError::GatewayConfigurationError(
395                "Federation Index overflow".to_string(),
396            ));
397        }
398
399        Ok(next_index)
400    }
401}