1mod config;
2
3use std::collections::{BTreeMap, HashMap, HashSet};
4use std::ops::ControlFlow;
5use std::path::PathBuf;
6use std::str::FromStr;
7use std::time::Duration;
8use std::{env, fs, iter};
9
10use anyhow::{Context, Result, anyhow, bail};
11use bitcoincore_rpc::bitcoin::Network;
12use fedimint_api_client::api::DynGlobalApi;
13use fedimint_api_client::api::net::ConnectorType;
14use fedimint_client_module::module::ClientModule;
15use fedimint_connectors::ConnectorRegistry;
16use fedimint_core::admin_client::{ServerStatusLegacy, SetupStatus};
17use fedimint_core::config::{ClientConfig, ServerModuleConfigGenParamsRegistry, load_from_file};
18use fedimint_core::core::LEGACY_HARDCODED_INSTANCE_ID_WALLET;
19use fedimint_core::envs::BitcoinRpcConfig;
20use fedimint_core::invite_code::InviteCode;
21use fedimint_core::module::registry::ModuleDecoderRegistry;
22use fedimint_core::module::{ApiAuth, ModuleCommon};
23use fedimint_core::runtime::block_in_place;
24use fedimint_core::task::block_on;
25use fedimint_core::task::jit::JitTryAnyhow;
26use fedimint_core::util::SafeUrl;
27use fedimint_core::{Amount, NumPeers, PeerId};
28use fedimint_gateway_common::WithdrawResponse;
29use fedimint_logging::LOG_DEVIMINT;
30use fedimint_server::config::ConfigGenParams;
31use fedimint_testing_core::config::local_config_gen_params;
32use fedimint_testing_core::node_type::LightningNodeType;
33use fedimint_wallet_client::WalletClientModule;
34use fedimint_wallet_client::config::WalletClientConfig;
35use fs_lock::FileLock;
36use futures::future::{join_all, try_join_all};
37use rand::Rng;
38use tokio::task::{JoinSet, spawn_blocking};
39use tokio::time::Instant;
40use tracing::{debug, info};
41
42use super::external::Bitcoind;
43use super::util::{Command, ProcessHandle, ProcessManager, cmd};
44use super::vars::utf8;
45use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
46use crate::util::{FedimintdCmd, poll, poll_simple, poll_with_timeout};
47use crate::version_constants::VERSION_0_10_0_ALPHA;
48use crate::{poll_almost_equal, poll_eq, vars};
49
/// Number of ports per fedimintd instance; equals the count of the
/// `*_PORT_OFFSET` constants below (offsets 0 through 3).
pub const PORTS_PER_FEDIMINTD: u16 = 4;
/// Offset of the p2p port.
// NOTE(review): presumably relative to a per-peer base port — confirm at the use site.
pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
/// Offset of the API port (same presumed base as the p2p offset).
pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
/// Offset of the UI port (same presumed base as the p2p offset).
pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
/// Offset of the metrics port (same presumed base as the p2p offset).
pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;
61
/// Handle to a group of `fedimintd` processes plus the bitcoind handle they
/// were started with and an internal client.
#[derive(Clone)]
pub struct Federation {
    /// Running fedimintd handles keyed by peer index. Entries are removed by
    /// `terminate_server` / `await_server_terminated` and re-added by
    /// `start_server`.
    pub members: BTreeMap<usize, Fedimintd>,
    /// Per-peer environment variables keyed by peer index; reused when a
    /// peer is started again.
    pub vars: BTreeMap<usize, vars::Fedimintd>,
    /// bitcoind handle used for funding transactions and mining blocks.
    pub bitcoind: Bitcoind,

