devimint/
federation.rs

1use std::collections::{BTreeMap, HashMap};
2use std::ops::ControlFlow;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::Duration;
6use std::{env, fs};
7
8use anyhow::{Context, Result, bail};
9use fedimint_api_client::api::DynGlobalApi;
10use fedimint_api_client::api::net::ConnectorType;
11use fedimint_client_module::module::ClientModule;
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::admin_client::SetupStatus;
14use fedimint_core::config::{ClientConfig, load_from_file};
15use fedimint_core::core::{ModuleInstanceId, ModuleKind};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::registry::ModuleDecoderRegistry;
18use fedimint_core::module::{ApiAuth, ModuleCommon};
19use fedimint_core::runtime::block_in_place;
20use fedimint_core::task::block_on;
21use fedimint_core::task::jit::JitTryAnyhow;
22use fedimint_core::util::SafeUrl;
23use fedimint_core::{Amount, NumPeers, PeerId};
24use fedimint_gateway_common::WithdrawResponse;
25use fedimint_logging::LOG_DEVIMINT;
26use fedimint_server::config::ConfigGenParams;
27use fedimint_testing_core::config::local_config_gen_params;
28use fedimint_testing_core::node_type::LightningNodeType;
29use fedimint_wallet_client::WalletClientModule;
30use fedimint_wallet_client::config::WalletClientConfig;
31use fs_lock::FileLock;
32use futures::future::{join_all, try_join_all};
33use tokio::task::{JoinSet, spawn_blocking};
34use tokio::time::Instant;
35use tracing::{debug, info};
36
37use super::external::Bitcoind;
38use super::util::{Command, ProcessHandle, ProcessManager, cmd};
39use super::vars::utf8;
40use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
41use crate::util::{FedimintdCmd, poll, poll_simple, poll_with_timeout};
42use crate::version_constants::VERSION_0_10_0_ALPHA;
43use crate::{poll_almost_equal, poll_eq, vars};
44
45// TODO: Are we still using the 3rd port for anything?
46/// Number of ports we allocate for every `fedimintd` instance
47pub const PORTS_PER_FEDIMINTD: u16 = 4;
48/// Which port is for p2p inside the range from [`PORTS_PER_FEDIMINTD`]
49pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
50/// Which port is for api inside the range from [`PORTS_PER_FEDIMINTD`]
51pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
52/// Which port is for the web ui inside the range from [`PORTS_PER_FEDIMINTD`]
53pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
54/// Which port is for prometheus inside the range from [`PORTS_PER_FEDIMINTD`]
55pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;
56
57#[derive(Clone)]
58pub struct Federation {
59    // client is only for internal use, use cli commands instead
60    pub members: BTreeMap<usize, Fedimintd>,
61    pub vars: BTreeMap<usize, vars::Fedimintd>,
62    pub bitcoind: Bitcoind,
63
64    /// Built in [`Client`], already joined
65    client: JitTryAnyhow<Client>,
66    #[allow(dead_code)] // Will need it later, maybe
67    connectors: ConnectorRegistry,
68}
69
70impl Drop for Federation {
71    fn drop(&mut self) {
72        block_in_place(|| {
73            block_on(async {
74                let mut set = JoinSet::new();
75
76                while let Some((_id, fedimintd)) = self.members.pop_first() {
77                    set.spawn(async { drop(fedimintd) });
78                }
79                while (set.join_next().await).is_some() {}
80            });
81        });
82    }
83}
84/// `fedimint-cli` instance (basically path with client state: config + db)
85#[derive(Clone)]
86pub struct Client {
87    name: String,
88}
89
90impl Client {
91    fn clients_dir() -> PathBuf {
92        let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
93            .expect("FM_DATA_DIR_ENV not set")
94            .parse()
95            .expect("FM_DATA_DIR_ENV invalid");
96        data_dir.join("clients")
97    }
98
99    fn client_dir(&self) -> PathBuf {
100        Self::clients_dir().join(&self.name)
101    }
102
103    pub fn client_name_lock(name: &str) -> Result<FileLock> {
104        let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
105        let file_lock = std::fs::OpenOptions::new()
106            .write(true)
107            .create(true)
108            .truncate(true)
109            .open(&lock_path)
110            .with_context(|| format!("Failed to open {}", lock_path.display()))?;
111
112        fs_lock::FileLock::new_exclusive(file_lock)
113            .with_context(|| format!("Failed to lock {}", lock_path.display()))
114    }
115
116    /// Create a [`Client`] that starts with a fresh state.
117    pub async fn create(name: impl ToString) -> Result<Client> {
118        let name = name.to_string();
119        spawn_blocking(move || {
120            let _lock = Self::client_name_lock(&name);
121            for i in 0u64.. {
122                let client = Self {
123                    name: format!("{name}-{i}"),
124                };
125
126                if !client.client_dir().exists() {
127                    std::fs::create_dir_all(client.client_dir())?;
128                    return Ok(client);
129                }
130            }
131            unreachable!()
132        })
133        .await?
134    }
135
136    /// Open or create a [`Client`] that starts with a fresh state.
137    pub fn open_or_create(name: &str) -> Result<Client> {
138        block_in_place(|| {
139            let _lock = Self::client_name_lock(name);
140            let client = Self {
141                name: format!("{name}-0"),
142            };
143            if !client.client_dir().exists() {
144                std::fs::create_dir_all(client.client_dir())?;
145            }
146            Ok(client)
147        })
148    }
149
150    /// Client to join a federation
151    pub async fn join_federation(&self, invite_code: String) -> Result<()> {
152        debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
153        cmd!(self, "join-federation", invite_code).run().await?;
154
155        Ok(())
156    }
157
158    /// Client to join a federation with a restore procedure
159    pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
160        debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
161        cmd!(
162            self,
163            "restore",
164            "--invite-code",
165            invite_code,
166            "--mnemonic",
167            mnemonic
168        )
169        .run()
170        .await?;
171
172        Ok(())
173    }
174
175    /// Client to join a federation
176    pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
177        let restored = Self::open_or_create(name)?;
178
179        let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
180            .as_str()
181            .unwrap()
182            .to_owned();
183
184        debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
185        cmd!(
186            restored,
187            "restore",
188            "--invite-code",
189            invite_code,
190            "--mnemonic",
191            mnemonic
192        )
193        .run()
194        .await?;
195
196        Ok(restored)
197    }
198
199    /// Create a [`Client`] that starts with a state that is a copy of
200    /// of another one.
201    pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
202        let new = Client::create(name).await?;
203
204        cmd!(
205            "cp",
206            "-R",
207            self.client_dir().join("client.db").display(),
208            new.client_dir().display()
209        )
210        .run()
211        .await?;
212
213        Ok(new)
214    }
215
216    pub async fn balance(&self) -> Result<u64> {
217        Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
218            .as_u64()
219            .unwrap())
220    }
221
222    pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
223        let deposit = cmd!(self, "deposit-address").out_json().await?;
224        Ok((
225            deposit["address"].as_str().unwrap().to_string(),
226            deposit["operation_id"].as_str().unwrap().to_string(),
227        ))
228    }
229
230    pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
231        cmd!(self, "await-deposit", operation_id).run().await
232    }
233
234    pub fn cmd(&self) -> Command {
235        cmd!(
236            crate::util::get_fedimint_cli_path(),
237            format!("--data-dir={}", self.client_dir().display())
238        )
239    }
240
241    pub fn get_name(&self) -> &str {
242        &self.name
243    }
244
245    /// Returns the current consensus session count
246    pub async fn get_session_count(&self) -> Result<u64> {
247        cmd!(self, "dev", "session-count").out_json().await?["count"]
248            .as_u64()
249            .context("count field wasn't a number")
250    }
251
252    /// Returns once all active state machines complete
253    pub async fn wait_complete(&self) -> Result<()> {
254        cmd!(self, "dev", "wait-complete").run().await
255    }
256
257    /// Returns once the current session completes
258    pub async fn wait_session(&self) -> anyhow::Result<()> {
259        info!("Waiting for a new session");
260        let session_count = self.get_session_count().await?;
261        self.wait_session_outcome(session_count).await?;
262        Ok(())
263    }
264
265    /// Returns once the provided session count completes
266    pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
267        let timeout = {
268            let current_session_count = self.get_session_count().await?;
269            let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
270            let session_duration_seconds = 180;
271            Duration::from_secs(sessions_to_wait * session_duration_seconds)
272        };
273
274        let start = Instant::now();
275        poll_with_timeout("Waiting for a new session", timeout, || async {
276            info!("Awaiting session outcome {session_count}");
277            match cmd!(self, "dev", "api", "await_session_outcome", session_count)
278                .run()
279                .await
280            {
281                Err(e) => Err(ControlFlow::Continue(e)),
282                Ok(()) => Ok(()),
283            }
284        })
285        .await?;
286
287        let session_found_in = start.elapsed();
288        info!("session found in {session_found_in:?}");
289        Ok(())
290    }
291}
292
293impl Federation {
294    pub async fn new(
295        process_mgr: &ProcessManager,
296        bitcoind: Bitcoind,
297        skip_setup: bool,
298        pre_dkg: bool,
299        // Which of the pre-allocated federations to use (most tests just use single `0` one)
300        fed_index: usize,
301        federation_name: String,
302    ) -> Result<Self> {
303        let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
304        let mut members = BTreeMap::new();
305        let mut peer_to_env_vars_map = BTreeMap::new();
306
307        let peers: Vec<_> = num_peers.peer_ids().collect();
308        let params: HashMap<PeerId, ConfigGenParams> =
309            local_config_gen_params(&peers, process_mgr.globals.FM_FEDERATION_BASE_PORT, true)?;
310
311        let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
312        let mut api_endpoints: BTreeMap<PeerId, _> = BTreeMap::new();
313
314        let connectors = ConnectorRegistry::build_from_testing_env()?.bind().await?;
315        for peer_id in num_peers.peer_ids() {
316            let peer_env_vars = vars::Fedimintd::init(
317                &process_mgr.globals,
318                federation_name.clone(),
319                peer_id,
320                process_mgr
321                    .globals
322                    .fedimintd_overrides
323                    .peer_expect(fed_index, peer_id),
324            )
325            .await?;
326            members.insert(
327                peer_id.to_usize(),
328                Fedimintd::new(
329                    process_mgr,
330                    bitcoind.clone(),
331                    peer_id.to_usize(),
332                    &peer_env_vars,
333                    federation_name.clone(),
334                )
335                .await?,
336            );
337            let admin_client = DynGlobalApi::new_admin_setup(
338                connectors.clone(),
339                SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
340                // TODO: will need it somewhere
341                // &process_mgr.globals.FM_FORCE_API_SECRETS.get_active(),
342            )?;
343            api_endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
344            admin_clients.insert(peer_id, admin_client);
345            peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
346        }
347
348        if !skip_setup && !pre_dkg {
349            // we don't guarantee backwards-compatibility for dkg, so we use the
350            // fedimint-cli version that matches fedimintd
351            let (original_fedimint_cli_path, original_fm_mint_client) =
352                crate::util::use_matching_fedimint_cli_for_dkg().await?;
353
354            run_cli_dkg_v2(params, api_endpoints).await?;
355
356            // we're done with dkg, so we can reset the fedimint-cli version
357            crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);
358
359            // move configs to config directory
360            let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
361            let invite_code_filename_original = "invite-code";
362
363            for peer_env_vars in peer_to_env_vars_map.values() {
364                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
365
366                let invite_code = poll_simple("awaiting-invite-code", || async {
367                    let path = format!("{peer_data_dir}/{invite_code_filename_original}");
368                    tokio::fs::read_to_string(&path)
369                        .await
370                        .with_context(|| format!("Awaiting invite code file: {path}"))
371                })
372                .await
373                .context("Awaiting invite code file")?;
374
375                ConnectorType::default()
376                    .download_from_invite_code(&connectors, &InviteCode::from_str(&invite_code)?)
377                    .await?;
378            }
379
380            // copy over invite-code file to client directory
381            let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);
382
383            tokio::fs::copy(
384                format!("{peer_data_dir}/{invite_code_filename_original}"),
385                format!("{client_dir}/{invite_code_filename_original}"),
386            )
387            .await
388            .context("copying invite-code file")?;
389
390            // move each guardian's invite-code file to the client's directory
391            // appending the peer id to the end
392            for (index, peer_env_vars) in &peer_to_env_vars_map {
393                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
394
395                let invite_code_filename_indexed =
396                    format!("{invite_code_filename_original}-{index}");
397                tokio::fs::rename(
398                    format!("{peer_data_dir}/{invite_code_filename_original}"),
399                    format!("{client_dir}/{invite_code_filename_indexed}"),
400                )
401                .await
402                .context("moving invite-code file")?;
403            }
404
405            debug!("Moved invite-code files to client data directory");
406        }
407
408        let client = JitTryAnyhow::new_try({
409            move || async move {
410                let client = Client::open_or_create(federation_name.as_str())?;
411                let invite_code = Self::invite_code_static()?;
412                if !skip_setup && !pre_dkg {
413                    cmd!(client, "join-federation", invite_code).run().await?;
414                }
415                Ok(client)
416            }
417        });
418
419        Ok(Self {
420            members,
421            vars: peer_to_env_vars_map,
422            bitcoind,
423            client,
424            connectors,
425        })
426    }
427
428    pub fn client_config(&self) -> Result<ClientConfig> {
429        let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
430        load_from_file(&cfg_path)
431    }
432
433    /// Get the module instance ID for a given module kind
434    pub fn module_instance_id_by_kind(&self, kind: &ModuleKind) -> Result<ModuleInstanceId> {
435        self.client_config()?
436            .modules
437            .iter()
438            .find_map(|(id, cfg)| if &cfg.kind == kind { Some(*id) } else { None })
439            .with_context(|| format!("Module kind {kind} not found"))
440    }
441
442    pub fn module_client_config<M: ClientModule>(
443        &self,
444    ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
445        self.client_config()?
446            .modules
447            .iter()
448            .find_map(|(module_instance_id, module_cfg)| {
449                if module_cfg.kind == M::kind() {
450                    let decoders = ModuleDecoderRegistry::new(vec![(
451                        *module_instance_id,
452                        M::kind(),
453                        M::decoder(),
454                    )]);
455                    Some(
456                        module_cfg
457                            .config
458                            .clone()
459                            .redecode_raw(&decoders)
460                            .expect("Decoding client cfg failed")
461                            .expect_decoded_ref()
462                            .as_any()
463                            .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
464                            .cloned()
465                            .context("Cast to module config failed"),
466                    )
467                } else {
468                    None
469                }
470            })
471            .transpose()
472    }
473
474    pub fn deposit_fees(&self) -> Result<Amount> {
475        Ok(self
476            .module_client_config::<WalletClientModule>()?
477            .context("No wallet module found")?
478            .fee_consensus
479            .peg_in_abs)
480    }
481
482    /// Read the invite code from the client data dir
483    pub fn invite_code(&self) -> Result<String> {
484        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
485        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
486        Ok(invite_code)
487    }
488
489    pub fn invite_code_static() -> Result<String> {
490        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
491        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
492        Ok(invite_code)
493    }
494    pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
495        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
496        let name = format!("invite-code-{peer_id}");
497        let invite_code = fs::read_to_string(data_dir.join(name))?;
498        Ok(invite_code)
499    }
500
501    /// Built-in, default, internal [`Client`]
502    ///
503    /// We should be moving away from using it for anything.
504    pub async fn internal_client(&self) -> Result<&Client> {
505        self.client
506            .get_try()
507            .await
508            .context("Internal client joining Federation")
509    }
510
511    /// New [`Client`] that already joined `self`
512    pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
513        let client = Client::create(name).await?;
514        client.join_federation(self.invite_code()?).await?;
515        Ok(client)
516    }
517
518    pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
519        if self.members.contains_key(&peer) {
520            bail!("fedimintd-{peer} already running");
521        }
522        self.members.insert(
523            peer,
524            Fedimintd::new(
525                process_mgr,
526                self.bitcoind.clone(),
527                peer,
528                &self.vars[&peer],
529                "default".to_string(),
530            )
531            .await?,
532        );
533        Ok(())
534    }
535
536    pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
537        let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
538            bail!("fedimintd-{peer_id} does not exist");
539        };
540        fedimintd.terminate().await?;
541        Ok(())
542    }
543
544    pub async fn await_server_terminated(&mut self, peer_id: usize) -> Result<()> {
545        let Some(fedimintd) = self.members.get_mut(&peer_id) else {
546            bail!("fedimintd-{peer_id} does not exist");
547        };
548        fedimintd.await_terminated().await?;
549        self.members.remove(&peer_id);
550        Ok(())
551    }
552
553    /// Starts all peers not currently running.
554    pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
555        info!("starting all servers");
556        let fed_size = process_mgr.globals.FM_FED_SIZE;
557        for peer_id in 0..fed_size {
558            if self.members.contains_key(&peer_id) {
559                continue;
560            }
561            self.start_server(process_mgr, peer_id).await?;
562        }
563        self.await_all_peers().await?;
564        Ok(())
565    }
566
567    /// Terminates all running peers.
568    pub async fn terminate_all_servers(&mut self) -> Result<()> {
569        info!("terminating all servers");
570        let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
571        for peer_id in running_peer_ids {
572            self.terminate_server(peer_id).await?;
573        }
574        Ok(())
575    }
576
577    /// Coordinated shutdown of all peers that restart using the provided
578    /// `bin_path`. Returns `Ok()` once all peers are online.
579    ///
580    /// Staggering the restart more closely simulates upgrades in the wild.
581    pub async fn restart_all_staggered_with_bin(
582        &mut self,
583        process_mgr: &ProcessManager,
584        bin_path: &PathBuf,
585    ) -> Result<()> {
586        let fed_size = process_mgr.globals.FM_FED_SIZE;
587
588        // ensure all peers are online
589        self.start_all_servers(process_mgr).await?;
590
591        // staggered shutdown of peers
592        while self.num_members() > 0 {
593            self.terminate_server(self.num_members() - 1).await?;
594            if self.num_members() > 0 {
595                fedimint_core::task::sleep_in_test(
596                    "waiting to shutdown remaining peers",
597                    Duration::from_secs(10),
598                )
599                .await;
600            }
601        }
602
603        // TODO: Audit that the environment access only happens in single-threaded code.
604        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
605
606        // staggered restart
607        for peer_id in 0..fed_size {
608            self.start_server(process_mgr, peer_id).await?;
609            if peer_id < fed_size - 1 {
610                fedimint_core::task::sleep_in_test(
611                    "waiting to restart remaining peers",
612                    Duration::from_secs(10),
613                )
614                .await;
615            }
616        }
617
618        self.await_all_peers().await?;
619
620        let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
621        info!("upgraded fedimintd to version: {}", fedimintd_version);
622        Ok(())
623    }
624
625    pub async fn restart_all_with_bin(
626        &mut self,
627        process_mgr: &ProcessManager,
628        bin_path: &PathBuf,
629    ) -> Result<()> {
630        // get the version we're upgrading to, temporarily updating the fedimintd path
631        let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
632        // TODO: Audit that the environment access only happens in single-threaded code.
633        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
634        // TODO: Audit that the environment access only happens in single-threaded code.
635        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
636
637        self.restart_all_staggered_with_bin(process_mgr, bin_path)
638            .await
639    }
640
641    pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
642        let fed_size = process_mgr.globals.FM_FED_SIZE;
643        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
644        anyhow::ensure!(
645            fed_size > 3 * offline_nodes,
646            "too many offline nodes ({offline_nodes}) to reach consensus"
647        );
648
649        while self.num_members() > fed_size - offline_nodes {
650            self.terminate_server(self.num_members() - 1).await?;
651        }
652
653        if offline_nodes > 0 {
654            info!(fed_size, offline_nodes, "federation is degraded");
655        }
656        Ok(())
657    }
658
659    pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
660        let deposit_fees_msat = self.deposit_fees()?.msats;
661        assert_eq!(
662            deposit_fees_msat % 1000,
663            0,
664            "Deposit fees expected to be whole sats in test suite"
665        );
666        let deposit_fees = deposit_fees_msat / 1000;
667        info!(amount, deposit_fees, "Pegging-in client funds");
668
669        let (address, operation_id) = client.get_deposit_addr().await?;
670
671        self.bitcoind
672            .send_to(address, amount + deposit_fees)
673            .await?;
674        self.bitcoind.mine_blocks(21).await?;
675
676        Ok(operation_id)
677    }
678
679    pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
680        let operation_id = self.pegin_client_no_wait(amount, client).await?;
681
682        client.await_deposit(&operation_id).await?;
683        Ok(())
684    }
685
686    /// Initiates multiple peg-ins to the same federation for the set of
687    /// gateways to save on mining blocks in parallel.
688    pub async fn pegin_gateways(
689        &self,
690        amount: u64,
691        gateways: Vec<&super::gatewayd::Gatewayd>,
692    ) -> Result<()> {
693        let deposit_fees_msat = self.deposit_fees()?.msats;
694        assert_eq!(
695            deposit_fees_msat % 1000,
696            0,
697            "Deposit fees expected to be whole sats in test suite"
698        );
699        let deposit_fees = deposit_fees_msat / 1000;
700        info!(amount, deposit_fees, "Pegging-in gateway funds");
701        let fed_id = self.calculate_federation_id();
702        for gw in gateways.clone() {
703            let pegin_addr = gw.get_pegin_addr(&fed_id).await?;
704            self.bitcoind
705                .send_to(pegin_addr, amount + deposit_fees)
706                .await?;
707        }
708
709        self.bitcoind.mine_blocks(21).await?;
710        let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
711        try_join_all(gateways.into_iter().map(|gw| {
712            poll("gateway pegin", || async {
713                let gw_info = gw.get_info().await.map_err(ControlFlow::Continue)?;
714
715                let block_height: u64 = if gw.gatewayd_version < *VERSION_0_10_0_ALPHA {
716                    gw_info["block_height"]
717                        .as_u64()
718                        .expect("Could not parse block height")
719                } else {
720                    gw_info["lightning_info"]["connected"]["block_height"]
721                        .as_u64()
722                        .expect("Could not parse block height")
723                };
724
725                if bitcoind_block_height != block_height {
726                    return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
727                        "gateway block height is not synced"
728                    )));
729                }
730
731                let gateway_balance = gw
732                    .ecash_balance(fed_id.clone())
733                    .await
734                    .map_err(ControlFlow::Continue)?;
735                poll_almost_equal!(gateway_balance, amount * 1000)
736            })
737        }))
738        .await?;
739
740        Ok(())
741    }
742
743    /// Initiates multiple peg-outs from the same federation for the set of
744    /// gateways to save on mining blocks in parallel.
745    pub async fn pegout_gateways(
746        &self,
747        amount: u64,
748        gateways: Vec<&super::gatewayd::Gatewayd>,
749    ) -> Result<()> {
750        info!(amount, "Pegging-out gateway funds");
751        let fed_id = self.calculate_federation_id();
752        let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
753        for gw in gateways.clone() {
754            let prev_fed_ecash_balance = gw
755                .get_balances()
756                .await?
757                .ecash_balances
758                .into_iter()
759                .find(|fed| fed.federation_id.to_string() == fed_id)
760                .expect("Gateway has not joined federation")
761                .ecash_balance_msats;
762
763            let pegout_address = self.bitcoind.get_new_address().await?;
764            let value = cmd!(
765                gw,
766                "ecash",
767                "pegout",
768                "--federation-id",
769                fed_id,
770                "--amount",
771                amount,
772                "--address",
773                pegout_address
774            )
775            .out_json()
776            .await?;
777            let response: WithdrawResponse = serde_json::from_value(value)?;
778            peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
779        }
780        self.bitcoind.mine_blocks(21).await?;
781
782        try_join_all(
783            peg_outs
784                .values()
785                .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
786        )
787        .await?;
788
789        for gw in gateways.clone() {
790            let after_fed_ecash_balance = gw
791                .get_balances()
792                .await?
793                .ecash_balances
794                .into_iter()
795                .find(|fed| fed.federation_id.to_string() == fed_id)
796                .expect("Gateway has not joined federation")
797                .ecash_balance_msats;
798
799            let ln_type = gw.ln.ln_type();
800            let prev_balance = peg_outs
801                .get(&ln_type)
802                .expect("peg out does not exist")
803                .0
804                .msats;
805            let fees = peg_outs
806                .get(&ln_type)
807                .expect("peg out does not exist")
808                .1
809                .fees;
810            let total_fee = fees.amount().to_sat() * 1000;
811            crate::util::almost_equal(
812                after_fed_ecash_balance.msats,
813                prev_balance - amount - total_fee,
814                2000,
815            )
816            .map_err(|e| {
817                anyhow::anyhow!(
818                    "new balance did not equal prev balance minus withdraw_amount minus fees: {}",
819                    e
820                )
821            })?;
822        }
823
824        Ok(())
825    }
826
827    pub fn calculate_federation_id(&self) -> String {
828        self.client_config()
829            .unwrap()
830            .global
831            .calculate_federation_id()
832            .to_string()
833    }
834
835    pub async fn await_block_sync(&self) -> Result<u64> {
836        let finality_delay = self.get_finality_delay()?;
837        let block_count = self.bitcoind.get_block_count().await?;
838        let expected = block_count.saturating_sub(finality_delay.into());
839        cmd!(
840            self.internal_client().await?,
841            "dev",
842            "wait-block-count",
843            expected
844        )
845        .run()
846        .await?;
847        Ok(expected)
848    }
849
850    fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
851        let wallet_instance_id = self.module_instance_id_by_kind(&fedimint_wallet_client::KIND)?;
852        let client_config = &self.client_config()?;
853        let wallet_cfg = client_config
854            .modules
855            .get(&wallet_instance_id)
856            .context("wallet module not found")?
857            .clone()
858            .redecode_raw(&ModuleDecoderRegistry::new([(
859                wallet_instance_id,
860                fedimint_wallet_client::KIND,
861                fedimint_wallet_client::WalletModuleTypes::decoder(),
862            )]))?;
863        let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
864
865        let finality_delay = wallet_cfg.finality_delay;
866        Ok(finality_delay)
867    }
868
869    pub async fn await_gateways_registered(&self) -> Result<()> {
870        let start_time = Instant::now();
871        debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
872
873        poll("gateways registered", || async {
874            let num_gateways = cmd!(
875                self.internal_client()
876                    .await
877                    .map_err(ControlFlow::Continue)?,
878                "list-gateways"
879            )
880            .out_json()
881            .await
882            .map_err(ControlFlow::Continue)?
883            .as_array()
884            .context("invalid output")
885            .map_err(ControlFlow::Break)?
886            .len();
887
888            // After version v0.10.0, the LND gateway will register twice. Once for the HTTP
889            // server, and once for the iroh endpoint.
890            let expected_gateways =
891                if crate::util::Gatewayd::version_or_default().await < *VERSION_0_10_0_ALPHA {
892                    1
893                } else {
894                    2
895                };
896
897            poll_eq!(num_gateways, expected_gateways)
898        })
899        .await?;
900        debug!(target: LOG_DEVIMINT,
901            elapsed_ms = %start_time.elapsed().as_millis(),
902            "Gateways registered");
903        Ok(())
904    }
905
906    pub async fn await_all_peers(&self) -> Result<()> {
907        poll("Waiting for all peers to be online", || async {
908            cmd!(
909                self.internal_client()
910                    .await
911                    .map_err(ControlFlow::Continue)?,
912                "dev",
913                "api",
914                "--module",
915                "wallet",
916                "block_count"
917            )
918            .run()
919            .await
920            .map_err(ControlFlow::Continue)?;
921            Ok(())
922        })
923        .await
924    }
925
926    pub async fn await_peer(&self, peer_id: usize) -> Result<()> {
927        poll("Waiting for all peers to be online", || async {
928            cmd!(
929                self.internal_client()
930                    .await
931                    .map_err(ControlFlow::Continue)?,
932                "dev",
933                "api",
934                "--peer-id",
935                peer_id,
936                "--module",
937                "wallet",
938                "block_count"
939            )
940            .run()
941            .await
942            .map_err(ControlFlow::Continue)?;
943            Ok(())
944        })
945        .await
946    }
947
948    /// Mines enough blocks to finalize mempool transactions, then waits for
949    /// federation to process finalized blocks.
950    ///
951    /// ex:
952    ///   tx submitted to mempool at height 100
953    ///   finality delay = 10
954    ///   mine finality delay blocks + 1 => new height 111
955    ///   tx included in block 101
956    ///   highest finalized height = 111 - 10 = 101
957    pub async fn finalize_mempool_tx(&self) -> Result<()> {
958        let finality_delay = self.get_finality_delay()?;
959        let blocks_to_mine = finality_delay + 1;
960        self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
961        self.await_block_sync().await?;
962        Ok(())
963    }
964
965    pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
966        self.bitcoind.mine_blocks(blocks).await?;
967        self.await_block_sync().await?;
968        Ok(())
969    }
970
971    pub fn num_members(&self) -> usize {
972        self.members.len()
973    }
974
975    pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
976        self.members
977            .keys()
978            .map(|&peer_id| PeerId::from(peer_id as u16))
979    }
980}
981
982#[derive(Clone)]
983pub struct Fedimintd {
984    _bitcoind: Bitcoind,
985    process: ProcessHandle,
986}
987
988impl Fedimintd {
989    pub async fn new(
990        process_mgr: &ProcessManager,
991        bitcoind: Bitcoind,
992        peer_id: usize,
993        env: &vars::Fedimintd,
994        fed_name: String,
995    ) -> Result<Self> {
996        debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
997        let process = process_mgr
998            .spawn_daemon(
999                &format!("fedimintd-{fed_name}-{peer_id}"),
1000                cmd!(FedimintdCmd).envs(env.vars()),
1001            )
1002            .await?;
1003
1004        Ok(Self {
1005            _bitcoind: bitcoind,
1006            process,
1007        })
1008    }
1009
1010    pub async fn terminate(self) -> Result<()> {
1011        self.process.terminate().await
1012    }
1013
1014    pub async fn await_terminated(&self) -> Result<()> {
1015        self.process.await_terminated().await
1016    }
1017}
1018
1019pub async fn run_cli_dkg_v2(
1020    params: HashMap<PeerId, ConfigGenParams>,
1021    endpoints: BTreeMap<PeerId, String>,
1022) -> Result<()> {
1023    let auth_for = |peer: &PeerId| -> &ApiAuth { &params[peer].api_auth };
1024
1025    // Parallelize setup status checks
1026    let status_futures = endpoints.iter().map(|(peer, endpoint)| {
1027        let peer = *peer;
1028        let endpoint = endpoint.clone();
1029        async move {
1030            let status = poll("awaiting-setup-status-awaiting-local-params", || async {
1031                crate::util::FedimintCli
1032                    .setup_status(auth_for(&peer), &endpoint)
1033                    .await
1034                    .map_err(ControlFlow::Continue)
1035            })
1036            .await
1037            .unwrap();
1038
1039            assert_eq!(status, SetupStatus::AwaitingLocalParams);
1040        }
1041    });
1042    join_all(status_futures).await;
1043
1044    debug!(target: LOG_DEVIMINT, "Setting local parameters...");
1045
1046    // Parallelize setting local parameters
1047    let local_params_futures = endpoints.iter().map(|(peer, endpoint)| {
1048        let peer = *peer;
1049        let endpoint = endpoint.clone();
1050        async move {
1051            let info = if peer.to_usize() == 0 {
1052                crate::util::FedimintCli
1053                    .set_local_params_leader(&peer, auth_for(&peer), &endpoint)
1054                    .await
1055            } else {
1056                crate::util::FedimintCli
1057                    .set_local_params_follower(&peer, auth_for(&peer), &endpoint)
1058                    .await
1059            };
1060            info.map(|i| (peer, i))
1061        }
1062    });
1063    let connection_info: BTreeMap<_, _> = try_join_all(local_params_futures)
1064        .await?
1065        .into_iter()
1066        .collect();
1067
1068    debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");
1069
1070    // Parallelize peer addition - flatten the nested loop into a single parallel
1071    // operation
1072    let add_peer_futures = connection_info.iter().flat_map(|(peer, info)| {
1073        endpoints
1074            .iter()
1075            .filter(move |(p, _)| *p != peer)
1076            .map(move |(p, endpoint)| {
1077                let p = *p;
1078                let endpoint = endpoint.clone();
1079                let info = info.clone();
1080                async move {
1081                    crate::util::FedimintCli
1082                        .add_peer(&info, auth_for(&p), &endpoint)
1083                        .await
1084                }
1085            })
1086    });
1087    try_join_all(add_peer_futures).await?;
1088
1089    debug!(target: LOG_DEVIMINT, "Starting DKG...");
1090
1091    // Parallelize DKG start
1092    let start_dkg_futures = endpoints.iter().map(|(peer, endpoint)| {
1093        let peer = *peer;
1094        let endpoint = endpoint.clone();
1095        async move {
1096            crate::util::FedimintCli
1097                .start_dkg(auth_for(&peer), &endpoint)
1098                .await
1099        }
1100    });
1101    try_join_all(start_dkg_futures).await?;
1102
1103    Ok(())
1104}