fedimint_gateway_server/
federation_manager.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use bitcoin::secp256k1::Keypair;
6use fedimint_client::ClientHandleArc;
7use fedimint_core::config::{FederationId, FederationIdPrefix, JsonClientConfig};
8use fedimint_core::db::{DatabaseTransaction, NonCommittable};
9use fedimint_core::util::Spanned;
10use fedimint_gateway_common::FederationInfo;
11use fedimint_gateway_server_db::GatewayDbtxNcExt as _;
12use fedimint_gw_client::GatewayClientModule;
13use fedimint_gwv2_client::GatewayClientModuleV2;
14use fedimint_logging::LOG_GATEWAY;
15use tracing::info;
16
17use crate::AdminResult;
18use crate::error::{AdminGatewayError, FederationNotConnected};
19
/// The first index that the gateway will assign to a federation.
///
/// Note: This starts at 1 because LNv1 uses the `federation_index` as an
/// SCID, and an SCID of 0 is considered invalid by LND's HTLC interceptor.
const INITIAL_INDEX: u64 = 1;
24
25// TODO: Add support for client lookup by payment hash (for LNv2).
26#[derive(Debug)]
27pub struct FederationManager {
28    /// Map of `FederationId` -> `Client`. Used for efficient retrieval of the
29    /// client while handling incoming HTLCs.
30    clients: BTreeMap<FederationId, Spanned<fedimint_client::ClientHandleArc>>,
31
32    /// Map of federation indices to `FederationId`. Use for efficient retrieval
33    /// of the client while handling incoming HTLCs.
34    /// Can be removed after LNv1 removal.
35    index_to_federation: BTreeMap<u64, FederationId>,
36
37    /// Tracker for federation index assignments. When connecting a new
38    /// federation, this value is incremented and assigned to the federation
39    /// as the `federation_index`
40    next_index: AtomicU64,
41}
42
43impl FederationManager {
44    pub fn new() -> Self {
45        Self {
46            clients: BTreeMap::new(),
47            index_to_federation: BTreeMap::new(),
48            next_index: AtomicU64::new(INITIAL_INDEX),
49        }
50    }
51
52    pub fn add_client(&mut self, index: u64, client: Spanned<fedimint_client::ClientHandleArc>) {
53        let federation_id = client.borrow().with_sync(|c| c.federation_id());
54        self.clients.insert(federation_id, client);
55        self.index_to_federation.insert(index, federation_id);
56    }
57
58    pub async fn leave_federation(
59        &mut self,
60        federation_id: FederationId,
61        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
62    ) -> AdminResult<FederationInfo> {
63        let federation_info = self.federation_info(federation_id, dbtx).await?;
64
65        let gateway_keypair = dbtx.load_gateway_keypair_assert_exists().await;
66
67        self.unannounce_from_federation(federation_id, gateway_keypair)
68            .await?;
69
70        self.remove_client(federation_id).await?;
71
72        Ok(federation_info)
73    }
74
75    async fn remove_client(&mut self, federation_id: FederationId) -> AdminResult<()> {
76        let client = self
77            .clients
78            .remove(&federation_id)
79            .ok_or(FederationNotConnected {
80                federation_id_prefix: federation_id.to_prefix(),
81            })?
82            .into_value();
83
84        self.index_to_federation
85            .retain(|_, fid| *fid != federation_id);
86
87        match Arc::into_inner(client) {
88            Some(client) => {
89                client.shutdown().await;
90                Ok(())
91            }
92            _ => Err(AdminGatewayError::ClientRemovalError(format!(
93                "Federation client {federation_id} is not unique, failed to shutdown client"
94            ))),
95        }
96    }
97
98    /// Waits for ongoing incoming LNv1 and LNv2 payments to complete before
99    /// returning.
100    pub async fn wait_for_incoming_payments(&self) -> AdminResult<()> {
101        for client in self.clients.values() {
102            let active_operations = client.value().get_active_operations().await;
103            let operation_log = client.value().operation_log();
104            for op_id in active_operations {
105                let log_entry = operation_log.get_operation(op_id).await;
106                if let Some(entry) = log_entry {
107                    match entry.operation_module_kind() {
108                        "lnv2" => {
109                            let lnv2 =
110                                client.value().get_first_module::<GatewayClientModuleV2>()?;
111                            lnv2.await_completion(op_id).await;
112                        }
113                        "ln" => {
114                            let lnv1 = client.value().get_first_module::<GatewayClientModule>()?;
115                            lnv1.await_completion(op_id).await;
116                        }
117                        _ => continue,
118                    }
119                }
120            }
121        }
122
123        info!(target: LOG_GATEWAY, "Finished waiting for incoming payments");
124        Ok(())
125    }
126
127    async fn unannounce_from_federation(
128        &self,
129        federation_id: FederationId,
130        gateway_keypair: Keypair,
131    ) -> AdminResult<()> {
132        let client = self
133            .clients
134            .get(&federation_id)
135            .ok_or(FederationNotConnected {
136                federation_id_prefix: federation_id.to_prefix(),
137            })?;
138
139        client
140            .value()
141            .get_first_module::<GatewayClientModule>()?
142            .remove_from_federation(gateway_keypair)
143            .await;
144
145        Ok(())
146    }
147
148    /// Iterates through all of the federations the gateway is registered with
149    /// and requests to remove the registration record.
150    pub async fn unannounce_from_all_federations(&self, gateway_keypair: Keypair) {
151        let removal_futures = self
152            .clients
153            .values()
154            .map(|client| async {
155                client
156                    .value()
157                    .get_first_module::<GatewayClientModule>()
158                    .expect("Must have client module")
159                    .remove_from_federation(gateway_keypair)
160                    .await;
161            })
162            .collect::<Vec<_>>();
163
164        futures::future::join_all(removal_futures).await;
165    }
166
167    pub fn get_client_for_index(&self, short_channel_id: u64) -> Option<Spanned<ClientHandleArc>> {
168        let federation_id = self.index_to_federation.get(&short_channel_id)?;
169        // TODO(tvolk131): Cloning the client here could cause issues with client
170        // shutdown (see `remove_client` above). Perhaps this function should take a
171        // lambda and pass it into `client.with_sync`.
172        match self.clients.get(federation_id).cloned() {
173            Some(client) => Some(client),
174            _ => {
175                panic!(
176                    "`FederationManager.index_to_federation` is out of sync with `FederationManager.clients`! This is a bug."
177                );
178            }
179        }
180    }
181
182    pub fn get_client_for_federation_id_prefix(
183        &self,
184        federation_id_prefix: FederationIdPrefix,
185    ) -> Option<Spanned<ClientHandleArc>> {
186        self.clients.iter().find_map(|(fid, client)| {
187            if fid.to_prefix() == federation_id_prefix {
188                Some(client.clone())
189            } else {
190                None
191            }
192        })
193    }
194
195    pub fn has_federation(&self, federation_id: FederationId) -> bool {
196        self.clients.contains_key(&federation_id)
197    }
198
199    pub fn client(&self, federation_id: &FederationId) -> Option<&Spanned<ClientHandleArc>> {
200        self.clients.get(federation_id)
201    }
202
203    pub async fn federation_info(
204        &self,
205        federation_id: FederationId,
206        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
207    ) -> std::result::Result<FederationInfo, FederationNotConnected> {
208        self.clients
209            .get(&federation_id)
210            .expect("`FederationManager.index_to_federation` is out of sync with `FederationManager.clients`! This is a bug.")
211            .borrow()
212            .with(|client| async move {
213                let balance_msat = client.get_balance().await;
214
215                let config = dbtx.load_federation_config(federation_id).await.ok_or(FederationNotConnected {
216                    federation_id_prefix: federation_id.to_prefix(),
217                })?;
218
219                Ok(FederationInfo {
220                    federation_id,
221                    federation_name: self.federation_name(client).await,
222                    balance_msat,
223                    config,
224                })
225            })
226            .await
227    }
228
229    pub async fn federation_name(&self, client: &ClientHandleArc) -> Option<String> {
230        let client_config = client.config().await;
231        let federation_name = client_config.global.federation_name();
232        federation_name.map(String::from)
233    }
234
235    pub async fn federation_info_all_federations(
236        &self,
237        mut dbtx: DatabaseTransaction<'_, NonCommittable>,
238    ) -> Vec<FederationInfo> {
239        let mut federation_infos = Vec::new();
240        for (federation_id, client) in &self.clients {
241            let balance_msat = client.borrow().with(|client| client.get_balance()).await;
242
243            let config = dbtx.load_federation_config(*federation_id).await;
244            if let Some(config) = config {
245                federation_infos.push(FederationInfo {
246                    federation_id: *federation_id,
247                    federation_name: self.federation_name(client.value()).await,
248                    balance_msat,
249                    config,
250                });
251            }
252        }
253        federation_infos
254    }
255
256    pub async fn get_federation_config(
257        &self,
258        federation_id: FederationId,
259    ) -> AdminResult<JsonClientConfig> {
260        let client = self
261            .clients
262            .get(&federation_id)
263            .ok_or(FederationNotConnected {
264                federation_id_prefix: federation_id.to_prefix(),
265            })?;
266        Ok(client
267            .borrow()
268            .with(|client| client.get_config_json())
269            .await)
270    }
271
272    pub async fn get_all_federation_configs(&self) -> BTreeMap<FederationId, JsonClientConfig> {
273        let mut federations = BTreeMap::new();
274        for (federation_id, client) in &self.clients {
275            federations.insert(
276                *federation_id,
277                client
278                    .borrow()
279                    .with(|client| client.get_config_json())
280                    .await,
281            );
282        }
283        federations
284    }
285
286    // TODO(tvolk131): Set this value in the constructor.
287    pub fn set_next_index(&self, next_index: u64) {
288        self.next_index.store(next_index, Ordering::SeqCst);
289    }
290
291    pub fn pop_next_index(&self) -> AdminResult<u64> {
292        let next_index = self.next_index.fetch_add(1, Ordering::Relaxed);
293
294        // Check for overflow.
295        if next_index == INITIAL_INDEX.wrapping_sub(1) {
296            return Err(AdminGatewayError::GatewayConfigurationError(
297                "Federation Index overflow".to_string(),
298            ));
299        }
300
301        Ok(next_index)
302    }
303}