// devimint/federation.rs

1mod config;
2
3use std::collections::{BTreeMap, HashMap, HashSet};
4use std::ops::ControlFlow;
5use std::path::PathBuf;
6use std::str::FromStr;
7use std::time::Duration;
8use std::{env, fs, iter};
9
10use anyhow::{Context, Result, anyhow, bail};
11use bitcoincore_rpc::bitcoin::Network;
12use fedimint_api_client::api::net::ConnectorType;
13use fedimint_api_client::api::{ConnectorRegistry, DynGlobalApi};
14use fedimint_client_module::module::ClientModule;
15use fedimint_core::admin_client::{ServerStatusLegacy, SetupStatus};
16use fedimint_core::config::{ClientConfig, ServerModuleConfigGenParamsRegistry, load_from_file};
17use fedimint_core::core::LEGACY_HARDCODED_INSTANCE_ID_WALLET;
18use fedimint_core::envs::BitcoinRpcConfig;
19use fedimint_core::invite_code::InviteCode;
20use fedimint_core::module::registry::ModuleDecoderRegistry;
21use fedimint_core::module::{ApiAuth, ModuleCommon};
22use fedimint_core::runtime::block_in_place;
23use fedimint_core::task::block_on;
24use fedimint_core::task::jit::JitTryAnyhow;
25use fedimint_core::util::SafeUrl;
26use fedimint_core::{Amount, NumPeers, PeerId};
27use fedimint_gateway_common::WithdrawResponse;
28use fedimint_logging::LOG_DEVIMINT;
29use fedimint_server::config::ConfigGenParams;
30use fedimint_testing_core::config::local_config_gen_params;
31use fedimint_testing_core::node_type::LightningNodeType;
32use fedimint_wallet_client::WalletClientModule;
33use fedimint_wallet_client::config::WalletClientConfig;
34use fs_lock::FileLock;
35use futures::future::{join_all, try_join_all};
36use rand::Rng;
37use tokio::task::{JoinSet, spawn_blocking};
38use tokio::time::Instant;
39use tracing::{debug, info};
40
41use super::external::Bitcoind;
42use super::util::{Command, ProcessHandle, ProcessManager, cmd};
43use super::vars::utf8;
44use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
45use crate::util::{FedimintdCmd, poll, poll_simple, poll_with_timeout};
46use crate::{poll_almost_equal, poll_eq, vars};
47
// TODO: Are we still using the 3rd port for anything?
/// Number of ports we allocate for every `fedimintd` instance.
///
/// Each `FEDIMINTD_*_PORT_OFFSET` constant names one slot inside that range.
pub const PORTS_PER_FEDIMINTD: u16 = 4;
/// Offset of the p2p port inside the range of [`PORTS_PER_FEDIMINTD`] ports
pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
/// Offset of the api port inside the range of [`PORTS_PER_FEDIMINTD`] ports
pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
/// Offset of the web ui port inside the range of [`PORTS_PER_FEDIMINTD`] ports
pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
/// Offset of the prometheus (metrics) port inside the range of
/// [`PORTS_PER_FEDIMINTD`] ports
pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;
59
/// A federation managed by devimint: its `fedimintd` members together with
/// the handles used to interact with them.
#[derive(Clone)]
pub struct Federation {
    /// `fedimintd` instances currently tracked, keyed by peer index
    pub members: BTreeMap<usize, Fedimintd>,
    /// Per-peer `fedimintd` variables, keyed by peer index
    pub vars: BTreeMap<usize, vars::Fedimintd>,
    /// Bitcoin node handle used to send funds and mine blocks
    pub bitcoind: Bitcoind,

