fedimint_gateway_server/
federation_manager.rs1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use bitcoin::secp256k1::Keypair;
6use fedimint_client::ClientHandleArc;
7use fedimint_core::config::{FederationId, FederationIdPrefix, JsonClientConfig};
8use fedimint_core::db::{DatabaseTransaction, NonCommittable};
9use fedimint_core::util::{FmtCompactAnyhow as _, Spanned};
10use fedimint_gateway_common::FederationInfo;
11use fedimint_gateway_server_db::GatewayDbtxNcExt as _;
12use fedimint_gw_client::GatewayClientModule;
13use fedimint_gwv2_client::GatewayClientModuleV2;
14use fedimint_logging::LOG_GATEWAY;
15use tracing::{info, warn};
16
17use crate::AdminResult;
18use crate::error::{AdminGatewayError, FederationNotConnected};
19
20const INITIAL_INDEX: u64 = 1;
24
25#[derive(Debug)]
27pub struct FederationManager {
28 clients: BTreeMap<FederationId, Spanned<fedimint_client::ClientHandleArc>>,
31
32 index_to_federation: BTreeMap<u64, FederationId>,
36
37 next_index: AtomicU64,
41}
42
43impl FederationManager {
44 pub fn new() -> Self {
45 Self {
46 clients: BTreeMap::new(),
47 index_to_federation: BTreeMap::new(),
48 next_index: AtomicU64::new(INITIAL_INDEX),
49 }
50 }
51
52 pub fn add_client(&mut self, index: u64, client: Spanned<fedimint_client::ClientHandleArc>) {
53 let federation_id = client.borrow().with_sync(|c| c.federation_id());
54 self.clients.insert(federation_id, client);
55 self.index_to_federation.insert(index, federation_id);
56 }
57
58 pub async fn leave_federation(
59 &mut self,
60 federation_id: FederationId,
61 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
62 ) -> AdminResult<FederationInfo> {
63 let federation_info = self.federation_info(federation_id, dbtx).await?;
64
65 let gateway_keypair = dbtx.load_gateway_keypair_assert_exists().await;
66
67 self.unannounce_from_federation(federation_id, gateway_keypair)
68 .await;
69
70 self.remove_client(federation_id).await?;
71
72 Ok(federation_info)
73 }
74
75 async fn remove_client(&mut self, federation_id: FederationId) -> AdminResult<()> {
76 let client = self
77 .clients
78 .remove(&federation_id)
79 .ok_or(FederationNotConnected {
80 federation_id_prefix: federation_id.to_prefix(),
81 })?
82 .into_value();
83
84 self.index_to_federation
85 .retain(|_, fid| *fid != federation_id);
86
87 match Arc::into_inner(client) {
88 Some(client) => {
89 client.shutdown().await;
90 Ok(())
91 }
92 _ => Err(AdminGatewayError::ClientRemovalError(format!(
93 "Federation client {federation_id} is not unique, failed to shutdown client"
94 ))),
95 }
96 }
97
98 pub async fn wait_for_incoming_payments(&self) -> AdminResult<()> {
101 for client in self.clients.values() {
102 let active_operations = client.value().get_active_operations().await;
103 let operation_log = client.value().operation_log();
104 for op_id in active_operations {
105 let log_entry = operation_log.get_operation(op_id).await;
106 if let Some(entry) = log_entry {
107 match entry.operation_module_kind() {
108 "lnv2" => {
109 let lnv2 =
110 client.value().get_first_module::<GatewayClientModuleV2>()?;
111 lnv2.await_completion(op_id).await;
112 }
113 "ln" => {
114 let lnv1 = client.value().get_first_module::<GatewayClientModule>()?;
115 lnv1.await_completion(op_id).await;
116 }
117 _ => {}
118 }
119 }
120 }
121 }
122
123 info!(target: LOG_GATEWAY, "Finished waiting for incoming payments");
124 Ok(())
125 }
126
127 async fn unannounce_from_federation(
128 &self,
129 federation_id: FederationId,
130 gateway_keypair: Keypair,
131 ) {
132 if let Ok(client) = self
133 .clients
134 .get(&federation_id)
135 .ok_or(FederationNotConnected {
136 federation_id_prefix: federation_id.to_prefix(),
137 })
138 && let Ok(ln) = client.value().get_first_module::<GatewayClientModule>()
139 {
140 ln.remove_from_federation(gateway_keypair).await;
141 }
142 }
143
144 pub async fn unannounce_from_all_federations(&self, gateway_keypair: Keypair) {
147 let removal_futures = self
148 .clients
149 .values()
150 .map(|client| async {
151 client
152 .value()
153 .get_first_module::<GatewayClientModule>()
154 .expect("Must have client module")
155 .remove_from_federation(gateway_keypair)
156 .await;
157 })
158 .collect::<Vec<_>>();
159
160 futures::future::join_all(removal_futures).await;
161 }
162
163 pub fn get_client_for_index(&self, short_channel_id: u64) -> Option<Spanned<ClientHandleArc>> {
164 let federation_id = self.index_to_federation.get(&short_channel_id)?;
165 match self.clients.get(federation_id).cloned() {
169 Some(client) => Some(client),
170 _ => {
171 panic!(
172 "`FederationManager.index_to_federation` is out of sync with `FederationManager.clients`! This is a bug."
173 );
174 }
175 }
176 }
177
178 pub fn get_client_for_federation_id_prefix(
179 &self,
180 federation_id_prefix: FederationIdPrefix,
181 ) -> Option<Spanned<ClientHandleArc>> {
182 self.clients.iter().find_map(|(fid, client)| {
183 if fid.to_prefix() == federation_id_prefix {
184 Some(client.clone())
185 } else {
186 None
187 }
188 })
189 }
190
191 pub fn has_federation(&self, federation_id: FederationId) -> bool {
192 self.clients.contains_key(&federation_id)
193 }
194
195 pub fn client(&self, federation_id: &FederationId) -> Option<&Spanned<ClientHandleArc>> {
196 self.clients.get(federation_id)
197 }
198
199 pub async fn federation_info(
200 &self,
201 federation_id: FederationId,
202 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
203 ) -> std::result::Result<FederationInfo, FederationNotConnected> {
204 self.clients
205 .get(&federation_id)
206 .expect("`FederationManager.index_to_federation` is out of sync with `FederationManager.clients`! This is a bug.")
207 .borrow()
208 .with(|client| async move {
209 let balance_msat = client.get_balance_for_btc().await
210 .map_err(|_err| FederationNotConnected { federation_id_prefix: federation_id.to_prefix() })?;
212
213 let config = dbtx.load_federation_config(federation_id).await.ok_or(FederationNotConnected {
214 federation_id_prefix: federation_id.to_prefix(),
215 })?;
216
217 Ok(FederationInfo {
218 federation_id,
219 federation_name: self.federation_name(client).await,
220 balance_msat,
221 config,
222 })
223 })
224 .await
225 }
226
227 pub async fn federation_name(&self, client: &ClientHandleArc) -> Option<String> {
228 let client_config = client.config().await;
229 let federation_name = client_config.global.federation_name();
230 federation_name.map(String::from)
231 }
232
233 pub async fn federation_info_all_federations(
234 &self,
235 mut dbtx: DatabaseTransaction<'_, NonCommittable>,
236 ) -> Vec<FederationInfo> {
237 let mut federation_infos = Vec::new();
238 for (federation_id, client) in &self.clients {
239 let balance_msat = match client
240 .borrow()
241 .with(|client| client.get_balance_for_btc())
242 .await
243 {
244 Ok(balance_msat) => balance_msat,
245 Err(err) => {
246 warn!(
247 target: LOG_GATEWAY,
248 err = %err.fmt_compact_anyhow(),
249 "Skipped Federation due to lack of primary module"
250 );
251 continue;
252 }
253 };
254
255 let config = dbtx.load_federation_config(*federation_id).await;
256 if let Some(config) = config {
257 federation_infos.push(FederationInfo {
258 federation_id: *federation_id,
259 federation_name: self.federation_name(client.value()).await,
260 balance_msat,
261 config,
262 });
263 }
264 }
265 federation_infos
266 }
267
268 pub async fn get_federation_config(
269 &self,
270 federation_id: FederationId,
271 ) -> AdminResult<JsonClientConfig> {
272 let client = self
273 .clients
274 .get(&federation_id)
275 .ok_or(FederationNotConnected {
276 federation_id_prefix: federation_id.to_prefix(),
277 })?;
278 Ok(client
279 .borrow()
280 .with(|client| client.get_config_json())
281 .await)
282 }
283
284 pub async fn get_all_federation_configs(&self) -> BTreeMap<FederationId, JsonClientConfig> {
285 let mut federations = BTreeMap::new();
286 for (federation_id, client) in &self.clients {
287 federations.insert(
288 *federation_id,
289 client
290 .borrow()
291 .with(|client| client.get_config_json())
292 .await,
293 );
294 }
295 federations
296 }
297
298 pub fn set_next_index(&self, next_index: u64) {
300 self.next_index.store(next_index, Ordering::SeqCst);
301 }
302
303 pub fn pop_next_index(&self) -> AdminResult<u64> {
304 let next_index = self.next_index.fetch_add(1, Ordering::Relaxed);
305
306 if next_index == INITIAL_INDEX.wrapping_sub(1) {
308 return Err(AdminGatewayError::GatewayConfigurationError(
309 "Federation Index overflow".to_string(),
310 ));
311 }
312
313 Ok(next_index)
314 }
315}