1use std::collections::BTreeMap;
2use std::ops::ControlFlow;
3use std::path::{Path, PathBuf};
4use std::str::FromStr;
5use std::time::Duration;
6use std::{env, fs};
7
8use anyhow::{Context, Result, bail};
9use fedimint_api_client::api::DynGlobalApi;
10use fedimint_api_client::download_from_invite_code;
11use fedimint_client_module::module::ClientModule;
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::admin_client::SetupStatus;
14use fedimint_core::config::{ClientConfig, load_from_file};
15use fedimint_core::core::{ModuleInstanceId, ModuleKind};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::ModuleCommon;
18use fedimint_core::module::registry::ModuleDecoderRegistry;
19use fedimint_core::runtime::block_in_place;
20use fedimint_core::task::block_on;
21use fedimint_core::task::jit::JitTryAnyhow;
22use fedimint_core::util::SafeUrl;
23use fedimint_core::{Amount, NumPeers, PeerId};
24use fedimint_gateway_common::WithdrawResponse;
25use fedimint_logging::LOG_DEVIMINT;
26use fedimint_server::config::io::{
27 CONSENSUS_CONFIG, ENCRYPTED_EXT, JSON_EXT, LOCAL_CONFIG, PRIVATE_CONFIG, SALT_FILE,
28};
29use fedimint_testing_core::config::API_AUTH;
30use fedimint_testing_core::node_type::LightningNodeType;
31use fedimint_wallet_client::WalletClientModule;
32use fedimint_wallet_client::config::WalletClientConfig;
33use fs_lock::FileLock;
34use futures::future::{join_all, try_join_all};
35use tokio::task::{JoinSet, spawn_blocking};
36use tokio::time::Instant;
37use tracing::{debug, info};
38
39use super::external::Bitcoind;
40use super::util::{Command, ProcessHandle, ProcessManager, cmd};
41use super::vars::utf8;
42use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
43use crate::util::{FedimintdCmd, poll, poll_simple, poll_with_timeout};
44use crate::version_constants::{VERSION_0_10_0_ALPHA, VERSION_0_11_0_ALPHA};
45use crate::{poll_almost_equal, poll_eq, vars};
46
// NOTE(review): none of these constants are used in the visible part of this
// file; the descriptions below are inferred from the names and values only —
// confirm against the port-allocation code.

/// Number of ports in each fedimintd's port block (presumably one per offset
/// below).
pub const PORTS_PER_FEDIMINTD: u16 = 4;
/// Offset of the p2p port within a fedimintd's port block.
pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
/// Offset of the API port within a fedimintd's port block.
pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
/// Offset of the UI port within a fedimintd's port block.
pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
/// Offset of the metrics port within a fedimintd's port block.
pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;
58
/// Handle to a federation started by devimint: the running `fedimintd`
/// processes, the environment each peer was started with, the bitcoind
/// backing them and a lazily-created internal client.
#[derive(Clone)]
pub struct Federation {
    /// Currently running `fedimintd` processes, keyed by peer index. An entry
    /// is removed when the peer is terminated.
    pub members: BTreeMap<usize, Fedimintd>,
    /// Environment of every peer, keyed by peer index. Entries stay around
    /// while a peer is stopped so it can be started again with the same
    /// settings.
    pub vars: BTreeMap<usize, vars::Fedimintd>,
    /// Bitcoind handle handed to every `fedimintd` and used for funding and
    /// mining in the peg-in/peg-out helpers.
    pub bitcoind: Bitcoind,