    /// Internal client, resolved through `get_try` in `internal_client`.
    client: JitTryAnyhow<Client>,
    // Stored on the struct but not read anywhere in the visible part of this
    // file, hence the lint allowance.
    #[allow(dead_code)]
    connectors: ConnectorRegistry,
}
74
75impl Drop for Federation {
76 fn drop(&mut self) {
77 block_in_place(|| {
78 block_on(async {
79 let mut set = JoinSet::new();
80
81 while let Some((_id, fedimintd)) = self.members.pop_first() {
82 set.spawn(async { drop(fedimintd) });
83 }
84 while (set.join_next().await).is_some() {}
85 });
86 });
87 }
88}
/// A named client whose data lives in `<clients dir>/<name>`; commands built
/// via `cmd()` pass that directory as `--data-dir`.
#[derive(Clone)]
pub struct Client {
    /// Directory name of this client below the shared `clients` directory.
    name: String,
}
94
95impl Client {
96 fn clients_dir() -> PathBuf {
97 let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
98 .expect("FM_DATA_DIR_ENV not set")
99 .parse()
100 .expect("FM_DATA_DIR_ENV invalid");
101 data_dir.join("clients")
102 }
103
104 fn client_dir(&self) -> PathBuf {
105 Self::clients_dir().join(&self.name)
106 }
107
108 pub fn client_name_lock(name: &str) -> Result<FileLock> {
109 let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
110 let file_lock = std::fs::OpenOptions::new()
111 .write(true)
112 .create(true)
113 .truncate(true)
114 .open(&lock_path)
115 .with_context(|| format!("Failed to open {}", lock_path.display()))?;
116
117 fs_lock::FileLock::new_exclusive(file_lock)
118 .with_context(|| format!("Failed to lock {}", lock_path.display()))
119 }
120
121 pub async fn create(name: impl ToString) -> Result<Client> {
123 let name = name.to_string();
124 spawn_blocking(move || {
125 let _lock = Self::client_name_lock(&name);
126 for i in 0u64.. {
127 let client = Self {
128 name: format!("{name}-{i}"),
129 };
130
131 if !client.client_dir().exists() {
132 std::fs::create_dir_all(client.client_dir())?;
133 return Ok(client);
134 }
135 }
136 unreachable!()
137 })
138 .await?
139 }
140
141 pub fn open_or_create(name: &str) -> Result<Client> {
143 block_in_place(|| {
144 let _lock = Self::client_name_lock(name);
145 let client = Self {
146 name: format!("{name}-0"),
147 };
148 if !client.client_dir().exists() {
149 std::fs::create_dir_all(client.client_dir())?;
150 }
151 Ok(client)
152 })
153 }
154
155 pub async fn join_federation(&self, invite_code: String) -> Result<()> {
157 debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
158 cmd!(self, "join-federation", invite_code).run().await?;
159
160 Ok(())
161 }
162
163 pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
165 debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
166 cmd!(
167 self,
168 "restore",
169 "--invite-code",
170 invite_code,
171 "--mnemonic",
172 mnemonic
173 )
174 .run()
175 .await?;
176
177 Ok(())
178 }
179
180 pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
182 let restored = Self::open_or_create(name)?;
183
184 let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
185 .as_str()
186 .unwrap()
187 .to_owned();
188
189 debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
190 cmd!(
191 restored,
192 "restore",
193 "--invite-code",
194 invite_code,
195 "--mnemonic",
196 mnemonic
197 )
198 .run()
199 .await?;
200
201 Ok(restored)
202 }
203
204 pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
207 let new = Client::create(name).await?;
208
209 cmd!(
210 "cp",
211 "-R",
212 self.client_dir().join("client.db").display(),
213 new.client_dir().display()
214 )
215 .run()
216 .await?;
217
218 Ok(new)
219 }
220
221 pub async fn balance(&self) -> Result<u64> {
222 Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
223 .as_u64()
224 .unwrap())
225 }
226
227 pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
228 let deposit = cmd!(self, "deposit-address").out_json().await?;
229 Ok((
230 deposit["address"].as_str().unwrap().to_string(),
231 deposit["operation_id"].as_str().unwrap().to_string(),
232 ))
233 }
234
235 pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
236 cmd!(self, "await-deposit", operation_id).run().await
237 }
238
239 pub fn cmd(&self) -> Command {
240 cmd!(
241 crate::util::get_fedimint_cli_path(),
242 format!("--data-dir={}", self.client_dir().display())
243 )
244 }
245
246 pub fn get_name(&self) -> &str {
247 &self.name
248 }
249
250 pub async fn get_session_count(&self) -> Result<u64> {
252 cmd!(self, "dev", "session-count").out_json().await?["count"]
253 .as_u64()
254 .context("count field wasn't a number")
255 }
256
257 pub async fn wait_complete(&self) -> Result<()> {
259 cmd!(self, "dev", "wait-complete").run().await
260 }
261
262 pub async fn wait_session(&self) -> anyhow::Result<()> {
264 info!("Waiting for a new session");
265 let session_count = self.get_session_count().await?;
266 self.wait_session_outcome(session_count).await?;
267 Ok(())
268 }
269
270 pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
272 let timeout = {
273 let current_session_count = self.get_session_count().await?;
274 let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
275 let session_duration_seconds = 180;
276 Duration::from_secs(sessions_to_wait * session_duration_seconds)
277 };
278
279 let start = Instant::now();
280 poll_with_timeout("Waiting for a new session", timeout, || async {
281 info!("Awaiting session outcome {session_count}");
282 match cmd!(self, "dev", "api", "await_session_outcome", session_count)
283 .run()
284 .await
285 {
286 Err(e) => Err(ControlFlow::Continue(e)),
287 Ok(()) => Ok(()),
288 }
289 })
290 .await?;
291
292 let session_found_in = start.elapsed();
293 info!("session found in {session_found_in:?}");
294 Ok(())
295 }
296}
297
298impl Federation {
    /// Spawns one fedimintd per peer and, unless `skip_setup` or `pre_dkg`
    /// is set, runs DKG and distributes the resulting invite codes.
    ///
    /// Steps:
    /// 1. For every peer id: initialise its env vars, spawn its fedimintd,
    ///    and record its API URL.
    /// 2. If `!skip_setup && !pre_dkg`: run `run_cli_dkg_v2`, wait for each
    ///    peer's `invite-code` file and download from that invite code, copy
    ///    peer 0's file to `<client dir>/invite-code`, then move every
    ///    peer's file to `<client dir>/invite-code-<index>`.
    /// 3. Register the closure that opens the internal client and, under the
    ///    same condition as step 2, joins it to the federation.
    ///
    /// # Errors
    ///
    /// Returns the first error from any of the steps above.
    pub async fn new(
        process_mgr: &ProcessManager,
        bitcoind: Bitcoind,
        skip_setup: bool,
        pre_dkg: bool,
        fed_index: usize,
        federation_name: String,
    ) -> Result<Self> {
        let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
        let mut members = BTreeMap::new();
        let mut peer_to_env_vars_map = BTreeMap::new();

        let peers: Vec<_> = num_peers.peer_ids().collect();
        let params: HashMap<PeerId, ConfigGenParams> =
            local_config_gen_params(&peers, process_mgr.globals.FM_FEDERATION_BASE_PORT, true)?;

        // NOTE(review): `admin_clients` is filled below but never read in
        // this function.
        let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
        let mut api_endpoints: BTreeMap<PeerId, _> = BTreeMap::new();

        let connectors = ConnectorRegistry::build_from_testing_env()?.bind().await?;
        for peer_id in num_peers.peer_ids() {
            let peer_env_vars = vars::Fedimintd::init(
                &process_mgr.globals,
                federation_name.clone(),
                peer_id,
                process_mgr
                    .globals
                    .fedimintd_overrides
                    .peer_expect(fed_index, peer_id),
            )
            .await?;
            members.insert(
                peer_id.to_usize(),
                Fedimintd::new(
                    process_mgr,
                    bitcoind.clone(),
                    peer_id.to_usize(),
                    &peer_env_vars,
                    federation_name.clone(),
                )
                .await?,
            );
            let admin_client = DynGlobalApi::new_admin_setup(
                connectors.clone(),
                SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
            )?;
            api_endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
            admin_clients.insert(peer_id, admin_client);
            peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
        }

        if !skip_setup && !pre_dkg {
            // NOTE(review): judging by the names, this selects a CLI binary
            // for DKG and returns the previous selection so it can be
            // restored right after — confirm in `crate::util`.
            let (original_fedimint_cli_path, original_fm_mint_client) =
                crate::util::use_matching_fedimint_cli_for_dkg().await?;

            run_cli_dkg_v2(params, api_endpoints).await?;

            crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);

            let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
            let invite_code_filename_original = "invite-code";

            // Wait for every peer to have written its invite code, and check
            // that a download from that invite code succeeds (the downloaded
            // value itself is discarded).
            for peer_env_vars in peer_to_env_vars_map.values() {
                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);

                let invite_code = poll_simple("awaiting-invite-code", || async {
                    let path = format!("{peer_data_dir}/{invite_code_filename_original}");
                    tokio::fs::read_to_string(&path)
                        .await
                        .with_context(|| format!("Awaiting invite code file: {path}"))
                })
                .await
                .context("Awaiting invite code file")?;

                ConnectorType::default()
                    .download_from_invite_code(&connectors, &InviteCode::from_str(&invite_code)?)
                    .await?;
            }

            // Peer 0's invite code becomes the un-suffixed `invite-code`
            // file that `invite_code_static` reads.
            let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);

            tokio::fs::copy(
                format!("{peer_data_dir}/{invite_code_filename_original}"),
                format!("{client_dir}/{invite_code_filename_original}"),
            )
            .await
            .context("copying invite-code file")?;

            // Move (not copy) each peer's file to `invite-code-<index>`; this
            // has to come after the copy above because it removes peer 0's
            // original file too.
            for (index, peer_env_vars) in &peer_to_env_vars_map {
                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);

                let invite_code_filename_indexed =
                    format!("{invite_code_filename_original}-{index}");
                tokio::fs::rename(
                    format!("{peer_data_dir}/{invite_code_filename_original}"),
                    format!("{client_dir}/{invite_code_filename_indexed}"),
                )
                .await
                .context("moving invite-code file")?;
            }

            debug!("Moved invite-code files to client data directory");
        }

        // The closure always reads the invite-code file, but only issues
        // `join-federation` when setup was performed above.
        let client = JitTryAnyhow::new_try({
            move || async move {
                let client = Client::open_or_create(federation_name.as_str())?;
                let invite_code = Self::invite_code_static()?;
                if !skip_setup && !pre_dkg {
                    cmd!(client, "join-federation", invite_code).run().await?;
                }
                Ok(client)
            }
        });

        Ok(Self {
            members,
            vars: peer_to_env_vars_map,
            bitcoind,
            client,
            connectors,
        })
    }
