devimint/
federation.rs

1use std::collections::BTreeMap;
2use std::ops::ControlFlow;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::Duration;
6use std::{env, fs};
7
8use anyhow::{Context, Result, bail};
9use fedimint_api_client::api::DynGlobalApi;
10use fedimint_api_client::api::net::ConnectorType;
11use fedimint_client_module::module::ClientModule;
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::admin_client::SetupStatus;
14use fedimint_core::config::{ClientConfig, load_from_file};
15use fedimint_core::core::{ModuleInstanceId, ModuleKind};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::ModuleCommon;
18use fedimint_core::module::registry::ModuleDecoderRegistry;
19use fedimint_core::runtime::block_in_place;
20use fedimint_core::task::block_on;
21use fedimint_core::task::jit::JitTryAnyhow;
22use fedimint_core::util::SafeUrl;
23use fedimint_core::{Amount, NumPeers, PeerId};
24use fedimint_gateway_common::WithdrawResponse;
25use fedimint_logging::LOG_DEVIMINT;
26use fedimint_testing_core::config::API_AUTH;
27use fedimint_testing_core::node_type::LightningNodeType;
28use fedimint_wallet_client::WalletClientModule;
29use fedimint_wallet_client::config::WalletClientConfig;
30use fs_lock::FileLock;
31use futures::future::{join_all, try_join_all};
32use tokio::task::{JoinSet, spawn_blocking};
33use tokio::time::Instant;
34use tracing::{debug, info};
35
36use super::external::Bitcoind;
37use super::util::{Command, ProcessHandle, ProcessManager, cmd};
38use super::vars::utf8;
39use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
40use crate::util::{FedimintdCmd, poll, poll_simple, poll_with_timeout};
41use crate::version_constants::VERSION_0_10_0_ALPHA;
42use crate::{poll_almost_equal, poll_eq, vars};
43
44// TODO: Are we still using the 3rd port for anything?
45/// Number of ports we allocate for every `fedimintd` instance
46pub const PORTS_PER_FEDIMINTD: u16 = 4;
47/// Which port is for p2p inside the range from [`PORTS_PER_FEDIMINTD`]
48pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
49/// Which port is for api inside the range from [`PORTS_PER_FEDIMINTD`]
50pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
51/// Which port is for the web ui inside the range from [`PORTS_PER_FEDIMINTD`]
52pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
53/// Which port is for prometheus inside the range from [`PORTS_PER_FEDIMINTD`]
54pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;
55
56#[derive(Clone)]
57pub struct Federation {
58    // client is only for internal use, use cli commands instead
59    pub members: BTreeMap<usize, Fedimintd>,
60    pub vars: BTreeMap<usize, vars::Fedimintd>,
61    pub bitcoind: Bitcoind,
62
63    /// Built in [`Client`], already joined
64    client: JitTryAnyhow<Client>,
65    #[allow(dead_code)] // Will need it later, maybe
66    connectors: ConnectorRegistry,
67}
68
69impl Drop for Federation {
70    fn drop(&mut self) {
71        block_in_place(|| {
72            block_on(async {
73                let mut set = JoinSet::new();
74
75                while let Some((_id, fedimintd)) = self.members.pop_first() {
76                    set.spawn(async { drop(fedimintd) });
77                }
78                while (set.join_next().await).is_some() {}
79            });
80        });
81    }
82}
83/// `fedimint-cli` instance (basically path with client state: config + db)
84#[derive(Clone)]
85pub struct Client {
86    name: String,
87}
88
89impl Client {
90    fn clients_dir() -> PathBuf {
91        let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
92            .expect("FM_DATA_DIR_ENV not set")
93            .parse()
94            .expect("FM_DATA_DIR_ENV invalid");
95        data_dir.join("clients")
96    }
97
98    fn client_dir(&self) -> PathBuf {
99        Self::clients_dir().join(&self.name)
100    }
101
102    pub fn client_name_lock(name: &str) -> Result<FileLock> {
103        let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
104        let file_lock = std::fs::OpenOptions::new()
105            .write(true)
106            .create(true)
107            .truncate(true)
108            .open(&lock_path)
109            .with_context(|| format!("Failed to open {}", lock_path.display()))?;
110
111        fs_lock::FileLock::new_exclusive(file_lock)
112            .with_context(|| format!("Failed to lock {}", lock_path.display()))
113    }
114
115    /// Create a [`Client`] that starts with a fresh state.
116    pub async fn create(name: impl ToString) -> Result<Client> {
117        let name = name.to_string();
118        spawn_blocking(move || {
119            let _lock = Self::client_name_lock(&name);
120            for i in 0u64.. {
121                let client = Self {
122                    name: format!("{name}-{i}"),
123                };
124
125                if !client.client_dir().exists() {
126                    std::fs::create_dir_all(client.client_dir())?;
127                    return Ok(client);
128                }
129            }
130            unreachable!()
131        })
132        .await?
133    }
134
135    /// Open or create a [`Client`] that starts with a fresh state.
136    pub fn open_or_create(name: &str) -> Result<Client> {
137        block_in_place(|| {
138            let _lock = Self::client_name_lock(name);
139            let client = Self {
140                name: format!("{name}-0"),
141            };
142            if !client.client_dir().exists() {
143                std::fs::create_dir_all(client.client_dir())?;
144            }
145            Ok(client)
146        })
147    }
148
149    /// Client to join a federation
150    pub async fn join_federation(&self, invite_code: String) -> Result<()> {
151        debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
152        cmd!(self, "join-federation", invite_code).run().await?;
153
154        Ok(())
155    }
156
157    /// Client to join a federation with a restore procedure
158    pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
159        debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
160        cmd!(
161            self,
162            "restore",
163            "--invite-code",
164            invite_code,
165            "--mnemonic",
166            mnemonic
167        )
168        .run()
169        .await?;
170
171        Ok(())
172    }
173
174    /// Client to join a federation
175    pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
176        let restored = Self::open_or_create(name)?;
177
178        let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
179            .as_str()
180            .unwrap()
181            .to_owned();
182
183        debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
184        cmd!(
185            restored,
186            "restore",
187            "--invite-code",
188            invite_code,
189            "--mnemonic",
190            mnemonic
191        )
192        .run()
193        .await?;
194
195        Ok(restored)
196    }
197
198    /// Create a [`Client`] that starts with a state that is a copy of
199    /// of another one.
200    pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
201        let new = Client::create(name).await?;
202
203        cmd!(
204            "cp",
205            "-R",
206            self.client_dir().join("client.db").display(),
207            new.client_dir().display()
208        )
209        .run()
210        .await?;
211
212        Ok(new)
213    }
214
215    pub async fn balance(&self) -> Result<u64> {
216        Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
217            .as_u64()
218            .unwrap())
219    }
220
221    pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
222        let deposit = cmd!(self, "deposit-address").out_json().await?;
223        Ok((
224            deposit["address"].as_str().unwrap().to_string(),
225            deposit["operation_id"].as_str().unwrap().to_string(),
226        ))
227    }
228
229    pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
230        cmd!(self, "await-deposit", operation_id).run().await
231    }
232
233    pub fn cmd(&self) -> Command {
234        cmd!(
235            crate::util::get_fedimint_cli_path(),
236            format!("--data-dir={}", self.client_dir().display())
237        )
238    }
239
240    pub fn get_name(&self) -> &str {
241        &self.name
242    }
243
244    /// Returns the current consensus session count
245    pub async fn get_session_count(&self) -> Result<u64> {
246        cmd!(self, "dev", "session-count").out_json().await?["count"]
247            .as_u64()
248            .context("count field wasn't a number")
249    }
250
251    /// Returns once all active state machines complete
252    pub async fn wait_complete(&self) -> Result<()> {
253        cmd!(self, "dev", "wait-complete").run().await
254    }
255
256    /// Returns once the current session completes
257    pub async fn wait_session(&self) -> anyhow::Result<()> {
258        info!("Waiting for a new session");
259        let session_count = self.get_session_count().await?;
260        self.wait_session_outcome(session_count).await?;
261        Ok(())
262    }
263
264    /// Returns once the provided session count completes
265    pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
266        let timeout = {
267            let current_session_count = self.get_session_count().await?;
268            let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
269            let session_duration_seconds = 180;
270            Duration::from_secs(sessions_to_wait * session_duration_seconds)
271        };
272
273        let start = Instant::now();
274        poll_with_timeout("Waiting for a new session", timeout, || async {
275            info!("Awaiting session outcome {session_count}");
276            match cmd!(self, "dev", "api", "await_session_outcome", session_count)
277                .run()
278                .await
279            {
280                Err(e) => Err(ControlFlow::Continue(e)),
281                Ok(()) => Ok(()),
282            }
283        })
284        .await?;
285
286        let session_found_in = start.elapsed();
287        info!("session found in {session_found_in:?}");
288        Ok(())
289    }
290}
291
292impl Federation {
293    pub async fn new(
294        process_mgr: &ProcessManager,
295        bitcoind: Bitcoind,
296        skip_setup: bool,
297        pre_dkg: bool,
298        // Which of the pre-allocated federations to use (most tests just use single `0` one)
299        fed_index: usize,
300        federation_name: String,
301    ) -> Result<Self> {
302        let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
303        let mut members = BTreeMap::new();
304        let mut peer_to_env_vars_map = BTreeMap::new();
305
306        let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
307        let mut api_endpoints: BTreeMap<PeerId, _> = BTreeMap::new();
308
309        let connectors = ConnectorRegistry::build_from_testing_env()?.bind().await?;
310        for peer_id in num_peers.peer_ids() {
311            let peer_env_vars = vars::Fedimintd::init(
312                &process_mgr.globals,
313                federation_name.clone(),
314                peer_id,
315                process_mgr
316                    .globals
317                    .fedimintd_overrides
318                    .peer_expect(fed_index, peer_id),
319            )
320            .await?;
321            members.insert(
322                peer_id.to_usize(),
323                Fedimintd::new(
324                    process_mgr,
325                    bitcoind.clone(),
326                    peer_id.to_usize(),
327                    &peer_env_vars,
328                    federation_name.clone(),
329                )
330                .await?,
331            );
332            let admin_client = DynGlobalApi::new_admin_setup(
333                connectors.clone(),
334                SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
335                // TODO: will need it somewhere
336                // &process_mgr.globals.FM_FORCE_API_SECRETS.get_active(),
337            )?;
338            api_endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
339            admin_clients.insert(peer_id, admin_client);
340            peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
341        }
342
343        if !skip_setup && !pre_dkg {
344            // we don't guarantee backwards-compatibility for dkg, so we use the
345            // fedimint-cli version that matches fedimintd
346            let (original_fedimint_cli_path, original_fm_mint_client) =
347                crate::util::use_matching_fedimint_cli_for_dkg().await?;
348
349            run_cli_dkg_v2(api_endpoints).await?;
350
351            // we're done with dkg, so we can reset the fedimint-cli version
352            crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);
353
354            // move configs to config directory
355            let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
356            let invite_code_filename_original = "invite-code";
357
358            for peer_env_vars in peer_to_env_vars_map.values() {
359                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
360
361                let invite_code = poll_simple("awaiting-invite-code", || async {
362                    let path = format!("{peer_data_dir}/{invite_code_filename_original}");
363                    tokio::fs::read_to_string(&path)
364                        .await
365                        .with_context(|| format!("Awaiting invite code file: {path}"))
366                })
367                .await
368                .context("Awaiting invite code file")?;
369
370                ConnectorType::default()
371                    .download_from_invite_code(&connectors, &InviteCode::from_str(&invite_code)?)
372                    .await?;
373            }
374
375            // copy over invite-code file to client directory
376            let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);
377
378            tokio::fs::copy(
379                format!("{peer_data_dir}/{invite_code_filename_original}"),
380                format!("{client_dir}/{invite_code_filename_original}"),
381            )
382            .await
383            .context("copying invite-code file")?;
384
385            // move each guardian's invite-code file to the client's directory
386            // appending the peer id to the end
387            for (index, peer_env_vars) in &peer_to_env_vars_map {
388                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
389
390                let invite_code_filename_indexed =
391                    format!("{invite_code_filename_original}-{index}");
392                tokio::fs::rename(
393                    format!("{peer_data_dir}/{invite_code_filename_original}"),
394                    format!("{client_dir}/{invite_code_filename_indexed}"),
395                )
396                .await
397                .context("moving invite-code file")?;
398            }
399
400            debug!("Moved invite-code files to client data directory");
401        }
402
403        let client = JitTryAnyhow::new_try({
404            move || async move {
405                let client = Client::open_or_create(federation_name.as_str())?;
406                let invite_code = Self::invite_code_static()?;
407                if !skip_setup && !pre_dkg {
408                    cmd!(client, "join-federation", invite_code).run().await?;
409                }
410                Ok(client)
411            }
412        });
413
414        Ok(Self {
415            members,
416            vars: peer_to_env_vars_map,
417            bitcoind,
418            client,
419            connectors,
420        })
421    }
422
423    pub fn client_config(&self) -> Result<ClientConfig> {
424        let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
425        load_from_file(&cfg_path)
426    }
427
428    /// Get the module instance ID for a given module kind
429    pub fn module_instance_id_by_kind(&self, kind: &ModuleKind) -> Result<ModuleInstanceId> {
430        self.client_config()?
431            .modules
432            .iter()
433            .find_map(|(id, cfg)| if &cfg.kind == kind { Some(*id) } else { None })
434            .with_context(|| format!("Module kind {kind} not found"))
435    }
436
437    pub fn module_client_config<M: ClientModule>(
438        &self,
439    ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
440        self.client_config()?
441            .modules
442            .iter()
443            .find_map(|(module_instance_id, module_cfg)| {
444                if module_cfg.kind == M::kind() {
445                    let decoders = ModuleDecoderRegistry::new(vec![(
446                        *module_instance_id,
447                        M::kind(),
448                        M::decoder(),
449                    )]);
450                    Some(
451                        module_cfg
452                            .config
453                            .clone()
454                            .redecode_raw(&decoders)
455                            .expect("Decoding client cfg failed")
456                            .expect_decoded_ref()
457                            .as_any()
458                            .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
459                            .cloned()
460                            .context("Cast to module config failed"),
461                    )
462                } else {
463                    None
464                }
465            })
466            .transpose()
467    }
468
469    pub fn deposit_fees(&self) -> Result<Amount> {
470        Ok(self
471            .module_client_config::<WalletClientModule>()?
472            .context("No wallet module found")?
473            .fee_consensus
474            .peg_in_abs)
475    }
476
477    /// Read the invite code from the client data dir
478    pub fn invite_code(&self) -> Result<String> {
479        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
480        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
481        Ok(invite_code)
482    }
483
484    pub fn invite_code_static() -> Result<String> {
485        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
486        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
487        Ok(invite_code)
488    }
489    pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
490        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
491        let name = format!("invite-code-{peer_id}");
492        let invite_code = fs::read_to_string(data_dir.join(name))?;
493        Ok(invite_code)
494    }
495
496    /// Built-in, default, internal [`Client`]
497    ///
498    /// We should be moving away from using it for anything.
499    pub async fn internal_client(&self) -> Result<&Client> {
500        self.client
501            .get_try()
502            .await
503            .context("Internal client joining Federation")
504    }
505
506    /// New [`Client`] that already joined `self`
507    pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
508        let client = Client::create(name).await?;
509        client.join_federation(self.invite_code()?).await?;
510        Ok(client)
511    }
512
513    pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
514        if self.members.contains_key(&peer) {
515            bail!("fedimintd-{peer} already running");
516        }
517        self.members.insert(
518            peer,
519            Fedimintd::new(
520                process_mgr,
521                self.bitcoind.clone(),
522                peer,
523                &self.vars[&peer],
524                "default".to_string(),
525            )
526            .await?,
527        );
528        Ok(())
529    }
530
531    pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
532        let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
533            bail!("fedimintd-{peer_id} does not exist");
534        };
535        fedimintd.terminate().await?;
536        Ok(())
537    }
538
539    pub async fn await_server_terminated(&mut self, peer_id: usize) -> Result<()> {
540        let Some(fedimintd) = self.members.get_mut(&peer_id) else {
541            bail!("fedimintd-{peer_id} does not exist");
542        };
543        fedimintd.await_terminated().await?;
544        self.members.remove(&peer_id);
545        Ok(())
546    }
547
548    /// Starts all peers not currently running.
549    pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
550        info!("starting all servers");
551        let fed_size = process_mgr.globals.FM_FED_SIZE;
552        for peer_id in 0..fed_size {
553            if self.members.contains_key(&peer_id) {
554                continue;
555            }
556            self.start_server(process_mgr, peer_id).await?;
557        }
558        self.await_all_peers().await?;
559        Ok(())
560    }
561
562    /// Terminates all running peers.
563    pub async fn terminate_all_servers(&mut self) -> Result<()> {
564        info!("terminating all servers");
565        let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
566        for peer_id in running_peer_ids {
567            self.terminate_server(peer_id).await?;
568        }
569        Ok(())
570    }
571
572    /// Coordinated shutdown of all peers that restart using the provided
573    /// `bin_path`. Returns `Ok()` once all peers are online.
574    ///
575    /// Staggering the restart more closely simulates upgrades in the wild.
576    pub async fn restart_all_staggered_with_bin(
577        &mut self,
578        process_mgr: &ProcessManager,
579        bin_path: &PathBuf,
580    ) -> Result<()> {
581        let fed_size = process_mgr.globals.FM_FED_SIZE;
582
583        // ensure all peers are online
584        self.start_all_servers(process_mgr).await?;
585
586        // staggered shutdown of peers
587        while self.num_members() > 0 {
588            self.terminate_server(self.num_members() - 1).await?;
589            if self.num_members() > 0 {
590                fedimint_core::task::sleep_in_test(
591                    "waiting to shutdown remaining peers",
592                    Duration::from_secs(10),
593                )
594                .await;
595            }
596        }
597
598        // TODO: Audit that the environment access only happens in single-threaded code.
599        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
600
601        // staggered restart
602        for peer_id in 0..fed_size {
603            self.start_server(process_mgr, peer_id).await?;
604            if peer_id < fed_size - 1 {
605                fedimint_core::task::sleep_in_test(
606                    "waiting to restart remaining peers",
607                    Duration::from_secs(10),
608                )
609                .await;
610            }
611        }
612
613        self.await_all_peers().await?;
614
615        let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
616        info!("upgraded fedimintd to version: {}", fedimintd_version);
617        Ok(())
618    }
619
620    pub async fn restart_all_with_bin(
621        &mut self,
622        process_mgr: &ProcessManager,
623        bin_path: &PathBuf,
624    ) -> Result<()> {
625        // get the version we're upgrading to, temporarily updating the fedimintd path
626        let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
627        // TODO: Audit that the environment access only happens in single-threaded code.
628        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
629        // TODO: Audit that the environment access only happens in single-threaded code.
630        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
631
632        self.restart_all_staggered_with_bin(process_mgr, bin_path)
633            .await
634    }
635
636    pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
637        let fed_size = process_mgr.globals.FM_FED_SIZE;
638        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
639        anyhow::ensure!(
640            fed_size > 3 * offline_nodes,
641            "too many offline nodes ({offline_nodes}) to reach consensus"
642        );
643
644        while self.num_members() > fed_size - offline_nodes {
645            self.terminate_server(self.num_members() - 1).await?;
646        }
647
648        if offline_nodes > 0 {
649            info!(fed_size, offline_nodes, "federation is degraded");
650        }
651        Ok(())
652    }
653
654    pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
655        let deposit_fees_msat = self.deposit_fees()?.msats;
656        assert_eq!(
657            deposit_fees_msat % 1000,
658            0,
659            "Deposit fees expected to be whole sats in test suite"
660        );
661        let deposit_fees = deposit_fees_msat / 1000;
662        info!(amount, deposit_fees, "Pegging-in client funds");
663
664        let (address, operation_id) = client.get_deposit_addr().await?;
665
666        self.bitcoind
667            .send_to(address, amount + deposit_fees)
668            .await?;
669        self.bitcoind.mine_blocks(21).await?;
670
671        Ok(operation_id)
672    }
673
674    pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
675        let operation_id = self.pegin_client_no_wait(amount, client).await?;
676
677        client.await_deposit(&operation_id).await?;
678        Ok(())
679    }
680
681    /// Initiates multiple peg-ins to the same federation for the set of
682    /// gateways to save on mining blocks in parallel.
683    pub async fn pegin_gateways(
684        &self,
685        amount: u64,
686        gateways: Vec<&super::gatewayd::Gatewayd>,
687    ) -> Result<()> {
688        let deposit_fees_msat = self.deposit_fees()?.msats;
689        assert_eq!(
690            deposit_fees_msat % 1000,
691            0,
692            "Deposit fees expected to be whole sats in test suite"
693        );
694        let deposit_fees = deposit_fees_msat / 1000;
695        info!(amount, deposit_fees, "Pegging-in gateway funds");
696        let fed_id = self.calculate_federation_id();
697        for gw in gateways.clone() {
698            let pegin_addr = gw.get_pegin_addr(&fed_id).await?;
699            self.bitcoind
700                .send_to(pegin_addr, amount + deposit_fees)
701                .await?;
702        }
703
704        self.bitcoind.mine_blocks(21).await?;
705        let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
706        try_join_all(gateways.into_iter().map(|gw| {
707            poll("gateway pegin", || async {
708                let gw_info = gw.get_info().await.map_err(ControlFlow::Continue)?;
709
710                let block_height: u64 = if gw.gatewayd_version < *VERSION_0_10_0_ALPHA {
711                    gw_info["block_height"]
712                        .as_u64()
713                        .expect("Could not parse block height")
714                } else {
715                    gw_info["lightning_info"]["connected"]["block_height"]
716                        .as_u64()
717                        .expect("Could not parse block height")
718                };
719
720                if bitcoind_block_height != block_height {
721                    return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
722                        "gateway block height is not synced"
723                    )));
724                }
725
726                let gateway_balance = gw
727                    .ecash_balance(fed_id.clone())
728                    .await
729                    .map_err(ControlFlow::Continue)?;
730                poll_almost_equal!(gateway_balance, amount * 1000)
731            })
732        }))
733        .await?;
734
735        Ok(())
736    }
737
738    /// Initiates multiple peg-outs from the same federation for the set of
739    /// gateways to save on mining blocks in parallel.
740    pub async fn pegout_gateways(
741        &self,
742        amount: u64,
743        gateways: Vec<&super::gatewayd::Gatewayd>,
744    ) -> Result<()> {
745        info!(amount, "Pegging-out gateway funds");
746        let fed_id = self.calculate_federation_id();
747        let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
748        for gw in gateways.clone() {
749            let prev_fed_ecash_balance = gw
750                .get_balances()
751                .await?
752                .ecash_balances
753                .into_iter()
754                .find(|fed| fed.federation_id.to_string() == fed_id)
755                .expect("Gateway has not joined federation")
756                .ecash_balance_msats;
757
758            let pegout_address = self.bitcoind.get_new_address().await?;
759            let value = cmd!(
760                gw,
761                "ecash",
762                "pegout",
763                "--federation-id",
764                fed_id,
765                "--amount",
766                amount,
767                "--address",
768                pegout_address
769            )
770            .out_json()
771            .await?;
772            let response: WithdrawResponse = serde_json::from_value(value)?;
773            peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
774        }
775        self.bitcoind.mine_blocks(21).await?;
776
777        try_join_all(
778            peg_outs
779                .values()
780                .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
781        )
782        .await?;
783
784        for gw in gateways.clone() {
785            let after_fed_ecash_balance = gw
786                .get_balances()
787                .await?
788                .ecash_balances
789                .into_iter()
790                .find(|fed| fed.federation_id.to_string() == fed_id)
791                .expect("Gateway has not joined federation")
792                .ecash_balance_msats;
793
794            let ln_type = gw.ln.ln_type();
795            let prev_balance = peg_outs
796                .get(&ln_type)
797                .expect("peg out does not exist")
798                .0
799                .msats;
800            let fees = peg_outs
801                .get(&ln_type)
802                .expect("peg out does not exist")
803                .1
804                .fees;
805            let total_fee = fees.amount().to_sat() * 1000;
806            crate::util::almost_equal(
807                after_fed_ecash_balance.msats,
808                prev_balance - amount - total_fee,
809                2000,
810            )
811            .map_err(|e| {
812                anyhow::anyhow!(
813                    "new balance did not equal prev balance minus withdraw_amount minus fees: {}",
814                    e
815                )
816            })?;
817        }
818
819        Ok(())
820    }
821
822    pub fn calculate_federation_id(&self) -> String {
823        self.client_config()
824            .unwrap()
825            .global
826            .calculate_federation_id()
827            .to_string()
828    }
829
830    pub async fn await_block_sync(&self) -> Result<u64> {
831        let finality_delay = self.get_finality_delay()?;
832        let block_count = self.bitcoind.get_block_count().await?;
833        let expected = block_count.saturating_sub(finality_delay.into());
834        cmd!(
835            self.internal_client().await?,
836            "dev",
837            "wait-block-count",
838            expected
839        )
840        .run()
841        .await?;
842        Ok(expected)
843    }
844
845    fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
846        let wallet_instance_id = self.module_instance_id_by_kind(&fedimint_wallet_client::KIND)?;
847        let client_config = &self.client_config()?;
848        let wallet_cfg = client_config
849            .modules
850            .get(&wallet_instance_id)
851            .context("wallet module not found")?
852            .clone()
853            .redecode_raw(&ModuleDecoderRegistry::new([(
854                wallet_instance_id,
855                fedimint_wallet_client::KIND,
856                fedimint_wallet_client::WalletModuleTypes::decoder(),
857            )]))?;
858        let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
859
860        let finality_delay = wallet_cfg.finality_delay;
861        Ok(finality_delay)
862    }
863
864    pub async fn await_gateways_registered(&self) -> Result<()> {
865        let start_time = Instant::now();
866        debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
867
868        poll("gateways registered", || async {
869            let num_gateways = cmd!(
870                self.internal_client()
871                    .await
872                    .map_err(ControlFlow::Continue)?,
873                "list-gateways"
874            )
875            .out_json()
876            .await
877            .map_err(ControlFlow::Continue)?
878            .as_array()
879            .context("invalid output")
880            .map_err(ControlFlow::Break)?
881            .len();
882
883            // After version v0.10.0, the LND gateway will register twice. Once for the HTTP
884            // server, and once for the iroh endpoint.
885            let expected_gateways =
886                if crate::util::Gatewayd::version_or_default().await < *VERSION_0_10_0_ALPHA {
887                    1
888                } else {
889                    2
890                };
891
892            poll_eq!(num_gateways, expected_gateways)
893        })
894        .await?;
895        debug!(target: LOG_DEVIMINT,
896            elapsed_ms = %start_time.elapsed().as_millis(),
897            "Gateways registered");
898        Ok(())
899    }
900
901    pub async fn await_all_peers(&self) -> Result<()> {
902        poll("Waiting for all peers to be online", || async {
903            cmd!(
904                self.internal_client()
905                    .await
906                    .map_err(ControlFlow::Continue)?,
907                "dev",
908                "api",
909                "--module",
910                "wallet",
911                "block_count"
912            )
913            .run()
914            .await
915            .map_err(ControlFlow::Continue)?;
916            Ok(())
917        })
918        .await
919    }
920
921    pub async fn await_peer(&self, peer_id: usize) -> Result<()> {
922        poll("Waiting for all peers to be online", || async {
923            cmd!(
924                self.internal_client()
925                    .await
926                    .map_err(ControlFlow::Continue)?,
927                "dev",
928                "api",
929                "--peer-id",
930                peer_id,
931                "--module",
932                "wallet",
933                "block_count"
934            )
935            .run()
936            .await
937            .map_err(ControlFlow::Continue)?;
938            Ok(())
939        })
940        .await
941    }
942
943    /// Mines enough blocks to finalize mempool transactions, then waits for
944    /// federation to process finalized blocks.
945    ///
946    /// ex:
947    ///   tx submitted to mempool at height 100
948    ///   finality delay = 10
949    ///   mine finality delay blocks + 1 => new height 111
950    ///   tx included in block 101
951    ///   highest finalized height = 111 - 10 = 101
952    pub async fn finalize_mempool_tx(&self) -> Result<()> {
953        let finality_delay = self.get_finality_delay()?;
954        let blocks_to_mine = finality_delay + 1;
955        self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
956        self.await_block_sync().await?;
957        Ok(())
958    }
959
960    pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
961        self.bitcoind.mine_blocks(blocks).await?;
962        self.await_block_sync().await?;
963        Ok(())
964    }
965
966    pub fn num_members(&self) -> usize {
967        self.members.len()
968    }
969
970    pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
971        self.members
972            .keys()
973            .map(|&peer_id| PeerId::from(peer_id as u16))
974    }
975}
976
977#[derive(Clone)]
978pub struct Fedimintd {
979    _bitcoind: Bitcoind,
980    process: ProcessHandle,
981}
982
983impl Fedimintd {
984    pub async fn new(
985        process_mgr: &ProcessManager,
986        bitcoind: Bitcoind,
987        peer_id: usize,
988        env: &vars::Fedimintd,
989        fed_name: String,
990    ) -> Result<Self> {
991        debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
992        let process = process_mgr
993            .spawn_daemon(
994                &format!("fedimintd-{fed_name}-{peer_id}"),
995                cmd!(FedimintdCmd).envs(env.vars()),
996            )
997            .await?;
998
999        Ok(Self {
1000            _bitcoind: bitcoind,
1001            process,
1002        })
1003    }
1004
1005    pub async fn terminate(self) -> Result<()> {
1006        self.process.terminate().await
1007    }
1008
1009    pub async fn await_terminated(&self) -> Result<()> {
1010        self.process.await_terminated().await
1011    }
1012}
1013
1014pub async fn run_cli_dkg_v2(endpoints: BTreeMap<PeerId, String>) -> Result<()> {
1015    // Parallelize setup status checks
1016    let status_futures = endpoints.values().map(|endpoint| {
1017        let endpoint = endpoint.clone();
1018        async move {
1019            let status = poll("awaiting-setup-status-awaiting-local-params", || async {
1020                crate::util::FedimintCli
1021                    .setup_status(&API_AUTH, &endpoint)
1022                    .await
1023                    .map_err(ControlFlow::Continue)
1024            })
1025            .await
1026            .unwrap();
1027
1028            assert_eq!(status, SetupStatus::AwaitingLocalParams);
1029        }
1030    });
1031    join_all(status_futures).await;
1032
1033    debug!(target: LOG_DEVIMINT, "Setting local parameters...");
1034
1035    // Parallelize setting local parameters
1036    let local_params_futures = endpoints.iter().map(|(peer, endpoint)| {
1037        let peer = *peer;
1038        let endpoint = endpoint.clone();
1039        async move {
1040            let info = if peer.to_usize() == 0 {
1041                crate::util::FedimintCli
1042                    .set_local_params_leader(&peer, &API_AUTH, &endpoint)
1043                    .await
1044            } else {
1045                crate::util::FedimintCli
1046                    .set_local_params_follower(&peer, &API_AUTH, &endpoint)
1047                    .await
1048            };
1049            info.map(|i| (peer, i))
1050        }
1051    });
1052    let connection_info: BTreeMap<_, _> = try_join_all(local_params_futures)
1053        .await?
1054        .into_iter()
1055        .collect();
1056
1057    debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");
1058
1059    // Parallelize peer addition - flatten the nested loop into a single parallel
1060    // operation
1061    let add_peer_futures = connection_info.iter().flat_map(|(peer, info)| {
1062        endpoints
1063            .iter()
1064            .filter(move |(p, _)| *p != peer)
1065            .map(move |(_, endpoint)| {
1066                let endpoint = endpoint.clone();
1067                let info = info.clone();
1068                async move {
1069                    crate::util::FedimintCli
1070                        .add_peer(&info, &API_AUTH, &endpoint)
1071                        .await
1072                }
1073            })
1074    });
1075    try_join_all(add_peer_futures).await?;
1076
1077    debug!(target: LOG_DEVIMINT, "Starting DKG...");
1078
1079    // Parallelize DKG start
1080    let start_dkg_futures = endpoints.values().map(|endpoint| {
1081        let endpoint = endpoint.clone();
1082        async move {
1083            crate::util::FedimintCli
1084                .start_dkg(&API_AUTH, &endpoint)
1085                .await
1086        }
1087    });
1088    try_join_all(start_dkg_futures).await?;
1089
1090    Ok(())
1091}