1use std::collections::BTreeMap;
2use std::ops::ControlFlow;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::Duration;
6use std::{env, fs};
7
8use anyhow::{Context, Result, bail};
9use fedimint_api_client::api::DynGlobalApi;
10use fedimint_api_client::download_from_invite_code;
11use fedimint_client_module::module::ClientModule;
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::admin_client::SetupStatus;
14use fedimint_core::config::{ClientConfig, load_from_file};
15use fedimint_core::core::{ModuleInstanceId, ModuleKind};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::ModuleCommon;
18use fedimint_core::module::registry::ModuleDecoderRegistry;
19use fedimint_core::runtime::block_in_place;
20use fedimint_core::task::block_on;
21use fedimint_core::task::jit::JitTryAnyhow;
22use fedimint_core::util::SafeUrl;
23use fedimint_core::{Amount, NumPeers, PeerId};
24use fedimint_gateway_common::WithdrawResponse;
25use fedimint_logging::LOG_DEVIMINT;
26use fedimint_testing_core::config::API_AUTH;
27use fedimint_testing_core::node_type::LightningNodeType;
28use fedimint_wallet_client::WalletClientModule;
29use fedimint_wallet_client::config::WalletClientConfig;
30use fs_lock::FileLock;
31use futures::future::{join_all, try_join_all};
32use tokio::task::{JoinSet, spawn_blocking};
33use tokio::time::Instant;
34use tracing::{debug, info};
35
36use super::external::Bitcoind;
37use super::util::{Command, ProcessHandle, ProcessManager, cmd};
38use super::vars::utf8;
39use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
40use crate::util::{FedimintdCmd, poll, poll_simple, poll_with_timeout};
41use crate::version_constants::VERSION_0_10_0_ALPHA;
42use crate::{poll_almost_equal, poll_eq, vars};
43
44pub const PORTS_PER_FEDIMINTD: u16 = 4;
47pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
49pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
51pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
53pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;
55
56#[derive(Clone)]
57pub struct Federation {
58 pub members: BTreeMap<usize, Fedimintd>,
60 pub vars: BTreeMap<usize, vars::Fedimintd>,
61 pub bitcoind: Bitcoind,
62
63 client: JitTryAnyhow<Client>,
65 #[allow(dead_code)] connectors: ConnectorRegistry,
67}
68
69impl Drop for Federation {
70 fn drop(&mut self) {
71 block_in_place(|| {
72 block_on(async {
73 let mut set = JoinSet::new();
74
75 while let Some((_id, fedimintd)) = self.members.pop_first() {
76 set.spawn(async { drop(fedimintd) });
77 }
78 while (set.join_next().await).is_some() {}
79 });
80 });
81 }
82}
83#[derive(Clone)]
85pub struct Client {
86 name: String,
87}
88
89impl Client {
90 fn clients_dir() -> PathBuf {
91 let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
92 .expect("FM_DATA_DIR_ENV not set")
93 .parse()
94 .expect("FM_DATA_DIR_ENV invalid");
95 data_dir.join("clients")
96 }
97
98 fn client_dir(&self) -> PathBuf {
99 Self::clients_dir().join(&self.name)
100 }
101
102 pub fn client_name_lock(name: &str) -> Result<FileLock> {
103 let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
104 let file_lock = std::fs::OpenOptions::new()
105 .write(true)
106 .create(true)
107 .truncate(true)
108 .open(&lock_path)
109 .with_context(|| format!("Failed to open {}", lock_path.display()))?;
110
111 fs_lock::FileLock::new_exclusive(file_lock)
112 .with_context(|| format!("Failed to lock {}", lock_path.display()))
113 }
114
115 pub async fn create(name: impl ToString) -> Result<Client> {
117 let name = name.to_string();
118 spawn_blocking(move || {
119 let _lock = Self::client_name_lock(&name);
120 for i in 0u64.. {
121 let client = Self {
122 name: format!("{name}-{i}"),
123 };
124
125 if !client.client_dir().exists() {
126 std::fs::create_dir_all(client.client_dir())?;
127 return Ok(client);
128 }
129 }
130 unreachable!()
131 })
132 .await?
133 }
134
135 pub fn open_or_create(name: &str) -> Result<Client> {
137 block_in_place(|| {
138 let _lock = Self::client_name_lock(name);
139 let client = Self {
140 name: format!("{name}-0"),
141 };
142 if !client.client_dir().exists() {
143 std::fs::create_dir_all(client.client_dir())?;
144 }
145 Ok(client)
146 })
147 }
148
149 pub async fn join_federation(&self, invite_code: String) -> Result<()> {
151 debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
152 cmd!(self, "join-federation", invite_code).run().await?;
153
154 Ok(())
155 }
156
157 pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
159 debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
160 cmd!(
161 self,
162 "restore",
163 "--invite-code",
164 invite_code,
165 "--mnemonic",
166 mnemonic
167 )
168 .run()
169 .await?;
170
171 Ok(())
172 }
173
174 pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
176 let restored = Self::open_or_create(name)?;
177
178 let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
179 .as_str()
180 .unwrap()
181 .to_owned();
182
183 debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
184 cmd!(
185 restored,
186 "restore",
187 "--invite-code",
188 invite_code,
189 "--mnemonic",
190 mnemonic
191 )
192 .run()
193 .await?;
194
195 Ok(restored)
196 }
197
198 pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
201 let new = Client::create(name).await?;
202
203 cmd!(
204 "cp",
205 "-R",
206 self.client_dir().join("client.db").display(),
207 new.client_dir().display()
208 )
209 .run()
210 .await?;
211
212 Ok(new)
213 }
214
215 pub async fn balance(&self) -> Result<u64> {
216 Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
217 .as_u64()
218 .unwrap())
219 }
220
221 pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
222 let deposit = cmd!(self, "deposit-address").out_json().await?;
223 Ok((
224 deposit["address"].as_str().unwrap().to_string(),
225 deposit["operation_id"].as_str().unwrap().to_string(),
226 ))
227 }
228
229 pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
230 cmd!(self, "await-deposit", operation_id).run().await
231 }
232
233 pub fn cmd(&self) -> Command {
234 cmd!(
235 crate::util::get_fedimint_cli_path(),
236 format!("--data-dir={}", self.client_dir().display())
237 )
238 }
239
240 pub fn get_name(&self) -> &str {
241 &self.name
242 }
243
244 pub async fn get_session_count(&self) -> Result<u64> {
246 cmd!(self, "dev", "session-count").out_json().await?["count"]
247 .as_u64()
248 .context("count field wasn't a number")
249 }
250
251 pub async fn wait_complete(&self) -> Result<()> {
253 cmd!(self, "dev", "wait-complete").run().await
254 }
255
256 pub async fn wait_session(&self) -> anyhow::Result<()> {
258 info!("Waiting for a new session");
259 let session_count = self.get_session_count().await?;
260 self.wait_session_outcome(session_count).await?;
261 Ok(())
262 }
263
264 pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
266 let timeout = {
267 let current_session_count = self.get_session_count().await?;
268 let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
269 let session_duration_seconds = 180;
270 Duration::from_secs(sessions_to_wait * session_duration_seconds)
271 };
272
273 let start = Instant::now();
274 poll_with_timeout("Waiting for a new session", timeout, || async {
275 info!("Awaiting session outcome {session_count}");
276 match cmd!(self, "dev", "api", "await_session_outcome", session_count)
277 .run()
278 .await
279 {
280 Err(e) => Err(ControlFlow::Continue(e)),
281 Ok(()) => Ok(()),
282 }
283 })
284 .await?;
285
286 let session_found_in = start.elapsed();
287 info!("session found in {session_found_in:?}");
288 Ok(())
289 }
290}
291
292impl Federation {
293 pub async fn new(
294 process_mgr: &ProcessManager,
295 bitcoind: Bitcoind,
296 skip_setup: bool,
297 pre_dkg: bool,
298 fed_index: usize,
300 federation_name: String,
301 ) -> Result<Self> {
302 let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
303 let mut members = BTreeMap::new();
304 let mut peer_to_env_vars_map = BTreeMap::new();
305
306 let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
307 let mut api_endpoints: BTreeMap<PeerId, _> = BTreeMap::new();
308
309 let connectors = ConnectorRegistry::build_from_testing_env()?.bind().await?;
310 for peer_id in num_peers.peer_ids() {
311 let peer_env_vars = vars::Fedimintd::init(
312 &process_mgr.globals,
313 federation_name.clone(),
314 peer_id,
315 process_mgr
316 .globals
317 .fedimintd_overrides
318 .peer_expect(fed_index, peer_id),
319 )
320 .await?;
321 members.insert(
322 peer_id.to_usize(),
323 Fedimintd::new(
324 process_mgr,
325 bitcoind.clone(),
326 peer_id.to_usize(),
327 &peer_env_vars,
328 federation_name.clone(),
329 )
330 .await?,
331 );
332 let admin_client = DynGlobalApi::new_admin_setup(
333 connectors.clone(),
334 SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
335 )?;
338 api_endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
339 admin_clients.insert(peer_id, admin_client);
340 peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
341 }
342
343 if !skip_setup && !pre_dkg {
344 let (original_fedimint_cli_path, original_fm_mint_client) =
347 crate::util::use_matching_fedimint_cli_for_dkg().await?;
348
349 run_cli_dkg_v2(api_endpoints).await?;
350
351 crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);
353
354 let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
356 let invite_code_filename_original = "invite-code";
357
358 for peer_env_vars in peer_to_env_vars_map.values() {
359 let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
360
361 let invite_code = poll_simple("awaiting-invite-code", || async {
362 let path = format!("{peer_data_dir}/{invite_code_filename_original}");
363 tokio::fs::read_to_string(&path)
364 .await
365 .with_context(|| format!("Awaiting invite code file: {path}"))
366 })
367 .await
368 .context("Awaiting invite code file")?;
369
370 download_from_invite_code(&connectors, &InviteCode::from_str(&invite_code)?)
371 .await?;
372 }
373
374 let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);
376
377 tokio::fs::copy(
378 format!("{peer_data_dir}/{invite_code_filename_original}"),
379 format!("{client_dir}/{invite_code_filename_original}"),
380 )
381 .await
382 .context("copying invite-code file")?;
383
384 for (index, peer_env_vars) in &peer_to_env_vars_map {
387 let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
388
389 let invite_code_filename_indexed =
390 format!("{invite_code_filename_original}-{index}");
391 tokio::fs::rename(
392 format!("{peer_data_dir}/{invite_code_filename_original}"),
393 format!("{client_dir}/{invite_code_filename_indexed}"),
394 )
395 .await
396 .context("moving invite-code file")?;
397 }
398
399 debug!("Moved invite-code files to client data directory");
400 }
401
402 let client = JitTryAnyhow::new_try({
403 move || async move {
404 let client = Client::open_or_create(federation_name.as_str())?;
405 let invite_code = Self::invite_code_static()?;
406 if !skip_setup && !pre_dkg {
407 cmd!(client, "join-federation", invite_code).run().await?;
408 }
409 Ok(client)
410 }
411 });
412
413 Ok(Self {
414 members,
415 vars: peer_to_env_vars_map,
416 bitcoind,
417 client,
418 connectors,
419 })
420 }
421
422 pub fn client_config(&self) -> Result<ClientConfig> {
423 let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
424 load_from_file(&cfg_path)
425 }
426
427 pub fn module_instance_id_by_kind(&self, kind: &ModuleKind) -> Result<ModuleInstanceId> {
429 self.client_config()?
430 .modules
431 .iter()
432 .find_map(|(id, cfg)| if &cfg.kind == kind { Some(*id) } else { None })
433 .with_context(|| format!("Module kind {kind} not found"))
434 }
435
436 pub fn module_client_config<M: ClientModule>(
437 &self,
438 ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
439 self.client_config()?
440 .modules
441 .iter()
442 .find_map(|(module_instance_id, module_cfg)| {
443 if module_cfg.kind == M::kind() {
444 let decoders = ModuleDecoderRegistry::new(vec![(
445 *module_instance_id,
446 M::kind(),
447 M::decoder(),
448 )]);
449 Some(
450 module_cfg
451 .config
452 .clone()
453 .redecode_raw(&decoders)
454 .expect("Decoding client cfg failed")
455 .expect_decoded_ref()
456 .as_any()
457 .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
458 .cloned()
459 .context("Cast to module config failed"),
460 )
461 } else {
462 None
463 }
464 })
465 .transpose()
466 }
467
468 pub fn deposit_fees(&self) -> Result<Amount> {
469 Ok(self
470 .module_client_config::<WalletClientModule>()?
471 .context("No wallet module found")?
472 .fee_consensus
473 .peg_in_abs)
474 }
475
476 pub fn invite_code(&self) -> Result<String> {
478 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
479 let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
480 Ok(invite_code)
481 }
482
483 pub fn invite_code_static() -> Result<String> {
484 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
485 let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
486 Ok(invite_code)
487 }
488 pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
489 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
490 let name = format!("invite-code-{peer_id}");
491 let invite_code = fs::read_to_string(data_dir.join(name))?;
492 Ok(invite_code)
493 }
494
495 pub async fn internal_client(&self) -> Result<&Client> {
499 self.client
500 .get_try()
501 .await
502 .context("Internal client joining Federation")
503 }
504
505 pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
507 let client = Client::create(name).await?;
508 client.join_federation(self.invite_code()?).await?;
509 Ok(client)
510 }
511
512 pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
513 if self.members.contains_key(&peer) {
514 bail!("fedimintd-{peer} already running");
515 }
516 self.members.insert(
517 peer,
518 Fedimintd::new(
519 process_mgr,
520 self.bitcoind.clone(),
521 peer,
522 &self.vars[&peer],
523 "default".to_string(),
524 )
525 .await?,
526 );
527 Ok(())
528 }
529
530 pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
531 let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
532 bail!("fedimintd-{peer_id} does not exist");
533 };
534 fedimintd.terminate().await?;
535 Ok(())
536 }
537
538 pub async fn await_server_terminated(&mut self, peer_id: usize) -> Result<()> {
539 let Some(fedimintd) = self.members.get_mut(&peer_id) else {
540 bail!("fedimintd-{peer_id} does not exist");
541 };
542 fedimintd.await_terminated().await?;
543 self.members.remove(&peer_id);
544 Ok(())
545 }
546
547 pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
549 info!("starting all servers");
550 let fed_size = process_mgr.globals.FM_FED_SIZE;
551 for peer_id in 0..fed_size {
552 if self.members.contains_key(&peer_id) {
553 continue;
554 }
555 self.start_server(process_mgr, peer_id).await?;
556 }
557 self.await_all_peers().await?;
558 Ok(())
559 }
560
561 pub async fn terminate_all_servers(&mut self) -> Result<()> {
563 info!("terminating all servers");
564 let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
565 for peer_id in running_peer_ids {
566 self.terminate_server(peer_id).await?;
567 }
568 Ok(())
569 }
570
571 pub async fn restart_all_staggered_with_bin(
576 &mut self,
577 process_mgr: &ProcessManager,
578 bin_path: &PathBuf,
579 ) -> Result<()> {
580 let fed_size = process_mgr.globals.FM_FED_SIZE;
581
582 self.start_all_servers(process_mgr).await?;
584
585 while self.num_members() > 0 {
587 self.terminate_server(self.num_members() - 1).await?;
588 if self.num_members() > 0 {
589 fedimint_core::task::sleep_in_test(
590 "waiting to shutdown remaining peers",
591 Duration::from_secs(10),
592 )
593 .await;
594 }
595 }
596
597 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
599
600 for peer_id in 0..fed_size {
602 self.start_server(process_mgr, peer_id).await?;
603 if peer_id < fed_size - 1 {
604 fedimint_core::task::sleep_in_test(
605 "waiting to restart remaining peers",
606 Duration::from_secs(10),
607 )
608 .await;
609 }
610 }
611
612 self.await_all_peers().await?;
613
614 let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
615 info!("upgraded fedimintd to version: {}", fedimintd_version);
616 Ok(())
617 }
618
619 pub async fn restart_all_with_bin(
620 &mut self,
621 process_mgr: &ProcessManager,
622 bin_path: &PathBuf,
623 ) -> Result<()> {
624 let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
626 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
628 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
630
631 self.restart_all_staggered_with_bin(process_mgr, bin_path)
632 .await
633 }
634
635 pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
636 let fed_size = process_mgr.globals.FM_FED_SIZE;
637 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
638 anyhow::ensure!(
639 fed_size > 3 * offline_nodes,
640 "too many offline nodes ({offline_nodes}) to reach consensus"
641 );
642
643 while self.num_members() > fed_size - offline_nodes {
644 self.terminate_server(self.num_members() - 1).await?;
645 }
646
647 if offline_nodes > 0 {
648 info!(fed_size, offline_nodes, "federation is degraded");
649 }
650 Ok(())
651 }
652
653 pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
654 let deposit_fees_msat = self.deposit_fees()?.msats;
655 assert_eq!(
656 deposit_fees_msat % 1000,
657 0,
658 "Deposit fees expected to be whole sats in test suite"
659 );
660 let deposit_fees = deposit_fees_msat / 1000;
661 info!(amount, deposit_fees, "Pegging-in client funds");
662
663 let (address, operation_id) = client.get_deposit_addr().await?;
664
665 self.bitcoind
666 .send_to(address, amount + deposit_fees)
667 .await?;
668 self.bitcoind.mine_blocks(21).await?;
669
670 Ok(operation_id)
671 }
672
673 pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
674 let operation_id = self.pegin_client_no_wait(amount, client).await?;
675
676 client.await_deposit(&operation_id).await?;
677 Ok(())
678 }
679
680 pub async fn pegin_gateways(
683 &self,
684 amount: u64,
685 gateways: Vec<&super::gatewayd::Gatewayd>,
686 ) -> Result<()> {
687 let deposit_fees_msat = self.deposit_fees()?.msats;
688 assert_eq!(
689 deposit_fees_msat % 1000,
690 0,
691 "Deposit fees expected to be whole sats in test suite"
692 );
693 let deposit_fees = deposit_fees_msat / 1000;
694 info!(amount, deposit_fees, "Pegging-in gateway funds");
695 let fed_id = self.calculate_federation_id();
696 for gw in gateways.clone() {
697 let pegin_addr = gw.get_pegin_addr(&fed_id).await?;
698 self.bitcoind
699 .send_to(pegin_addr, amount + deposit_fees)
700 .await?;
701 }
702
703 self.bitcoind.mine_blocks(21).await?;
704 let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
705 try_join_all(gateways.into_iter().map(|gw| {
706 poll("gateway pegin", || async {
707 let gw_info = gw.get_info().await.map_err(ControlFlow::Continue)?;
708
709 let block_height: u64 = if gw.gatewayd_version < *VERSION_0_10_0_ALPHA {
710 gw_info["block_height"]
711 .as_u64()
712 .expect("Could not parse block height")
713 } else {
714 gw_info["lightning_info"]["connected"]["block_height"]
715 .as_u64()
716 .expect("Could not parse block height")
717 };
718
719 if bitcoind_block_height != block_height {
720 return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
721 "gateway block height is not synced"
722 )));
723 }
724
725 let gateway_balance = gw
726 .ecash_balance(fed_id.clone())
727 .await
728 .map_err(ControlFlow::Continue)?;
729 poll_almost_equal!(gateway_balance, amount * 1000)
730 })
731 }))
732 .await?;
733
734 Ok(())
735 }
736
737 pub async fn pegout_gateways(
740 &self,
741 amount: u64,
742 gateways: Vec<&super::gatewayd::Gatewayd>,
743 ) -> Result<()> {
744 info!(amount, "Pegging-out gateway funds");
745 let fed_id = self.calculate_federation_id();
746 let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
747 for gw in gateways.clone() {
748 let prev_fed_ecash_balance = gw
749 .get_balances()
750 .await?
751 .ecash_balances
752 .into_iter()
753 .find(|fed| fed.federation_id.to_string() == fed_id)
754 .expect("Gateway has not joined federation")
755 .ecash_balance_msats;
756
757 let pegout_address = self.bitcoind.get_new_address().await?;
758 let value = cmd!(
759 gw,
760 "ecash",
761 "pegout",
762 "--federation-id",
763 fed_id,
764 "--amount",
765 amount,
766 "--address",
767 pegout_address
768 )
769 .out_json()
770 .await?;
771 let response: WithdrawResponse = serde_json::from_value(value)?;
772 peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
773 }
774 self.bitcoind.mine_blocks(21).await?;
775
776 try_join_all(
777 peg_outs
778 .values()
779 .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
780 )
781 .await?;
782
783 for gw in gateways.clone() {
784 let after_fed_ecash_balance = gw
785 .get_balances()
786 .await?
787 .ecash_balances
788 .into_iter()
789 .find(|fed| fed.federation_id.to_string() == fed_id)
790 .expect("Gateway has not joined federation")
791 .ecash_balance_msats;
792
793 let ln_type = gw.ln.ln_type();
794 let prev_balance = peg_outs
795 .get(&ln_type)
796 .expect("peg out does not exist")
797 .0
798 .msats;
799 let fees = peg_outs
800 .get(&ln_type)
801 .expect("peg out does not exist")
802 .1
803 .fees;
804 let total_fee = fees.amount().to_sat() * 1000;
805 crate::util::almost_equal(
806 after_fed_ecash_balance.msats,
807 prev_balance - amount - total_fee,
808 2000,
809 )
810 .map_err(|e| {
811 anyhow::anyhow!(
812 "new balance did not equal prev balance minus withdraw_amount minus fees: {}",
813 e
814 )
815 })?;
816 }
817
818 Ok(())
819 }
820
821 pub fn calculate_federation_id(&self) -> String {
822 self.client_config()
823 .unwrap()
824 .global
825 .calculate_federation_id()
826 .to_string()
827 }
828
829 pub async fn await_block_sync(&self) -> Result<u64> {
830 let finality_delay = self.get_finality_delay()?;
831 let block_count = self.bitcoind.get_block_count().await?;
832 let expected = block_count.saturating_sub(finality_delay.into());
833 cmd!(
834 self.internal_client().await?,
835 "dev",
836 "wait-block-count",
837 expected
838 )
839 .run()
840 .await?;
841 Ok(expected)
842 }
843
844 fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
845 let wallet_instance_id = self.module_instance_id_by_kind(&fedimint_wallet_client::KIND)?;
846 let client_config = &self.client_config()?;
847 let wallet_cfg = client_config
848 .modules
849 .get(&wallet_instance_id)
850 .context("wallet module not found")?
851 .clone()
852 .redecode_raw(&ModuleDecoderRegistry::new([(
853 wallet_instance_id,
854 fedimint_wallet_client::KIND,
855 fedimint_wallet_client::WalletModuleTypes::decoder(),
856 )]))?;
857 let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
858
859 let finality_delay = wallet_cfg.finality_delay;
860 Ok(finality_delay)
861 }
862
863 pub async fn await_gateways_registered(&self) -> Result<()> {
864 let start_time = Instant::now();
865 debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
866
867 poll("gateways registered", || async {
868 let num_gateways = cmd!(
869 self.internal_client()
870 .await
871 .map_err(ControlFlow::Continue)?,
872 "list-gateways"
873 )
874 .out_json()
875 .await
876 .map_err(ControlFlow::Continue)?
877 .as_array()
878 .context("invalid output")
879 .map_err(ControlFlow::Break)?
880 .len();
881
882 let expected_gateways =
885 if crate::util::Gatewayd::version_or_default().await < *VERSION_0_10_0_ALPHA {
886 1
887 } else {
888 2
889 };
890
891 poll_eq!(num_gateways, expected_gateways)
892 })
893 .await?;
894 debug!(target: LOG_DEVIMINT,
895 elapsed_ms = %start_time.elapsed().as_millis(),
896 "Gateways registered");
897 Ok(())
898 }
899
900 pub async fn await_all_peers(&self) -> Result<()> {
901 poll("Waiting for all peers to be online", || async {
902 cmd!(
903 self.internal_client()
904 .await
905 .map_err(ControlFlow::Continue)?,
906 "dev",
907 "api",
908 "--module",
909 "wallet",
910 "block_count"
911 )
912 .run()
913 .await
914 .map_err(ControlFlow::Continue)?;
915 Ok(())
916 })
917 .await
918 }
919
920 pub async fn await_peer(&self, peer_id: usize) -> Result<()> {
921 poll("Waiting for all peers to be online", || async {
922 cmd!(
923 self.internal_client()
924 .await
925 .map_err(ControlFlow::Continue)?,
926 "dev",
927 "api",
928 "--peer-id",
929 peer_id,
930 "--module",
931 "wallet",
932 "block_count"
933 )
934 .run()
935 .await
936 .map_err(ControlFlow::Continue)?;
937 Ok(())
938 })
939 .await
940 }
941
942 pub async fn finalize_mempool_tx(&self) -> Result<()> {
952 let finality_delay = self.get_finality_delay()?;
953 let blocks_to_mine = finality_delay + 1;
954 self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
955 self.await_block_sync().await?;
956 Ok(())
957 }
958
959 pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
960 self.bitcoind.mine_blocks(blocks).await?;
961 self.await_block_sync().await?;
962 Ok(())
963 }
964
965 pub fn num_members(&self) -> usize {
966 self.members.len()
967 }
968
969 pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
970 self.members
971 .keys()
972 .map(|&peer_id| PeerId::from(peer_id as u16))
973 }
974}
975
976#[derive(Clone)]
977pub struct Fedimintd {
978 _bitcoind: Bitcoind,
979 process: ProcessHandle,
980}
981
982impl Fedimintd {
983 pub async fn new(
984 process_mgr: &ProcessManager,
985 bitcoind: Bitcoind,
986 peer_id: usize,
987 env: &vars::Fedimintd,
988 fed_name: String,
989 ) -> Result<Self> {
990 debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
991 let process = process_mgr
992 .spawn_daemon(
993 &format!("fedimintd-{fed_name}-{peer_id}"),
994 cmd!(FedimintdCmd).envs(env.vars()),
995 )
996 .await?;
997
998 Ok(Self {
999 _bitcoind: bitcoind,
1000 process,
1001 })
1002 }
1003
1004 pub async fn terminate(self) -> Result<()> {
1005 self.process.terminate().await
1006 }
1007
1008 pub async fn await_terminated(&self) -> Result<()> {
1009 self.process.await_terminated().await
1010 }
1011}
1012
1013pub async fn run_cli_dkg_v2(endpoints: BTreeMap<PeerId, String>) -> Result<()> {
1014 let status_futures = endpoints.values().map(|endpoint| {
1016 let endpoint = endpoint.clone();
1017 async move {
1018 let status = poll("awaiting-setup-status-awaiting-local-params", || async {
1019 crate::util::FedimintCli
1020 .setup_status(&API_AUTH, &endpoint)
1021 .await
1022 .map_err(ControlFlow::Continue)
1023 })
1024 .await
1025 .unwrap();
1026
1027 assert_eq!(status, SetupStatus::AwaitingLocalParams);
1028 }
1029 });
1030 join_all(status_futures).await;
1031
1032 debug!(target: LOG_DEVIMINT, "Setting local parameters...");
1033
1034 let local_params_futures = endpoints.iter().map(|(peer, endpoint)| {
1036 let peer = *peer;
1037 let endpoint = endpoint.clone();
1038 async move {
1039 let info = if peer.to_usize() == 0 {
1040 crate::util::FedimintCli
1041 .set_local_params_leader(&peer, &API_AUTH, &endpoint)
1042 .await
1043 } else {
1044 crate::util::FedimintCli
1045 .set_local_params_follower(&peer, &API_AUTH, &endpoint)
1046 .await
1047 };
1048 info.map(|i| (peer, i))
1049 }
1050 });
1051 let connection_info: BTreeMap<_, _> = try_join_all(local_params_futures)
1052 .await?
1053 .into_iter()
1054 .collect();
1055
1056 debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");
1057
1058 let add_peer_futures = connection_info.iter().flat_map(|(peer, info)| {
1061 endpoints
1062 .iter()
1063 .filter(move |(p, _)| *p != peer)
1064 .map(move |(_, endpoint)| {
1065 let endpoint = endpoint.clone();
1066 let info = info.clone();
1067 async move {
1068 crate::util::FedimintCli
1069 .add_peer(&info, &API_AUTH, &endpoint)
1070 .await
1071 }
1072 })
1073 });
1074 try_join_all(add_peer_futures).await?;
1075
1076 debug!(target: LOG_DEVIMINT, "Starting DKG...");
1077
1078 let start_dkg_futures = endpoints.values().map(|endpoint| {
1080 let endpoint = endpoint.clone();
1081 async move {
1082 crate::util::FedimintCli
1083 .start_dkg(&API_AUTH, &endpoint)
1084 .await
1085 }
1086 });
1087 try_join_all(start_dkg_futures).await?;
1088
1089 Ok(())
1090}