1mod config;
2
3use std::collections::{BTreeMap, HashMap, HashSet};
4use std::ops::ControlFlow;
5use std::path::PathBuf;
6use std::str::FromStr;
7use std::time::Duration;
8use std::{env, fs, iter};
9
10use anyhow::{Context, Result, anyhow, bail};
11use bitcoincore_rpc::bitcoin::Network;
12use fedimint_api_client::api::DynGlobalApi;
13use fedimint_api_client::api::net::Connector;
14use fedimint_client_module::module::ClientModule;
15use fedimint_core::admin_client::{ServerStatusLegacy, SetupStatus};
16use fedimint_core::config::{ClientConfig, ServerModuleConfigGenParamsRegistry, load_from_file};
17use fedimint_core::core::LEGACY_HARDCODED_INSTANCE_ID_WALLET;
18use fedimint_core::envs::BitcoinRpcConfig;
19use fedimint_core::invite_code::InviteCode;
20use fedimint_core::module::registry::ModuleDecoderRegistry;
21use fedimint_core::module::{ApiAuth, ModuleCommon};
22use fedimint_core::runtime::block_in_place;
23use fedimint_core::task::block_on;
24use fedimint_core::task::jit::JitTryAnyhow;
25use fedimint_core::util::SafeUrl;
26use fedimint_core::{Amount, NumPeers, PeerId};
27use fedimint_gateway_common::WithdrawResponse;
28use fedimint_logging::LOG_DEVIMINT;
29use fedimint_server::config::ConfigGenParams;
30use fedimint_testing_core::config::local_config_gen_params;
31use fedimint_testing_core::node_type::LightningNodeType;
32use fedimint_wallet_client::WalletClientModule;
33use fedimint_wallet_client::config::WalletClientConfig;
34use fs_lock::FileLock;
35use futures::future::{join_all, try_join_all};
36use rand::Rng;
37use tokio::task::{JoinSet, spawn_blocking};
38use tokio::time::Instant;
39use tracing::{debug, info};
40
41use super::external::Bitcoind;
42use super::util::{Command, ProcessHandle, ProcessManager, cmd};
43use super::vars::utf8;
44use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
45use crate::util::{FedimintdCmd, poll, poll_simple, poll_with_timeout};
46use crate::version_constants::VERSION_0_7_0_ALPHA;
47use crate::{poll_almost_equal, poll_eq, vars};
48
49pub const PORTS_PER_FEDIMINTD: u16 = 4;
52pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
54pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
56pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
58pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;
60
61#[derive(Clone)]
62pub struct Federation {
63 pub members: BTreeMap<usize, Fedimintd>,
65 pub vars: BTreeMap<usize, vars::Fedimintd>,
66 pub bitcoind: Bitcoind,
67
68 client: JitTryAnyhow<Client>,
70}
71
72impl Drop for Federation {
73 fn drop(&mut self) {
74 block_in_place(|| {
75 block_on(async {
76 let mut set = JoinSet::new();
77
78 while let Some((_id, fedimintd)) = self.members.pop_first() {
79 set.spawn(async { drop(fedimintd) });
80 }
81 while (set.join_next().await).is_some() {}
82 });
83 });
84 }
85}
86#[derive(Clone)]
88pub struct Client {
89 name: String,
90}
91
92impl Client {
93 fn clients_dir() -> PathBuf {
94 let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
95 .expect("FM_DATA_DIR_ENV not set")
96 .parse()
97 .expect("FM_DATA_DIR_ENV invalid");
98 data_dir.join("clients")
99 }
100
101 fn client_dir(&self) -> PathBuf {
102 Self::clients_dir().join(&self.name)
103 }
104
105 pub fn client_name_lock(name: &str) -> Result<FileLock> {
106 let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
107 let file_lock = std::fs::OpenOptions::new()
108 .write(true)
109 .create(true)
110 .truncate(true)
111 .open(&lock_path)
112 .with_context(|| format!("Failed to open {}", lock_path.display()))?;
113
114 fs_lock::FileLock::new_exclusive(file_lock)
115 .with_context(|| format!("Failed to lock {}", lock_path.display()))
116 }
117
118 pub async fn create(name: impl ToString) -> Result<Client> {
120 let name = name.to_string();
121 spawn_blocking(move || {
122 let _lock = Self::client_name_lock(&name);
123 for i in 0u64.. {
124 let client = Self {
125 name: format!("{name}-{i}"),
126 };
127
128 if !client.client_dir().exists() {
129 std::fs::create_dir_all(client.client_dir())?;
130 return Ok(client);
131 }
132 }
133 unreachable!()
134 })
135 .await?
136 }
137
138 pub fn open_or_create(name: &str) -> Result<Client> {
140 block_in_place(|| {
141 let _lock = Self::client_name_lock(name);
142 let client = Self {
143 name: format!("{name}-0"),
144 };
145 if !client.client_dir().exists() {
146 std::fs::create_dir_all(client.client_dir())?;
147 }
148 Ok(client)
149 })
150 }
151
152 pub async fn join_federation(&self, invite_code: String) -> Result<()> {
154 debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
155 cmd!(self, "join-federation", invite_code).run().await?;
156
157 Ok(())
158 }
159
160 pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
162 debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
163 cmd!(
164 self,
165 "restore",
166 "--invite-code",
167 invite_code,
168 "--mnemonic",
169 mnemonic
170 )
171 .run()
172 .await?;
173
174 Ok(())
175 }
176
177 pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
179 let restored = Self::open_or_create(name)?;
180
181 let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
182 .as_str()
183 .unwrap()
184 .to_owned();
185
186 debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
187 cmd!(
188 restored,
189 "restore",
190 "--invite-code",
191 invite_code,
192 "--mnemonic",
193 mnemonic
194 )
195 .run()
196 .await?;
197
198 Ok(restored)
199 }
200
201 pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
204 let new = Client::create(name).await?;
205
206 cmd!(
207 "cp",
208 "-R",
209 self.client_dir().join("client.db").display(),
210 new.client_dir().display()
211 )
212 .run()
213 .await?;
214
215 Ok(new)
216 }
217
218 pub async fn balance(&self) -> Result<u64> {
219 Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
220 .as_u64()
221 .unwrap())
222 }
223
224 pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
225 let deposit = cmd!(self, "deposit-address").out_json().await?;
226 Ok((
227 deposit["address"].as_str().unwrap().to_string(),
228 deposit["operation_id"].as_str().unwrap().to_string(),
229 ))
230 }
231
232 pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
233 cmd!(self, "await-deposit", operation_id).run().await
234 }
235
236 pub fn cmd(&self) -> Command {
237 cmd!(
238 crate::util::get_fedimint_cli_path(),
239 format!("--data-dir={}", self.client_dir().display())
240 )
241 }
242
243 pub fn get_name(&self) -> &str {
244 &self.name
245 }
246
247 pub async fn get_session_count(&self) -> Result<u64> {
249 cmd!(self, "dev", "session-count").out_json().await?["count"]
250 .as_u64()
251 .context("count field wasn't a number")
252 }
253
254 pub async fn wait_complete(&self) -> Result<()> {
256 cmd!(self, "dev", "wait-complete").run().await
257 }
258
259 pub async fn wait_session(&self) -> anyhow::Result<()> {
261 info!("Waiting for a new session");
262 let session_count = self.get_session_count().await?;
263 self.wait_session_outcome(session_count).await?;
264 Ok(())
265 }
266
267 pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
269 let timeout = {
270 let current_session_count = self.get_session_count().await?;
271 let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
272 let session_duration_seconds = 180;
273 Duration::from_secs(sessions_to_wait * session_duration_seconds)
274 };
275
276 let start = Instant::now();
277 poll_with_timeout("Waiting for a new session", timeout, || async {
278 info!("Awaiting session outcome {session_count}");
279 match cmd!(self, "dev", "api", "await_session_outcome", session_count)
280 .run()
281 .await
282 {
283 Err(e) => Err(ControlFlow::Continue(e)),
284 Ok(()) => Ok(()),
285 }
286 })
287 .await?;
288
289 let session_found_in = start.elapsed();
290 info!("session found in {session_found_in:?}");
291 Ok(())
292 }
293}
294
295impl Federation {
296 pub async fn new(
297 process_mgr: &ProcessManager,
298 bitcoind: Bitcoind,
299 skip_setup: bool,
300 pre_dkg: bool,
301 fed_index: usize,
303 federation_name: String,
304 ) -> Result<Self> {
305 let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
306 let mut members = BTreeMap::new();
307 let mut peer_to_env_vars_map = BTreeMap::new();
308
309 let peers: Vec<_> = num_peers.peer_ids().collect();
310 let params: HashMap<PeerId, ConfigGenParams> =
311 local_config_gen_params(&peers, process_mgr.globals.FM_FEDERATION_BASE_PORT)?;
312
313 let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
314 let mut endpoints: BTreeMap<PeerId, _> = BTreeMap::new();
315 for peer_id in num_peers.peer_ids() {
316 let peer_env_vars = vars::Fedimintd::init(
317 &process_mgr.globals,
318 federation_name.clone(),
319 peer_id,
320 process_mgr
321 .globals
322 .fedimintd_overrides
323 .peer_expect(fed_index, peer_id),
324 )
325 .await?;
326 members.insert(
327 peer_id.to_usize(),
328 Fedimintd::new(
329 process_mgr,
330 bitcoind.clone(),
331 peer_id.to_usize(),
332 &peer_env_vars,
333 federation_name.clone(),
334 )
335 .await?,
336 );
337 let admin_client = DynGlobalApi::from_setup_endpoint(
338 SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
339 &process_mgr.globals.FM_FORCE_API_SECRETS.get_active(),
340 false,
342 false,
343 )
344 .await?;
345 endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
346 admin_clients.insert(peer_id, admin_client);
347 peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
348 }
349
350 if !skip_setup && !pre_dkg {
351 let (original_fedimint_cli_path, original_fm_mint_client) =
354 crate::util::use_matching_fedimint_cli_for_dkg().await?;
355
356 let fedimint_cli_version = crate::util::FedimintCli::version_or_default().await;
357
358 if fedimint_cli_version >= *VERSION_0_7_0_ALPHA {
359 run_cli_dkg_v2(params, endpoints).await?;
360 } else {
361 run_cli_dkg(params, endpoints).await?;
362 }
363
364 crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);
366
367 let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
369 let invite_code_filename_original = "invite-code";
370
371 for peer_env_vars in peer_to_env_vars_map.values() {
372 let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
373
374 let invite_code = poll_simple("awaiting-invite-code", || async {
375 let path = format!("{peer_data_dir}/{invite_code_filename_original}");
376 tokio::fs::read_to_string(&path)
377 .await
378 .with_context(|| format!("Awaiting invite code file: {path}"))
379 })
380 .await
381 .context("Awaiting invite code file")?;
382
383 Connector::default()
384 .download_from_invite_code(&InviteCode::from_str(&invite_code)?, false, false)
385 .await?;
386 }
387
388 let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);
390
391 tokio::fs::copy(
392 format!("{peer_data_dir}/{invite_code_filename_original}"),
393 format!("{client_dir}/{invite_code_filename_original}"),
394 )
395 .await
396 .context("copying invite-code file")?;
397
398 for (index, peer_env_vars) in &peer_to_env_vars_map {
401 let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
402
403 let invite_code_filename_indexed =
404 format!("{invite_code_filename_original}-{index}");
405 tokio::fs::rename(
406 format!("{peer_data_dir}/{invite_code_filename_original}"),
407 format!("{client_dir}/{invite_code_filename_indexed}"),
408 )
409 .await
410 .context("moving invite-code file")?;
411 }
412
413 debug!("Moved invite-code files to client data directory");
414 }
415
416 let client = JitTryAnyhow::new_try({
417 move || async move {
418 let client = Client::open_or_create(federation_name.as_str())?;
419 let invite_code = Self::invite_code_static()?;
420 if !skip_setup && !pre_dkg {
421 cmd!(client, "join-federation", invite_code).run().await?;
422 }
423 Ok(client)
424 }
425 });
426
427 Ok(Self {
428 members,
429 vars: peer_to_env_vars_map,
430 bitcoind,
431 client,
432 })
433 }
434
435 pub fn client_config(&self) -> Result<ClientConfig> {
436 let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
437 load_from_file(&cfg_path)
438 }
439
440 pub fn module_client_config<M: ClientModule>(
441 &self,
442 ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
443 self.client_config()?
444 .modules
445 .iter()
446 .find_map(|(module_instance_id, module_cfg)| {
447 if module_cfg.kind == M::kind() {
448 let decoders = ModuleDecoderRegistry::new(vec![(
449 *module_instance_id,
450 M::kind(),
451 M::decoder(),
452 )]);
453 Some(
454 module_cfg
455 .config
456 .clone()
457 .redecode_raw(&decoders)
458 .expect("Decoding client cfg failed")
459 .expect_decoded_ref()
460 .as_any()
461 .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
462 .cloned()
463 .context("Cast to module config failed"),
464 )
465 } else {
466 None
467 }
468 })
469 .transpose()
470 }
471
472 pub fn deposit_fees(&self) -> Result<Amount> {
473 Ok(self
474 .module_client_config::<WalletClientModule>()?
475 .context("No wallet module found")?
476 .fee_consensus
477 .peg_in_abs)
478 }
479
480 pub fn invite_code(&self) -> Result<String> {
482 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
483 let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
484 Ok(invite_code)
485 }
486
487 pub fn invite_code_static() -> Result<String> {
488 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
489 let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
490 Ok(invite_code)
491 }
492 pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
493 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
494 let name = format!("invite-code-{peer_id}");
495 let invite_code = fs::read_to_string(data_dir.join(name))?;
496 Ok(invite_code)
497 }
498
499 pub async fn internal_client(&self) -> Result<&Client> {
503 self.client
504 .get_try()
505 .await
506 .context("Internal client joining Federation")
507 }
508
509 pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
511 let client = Client::create(name).await?;
512 client.join_federation(self.invite_code()?).await?;
513 Ok(client)
514 }
515
516 pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
517 if self.members.contains_key(&peer) {
518 bail!("fedimintd-{peer} already running");
519 }
520 self.members.insert(
521 peer,
522 Fedimintd::new(
523 process_mgr,
524 self.bitcoind.clone(),
525 peer,
526 &self.vars[&peer],
527 "default".to_string(),
528 )
529 .await?,
530 );
531 Ok(())
532 }
533
534 pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
535 let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
536 bail!("fedimintd-{peer_id} does not exist");
537 };
538 fedimintd.terminate().await?;
539 Ok(())
540 }
541
542 pub async fn await_server_terminated(&mut self, peer_id: usize) -> Result<()> {
543 let Some(fedimintd) = self.members.get_mut(&peer_id) else {
544 bail!("fedimintd-{peer_id} does not exist");
545 };
546 fedimintd.await_terminated().await?;
547 self.members.remove(&peer_id);
548 Ok(())
549 }
550
551 pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
553 info!("starting all servers");
554 let fed_size = process_mgr.globals.FM_FED_SIZE;
555 for peer_id in 0..fed_size {
556 if self.members.contains_key(&peer_id) {
557 continue;
558 }
559 self.start_server(process_mgr, peer_id).await?;
560 }
561 self.await_all_peers().await?;
562 Ok(())
563 }
564
565 pub async fn terminate_all_servers(&mut self) -> Result<()> {
567 info!("terminating all servers");
568 let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
569 for peer_id in running_peer_ids {
570 self.terminate_server(peer_id).await?;
571 }
572 Ok(())
573 }
574
575 pub async fn restart_all_staggered_with_bin(
580 &mut self,
581 process_mgr: &ProcessManager,
582 bin_path: &PathBuf,
583 ) -> Result<()> {
584 let fed_size = process_mgr.globals.FM_FED_SIZE;
585
586 self.start_all_servers(process_mgr).await?;
588
589 while self.num_members() > 0 {
591 self.terminate_server(self.num_members() - 1).await?;
592 if self.num_members() > 0 {
593 fedimint_core::task::sleep_in_test(
594 "waiting to shutdown remaining peers",
595 Duration::from_secs(10),
596 )
597 .await;
598 }
599 }
600
601 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
603
604 for peer_id in 0..fed_size {
606 self.start_server(process_mgr, peer_id).await?;
607 if peer_id < fed_size - 1 {
608 fedimint_core::task::sleep_in_test(
609 "waiting to restart remaining peers",
610 Duration::from_secs(10),
611 )
612 .await;
613 }
614 }
615
616 self.await_all_peers().await?;
617
618 let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
619 info!("upgraded fedimintd to version: {}", fedimintd_version);
620 Ok(())
621 }
622
623 pub async fn restart_all_with_bin(
624 &mut self,
625 process_mgr: &ProcessManager,
626 bin_path: &PathBuf,
627 ) -> Result<()> {
628 let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
630 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
632 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
634
635 self.restart_all_staggered_with_bin(process_mgr, bin_path)
636 .await
637 }
638
639 pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
640 let fed_size = process_mgr.globals.FM_FED_SIZE;
641 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
642 anyhow::ensure!(
643 fed_size > 3 * offline_nodes,
644 "too many offline nodes ({offline_nodes}) to reach consensus"
645 );
646
647 while self.num_members() > fed_size - offline_nodes {
648 self.terminate_server(self.num_members() - 1).await?;
649 }
650
651 if offline_nodes > 0 {
652 info!(fed_size, offline_nodes, "federation is degraded");
653 }
654 Ok(())
655 }
656
657 pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
658 let deposit_fees_msat = self.deposit_fees()?.msats;
659 assert_eq!(
660 deposit_fees_msat % 1000,
661 0,
662 "Deposit fees expected to be whole sats in test suite"
663 );
664 let deposit_fees = deposit_fees_msat / 1000;
665 info!(amount, deposit_fees, "Pegging-in client funds");
666
667 let (address, operation_id) = client.get_deposit_addr().await?;
668
669 self.bitcoind
670 .send_to(address, amount + deposit_fees)
671 .await?;
672 self.bitcoind.mine_blocks(21).await?;
673
674 Ok(operation_id)
675 }
676
677 pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
678 let operation_id = self.pegin_client_no_wait(amount, client).await?;
679
680 client.await_deposit(&operation_id).await?;
681 Ok(())
682 }
683
684 pub async fn pegin_gateways(
687 &self,
688 amount: u64,
689 gateways: Vec<&super::gatewayd::Gatewayd>,
690 ) -> Result<()> {
691 let deposit_fees_msat = self.deposit_fees()?.msats;
692 assert_eq!(
693 deposit_fees_msat % 1000,
694 0,
695 "Deposit fees expected to be whole sats in test suite"
696 );
697 let deposit_fees = deposit_fees_msat / 1000;
698 info!(amount, deposit_fees, "Pegging-in gateway funds");
699 let fed_id = self.calculate_federation_id();
700 for gw in gateways.clone() {
701 let pegin_addr = gw.get_pegin_addr(&fed_id).await?;
702 self.bitcoind
703 .send_to(pegin_addr, amount + deposit_fees)
704 .await?;
705 }
706
707 self.bitcoind.mine_blocks(21).await?;
708 let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
709 try_join_all(gateways.into_iter().map(|gw| {
710 poll("gateway pegin", || async {
711 let gw_info = gw.get_info().await.map_err(ControlFlow::Continue)?;
712 let block_height: u64 = gw_info["block_height"]
713 .as_u64()
714 .expect("Could not parse block height");
715 if bitcoind_block_height != block_height {
716 return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
717 "gateway block height is not synced"
718 )));
719 }
720
721 let gateway_balance = gw
722 .ecash_balance(fed_id.clone())
723 .await
724 .map_err(ControlFlow::Continue)?;
725 poll_almost_equal!(gateway_balance, amount * 1000)
726 })
727 }))
728 .await?;
729
730 Ok(())
731 }
732
733 pub async fn pegout_gateways(
736 &self,
737 amount: u64,
738 gateways: Vec<&super::gatewayd::Gatewayd>,
739 ) -> Result<()> {
740 info!(amount, "Pegging-out gateway funds");
741 let fed_id = self.calculate_federation_id();
742 let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
743 for gw in gateways.clone() {
744 let prev_fed_ecash_balance = gw
745 .get_balances()
746 .await?
747 .ecash_balances
748 .into_iter()
749 .find(|fed| fed.federation_id.to_string() == fed_id)
750 .expect("Gateway has not joined federation")
751 .ecash_balance_msats;
752
753 let pegout_address = self.bitcoind.get_new_address().await?;
754 let value = cmd!(
755 gw,
756 "ecash",
757 "pegout",
758 "--federation-id",
759 fed_id,
760 "--amount",
761 amount,
762 "--address",
763 pegout_address
764 )
765 .out_json()
766 .await?;
767 let response: WithdrawResponse = serde_json::from_value(value)?;
768 peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
769 }
770 self.bitcoind.mine_blocks(21).await?;
771
772 try_join_all(
773 peg_outs
774 .values()
775 .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
776 )
777 .await?;
778
779 for gw in gateways.clone() {
780 let after_fed_ecash_balance = gw
781 .get_balances()
782 .await?
783 .ecash_balances
784 .into_iter()
785 .find(|fed| fed.federation_id.to_string() == fed_id)
786 .expect("Gateway has not joined federation")
787 .ecash_balance_msats;
788
789 let ln_type = gw.ln.ln_type();
790 let prev_balance = peg_outs
791 .get(&ln_type)
792 .expect("peg out does not exist")
793 .0
794 .msats;
795 let fees = peg_outs
796 .get(&ln_type)
797 .expect("peg out does not exist")
798 .1
799 .fees;
800 let total_fee = fees.amount().to_sat() * 1000;
801 crate::util::almost_equal(
802 after_fed_ecash_balance.msats,
803 prev_balance - amount - total_fee,
804 2000,
805 )
806 .map_err(|e| {
807 anyhow::anyhow!(
808 "new balance did not equal prev balance minus withdraw_amount minus fees: {}",
809 e
810 )
811 })?;
812 }
813
814 Ok(())
815 }
816
817 pub fn calculate_federation_id(&self) -> String {
818 self.client_config()
819 .unwrap()
820 .global
821 .calculate_federation_id()
822 .to_string()
823 }
824
825 pub async fn await_block_sync(&self) -> Result<u64> {
826 let finality_delay = self.get_finality_delay()?;
827 let block_count = self.bitcoind.get_block_count().await?;
828 let expected = block_count.saturating_sub(finality_delay.into());
829 cmd!(
830 self.internal_client().await?,
831 "dev",
832 "wait-block-count",
833 expected
834 )
835 .run()
836 .await?;
837 Ok(expected)
838 }
839
840 fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
841 let client_config = &self.client_config()?;
842 let wallet_cfg = client_config
843 .modules
844 .get(&LEGACY_HARDCODED_INSTANCE_ID_WALLET)
845 .context("wallet module not found")?
846 .clone()
847 .redecode_raw(&ModuleDecoderRegistry::new([(
848 LEGACY_HARDCODED_INSTANCE_ID_WALLET,
849 fedimint_wallet_client::KIND,
850 fedimint_wallet_client::WalletModuleTypes::decoder(),
851 )]))?;
852 let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
853
854 let finality_delay = wallet_cfg.finality_delay;
855 Ok(finality_delay)
856 }
857
858 pub async fn await_gateways_registered(&self) -> Result<()> {
859 let start_time = Instant::now();
860 debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
861
862 poll("gateways registered", || async {
863 let num_gateways = cmd!(
864 self.internal_client()
865 .await
866 .map_err(ControlFlow::Continue)?,
867 "list-gateways"
868 )
869 .out_json()
870 .await
871 .map_err(ControlFlow::Continue)?
872 .as_array()
873 .context("invalid output")
874 .map_err(ControlFlow::Break)?
875 .len();
876 poll_eq!(num_gateways, 1)
877 })
878 .await?;
879 debug!(target: LOG_DEVIMINT,
880 elapsed_ms = %start_time.elapsed().as_millis(),
881 "Gateways registered");
882 Ok(())
883 }
884
885 pub async fn await_all_peers(&self) -> Result<()> {
886 poll("Waiting for all peers to be online", || async {
887 cmd!(
888 self.internal_client()
889 .await
890 .map_err(ControlFlow::Continue)?,
891 "dev",
892 "api",
893 "--module",
894 LEGACY_HARDCODED_INSTANCE_ID_WALLET,
895 "block_count"
896 )
897 .run()
898 .await
899 .map_err(ControlFlow::Continue)?;
900 Ok(())
901 })
902 .await
903 }
904
905 pub async fn await_peer(&self, peer_id: usize) -> Result<()> {
906 poll("Waiting for all peers to be online", || async {
907 cmd!(
908 self.internal_client()
909 .await
910 .map_err(ControlFlow::Continue)?,
911 "dev",
912 "api",
913 "--peer-id",
914 peer_id,
915 "--module",
916 LEGACY_HARDCODED_INSTANCE_ID_WALLET,
917 "block_count"
918 )
919 .run()
920 .await
921 .map_err(ControlFlow::Continue)?;
922 Ok(())
923 })
924 .await
925 }
926
927 pub async fn finalize_mempool_tx(&self) -> Result<()> {
937 let finality_delay = self.get_finality_delay()?;
938 let blocks_to_mine = finality_delay + 1;
939 self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
940 self.await_block_sync().await?;
941 Ok(())
942 }
943
944 pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
945 self.bitcoind.mine_blocks(blocks).await?;
946 self.await_block_sync().await?;
947 Ok(())
948 }
949
950 pub fn num_members(&self) -> usize {
951 self.members.len()
952 }
953
954 pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
955 self.members
956 .keys()
957 .map(|&peer_id| PeerId::from(peer_id as u16))
958 }
959}
960
961#[derive(Clone)]
962pub struct Fedimintd {
963 _bitcoind: Bitcoind,
964 process: ProcessHandle,
965}
966
967impl Fedimintd {
968 pub async fn new(
969 process_mgr: &ProcessManager,
970 bitcoind: Bitcoind,
971 peer_id: usize,
972 env: &vars::Fedimintd,
973 fed_name: String,
974 ) -> Result<Self> {
975 debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
976 let process = process_mgr
977 .spawn_daemon(
978 &format!("fedimintd-{fed_name}-{peer_id}"),
979 cmd!(FedimintdCmd).envs(env.vars()),
980 )
981 .await?;
982
983 Ok(Self {
984 _bitcoind: bitcoind,
985 process,
986 })
987 }
988
989 pub async fn terminate(self) -> Result<()> {
990 self.process.terminate().await
991 }
992
993 pub async fn await_terminated(&self) -> Result<()> {
994 self.process.await_terminated().await
995 }
996}
997
998pub async fn run_cli_dkg(
999 params: HashMap<PeerId, ConfigGenParams>,
1000 endpoints: BTreeMap<PeerId, String>,
1001) -> Result<()> {
1002 let auth_for = |peer: &PeerId| -> &ApiAuth { ¶ms[peer].api_auth };
1003
1004 debug!(target: LOG_DEVIMINT, "Running DKG");
1005 for endpoint in endpoints.values() {
1006 poll("trying-to-connect-to-peers", || async {
1007 crate::util::FedimintCli
1008 .ws_status(endpoint)
1009 .await
1010 .context("dkg status")
1011 .map_err(ControlFlow::Continue)
1012 })
1013 .await?;
1014 }
1015
1016 debug!(target: LOG_DEVIMINT, "Connected to all peers");
1017
1018 for (peer_id, endpoint) in &endpoints {
1019 let status = crate::util::FedimintCli.ws_status(endpoint).await?;
1020 assert_eq!(
1021 status.server,
1022 ServerStatusLegacy::AwaitingPassword,
1023 "peer_id isn't waiting for password: {peer_id}"
1024 );
1025 }
1026
1027 debug!(target: LOG_DEVIMINT, "Setting passwords");
1028 for (peer_id, endpoint) in &endpoints {
1029 crate::util::FedimintCli
1030 .set_password(auth_for(peer_id), endpoint)
1031 .await?;
1032 }
1033 let (leader_id, leader_endpoint) = endpoints.first_key_value().context("missing peer")?;
1034 let followers = endpoints
1035 .iter()
1036 .filter(|(id, _)| *id != leader_id)
1037 .collect::<BTreeMap<_, _>>();
1038
1039 debug!(target: LOG_DEVIMINT, "calling set_config_gen_connections for leader");
1040 let leader_name = "leader".to_string();
1041 crate::util::FedimintCli
1042 .set_config_gen_connections(auth_for(leader_id), leader_endpoint, &leader_name, None)
1043 .await?;
1044
1045 let server_gen_params = ServerModuleConfigGenParamsRegistry::default();
1046
1047 debug!(target: LOG_DEVIMINT, "calling set_config_gen_params for leader");
1048 cli_set_config_gen_params(
1049 leader_endpoint,
1050 auth_for(leader_id),
1051 server_gen_params.clone(),
1052 )
1053 .await?;
1054
1055 let followers_names = followers
1056 .keys()
1057 .map(|peer_id| {
1058 (*peer_id, {
1059 let random_string = rand::thread_rng()
1061 .sample_iter(&rand::distributions::Alphanumeric)
1062 .take(5)
1063 .map(char::from)
1064 .collect::<String>();
1065 format!("random-{random_string}{peer_id}")
1066 })
1067 })
1068 .collect::<BTreeMap<_, _>>();
1069 for (peer_id, endpoint) in &followers {
1070 let name = followers_names
1071 .get(peer_id)
1072 .context("missing follower name")?;
1073 debug!(target: LOG_DEVIMINT, "calling set_config_gen_connections for {peer_id} {name}");
1074
1075 crate::util::FedimintCli
1076 .set_config_gen_connections(auth_for(peer_id), endpoint, name, Some(leader_endpoint))
1077 .await?;
1078
1079 cli_set_config_gen_params(endpoint, auth_for(peer_id), server_gen_params.clone()).await?;
1080 }
1081
1082 debug!(target: LOG_DEVIMINT, "calling get_config_gen_peers for leader");
1083 let peers = crate::util::FedimintCli
1084 .get_config_gen_peers(leader_endpoint)
1085 .await?;
1086
1087 let found_names = peers
1088 .into_iter()
1089 .map(|peer| peer.name)
1090 .collect::<HashSet<_>>();
1091 let all_names = followers_names
1092 .values()
1093 .cloned()
1094 .chain(iter::once(leader_name))
1095 .collect::<HashSet<_>>();
1096 assert_eq!(found_names, all_names);
1097
1098 debug!(target: LOG_DEVIMINT, "Waiting for SharingConfigGenParams");
1099 cli_wait_server_status(leader_endpoint, ServerStatusLegacy::SharingConfigGenParams).await?;
1100
1101 debug!(target: LOG_DEVIMINT, "Getting consensus configs");
1102 let mut configs = vec![];
1103 for endpoint in endpoints.values() {
1104 let config = crate::util::FedimintCli
1105 .consensus_config_gen_params_legacy(endpoint)
1106 .await?;
1107 configs.push(config);
1108 }
1109 let mut consensus: Vec<_> = configs.iter().map(|p| p.consensus.clone()).collect();
1111 consensus.dedup();
1112 assert_eq!(consensus.len(), 1);
1113 let ids = configs
1115 .iter()
1116 .map(|p| p.our_current_id)
1117 .collect::<HashSet<_>>();
1118 assert_eq!(ids.len(), endpoints.len());
1119 let dkg_results = endpoints
1120 .iter()
1121 .map(|(peer_id, endpoint)| crate::util::FedimintCli.run_dkg(auth_for(peer_id), endpoint));
1122 debug!(target: LOG_DEVIMINT, "Running DKG");
1123 let (dkg_results, leader_wait_result) = tokio::join!(
1124 join_all(dkg_results),
1125 cli_wait_server_status(leader_endpoint, ServerStatusLegacy::VerifyingConfigs)
1126 );
1127 for result in dkg_results {
1128 result?;
1129 }
1130 leader_wait_result?;
1131
1132 debug!(target: LOG_DEVIMINT, "Verifying config hashes");
1134 let mut hashes = HashSet::new();
1135 for (peer_id, endpoint) in &endpoints {
1136 cli_wait_server_status(endpoint, ServerStatusLegacy::VerifyingConfigs).await?;
1137 let hash = crate::util::FedimintCli
1138 .get_verify_config_hash(auth_for(peer_id), endpoint)
1139 .await?;
1140 hashes.insert(hash);
1141 }
1142 assert_eq!(hashes.len(), 1);
1143 for (peer_id, endpoint) in &endpoints {
1144 let result = crate::util::FedimintCli
1145 .start_consensus(auth_for(peer_id), endpoint)
1146 .await;
1147 if let Err(e) = result {
1148 tracing::debug!(target: LOG_DEVIMINT, "Error calling start_consensus: {e:?}, trying to continue...");
1149 }
1150 cli_wait_server_status(endpoint, ServerStatusLegacy::ConsensusRunning).await?;
1151 }
1152 Ok(())
1153}
1154
1155pub async fn run_cli_dkg_v2(
1156 params: HashMap<PeerId, ConfigGenParams>,
1157 endpoints: BTreeMap<PeerId, String>,
1158) -> Result<()> {
1159 let auth_for = |peer: &PeerId| -> &ApiAuth { ¶ms[peer].api_auth };
1160
1161 for (peer, endpoint) in &endpoints {
1162 let status = poll("awaiting-setup-status-awaiting-local-params", || async {
1163 crate::util::FedimintCli
1164 .setup_status(auth_for(peer), endpoint)
1165 .await
1166 .map_err(ControlFlow::Continue)
1167 })
1168 .await
1169 .unwrap();
1170
1171 assert_eq!(status, SetupStatus::AwaitingLocalParams);
1172 }
1173
1174 debug!(target: LOG_DEVIMINT, "Setting local parameters...");
1175
1176 let mut connection_info = BTreeMap::new();
1177
1178 for (peer, endpoint) in &endpoints {
1179 let info = if peer.to_usize() == 0 {
1180 crate::util::FedimintCli
1181 .set_local_params_leader(peer, auth_for(peer), endpoint)
1182 .await?
1183 } else {
1184 crate::util::FedimintCli
1185 .set_local_params_follower(peer, auth_for(peer), endpoint)
1186 .await?
1187 };
1188
1189 connection_info.insert(peer, info);
1190 }
1191
1192 debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");
1193
1194 for (peer, info) in connection_info {
1195 for (p, endpoint) in &endpoints {
1196 if p != peer {
1197 crate::util::FedimintCli
1198 .add_peer(&info, auth_for(p), endpoint)
1199 .await?;
1200 }
1201 }
1202 }
1203
1204 debug!(target: LOG_DEVIMINT, "Starting DKG...");
1205
1206 for (peer, endpoint) in &endpoints {
1207 crate::util::FedimintCli
1208 .start_dkg(auth_for(peer), endpoint)
1209 .await?;
1210 }
1211
1212 Ok(())
1213}
1214
1215async fn cli_set_config_gen_params(
1216 endpoint: &str,
1217 auth: &ApiAuth,
1218 mut server_gen_params: ServerModuleConfigGenParamsRegistry,
1219) -> Result<()> {
1220 self::config::attach_default_module_init_params(
1221 &BitcoinRpcConfig::get_defaults_from_env_vars()?,
1222 &mut server_gen_params,
1223 Network::Regtest,
1224 10,
1225 );
1226
1227 let meta = iter::once(("federation_name".to_string(), "testfed".to_string())).collect();
1228
1229 crate::util::FedimintCli
1230 .set_config_gen_params(auth, endpoint, meta, server_gen_params)
1231 .await?;
1232
1233 Ok(())
1234}
1235
1236async fn cli_wait_server_status(endpoint: &str, expected_status: ServerStatusLegacy) -> Result<()> {
1237 poll(
1238 &format!("waiting-server-status: {expected_status:?}"),
1239 || async {
1240 let server_status = crate::util::FedimintCli
1241 .ws_status(endpoint)
1242 .await
1243 .context("server status")
1244 .map_err(ControlFlow::Continue)?
1245 .server;
1246 if server_status == expected_status {
1247 Ok(())
1248 } else {
1249 Err(ControlFlow::Continue(anyhow!(
1250 "expected status: {expected_status:?} current status: {server_status:?}"
1251 )))
1252 }
1253 },
1254 )
1255 .await?;
1256 Ok(())
1257}