    /// Internal client; created (and possibly joined) on first use, see
    /// `internal_client`.
    client: JitTryAnyhow<Client>,
    // Only stored, never read, hence the lint exemption.
    #[allow(dead_code)]
    connectors: ConnectorRegistry,
}
71
72impl Drop for Federation {
73 fn drop(&mut self) {
74 block_in_place(|| {
75 block_on(async {
76 let mut set = JoinSet::new();
77
78 while let Some((_id, fedimintd)) = self.members.pop_first() {
79 set.spawn(async { drop(fedimintd) });
80 }
81 while (set.join_next().await).is_some() {}
82 });
83 });
84 }
85}
/// A `fedimint-cli` client identified by the name of its data directory
/// below the shared clients directory.
#[derive(Clone)]
pub struct Client {
    /// Name of this client's directory inside `Client::clients_dir()`.
    name: String,
}
91
92impl Client {
93 fn clients_dir() -> PathBuf {
94 let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
95 .expect("FM_DATA_DIR_ENV not set")
96 .parse()
97 .expect("FM_DATA_DIR_ENV invalid");
98 data_dir.join("clients")
99 }
100
101 fn client_dir(&self) -> PathBuf {
102 Self::clients_dir().join(&self.name)
103 }
104
105 pub fn client_name_lock(name: &str) -> Result<FileLock> {
106 let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
107 let file_lock = std::fs::OpenOptions::new()
108 .write(true)
109 .create(true)
110 .truncate(true)
111 .open(&lock_path)
112 .with_context(|| format!("Failed to open {}", lock_path.display()))?;
113
114 fs_lock::FileLock::new_exclusive(file_lock)
115 .with_context(|| format!("Failed to lock {}", lock_path.display()))
116 }
117
118 pub async fn create(name: impl ToString) -> Result<Client> {
120 let name = name.to_string();
121 spawn_blocking(move || {
122 let _lock = Self::client_name_lock(&name);
123 for i in 0u64.. {
124 let client = Self {
125 name: format!("{name}-{i}"),
126 };
127
128 if !client.client_dir().exists() {
129 std::fs::create_dir_all(client.client_dir())?;
130 return Ok(client);
131 }
132 }
133 unreachable!()
134 })
135 .await?
136 }
137
138 pub fn open_or_create(name: &str) -> Result<Client> {
140 block_in_place(|| {
141 let _lock = Self::client_name_lock(name);
142 let client = Self {
143 name: format!("{name}-0"),
144 };
145 if !client.client_dir().exists() {
146 std::fs::create_dir_all(client.client_dir())?;
147 }
148 Ok(client)
149 })
150 }
151
152 pub async fn join_federation(&self, invite_code: String) -> Result<()> {
154 debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
155 cmd!(self, "join-federation", invite_code).run().await?;
156
157 Ok(())
158 }
159
160 pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
162 debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
163 cmd!(
164 self,
165 "restore",
166 "--invite-code",
167 invite_code,
168 "--mnemonic",
169 mnemonic
170 )
171 .run()
172 .await?;
173
174 Ok(())
175 }
176
177 pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
179 let restored = Self::open_or_create(name)?;
180
181 let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
182 .as_str()
183 .unwrap()
184 .to_owned();
185
186 debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
187 cmd!(
188 restored,
189 "restore",
190 "--invite-code",
191 invite_code,
192 "--mnemonic",
193 mnemonic
194 )
195 .run()
196 .await?;
197
198 Ok(restored)
199 }
200
201 pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
204 let new = Client::create(name).await?;
205
206 cmd!(
207 "cp",
208 "-R",
209 self.client_dir().join("client.db").display(),
210 new.client_dir().display()
211 )
212 .run()
213 .await?;
214
215 Ok(new)
216 }
217
218 pub async fn balance(&self) -> Result<u64> {
219 Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
220 .as_u64()
221 .unwrap())
222 }
223
224 pub async fn await_balance(&self, min_balance_msat: u64) -> Result<()> {
226 loop {
227 cmd!(self, "dev", "wait", "3").out_json().await?;
228
229 let balance = self.balance().await?;
230 if balance >= min_balance_msat {
231 return Ok(());
232 }
233
234 info!(
235 target: LOG_DEVIMINT,
236 balance,
237 min_balance_msat,
238 "Waiting for client balance to reach minimum"
239 );
240 }
241 }
242
243 pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
244 if crate::util::supports_wallet_v2() {
245 let address = cmd!(self, "module", "walletv2", "receive")
246 .out_json()
247 .await?;
248 Ok((address.as_str().unwrap().to_string(), String::new()))
250 } else {
251 let deposit = cmd!(self, "deposit-address").out_json().await?;
252 Ok((
253 deposit["address"].as_str().unwrap().to_string(),
254 deposit["operation_id"].as_str().unwrap().to_string(),
255 ))
256 }
257 }
258
259 pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
260 cmd!(self, "await-deposit", operation_id).run().await
261 }
262
263 pub fn cmd(&self) -> Command {
264 cmd!(
265 crate::util::get_fedimint_cli_path(),
266 format!("--data-dir={}", self.client_dir().display())
267 )
268 }
269
270 pub fn get_name(&self) -> &str {
271 &self.name
272 }
273
274 pub async fn get_session_count(&self) -> Result<u64> {
276 cmd!(self, "dev", "session-count").out_json().await?["count"]
277 .as_u64()
278 .context("count field wasn't a number")
279 }
280
281 pub async fn wait_complete(&self) -> Result<()> {
283 cmd!(self, "dev", "wait-complete").run().await
284 }
285
286 pub async fn wait_session(&self) -> anyhow::Result<()> {
288 info!("Waiting for a new session");
289 let session_count = self.get_session_count().await?;
290 self.wait_session_outcome(session_count).await?;
291 Ok(())
292 }
293
294 pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
296 let timeout = {
297 let current_session_count = self.get_session_count().await?;
298 let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
299 let session_duration_seconds = 180;
300 Duration::from_secs(sessions_to_wait * session_duration_seconds)
301 };
302
303 let start = Instant::now();
304 poll_with_timeout("Waiting for a new session", timeout, || async {
305 info!("Awaiting session outcome {session_count}");
306 match cmd!(self, "dev", "api", "await_session_outcome", session_count)
307 .run()
308 .await
309 {
310 Err(e) => Err(ControlFlow::Continue(e)),
311 Ok(()) => Ok(()),
312 }
313 })
314 .await?;
315
316 let session_found_in = start.elapsed();
317 info!("session found in {session_found_in:?}");
318 Ok(())
319 }
320}
321
322impl Federation {
    /// Starts one `fedimintd` per peer and, unless setup is skipped, runs DKG
    /// through the CLI and publishes the resulting invite codes.
    ///
    /// * `skip_setup` / `pre_dkg` — if either is set, the processes are
    ///   started but no DKG is run, no invite codes are collected and the
    ///   internal client does not join the federation.
    /// * `pre_restore` — only looked at after a completed setup; peer 0 is
    ///   then backed up, wiped and restarted (see
    ///   `restart_guardian_for_manual_restore`).
    /// * `fed_index` — used to look up per-peer overrides in
    ///   `process_mgr.globals.fedimintd_overrides`.
    /// * `federation_name` — passed to every `fedimintd` and used as the name
    ///   of the internal client.
    ///
    /// # Errors
    ///
    /// Fails if any process cannot be started, DKG fails, or an invite-code
    /// file cannot be read, copied or moved.
    pub async fn new(
        process_mgr: &ProcessManager,
        bitcoind: Bitcoind,
        skip_setup: bool,
        pre_dkg: bool,
        pre_restore: bool,
        fed_index: usize,
        federation_name: String,
    ) -> Result<Self> {
        let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
        let mut members = BTreeMap::new();
        let mut peer_to_env_vars_map = BTreeMap::new();

        // NOTE(review): `admin_clients` is filled below but never read in this
        // function.
        let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
        let mut api_endpoints: BTreeMap<PeerId, _> = BTreeMap::new();

        let connectors = ConnectorRegistry::build_from_testing_env()?.bind().await?;
        // Per peer: build its environment, spawn the daemon and remember its
        // API endpoint for the DKG step.
        for peer_id in num_peers.peer_ids() {
            let peer_env_vars = vars::Fedimintd::init(
                &process_mgr.globals,
                federation_name.clone(),
                peer_id,
                process_mgr
                    .globals
                    .fedimintd_overrides
                    .peer_expect(fed_index, peer_id),
            )
            .await?;
            members.insert(
                peer_id.to_usize(),
                Fedimintd::new(
                    process_mgr,
                    bitcoind.clone(),
                    peer_id.to_usize(),
                    &peer_env_vars,
                    federation_name.clone(),
                )
                .await?,
            );
            let admin_client = DynGlobalApi::new_admin_setup(
                connectors.clone(),
                SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
            )?;
            api_endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
            admin_clients.insert(peer_id, admin_client);
            peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
        }

        if !skip_setup && !pre_dkg {
            // Switch the CLI used for DKG and put the previous one back once
            // DKG has finished.
            let (original_fedimint_cli_path, original_fm_mint_client) =
                crate::util::use_matching_fedimint_cli_for_dkg().await?;

            run_cli_dkg_v2(api_endpoints).await?;

            crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);

            let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
            let invite_code_filename_original = "invite-code";

            // Wait for every peer to write its invite code and check that the
            // config can be downloaded with it.
            for peer_env_vars in peer_to_env_vars_map.values() {
                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);

                let invite_code = poll_simple("awaiting-invite-code", || async {
                    let path = format!("{peer_data_dir}/{invite_code_filename_original}");
                    tokio::fs::read_to_string(&path)
                        .await
                        .with_context(|| format!("Awaiting invite code file: {path}"))
                })
                .await
                .context("Awaiting invite code file")?;

                download_from_invite_code(&connectors, &InviteCode::from_str(&invite_code)?)
                    .await?;
            }

            // Peer 0's invite code becomes the default `invite-code` file in
            // the client directory ...
            let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);

            tokio::fs::copy(
                format!("{peer_data_dir}/{invite_code_filename_original}"),
                format!("{client_dir}/{invite_code_filename_original}"),
            )
            .await
            .context("copying invite-code file")?;

            // ... and every peer's file (peer 0 included, which is why the
            // step above copies rather than moves) is moved there as
            // `invite-code-<index>`.
            for (index, peer_env_vars) in &peer_to_env_vars_map {
                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);

                let invite_code_filename_indexed =
                    format!("{invite_code_filename_original}-{index}");
                tokio::fs::rename(
                    format!("{peer_data_dir}/{invite_code_filename_original}"),
                    format!("{client_dir}/{invite_code_filename_indexed}"),
                )
                .await
                .context("moving invite-code file")?;
            }

            debug!("Moved invite-code files to client data directory");

            if pre_restore {
                Self::restart_guardian_for_manual_restore(
                    process_mgr,
                    &mut members,
                    &peer_to_env_vars_map,
                    &bitcoind,
                )
                .await?;
            }
        }

        // The internal client is only created when first requested.
        // NOTE(review): the invite code is read even when the join below is
        // skipped, so the lazy client fails if no `invite-code` file exists —
        // confirm this is intended for `skip_setup`/`pre_dkg`.
        let client = JitTryAnyhow::new_try({
            move || async move {
                let client = Client::open_or_create(federation_name.as_str())?;
                let invite_code = Self::invite_code_static()?;
                if !skip_setup && !pre_dkg {
                    cmd!(client, "join-federation", invite_code).run().await?;
                }
                Ok(client)
            }
        });

        Ok(Self {
            members,
            vars: peer_to_env_vars_map,
            bitcoind,
            client,
            connectors,
        })
    }
