1use std::collections::{BTreeMap, HashMap};
2use std::ops::ControlFlow;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::Duration;
6use std::{env, fs};
7
8use anyhow::{Context, Result, bail};
9use fedimint_api_client::api::DynGlobalApi;
10use fedimint_api_client::api::net::ConnectorType;
11use fedimint_client_module::module::ClientModule;
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::admin_client::SetupStatus;
14use fedimint_core::config::{ClientConfig, load_from_file};
15use fedimint_core::core::{ModuleInstanceId, ModuleKind};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::registry::ModuleDecoderRegistry;
18use fedimint_core::module::{ApiAuth, ModuleCommon};
19use fedimint_core::runtime::block_in_place;
20use fedimint_core::task::block_on;
21use fedimint_core::task::jit::JitTryAnyhow;
22use fedimint_core::util::SafeUrl;
23use fedimint_core::{Amount, NumPeers, PeerId};
24use fedimint_gateway_common::WithdrawResponse;
25use fedimint_logging::LOG_DEVIMINT;
26use fedimint_server::config::ConfigGenParams;
27use fedimint_testing_core::config::local_config_gen_params;
28use fedimint_testing_core::node_type::LightningNodeType;
29use fedimint_wallet_client::WalletClientModule;
30use fedimint_wallet_client::config::WalletClientConfig;
31use fs_lock::FileLock;
32use futures::future::{join_all, try_join_all};
33use tokio::task::{JoinSet, spawn_blocking};
34use tokio::time::Instant;
35use tracing::{debug, info};
36
37use super::external::Bitcoind;
38use super::util::{Command, ProcessHandle, ProcessManager, cmd};
39use super::vars::utf8;
40use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
41use crate::util::{FedimintdCmd, poll, poll_simple, poll_with_timeout};
42use crate::version_constants::VERSION_0_10_0_ALPHA;
43use crate::{poll_almost_equal, poll_eq, vars};
44
/// Number of ports associated with a single fedimintd instance; matches the
/// four `FEDIMINTD_*_PORT_OFFSET` values (0 through 3) defined below.
// NOTE(review): how a peer's base port is computed is not visible in this file — confirm
// against the port allocation code.
pub const PORTS_PER_FEDIMINTD: u16 = 4;
/// Offset of the p2p port (presumably relative to the peer's base port — confirm).
pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
/// Offset of the API port (presumably relative to the peer's base port — confirm).
pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
/// Offset of the UI port (presumably relative to the peer's base port — confirm).
pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
/// Offset of the metrics port (presumably relative to the peer's base port — confirm).
pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;
56
/// Handle to a federation managed by this test harness: the running
/// `fedimintd` peers, their environment variables and a lazily created
/// internal client.
#[derive(Clone)]
pub struct Federation {
    /// Currently running peers, keyed by peer index. Entries are added by
    /// `start_server` and removed when a peer is terminated.
    pub members: BTreeMap<usize, Fedimintd>,
    /// Environment variables generated for each peer, keyed by peer index.
    pub vars: BTreeMap<usize, vars::Fedimintd>,
    /// Bitcoin node handle used for funding addresses and mining blocks.
    pub bitcoind: Bitcoind,