    /// Built in [`Client`], already joined
    ///
    /// Only for internal use; use cli commands instead.
    client: JitTryAnyhow<Client>,
    #[allow(dead_code)] // Will need it later, maybe
    connectors: ConnectorRegistry,
}
72
73impl Drop for Federation {
74    fn drop(&mut self) {
75        block_in_place(|| {
76            block_on(async {
77                let mut set = JoinSet::new();
78
79                while let Some((_id, fedimintd)) = self.members.pop_first() {
80                    set.spawn(async { drop(fedimintd) });
81                }
82                while (set.join_next().await).is_some() {}
83            });
84        });
85    }
86}
/// `fedimint-cli` instance (basically path with client state: config + db)
#[derive(Clone)]
pub struct Client {
    /// Name of this client's directory inside the shared `clients` directory
    name: String,
}
92
93impl Client {
94    fn clients_dir() -> PathBuf {
95        let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
96            .expect("FM_DATA_DIR_ENV not set")
97            .parse()
98            .expect("FM_DATA_DIR_ENV invalid");
99        data_dir.join("clients")
100    }
101
102    fn client_dir(&self) -> PathBuf {
103        Self::clients_dir().join(&self.name)
104    }
105
106    pub fn client_name_lock(name: &str) -> Result<FileLock> {
107        let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
108        let file_lock = std::fs::OpenOptions::new()
109            .write(true)
110            .create(true)
111            .truncate(true)
112            .open(&lock_path)
113            .with_context(|| format!("Failed to open {}", lock_path.display()))?;
114
115        fs_lock::FileLock::new_exclusive(file_lock)
116            .with_context(|| format!("Failed to lock {}", lock_path.display()))
117    }
118
119    /// Create a [`Client`] that starts with a fresh state.
120    pub async fn create(name: impl ToString) -> Result<Client> {
121        let name = name.to_string();
122        spawn_blocking(move || {
123            let _lock = Self::client_name_lock(&name);
124            for i in 0u64.. {
125                let client = Self {
126                    name: format!("{name}-{i}"),
127                };
128
129                if !client.client_dir().exists() {
130                    std::fs::create_dir_all(client.client_dir())?;
131                    return Ok(client);
132                }
133            }
134            unreachable!()
135        })
136        .await?
137    }
138
139    /// Open or create a [`Client`] that starts with a fresh state.
140    pub fn open_or_create(name: &str) -> Result<Client> {
141        block_in_place(|| {
142            let _lock = Self::client_name_lock(name);
143            let client = Self {
144                name: format!("{name}-0"),
145            };
146            if !client.client_dir().exists() {
147                std::fs::create_dir_all(client.client_dir())?;
148            }
149            Ok(client)
150        })
151    }
152
153    /// Client to join a federation
154    pub async fn join_federation(&self, invite_code: String) -> Result<()> {
155        debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
156        cmd!(self, "join-federation", invite_code).run().await?;
157
158        Ok(())
159    }
160
161    /// Client to join a federation with a restore procedure
162    pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
163        debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
164        cmd!(
165            self,
166            "restore",
167            "--invite-code",
168            invite_code,
169            "--mnemonic",
170            mnemonic
171        )
172        .run()
173        .await?;
174
175        Ok(())
176    }
177
178    /// Client to join a federation
179    pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
180        let restored = Self::open_or_create(name)?;
181
182        let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
183            .as_str()
184            .unwrap()
185            .to_owned();
186
187        debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
188        cmd!(
189            restored,
190            "restore",
191            "--invite-code",
192            invite_code,
193            "--mnemonic",
194            mnemonic
195        )
196        .run()
197        .await?;
198
199        Ok(restored)
200    }
201
202    /// Create a [`Client`] that starts with a state that is a copy of
203    /// of another one.
204    pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
205        let new = Client::create(name).await?;
206
207        cmd!(
208            "cp",
209            "-R",
210            self.client_dir().join("client.db").display(),
211            new.client_dir().display()
212        )
213        .run()
214        .await?;
215
216        Ok(new)
217    }
218
219    pub async fn balance(&self) -> Result<u64> {
220        Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
221            .as_u64()
222            .unwrap())
223    }
224
225    pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
226        let deposit = cmd!(self, "deposit-address").out_json().await?;
227        Ok((
228            deposit["address"].as_str().unwrap().to_string(),
229            deposit["operation_id"].as_str().unwrap().to_string(),
230        ))
231    }
232
233    pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
234        cmd!(self, "await-deposit", operation_id).run().await
235    }
236
237    pub fn cmd(&self) -> Command {
238        cmd!(
239            crate::util::get_fedimint_cli_path(),
240            format!("--data-dir={}", self.client_dir().display())
241        )
242    }
243
244    pub fn get_name(&self) -> &str {
245        &self.name
246    }
247
248    /// Returns the current consensus session count
249    pub async fn get_session_count(&self) -> Result<u64> {
250        cmd!(self, "dev", "session-count").out_json().await?["count"]
251            .as_u64()
252            .context("count field wasn't a number")
253    }
254
255    /// Returns once all active state machines complete
256    pub async fn wait_complete(&self) -> Result<()> {
257        cmd!(self, "dev", "wait-complete").run().await
258    }
259
260    /// Returns once the current session completes
261    pub async fn wait_session(&self) -> anyhow::Result<()> {
262        info!("Waiting for a new session");
263        let session_count = self.get_session_count().await?;
264        self.wait_session_outcome(session_count).await?;
265        Ok(())
266    }
267
268    /// Returns once the provided session count completes
269    pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
270        let timeout = {
271            let current_session_count = self.get_session_count().await?;
272            let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
273            let session_duration_seconds = 180;
274            Duration::from_secs(sessions_to_wait * session_duration_seconds)
275        };
276
277        let start = Instant::now();
278        poll_with_timeout("Waiting for a new session", timeout, || async {
279            info!("Awaiting session outcome {session_count}");
280            match cmd!(self, "dev", "api", "await_session_outcome", session_count)
281                .run()
282                .await
283            {
284                Err(e) => Err(ControlFlow::Continue(e)),
285                Ok(()) => Ok(()),
286            }
287        })
288        .await?;
289
290        let session_found_in = start.elapsed();
291        info!("session found in {session_found_in:?}");
292        Ok(())
293    }
294}
295
296impl Federation {
    /// Creates a `Fedimintd` for every peer and, unless `skip_setup` or
    /// `pre_dkg` is set, runs DKG and publishes the resulting invite codes in
    /// the client directory.
    ///
    /// * `process_mgr` - supplies the globals (federation size, base port,
    ///   client dir, per-peer overrides) and is handed to every `Fedimintd`.
    /// * `bitcoind` - cloned into every `Fedimintd` and kept on the result.
    /// * `skip_setup` / `pre_dkg` - when either is `true`, DKG, the
    ///   invite-code handling and the internal client's `join-federation`
    ///   are all skipped.
    /// * `fed_index` - selects which pre-allocated federation's overrides to
    ///   use.
    /// * `federation_name` - passed to the peers' vars and `Fedimintd`s, and
    ///   used as the internal client's name.
    pub async fn new(
        process_mgr: &ProcessManager,
        bitcoind: Bitcoind,
        skip_setup: bool,
        pre_dkg: bool,
        // Which of the pre-allocated federations to use (most tests just use single `0` one)
        fed_index: usize,
        federation_name: String,
    ) -> Result<Self> {
        let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
        let mut members = BTreeMap::new();
        let mut peer_to_env_vars_map = BTreeMap::new();

        let peers: Vec<_> = num_peers.peer_ids().collect();
        let params: HashMap<PeerId, ConfigGenParams> =
            local_config_gen_params(&peers, process_mgr.globals.FM_FEDERATION_BASE_PORT, true)?;

        let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
        let mut api_endpoints: BTreeMap<PeerId, _> = BTreeMap::new();

        let connectors = ConnectorRegistry::build_from_testing_env()?.bind().await?;
        // Per peer: initialize its vars, create its `Fedimintd`, and record its
        // admin client and API endpoint.
        for peer_id in num_peers.peer_ids() {
            let peer_env_vars = vars::Fedimintd::init(
                &process_mgr.globals,
                federation_name.clone(),
                peer_id,
                process_mgr
                    .globals
                    .fedimintd_overrides
                    .peer_expect(fed_index, peer_id),
            )
            .await?;
            members.insert(
                peer_id.to_usize(),
                Fedimintd::new(
                    process_mgr,
                    bitcoind.clone(),
                    peer_id.to_usize(),
                    &peer_env_vars,
                    federation_name.clone(),
                )
                .await?,
            );
            let admin_client = DynGlobalApi::new_admin_setup(
                connectors.clone(),
                SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
                // TODO: will need it somewhere
                // &process_mgr.globals.FM_FORCE_API_SECRETS.get_active(),
            )?;
            api_endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
            admin_clients.insert(peer_id, admin_client);
            peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
        }

        if !skip_setup && !pre_dkg {
            // we don't guarantee backwards-compatibility for dkg, so we use the
            // fedimint-cli version that matches fedimintd
            let (original_fedimint_cli_path, original_fm_mint_client) =
                crate::util::use_matching_fedimint_cli_for_dkg().await?;

            run_cli_dkg_v2(params, api_endpoints).await?;

            // we're done with dkg, so we can reset the fedimint-cli version
            crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);

            // move configs to config directory
            let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
            let invite_code_filename_original = "invite-code";

            // Wait for every peer to write its invite-code file, then download
            // through that invite code (the downloaded value is discarded).
            for peer_env_vars in peer_to_env_vars_map.values() {
                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);

                let invite_code = poll_simple("awaiting-invite-code", || async {
                    let path = format!("{peer_data_dir}/{invite_code_filename_original}");
                    tokio::fs::read_to_string(&path)
                        .await
                        .with_context(|| format!("Awaiting invite code file: {path}"))
                })
                .await
                .context("Awaiting invite code file")?;

                ConnectorType::default()
                    .download_from_invite_code(&connectors, &InviteCode::from_str(&invite_code)?)
                    .await?;
            }

            // copy over invite-code file to client directory
            // (peer 0's file becomes the plain `invite-code`; this must happen
            // before the rename loop below moves the per-peer files away)
            let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);

            tokio::fs::copy(
                format!("{peer_data_dir}/{invite_code_filename_original}"),
                format!("{client_dir}/{invite_code_filename_original}"),
            )
            .await
            .context("copying invite-code file")?;

            // move each guardian's invite-code file to the client's directory
            // appending the peer id to the end
            for (index, peer_env_vars) in &peer_to_env_vars_map {
                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);

                let invite_code_filename_indexed =
                    format!("{invite_code_filename_original}-{index}");
                tokio::fs::rename(
                    format!("{peer_data_dir}/{invite_code_filename_original}"),
                    format!("{client_dir}/{invite_code_filename_indexed}"),
                )
                .await
                .context("moving invite-code file")?;
            }

            debug!("Moved invite-code files to client data directory");
        }

        // The internal client is built by this closure via `JitTryAnyhow`.
        // NOTE(review): the invite code is read even when `skip_setup` or
        // `pre_dkg` is set, so a missing `invite-code` file fails the client
        // although it is only used for `join-federation` — confirm intended.
        let client = JitTryAnyhow::new_try({
            move || async move {
                let client = Client::open_or_create(federation_name.as_str())?;
                let invite_code = Self::invite_code_static()?;
                if !skip_setup && !pre_dkg {
                    cmd!(client, "join-federation", invite_code).run().await?;
                }
                Ok(client)
            }
        });

        Ok(Self {
            members,
            vars: peer_to_env_vars_map,
            bitcoind,
            client,
            connectors,
        })
    }