462
    /// Backs up peer 0's guardian files, wipes its data directory and starts
    /// it again on the now-empty directory.
    ///
    /// The backup is written to
    /// `<FM_TEST_DIR>/fedimintd-backups/fedimint-0-guardian-backup.tar`.
    ///
    /// # Errors
    ///
    /// Fails if the backup cannot be written, peer 0 is not in `members`, the
    /// process cannot be terminated or restarted, or the data directory
    /// cannot be removed or recreated.
    async fn restart_guardian_for_manual_restore(
        process_mgr: &ProcessManager,
        members: &mut BTreeMap<usize, Fedimintd>,
        peer_to_env_vars_map: &BTreeMap<usize, vars::Fedimintd>,
        bitcoind: &Bitcoind,
    ) -> Result<()> {
        const RESTORE_PEER: usize = 0;

        let peer_env_vars = &peer_to_env_vars_map[&RESTORE_PEER];
        let backup_dir = process_mgr.globals.FM_TEST_DIR.join("fedimintd-backups");
        tokio::fs::create_dir_all(&backup_dir)
            .await
            .context("Creating fedimintd backup directory")?;
        let backup_path = backup_dir.join("fedimint-0-guardian-backup.tar");

        // The backup has to be taken before the data directory is deleted
        // further down.
        Self::write_guardian_backup_tar(&peer_env_vars.FM_DATA_DIR, &backup_path).await?;
        info!(
            target: LOG_DEVIMINT,
            path = %backup_path.display(),
            "Wrote guardian backup for manual restore"
        );

        // Stop the running process before touching its data directory.
        let fedimintd = members
            .remove(&RESTORE_PEER)
            .context("Missing fedimint-0 process")?;
        fedimintd.terminate().await?;
        tokio::fs::remove_dir_all(&peer_env_vars.FM_DATA_DIR)
            .await
            .with_context(|| format!("Removing {}", peer_env_vars.FM_DATA_DIR.display()))?;
        tokio::fs::create_dir_all(&peer_env_vars.FM_DATA_DIR)
            .await
            .with_context(|| format!("Creating {}", peer_env_vars.FM_DATA_DIR.display()))?;

        // NOTE(review): the process is restarted under the name "default"
        // rather than the federation name used in `new` — confirm that is
        // intended.
        let fedimintd = Fedimintd::new(
            process_mgr,
            bitcoind.clone(),
            RESTORE_PEER,
            peer_env_vars,
            "default".to_string(),
        )
        .await?;
        members.insert(RESTORE_PEER, fedimintd);

        info!(
            target: LOG_DEVIMINT,
            ui = %peer_env_vars.FM_BIND_UI,
            backup = %backup_path.display(),
            "fedimint-0 restarted in setup mode for manual restore"
        );

        Ok(())
    }