    /// Internal client, initialised on first use through `internal_client`.
    client: JitTryAnyhow<Client>,
    // NOTE(review): stored but never read in this file — presumably held to keep the
    // connectors alive for the lifetime of the federation; confirm.
    #[allow(dead_code)]
    connectors: ConnectorRegistry,
}
69
70impl Drop for Federation {
71 fn drop(&mut self) {
72 block_in_place(|| {
73 block_on(async {
74 let mut set = JoinSet::new();
75
76 while let Some((_id, fedimintd)) = self.members.pop_first() {
77 set.spawn(async { drop(fedimintd) });
78 }
79 while (set.join_next().await).is_some() {}
80 });
81 });
82 }
83}
/// A `fedimint-cli` client identified by the name of its data directory.
#[derive(Clone)]
pub struct Client {
    /// Directory name of this client below the shared `clients` directory.
    name: String,
}
89
90impl Client {
91 fn clients_dir() -> PathBuf {
92 let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
93 .expect("FM_DATA_DIR_ENV not set")
94 .parse()
95 .expect("FM_DATA_DIR_ENV invalid");
96 data_dir.join("clients")
97 }
98
99 fn client_dir(&self) -> PathBuf {
100 Self::clients_dir().join(&self.name)
101 }
102
103 pub fn client_name_lock(name: &str) -> Result<FileLock> {
104 let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
105 let file_lock = std::fs::OpenOptions::new()
106 .write(true)
107 .create(true)
108 .truncate(true)
109 .open(&lock_path)
110 .with_context(|| format!("Failed to open {}", lock_path.display()))?;
111
112 fs_lock::FileLock::new_exclusive(file_lock)
113 .with_context(|| format!("Failed to lock {}", lock_path.display()))
114 }
115
116 pub async fn create(name: impl ToString) -> Result<Client> {
118 let name = name.to_string();
119 spawn_blocking(move || {
120 let _lock = Self::client_name_lock(&name);
121 for i in 0u64.. {
122 let client = Self {
123 name: format!("{name}-{i}"),
124 };
125
126 if !client.client_dir().exists() {
127 std::fs::create_dir_all(client.client_dir())?;
128 return Ok(client);
129 }
130 }
131 unreachable!()
132 })
133 .await?
134 }
135
136 pub fn open_or_create(name: &str) -> Result<Client> {
138 block_in_place(|| {
139 let _lock = Self::client_name_lock(name);
140 let client = Self {
141 name: format!("{name}-0"),
142 };
143 if !client.client_dir().exists() {
144 std::fs::create_dir_all(client.client_dir())?;
145 }
146 Ok(client)
147 })
148 }
149
150 pub async fn join_federation(&self, invite_code: String) -> Result<()> {
152 debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
153 cmd!(self, "join-federation", invite_code).run().await?;
154
155 Ok(())
156 }
157
158 pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
160 debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
161 cmd!(
162 self,
163 "restore",
164 "--invite-code",
165 invite_code,
166 "--mnemonic",
167 mnemonic
168 )
169 .run()
170 .await?;
171
172 Ok(())
173 }
174
175 pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
177 let restored = Self::open_or_create(name)?;
178
179 let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
180 .as_str()
181 .unwrap()
182 .to_owned();
183
184 debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
185 cmd!(
186 restored,
187 "restore",
188 "--invite-code",
189 invite_code,
190 "--mnemonic",
191 mnemonic
192 )
193 .run()
194 .await?;
195
196 Ok(restored)
197 }
198
199 pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
202 let new = Client::create(name).await?;
203
204 cmd!(
205 "cp",
206 "-R",
207 self.client_dir().join("client.db").display(),
208 new.client_dir().display()
209 )
210 .run()
211 .await?;
212
213 Ok(new)
214 }
215
216 pub async fn balance(&self) -> Result<u64> {
217 Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
218 .as_u64()
219 .unwrap())
220 }
221
222 pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
223 let deposit = cmd!(self, "deposit-address").out_json().await?;
224 Ok((
225 deposit["address"].as_str().unwrap().to_string(),
226 deposit["operation_id"].as_str().unwrap().to_string(),
227 ))
228 }
229
230 pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
231 cmd!(self, "await-deposit", operation_id).run().await
232 }
233
234 pub fn cmd(&self) -> Command {
235 cmd!(
236 crate::util::get_fedimint_cli_path(),
237 format!("--data-dir={}", self.client_dir().display())
238 )
239 }
240
241 pub fn get_name(&self) -> &str {
242 &self.name
243 }
244
245 pub async fn get_session_count(&self) -> Result<u64> {
247 cmd!(self, "dev", "session-count").out_json().await?["count"]
248 .as_u64()
249 .context("count field wasn't a number")
250 }
251
252 pub async fn wait_complete(&self) -> Result<()> {
254 cmd!(self, "dev", "wait-complete").run().await
255 }
256
257 pub async fn wait_session(&self) -> anyhow::Result<()> {
259 info!("Waiting for a new session");
260 let session_count = self.get_session_count().await?;
261 self.wait_session_outcome(session_count).await?;
262 Ok(())
263 }
264
265 pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
267 let timeout = {
268 let current_session_count = self.get_session_count().await?;
269 let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
270 let session_duration_seconds = 180;
271 Duration::from_secs(sessions_to_wait * session_duration_seconds)
272 };
273
274 let start = Instant::now();
275 poll_with_timeout("Waiting for a new session", timeout, || async {
276 info!("Awaiting session outcome {session_count}");
277 match cmd!(self, "dev", "api", "await_session_outcome", session_count)
278 .run()
279 .await
280 {
281 Err(e) => Err(ControlFlow::Continue(e)),
282 Ok(()) => Ok(()),
283 }
284 })
285 .await?;
286
287 let session_found_in = start.elapsed();
288 info!("session found in {session_found_in:?}");
289 Ok(())
290 }
291}
292
293impl Federation {
    /// Starts one `fedimintd` per peer and, unless `skip_setup` or `pre_dkg`
    /// is set, runs DKG through the CLI and collects the invite codes into
    /// the client directory.
    ///
    /// * `process_mgr` - spawns the daemons and supplies the global settings.
    /// * `bitcoind` - handle cloned into every `Fedimintd` and stored on the federation.
    /// * `skip_setup` / `pre_dkg` - when either is set, the DKG and
    ///   invite-code steps are skipped and the internal client does not join.
    /// * `fed_index` - selects this federation's per-peer overrides.
    /// * `federation_name` - used for env vars, process names and the internal
    ///   client's name.
    ///
    /// # Errors
    /// Fails if any step fails: config generation, spawning, DKG, or the
    /// invite-code file handling.
    pub async fn new(
        process_mgr: &ProcessManager,
        bitcoind: Bitcoind,
        skip_setup: bool,
        pre_dkg: bool,
        fed_index: usize,
        federation_name: String,
    ) -> Result<Self> {
        let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
        let mut members = BTreeMap::new();
        let mut peer_to_env_vars_map = BTreeMap::new();

        let peers: Vec<_> = num_peers.peer_ids().collect();
        let params: HashMap<PeerId, ConfigGenParams> =
            local_config_gen_params(&peers, process_mgr.globals.FM_FEDERATION_BASE_PORT, true)?;

        // NOTE(review): `admin_clients` is filled below but never read in this function.
        let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
        let mut api_endpoints: BTreeMap<PeerId, _> = BTreeMap::new();

        let connectors = ConnectorRegistry::build_from_testing_env()?.bind().await?;
        // Per peer: build its env vars, spawn its daemon, create its admin client.
        for peer_id in num_peers.peer_ids() {
            let peer_env_vars = vars::Fedimintd::init(
                &process_mgr.globals,
                federation_name.clone(),
                peer_id,
                process_mgr
                    .globals
                    .fedimintd_overrides
                    .peer_expect(fed_index, peer_id),
            )
            .await?;
            members.insert(
                peer_id.to_usize(),
                Fedimintd::new(
                    process_mgr,
                    bitcoind.clone(),
                    peer_id.to_usize(),
                    &peer_env_vars,
                    federation_name.clone(),
                )
                .await?,
            );
            let admin_client = DynGlobalApi::new_admin_setup(
                connectors.clone(),
                SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
            )?;
            api_endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
            admin_clients.insert(peer_id, admin_client);
            peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
        }

        if !skip_setup && !pre_dkg {
            // Switch the CLI for the duration of DKG, then restore the previous one.
            let (original_fedimint_cli_path, original_fm_mint_client) =
                crate::util::use_matching_fedimint_cli_for_dkg().await?;

            run_cli_dkg_v2(params, api_endpoints).await?;

            crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);

            let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
            let invite_code_filename_original = "invite-code";

            // Wait for every peer's invite-code file, then download through it.
            // The download result is discarded.
            // NOTE(review): presumably a check that each peer serves the config — confirm.
            for peer_env_vars in peer_to_env_vars_map.values() {
                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);

                let invite_code = poll_simple("awaiting-invite-code", || async {
                    let path = format!("{peer_data_dir}/{invite_code_filename_original}");
                    tokio::fs::read_to_string(&path)
                        .await
                        .with_context(|| format!("Awaiting invite code file: {path}"))
                })
                .await
                .context("Awaiting invite code file")?;

                ConnectorType::default()
                    .download_from_invite_code(&connectors, &InviteCode::from_str(&invite_code)?)
                    .await?;
            }

            // Peer 0's invite code becomes the plain `invite-code` file in the client
            // directory. This indexing panics if there is no peer 0.
            let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);

            tokio::fs::copy(
                format!("{peer_data_dir}/{invite_code_filename_original}"),
                format!("{client_dir}/{invite_code_filename_original}"),
            )
            .await
            .context("copying invite-code file")?;

            // Every peer's file (peer 0 included, hence the copy above) is then moved
            // to `invite-code-{index}` in the client directory.
            for (index, peer_env_vars) in &peer_to_env_vars_map {
                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);

                let invite_code_filename_indexed =
                    format!("{invite_code_filename_original}-{index}");
                tokio::fs::rename(
                    format!("{peer_data_dir}/{invite_code_filename_original}"),
                    format!("{client_dir}/{invite_code_filename_indexed}"),
                )
                .await
                .context("moving invite-code file")?;
            }

            debug!("Moved invite-code files to client data directory");
        }

        // The internal client is only constructed here; the closure runs when it
        // is first requested.
        let client = JitTryAnyhow::new_try({
            move || async move {
                let client = Client::open_or_create(federation_name.as_str())?;
                // Read unconditionally, so a missing invite-code file fails the
                // client even when no join is attempted.
                let invite_code = Self::invite_code_static()?;
                if !skip_setup && !pre_dkg {
                    cmd!(client, "join-federation", invite_code).run().await?;
                }
                Ok(client)
            }
        });

        Ok(Self {
            members,
            vars: peer_to_env_vars_map,
            bitcoind,
            client,
            connectors,
        })
    }