430
431    pub fn client_config(&self) -> Result<ClientConfig> {
432        let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
433        load_from_file(&cfg_path)
434    }
435
436    pub fn module_client_config<M: ClientModule>(
437        &self,
438    ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
439        self.client_config()?
440            .modules
441            .iter()
442            .find_map(|(module_instance_id, module_cfg)| {
443                if module_cfg.kind == M::kind() {
444                    let decoders = ModuleDecoderRegistry::new(vec![(
445                        *module_instance_id,
446                        M::kind(),
447                        M::decoder(),
448                    )]);
449                    Some(
450                        module_cfg
451                            .config
452                            .clone()
453                            .redecode_raw(&decoders)
454                            .expect("Decoding client cfg failed")
455                            .expect_decoded_ref()
456                            .as_any()
457                            .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
458                            .cloned()
459                            .context("Cast to module config failed"),
460                    )
461                } else {
462                    None
463                }
464            })
465            .transpose()
466    }
467
468    pub fn deposit_fees(&self) -> Result<Amount> {
469        Ok(self
470            .module_client_config::<WalletClientModule>()?
471            .context("No wallet module found")?
472            .fee_consensus
473            .peg_in_abs)
474    }
475
476    /// Read the invite code from the client data dir
477    pub fn invite_code(&self) -> Result<String> {
478        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
479        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
480        Ok(invite_code)
481    }
482
483    pub fn invite_code_static() -> Result<String> {
484        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
485        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
486        Ok(invite_code)
487    }
488    pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
489        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
490        let name = format!("invite-code-{peer_id}");
491        let invite_code = fs::read_to_string(data_dir.join(name))?;
492        Ok(invite_code)
493    }
494
495    /// Built-in, default, internal [`Client`]
496    ///
497    /// We should be moving away from using it for anything.
498    pub async fn internal_client(&self) -> Result<&Client> {
499        self.client
500            .get_try()
501            .await
502            .context("Internal client joining Federation")
503    }
504
505    /// New [`Client`] that already joined `self`
506    pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
507        let client = Client::create(name).await?;
508        client.join_federation(self.invite_code()?).await?;
509        Ok(client)
510    }
511
512    pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
513        if self.members.contains_key(&peer) {
514            bail!("fedimintd-{peer} already running");
515        }
516        self.members.insert(
517            peer,
518            Fedimintd::new(
519                process_mgr,
520                self.bitcoind.clone(),
521                peer,
522                &self.vars[&peer],
523                "default".to_string(),
524            )
525            .await?,
526        );
527        Ok(())
528    }
529
530    pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
531        let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
532            bail!("fedimintd-{peer_id} does not exist");
533        };
534        fedimintd.terminate().await?;
535        Ok(())
536    }
537
538    pub async fn await_server_terminated(&mut self, peer_id: usize) -> Result<()> {
539        let Some(fedimintd) = self.members.get_mut(&peer_id) else {
540            bail!("fedimintd-{peer_id} does not exist");
541        };
542        fedimintd.await_terminated().await?;
543        self.members.remove(&peer_id);
544        Ok(())
545    }
546
547    /// Starts all peers not currently running.
548    pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
549        info!("starting all servers");
550        let fed_size = process_mgr.globals.FM_FED_SIZE;
551        for peer_id in 0..fed_size {
552            if self.members.contains_key(&peer_id) {
553                continue;
554            }
555            self.start_server(process_mgr, peer_id).await?;
556        }
557        self.await_all_peers().await?;
558        Ok(())
559    }
560
561    /// Terminates all running peers.
562    pub async fn terminate_all_servers(&mut self) -> Result<()> {
563        info!("terminating all servers");
564        let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
565        for peer_id in running_peer_ids {
566            self.terminate_server(peer_id).await?;
567        }
568        Ok(())
569    }
570
571    /// Coordinated shutdown of all peers that restart using the provided
572    /// `bin_path`. Returns `Ok()` once all peers are online.
573    ///
574    /// Staggering the restart more closely simulates upgrades in the wild.
575    pub async fn restart_all_staggered_with_bin(
576        &mut self,
577        process_mgr: &ProcessManager,
578        bin_path: &PathBuf,
579    ) -> Result<()> {
580        let fed_size = process_mgr.globals.FM_FED_SIZE;
581
582        // ensure all peers are online
583        self.start_all_servers(process_mgr).await?;
584
585        // staggered shutdown of peers
586        while self.num_members() > 0 {
587            self.terminate_server(self.num_members() - 1).await?;
588            if self.num_members() > 0 {
589                fedimint_core::task::sleep_in_test(
590                    "waiting to shutdown remaining peers",
591                    Duration::from_secs(10),
592                )
593                .await;
594            }
595        }
596
597        // TODO: Audit that the environment access only happens in single-threaded code.
598        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
599
600        // staggered restart
601        for peer_id in 0..fed_size {
602            self.start_server(process_mgr, peer_id).await?;
603            if peer_id < fed_size - 1 {
604                fedimint_core::task::sleep_in_test(
605                    "waiting to restart remaining peers",
606                    Duration::from_secs(10),
607                )
608                .await;
609            }
610        }
611
612        self.await_all_peers().await?;
613
614        let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
615        info!("upgraded fedimintd to version: {}", fedimintd_version);
616        Ok(())
617    }
618
619    pub async fn restart_all_with_bin(
620        &mut self,
621        process_mgr: &ProcessManager,
622        bin_path: &PathBuf,
623    ) -> Result<()> {
624        // get the version we're upgrading to, temporarily updating the fedimintd path
625        let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
626        // TODO: Audit that the environment access only happens in single-threaded code.
627        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
628        // TODO: Audit that the environment access only happens in single-threaded code.
629        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
630
631        self.restart_all_staggered_with_bin(process_mgr, bin_path)
632            .await
633    }
634
635    pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
636        let fed_size = process_mgr.globals.FM_FED_SIZE;
637        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
638        anyhow::ensure!(
639            fed_size > 3 * offline_nodes,
640            "too many offline nodes ({offline_nodes}) to reach consensus"
641        );
642
643        while self.num_members() > fed_size - offline_nodes {
644            self.terminate_server(self.num_members() - 1).await?;
645        }
646
647        if offline_nodes > 0 {
648            info!(fed_size, offline_nodes, "federation is degraded");
649        }
650        Ok(())
651    }
652
653    pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
654        let deposit_fees_msat = self.deposit_fees()?.msats;
655        assert_eq!(
656            deposit_fees_msat % 1000,
657            0,
658            "Deposit fees expected to be whole sats in test suite"
659        );
660        let deposit_fees = deposit_fees_msat / 1000;
661        info!(amount, deposit_fees, "Pegging-in client funds");
662
663        let (address, operation_id) = client.get_deposit_addr().await?;
664
665        self.bitcoind
666            .send_to(address, amount + deposit_fees)
667            .await?;
668        self.bitcoind.mine_blocks(21).await?;
669
670        Ok(operation_id)
671    }
672
673    pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
674        let operation_id = self.pegin_client_no_wait(amount, client).await?;
675
676        client.await_deposit(&operation_id).await?;
677        Ok(())
678    }
679
680    /// Initiates multiple peg-ins to the same federation for the set of
681    /// gateways to save on mining blocks in parallel.
682    pub async fn pegin_gateways(
683        &self,
684        amount: u64,
685        gateways: Vec<&super::gatewayd::Gatewayd>,
686    ) -> Result<()> {
687        let deposit_fees_msat = self.deposit_fees()?.msats;
688        assert_eq!(
689            deposit_fees_msat % 1000,
690            0,
691            "Deposit fees expected to be whole sats in test suite"
692        );
693        let deposit_fees = deposit_fees_msat / 1000;
694        info!(amount, deposit_fees, "Pegging-in gateway funds");
695        let fed_id = self.calculate_federation_id();
696        for gw in gateways.clone() {
697            let pegin_addr = gw.get_pegin_addr(&fed_id).await?;
698            self.bitcoind
699                .send_to(pegin_addr, amount + deposit_fees)
700                .await?;
701        }
702
703        self.bitcoind.mine_blocks(21).await?;
704        let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
705        try_join_all(gateways.into_iter().map(|gw| {
706            poll("gateway pegin", || async {
707                let gw_info = gw.get_info().await.map_err(ControlFlow::Continue)?;
708                let block_height: u64 = gw_info["block_height"]
709                    .as_u64()
710                    .expect("Could not parse block height");
711                if bitcoind_block_height != block_height {
712                    return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
713                        "gateway block height is not synced"
714                    )));
715                }
716
717                let gateway_balance = gw
718                    .ecash_balance(fed_id.clone())
719                    .await
720                    .map_err(ControlFlow::Continue)?;
721                poll_almost_equal!(gateway_balance, amount * 1000)
722            })
723        }))
724        .await?;
725
726        Ok(())
727    }
728
    /// Initiates a peg-out of `amount` from this federation for every gateway
    /// in `gateways`, sharing one round of block mining between all of them.
    ///
    /// For each gateway the ecash balance before the peg-out is recorded and
    /// `ecash pegout` is run towards a fresh bitcoind address. After mining 21
    /// blocks and finding every peg-out transaction, each gateway's new
    /// balance is compared (tolerance 2000) with the previous balance minus
    /// `amount` minus the reported fees.
    ///
    /// # Panics
    ///
    /// Panics if a gateway has no balance entry for this federation.
    pub async fn pegout_gateways(
        &self,
        amount: u64,
        gateways: Vec<&super::gatewayd::Gatewayd>,
    ) -> Result<()> {
        info!(amount, "Pegging-out gateway funds");
        let fed_id = self.calculate_federation_id();
        // NOTE(review): keyed by LN node type, so two gateways with the same
        // type would overwrite each other's entry — confirm callers never pass
        // such a pair.
        let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
        for gw in gateways.clone() {
            // balance before the peg-out, used for the final comparison
            let prev_fed_ecash_balance = gw
                .get_balances()
                .await?
                .ecash_balances
                .into_iter()
                .find(|fed| fed.federation_id.to_string() == fed_id)
                .expect("Gateway has not joined federation")
                .ecash_balance_msats;

            let pegout_address = self.bitcoind.get_new_address().await?;
            let value = cmd!(
                gw,
                "ecash",
                "pegout",
                "--federation-id",
                fed_id,
                "--amount",
                amount,
                "--address",
                pegout_address
            )
            .out_json()
            .await?;
            let response: WithdrawResponse = serde_json::from_value(value)?;
            peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
        }
        self.bitcoind.mine_blocks(21).await?;

        // every peg-out transaction must be visible to bitcoind
        try_join_all(
            peg_outs
                .values()
                .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
        )
        .await?;

        for gw in gateways.clone() {
            let after_fed_ecash_balance = gw
                .get_balances()
                .await?
                .ecash_balances
                .into_iter()
                .find(|fed| fed.federation_id.to_string() == fed_id)
                .expect("Gateway has not joined federation")
                .ecash_balance_msats;

            let ln_type = gw.ln.ln_type();
            let prev_balance = peg_outs
                .get(&ln_type)
                .expect("peg out does not exist")
                .0
                .msats;
            let fees = peg_outs
                .get(&ln_type)
                .expect("peg out does not exist")
                .1
                .fees;
            // fees are converted from sats to msats
            let total_fee = fees.amount().to_sat() * 1000;
            // NOTE(review): `amount` is subtracted from an msat balance as-is —
            // confirm the unit callers pass matches.
            crate::util::almost_equal(
                after_fed_ecash_balance.msats,
                prev_balance - amount - total_fee,
                2000,
            )
            .map_err(|e| {
                anyhow::anyhow!(
                    "new balance did not equal prev balance minus withdraw_amount minus fees: {}",
                    e
                )
            })?;
        }

        Ok(())
    }