515
516 async fn write_guardian_backup_tar(data_dir: &Path, backup_path: &Path) -> Result<()> {
517 let data_dir = data_dir.to_path_buf();
518 let backup_path = backup_path.to_path_buf();
519 spawn_blocking(move || {
520 let file = fs::File::options()
521 .write(true)
522 .create_new(true)
523 .open(&backup_path)
524 .with_context(|| format!("Creating {}", backup_path.display()))?;
525 let mut archive = tar::Builder::new(file);
526 for path in [
527 PathBuf::from(LOCAL_CONFIG).with_extension(JSON_EXT),
528 PathBuf::from(CONSENSUS_CONFIG).with_extension(JSON_EXT),
529 PathBuf::from(PRIVATE_CONFIG).with_extension(ENCRYPTED_EXT),
530 PathBuf::from(SALT_FILE),
531 ] {
532 archive
533 .append_path_with_name(data_dir.join(&path), &path)
534 .with_context(|| format!("Adding {} to backup", path.display()))?;
535 }
536 archive.finish().context("Finishing guardian backup tar")?;
537 Ok::<_, anyhow::Error>(())
538 })
539 .await?
540 }
541
542 pub fn client_config(&self) -> Result<ClientConfig> {
543 let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
544 load_from_file(&cfg_path)
545 }
546
547 pub fn module_instance_id_by_kind(&self, kind: &ModuleKind) -> Result<ModuleInstanceId> {
549 self.client_config()?
550 .modules
551 .iter()
552 .find_map(|(id, cfg)| if &cfg.kind == kind { Some(*id) } else { None })
553 .with_context(|| format!("Module kind {kind} not found"))
554 }
555
    /// Returns the decoded client config of the first module whose kind is
    /// `M::kind()`, or `Ok(None)` if the federation has no such module.
    ///
    /// # Errors
    ///
    /// Fails if the client config cannot be loaded or the decoded config
    /// cannot be downcast to `M`'s client config type.
    ///
    /// # Panics
    ///
    /// Panics if re-decoding the raw module config fails.
    pub fn module_client_config<M: ClientModule>(
        &self,
    ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
        self.client_config()?
            .modules
            .iter()
            .find_map(|(module_instance_id, module_cfg)| {
                if module_cfg.kind == M::kind() {
                    // A registry holding only this module's decoder is enough
                    // to re-decode its own raw config.
                    let decoders = ModuleDecoderRegistry::new(vec![(
                        *module_instance_id,
                        M::kind(),
                        M::decoder(),
                    )]);
                    Some(
                        module_cfg
                            .config
                            .clone()
                            .redecode_raw(&decoders)
                            .expect("Decoding client cfg failed")
                            .expect_decoded_ref()
                            .as_any()
                            .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
                            .cloned()
                            .context("Cast to module config failed"),
                    )
                } else {
                    None
                }
            })
            // Option<Result<T>> -> Result<Option<T>>
            .transpose()
    }
