1use std::collections::BTreeMap;
2use std::ops::ControlFlow;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::Duration;
6use std::{env, fs};
7
8use anyhow::{Context, Result, bail};
9use fedimint_api_client::api::DynGlobalApi;
10use fedimint_api_client::api::net::ConnectorType;
11use fedimint_client_module::module::ClientModule;
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::admin_client::SetupStatus;
14use fedimint_core::config::{ClientConfig, load_from_file};
15use fedimint_core::core::{ModuleInstanceId, ModuleKind};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::ModuleCommon;
18use fedimint_core::module::registry::ModuleDecoderRegistry;
19use fedimint_core::runtime::block_in_place;
20use fedimint_core::task::block_on;
21use fedimint_core::task::jit::JitTryAnyhow;
22use fedimint_core::util::SafeUrl;
23use fedimint_core::{Amount, NumPeers, PeerId};
24use fedimint_gateway_common::WithdrawResponse;
25use fedimint_logging::LOG_DEVIMINT;
26use fedimint_testing_core::config::API_AUTH;
27use fedimint_testing_core::node_type::LightningNodeType;
28use fedimint_wallet_client::WalletClientModule;
29use fedimint_wallet_client::config::WalletClientConfig;
30use fs_lock::FileLock;
31use futures::future::{join_all, try_join_all};
32use tokio::task::{JoinSet, spawn_blocking};
33use tokio::time::Instant;
34use tracing::{debug, info};
35
36use super::external::Bitcoind;
37use super::util::{Command, ProcessHandle, ProcessManager, cmd};
38use super::vars::utf8;
39use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
40use crate::util::{FedimintdCmd, poll, poll_simple, poll_with_timeout};
41use crate::version_constants::VERSION_0_10_0_ALPHA;
42use crate::{poll_almost_equal, poll_eq, vars};
43
44pub const PORTS_PER_FEDIMINTD: u16 = 4;
47pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
49pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
51pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
53pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;
55
56#[derive(Clone)]
57pub struct Federation {
58 pub members: BTreeMap<usize, Fedimintd>,
60 pub vars: BTreeMap<usize, vars::Fedimintd>,
61 pub bitcoind: Bitcoind,
62
63 client: JitTryAnyhow<Client>,
65 #[allow(dead_code)] connectors: ConnectorRegistry,
67}
68
69impl Drop for Federation {
70 fn drop(&mut self) {
71 block_in_place(|| {
72 block_on(async {
73 let mut set = JoinSet::new();
74
75 while let Some((_id, fedimintd)) = self.members.pop_first() {
76 set.spawn(async { drop(fedimintd) });
77 }
78 while (set.join_next().await).is_some() {}
79 });
80 });
81 }
82}
83#[derive(Clone)]
85pub struct Client {
86 name: String,
87}
88
89impl Client {
90 fn clients_dir() -> PathBuf {
91 let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
92 .expect("FM_DATA_DIR_ENV not set")
93 .parse()
94 .expect("FM_DATA_DIR_ENV invalid");
95 data_dir.join("clients")
96 }
97
98 fn client_dir(&self) -> PathBuf {
99 Self::clients_dir().join(&self.name)
100 }
101
102 pub fn client_name_lock(name: &str) -> Result<FileLock> {
103 let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
104 let file_lock = std::fs::OpenOptions::new()
105 .write(true)
106 .create(true)
107 .truncate(true)
108 .open(&lock_path)
109 .with_context(|| format!("Failed to open {}", lock_path.display()))?;
110
111 fs_lock::FileLock::new_exclusive(file_lock)
112 .with_context(|| format!("Failed to lock {}", lock_path.display()))
113 }
114
115 pub async fn create(name: impl ToString) -> Result<Client> {
117 let name = name.to_string();
118 spawn_blocking(move || {
119 let _lock = Self::client_name_lock(&name);
120 for i in 0u64.. {
121 let client = Self {
122 name: format!("{name}-{i}"),
123 };
124
125 if !client.client_dir().exists() {
126 std::fs::create_dir_all(client.client_dir())?;
127 return Ok(client);
128 }
129 }
130 unreachable!()
131 })
132 .await?
133 }
134
135 pub fn open_or_create(name: &str) -> Result<Client> {
137 block_in_place(|| {
138 let _lock = Self::client_name_lock(name);
139 let client = Self {
140 name: format!("{name}-0"),
141 };
142 if !client.client_dir().exists() {
143 std::fs::create_dir_all(client.client_dir())?;
144 }
145 Ok(client)
146 })
147 }
148
149 pub async fn join_federation(&self, invite_code: String) -> Result<()> {
151 debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
152 cmd!(self, "join-federation", invite_code).run().await?;
153
154 Ok(())
155 }
156
157 pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
159 debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
160 cmd!(
161 self,
162 "restore",
163 "--invite-code",
164 invite_code,
165 "--mnemonic",
166 mnemonic
167 )
168 .run()
169 .await?;
170
171 Ok(())
172 }
173
174 pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
176 let restored = Self::open_or_create(name)?;
177
178 let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
179 .as_str()
180 .unwrap()
181 .to_owned();
182
183 debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
184 cmd!(
185 restored,
186 "restore",
187 "--invite-code",
188 invite_code,
189 "--mnemonic",
190 mnemonic
191 )
192 .run()
193 .await?;
194
195 Ok(restored)
196 }
197
198 pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
201 let new = Client::create(name).await?;
202
203 cmd!(
204 "cp",
205 "-R",
206 self.client_dir().join("client.db").display(),
207 new.client_dir().display()
208 )
209 .run()
210 .await?;
211
212 Ok(new)
213 }
214
215 pub async fn balance(&self) -> Result<u64> {
216 Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
217 .as_u64()
218 .unwrap())
219 }
220
221 pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
222 let deposit = cmd!(self, "deposit-address").out_json().await?;
223 Ok((
224 deposit["address"].as_str().unwrap().to_string(),
225 deposit["operation_id"].as_str().unwrap().to_string(),
226 ))
227 }
228
229 pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
230 cmd!(self, "await-deposit", operation_id).run().await
231 }
232
233 pub fn cmd(&self) -> Command {
234 cmd!(
235 crate::util::get_fedimint_cli_path(),
236 format!("--data-dir={}", self.client_dir().display())
237 )
238 }
239
240 pub fn get_name(&self) -> &str {
241 &self.name
242 }
243
244 pub async fn get_session_count(&self) -> Result<u64> {
246 cmd!(self, "dev", "session-count").out_json().await?["count"]
247 .as_u64()
248 .context("count field wasn't a number")
249 }
250
251 pub async fn wait_complete(&self) -> Result<()> {
253 cmd!(self, "dev", "wait-complete").run().await
254 }
255
256 pub async fn wait_session(&self) -> anyhow::Result<()> {
258 info!("Waiting for a new session");
259 let session_count = self.get_session_count().await?;
260 self.wait_session_outcome(session_count).await?;
261 Ok(())
262 }
263
264 pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
266 let timeout = {
267 let current_session_count = self.get_session_count().await?;
268 let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
269 let session_duration_seconds = 180;
270 Duration::from_secs(sessions_to_wait * session_duration_seconds)
271 };
272
273 let start = Instant::now();
274 poll_with_timeout("Waiting for a new session", timeout, || async {
275 info!("Awaiting session outcome {session_count}");
276 match cmd!(self, "dev", "api", "await_session_outcome", session_count)
277 .run()
278 .await
279 {
280 Err(e) => Err(ControlFlow::Continue(e)),
281 Ok(()) => Ok(()),
282 }
283 })
284 .await?;
285
286 let session_found_in = start.elapsed();
287 info!("session found in {session_found_in:?}");
288 Ok(())
289 }
290}
291
292impl Federation {
293 pub async fn new(
294 process_mgr: &ProcessManager,
295 bitcoind: Bitcoind,
296 skip_setup: bool,
297 pre_dkg: bool,
298 fed_index: usize,
300 federation_name: String,
301 ) -> Result<Self> {
302 let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
303 let mut members = BTreeMap::new();
304 let mut peer_to_env_vars_map = BTreeMap::new();
305
306 let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
307 let mut api_endpoints: BTreeMap<PeerId, _> = BTreeMap::new();
308
309 let connectors = ConnectorRegistry::build_from_testing_env()?.bind().await?;
310 for peer_id in num_peers.peer_ids() {
311 let peer_env_vars = vars::Fedimintd::init(
312 &process_mgr.globals,
313 federation_name.clone(),
314 peer_id,
315 process_mgr
316 .globals
317 .fedimintd_overrides
318 .peer_expect(fed_index, peer_id),
319 )
320 .await?;
321 members.insert(
322 peer_id.to_usize(),
323 Fedimintd::new(
324 process_mgr,
325 bitcoind.clone(),
326 peer_id.to_usize(),
327 &peer_env_vars,
328 federation_name.clone(),
329 )
330 .await?,
331 );
332 let admin_client = DynGlobalApi::new_admin_setup(
333 connectors.clone(),
334 SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
335 )?;
338 api_endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
339 admin_clients.insert(peer_id, admin_client);
340 peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
341 }
342
343 if !skip_setup && !pre_dkg {
344 let (original_fedimint_cli_path, original_fm_mint_client) =
347 crate::util::use_matching_fedimint_cli_for_dkg().await?;
348
349 run_cli_dkg_v2(api_endpoints).await?;
350
351 crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);
353
354 let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
356 let invite_code_filename_original = "invite-code";
357
358 for peer_env_vars in peer_to_env_vars_map.values() {
359 let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
360
361 let invite_code = poll_simple("awaiting-invite-code", || async {
362 let path = format!("{peer_data_dir}/{invite_code_filename_original}");
363 tokio::fs::read_to_string(&path)
364 .await
365 .with_context(|| format!("Awaiting invite code file: {path}"))
366 })
367 .await
368 .context("Awaiting invite code file")?;
369
370 ConnectorType::default()
371 .download_from_invite_code(&connectors, &InviteCode::from_str(&invite_code)?)
372 .await?;
373 }
374
375 let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);
377
378 tokio::fs::copy(
379 format!("{peer_data_dir}/{invite_code_filename_original}"),
380 format!("{client_dir}/{invite_code_filename_original}"),
381 )
382 .await
383 .context("copying invite-code file")?;
384
385 for (index, peer_env_vars) in &peer_to_env_vars_map {
388 let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
389
390 let invite_code_filename_indexed =
391 format!("{invite_code_filename_original}-{index}");
392 tokio::fs::rename(
393 format!("{peer_data_dir}/{invite_code_filename_original}"),
394 format!("{client_dir}/{invite_code_filename_indexed}"),
395 )
396 .await
397 .context("moving invite-code file")?;
398 }
399
400 debug!("Moved invite-code files to client data directory");
401 }
402
403 let client = JitTryAnyhow::new_try({
404 move || async move {
405 let client = Client::open_or_create(federation_name.as_str())?;
406 let invite_code = Self::invite_code_static()?;
407 if !skip_setup && !pre_dkg {
408 cmd!(client, "join-federation", invite_code).run().await?;
409 }
410 Ok(client)
411 }
412 });
413
414 Ok(Self {
415 members,
416 vars: peer_to_env_vars_map,
417 bitcoind,
418 client,
419 connectors,
420 })
421 }
422
423 pub fn client_config(&self) -> Result<ClientConfig> {
424 let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
425 load_from_file(&cfg_path)
426 }
427
428 pub fn module_instance_id_by_kind(&self, kind: &ModuleKind) -> Result<ModuleInstanceId> {
430 self.client_config()?
431 .modules
432 .iter()
433 .find_map(|(id, cfg)| if &cfg.kind == kind { Some(*id) } else { None })
434 .with_context(|| format!("Module kind {kind} not found"))
435 }
436
437 pub fn module_client_config<M: ClientModule>(
438 &self,
439 ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
440 self.client_config()?
441 .modules
442 .iter()
443 .find_map(|(module_instance_id, module_cfg)| {
444 if module_cfg.kind == M::kind() {
445 let decoders = ModuleDecoderRegistry::new(vec![(
446 *module_instance_id,
447 M::kind(),
448 M::decoder(),
449 )]);
450 Some(
451 module_cfg
452 .config
453 .clone()
454 .redecode_raw(&decoders)
455 .expect("Decoding client cfg failed")
456 .expect_decoded_ref()
457 .as_any()
458 .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
459 .cloned()
460 .context("Cast to module config failed"),
461 )
462 } else {
463 None
464 }
465 })
466 .transpose()
467 }
468
469 pub fn deposit_fees(&self) -> Result<Amount> {
470 Ok(self
471 .module_client_config::<WalletClientModule>()?
472 .context("No wallet module found")?
473 .fee_consensus
474 .peg_in_abs)
475 }
476
477 pub fn invite_code(&self) -> Result<String> {
479 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
480 let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
481 Ok(invite_code)
482 }
483
484 pub fn invite_code_static() -> Result<String> {
485 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
486 let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
487 Ok(invite_code)
488 }
489 pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
490 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
491 let name = format!("invite-code-{peer_id}");
492 let invite_code = fs::read_to_string(data_dir.join(name))?;
493 Ok(invite_code)
494 }
495
496 pub async fn internal_client(&self) -> Result<&Client> {
500 self.client
501 .get_try()
502 .await
503 .context("Internal client joining Federation")
504 }
505
506 pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
508 let client = Client::create(name).await?;
509 client.join_federation(self.invite_code()?).await?;
510 Ok(client)
511 }
512
513 pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
514 if self.members.contains_key(&peer) {
515 bail!("fedimintd-{peer} already running");
516 }
517 self.members.insert(
518 peer,
519 Fedimintd::new(
520 process_mgr,
521 self.bitcoind.clone(),
522 peer,
523 &self.vars[&peer],
524 "default".to_string(),
525 )
526 .await?,
527 );
528 Ok(())
529 }
530
531 pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
532 let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
533 bail!("fedimintd-{peer_id} does not exist");
534 };
535 fedimintd.terminate().await?;
536 Ok(())
537 }
538
539 pub async fn await_server_terminated(&mut self, peer_id: usize) -> Result<()> {
540 let Some(fedimintd) = self.members.get_mut(&peer_id) else {
541 bail!("fedimintd-{peer_id} does not exist");
542 };
543 fedimintd.await_terminated().await?;
544 self.members.remove(&peer_id);
545 Ok(())
546 }
547
548 pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
550 info!("starting all servers");
551 let fed_size = process_mgr.globals.FM_FED_SIZE;
552 for peer_id in 0..fed_size {
553 if self.members.contains_key(&peer_id) {
554 continue;
555 }
556 self.start_server(process_mgr, peer_id).await?;
557 }
558 self.await_all_peers().await?;
559 Ok(())
560 }
561
562 pub async fn terminate_all_servers(&mut self) -> Result<()> {
564 info!("terminating all servers");
565 let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
566 for peer_id in running_peer_ids {
567 self.terminate_server(peer_id).await?;
568 }
569 Ok(())
570 }
571
572 pub async fn restart_all_staggered_with_bin(
577 &mut self,
578 process_mgr: &ProcessManager,
579 bin_path: &PathBuf,
580 ) -> Result<()> {
581 let fed_size = process_mgr.globals.FM_FED_SIZE;
582
583 self.start_all_servers(process_mgr).await?;
585
586 while self.num_members() > 0 {
588 self.terminate_server(self.num_members() - 1).await?;
589 if self.num_members() > 0 {
590 fedimint_core::task::sleep_in_test(
591 "waiting to shutdown remaining peers",
592 Duration::from_secs(10),
593 )
594 .await;
595 }
596 }
597
598 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
600
601 for peer_id in 0..fed_size {
603 self.start_server(process_mgr, peer_id).await?;
604 if peer_id < fed_size - 1 {
605 fedimint_core::task::sleep_in_test(
606 "waiting to restart remaining peers",
607 Duration::from_secs(10),
608 )
609 .await;
610 }
611 }
612
613 self.await_all_peers().await?;
614
615 let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
616 info!("upgraded fedimintd to version: {}", fedimintd_version);
617 Ok(())
618 }
619
620 pub async fn restart_all_with_bin(
621 &mut self,
622 process_mgr: &ProcessManager,
623 bin_path: &PathBuf,
624 ) -> Result<()> {
625 let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
627 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
629 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
631
632 self.restart_all_staggered_with_bin(process_mgr, bin_path)
633 .await
634 }
635
636 pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
637 let fed_size = process_mgr.globals.FM_FED_SIZE;
638 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
639 anyhow::ensure!(
640 fed_size > 3 * offline_nodes,
641 "too many offline nodes ({offline_nodes}) to reach consensus"
642 );
643
644 while self.num_members() > fed_size - offline_nodes {
645 self.terminate_server(self.num_members() - 1).await?;
646 }
647
648 if offline_nodes > 0 {
649 info!(fed_size, offline_nodes, "federation is degraded");
650 }
651 Ok(())
652 }
653
654 pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
655 let deposit_fees_msat = self.deposit_fees()?.msats;
656 assert_eq!(
657 deposit_fees_msat % 1000,
658 0,
659 "Deposit fees expected to be whole sats in test suite"
660 );
661 let deposit_fees = deposit_fees_msat / 1000;
662 info!(amount, deposit_fees, "Pegging-in client funds");
663
664 let (address, operation_id) = client.get_deposit_addr().await?;
665
666 self.bitcoind
667 .send_to(address, amount + deposit_fees)
668 .await?;
669 self.bitcoind.mine_blocks(21).await?;
670
671 Ok(operation_id)
672 }
673
674 pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
675 let operation_id = self.pegin_client_no_wait(amount, client).await?;
676
677 client.await_deposit(&operation_id).await?;
678 Ok(())
679 }
680
681 pub async fn pegin_gateways(
684 &self,
685 amount: u64,
686 gateways: Vec<&super::gatewayd::Gatewayd>,
687 ) -> Result<()> {
688 let deposit_fees_msat = self.deposit_fees()?.msats;
689 assert_eq!(
690 deposit_fees_msat % 1000,
691 0,
692 "Deposit fees expected to be whole sats in test suite"
693 );
694 let deposit_fees = deposit_fees_msat / 1000;
695 info!(amount, deposit_fees, "Pegging-in gateway funds");
696 let fed_id = self.calculate_federation_id();
697 for gw in gateways.clone() {
698 let pegin_addr = gw.get_pegin_addr(&fed_id).await?;
699 self.bitcoind
700 .send_to(pegin_addr, amount + deposit_fees)
701 .await?;
702 }
703
704 self.bitcoind.mine_blocks(21).await?;
705 let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
706 try_join_all(gateways.into_iter().map(|gw| {
707 poll("gateway pegin", || async {
708 let gw_info = gw.get_info().await.map_err(ControlFlow::Continue)?;
709
710 let block_height: u64 = if gw.gatewayd_version < *VERSION_0_10_0_ALPHA {
711 gw_info["block_height"]
712 .as_u64()
713 .expect("Could not parse block height")
714 } else {
715 gw_info["lightning_info"]["connected"]["block_height"]
716 .as_u64()
717 .expect("Could not parse block height")
718 };
719
720 if bitcoind_block_height != block_height {
721 return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
722 "gateway block height is not synced"
723 )));
724 }
725
726 let gateway_balance = gw
727 .ecash_balance(fed_id.clone())
728 .await
729 .map_err(ControlFlow::Continue)?;
730 poll_almost_equal!(gateway_balance, amount * 1000)
731 })
732 }))
733 .await?;
734
735 Ok(())
736 }
737
738 pub async fn pegout_gateways(
741 &self,
742 amount: u64,
743 gateways: Vec<&super::gatewayd::Gatewayd>,
744 ) -> Result<()> {
745 info!(amount, "Pegging-out gateway funds");
746 let fed_id = self.calculate_federation_id();
747 let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
748 for gw in gateways.clone() {
749 let prev_fed_ecash_balance = gw
750 .get_balances()
751 .await?
752 .ecash_balances
753 .into_iter()
754 .find(|fed| fed.federation_id.to_string() == fed_id)
755 .expect("Gateway has not joined federation")
756 .ecash_balance_msats;
757
758 let pegout_address = self.bitcoind.get_new_address().await?;
759 let value = cmd!(
760 gw,
761 "ecash",
762 "pegout",
763 "--federation-id",
764 fed_id,
765 "--amount",
766 amount,
767 "--address",
768 pegout_address
769 )
770 .out_json()
771 .await?;
772 let response: WithdrawResponse = serde_json::from_value(value)?;
773 peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
774 }
775 self.bitcoind.mine_blocks(21).await?;
776
777 try_join_all(
778 peg_outs
779 .values()
780 .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
781 )
782 .await?;
783
784 for gw in gateways.clone() {
785 let after_fed_ecash_balance = gw
786 .get_balances()
787 .await?
788 .ecash_balances
789 .into_iter()
790 .find(|fed| fed.federation_id.to_string() == fed_id)
791 .expect("Gateway has not joined federation")
792 .ecash_balance_msats;
793
794 let ln_type = gw.ln.ln_type();
795 let prev_balance = peg_outs
796 .get(&ln_type)
797 .expect("peg out does not exist")
798 .0
799 .msats;
800 let fees = peg_outs
801 .get(&ln_type)
802 .expect("peg out does not exist")
803 .1
804 .fees;
805 let total_fee = fees.amount().to_sat() * 1000;
806 crate::util::almost_equal(
807 after_fed_ecash_balance.msats,
808 prev_balance - amount - total_fee,
809 2000,
810 )
811 .map_err(|e| {
812 anyhow::anyhow!(
813 "new balance did not equal prev balance minus withdraw_amount minus fees: {}",
814 e
815 )
816 })?;
817 }
818
819 Ok(())
820 }
821
822 pub fn calculate_federation_id(&self) -> String {
823 self.client_config()
824 .unwrap()
825 .global
826 .calculate_federation_id()
827 .to_string()
828 }
829
830 pub async fn await_block_sync(&self) -> Result<u64> {
831 let finality_delay = self.get_finality_delay()?;
832 let block_count = self.bitcoind.get_block_count().await?;
833 let expected = block_count.saturating_sub(finality_delay.into());
834 cmd!(
835 self.internal_client().await?,
836 "dev",
837 "wait-block-count",
838 expected
839 )
840 .run()
841 .await?;
842 Ok(expected)
843 }
844
845 fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
846 let wallet_instance_id = self.module_instance_id_by_kind(&fedimint_wallet_client::KIND)?;
847 let client_config = &self.client_config()?;
848 let wallet_cfg = client_config
849 .modules
850 .get(&wallet_instance_id)
851 .context("wallet module not found")?
852 .clone()
853 .redecode_raw(&ModuleDecoderRegistry::new([(
854 wallet_instance_id,
855 fedimint_wallet_client::KIND,
856 fedimint_wallet_client::WalletModuleTypes::decoder(),
857 )]))?;
858 let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
859
860 let finality_delay = wallet_cfg.finality_delay;
861 Ok(finality_delay)
862 }
863
864 pub async fn await_gateways_registered(&self) -> Result<()> {
865 let start_time = Instant::now();
866 debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
867
868 poll("gateways registered", || async {
869 let num_gateways = cmd!(
870 self.internal_client()
871 .await
872 .map_err(ControlFlow::Continue)?,
873 "list-gateways"
874 )
875 .out_json()
876 .await
877 .map_err(ControlFlow::Continue)?
878 .as_array()
879 .context("invalid output")
880 .map_err(ControlFlow::Break)?
881 .len();
882
883 let expected_gateways =
886 if crate::util::Gatewayd::version_or_default().await < *VERSION_0_10_0_ALPHA {
887 1
888 } else {
889 2
890 };
891
892 poll_eq!(num_gateways, expected_gateways)
893 })
894 .await?;
895 debug!(target: LOG_DEVIMINT,
896 elapsed_ms = %start_time.elapsed().as_millis(),
897 "Gateways registered");
898 Ok(())
899 }
900
901 pub async fn await_all_peers(&self) -> Result<()> {
902 poll("Waiting for all peers to be online", || async {
903 cmd!(
904 self.internal_client()
905 .await
906 .map_err(ControlFlow::Continue)?,
907 "dev",
908 "api",
909 "--module",
910 "wallet",
911 "block_count"
912 )
913 .run()
914 .await
915 .map_err(ControlFlow::Continue)?;
916 Ok(())
917 })
918 .await
919 }
920
921 pub async fn await_peer(&self, peer_id: usize) -> Result<()> {
922 poll("Waiting for all peers to be online", || async {
923 cmd!(
924 self.internal_client()
925 .await
926 .map_err(ControlFlow::Continue)?,
927 "dev",
928 "api",
929 "--peer-id",
930 peer_id,
931 "--module",
932 "wallet",
933 "block_count"
934 )
935 .run()
936 .await
937 .map_err(ControlFlow::Continue)?;
938 Ok(())
939 })
940 .await
941 }
942
943 pub async fn finalize_mempool_tx(&self) -> Result<()> {
953 let finality_delay = self.get_finality_delay()?;
954 let blocks_to_mine = finality_delay + 1;
955 self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
956 self.await_block_sync().await?;
957 Ok(())
958 }
959
960 pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
961 self.bitcoind.mine_blocks(blocks).await?;
962 self.await_block_sync().await?;
963 Ok(())
964 }
965
966 pub fn num_members(&self) -> usize {
967 self.members.len()
968 }
969
970 pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
971 self.members
972 .keys()
973 .map(|&peer_id| PeerId::from(peer_id as u16))
974 }
975}
976
977#[derive(Clone)]
978pub struct Fedimintd {
979 _bitcoind: Bitcoind,
980 process: ProcessHandle,
981}
982
983impl Fedimintd {
984 pub async fn new(
985 process_mgr: &ProcessManager,
986 bitcoind: Bitcoind,
987 peer_id: usize,
988 env: &vars::Fedimintd,
989 fed_name: String,
990 ) -> Result<Self> {
991 debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
992 let process = process_mgr
993 .spawn_daemon(
994 &format!("fedimintd-{fed_name}-{peer_id}"),
995 cmd!(FedimintdCmd).envs(env.vars()),
996 )
997 .await?;
998
999 Ok(Self {
1000 _bitcoind: bitcoind,
1001 process,
1002 })
1003 }
1004
1005 pub async fn terminate(self) -> Result<()> {
1006 self.process.terminate().await
1007 }
1008
1009 pub async fn await_terminated(&self) -> Result<()> {
1010 self.process.await_terminated().await
1011 }
1012}
1013
1014pub async fn run_cli_dkg_v2(endpoints: BTreeMap<PeerId, String>) -> Result<()> {
1015 let status_futures = endpoints.values().map(|endpoint| {
1017 let endpoint = endpoint.clone();
1018 async move {
1019 let status = poll("awaiting-setup-status-awaiting-local-params", || async {
1020 crate::util::FedimintCli
1021 .setup_status(&API_AUTH, &endpoint)
1022 .await
1023 .map_err(ControlFlow::Continue)
1024 })
1025 .await
1026 .unwrap();
1027
1028 assert_eq!(status, SetupStatus::AwaitingLocalParams);
1029 }
1030 });
1031 join_all(status_futures).await;
1032
1033 debug!(target: LOG_DEVIMINT, "Setting local parameters...");
1034
1035 let local_params_futures = endpoints.iter().map(|(peer, endpoint)| {
1037 let peer = *peer;
1038 let endpoint = endpoint.clone();
1039 async move {
1040 let info = if peer.to_usize() == 0 {
1041 crate::util::FedimintCli
1042 .set_local_params_leader(&peer, &API_AUTH, &endpoint)
1043 .await
1044 } else {
1045 crate::util::FedimintCli
1046 .set_local_params_follower(&peer, &API_AUTH, &endpoint)
1047 .await
1048 };
1049 info.map(|i| (peer, i))
1050 }
1051 });
1052 let connection_info: BTreeMap<_, _> = try_join_all(local_params_futures)
1053 .await?
1054 .into_iter()
1055 .collect();
1056
1057 debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");
1058
1059 let add_peer_futures = connection_info.iter().flat_map(|(peer, info)| {
1062 endpoints
1063 .iter()
1064 .filter(move |(p, _)| *p != peer)
1065 .map(move |(_, endpoint)| {
1066 let endpoint = endpoint.clone();
1067 let info = info.clone();
1068 async move {
1069 crate::util::FedimintCli
1070 .add_peer(&info, &API_AUTH, &endpoint)
1071 .await
1072 }
1073 })
1074 });
1075 try_join_all(add_peer_futures).await?;
1076
1077 debug!(target: LOG_DEVIMINT, "Starting DKG...");
1078
1079 let start_dkg_futures = endpoints.values().map(|endpoint| {
1081 let endpoint = endpoint.clone();
1082 async move {
1083 crate::util::FedimintCli
1084 .start_dkg(&API_AUTH, &endpoint)
1085 .await
1086 }
1087 });
1088 try_join_all(start_dkg_futures).await?;
1089
1090 Ok(())
1091}