427
428 pub fn client_config(&self) -> Result<ClientConfig> {
429 let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
430 load_from_file(&cfg_path)
431 }
432
433 pub fn module_instance_id_by_kind(&self, kind: &ModuleKind) -> Result<ModuleInstanceId> {
435 self.client_config()?
436 .modules
437 .iter()
438 .find_map(|(id, cfg)| if &cfg.kind == kind { Some(*id) } else { None })
439 .with_context(|| format!("Module kind {kind} not found"))
440 }
441
442 pub fn module_client_config<M: ClientModule>(
443 &self,
444 ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
445 self.client_config()?
446 .modules
447 .iter()
448 .find_map(|(module_instance_id, module_cfg)| {
449 if module_cfg.kind == M::kind() {
450 let decoders = ModuleDecoderRegistry::new(vec![(
451 *module_instance_id,
452 M::kind(),
453 M::decoder(),
454 )]);
455 Some(
456 module_cfg
457 .config
458 .clone()
459 .redecode_raw(&decoders)
460 .expect("Decoding client cfg failed")
461 .expect_decoded_ref()
462 .as_any()
463 .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
464 .cloned()
465 .context("Cast to module config failed"),
466 )
467 } else {
468 None
469 }
470 })
471 .transpose()
472 }
473
474 pub fn deposit_fees(&self) -> Result<Amount> {
475 Ok(self
476 .module_client_config::<WalletClientModule>()?
477 .context("No wallet module found")?
478 .fee_consensus
479 .peg_in_abs)
480 }
481
482 pub fn invite_code(&self) -> Result<String> {
484 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
485 let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
486 Ok(invite_code)
487 }
488
489 pub fn invite_code_static() -> Result<String> {
490 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
491 let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
492 Ok(invite_code)
493 }
494 pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
495 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
496 let name = format!("invite-code-{peer_id}");
497 let invite_code = fs::read_to_string(data_dir.join(name))?;
498 Ok(invite_code)
499 }
500
501 pub async fn internal_client(&self) -> Result<&Client> {
505 self.client
506 .get_try()
507 .await
508 .context("Internal client joining Federation")
509 }
510
511 pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
513 let client = Client::create(name).await?;
514 client.join_federation(self.invite_code()?).await?;
515 Ok(client)
516 }
517
518 pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
519 if self.members.contains_key(&peer) {
520 bail!("fedimintd-{peer} already running");
521 }
522 self.members.insert(
523 peer,
524 Fedimintd::new(
525 process_mgr,
526 self.bitcoind.clone(),
527 peer,
528 &self.vars[&peer],
529 "default".to_string(),
530 )
531 .await?,
532 );
533 Ok(())
534 }
535
536 pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
537 let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
538 bail!("fedimintd-{peer_id} does not exist");
539 };
540 fedimintd.terminate().await?;
541 Ok(())
542 }
543
544 pub async fn await_server_terminated(&mut self, peer_id: usize) -> Result<()> {
545 let Some(fedimintd) = self.members.get_mut(&peer_id) else {
546 bail!("fedimintd-{peer_id} does not exist");
547 };
548 fedimintd.await_terminated().await?;
549 self.members.remove(&peer_id);
550 Ok(())
551 }
552
553 pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
555 info!("starting all servers");
556 let fed_size = process_mgr.globals.FM_FED_SIZE;
557 for peer_id in 0..fed_size {
558 if self.members.contains_key(&peer_id) {
559 continue;
560 }
561 self.start_server(process_mgr, peer_id).await?;
562 }
563 self.await_all_peers().await?;
564 Ok(())
565 }
566
567 pub async fn terminate_all_servers(&mut self) -> Result<()> {
569 info!("terminating all servers");
570 let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
571 for peer_id in running_peer_ids {
572 self.terminate_server(peer_id).await?;
573 }
574 Ok(())
575 }
576
577 pub async fn restart_all_staggered_with_bin(
582 &mut self,
583 process_mgr: &ProcessManager,
584 bin_path: &PathBuf,
585 ) -> Result<()> {
586 let fed_size = process_mgr.globals.FM_FED_SIZE;
587
588 self.start_all_servers(process_mgr).await?;
590
591 while self.num_members() > 0 {
593 self.terminate_server(self.num_members() - 1).await?;
594 if self.num_members() > 0 {
595 fedimint_core::task::sleep_in_test(
596 "waiting to shutdown remaining peers",
597 Duration::from_secs(10),
598 )
599 .await;
600 }
601 }
602
603 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
605
606 for peer_id in 0..fed_size {
608 self.start_server(process_mgr, peer_id).await?;
609 if peer_id < fed_size - 1 {
610 fedimint_core::task::sleep_in_test(
611 "waiting to restart remaining peers",
612 Duration::from_secs(10),
613 )
614 .await;
615 }
616 }
617
618 self.await_all_peers().await?;
619
620 let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
621 info!("upgraded fedimintd to version: {}", fedimintd_version);
622 Ok(())
623 }
624
625 pub async fn restart_all_with_bin(
626 &mut self,
627 process_mgr: &ProcessManager,
628 bin_path: &PathBuf,
629 ) -> Result<()> {
630 let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
632 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
634 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
636
637 self.restart_all_staggered_with_bin(process_mgr, bin_path)
638 .await
639 }
640
641 pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
642 let fed_size = process_mgr.globals.FM_FED_SIZE;
643 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
644 anyhow::ensure!(
645 fed_size > 3 * offline_nodes,
646 "too many offline nodes ({offline_nodes}) to reach consensus"
647 );
648
649 while self.num_members() > fed_size - offline_nodes {
650 self.terminate_server(self.num_members() - 1).await?;
651 }
652
653 if offline_nodes > 0 {
654 info!(fed_size, offline_nodes, "federation is degraded");
655 }
656 Ok(())
657 }
658
659 pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
660 let deposit_fees_msat = self.deposit_fees()?.msats;
661 assert_eq!(
662 deposit_fees_msat % 1000,
663 0,
664 "Deposit fees expected to be whole sats in test suite"
665 );
666 let deposit_fees = deposit_fees_msat / 1000;
667 info!(amount, deposit_fees, "Pegging-in client funds");
668
669 let (address, operation_id) = client.get_deposit_addr().await?;
670
671 self.bitcoind
672 .send_to(address, amount + deposit_fees)
673 .await?;
674 self.bitcoind.mine_blocks(21).await?;
675
676 Ok(operation_id)
677 }
678
679 pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
680 let operation_id = self.pegin_client_no_wait(amount, client).await?;
681
682 client.await_deposit(&operation_id).await?;
683 Ok(())
684 }
685
686 pub async fn pegin_gateways(
689 &self,
690 amount: u64,
691 gateways: Vec<&super::gatewayd::Gatewayd>,
692 ) -> Result<()> {
693 let deposit_fees_msat = self.deposit_fees()?.msats;
694 assert_eq!(
695 deposit_fees_msat % 1000,
696 0,
697 "Deposit fees expected to be whole sats in test suite"
698 );
699 let deposit_fees = deposit_fees_msat / 1000;
700 info!(amount, deposit_fees, "Pegging-in gateway funds");
701 let fed_id = self.calculate_federation_id();
702 for gw in gateways.clone() {
703 let pegin_addr = gw.get_pegin_addr(&fed_id).await?;
704 self.bitcoind
705 .send_to(pegin_addr, amount + deposit_fees)
706 .await?;
707 }
708
709 self.bitcoind.mine_blocks(21).await?;
710 let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
711 try_join_all(gateways.into_iter().map(|gw| {
712 poll("gateway pegin", || async {
713 let gw_info = gw.get_info().await.map_err(ControlFlow::Continue)?;
714
715 let block_height: u64 = if gw.gatewayd_version < *VERSION_0_10_0_ALPHA {
716 gw_info["block_height"]
717 .as_u64()
718 .expect("Could not parse block height")
719 } else {
720 gw_info["lightning_info"]["connected"]["block_height"]
721 .as_u64()
722 .expect("Could not parse block height")
723 };
724
725 if bitcoind_block_height != block_height {
726 return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
727 "gateway block height is not synced"
728 )));
729 }
730
731 let gateway_balance = gw
732 .ecash_balance(fed_id.clone())
733 .await
734 .map_err(ControlFlow::Continue)?;
735 poll_almost_equal!(gateway_balance, amount * 1000)
736 })
737 }))
738 .await?;
739
740 Ok(())
741 }
742
743 pub async fn pegout_gateways(
746 &self,
747 amount: u64,
748 gateways: Vec<&super::gatewayd::Gatewayd>,
749 ) -> Result<()> {
750 info!(amount, "Pegging-out gateway funds");
751 let fed_id = self.calculate_federation_id();
752 let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
753 for gw in gateways.clone() {
754 let prev_fed_ecash_balance = gw
755 .get_balances()
756 .await?
757 .ecash_balances
758 .into_iter()
759 .find(|fed| fed.federation_id.to_string() == fed_id)
760 .expect("Gateway has not joined federation")
761 .ecash_balance_msats;
762
763 let pegout_address = self.bitcoind.get_new_address().await?;
764 let value = cmd!(
765 gw,
766 "ecash",
767 "pegout",
768 "--federation-id",
769 fed_id,
770 "--amount",
771 amount,
772 "--address",
773 pegout_address
774 )
775 .out_json()
776 .await?;
777 let response: WithdrawResponse = serde_json::from_value(value)?;
778 peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
779 }
780 self.bitcoind.mine_blocks(21).await?;
781
782 try_join_all(
783 peg_outs
784 .values()
785 .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
786 )
787 .await?;
788
789 for gw in gateways.clone() {
790 let after_fed_ecash_balance = gw
791 .get_balances()
792 .await?
793 .ecash_balances
794 .into_iter()
795 .find(|fed| fed.federation_id.to_string() == fed_id)
796 .expect("Gateway has not joined federation")
797 .ecash_balance_msats;
798
799 let ln_type = gw.ln.ln_type();
800 let prev_balance = peg_outs
801 .get(&ln_type)
802 .expect("peg out does not exist")
803 .0
804 .msats;
805 let fees = peg_outs
806 .get(&ln_type)
807 .expect("peg out does not exist")
808 .1
809 .fees;
810 let total_fee = fees.amount().to_sat() * 1000;
811 crate::util::almost_equal(
812 after_fed_ecash_balance.msats,
813 prev_balance - amount - total_fee,
814 2000,
815 )
816 .map_err(|e| {
817 anyhow::anyhow!(
818 "new balance did not equal prev balance minus withdraw_amount minus fees: {}",
819 e
820 )
821 })?;
822 }
823
824 Ok(())
825 }
826
827 pub fn calculate_federation_id(&self) -> String {
828 self.client_config()
829 .unwrap()
830 .global
831 .calculate_federation_id()
832 .to_string()
833 }
834
835 pub async fn await_block_sync(&self) -> Result<u64> {
836 let finality_delay = self.get_finality_delay()?;
837 let block_count = self.bitcoind.get_block_count().await?;
838 let expected = block_count.saturating_sub(finality_delay.into());
839 cmd!(
840 self.internal_client().await?,
841 "dev",
842 "wait-block-count",
843 expected
844 )
845 .run()
846 .await?;
847 Ok(expected)
848 }
849
850 fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
851 let wallet_instance_id = self.module_instance_id_by_kind(&fedimint_wallet_client::KIND)?;
852 let client_config = &self.client_config()?;
853 let wallet_cfg = client_config
854 .modules
855 .get(&wallet_instance_id)
856 .context("wallet module not found")?
857 .clone()
858 .redecode_raw(&ModuleDecoderRegistry::new([(
859 wallet_instance_id,
860 fedimint_wallet_client::KIND,
861 fedimint_wallet_client::WalletModuleTypes::decoder(),
862 )]))?;
863 let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
864
865 let finality_delay = wallet_cfg.finality_delay;
866 Ok(finality_delay)
867 }
868
869 pub async fn await_gateways_registered(&self) -> Result<()> {
870 let start_time = Instant::now();
871 debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
872
873 poll("gateways registered", || async {
874 let num_gateways = cmd!(
875 self.internal_client()
876 .await
877 .map_err(ControlFlow::Continue)?,
878 "list-gateways"
879 )
880 .out_json()
881 .await
882 .map_err(ControlFlow::Continue)?
883 .as_array()
884 .context("invalid output")
885 .map_err(ControlFlow::Break)?
886 .len();
887
888 let expected_gateways =
891 if crate::util::Gatewayd::version_or_default().await < *VERSION_0_10_0_ALPHA {
892 1
893 } else {
894 2
895 };
896
897 poll_eq!(num_gateways, expected_gateways)
898 })
899 .await?;
900 debug!(target: LOG_DEVIMINT,
901 elapsed_ms = %start_time.elapsed().as_millis(),
902 "Gateways registered");
903 Ok(())
904 }
905
906 pub async fn await_all_peers(&self) -> Result<()> {
907 poll("Waiting for all peers to be online", || async {
908 cmd!(
909 self.internal_client()
910 .await
911 .map_err(ControlFlow::Continue)?,
912 "dev",
913 "api",
914 "--module",
915 "wallet",
916 "block_count"
917 )
918 .run()
919 .await
920 .map_err(ControlFlow::Continue)?;
921 Ok(())
922 })
923 .await
924 }
925
926 pub async fn await_peer(&self, peer_id: usize) -> Result<()> {
927 poll("Waiting for all peers to be online", || async {
928 cmd!(
929 self.internal_client()
930 .await
931 .map_err(ControlFlow::Continue)?,
932 "dev",
933 "api",
934 "--peer-id",
935 peer_id,
936 "--module",
937 "wallet",
938 "block_count"
939 )
940 .run()
941 .await
942 .map_err(ControlFlow::Continue)?;
943 Ok(())
944 })
945 .await
946 }
947
948 pub async fn finalize_mempool_tx(&self) -> Result<()> {
958 let finality_delay = self.get_finality_delay()?;
959 let blocks_to_mine = finality_delay + 1;
960 self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
961 self.await_block_sync().await?;
962 Ok(())
963 }
964
965 pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
966 self.bitcoind.mine_blocks(blocks).await?;
967 self.await_block_sync().await?;
968 Ok(())
969 }
970
971 pub fn num_members(&self) -> usize {
972 self.members.len()
973 }
974
975 pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
976 self.members
977 .keys()
978 .map(|&peer_id| PeerId::from(peer_id as u16))
979 }
980}
981
/// Handle to one spawned `fedimintd` process.
#[derive(Clone)]
pub struct Fedimintd {
    // NOTE(review): never read in this file — presumably held so the bitcoind
    // handle lives as long as the daemon; confirm.
    _bitcoind: Bitcoind,
    /// Handle of the spawned daemon process.
    process: ProcessHandle,
}
987
988impl Fedimintd {
989 pub async fn new(
990 process_mgr: &ProcessManager,
991 bitcoind: Bitcoind,
992 peer_id: usize,
993 env: &vars::Fedimintd,
994 fed_name: String,
995 ) -> Result<Self> {
996 debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
997 let process = process_mgr
998 .spawn_daemon(
999 &format!("fedimintd-{fed_name}-{peer_id}"),
1000 cmd!(FedimintdCmd).envs(env.vars()),
1001 )
1002 .await?;
1003
1004 Ok(Self {
1005 _bitcoind: bitcoind,
1006 process,
1007 })
1008 }
1009
1010 pub async fn terminate(self) -> Result<()> {
1011 self.process.terminate().await
1012 }
1013
1014 pub async fn await_terminated(&self) -> Result<()> {
1015 self.process.await_terminated().await
1016 }
1017}
1018
1019pub async fn run_cli_dkg_v2(
1020 params: HashMap<PeerId, ConfigGenParams>,
1021 endpoints: BTreeMap<PeerId, String>,
1022) -> Result<()> {
1023 let auth_for = |peer: &PeerId| -> &ApiAuth { ¶ms[peer].api_auth };
1024
1025 let status_futures = endpoints.iter().map(|(peer, endpoint)| {
1027 let peer = *peer;
1028 let endpoint = endpoint.clone();
1029 async move {
1030 let status = poll("awaiting-setup-status-awaiting-local-params", || async {
1031 crate::util::FedimintCli
1032 .setup_status(auth_for(&peer), &endpoint)
1033 .await
1034 .map_err(ControlFlow::Continue)
1035 })
1036 .await
1037 .unwrap();
1038
1039 assert_eq!(status, SetupStatus::AwaitingLocalParams);
1040 }
1041 });
1042 join_all(status_futures).await;
1043
1044 debug!(target: LOG_DEVIMINT, "Setting local parameters...");
1045
1046 let local_params_futures = endpoints.iter().map(|(peer, endpoint)| {
1048 let peer = *peer;
1049 let endpoint = endpoint.clone();
1050 async move {
1051 let info = if peer.to_usize() == 0 {
1052 crate::util::FedimintCli
1053 .set_local_params_leader(&peer, auth_for(&peer), &endpoint)
1054 .await
1055 } else {
1056 crate::util::FedimintCli
1057 .set_local_params_follower(&peer, auth_for(&peer), &endpoint)
1058 .await
1059 };
1060 info.map(|i| (peer, i))
1061 }
1062 });
1063 let connection_info: BTreeMap<_, _> = try_join_all(local_params_futures)
1064 .await?
1065 .into_iter()
1066 .collect();
1067
1068 debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");
1069
1070 let add_peer_futures = connection_info.iter().flat_map(|(peer, info)| {
1073 endpoints
1074 .iter()
1075 .filter(move |(p, _)| *p != peer)
1076 .map(move |(p, endpoint)| {
1077 let p = *p;
1078 let endpoint = endpoint.clone();
1079 let info = info.clone();
1080 async move {
1081 crate::util::FedimintCli
1082 .add_peer(&info, auth_for(&p), &endpoint)
1083 .await
1084 }
1085 })
1086 });
1087 try_join_all(add_peer_futures).await?;
1088
1089 debug!(target: LOG_DEVIMINT, "Starting DKG...");
1090
1091 let start_dkg_futures = endpoints.iter().map(|(peer, endpoint)| {
1093 let peer = *peer;
1094 let endpoint = endpoint.clone();
1095 async move {
1096 crate::util::FedimintCli
1097 .start_dkg(auth_for(&peer), &endpoint)
1098 .await
1099 }
1100 });
1101 try_join_all(start_dkg_futures).await?;
1102
1103 Ok(())
1104}