587
588 pub fn deposit_fees(&self) -> Result<Amount> {
589 if crate::util::supports_wallet_v2() {
590 Ok(self
591 .module_client_config::<fedimint_walletv2_client::WalletClientModule>()?
592 .context("No walletv2 module found")?
593 .fee_consensus
594 .base)
595 } else {
596 Ok(self
597 .module_client_config::<WalletClientModule>()?
598 .context("No wallet module found")?
599 .fee_consensus
600 .peg_in_abs)
601 }
602 }
603
604 pub fn invite_code(&self) -> Result<String> {
606 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
607 let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
608 Ok(invite_code)
609 }
610
611 pub fn invite_code_static() -> Result<String> {
612 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
613 let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
614 Ok(invite_code)
615 }
616 pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
617 let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
618 let name = format!("invite-code-{peer_id}");
619 let invite_code = fs::read_to_string(data_dir.join(name))?;
620 Ok(invite_code)
621 }
622
623 pub async fn internal_client(&self) -> Result<&Client> {
627 self.client
628 .get_try()
629 .await
630 .context("Internal client joining Federation")
631 }
632
633 pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
635 let client = Client::create(name).await?;
636 client.join_federation(self.invite_code()?).await?;
637 Ok(client)
638 }
639
640 pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
641 if self.members.contains_key(&peer) {
642 bail!("fedimintd-{peer} already running");
643 }
644 self.members.insert(
645 peer,
646 Fedimintd::new(
647 process_mgr,
648 self.bitcoind.clone(),
649 peer,
650 &self.vars[&peer],
651 "default".to_string(),
652 )
653 .await?,
654 );
655 Ok(())
656 }
657
658 pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
659 let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
660 bail!("fedimintd-{peer_id} does not exist");
661 };
662 fedimintd.terminate().await?;
663 Ok(())
664 }
665
666 pub async fn await_server_terminated(&mut self, peer_id: usize) -> Result<()> {
667 let Some(fedimintd) = self.members.get_mut(&peer_id) else {
668 bail!("fedimintd-{peer_id} does not exist");
669 };
670 fedimintd.await_terminated().await?;
671 self.members.remove(&peer_id);
672 Ok(())
673 }
674
675 pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
677 info!("starting all servers");
678 let fed_size = process_mgr.globals.FM_FED_SIZE;
679 for peer_id in 0..fed_size {
680 if self.members.contains_key(&peer_id) {
681 continue;
682 }
683 self.start_server(process_mgr, peer_id).await?;
684 }
685 self.await_all_peers().await?;
686 Ok(())
687 }
688
689 pub async fn terminate_all_servers(&mut self) -> Result<()> {
691 info!("terminating all servers");
692 let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
693 for peer_id in running_peer_ids {
694 self.terminate_server(peer_id).await?;
695 }
696 Ok(())
697 }
698
699 pub async fn restart_all_staggered_with_bin(
704 &mut self,
705 process_mgr: &ProcessManager,
706 bin_path: &PathBuf,
707 ) -> Result<()> {
708 let fed_size = process_mgr.globals.FM_FED_SIZE;
709
710 self.start_all_servers(process_mgr).await?;
712
713 while self.num_members() > 0 {
715 self.terminate_server(self.num_members() - 1).await?;
716 if self.num_members() > 0 {
717 fedimint_core::task::sleep_in_test(
718 "waiting to shutdown remaining peers",
719 Duration::from_secs(10),
720 )
721 .await;
722 }
723 }
724
725 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
727
728 for peer_id in 0..fed_size {
730 self.start_server(process_mgr, peer_id).await?;
731 if peer_id < fed_size - 1 {
732 fedimint_core::task::sleep_in_test(
733 "waiting to restart remaining peers",
734 Duration::from_secs(10),
735 )
736 .await;
737 }
738 }
739
740 self.await_all_peers().await?;
741
742 let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
743 info!("upgraded fedimintd to version: {}", fedimintd_version);
744 Ok(())
745 }
746
747 pub async fn restart_all_with_bin(
748 &mut self,
749 process_mgr: &ProcessManager,
750 bin_path: &PathBuf,
751 ) -> Result<()> {
752 let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
754 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
756 unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
758
759 self.restart_all_staggered_with_bin(process_mgr, bin_path)
760 .await
761 }
762
763 pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
764 let fed_size = process_mgr.globals.FM_FED_SIZE;
765 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
766 anyhow::ensure!(
767 fed_size > 3 * offline_nodes,
768 "too many offline nodes ({offline_nodes}) to reach consensus"
769 );
770
771 while self.num_members() > fed_size - offline_nodes {
772 self.terminate_server(self.num_members() - 1).await?;
773 }
774
775 if offline_nodes > 0 {
776 info!(fed_size, offline_nodes, "federation is degraded");
777 }
778 Ok(())
779 }
780
781 pub async fn send_to_address(&self, address: String, amount: u64) -> Result<()> {
782 self.bitcoind.send_to(address, amount).await?;
783
784 self.bitcoind.mine_blocks(21).await?;
785
786 Ok(())
787 }
788
789 pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
790 let deposit_fees_msat = self.deposit_fees()?.msats;
791 assert_eq!(
792 deposit_fees_msat % 1000,
793 0,
794 "Deposit fees expected to be whole sats in test suite"
795 );
796 let deposit_fees = deposit_fees_msat / 1000;
797 info!(amount, deposit_fees, "Pegging-in client funds");
798
799 let (address, operation_id) = client.get_deposit_addr().await?;
800
801 self.bitcoind
802 .send_to(address, amount + deposit_fees)
803 .await?;
804 self.bitcoind.mine_blocks(21).await?;
805
806 Ok(operation_id)
807 }
808
809 pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
810 let initial_balance = if crate::util::supports_wallet_v2() {
813 Some(client.balance().await?)
814 } else {
815 None
816 };
817
818 let operation_id = self.pegin_client_no_wait(amount, client).await?;
819
820 if let Some(initial) = initial_balance {
821 let expected_balance = initial + (amount * 1000 * 9 / 10);
824 client.await_balance(expected_balance).await?;
825 } else {
826 client.await_deposit(&operation_id).await?;
827 }
828 Ok(())
829 }
830
    /// Pegs `amount` (plus the deposit fee) into every gateway and waits
    /// until each gateway has caught up with the chain and shows the funds.
    ///
    /// With wallet v2 every gateway's ecash balance is recorded up front and
    /// the wait ends once it has grown by at least 90% of `amount * 1000`.
    /// Otherwise the balance is compared against `amount * 1000` with
    /// `poll_almost_equal!`.
    ///
    /// # Panics
    ///
    /// Panics if the deposit fee is not a whole number of sats, if an initial
    /// gateway balance cannot be fetched, or if a gateway's info has no block
    /// height.
    pub async fn pegin_gateways(
        &self,
        amount: u64,
        gateways: Vec<&super::gatewayd::Gatewayd>,
    ) -> Result<()> {
        let deposit_fees_msat = self.deposit_fees()?.msats;
        assert_eq!(
            deposit_fees_msat % 1000,
            0,
            "Deposit fees expected to be whole sats in test suite"
        );
        let deposit_fees = deposit_fees_msat / 1000;
        info!(amount, deposit_fees, "Pegging-in gateway funds");
        let fed_id = self.calculate_federation_id();

        let uses_walletv2 = crate::util::supports_wallet_v2();
        // Filled only for wallet v2, in the same order as `gateways`, so it
        // can be indexed by position below.
        let mut initial_balances = Vec::new();
        if uses_walletv2 {
            for gw in &gateways {
                let balance = gw
                    .client()
                    .ecash_balance(fed_id.clone())
                    .await
                    .expect("failed to fetch initial gateway balance");
                initial_balances.push(balance);
            }
        }

        // Fund every gateway's peg-in address, then confirm all of the
        // transactions with a single round of mining.
        for gw in gateways.clone() {
            let pegin_addr = gw.client().get_pegin_addr(&fed_id).await?;
            self.bitcoind
                .send_to(pegin_addr, amount + deposit_fees)
                .await?;
        }

        self.bitcoind.mine_blocks(21).await?;
        // Height the gateways have to report before their balance is checked.
        let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
        try_join_all(gateways.into_iter().enumerate().map(|(i, gw)| {
            let initial_balance = if uses_walletv2 {
                initial_balances[i]
            } else {
                0
            };
            let fed_id = fed_id.clone();
            poll("gateway pegin", move || {
                let fed_id = fed_id.clone();
                async move {
                    let gw_info = gw
                        .client()
                        .get_info()
                        .await
                        .map_err(ControlFlow::Continue)?;

                    // Gateways older than 0.10.0-alpha report the block
                    // height at the top level of their info.
                    let block_height: u64 = if gw.gatewayd_version < *VERSION_0_10_0_ALPHA {
                        gw_info["block_height"]
                            .as_u64()
                            .expect("Could not parse block height")
                    } else {
                        gw_info["lightning_info"]["connected"]["block_height"]
                            .as_u64()
                            .expect("Could not parse block height")
                    };

                    if bitcoind_block_height != block_height {
                        return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
                            "gateway block height is not synced"
                        )));
                    }

                    let gateway_balance = gw
                        .client()
                        .ecash_balance(fed_id)
                        .await
                        .map_err(ControlFlow::Continue)?;

                    if uses_walletv2 {
                        // Accept anything from 90% of the deposited amount on
                        // top of the initial balance.
                        let expected = initial_balance + (amount * 1000 * 9 / 10);
                        if gateway_balance >= expected {
                            Ok(())
                        } else {
                            Err(ControlFlow::Continue(anyhow::anyhow!(
                                "Gateway balance {gateway_balance} has not reached expected {expected} (initial: {initial_balance})"
                            )))
                        }
                    } else {
                        poll_almost_equal!(gateway_balance, amount * 1000)
                    }
                }
            })
        }))
        .await?;

        Ok(())
    }
