1mod config;
2
3use std::collections::{BTreeMap, HashMap, HashSet};
4use std::ops::ControlFlow;
5use std::path::PathBuf;
6use std::str::FromStr;
7use std::time::Duration;
8use std::{env, fs, iter};
9
10use anyhow::{Context, Result, anyhow, bail};
11use bitcoincore_rpc::bitcoin::Network;
12use fedimint_api_client::api::net::ConnectorType;
13use fedimint_api_client::api::{ConnectorRegistry, DynGlobalApi};
14use fedimint_client_module::module::ClientModule;
15use fedimint_core::admin_client::{ServerStatusLegacy, SetupStatus};
16use fedimint_core::config::{ClientConfig, ServerModuleConfigGenParamsRegistry, load_from_file};
17use fedimint_core::core::LEGACY_HARDCODED_INSTANCE_ID_WALLET;
18use fedimint_core::envs::BitcoinRpcConfig;
19use fedimint_core::invite_code::InviteCode;
20use fedimint_core::module::registry::ModuleDecoderRegistry;
21use fedimint_core::module::{ApiAuth, ModuleCommon};
22use fedimint_core::runtime::block_in_place;
23use fedimint_core::task::block_on;
24use fedimint_core::task::jit::JitTryAnyhow;
25use fedimint_core::util::SafeUrl;
26use fedimint_core::{Amount, NumPeers, PeerId};
27use fedimint_gateway_common::WithdrawResponse;
28use fedimint_logging::LOG_DEVIMINT;
29use fedimint_server::config::ConfigGenParams;
30use fedimint_testing_core::config::local_config_gen_params;
31use fedimint_testing_core::node_type::LightningNodeType;
32use fedimint_wallet_client::WalletClientModule;
33use fedimint_wallet_client::config::WalletClientConfig;
34use fs_lock::FileLock;
35use futures::future::{join_all, try_join_all};
36use rand::Rng;
37use tokio::task::{JoinSet, spawn_blocking};
38use tokio::time::Instant;
39use tracing::{debug, info};
40
41use super::external::Bitcoind;
42use super::util::{Command, ProcessHandle, ProcessManager, cmd};
43use super::vars::utf8;
44use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
45use crate::util::{FedimintdCmd, poll, poll_simple, poll_with_timeout};
46use crate::{poll_almost_equal, poll_eq, vars};
47
/// Number of ports associated with one fedimintd instance. The four offsets
/// below (0 through 3) presumably index into that per-instance range — confirm
/// against the code that allocates ports.
pub const PORTS_PER_FEDIMINTD: u16 = 4;
/// Offset of the p2p port (per the constant's name).
pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
/// Offset of the API port (per the constant's name).
pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
/// Offset of the UI port (per the constant's name).
pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
/// Offset of the metrics port (per the constant's name).
pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;
59
/// Handle to a set of `fedimintd` processes plus the bitcoind they use and a
/// lazily created internal CLI client.
#[derive(Clone)]
pub struct Federation {
    /// Currently running servers keyed by peer index; entries are removed when
    /// a server is terminated and re-inserted when it is started again.
    pub members: BTreeMap<usize, Fedimintd>,
    /// Environment variables of every peer, keyed by peer index. Unlike
    /// `members`, entries are never removed by the methods in this file.
    pub vars: BTreeMap<usize, vars::Fedimintd>,
    /// Bitcoind handle cloned into every spawned `Fedimintd`.
    pub bitcoind: Bitcoind,

