1use std::collections::BTreeMap;
2use std::ops::ControlFlow;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::Duration;
6use std::{env, fs};
7
8use anyhow::{Context, Result, bail};
9use fedimint_api_client::api::DynGlobalApi;
10use fedimint_api_client::download_from_invite_code;
11use fedimint_client_module::module::ClientModule;
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::admin_client::SetupStatus;
14use fedimint_core::config::{ClientConfig, load_from_file};
15use fedimint_core::core::{ModuleInstanceId, ModuleKind};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::ModuleCommon;
18use fedimint_core::module::registry::ModuleDecoderRegistry;
19use fedimint_core::runtime::block_in_place;
20use fedimint_core::task::block_on;
21use fedimint_core::task::jit::JitTryAnyhow;
22use fedimint_core::util::SafeUrl;
23use fedimint_core::{Amount, NumPeers, PeerId};
24use fedimint_gateway_common::WithdrawResponse;
25use fedimint_logging::LOG_DEVIMINT;
26use fedimint_testing_core::config::API_AUTH;
27use fedimint_testing_core::node_type::LightningNodeType;
28use fedimint_wallet_client::WalletClientModule;
29use fedimint_wallet_client::config::WalletClientConfig;
30use fs_lock::FileLock;
31use futures::future::{join_all, try_join_all};
32use tokio::task::{JoinSet, spawn_blocking};
33use tokio::time::Instant;
34use tracing::{debug, info};
35
36use super::external::Bitcoind;
37use super::util::{Command, ProcessHandle, ProcessManager, cmd};
38use super::vars::utf8;
39use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
40use crate::util::{FedimintdCmd, poll, poll_simple, poll_with_timeout};
41use crate::version_constants::{VERSION_0_10_0_ALPHA, VERSION_0_11_0_ALPHA};
42use crate::{poll_almost_equal, poll_eq, vars};
43
44pub const PORTS_PER_FEDIMINTD: u16 = 4;
47pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
49pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
51pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
53pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;
55
56#[derive(Clone)]
57pub struct Federation {
58 pub members: BTreeMap<usize, Fedimintd>,
60 pub vars: BTreeMap<usize, vars::Fedimintd>,
61 pub bitcoind: Bitcoind,
62
63 client: JitTryAnyhow<Client>,
65 #[allow(dead_code)] connectors: ConnectorRegistry,
67}
68
69impl Drop for Federation {
70 fn drop(&mut self) {
71 block_in_place(|| {
72 block_on(async {
73 let mut set = JoinSet::new();
74
75 while let Some((_id, fedimintd)) = self.members.pop_first() {
76 set.spawn(async { drop(fedimintd) });
77 }
78 while (set.join_next().await).is_some() {}
79 });
80 });
81 }
82}
83#[derive(Clone)]
85pub struct Client {
86 name: String,
87}
88
89impl Client {
90 fn clients_dir() -> PathBuf {
91 let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
92 .expect("FM_DATA_DIR_ENV not set")
93 .parse()
94 .expect("FM_DATA_DIR_ENV invalid");
95 data_dir.join("clients")
96 }
97
98 fn client_dir(&self) -> PathBuf {
99 Self::clients_dir().join(&self.name)
100 }
101
102 pub fn client_name_lock(name: &str) -> Result<FileLock> {
103 let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
104 let file_lock = std::fs::OpenOptions::new()
105 .write(true)
106 .create(true)
107 .truncate(true)
108 .open(&lock_path)
109 .with_context(|| format!("Failed to open {}", lock_path.display()))?;
110
111 fs_lock::FileLock::new_exclusive(file_lock)
112 .with_context(|| format!("Failed to lock {}", lock_path.display()))
113 }
114
115 pub async fn create(name: impl ToString) -> Result<Client> {
117 let name = name.to_string();
118 spawn_blocking(move || {
119 let _lock = Self::client_name_lock(&name);
120 for i in 0u64.. {
121 let client = Self {
122 name: format!("{name}-{i}"),
123 };
124
125 if !client.client_dir().exists() {
126 std::fs::create_dir_all(client.client_dir())?;
127 return Ok(client);
128 }
129 }
130 unreachable!()
131 })
132 .await?
133 }
134
135 pub fn open_or_create(name: &str) -> Result<Client> {
137 block_in_place(|| {
138 let _lock = Self::client_name_lock(name);
139 let client = Self {
140 name: format!("{name}-0"),
141 };
142 if !client.client_dir().exists() {
143 std::fs::create_dir_all(client.client_dir())?;
144 }
145 Ok(client)
146 })
147 }
148
149 pub async fn join_federation(&self, invite_code: String) -> Result<()> {
151 debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
152 cmd!(self, "join-federation", invite_code).run().await?;
153
154 Ok(())
155 }
156
157 pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
159 debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
160 cmd!(
161 self,
162 "restore",
163 "--invite-code",
164 invite_code,
165 "--mnemonic",
166 mnemonic
167 )
168 .run()
169 .await?;
170
171 Ok(())
172 }
173
174 pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
176 let restored = Self::open_or_create(name)?;
177
178 let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
179 .as_str()
180 .unwrap()
181 .to_owned();
182
183 debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
184 cmd!(
185 restored,
186 "restore",
187 "--invite-code",
188 invite_code,
189 "--mnemonic",
190 mnemonic
191 )
192 .run()
193 .await?;
194
195 Ok(restored)
196 }
197
198 pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
201 let new = Client::create(name).await?;
202
203 cmd!(
204 "cp",
205 "-R",
206 self.client_dir().join("client.db").display(),
207 new.client_dir().display()
208 )
209 .run()
210 .await?;
211
212 Ok(new)
213 }
214
215 pub async fn balance(&self) -> Result<u64> {
216 Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
217 .as_u64()
218 .unwrap())
219 }
220
221 pub async fn await_balance(&self, min_balance_msat: u64) -> Result<()> {
223 loop {
224 cmd!(self, "dev", "wait", "3").out_json().await?;
225
226 let balance = self.balance().await?;
227 if balance >= min_balance_msat {
228 return Ok(());
229 }
230
231 info!(
232 target: LOG_DEVIMINT,
233 balance,
234 min_balance_msat,
235 "Waiting for client balance to reach minimum"
236 );
237 }
238 }
239
240 pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
241 if crate::util::supports_wallet_v2() {
242 let address = cmd!(self, "module", "walletv2", "receive")
243 .out_json()
244 .await?;
245 Ok((address.as_str().unwrap().to_string(), String::new()))
247 } else {
248 let deposit = cmd!(self, "deposit-address").out_json().await?;
249 Ok((
250 deposit["address"].as_str().unwrap().to_string(),
251 deposit["operation_id"].as_str().unwrap().to_string(),
252 ))
253 }
254 }
255
256 pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
257 cmd!(self, "await-deposit", operation_id).run().await
258 }
259
260 pub fn cmd(&self) -> Command {
261 cmd!(
262 crate::util::get_fedimint_cli_path(),
263 format!("--data-dir={}", self.client_dir().display())
264 )
265 }
266
267 pub fn get_name(&self) -> &str {
268 &self.name
269 }
270
271 pub async fn get_session_count(&self) -> Result<u64> {
273 cmd!(self, "dev", "session-count").out_json().await?["count"]
274 .as_u64()
275 .context("count field wasn't a number")
276 }
277
278 pub async fn wait_complete(&self) -> Result<()> {
280 cmd!(self, "dev", "wait-complete").run().await
281 }
282
283 pub async fn wait_session(&self) -> anyhow::Result<()> {
285 info!("Waiting for a new session");
286 let session_count = self.get_session_count().await?;
287 self.wait_session_outcome(session_count).await?;
288 Ok(())
289 }
290
291 pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
293 let timeout = {
294 let current_session_count = self.get_session_count().await?;
295 let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
296 let session_duration_seconds = 180;
297 Duration::from_secs(sessions_to_wait * session_duration_seconds)
298 };
299
300 let start = Instant::now();
301 poll_with_timeout("Waiting for a new session", timeout, || async {
302 info!("Awaiting session outcome {session_count}");
303 match cmd!(self, "dev", "api", "await_session_outcome", session_count)
304 .run()
305 .await
306 {
307 Err(e) => Err(ControlFlow::Continue(e)),
308 Ok(()) => Ok(()),
309 }
310 })
311 .await?;
312
313 let session_found_in = start.elapsed();
314 info!("session found in {session_found_in:?}");
315 Ok(())
316 }
317}
318
319impl Federation {
320 pub async fn new(
321 process_mgr: &ProcessManager,
322 bitcoind: Bitcoind,
323 skip_setup: bool,
324 pre_dkg: bool,
325 fed_index: usize,
327 federation_name: String,
328 ) -> Result<Self> {
329 let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
330 let mut members = BTreeMap::new();
331 let mut peer_to_env_vars_map = BTreeMap::new();
332
333 let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
334 let mut api_endpoints: BTreeMap<PeerId, _> = BTreeMap::new();
335
336 let connectors = ConnectorRegistry::build_from_testing_env()?.bind().await?;
337 for peer_id in num_peers.peer_ids() {
338 let peer_env_vars = vars::Fedimintd::init(
339 &process_mgr.globals,
340 federation_name.clone(),
341 peer_id,
342 process_mgr
343 .globals
344 .fedimintd_overrides
345 .peer_expect(fed_index, peer_id),
346 )
347 .await?;
348 members.insert(
349 peer_id.to_usize(),
350 Fedimintd::new(
351 process_mgr,
352 bitcoind.clone(),
353 peer_id.to_usize(),
354 &peer_env_vars,
355 federation_name.clone(),
356 )
357 .await?,
358 );
359 let admin_client = DynGlobalApi::new_admin_setup(
360 connectors.clone(),
361 SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
362 )?;
365 api_endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
366 admin_clients.insert(peer_id, admin_client);
367 peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
368 }
369
370 if !skip_setup && !pre_dkg {
371 let (original_fedimint_cli_path, original_fm_mint_client) =
374 crate::util::use_matching_fedimint_cli_for_dkg().await?;
375
376 run_cli_dkg_v2(api_endpoints).await?;
377
378 crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);
380
381 let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
383 let invite_code_filename_original = "invite-code";
384
385 for peer_env_vars in peer_to_env_vars_map.values() {
386 let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
387
388 let invite_code = poll_simple("awaiting-invite-code", || async {
389 let path = format!("{peer_data_dir}/{invite_code_filename_original}");
390 tokio::fs::read_to_string(&path)
391 .await
392 .with_context(|| format!("Awaiting invite code file: {path}"))
393 })
394 .await
395 .context("Awaiting invite code file")?;
396
397 download_from_invite_code(&connectors, &InviteCode::from_str(&invite_code)?)
398 .await?;
399 }
400
401 let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);
403
404 tokio::fs::copy(
405 format!("{peer_data_dir}/{invite_code_filename_original}"),
406 format!("{client_dir}/{invite_code_filename_original}"),
407 )
408 .await
409 .context("copying invite-code file")?;
410
411 for (index, peer_env_vars) in &peer_to_env_vars_map {
414 let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
415
416 let invite_code_filename_indexed =
417 format!("{invite_code_filename_original}-{index}");
418 tokio::fs::rename(
419 format!("{peer_data_dir}/{invite_code_filename_original}"),
420 format!("{client_dir}/{invite_code_filename_indexed}"),
421 )
422 .await
423 .context("moving invite-code file")?;
424 }
425
426 debug!("Moved invite-code files to client data directory");
427 }
428
429 let client = JitTryAnyhow::new_try({
430 move || async move {
431 let client = Client::open_or_create(federation_name.as_str())?;
432 let invite_code = Self::invite_code_static()?;
433 if !skip_setup && !pre_dkg {
434 cmd!(client, "join-federation", invite_code).run().await?;
435 }
436 Ok(client)
437 }
438 });
439
440 Ok(Self {
441 members,
442 vars: peer_to_env_vars_map,
443 bitcoind,
444 client,
445 connectors,
446 })
447 }
448
449 pub fn client_config(&self) -> Result<ClientConfig> {
450 let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
451 load_from_file(&cfg_path)
452 }
453
454 pub fn module_instance_id_by_kind(&self, kind: &ModuleKind) -> Result<ModuleInstanceId> {
456 self.client_config()?
457 .modules
458 .iter()
459 .find_map(|(id, cfg)| if &cfg.kind == kind { Some(*id) } else { None })
460 .with_context(|| format!("Module kind {kind} not found"))
461 }
462
463 pub fn module_client_config<M: ClientModule>(
464 &self,
465 ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
466 self.client_config()?
467 .modules
468 .iter()
469 .find_map(|(module_instance_id, module_cfg)| {
470 if module_cfg.kind == M::kind() {
471 let decoders = ModuleDecoderRegistry::new(vec![(
472 *module_instance_id,
473 M::kind(),
474 M::decoder(),
475 )]);
476 Some(
477 module_cfg
478 .config
479 .clone()
480 .redecode_raw(&decoders)
481 .expect("Decoding client cfg failed")
482 .expect_decoded_ref()
483 .as_any()
484 .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
485 .cloned()
486 .context("Cast to module config failed"),
487 )
488 } else {
489 None
490 }
491 })
492 .transpose()
493 }
494
495 pub fn deposit_fees(&self) -> Result<Amount> {
496 if crate::util::supports_wallet_v2() {
497 Ok(self
498 .module_client_config::<fedimint_walletv2_client::WalletClientModule>()?
499 .context("No walletv2 module found")?
500 .fee_consensus
501 .base)
502 } else {
503 Ok(self
504 .module_client_config::<WalletClientModule>()?
505 .context("No wallet module found")?
506 .fee_consensus
507 .peg_in_abs)
508 }
509 }
510
511 pub fn invite_code(&self) -> Result<String> {
513 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
514 let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
515 Ok(invite_code)
516 }
517
518 pub fn invite_code_static() -> Result<String> {
519 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
520 let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
521 Ok(invite_code)
522 }
523 pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
524 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
525 let name = format!("invite-code-{peer_id}");
526 let invite_code = fs::read_to_string(data_dir.join(name))?;
527 Ok(invite_code)
528 }
529
530 pub async fn internal_client(&self) -> Result<&Client> {
534 self.client
535 .get_try()
536 .await
537 .context("Internal client joining Federation")
538 }
539
540 pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
542 let client = Client::create(name).await?;
543 client.join_federation(self.invite_code()?).await?;
544 Ok(client)
545 }
546
547 pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
548 if self.members.contains_key(&peer) {
549 bail!("fedimintd-{peer} already running");
550 }
551 self.members.insert(
552 peer,
553 Fedimintd::new(
554 process_mgr,
555 self.bitcoind.clone(),
556 peer,
557 &self.vars[&peer],
558 "default".to_string(),
559 )
560 .await?,
561 );
562 Ok(())
563 }
564
565 pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
566 let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
567 bail!("fedimintd-{peer_id} does not exist");
568 };
569 fedimintd.terminate().await?;
570 Ok(())
571 }
572
573 pub async fn await_server_terminated(&mut self, peer_id: usize) -> Result<()> {
574 let Some(fedimintd) = self.members.get_mut(&peer_id) else {
575 bail!("fedimintd-{peer_id} does not exist");
576 };
577 fedimintd.await_terminated().await?;
578 self.members.remove(&peer_id);
579 Ok(())
580 }
581
582 pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
584 info!("starting all servers");
585 let fed_size = process_mgr.globals.FM_FED_SIZE;
586 for peer_id in 0..fed_size {
587 if self.members.contains_key(&peer_id) {
588 continue;
589 }
590 self.start_server(process_mgr, peer_id).await?;
591 }
592 self.await_all_peers().await?;
593 Ok(())
594 }
595
596 pub async fn terminate_all_servers(&mut self) -> Result<()> {
598 info!("terminating all servers");
599 let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
600 for peer_id in running_peer_ids {
601 self.terminate_server(peer_id).await?;
602 }
603 Ok(())
604 }
605
606 pub async fn restart_all_staggered_with_bin(
611 &mut self,
612 process_mgr: &ProcessManager,
613 bin_path: &PathBuf,
614 ) -> Result<()> {
615 let fed_size = process_mgr.globals.FM_FED_SIZE;
616
617 self.start_all_servers(process_mgr).await?;
619
620 while self.num_members() > 0 {
622 self.terminate_server(self.num_members() - 1).await?;
623 if self.num_members() > 0 {
624 fedimint_core::task::sleep_in_test(
625 "waiting to shutdown remaining peers",
626 Duration::from_secs(10),
627 )
628 .await;
629 }
630 }
631
632 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
634
635 for peer_id in 0..fed_size {
637 self.start_server(process_mgr, peer_id).await?;
638 if peer_id < fed_size - 1 {
639 fedimint_core::task::sleep_in_test(
640 "waiting to restart remaining peers",
641 Duration::from_secs(10),
642 )
643 .await;
644 }
645 }
646
647 self.await_all_peers().await?;
648
649 let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
650 info!("upgraded fedimintd to version: {}", fedimintd_version);
651 Ok(())
652 }
653
654 pub async fn restart_all_with_bin(
655 &mut self,
656 process_mgr: &ProcessManager,
657 bin_path: &PathBuf,
658 ) -> Result<()> {
659 let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
661 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
663 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
665
666 self.restart_all_staggered_with_bin(process_mgr, bin_path)
667 .await
668 }
669
670 pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
671 let fed_size = process_mgr.globals.FM_FED_SIZE;
672 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
673 anyhow::ensure!(
674 fed_size > 3 * offline_nodes,
675 "too many offline nodes ({offline_nodes}) to reach consensus"
676 );
677
678 while self.num_members() > fed_size - offline_nodes {
679 self.terminate_server(self.num_members() - 1).await?;
680 }
681
682 if offline_nodes > 0 {
683 info!(fed_size, offline_nodes, "federation is degraded");
684 }
685 Ok(())
686 }
687
688 pub async fn send_to_address(&self, address: String, amount: u64) -> Result<()> {
689 self.bitcoind.send_to(address, amount).await?;
690
691 self.bitcoind.mine_blocks(21).await?;
692
693 Ok(())
694 }
695
696 pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
697 let deposit_fees_msat = self.deposit_fees()?.msats;
698 assert_eq!(
699 deposit_fees_msat % 1000,
700 0,
701 "Deposit fees expected to be whole sats in test suite"
702 );
703 let deposit_fees = deposit_fees_msat / 1000;
704 info!(amount, deposit_fees, "Pegging-in client funds");
705
706 let (address, operation_id) = client.get_deposit_addr().await?;
707
708 self.bitcoind
709 .send_to(address, amount + deposit_fees)
710 .await?;
711 self.bitcoind.mine_blocks(21).await?;
712
713 Ok(operation_id)
714 }
715
716 pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
717 let initial_balance = if crate::util::supports_wallet_v2() {
720 Some(client.balance().await?)
721 } else {
722 None
723 };
724
725 let operation_id = self.pegin_client_no_wait(amount, client).await?;
726
727 if let Some(initial) = initial_balance {
728 let expected_balance = initial + (amount * 1000 * 9 / 10);
731 client.await_balance(expected_balance).await?;
732 } else {
733 client.await_deposit(&operation_id).await?;
734 }
735 Ok(())
736 }
737
738 pub async fn pegin_gateways(
741 &self,
742 amount: u64,
743 gateways: Vec<&super::gatewayd::Gatewayd>,
744 ) -> Result<()> {
745 let deposit_fees_msat = self.deposit_fees()?.msats;
746 assert_eq!(
747 deposit_fees_msat % 1000,
748 0,
749 "Deposit fees expected to be whole sats in test suite"
750 );
751 let deposit_fees = deposit_fees_msat / 1000;
752 info!(amount, deposit_fees, "Pegging-in gateway funds");
753 let fed_id = self.calculate_federation_id();
754 for gw in gateways.clone() {
755 let pegin_addr = gw.get_pegin_addr(&fed_id).await?;
756 self.bitcoind
757 .send_to(pegin_addr, amount + deposit_fees)
758 .await?;
759 }
760
761 self.bitcoind.mine_blocks(21).await?;
762 let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
763 try_join_all(gateways.into_iter().map(|gw| {
764 poll("gateway pegin", || async {
765 let gw_info = gw.get_info().await.map_err(ControlFlow::Continue)?;
766
767 let block_height: u64 = if gw.gatewayd_version < *VERSION_0_10_0_ALPHA {
768 gw_info["block_height"]
769 .as_u64()
770 .expect("Could not parse block height")
771 } else {
772 gw_info["lightning_info"]["connected"]["block_height"]
773 .as_u64()
774 .expect("Could not parse block height")
775 };
776
777 if bitcoind_block_height != block_height {
778 return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
779 "gateway block height is not synced"
780 )));
781 }
782
783 let gateway_balance = gw
784 .ecash_balance(fed_id.clone())
785 .await
786 .map_err(ControlFlow::Continue)?;
787 poll_almost_equal!(gateway_balance, amount * 1000)
788 })
789 }))
790 .await?;
791
792 Ok(())
793 }
794
795 pub async fn pegout_gateways(
798 &self,
799 amount: u64,
800 gateways: Vec<&super::gatewayd::Gatewayd>,
801 ) -> Result<()> {
802 info!(amount, "Pegging-out gateway funds");
803 let fed_id = self.calculate_federation_id();
804 let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
805 for gw in gateways.clone() {
806 let prev_fed_ecash_balance = gw
807 .get_balances()
808 .await?
809 .ecash_balances
810 .into_iter()
811 .find(|fed| fed.federation_id.to_string() == fed_id)
812 .expect("Gateway has not joined federation")
813 .ecash_balance_msats;
814
815 let pegout_address = self.bitcoind.get_new_address().await?;
816 let value = cmd!(
817 gw,
818 "ecash",
819 "pegout",
820 "--federation-id",
821 fed_id,
822 "--amount",
823 amount,
824 "--address",
825 pegout_address
826 )
827 .out_json()
828 .await?;
829 let response: WithdrawResponse = serde_json::from_value(value)?;
830 peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
831 }
832 self.bitcoind.mine_blocks(21).await?;
833
834 try_join_all(
835 peg_outs
836 .values()
837 .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
838 )
839 .await?;
840
841 for gw in gateways.clone() {
842 let after_fed_ecash_balance = gw
843 .get_balances()
844 .await?
845 .ecash_balances
846 .into_iter()
847 .find(|fed| fed.federation_id.to_string() == fed_id)
848 .expect("Gateway has not joined federation")
849 .ecash_balance_msats;
850
851 let ln_type = gw.ln.ln_type();
852 let prev_balance = peg_outs
853 .get(&ln_type)
854 .expect("peg out does not exist")
855 .0
856 .msats;
857 let fees = peg_outs
858 .get(&ln_type)
859 .expect("peg out does not exist")
860 .1
861 .fees;
862 let total_fee = fees.amount().to_sat() * 1000;
863 let tolerance = if crate::util::supports_wallet_v2() {
866 let amount_sats = amount / 1000;
867 let module_fee_sats = 100 + amount_sats / 100;
868 module_fee_sats * 1000 + 2000
869 } else {
870 2000
871 };
872 crate::util::almost_equal(
873 after_fed_ecash_balance.msats,
874 prev_balance - amount - total_fee,
875 tolerance,
876 )
877 .map_err(|e| {
878 anyhow::anyhow!(
879 "new balance did not equal prev balance minus withdraw_amount minus fees: {}",
880 e
881 )
882 })?;
883 }
884
885 Ok(())
886 }
887
888 pub fn calculate_federation_id(&self) -> String {
889 self.client_config()
890 .unwrap()
891 .global
892 .calculate_federation_id()
893 .to_string()
894 }
895
896 pub async fn await_block_sync(&self) -> Result<u64> {
897 let finality_delay = self.get_finality_delay()?;
898 let block_count = self.bitcoind.get_block_count().await?;
899 let expected = block_count.saturating_sub(finality_delay.into());
900
901 if crate::util::supports_wallet_v2() {
902 let client = self.internal_client().await?;
904 loop {
905 let value = cmd!(client, "module", "walletv2", "info", "block-count")
906 .out_json()
907 .await?;
908 let current: u64 = serde_json::from_value(value)?;
909 if current >= expected {
910 break;
911 }
912 fedimint_core::task::sleep_in_test(
913 format!("Waiting for consensus block count to reach {expected}"),
914 std::time::Duration::from_secs(1),
915 )
916 .await;
917 }
918 } else {
919 cmd!(
920 self.internal_client().await?,
921 "dev",
922 "wait-block-count",
923 expected
924 )
925 .run()
926 .await?;
927 }
928
929 Ok(expected)
930 }
931
932 fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
933 if crate::util::supports_wallet_v2() {
935 return Ok(fedimint_walletv2_server::CONFIRMATION_FINALITY_DELAY as u32);
936 }
937
938 let wallet_instance_id = self.module_instance_id_by_kind(&fedimint_wallet_client::KIND)?;
939 let client_config = &self.client_config()?;
940 let wallet_cfg = client_config
941 .modules
942 .get(&wallet_instance_id)
943 .context("wallet module not found")?
944 .clone()
945 .redecode_raw(&ModuleDecoderRegistry::new([(
946 wallet_instance_id,
947 fedimint_wallet_client::KIND,
948 fedimint_wallet_client::WalletModuleTypes::decoder(),
949 )]))?;
950 let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
951
952 let finality_delay = wallet_cfg.finality_delay;
953 Ok(finality_delay)
954 }
955
956 pub async fn await_gateways_registered(&self) -> Result<()> {
957 let start_time = Instant::now();
958 debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
959
960 poll("gateways registered", || async {
961 let num_gateways = cmd!(
962 self.internal_client()
963 .await
964 .map_err(ControlFlow::Continue)?,
965 "list-gateways"
966 )
967 .out_json()
968 .await
969 .map_err(ControlFlow::Continue)?
970 .as_array()
971 .context("invalid output")
972 .map_err(ControlFlow::Break)?
973 .len();
974
975 let expected_gateways =
978 if crate::util::Gatewayd::version_or_default().await < *VERSION_0_10_0_ALPHA {
979 1
980 } else {
981 2
982 };
983
984 poll_eq!(num_gateways, expected_gateways)
985 })
986 .await?;
987 debug!(target: LOG_DEVIMINT,
988 elapsed_ms = %start_time.elapsed().as_millis(),
989 "Gateways registered");
990 Ok(())
991 }
992
993 pub async fn await_all_peers(&self) -> Result<()> {
994 let (module_name, endpoint) = if crate::util::supports_wallet_v2() {
995 ("walletv2", "consensus_block_count")
996 } else {
997 ("wallet", "block_count")
998 };
999 poll("Waiting for all peers to be online", || async {
1000 cmd!(
1001 self.internal_client()
1002 .await
1003 .map_err(ControlFlow::Continue)?,
1004 "dev",
1005 "api",
1006 "--module",
1007 module_name,
1008 endpoint
1009 )
1010 .run()
1011 .await
1012 .map_err(ControlFlow::Continue)?;
1013 Ok(())
1014 })
1015 .await
1016 }
1017
1018 pub async fn await_peer(&self, peer_id: usize) -> Result<()> {
1019 poll("Waiting for all peers to be online", || async {
1020 cmd!(
1021 self.internal_client()
1022 .await
1023 .map_err(ControlFlow::Continue)?,
1024 "dev",
1025 "api",
1026 "--peer-id",
1027 peer_id,
1028 "--module",
1029 "wallet",
1030 "block_count"
1031 )
1032 .run()
1033 .await
1034 .map_err(ControlFlow::Continue)?;
1035 Ok(())
1036 })
1037 .await
1038 }
1039
1040 pub async fn finalize_mempool_tx(&self) -> Result<()> {
1050 let finality_delay = self.get_finality_delay()?;
1051 let blocks_to_mine = finality_delay + 1;
1052 self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
1053 self.await_block_sync().await?;
1054 Ok(())
1055 }
1056
1057 pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
1058 self.bitcoind.mine_blocks(blocks).await?;
1059 self.await_block_sync().await?;
1060 Ok(())
1061 }
1062
1063 pub fn num_members(&self) -> usize {
1064 self.members.len()
1065 }
1066
1067 pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
1068 self.members
1069 .keys()
1070 .map(|&peer_id| PeerId::from(peer_id as u16))
1071 }
1072}
1073
1074#[derive(Clone)]
1075pub struct Fedimintd {
1076 _bitcoind: Bitcoind,
1077 process: ProcessHandle,
1078}
1079
1080impl Fedimintd {
1081 pub async fn new(
1082 process_mgr: &ProcessManager,
1083 bitcoind: Bitcoind,
1084 peer_id: usize,
1085 env: &vars::Fedimintd,
1086 fed_name: String,
1087 ) -> Result<Self> {
1088 debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
1089 let process = process_mgr
1090 .spawn_daemon(
1091 &format!("fedimintd-{fed_name}-{peer_id}"),
1092 cmd!(FedimintdCmd).envs(env.vars()),
1093 )
1094 .await?;
1095
1096 Ok(Self {
1097 _bitcoind: bitcoind,
1098 process,
1099 })
1100 }
1101
1102 pub async fn terminate(self) -> Result<()> {
1103 self.process.terminate().await
1104 }
1105
1106 pub async fn await_terminated(&self) -> Result<()> {
1107 self.process.await_terminated().await
1108 }
1109}
1110
1111pub async fn run_cli_dkg_v2(endpoints: BTreeMap<PeerId, String>) -> Result<()> {
1112 let status_futures = endpoints.values().map(|endpoint| {
1114 let endpoint = endpoint.clone();
1115 async move {
1116 let status = poll("awaiting-setup-status-awaiting-local-params", || async {
1117 crate::util::FedimintCli
1118 .setup_status(&API_AUTH, &endpoint)
1119 .await
1120 .map_err(ControlFlow::Continue)
1121 })
1122 .await
1123 .unwrap();
1124
1125 assert_eq!(status, SetupStatus::AwaitingLocalParams);
1126 }
1127 });
1128 join_all(status_futures).await;
1129
1130 debug!(target: LOG_DEVIMINT, "Setting local parameters...");
1131
1132 let federation_size =
1135 if crate::util::FedimintCli::version_or_default().await >= *VERSION_0_11_0_ALPHA {
1136 Some(endpoints.len())
1137 } else {
1138 None
1139 };
1140 let local_params_futures = endpoints.iter().map(|(peer, endpoint)| {
1141 let peer = *peer;
1142 let endpoint = endpoint.clone();
1143 async move {
1144 let info = if peer.to_usize() == 0 {
1145 crate::util::FedimintCli
1146 .set_local_params_leader(&peer, &API_AUTH, &endpoint, federation_size)
1147 .await
1148 } else {
1149 crate::util::FedimintCli
1150 .set_local_params_follower(&peer, &API_AUTH, &endpoint)
1151 .await
1152 };
1153 info.map(|i| (peer, i))
1154 }
1155 });
1156 let connection_info: BTreeMap<_, _> = try_join_all(local_params_futures)
1157 .await?
1158 .into_iter()
1159 .collect();
1160
1161 debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");
1162
1163 let add_peer_futures = connection_info.iter().flat_map(|(peer, info)| {
1166 endpoints
1167 .iter()
1168 .filter(move |(p, _)| *p != peer)
1169 .map(move |(_, endpoint)| {
1170 let endpoint = endpoint.clone();
1171 let info = info.clone();
1172 async move {
1173 crate::util::FedimintCli
1174 .add_peer(&info, &API_AUTH, &endpoint)
1175 .await
1176 }
1177 })
1178 });
1179 try_join_all(add_peer_futures).await?;
1180
1181 debug!(target: LOG_DEVIMINT, "Starting DKG...");
1182
1183 let start_dkg_futures = endpoints.values().map(|endpoint| {
1185 let endpoint = endpoint.clone();
1186 async move {
1187 crate::util::FedimintCli
1188 .start_dkg(&API_AUTH, &endpoint)
1189 .await
1190 }
1191 });
1192 try_join_all(start_dkg_futures).await?;
1193
1194 Ok(())
1195}