933
    /// Pegs `amount` out of every gateway to a fresh bitcoind address and
    /// checks that each gateway's ecash balance dropped by about
    /// `amount` plus the on-chain fee.
    ///
    /// # Errors
    ///
    /// Fails if a gateway or bitcoind call fails, or if a balance is outside
    /// the tolerance.
    ///
    /// # Panics
    ///
    /// Panics if a gateway has not joined this federation.
    pub async fn pegout_gateways(
        &self,
        amount: u64,
        gateways: Vec<&super::gatewayd::Gatewayd>,
    ) -> Result<()> {
        info!(amount, "Pegging-out gateway funds");
        let fed_id = self.calculate_federation_id();
        // Balance before the peg-out and the withdraw response, per gateway.
        // NOTE(review): keyed by lightning node type, so two gateways of the
        // same type would overwrite each other's entry.
        let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
        for gw in gateways.clone() {
            let prev_fed_ecash_balance = gw
                .client()
                .get_balances()
                .await?
                .ecash_balances
                .into_iter()
                .find(|fed| fed.federation_id.to_string() == fed_id)
                .expect("Gateway has not joined federation")
                .ecash_balance_msats;

            let pegout_address = self.bitcoind.get_new_address().await?;
            let response = gw
                .client()
                .pegout(fed_id.clone(), amount, pegout_address)
                .await?;
            peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
        }
        self.bitcoind.mine_blocks(21).await?;

        // Wait until bitcoind knows every peg-out transaction.
        try_join_all(
            peg_outs
                .values()
                .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
        )
        .await?;

        for gw in gateways.clone() {
            let after_fed_ecash_balance = gw
                .client()
                .get_balances()
                .await?
                .ecash_balances
                .into_iter()
                .find(|fed| fed.federation_id.to_string() == fed_id)
                .expect("Gateway has not joined federation")
                .ecash_balance_msats;

            let ln_type = gw.ln.ln_type();
            let prev_balance = peg_outs
                .get(&ln_type)
                .expect("peg out does not exist")
                .0
                .msats;
            let fees = peg_outs
                .get(&ln_type)
                .expect("peg out does not exist")
                .1
                .fees;
            // On-chain fee converted from sats to msats.
            let total_fee = fees.amount().to_sat() * 1000;
            // Allowed deviation in msats; wallet v2 additionally allows for a
            // fee of 100 sats plus 1% of the amount.
            // NOTE(review): `amount / 1000` treats `amount` as msats while it
            // is subtracted as-is below — confirm the unit with the callers.
            let tolerance = if crate::util::supports_wallet_v2() {
                let amount_sats = amount / 1000;
                let module_fee_sats = 100 + amount_sats / 100;
                module_fee_sats * 1000 + 2000
            } else if crate::util::supports_mint_v2() {
                4000
            } else {
                2000
            };
            // NOTE(review): this subtraction underflows if the previous
            // balance is smaller than `amount + total_fee`.
            crate::util::almost_equal(
                after_fed_ecash_balance.msats,
                prev_balance - amount - total_fee,
                tolerance,
            )
            .map_err(|e| {
                anyhow::anyhow!(
                    "new balance did not equal prev balance minus withdraw_amount minus fees: {e}"
                )
            })?;
        }

        Ok(())
    }