432
433 pub fn client_config(&self) -> Result<ClientConfig> {
434 let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
435 load_from_file(&cfg_path)
436 }
437
438 pub fn module_client_config<M: ClientModule>(
439 &self,
440 ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
441 self.client_config()?
442 .modules
443 .iter()
444 .find_map(|(module_instance_id, module_cfg)| {
445 if module_cfg.kind == M::kind() {
446 let decoders = ModuleDecoderRegistry::new(vec![(
447 *module_instance_id,
448 M::kind(),
449 M::decoder(),
450 )]);
451 Some(
452 module_cfg
453 .config
454 .clone()
455 .redecode_raw(&decoders)
456 .expect("Decoding client cfg failed")
457 .expect_decoded_ref()
458 .as_any()
459 .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
460 .cloned()
461 .context("Cast to module config failed"),
462 )
463 } else {
464 None
465 }
466 })
467 .transpose()
468 }
469
470 pub fn deposit_fees(&self) -> Result<Amount> {
471 Ok(self
472 .module_client_config::<WalletClientModule>()?
473 .context("No wallet module found")?
474 .fee_consensus
475 .peg_in_abs)
476 }
477
478 pub fn invite_code(&self) -> Result<String> {
480 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
481 let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
482 Ok(invite_code)
483 }
484
485 pub fn invite_code_static() -> Result<String> {
486 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
487 let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
488 Ok(invite_code)
489 }
490 pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
491 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
492 let name = format!("invite-code-{peer_id}");
493 let invite_code = fs::read_to_string(data_dir.join(name))?;
494 Ok(invite_code)
495 }
496
497 pub async fn internal_client(&self) -> Result<&Client> {
501 self.client
502 .get_try()
503 .await
504 .context("Internal client joining Federation")
505 }
506
507 pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
509 let client = Client::create(name).await?;
510 client.join_federation(self.invite_code()?).await?;
511 Ok(client)
512 }
513
514 pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
515 if self.members.contains_key(&peer) {
516 bail!("fedimintd-{peer} already running");
517 }
518 self.members.insert(
519 peer,
520 Fedimintd::new(
521 process_mgr,
522 self.bitcoind.clone(),
523 peer,
524 &self.vars[&peer],
525 "default".to_string(),
526 )
527 .await?,
528 );
529 Ok(())
530 }
531
532 pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
533 let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
534 bail!("fedimintd-{peer_id} does not exist");
535 };
536 fedimintd.terminate().await?;
537 Ok(())
538 }
539
540 pub async fn await_server_terminated(&mut self, peer_id: usize) -> Result<()> {
541 let Some(fedimintd) = self.members.get_mut(&peer_id) else {
542 bail!("fedimintd-{peer_id} does not exist");
543 };
544 fedimintd.await_terminated().await?;
545 self.members.remove(&peer_id);
546 Ok(())
547 }
548
549 pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
551 info!("starting all servers");
552 let fed_size = process_mgr.globals.FM_FED_SIZE;
553 for peer_id in 0..fed_size {
554 if self.members.contains_key(&peer_id) {
555 continue;
556 }
557 self.start_server(process_mgr, peer_id).await?;
558 }
559 self.await_all_peers().await?;
560 Ok(())
561 }
562
563 pub async fn terminate_all_servers(&mut self) -> Result<()> {
565 info!("terminating all servers");
566 let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
567 for peer_id in running_peer_ids {
568 self.terminate_server(peer_id).await?;
569 }
570 Ok(())
571 }
572
573 pub async fn restart_all_staggered_with_bin(
578 &mut self,
579 process_mgr: &ProcessManager,
580 bin_path: &PathBuf,
581 ) -> Result<()> {
582 let fed_size = process_mgr.globals.FM_FED_SIZE;
583
584 self.start_all_servers(process_mgr).await?;
586
587 while self.num_members() > 0 {
589 self.terminate_server(self.num_members() - 1).await?;
590 if self.num_members() > 0 {
591 fedimint_core::task::sleep_in_test(
592 "waiting to shutdown remaining peers",
593 Duration::from_secs(10),
594 )
595 .await;
596 }
597 }
598
599 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
601
602 for peer_id in 0..fed_size {
604 self.start_server(process_mgr, peer_id).await?;
605 if peer_id < fed_size - 1 {
606 fedimint_core::task::sleep_in_test(
607 "waiting to restart remaining peers",
608 Duration::from_secs(10),
609 )
610 .await;
611 }
612 }
613
614 self.await_all_peers().await?;
615
616 let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
617 info!("upgraded fedimintd to version: {}", fedimintd_version);
618 Ok(())
619 }
620
621 pub async fn restart_all_with_bin(
622 &mut self,
623 process_mgr: &ProcessManager,
624 bin_path: &PathBuf,
625 ) -> Result<()> {
626 let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
628 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
630 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
632
633 self.restart_all_staggered_with_bin(process_mgr, bin_path)
634 .await
635 }
636
637 pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
638 let fed_size = process_mgr.globals.FM_FED_SIZE;
639 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
640 anyhow::ensure!(
641 fed_size > 3 * offline_nodes,
642 "too many offline nodes ({offline_nodes}) to reach consensus"
643 );
644
645 while self.num_members() > fed_size - offline_nodes {
646 self.terminate_server(self.num_members() - 1).await?;
647 }
648
649 if offline_nodes > 0 {
650 info!(fed_size, offline_nodes, "federation is degraded");
651 }
652 Ok(())
653 }
654
655 pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
656 let deposit_fees_msat = self.deposit_fees()?.msats;
657 assert_eq!(
658 deposit_fees_msat % 1000,
659 0,
660 "Deposit fees expected to be whole sats in test suite"
661 );
662 let deposit_fees = deposit_fees_msat / 1000;
663 info!(amount, deposit_fees, "Pegging-in client funds");
664
665 let (address, operation_id) = client.get_deposit_addr().await?;
666
667 self.bitcoind
668 .send_to(address, amount + deposit_fees)
669 .await?;
670 self.bitcoind.mine_blocks(21).await?;
671
672 Ok(operation_id)
673 }
674
675 pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
676 let operation_id = self.pegin_client_no_wait(amount, client).await?;
677
678 client.await_deposit(&operation_id).await?;
679 Ok(())
680 }
681
682 pub async fn pegin_gateways(
685 &self,
686 amount: u64,
687 gateways: Vec<&super::gatewayd::Gatewayd>,
688 ) -> Result<()> {
689 let deposit_fees_msat = self.deposit_fees()?.msats;
690 assert_eq!(
691 deposit_fees_msat % 1000,
692 0,
693 "Deposit fees expected to be whole sats in test suite"
694 );
695 let deposit_fees = deposit_fees_msat / 1000;
696 info!(amount, deposit_fees, "Pegging-in gateway funds");
697 let fed_id = self.calculate_federation_id();
698 for gw in gateways.clone() {
699 let pegin_addr = gw.get_pegin_addr(&fed_id).await?;
700 self.bitcoind
701 .send_to(pegin_addr, amount + deposit_fees)
702 .await?;
703 }
704
705 self.bitcoind.mine_blocks(21).await?;
706 let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
707 try_join_all(gateways.into_iter().map(|gw| {
708 poll("gateway pegin", || async {
709 let gw_info = gw.get_info().await.map_err(ControlFlow::Continue)?;
710
711 let block_height: u64 = if gw.gatewayd_version < *VERSION_0_10_0_ALPHA {
712 gw_info["block_height"]
713 .as_u64()
714 .expect("Could not parse block height")
715 } else {
716 gw_info["lightning_info"]["connected"]["block_height"]
717 .as_u64()
718 .expect("Could not parse block height")
719 };
720
721 if bitcoind_block_height != block_height {
722 return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
723 "gateway block height is not synced"
724 )));
725 }
726
727 let gateway_balance = gw
728 .ecash_balance(fed_id.clone())
729 .await
730 .map_err(ControlFlow::Continue)?;
731 poll_almost_equal!(gateway_balance, amount * 1000)
732 })
733 }))
734 .await?;
735
736 Ok(())
737 }
738
739 pub async fn pegout_gateways(
742 &self,
743 amount: u64,
744 gateways: Vec<&super::gatewayd::Gatewayd>,
745 ) -> Result<()> {
746 info!(amount, "Pegging-out gateway funds");
747 let fed_id = self.calculate_federation_id();
748 let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
749 for gw in gateways.clone() {
750 let prev_fed_ecash_balance = gw
751 .get_balances()
752 .await?
753 .ecash_balances
754 .into_iter()
755 .find(|fed| fed.federation_id.to_string() == fed_id)
756 .expect("Gateway has not joined federation")
757 .ecash_balance_msats;
758
759 let pegout_address = self.bitcoind.get_new_address().await?;
760 let value = cmd!(
761 gw,
762 "ecash",
763 "pegout",
764 "--federation-id",
765 fed_id,
766 "--amount",
767 amount,
768 "--address",
769 pegout_address
770 )
771 .out_json()
772 .await?;
773 let response: WithdrawResponse = serde_json::from_value(value)?;
774 peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
775 }
776 self.bitcoind.mine_blocks(21).await?;
777
778 try_join_all(
779 peg_outs
780 .values()
781 .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
782 )
783 .await?;
784
785 for gw in gateways.clone() {
786 let after_fed_ecash_balance = gw
787 .get_balances()
788 .await?
789 .ecash_balances
790 .into_iter()
791 .find(|fed| fed.federation_id.to_string() == fed_id)
792 .expect("Gateway has not joined federation")
793 .ecash_balance_msats;
794
795 let ln_type = gw.ln.ln_type();
796 let prev_balance = peg_outs
797 .get(&ln_type)
798 .expect("peg out does not exist")
799 .0
800 .msats;
801 let fees = peg_outs
802 .get(&ln_type)
803 .expect("peg out does not exist")
804 .1
805 .fees;
806 let total_fee = fees.amount().to_sat() * 1000;
807 crate::util::almost_equal(
808 after_fed_ecash_balance.msats,
809 prev_balance - amount - total_fee,
810 2000,
811 )
812 .map_err(|e| {
813 anyhow::anyhow!(
814 "new balance did not equal prev balance minus withdraw_amount minus fees: {}",
815 e
816 )
817 })?;
818 }
819
820 Ok(())
821 }
822
823 pub fn calculate_federation_id(&self) -> String {
824 self.client_config()
825 .unwrap()
826 .global
827 .calculate_federation_id()
828 .to_string()
829 }
830
831 pub async fn await_block_sync(&self) -> Result<u64> {
832 let finality_delay = self.get_finality_delay()?;
833 let block_count = self.bitcoind.get_block_count().await?;
834 let expected = block_count.saturating_sub(finality_delay.into());
835 cmd!(
836 self.internal_client().await?,
837 "dev",
838 "wait-block-count",
839 expected
840 )
841 .run()
842 .await?;
843 Ok(expected)
844 }
845
846 fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
847 let client_config = &self.client_config()?;
848 let wallet_cfg = client_config
849 .modules
850 .get(&LEGACY_HARDCODED_INSTANCE_ID_WALLET)
851 .context("wallet module not found")?
852 .clone()
853 .redecode_raw(&ModuleDecoderRegistry::new([(
854 LEGACY_HARDCODED_INSTANCE_ID_WALLET,
855 fedimint_wallet_client::KIND,
856 fedimint_wallet_client::WalletModuleTypes::decoder(),
857 )]))?;
858 let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
859
860 let finality_delay = wallet_cfg.finality_delay;
861 Ok(finality_delay)
862 }
863
864 pub async fn await_gateways_registered(&self) -> Result<()> {
865 let start_time = Instant::now();
866 debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
867
868 poll("gateways registered", || async {
869 let num_gateways = cmd!(
870 self.internal_client()
871 .await
872 .map_err(ControlFlow::Continue)?,
873 "list-gateways"
874 )
875 .out_json()
876 .await
877 .map_err(ControlFlow::Continue)?
878 .as_array()
879 .context("invalid output")
880 .map_err(ControlFlow::Break)?
881 .len();
882 poll_eq!(num_gateways, 1)
883 })
884 .await?;
885 debug!(target: LOG_DEVIMINT,
886 elapsed_ms = %start_time.elapsed().as_millis(),
887 "Gateways registered");
888 Ok(())
889 }
890
891 pub async fn await_all_peers(&self) -> Result<()> {
892 poll("Waiting for all peers to be online", || async {
893 cmd!(
894 self.internal_client()
895 .await
896 .map_err(ControlFlow::Continue)?,
897 "dev",
898 "api",
899 "--module",
900 LEGACY_HARDCODED_INSTANCE_ID_WALLET,
901 "block_count"
902 )
903 .run()
904 .await
905 .map_err(ControlFlow::Continue)?;
906 Ok(())
907 })
908 .await
909 }
910
911 pub async fn await_peer(&self, peer_id: usize) -> Result<()> {
912 poll("Waiting for all peers to be online", || async {
913 cmd!(
914 self.internal_client()
915 .await
916 .map_err(ControlFlow::Continue)?,
917 "dev",
918 "api",
919 "--peer-id",
920 peer_id,
921 "--module",
922 LEGACY_HARDCODED_INSTANCE_ID_WALLET,
923 "block_count"
924 )
925 .run()
926 .await
927 .map_err(ControlFlow::Continue)?;
928 Ok(())
929 })
930 .await
931 }
932
933 pub async fn finalize_mempool_tx(&self) -> Result<()> {
943 let finality_delay = self.get_finality_delay()?;
944 let blocks_to_mine = finality_delay + 1;
945 self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
946 self.await_block_sync().await?;
947 Ok(())
948 }
949
950 pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
951 self.bitcoind.mine_blocks(blocks).await?;
952 self.await_block_sync().await?;
953 Ok(())
954 }
955
956 pub fn num_members(&self) -> usize {
957 self.members.len()
958 }
959
960 pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
961 self.members
962 .keys()
963 .map(|&peer_id| PeerId::from(peer_id as u16))
964 }
965}
966
/// Handle to one spawned `fedimintd` daemon process.
#[derive(Clone)]
pub struct Fedimintd {
    /// bitcoind handle passed in at construction; stored but not read in
    /// this file.
    // NOTE(review): presumably held so bitcoind outlives this process handle — confirm.
    _bitcoind: Bitcoind,
    /// Handle used to terminate the daemon or wait for it to exit.
    process: ProcessHandle,
}
972
973impl Fedimintd {
974 pub async fn new(
975 process_mgr: &ProcessManager,
976 bitcoind: Bitcoind,
977 peer_id: usize,
978 env: &vars::Fedimintd,
979 fed_name: String,
980 ) -> Result<Self> {
981 debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
982 let process = process_mgr
983 .spawn_daemon(
984 &format!("fedimintd-{fed_name}-{peer_id}"),
985 cmd!(FedimintdCmd).envs(env.vars()),
986 )
987 .await?;
988
989 Ok(Self {
990 _bitcoind: bitcoind,
991 process,
992 })
993 }
994
995 pub async fn terminate(self) -> Result<()> {
996 self.process.terminate().await
997 }
998
999 pub async fn await_terminated(&self) -> Result<()> {
1000 self.process.await_terminated().await
1001 }
1002}
1003
1004pub async fn run_cli_dkg(
1005 params: HashMap<PeerId, ConfigGenParams>,
1006 endpoints: BTreeMap<PeerId, String>,
1007) -> Result<()> {
1008 let auth_for = |peer: &PeerId| -> &ApiAuth { ¶ms[peer].api_auth };
1009
1010 debug!(target: LOG_DEVIMINT, "Running DKG");
1011 for endpoint in endpoints.values() {
1012 poll("trying-to-connect-to-peers", || async {
1013 crate::util::FedimintCli
1014 .ws_status(endpoint)
1015 .await
1016 .context("dkg status")
1017 .map_err(ControlFlow::Continue)
1018 })
1019 .await?;
1020 }
1021
1022 debug!(target: LOG_DEVIMINT, "Connected to all peers");
1023
1024 for (peer_id, endpoint) in &endpoints {
1025 let status = crate::util::FedimintCli.ws_status(endpoint).await?;
1026 assert_eq!(
1027 status.server,
1028 ServerStatusLegacy::AwaitingPassword,
1029 "peer_id isn't waiting for password: {peer_id}"
1030 );
1031 }
1032
1033 debug!(target: LOG_DEVIMINT, "Setting passwords");
1034 for (peer_id, endpoint) in &endpoints {
1035 crate::util::FedimintCli
1036 .set_password(auth_for(peer_id), endpoint)
1037 .await?;
1038 }
1039 let (leader_id, leader_endpoint) = endpoints.first_key_value().context("missing peer")?;
1040 let followers = endpoints
1041 .iter()
1042 .filter(|(id, _)| *id != leader_id)
1043 .collect::<BTreeMap<_, _>>();
1044
1045 debug!(target: LOG_DEVIMINT, "calling set_config_gen_connections for leader");
1046 let leader_name = "leader".to_string();
1047 crate::util::FedimintCli
1048 .set_config_gen_connections(auth_for(leader_id), leader_endpoint, &leader_name, None)
1049 .await?;
1050
1051 let server_gen_params = ServerModuleConfigGenParamsRegistry::default();
1052
1053 debug!(target: LOG_DEVIMINT, "calling set_config_gen_params for leader");
1054 cli_set_config_gen_params(
1055 leader_endpoint,
1056 auth_for(leader_id),
1057 server_gen_params.clone(),
1058 )
1059 .await?;
1060
1061 let followers_names = followers
1062 .keys()
1063 .map(|peer_id| {
1064 (*peer_id, {
1065 let random_string = rand::thread_rng()
1067 .sample_iter(&rand::distributions::Alphanumeric)
1068 .take(5)
1069 .map(char::from)
1070 .collect::<String>();
1071 format!("random-{random_string}{peer_id}")
1072 })
1073 })
1074 .collect::<BTreeMap<_, _>>();
1075 for (peer_id, endpoint) in &followers {
1076 let name = followers_names
1077 .get(peer_id)
1078 .context("missing follower name")?;
1079 debug!(target: LOG_DEVIMINT, "calling set_config_gen_connections for {peer_id} {name}");
1080
1081 crate::util::FedimintCli
1082 .set_config_gen_connections(auth_for(peer_id), endpoint, name, Some(leader_endpoint))
1083 .await?;
1084
1085 cli_set_config_gen_params(endpoint, auth_for(peer_id), server_gen_params.clone()).await?;
1086 }
1087
1088 debug!(target: LOG_DEVIMINT, "calling get_config_gen_peers for leader");
1089 let peers = crate::util::FedimintCli
1090 .get_config_gen_peers(leader_endpoint)
1091 .await?;
1092
1093 let found_names = peers
1094 .into_iter()
1095 .map(|peer| peer.name)
1096 .collect::<HashSet<_>>();
1097 let all_names = followers_names
1098 .values()
1099 .cloned()
1100 .chain(iter::once(leader_name))
1101 .collect::<HashSet<_>>();
1102 assert_eq!(found_names, all_names);
1103
1104 debug!(target: LOG_DEVIMINT, "Waiting for SharingConfigGenParams");
1105 cli_wait_server_status(leader_endpoint, ServerStatusLegacy::SharingConfigGenParams).await?;
1106
1107 debug!(target: LOG_DEVIMINT, "Getting consensus configs");
1108 let mut configs = vec![];
1109 for endpoint in endpoints.values() {
1110 let config = crate::util::FedimintCli
1111 .consensus_config_gen_params_legacy(endpoint)
1112 .await?;
1113 configs.push(config);
1114 }
1115 let mut consensus: Vec<_> = configs.iter().map(|p| p.consensus.clone()).collect();
1117 consensus.dedup();
1118 assert_eq!(consensus.len(), 1);
1119 let ids = configs
1121 .iter()
1122 .map(|p| p.our_current_id)
1123 .collect::<HashSet<_>>();
1124 assert_eq!(ids.len(), endpoints.len());
1125 let dkg_results = endpoints
1126 .iter()
1127 .map(|(peer_id, endpoint)| crate::util::FedimintCli.run_dkg(auth_for(peer_id), endpoint));
1128 debug!(target: LOG_DEVIMINT, "Running DKG");
1129 let (dkg_results, leader_wait_result) = tokio::join!(
1130 join_all(dkg_results),
1131 cli_wait_server_status(leader_endpoint, ServerStatusLegacy::VerifyingConfigs)
1132 );
1133 for result in dkg_results {
1134 result?;
1135 }
1136 leader_wait_result?;
1137
1138 debug!(target: LOG_DEVIMINT, "Verifying config hashes");
1140 let mut hashes = HashSet::new();
1141 for (peer_id, endpoint) in &endpoints {
1142 cli_wait_server_status(endpoint, ServerStatusLegacy::VerifyingConfigs).await?;
1143 let hash = crate::util::FedimintCli
1144 .get_verify_config_hash(auth_for(peer_id), endpoint)
1145 .await?;
1146 hashes.insert(hash);
1147 }
1148 assert_eq!(hashes.len(), 1);
1149 for (peer_id, endpoint) in &endpoints {
1150 let result = crate::util::FedimintCli
1151 .start_consensus(auth_for(peer_id), endpoint)
1152 .await;
1153 if let Err(e) = result {
1154 tracing::debug!(target: LOG_DEVIMINT, "Error calling start_consensus: {e:?}, trying to continue...");
1155 }
1156 cli_wait_server_status(endpoint, ServerStatusLegacy::ConsensusRunning).await?;
1157 }
1158 Ok(())
1159}
1160
1161pub async fn run_cli_dkg_v2(
1162 params: HashMap<PeerId, ConfigGenParams>,
1163 endpoints: BTreeMap<PeerId, String>,
1164) -> Result<()> {
1165 let auth_for = |peer: &PeerId| -> &ApiAuth { ¶ms[peer].api_auth };
1166
1167 let status_futures = endpoints.iter().map(|(peer, endpoint)| {
1169 let peer = *peer;
1170 let endpoint = endpoint.clone();
1171 async move {
1172 let status = poll("awaiting-setup-status-awaiting-local-params", || async {
1173 crate::util::FedimintCli
1174 .setup_status(auth_for(&peer), &endpoint)
1175 .await
1176 .map_err(ControlFlow::Continue)
1177 })
1178 .await
1179 .unwrap();
1180
1181 assert_eq!(status, SetupStatus::AwaitingLocalParams);
1182 }
1183 });
1184 join_all(status_futures).await;
1185
1186 debug!(target: LOG_DEVIMINT, "Setting local parameters...");
1187
1188 let local_params_futures = endpoints.iter().map(|(peer, endpoint)| {
1190 let peer = *peer;
1191 let endpoint = endpoint.clone();
1192 async move {
1193 let info = if peer.to_usize() == 0 {
1194 crate::util::FedimintCli
1195 .set_local_params_leader(&peer, auth_for(&peer), &endpoint)
1196 .await
1197 } else {
1198 crate::util::FedimintCli
1199 .set_local_params_follower(&peer, auth_for(&peer), &endpoint)
1200 .await
1201 };
1202 info.map(|i| (peer, i))
1203 }
1204 });
1205 let connection_info: BTreeMap<_, _> = try_join_all(local_params_futures)
1206 .await?
1207 .into_iter()
1208 .collect();
1209
1210 debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");
1211
1212 let add_peer_futures = connection_info.iter().flat_map(|(peer, info)| {
1215 endpoints
1216 .iter()
1217 .filter(move |(p, _)| *p != peer)
1218 .map(move |(p, endpoint)| {
1219 let p = *p;
1220 let endpoint = endpoint.clone();
1221 let info = info.clone();
1222 async move {
1223 crate::util::FedimintCli
1224 .add_peer(&info, auth_for(&p), &endpoint)
1225 .await
1226 }
1227 })
1228 });
1229 try_join_all(add_peer_futures).await?;
1230
1231 debug!(target: LOG_DEVIMINT, "Starting DKG...");
1232
1233 let start_dkg_futures = endpoints.iter().map(|(peer, endpoint)| {
1235 let peer = *peer;
1236 let endpoint = endpoint.clone();
1237 async move {
1238 crate::util::FedimintCli
1239 .start_dkg(auth_for(&peer), &endpoint)
1240 .await
1241 }
1242 });
1243 try_join_all(start_dkg_futures).await?;
1244
1245 Ok(())
1246}
1247
1248async fn cli_set_config_gen_params(
1249 endpoint: &str,
1250 auth: &ApiAuth,
1251 mut server_gen_params: ServerModuleConfigGenParamsRegistry,
1252) -> Result<()> {
1253 self::config::attach_default_module_init_params(
1254 &BitcoinRpcConfig::get_defaults_from_env_vars()?,
1255 &mut server_gen_params,
1256 Network::Regtest,
1257 10,
1258 );
1259
1260 let meta = iter::once(("federation_name".to_string(), "testfed".to_string())).collect();
1261
1262 crate::util::FedimintCli
1263 .set_config_gen_params(auth, endpoint, meta, server_gen_params)
1264 .await?;
1265
1266 Ok(())
1267}
1268
1269async fn cli_wait_server_status(endpoint: &str, expected_status: ServerStatusLegacy) -> Result<()> {
1270 poll(
1271 &format!("waiting-server-status: {expected_status:?}"),
1272 || async {
1273 let server_status = crate::util::FedimintCli
1274 .ws_status(endpoint)
1275 .await
1276 .context("server status")
1277 .map_err(ControlFlow::Continue)?
1278 .server;
1279 if server_status == expected_status {
1280 Ok(())
1281 } else {
1282 Err(ControlFlow::Continue(anyhow!(
1283 "expected status: {expected_status:?} current status: {server_status:?}"
1284 )))
1285 }
1286 },
1287 )
1288 .await?;
1289 Ok(())
1290}