812
813    pub fn calculate_federation_id(&self) -> String {
814        self.client_config()
815            .unwrap()
816            .global
817            .calculate_federation_id()
818            .to_string()
819    }
820
821    pub async fn await_block_sync(&self) -> Result<u64> {
822        let finality_delay = self.get_finality_delay()?;
823        let block_count = self.bitcoind.get_block_count().await?;
824        let expected = block_count.saturating_sub(finality_delay.into());
825        cmd!(
826            self.internal_client().await?,
827            "dev",
828            "wait-block-count",
829            expected
830        )
831        .run()
832        .await?;
833        Ok(expected)
834    }
835
836    fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
837        let client_config = &self.client_config()?;
838        let wallet_cfg = client_config
839            .modules
840            .get(&LEGACY_HARDCODED_INSTANCE_ID_WALLET)
841            .context("wallet module not found")?
842            .clone()
843            .redecode_raw(&ModuleDecoderRegistry::new([(
844                LEGACY_HARDCODED_INSTANCE_ID_WALLET,
845                fedimint_wallet_client::KIND,
846                fedimint_wallet_client::WalletModuleTypes::decoder(),
847            )]))?;
848        let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
849
850        let finality_delay = wallet_cfg.finality_delay;
851        Ok(finality_delay)
852    }
853
854    pub async fn await_gateways_registered(&self) -> Result<()> {
855        let start_time = Instant::now();
856        debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
857
858        poll("gateways registered", || async {
859            let num_gateways = cmd!(
860                self.internal_client()
861                    .await
862                    .map_err(ControlFlow::Continue)?,
863                "list-gateways"
864            )
865            .out_json()
866            .await
867            .map_err(ControlFlow::Continue)?
868            .as_array()
869            .context("invalid output")
870            .map_err(ControlFlow::Break)?
871            .len();
872            poll_eq!(num_gateways, 1)
873        })
874        .await?;
875        debug!(target: LOG_DEVIMINT,
876            elapsed_ms = %start_time.elapsed().as_millis(),
877            "Gateways registered");
878        Ok(())
879    }
880
881    pub async fn await_all_peers(&self) -> Result<()> {
882        poll("Waiting for all peers to be online", || async {
883            cmd!(
884                self.internal_client()
885                    .await
886                    .map_err(ControlFlow::Continue)?,
887                "dev",
888                "api",
889                "--module",
890                LEGACY_HARDCODED_INSTANCE_ID_WALLET,
891                "block_count"
892            )
893            .run()
894            .await
895            .map_err(ControlFlow::Continue)?;
896            Ok(())
897        })
898        .await
899    }
900
901    pub async fn await_peer(&self, peer_id: usize) -> Result<()> {
902        poll("Waiting for all peers to be online", || async {
903            cmd!(
904                self.internal_client()
905                    .await
906                    .map_err(ControlFlow::Continue)?,
907                "dev",
908                "api",
909                "--peer-id",
910                peer_id,
911                "--module",
912                LEGACY_HARDCODED_INSTANCE_ID_WALLET,
913                "block_count"
914            )
915            .run()
916            .await
917            .map_err(ControlFlow::Continue)?;
918            Ok(())
919        })
920        .await
921    }
922
923    /// Mines enough blocks to finalize mempool transactions, then waits for
924    /// federation to process finalized blocks.
925    ///
926    /// ex:
927    ///   tx submitted to mempool at height 100
928    ///   finality delay = 10
929    ///   mine finality delay blocks + 1 => new height 111
930    ///   tx included in block 101
931    ///   highest finalized height = 111 - 10 = 101
932    pub async fn finalize_mempool_tx(&self) -> Result<()> {
933        let finality_delay = self.get_finality_delay()?;
934        let blocks_to_mine = finality_delay + 1;
935        self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
936        self.await_block_sync().await?;
937        Ok(())
938    }
939
940    pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
941        self.bitcoind.mine_blocks(blocks).await?;
942        self.await_block_sync().await?;
943        Ok(())
944    }
945
946    pub fn num_members(&self) -> usize {
947        self.members.len()
948    }
949
950    pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
951        self.members
952            .keys()
953            .map(|&peer_id| PeerId::from(peer_id as u16))
954    }
955}
956
/// Handle to a spawned `fedimintd` daemon process.
#[derive(Clone)]
pub struct Fedimintd {
    // Not read by any method visible here; presumably held so the bitcoind handle lives as long
    // as the daemon handle — TODO confirm.
    _bitcoind: Bitcoind,
    // Handle to the daemon process; used to terminate it or to wait for it to exit.
    process: ProcessHandle,
}
962
963impl Fedimintd {
964    pub async fn new(
965        process_mgr: &ProcessManager,
966        bitcoind: Bitcoind,
967        peer_id: usize,
968        env: &vars::Fedimintd,
969        fed_name: String,
970    ) -> Result<Self> {
971        debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
972        let process = process_mgr
973            .spawn_daemon(
974                &format!("fedimintd-{fed_name}-{peer_id}"),
975                cmd!(FedimintdCmd).envs(env.vars()),
976            )
977            .await?;
978
979        Ok(Self {
980            _bitcoind: bitcoind,
981            process,
982        })
983    }
984
985    pub async fn terminate(self) -> Result<()> {
986        self.process.terminate().await
987    }
988
989    pub async fn await_terminated(&self) -> Result<()> {
990        self.process.await_terminated().await
991    }
992}
993
/// Drives the legacy config-generation and DKG flow for every peer in `endpoints` by issuing
/// `FedimintCli` calls in a fixed order.
///
/// Steps, in order:
/// 1. wait until every endpoint answers a status request, then require `AwaitingPassword`;
/// 2. set each peer's password;
/// 3. configure the leader (the first entry of `endpoints`), then every follower;
/// 4. check that the leader sees all peer names and wait for `SharingConfigGenParams`;
/// 5. fetch and compare consensus config-gen params from every peer;
/// 6. run DKG on all peers concurrently while waiting for the leader to reach
///    `VerifyingConfigs`;
/// 7. compare the verify-config hashes and start consensus on every peer.
///
/// `params` supplies the API auth for each peer; `endpoints` maps each peer to its API endpoint.
///
/// # Errors
///
/// Returns an error if any CLI call or poll fails, or if `endpoints` is empty.
///
/// # Panics
///
/// Panics if `params` has no entry for a peer in `endpoints`, or if one of the consistency
/// assertions below fails.
pub async fn run_cli_dkg(
    params: HashMap<PeerId, ConfigGenParams>,
    endpoints: BTreeMap<PeerId, String>,
) -> Result<()> {
    // Map indexing panics when `params` lacks an entry for `peer`.
    let auth_for = |peer: &PeerId| -> &ApiAuth { &params[peer].api_auth };

    debug!(target: LOG_DEVIMINT, "Running DKG");
    // Poll each endpoint until a status request succeeds; every failure is retried.
    for endpoint in endpoints.values() {
        poll("trying-to-connect-to-peers", || async {
            crate::util::FedimintCli
                .ws_status(endpoint)
                .await
                .context("dkg status")
                .map_err(ControlFlow::Continue)
        })
        .await?;
    }

    debug!(target: LOG_DEVIMINT, "Connected to all peers");

    // Every peer must be waiting for its password before setup proceeds.
    for (peer_id, endpoint) in &endpoints {
        let status = crate::util::FedimintCli.ws_status(endpoint).await?;
        assert_eq!(
            status.server,
            ServerStatusLegacy::AwaitingPassword,
            "peer_id isn't waiting for password: {peer_id}"
        );
    }

    debug!(target: LOG_DEVIMINT, "Setting passwords");
    for (peer_id, endpoint) in &endpoints {
        crate::util::FedimintCli
            .set_password(auth_for(peer_id), endpoint)
            .await?;
    }
    // The leader is the first entry in the map's key order; all other peers are followers.
    let (leader_id, leader_endpoint) = endpoints.first_key_value().context("missing peer")?;
    let followers = endpoints
        .iter()
        .filter(|(id, _)| *id != leader_id)
        .collect::<BTreeMap<_, _>>();

    debug!(target: LOG_DEVIMINT, "calling set_config_gen_connections for leader");
    let leader_name = "leader".to_string();
    // The leader is configured without a leader endpoint (`None`).
    crate::util::FedimintCli
        .set_config_gen_connections(auth_for(leader_id), leader_endpoint, &leader_name, None)
        .await?;

    // Starts empty; `cli_set_config_gen_params` attaches the default module params to each clone.
    let server_gen_params = ServerModuleConfigGenParamsRegistry::default();

    debug!(target: LOG_DEVIMINT, "calling set_config_gen_params for leader");
    cli_set_config_gen_params(
        leader_endpoint,
        auth_for(leader_id),
        server_gen_params.clone(),
    )
    .await?;

    // Assign every follower a name made of a random 5-character alphanumeric string plus its id.
    let followers_names = followers
        .keys()
        .map(|peer_id| {
            (*peer_id, {
                // This is to be clear that the name will be unrelated to peer id
                let random_string = rand::thread_rng()
                    .sample_iter(&rand::distributions::Alphanumeric)
                    .take(5)
                    .map(char::from)
                    .collect::<String>();
                format!("random-{random_string}{peer_id}")
            })
        })
        .collect::<BTreeMap<_, _>>();
    // Point each follower at the leader, then set its config-gen params.
    for (peer_id, endpoint) in &followers {
        let name = followers_names
            .get(peer_id)
            .context("missing follower name")?;
        debug!(target: LOG_DEVIMINT, "calling set_config_gen_connections for {peer_id} {name}");

        crate::util::FedimintCli
            .set_config_gen_connections(auth_for(peer_id), endpoint, name, Some(leader_endpoint))
            .await?;

        cli_set_config_gen_params(endpoint, auth_for(peer_id), server_gen_params.clone()).await?;
    }

    debug!(target: LOG_DEVIMINT, "calling get_config_gen_peers for leader");
    let peers = crate::util::FedimintCli
        .get_config_gen_peers(leader_endpoint)
        .await?;

    // The names reported by the leader must be exactly the follower names plus the leader's.
    let found_names = peers
        .into_iter()
        .map(|peer| peer.name)
        .collect::<HashSet<_>>();
    let all_names = followers_names
        .values()
        .cloned()
        .chain(iter::once(leader_name))
        .collect::<HashSet<_>>();
    assert_eq!(found_names, all_names);

    debug!(target: LOG_DEVIMINT, "Waiting for SharingConfigGenParams");
    cli_wait_server_status(leader_endpoint, ServerStatusLegacy::SharingConfigGenParams).await?;

    debug!(target: LOG_DEVIMINT, "Getting consensus configs");
    let mut configs = vec![];
    for endpoint in endpoints.values() {
        let config = crate::util::FedimintCli
            .consensus_config_gen_params_legacy(endpoint)
            .await?;
        configs.push(config);
    }
    // Confirm all consensus configs are the same
    // (`dedup` only merges adjacent equal items, which suffices: exactly one item remains iff
    // all of them are equal.)
    let mut consensus: Vec<_> = configs.iter().map(|p| p.consensus.clone()).collect();
    consensus.dedup();
    assert_eq!(consensus.len(), 1);
    // Confirm all peer ids are unique
    let ids = configs
        .iter()
        .map(|p| p.our_current_id)
        .collect::<HashSet<_>>();
    assert_eq!(ids.len(), endpoints.len());
    // Build one DKG future per peer; they are only driven by `join_all` below.
    let dkg_results = endpoints
        .iter()
        .map(|(peer_id, endpoint)| crate::util::FedimintCli.run_dkg(auth_for(peer_id), endpoint));
    debug!(target: LOG_DEVIMINT, "Running DKG");
    // Run DKG on all peers concurrently with waiting for the leader to reach `VerifyingConfigs`.
    let (dkg_results, leader_wait_result) = tokio::join!(
        join_all(dkg_results),
        cli_wait_server_status(leader_endpoint, ServerStatusLegacy::VerifyingConfigs)
    );
    for result in dkg_results {
        result?;
    }
    leader_wait_result?;

    // verify config hashes equal for all peers
    debug!(target: LOG_DEVIMINT, "Verifying config hashes");
    let mut hashes = HashSet::new();
    for (peer_id, endpoint) in &endpoints {
        cli_wait_server_status(endpoint, ServerStatusLegacy::VerifyingConfigs).await?;
        let hash = crate::util::FedimintCli
            .get_verify_config_hash(auth_for(peer_id), endpoint)
            .await?;
        hashes.insert(hash);
    }
    assert_eq!(hashes.len(), 1);
    // Start consensus peer by peer. A failing `start_consensus` call is only logged; the wait
    // for `ConsensusRunning` that follows is what decides success for that peer.
    for (peer_id, endpoint) in &endpoints {
        let result = crate::util::FedimintCli
            .start_consensus(auth_for(peer_id), endpoint)
            .await;
        if let Err(e) = result {
            tracing::debug!(target: LOG_DEVIMINT, "Error calling start_consensus: {e:?}, trying to continue...");
        }
        cli_wait_server_status(endpoint, ServerStatusLegacy::ConsensusRunning).await?;
    }
    Ok(())
}
1150
1151pub async fn run_cli_dkg_v2(
1152    params: HashMap<PeerId, ConfigGenParams>,
1153    endpoints: BTreeMap<PeerId, String>,
1154) -> Result<()> {
1155    let auth_for = |peer: &PeerId| -> &ApiAuth { &params[peer].api_auth };
1156
1157    for (peer, endpoint) in &endpoints {
1158        let status = poll("awaiting-setup-status-awaiting-local-params", || async {
1159            crate::util::FedimintCli
1160                .setup_status(auth_for(peer), endpoint)
1161                .await
1162                .map_err(ControlFlow::Continue)
1163        })
1164        .await
1165        .unwrap();
1166
1167        assert_eq!(status, SetupStatus::AwaitingLocalParams);
1168    }
1169
1170    debug!(target: LOG_DEVIMINT, "Setting local parameters...");
1171
1172    let mut connection_info = BTreeMap::new();
1173
1174    for (peer, endpoint) in &endpoints {
1175        let info = if peer.to_usize() == 0 {
1176            crate::util::FedimintCli
1177                .set_local_params_leader(peer, auth_for(peer), endpoint)
1178                .await?
1179        } else {
1180            crate::util::FedimintCli
1181                .set_local_params_follower(peer, auth_for(peer), endpoint)
1182                .await?
1183        };
1184
1185        connection_info.insert(peer, info);
1186    }
1187
1188    debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");
1189
1190    for (peer, info) in connection_info {
1191        for (p, endpoint) in &endpoints {
1192            if p != peer {
1193                crate::util::FedimintCli
1194                    .add_peer(&info, auth_for(p), endpoint)
1195                    .await?;
1196            }
1197        }
1198    }
1199
1200    debug!(target: LOG_DEVIMINT, "Starting DKG...");
1201
1202    for (peer, endpoint) in &endpoints {
1203        crate::util::FedimintCli
1204            .start_dkg(auth_for(peer), endpoint)
1205            .await?;
1206    }
1207
1208    Ok(())
1209}
1210
1211async fn cli_set_config_gen_params(
1212    endpoint: &str,
1213    auth: &ApiAuth,
1214    mut server_gen_params: ServerModuleConfigGenParamsRegistry,
1215) -> Result<()> {
1216    self::config::attach_default_module_init_params(
1217        &BitcoinRpcConfig::get_defaults_from_env_vars()?,
1218        &mut server_gen_params,
1219        Network::Regtest,
1220        10,
1221    );
1222
1223    let meta = iter::once(("federation_name".to_string(), "testfed".to_string())).collect();
1224
1225    crate::util::FedimintCli
1226        .set_config_gen_params(auth, endpoint, meta, server_gen_params)
1227        .await?;
1228
1229    Ok(())
1230}
1231
1232async fn cli_wait_server_status(endpoint: &str, expected_status: ServerStatusLegacy) -> Result<()> {
1233    poll(
1234        &format!("waiting-server-status: {expected_status:?}"),
1235        || async {
1236            let server_status = crate::util::FedimintCli
1237                .ws_status(endpoint)
1238                .await
1239                .context("server status")
1240                .map_err(ControlFlow::Continue)?
1241                .server;
1242            if server_status == expected_status {
1243                Ok(())
1244            } else {
1245                Err(ControlFlow::Continue(anyhow!(
1246                    "expected status: {expected_status:?} current status: {server_status:?}"
1247                )))
1248            }
1249        },
1250    )
1251    .await?;
1252    Ok(())
1253}