1019
1020 pub fn calculate_federation_id(&self) -> String {
1021 self.client_config()
1022 .unwrap()
1023 .global
1024 .calculate_federation_id()
1025 .to_string()
1026 }
1027
1028 pub async fn await_block_sync(&self) -> Result<u64> {
1029 let finality_delay = self.get_finality_delay()?;
1030 let block_count = self.bitcoind.get_block_count().await?;
1031 let expected = block_count.saturating_sub(finality_delay.into());
1032
1033 if crate::util::supports_wallet_v2() {
1034 let client = self.internal_client().await?;
1036 loop {
1037 let value = cmd!(client, "module", "walletv2", "info", "block-count")
1038 .out_json()
1039 .await?;
1040 let current: u64 = serde_json::from_value(value)?;
1041 if current >= expected {
1042 break;
1043 }
1044 fedimint_core::task::sleep_in_test(
1045 format!("Waiting for consensus block count to reach {expected}"),
1046 std::time::Duration::from_secs(1),
1047 )
1048 .await;
1049 }
1050 } else {
1051 cmd!(
1052 self.internal_client().await?,
1053 "dev",
1054 "wait-block-count",
1055 expected
1056 )
1057 .run()
1058 .await?;
1059 }
1060
1061 Ok(expected)
1062 }
1063
1064 fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
1065 if crate::util::supports_wallet_v2() {
1067 return Ok(fedimint_walletv2_server::CONFIRMATION_FINALITY_DELAY as u32);
1068 }
1069
1070 let wallet_instance_id = self.module_instance_id_by_kind(&fedimint_wallet_client::KIND)?;
1071 let client_config = &self.client_config()?;
1072 let wallet_cfg = client_config
1073 .modules
1074 .get(&wallet_instance_id)
1075 .context("wallet module not found")?
1076 .clone()
1077 .redecode_raw(&ModuleDecoderRegistry::new([(
1078 wallet_instance_id,
1079 fedimint_wallet_client::KIND,
1080 fedimint_wallet_client::WalletModuleTypes::decoder(),
1081 )]))?;
1082 let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
1083
1084 let finality_delay = wallet_cfg.finality_delay;
1085 Ok(finality_delay)
1086 }
1087
1088 pub async fn await_gateways_registered(&self) -> Result<()> {
1089 let start_time = Instant::now();
1090 debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
1091
1092 poll("gateways registered", || async {
1093 let num_gateways = cmd!(
1094 self.internal_client()
1095 .await
1096 .map_err(ControlFlow::Continue)?,
1097 "list-gateways"
1098 )
1099 .out_json()
1100 .await
1101 .map_err(ControlFlow::Continue)?
1102 .as_array()
1103 .context("invalid output")
1104 .map_err(ControlFlow::Break)?
1105 .len();
1106
1107 let expected_gateways =
1110 if crate::util::Gatewayd::version_or_default().await < *VERSION_0_10_0_ALPHA {
1111 1
1112 } else {
1113 2
1114 };
1115
1116 poll_eq!(num_gateways, expected_gateways)
1117 })
1118 .await?;
1119 debug!(target: LOG_DEVIMINT,
1120 elapsed_ms = %start_time.elapsed().as_millis(),
1121 "Gateways registered");
1122 Ok(())
1123 }
1124
1125 pub async fn await_all_peers(&self) -> Result<()> {
1126 let (module_name, endpoint) = if crate::util::supports_wallet_v2() {
1127 ("walletv2", "consensus_block_count")
1128 } else {
1129 ("wallet", "block_count")
1130 };
1131 poll("Waiting for all peers to be online", || async {
1132 cmd!(
1133 self.internal_client()
1134 .await
1135 .map_err(ControlFlow::Continue)?,
1136 "dev",
1137 "api",
1138 "--module",
1139 module_name,
1140 endpoint
1141 )
1142 .run()
1143 .await
1144 .map_err(ControlFlow::Continue)?;
1145 Ok(())
1146 })
1147 .await
1148 }
1149
1150 pub async fn await_peer(&self, peer_id: usize) -> Result<()> {
1151 poll("Waiting for all peers to be online", || async {
1152 cmd!(
1153 self.internal_client()
1154 .await
1155 .map_err(ControlFlow::Continue)?,
1156 "dev",
1157 "api",
1158 "--peer-id",
1159 peer_id,
1160 "--module",
1161 "wallet",
1162 "block_count"
1163 )
1164 .run()
1165 .await
1166 .map_err(ControlFlow::Continue)?;
1167 Ok(())
1168 })
1169 .await
1170 }
1171
1172 pub async fn finalize_mempool_tx(&self) -> Result<()> {
1182 let finality_delay = self.get_finality_delay()?;
1183 let blocks_to_mine = finality_delay + 1;
1184 self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
1185 self.await_block_sync().await?;
1186 Ok(())
1187 }
1188
1189 pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
1190 self.bitcoind.mine_blocks(blocks).await?;
1191 self.await_block_sync().await?;
1192 Ok(())
1193 }
1194
1195 pub fn num_members(&self) -> usize {
1196 self.members.len()
1197 }
1198
1199 pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
1200 self.members
1201 .keys()
1202 .map(|&peer_id| PeerId::from(peer_id as u16))
1203 }
1204}
1205
/// Handle to a single running `fedimintd` daemon.
#[derive(Clone)]
pub struct Fedimintd {
    /// Bitcoind handle passed in at construction; stored but not read by any
    /// method of this type.
    _bitcoind: Bitcoind,
    /// Handle of the spawned daemon process.
    process: ProcessHandle,
}
1211
1212impl Fedimintd {
1213 pub async fn new(
1214 process_mgr: &ProcessManager,
1215 bitcoind: Bitcoind,
1216 peer_id: usize,
1217 env: &vars::Fedimintd,
1218 fed_name: String,
1219 ) -> Result<Self> {
1220 debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
1221 let process = process_mgr
1222 .spawn_daemon(
1223 &format!("fedimintd-{fed_name}-{peer_id}"),
1224 cmd!(FedimintdCmd).envs(env.vars()),
1225 )
1226 .await?;
1227
1228 Ok(Self {
1229 _bitcoind: bitcoind,
1230 process,
1231 })
1232 }
1233
1234 pub async fn terminate(self) -> Result<()> {
1235 self.process.terminate().await
1236 }
1237
1238 pub async fn await_terminated(&self) -> Result<()> {
1239 self.process.await_terminated().await
1240 }
1241}
1242
/// Runs distributed key generation for all peers through the CLI.
///
/// Steps, each performed for all `endpoints` concurrently:
/// 1. wait until every peer reports `SetupStatus::AwaitingLocalParams`,
/// 2. set the local parameters (peer 0 as leader, all others as followers),
/// 3. give every peer the connection info of every other peer,
/// 4. start DKG on every peer.
///
/// # Errors
///
/// Fails if setting local parameters, adding a peer or starting DKG fails.
///
/// # Panics
///
/// Panics if polling a peer's setup status fails or a peer reports a status
/// other than `AwaitingLocalParams`.
pub async fn run_cli_dkg_v2(endpoints: BTreeMap<PeerId, String>) -> Result<()> {
    let status_futures = endpoints.values().map(|endpoint| {
        let endpoint = endpoint.clone();
        async move {
            // Failures of the status call are retried by `poll`.
            let status = poll("awaiting-setup-status-awaiting-local-params", || async {
                crate::util::FedimintCli
                    .setup_status(&API_AUTH, &endpoint)
                    .await
                    .map_err(ControlFlow::Continue)
            })
            .await
            .unwrap();

            assert_eq!(status, SetupStatus::AwaitingLocalParams);
        }
    });
    join_all(status_futures).await;

    debug!(target: LOG_DEVIMINT, "Setting local parameters...");

    // The federation size is only handed to the leader when the CLI is at
    // least 0.11.0-alpha.
    let federation_size =
        if crate::util::FedimintCli::version_or_default().await >= *VERSION_0_11_0_ALPHA {
            Some(endpoints.len())
        } else {
            None
        };
    let local_params_futures = endpoints.iter().map(|(peer, endpoint)| {
        let peer = *peer;
        let endpoint = endpoint.clone();
        async move {
            // Peer 0 acts as the leader, everyone else as a follower.
            let info = if peer.to_usize() == 0 {
                crate::util::FedimintCli
                    .set_local_params_leader(&peer, &API_AUTH, &endpoint, federation_size)
                    .await
            } else {
                crate::util::FedimintCli
                    .set_local_params_follower(&peer, &API_AUTH, &endpoint)
                    .await
            };
            info.map(|i| (peer, i))
        }
    });
    let connection_info: BTreeMap<_, _> = try_join_all(local_params_futures)
        .await?
        .into_iter()
        .collect();

    debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");

    // For every peer's connection info, register it with every other peer's
    // endpoint (a peer is never added to itself).
    let add_peer_futures = connection_info.iter().flat_map(|(peer, info)| {
        endpoints
            .iter()
            .filter(move |(p, _)| *p != peer)
            .map(move |(_, endpoint)| {
                let endpoint = endpoint.clone();
                let info = info.clone();
                async move {
                    crate::util::FedimintCli
                        .add_peer(&info, &API_AUTH, &endpoint)
                        .await
                }
            })
    });
    try_join_all(add_peer_futures).await?;

    debug!(target: LOG_DEVIMINT, "Starting DKG...");

    let start_dkg_futures = endpoints.values().map(|endpoint| {
        let endpoint = endpoint.clone();
        async move {
            crate::util::FedimintCli
                .start_dkg(&API_AUTH, &endpoint)
                .await
        }
    });
    try_join_all(start_dkg_futures).await?;

    Ok(())
}