    /// Internal client, created on first use (see `internal_client`).
    client: JitTryAnyhow<Client>,
    // Stored but not read by any method in this file, hence the `allow`.
    #[allow(dead_code)]
    connectors: ConnectorRegistry,
}
72
73impl Drop for Federation {
74 fn drop(&mut self) {
75 block_in_place(|| {
76 block_on(async {
77 let mut set = JoinSet::new();
78
79 while let Some((_id, fedimintd)) = self.members.pop_first() {
80 set.spawn(async { drop(fedimintd) });
81 }
82 while (set.join_next().await).is_some() {}
83 });
84 });
85 }
86}
/// A named CLI client whose data lives in `<clients dir>/<name>` (see
/// `client_dir`).
#[derive(Clone)]
pub struct Client {
    /// Directory name of this client below the clients directory.
    name: String,
}
92
93impl Client {
94 fn clients_dir() -> PathBuf {
95 let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
96 .expect("FM_DATA_DIR_ENV not set")
97 .parse()
98 .expect("FM_DATA_DIR_ENV invalid");
99 data_dir.join("clients")
100 }
101
102 fn client_dir(&self) -> PathBuf {
103 Self::clients_dir().join(&self.name)
104 }
105
106 pub fn client_name_lock(name: &str) -> Result<FileLock> {
107 let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
108 let file_lock = std::fs::OpenOptions::new()
109 .write(true)
110 .create(true)
111 .truncate(true)
112 .open(&lock_path)
113 .with_context(|| format!("Failed to open {}", lock_path.display()))?;
114
115 fs_lock::FileLock::new_exclusive(file_lock)
116 .with_context(|| format!("Failed to lock {}", lock_path.display()))
117 }
118
119 pub async fn create(name: impl ToString) -> Result<Client> {
121 let name = name.to_string();
122 spawn_blocking(move || {
123 let _lock = Self::client_name_lock(&name);
124 for i in 0u64.. {
125 let client = Self {
126 name: format!("{name}-{i}"),
127 };
128
129 if !client.client_dir().exists() {
130 std::fs::create_dir_all(client.client_dir())?;
131 return Ok(client);
132 }
133 }
134 unreachable!()
135 })
136 .await?
137 }
138
139 pub fn open_or_create(name: &str) -> Result<Client> {
141 block_in_place(|| {
142 let _lock = Self::client_name_lock(name);
143 let client = Self {
144 name: format!("{name}-0"),
145 };
146 if !client.client_dir().exists() {
147 std::fs::create_dir_all(client.client_dir())?;
148 }
149 Ok(client)
150 })
151 }
152
153 pub async fn join_federation(&self, invite_code: String) -> Result<()> {
155 debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
156 cmd!(self, "join-federation", invite_code).run().await?;
157
158 Ok(())
159 }
160
161 pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
163 debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
164 cmd!(
165 self,
166 "restore",
167 "--invite-code",
168 invite_code,
169 "--mnemonic",
170 mnemonic
171 )
172 .run()
173 .await?;
174
175 Ok(())
176 }
177
178 pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
180 let restored = Self::open_or_create(name)?;
181
182 let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
183 .as_str()
184 .unwrap()
185 .to_owned();
186
187 debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
188 cmd!(
189 restored,
190 "restore",
191 "--invite-code",
192 invite_code,
193 "--mnemonic",
194 mnemonic
195 )
196 .run()
197 .await?;
198
199 Ok(restored)
200 }
201
202 pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
205 let new = Client::create(name).await?;
206
207 cmd!(
208 "cp",
209 "-R",
210 self.client_dir().join("client.db").display(),
211 new.client_dir().display()
212 )
213 .run()
214 .await?;
215
216 Ok(new)
217 }
218
219 pub async fn balance(&self) -> Result<u64> {
220 Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
221 .as_u64()
222 .unwrap())
223 }
224
225 pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
226 let deposit = cmd!(self, "deposit-address").out_json().await?;
227 Ok((
228 deposit["address"].as_str().unwrap().to_string(),
229 deposit["operation_id"].as_str().unwrap().to_string(),
230 ))
231 }
232
233 pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
234 cmd!(self, "await-deposit", operation_id).run().await
235 }
236
237 pub fn cmd(&self) -> Command {
238 cmd!(
239 crate::util::get_fedimint_cli_path(),
240 format!("--data-dir={}", self.client_dir().display())
241 )
242 }
243
244 pub fn get_name(&self) -> &str {
245 &self.name
246 }
247
248 pub async fn get_session_count(&self) -> Result<u64> {
250 cmd!(self, "dev", "session-count").out_json().await?["count"]
251 .as_u64()
252 .context("count field wasn't a number")
253 }
254
255 pub async fn wait_complete(&self) -> Result<()> {
257 cmd!(self, "dev", "wait-complete").run().await
258 }
259
260 pub async fn wait_session(&self) -> anyhow::Result<()> {
262 info!("Waiting for a new session");
263 let session_count = self.get_session_count().await?;
264 self.wait_session_outcome(session_count).await?;
265 Ok(())
266 }
267
268 pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
270 let timeout = {
271 let current_session_count = self.get_session_count().await?;
272 let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
273 let session_duration_seconds = 180;
274 Duration::from_secs(sessions_to_wait * session_duration_seconds)
275 };
276
277 let start = Instant::now();
278 poll_with_timeout("Waiting for a new session", timeout, || async {
279 info!("Awaiting session outcome {session_count}");
280 match cmd!(self, "dev", "api", "await_session_outcome", session_count)
281 .run()
282 .await
283 {
284 Err(e) => Err(ControlFlow::Continue(e)),
285 Ok(()) => Ok(()),
286 }
287 })
288 .await?;
289
290 let session_found_in = start.elapsed();
291 info!("session found in {session_found_in:?}");
292 Ok(())
293 }
294}
295
296impl Federation {
297 pub async fn new(
298 process_mgr: &ProcessManager,
299 bitcoind: Bitcoind,
300 skip_setup: bool,
301 pre_dkg: bool,
302 fed_index: usize,
304 federation_name: String,
305 ) -> Result<Self> {
306 let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
307 let mut members = BTreeMap::new();
308 let mut peer_to_env_vars_map = BTreeMap::new();
309
310 let peers: Vec<_> = num_peers.peer_ids().collect();
311 let params: HashMap<PeerId, ConfigGenParams> =
312 local_config_gen_params(&peers, process_mgr.globals.FM_FEDERATION_BASE_PORT, true)?;
313
314 let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
315 let mut api_endpoints: BTreeMap<PeerId, _> = BTreeMap::new();
316
317 let connectors = ConnectorRegistry::build_from_testing_env()?.bind().await?;
318 for peer_id in num_peers.peer_ids() {
319 let peer_env_vars = vars::Fedimintd::init(
320 &process_mgr.globals,
321 federation_name.clone(),
322 peer_id,
323 process_mgr
324 .globals
325 .fedimintd_overrides
326 .peer_expect(fed_index, peer_id),
327 )
328 .await?;
329 members.insert(
330 peer_id.to_usize(),
331 Fedimintd::new(
332 process_mgr,
333 bitcoind.clone(),
334 peer_id.to_usize(),
335 &peer_env_vars,
336 federation_name.clone(),
337 )
338 .await?,
339 );
340 let admin_client = DynGlobalApi::new_admin_setup(
341 connectors.clone(),
342 SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
343 )?;
346 api_endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
347 admin_clients.insert(peer_id, admin_client);
348 peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
349 }
350
351 if !skip_setup && !pre_dkg {
352 let (original_fedimint_cli_path, original_fm_mint_client) =
355 crate::util::use_matching_fedimint_cli_for_dkg().await?;
356
357 run_cli_dkg_v2(params, api_endpoints).await?;
358
359 crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);
361
362 let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
364 let invite_code_filename_original = "invite-code";
365
366 for peer_env_vars in peer_to_env_vars_map.values() {
367 let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
368
369 let invite_code = poll_simple("awaiting-invite-code", || async {
370 let path = format!("{peer_data_dir}/{invite_code_filename_original}");
371 tokio::fs::read_to_string(&path)
372 .await
373 .with_context(|| format!("Awaiting invite code file: {path}"))
374 })
375 .await
376 .context("Awaiting invite code file")?;
377
378 ConnectorType::default()
379 .download_from_invite_code(&connectors, &InviteCode::from_str(&invite_code)?)
380 .await?;
381 }
382
383 let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);
385
386 tokio::fs::copy(
387 format!("{peer_data_dir}/{invite_code_filename_original}"),
388 format!("{client_dir}/{invite_code_filename_original}"),
389 )
390 .await
391 .context("copying invite-code file")?;
392
393 for (index, peer_env_vars) in &peer_to_env_vars_map {
396 let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
397
398 let invite_code_filename_indexed =
399 format!("{invite_code_filename_original}-{index}");
400 tokio::fs::rename(
401 format!("{peer_data_dir}/{invite_code_filename_original}"),
402 format!("{client_dir}/{invite_code_filename_indexed}"),
403 )
404 .await
405 .context("moving invite-code file")?;
406 }
407
408 debug!("Moved invite-code files to client data directory");
409 }
410
411 let client = JitTryAnyhow::new_try({
412 move || async move {
413 let client = Client::open_or_create(federation_name.as_str())?;
414 let invite_code = Self::invite_code_static()?;
415 if !skip_setup && !pre_dkg {
416 cmd!(client, "join-federation", invite_code).run().await?;
417 }
418 Ok(client)
419 }
420 });
421
422 Ok(Self {
423 members,
424 vars: peer_to_env_vars_map,
425 bitcoind,
426 client,
427 connectors,
428 })
429 }
430
431 pub fn client_config(&self) -> Result<ClientConfig> {
432 let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
433 load_from_file(&cfg_path)
434 }
435
436 pub fn module_client_config<M: ClientModule>(
437 &self,
438 ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
439 self.client_config()?
440 .modules
441 .iter()
442 .find_map(|(module_instance_id, module_cfg)| {
443 if module_cfg.kind == M::kind() {
444 let decoders = ModuleDecoderRegistry::new(vec![(
445 *module_instance_id,
446 M::kind(),
447 M::decoder(),
448 )]);
449 Some(
450 module_cfg
451 .config
452 .clone()
453 .redecode_raw(&decoders)
454 .expect("Decoding client cfg failed")
455 .expect_decoded_ref()
456 .as_any()
457 .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
458 .cloned()
459 .context("Cast to module config failed"),
460 )
461 } else {
462 None
463 }
464 })
465 .transpose()
466 }
467
468 pub fn deposit_fees(&self) -> Result<Amount> {
469 Ok(self
470 .module_client_config::<WalletClientModule>()?
471 .context("No wallet module found")?
472 .fee_consensus
473 .peg_in_abs)
474 }
475
476 pub fn invite_code(&self) -> Result<String> {
478 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
479 let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
480 Ok(invite_code)
481 }
482
483 pub fn invite_code_static() -> Result<String> {
484 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
485 let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
486 Ok(invite_code)
487 }
488 pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
489 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
490 let name = format!("invite-code-{peer_id}");
491 let invite_code = fs::read_to_string(data_dir.join(name))?;
492 Ok(invite_code)
493 }
494
495 pub async fn internal_client(&self) -> Result<&Client> {
499 self.client
500 .get_try()
501 .await
502 .context("Internal client joining Federation")
503 }
504
505 pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
507 let client = Client::create(name).await?;
508 client.join_federation(self.invite_code()?).await?;
509 Ok(client)
510 }
511
512 pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
513 if self.members.contains_key(&peer) {
514 bail!("fedimintd-{peer} already running");
515 }
516 self.members.insert(
517 peer,
518 Fedimintd::new(
519 process_mgr,
520 self.bitcoind.clone(),
521 peer,
522 &self.vars[&peer],
523 "default".to_string(),
524 )
525 .await?,
526 );
527 Ok(())
528 }
529
530 pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
531 let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
532 bail!("fedimintd-{peer_id} does not exist");
533 };
534 fedimintd.terminate().await?;
535 Ok(())
536 }
537
538 pub async fn await_server_terminated(&mut self, peer_id: usize) -> Result<()> {
539 let Some(fedimintd) = self.members.get_mut(&peer_id) else {
540 bail!("fedimintd-{peer_id} does not exist");
541 };
542 fedimintd.await_terminated().await?;
543 self.members.remove(&peer_id);
544 Ok(())
545 }
546
547 pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
549 info!("starting all servers");
550 let fed_size = process_mgr.globals.FM_FED_SIZE;
551 for peer_id in 0..fed_size {
552 if self.members.contains_key(&peer_id) {
553 continue;
554 }
555 self.start_server(process_mgr, peer_id).await?;
556 }
557 self.await_all_peers().await?;
558 Ok(())
559 }
560
561 pub async fn terminate_all_servers(&mut self) -> Result<()> {
563 info!("terminating all servers");
564 let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
565 for peer_id in running_peer_ids {
566 self.terminate_server(peer_id).await?;
567 }
568 Ok(())
569 }
570
571 pub async fn restart_all_staggered_with_bin(
576 &mut self,
577 process_mgr: &ProcessManager,
578 bin_path: &PathBuf,
579 ) -> Result<()> {
580 let fed_size = process_mgr.globals.FM_FED_SIZE;
581
582 self.start_all_servers(process_mgr).await?;
584
585 while self.num_members() > 0 {
587 self.terminate_server(self.num_members() - 1).await?;
588 if self.num_members() > 0 {
589 fedimint_core::task::sleep_in_test(
590 "waiting to shutdown remaining peers",
591 Duration::from_secs(10),
592 )
593 .await;
594 }
595 }
596
597 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
599
600 for peer_id in 0..fed_size {
602 self.start_server(process_mgr, peer_id).await?;
603 if peer_id < fed_size - 1 {
604 fedimint_core::task::sleep_in_test(
605 "waiting to restart remaining peers",
606 Duration::from_secs(10),
607 )
608 .await;
609 }
610 }
611
612 self.await_all_peers().await?;
613
614 let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
615 info!("upgraded fedimintd to version: {}", fedimintd_version);
616 Ok(())
617 }
618
619 pub async fn restart_all_with_bin(
620 &mut self,
621 process_mgr: &ProcessManager,
622 bin_path: &PathBuf,
623 ) -> Result<()> {
624 let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
626 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
628 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
630
631 self.restart_all_staggered_with_bin(process_mgr, bin_path)
632 .await
633 }
634
635 pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
636 let fed_size = process_mgr.globals.FM_FED_SIZE;
637 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
638 anyhow::ensure!(
639 fed_size > 3 * offline_nodes,
640 "too many offline nodes ({offline_nodes}) to reach consensus"
641 );
642
643 while self.num_members() > fed_size - offline_nodes {
644 self.terminate_server(self.num_members() - 1).await?;
645 }
646
647 if offline_nodes > 0 {
648 info!(fed_size, offline_nodes, "federation is degraded");
649 }
650 Ok(())
651 }
652
653 pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
654 let deposit_fees_msat = self.deposit_fees()?.msats;
655 assert_eq!(
656 deposit_fees_msat % 1000,
657 0,
658 "Deposit fees expected to be whole sats in test suite"
659 );
660 let deposit_fees = deposit_fees_msat / 1000;
661 info!(amount, deposit_fees, "Pegging-in client funds");
662
663 let (address, operation_id) = client.get_deposit_addr().await?;
664
665 self.bitcoind
666 .send_to(address, amount + deposit_fees)
667 .await?;
668 self.bitcoind.mine_blocks(21).await?;
669
670 Ok(operation_id)
671 }
672
673 pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
674 let operation_id = self.pegin_client_no_wait(amount, client).await?;
675
676 client.await_deposit(&operation_id).await?;
677 Ok(())
678 }
679
680 pub async fn pegin_gateways(
683 &self,
684 amount: u64,
685 gateways: Vec<&super::gatewayd::Gatewayd>,
686 ) -> Result<()> {
687 let deposit_fees_msat = self.deposit_fees()?.msats;
688 assert_eq!(
689 deposit_fees_msat % 1000,
690 0,
691 "Deposit fees expected to be whole sats in test suite"
692 );
693 let deposit_fees = deposit_fees_msat / 1000;
694 info!(amount, deposit_fees, "Pegging-in gateway funds");
695 let fed_id = self.calculate_federation_id();
696 for gw in gateways.clone() {
697 let pegin_addr = gw.get_pegin_addr(&fed_id).await?;
698 self.bitcoind
699 .send_to(pegin_addr, amount + deposit_fees)
700 .await?;
701 }
702
703 self.bitcoind.mine_blocks(21).await?;
704 let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
705 try_join_all(gateways.into_iter().map(|gw| {
706 poll("gateway pegin", || async {
707 let gw_info = gw.get_info().await.map_err(ControlFlow::Continue)?;
708 let block_height: u64 = gw_info["block_height"]
709 .as_u64()
710 .expect("Could not parse block height");
711 if bitcoind_block_height != block_height {
712 return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
713 "gateway block height is not synced"
714 )));
715 }
716
717 let gateway_balance = gw
718 .ecash_balance(fed_id.clone())
719 .await
720 .map_err(ControlFlow::Continue)?;
721 poll_almost_equal!(gateway_balance, amount * 1000)
722 })
723 }))
724 .await?;
725
726 Ok(())
727 }
728
729 pub async fn pegout_gateways(
732 &self,
733 amount: u64,
734 gateways: Vec<&super::gatewayd::Gatewayd>,
735 ) -> Result<()> {
736 info!(amount, "Pegging-out gateway funds");
737 let fed_id = self.calculate_federation_id();
738 let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
739 for gw in gateways.clone() {
740 let prev_fed_ecash_balance = gw
741 .get_balances()
742 .await?
743 .ecash_balances
744 .into_iter()
745 .find(|fed| fed.federation_id.to_string() == fed_id)
746 .expect("Gateway has not joined federation")
747 .ecash_balance_msats;
748
749 let pegout_address = self.bitcoind.get_new_address().await?;
750 let value = cmd!(
751 gw,
752 "ecash",
753 "pegout",
754 "--federation-id",
755 fed_id,
756 "--amount",
757 amount,
758 "--address",
759 pegout_address
760 )
761 .out_json()
762 .await?;
763 let response: WithdrawResponse = serde_json::from_value(value)?;
764 peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
765 }
766 self.bitcoind.mine_blocks(21).await?;
767
768 try_join_all(
769 peg_outs
770 .values()
771 .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
772 )
773 .await?;
774
775 for gw in gateways.clone() {
776 let after_fed_ecash_balance = gw
777 .get_balances()
778 .await?
779 .ecash_balances
780 .into_iter()
781 .find(|fed| fed.federation_id.to_string() == fed_id)
782 .expect("Gateway has not joined federation")
783 .ecash_balance_msats;
784
785 let ln_type = gw.ln.ln_type();
786 let prev_balance = peg_outs
787 .get(&ln_type)
788 .expect("peg out does not exist")
789 .0
790 .msats;
791 let fees = peg_outs
792 .get(&ln_type)
793 .expect("peg out does not exist")
794 .1
795 .fees;
796 let total_fee = fees.amount().to_sat() * 1000;
797 crate::util::almost_equal(
798 after_fed_ecash_balance.msats,
799 prev_balance - amount - total_fee,
800 2000,
801 )
802 .map_err(|e| {
803 anyhow::anyhow!(
804 "new balance did not equal prev balance minus withdraw_amount minus fees: {}",
805 e
806 )
807 })?;
808 }
809
810 Ok(())
811 }
812
813 pub fn calculate_federation_id(&self) -> String {
814 self.client_config()
815 .unwrap()
816 .global
817 .calculate_federation_id()
818 .to_string()
819 }
820
821 pub async fn await_block_sync(&self) -> Result<u64> {
822 let finality_delay = self.get_finality_delay()?;
823 let block_count = self.bitcoind.get_block_count().await?;
824 let expected = block_count.saturating_sub(finality_delay.into());
825 cmd!(
826 self.internal_client().await?,
827 "dev",
828 "wait-block-count",
829 expected
830 )
831 .run()
832 .await?;
833 Ok(expected)
834 }
835
836 fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
837 let client_config = &self.client_config()?;
838 let wallet_cfg = client_config
839 .modules
840 .get(&LEGACY_HARDCODED_INSTANCE_ID_WALLET)
841 .context("wallet module not found")?
842 .clone()
843 .redecode_raw(&ModuleDecoderRegistry::new([(
844 LEGACY_HARDCODED_INSTANCE_ID_WALLET,
845 fedimint_wallet_client::KIND,
846 fedimint_wallet_client::WalletModuleTypes::decoder(),
847 )]))?;
848 let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
849
850 let finality_delay = wallet_cfg.finality_delay;
851 Ok(finality_delay)
852 }
853
854 pub async fn await_gateways_registered(&self) -> Result<()> {
855 let start_time = Instant::now();
856 debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
857
858 poll("gateways registered", || async {
859 let num_gateways = cmd!(
860 self.internal_client()
861 .await
862 .map_err(ControlFlow::Continue)?,
863 "list-gateways"
864 )
865 .out_json()
866 .await
867 .map_err(ControlFlow::Continue)?
868 .as_array()
869 .context("invalid output")
870 .map_err(ControlFlow::Break)?
871 .len();
872 poll_eq!(num_gateways, 1)
873 })
874 .await?;
875 debug!(target: LOG_DEVIMINT,
876 elapsed_ms = %start_time.elapsed().as_millis(),
877 "Gateways registered");
878 Ok(())
879 }
880
881 pub async fn await_all_peers(&self) -> Result<()> {
882 poll("Waiting for all peers to be online", || async {
883 cmd!(
884 self.internal_client()
885 .await
886 .map_err(ControlFlow::Continue)?,
887 "dev",
888 "api",
889 "--module",
890 LEGACY_HARDCODED_INSTANCE_ID_WALLET,
891 "block_count"
892 )
893 .run()
894 .await
895 .map_err(ControlFlow::Continue)?;
896 Ok(())
897 })
898 .await
899 }
900
901 pub async fn await_peer(&self, peer_id: usize) -> Result<()> {
902 poll("Waiting for all peers to be online", || async {
903 cmd!(
904 self.internal_client()
905 .await
906 .map_err(ControlFlow::Continue)?,
907 "dev",
908 "api",
909 "--peer-id",
910 peer_id,
911 "--module",
912 LEGACY_HARDCODED_INSTANCE_ID_WALLET,
913 "block_count"
914 )
915 .run()
916 .await
917 .map_err(ControlFlow::Continue)?;
918 Ok(())
919 })
920 .await
921 }
922
923 pub async fn finalize_mempool_tx(&self) -> Result<()> {
933 let finality_delay = self.get_finality_delay()?;
934 let blocks_to_mine = finality_delay + 1;
935 self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
936 self.await_block_sync().await?;
937 Ok(())
938 }
939
940 pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
941 self.bitcoind.mine_blocks(blocks).await?;
942 self.await_block_sync().await?;
943 Ok(())
944 }
945
946 pub fn num_members(&self) -> usize {
947 self.members.len()
948 }
949
950 pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
951 self.members
952 .keys()
953 .map(|&peer_id| PeerId::from(peer_id as u16))
954 }
955}
956
/// Handle to one spawned `fedimintd` daemon process.
#[derive(Clone)]
pub struct Fedimintd {
    /// Bitcoind handle stored alongside the process; not read by this type's
    /// methods.
    _bitcoind: Bitcoind,
    /// Handle used to terminate the daemon or wait for its termination.
    process: ProcessHandle,
}
962
963impl Fedimintd {
964 pub async fn new(
965 process_mgr: &ProcessManager,
966 bitcoind: Bitcoind,
967 peer_id: usize,
968 env: &vars::Fedimintd,
969 fed_name: String,
970 ) -> Result<Self> {
971 debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
972 let process = process_mgr
973 .spawn_daemon(
974 &format!("fedimintd-{fed_name}-{peer_id}"),
975 cmd!(FedimintdCmd).envs(env.vars()),
976 )
977 .await?;
978
979 Ok(Self {
980 _bitcoind: bitcoind,
981 process,
982 })
983 }
984
985 pub async fn terminate(self) -> Result<()> {
986 self.process.terminate().await
987 }
988
989 pub async fn await_terminated(&self) -> Result<()> {
990 self.process.await_terminated().await
991 }
992}
993
994pub async fn run_cli_dkg(
995 params: HashMap<PeerId, ConfigGenParams>,
996 endpoints: BTreeMap<PeerId, String>,
997) -> Result<()> {
998 let auth_for = |peer: &PeerId| -> &ApiAuth { ¶ms[peer].api_auth };
999
1000 debug!(target: LOG_DEVIMINT, "Running DKG");
1001 for endpoint in endpoints.values() {
1002 poll("trying-to-connect-to-peers", || async {
1003 crate::util::FedimintCli
1004 .ws_status(endpoint)
1005 .await
1006 .context("dkg status")
1007 .map_err(ControlFlow::Continue)
1008 })
1009 .await?;
1010 }
1011
1012 debug!(target: LOG_DEVIMINT, "Connected to all peers");
1013
1014 for (peer_id, endpoint) in &endpoints {
1015 let status = crate::util::FedimintCli.ws_status(endpoint).await?;
1016 assert_eq!(
1017 status.server,
1018 ServerStatusLegacy::AwaitingPassword,
1019 "peer_id isn't waiting for password: {peer_id}"
1020 );
1021 }
1022
1023 debug!(target: LOG_DEVIMINT, "Setting passwords");
1024 for (peer_id, endpoint) in &endpoints {
1025 crate::util::FedimintCli
1026 .set_password(auth_for(peer_id), endpoint)
1027 .await?;
1028 }
1029 let (leader_id, leader_endpoint) = endpoints.first_key_value().context("missing peer")?;
1030 let followers = endpoints
1031 .iter()
1032 .filter(|(id, _)| *id != leader_id)
1033 .collect::<BTreeMap<_, _>>();
1034
1035 debug!(target: LOG_DEVIMINT, "calling set_config_gen_connections for leader");
1036 let leader_name = "leader".to_string();
1037 crate::util::FedimintCli
1038 .set_config_gen_connections(auth_for(leader_id), leader_endpoint, &leader_name, None)
1039 .await?;
1040
1041 let server_gen_params = ServerModuleConfigGenParamsRegistry::default();
1042
1043 debug!(target: LOG_DEVIMINT, "calling set_config_gen_params for leader");
1044 cli_set_config_gen_params(
1045 leader_endpoint,
1046 auth_for(leader_id),
1047 server_gen_params.clone(),
1048 )
1049 .await?;
1050
1051 let followers_names = followers
1052 .keys()
1053 .map(|peer_id| {
1054 (*peer_id, {
1055 let random_string = rand::thread_rng()
1057 .sample_iter(&rand::distributions::Alphanumeric)
1058 .take(5)
1059 .map(char::from)
1060 .collect::<String>();
1061 format!("random-{random_string}{peer_id}")
1062 })
1063 })
1064 .collect::<BTreeMap<_, _>>();
1065 for (peer_id, endpoint) in &followers {
1066 let name = followers_names
1067 .get(peer_id)
1068 .context("missing follower name")?;
1069 debug!(target: LOG_DEVIMINT, "calling set_config_gen_connections for {peer_id} {name}");
1070
1071 crate::util::FedimintCli
1072 .set_config_gen_connections(auth_for(peer_id), endpoint, name, Some(leader_endpoint))
1073 .await?;
1074
1075 cli_set_config_gen_params(endpoint, auth_for(peer_id), server_gen_params.clone()).await?;
1076 }
1077
1078 debug!(target: LOG_DEVIMINT, "calling get_config_gen_peers for leader");
1079 let peers = crate::util::FedimintCli
1080 .get_config_gen_peers(leader_endpoint)
1081 .await?;
1082
1083 let found_names = peers
1084 .into_iter()
1085 .map(|peer| peer.name)
1086 .collect::<HashSet<_>>();
1087 let all_names = followers_names
1088 .values()
1089 .cloned()
1090 .chain(iter::once(leader_name))
1091 .collect::<HashSet<_>>();
1092 assert_eq!(found_names, all_names);
1093
1094 debug!(target: LOG_DEVIMINT, "Waiting for SharingConfigGenParams");
1095 cli_wait_server_status(leader_endpoint, ServerStatusLegacy::SharingConfigGenParams).await?;
1096
1097 debug!(target: LOG_DEVIMINT, "Getting consensus configs");
1098 let mut configs = vec![];
1099 for endpoint in endpoints.values() {
1100 let config = crate::util::FedimintCli
1101 .consensus_config_gen_params_legacy(endpoint)
1102 .await?;
1103 configs.push(config);
1104 }
1105 let mut consensus: Vec<_> = configs.iter().map(|p| p.consensus.clone()).collect();
1107 consensus.dedup();
1108 assert_eq!(consensus.len(), 1);
1109 let ids = configs
1111 .iter()
1112 .map(|p| p.our_current_id)
1113 .collect::<HashSet<_>>();
1114 assert_eq!(ids.len(), endpoints.len());
1115 let dkg_results = endpoints
1116 .iter()
1117 .map(|(peer_id, endpoint)| crate::util::FedimintCli.run_dkg(auth_for(peer_id), endpoint));
1118 debug!(target: LOG_DEVIMINT, "Running DKG");
1119 let (dkg_results, leader_wait_result) = tokio::join!(
1120 join_all(dkg_results),
1121 cli_wait_server_status(leader_endpoint, ServerStatusLegacy::VerifyingConfigs)
1122 );
1123 for result in dkg_results {
1124 result?;
1125 }
1126 leader_wait_result?;
1127
1128 debug!(target: LOG_DEVIMINT, "Verifying config hashes");
1130 let mut hashes = HashSet::new();
1131 for (peer_id, endpoint) in &endpoints {
1132 cli_wait_server_status(endpoint, ServerStatusLegacy::VerifyingConfigs).await?;
1133 let hash = crate::util::FedimintCli
1134 .get_verify_config_hash(auth_for(peer_id), endpoint)
1135 .await?;
1136 hashes.insert(hash);
1137 }
1138 assert_eq!(hashes.len(), 1);
1139 for (peer_id, endpoint) in &endpoints {
1140 let result = crate::util::FedimintCli
1141 .start_consensus(auth_for(peer_id), endpoint)
1142 .await;
1143 if let Err(e) = result {
1144 tracing::debug!(target: LOG_DEVIMINT, "Error calling start_consensus: {e:?}, trying to continue...");
1145 }
1146 cli_wait_server_status(endpoint, ServerStatusLegacy::ConsensusRunning).await?;
1147 }
1148 Ok(())
1149}
1150
1151pub async fn run_cli_dkg_v2(
1152 params: HashMap<PeerId, ConfigGenParams>,
1153 endpoints: BTreeMap<PeerId, String>,
1154) -> Result<()> {
1155 let auth_for = |peer: &PeerId| -> &ApiAuth { ¶ms[peer].api_auth };
1156
1157 for (peer, endpoint) in &endpoints {
1158 let status = poll("awaiting-setup-status-awaiting-local-params", || async {
1159 crate::util::FedimintCli
1160 .setup_status(auth_for(peer), endpoint)
1161 .await
1162 .map_err(ControlFlow::Continue)
1163 })
1164 .await
1165 .unwrap();
1166
1167 assert_eq!(status, SetupStatus::AwaitingLocalParams);
1168 }
1169
1170 debug!(target: LOG_DEVIMINT, "Setting local parameters...");
1171
1172 let mut connection_info = BTreeMap::new();
1173
1174 for (peer, endpoint) in &endpoints {
1175 let info = if peer.to_usize() == 0 {
1176 crate::util::FedimintCli
1177 .set_local_params_leader(peer, auth_for(peer), endpoint)
1178 .await?
1179 } else {
1180 crate::util::FedimintCli
1181 .set_local_params_follower(peer, auth_for(peer), endpoint)
1182 .await?
1183 };
1184
1185 connection_info.insert(peer, info);
1186 }
1187
1188 debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");
1189
1190 for (peer, info) in connection_info {
1191 for (p, endpoint) in &endpoints {
1192 if p != peer {
1193 crate::util::FedimintCli
1194 .add_peer(&info, auth_for(p), endpoint)
1195 .await?;
1196 }
1197 }
1198 }
1199
1200 debug!(target: LOG_DEVIMINT, "Starting DKG...");
1201
1202 for (peer, endpoint) in &endpoints {
1203 crate::util::FedimintCli
1204 .start_dkg(auth_for(peer), endpoint)
1205 .await?;
1206 }
1207
1208 Ok(())
1209}
1210
1211async fn cli_set_config_gen_params(
1212 endpoint: &str,
1213 auth: &ApiAuth,
1214 mut server_gen_params: ServerModuleConfigGenParamsRegistry,
1215) -> Result<()> {
1216 self::config::attach_default_module_init_params(
1217 &BitcoinRpcConfig::get_defaults_from_env_vars()?,
1218 &mut server_gen_params,
1219 Network::Regtest,
1220 10,
1221 );
1222
1223 let meta = iter::once(("federation_name".to_string(), "testfed".to_string())).collect();
1224
1225 crate::util::FedimintCli
1226 .set_config_gen_params(auth, endpoint, meta, server_gen_params)
1227 .await?;
1228
1229 Ok(())
1230}
1231
1232async fn cli_wait_server_status(endpoint: &str, expected_status: ServerStatusLegacy) -> Result<()> {
1233 poll(
1234 &format!("waiting-server-status: {expected_status:?}"),
1235 || async {
1236 let server_status = crate::util::FedimintCli
1237 .ws_status(endpoint)
1238 .await
1239 .context("server status")
1240 .map_err(ControlFlow::Continue)?
1241 .server;
1242 if server_status == expected_status {
1243 Ok(())
1244 } else {
1245 Err(ControlFlow::Continue(anyhow!(
1246 "expected status: {expected_status:?} current status: {server_status:?}"
1247 )))
1248 }
1249 },
1250 )
1251 .await?;
1252 Ok(())
1253}