devimint/
federation.rs

1mod config;
2
3use std::collections::{BTreeMap, HashMap, HashSet};
4use std::ops::ControlFlow;
5use std::path::PathBuf;
6use std::time::Duration;
7use std::{env, fs, iter};
8
9use anyhow::{Context, Result, anyhow, bail};
10use bitcoincore_rpc::bitcoin::Network;
11use fedimint_api_client::api::DynGlobalApi;
12use fedimint_client_module::module::ClientModule;
13use fedimint_core::admin_client::{ServerStatusLegacy, SetupStatus};
14use fedimint_core::config::{ClientConfig, ServerModuleConfigGenParamsRegistry, load_from_file};
15use fedimint_core::core::LEGACY_HARDCODED_INSTANCE_ID_WALLET;
16use fedimint_core::envs::BitcoinRpcConfig;
17use fedimint_core::module::registry::ModuleDecoderRegistry;
18use fedimint_core::module::{ApiAuth, ModuleCommon};
19use fedimint_core::runtime::block_in_place;
20use fedimint_core::task::block_on;
21use fedimint_core::task::jit::JitTryAnyhow;
22use fedimint_core::util::SafeUrl;
23use fedimint_core::{Amount, NumPeers, PeerId};
24use fedimint_gateway_common::WithdrawResponse;
25use fedimint_logging::LOG_DEVIMINT;
26use fedimint_server::config::ConfigGenParams;
27use fedimint_testing_core::config::local_config_gen_params;
28use fedimint_testing_core::node_type::LightningNodeType;
29use fedimint_wallet_client::WalletClientModule;
30use fedimint_wallet_client::config::WalletClientConfig;
31use fedimintd::envs::FM_EXTRA_DKG_META_ENV;
32use fs_lock::FileLock;
33use futures::future::{join_all, try_join_all};
34use rand::Rng;
35use tokio::task::{JoinSet, spawn_blocking};
36use tokio::time::Instant;
37use tracing::{debug, info};
38
39use super::external::Bitcoind;
40use super::util::{Command, ProcessHandle, ProcessManager, cmd, parse_map};
41use super::vars::utf8;
42use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
43use crate::util::{FedimintdCmd, poll, poll_with_timeout};
44use crate::version_constants::{VERSION_0_6_0_ALPHA, VERSION_0_7_0_ALPHA};
45use crate::{poll_eq, vars};
46
47// TODO: Are we still using the 3rd port for anything?
48/// Number of ports we allocate for every `fedimintd` instance
49pub const PORTS_PER_FEDIMINTD: u16 = 4;
50/// Which port is for p2p inside the range from [`PORTS_PER_FEDIMINTD`]
51pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
52/// Which port is for api inside the range from [`PORTS_PER_FEDIMINTD`]
53pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
54/// Which port is for the web ui inside the range from [`PORTS_PER_FEDIMINTD`]
55pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
56/// Which port is for prometheus inside the range from [`PORTS_PER_FEDIMINTD`]
57pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;
58
59#[derive(Clone)]
60pub struct Federation {
61    // client is only for internal use, use cli commands instead
62    pub members: BTreeMap<usize, Fedimintd>,
63    pub vars: BTreeMap<usize, vars::Fedimintd>,
64    pub bitcoind: Bitcoind,
65
66    /// Built in [`Client`], already joined
67    client: JitTryAnyhow<Client>,
68}
69
70impl Drop for Federation {
71    fn drop(&mut self) {
72        block_in_place(|| {
73            block_on(async {
74                let mut set = JoinSet::new();
75
76                while let Some((_id, fedimintd)) = self.members.pop_first() {
77                    set.spawn(async { drop(fedimintd) });
78                }
79                while (set.join_next().await).is_some() {}
80            });
81        });
82    }
83}
84/// `fedimint-cli` instance (basically path with client state: config + db)
85#[derive(Clone)]
86pub struct Client {
87    name: String,
88}
89
90impl Client {
91    fn clients_dir() -> PathBuf {
92        let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
93            .expect("FM_DATA_DIR_ENV not set")
94            .parse()
95            .expect("FM_DATA_DIR_ENV invalid");
96        data_dir.join("clients")
97    }
98
99    fn client_dir(&self) -> PathBuf {
100        Self::clients_dir().join(&self.name)
101    }
102
103    pub fn client_name_lock(name: &str) -> Result<FileLock> {
104        let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
105        let file_lock = std::fs::OpenOptions::new()
106            .write(true)
107            .create(true)
108            .truncate(true)
109            .open(&lock_path)
110            .with_context(|| format!("Failed to open {}", lock_path.display()))?;
111
112        fs_lock::FileLock::new_exclusive(file_lock)
113            .with_context(|| format!("Failed to lock {}", lock_path.display()))
114    }
115
116    /// Create a [`Client`] that starts with a fresh state.
117    pub async fn create(name: impl ToString) -> Result<Client> {
118        let name = name.to_string();
119        spawn_blocking(move || {
120            let _lock = Self::client_name_lock(&name);
121            for i in 0u64.. {
122                let client = Self {
123                    name: format!("{name}-{i}"),
124                };
125
126                if !client.client_dir().exists() {
127                    std::fs::create_dir_all(client.client_dir())?;
128                    return Ok(client);
129                }
130            }
131            unreachable!()
132        })
133        .await?
134    }
135
136    /// Open or create a [`Client`] that starts with a fresh state.
137    pub fn open_or_create(name: &str) -> Result<Client> {
138        block_in_place(|| {
139            let _lock = Self::client_name_lock(name);
140            let client = Self {
141                name: format!("{name}-0"),
142            };
143            if !client.client_dir().exists() {
144                std::fs::create_dir_all(client.client_dir())?;
145            }
146            Ok(client)
147        })
148    }
149
150    /// Client to join a federation
151    pub async fn join_federation(&self, invite_code: String) -> Result<()> {
152        debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
153        cmd!(self, "join-federation", invite_code).run().await?;
154
155        Ok(())
156    }
157
158    /// Client to join a federation with a restore procedure
159    pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
160        debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
161        cmd!(
162            self,
163            "restore",
164            "--invite-code",
165            invite_code,
166            "--mnemonic",
167            mnemonic
168        )
169        .run()
170        .await?;
171
172        Ok(())
173    }
174
175    /// Client to join a federation
176    pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
177        let restored = Self::open_or_create(name)?;
178
179        let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
180            .as_str()
181            .unwrap()
182            .to_owned();
183
184        debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
185        cmd!(
186            restored,
187            "restore",
188            "--invite-code",
189            invite_code,
190            "--mnemonic",
191            mnemonic
192        )
193        .run()
194        .await?;
195
196        Ok(restored)
197    }
198
199    /// Create a [`Client`] that starts with a state that is a copy of
200    /// of another one.
201    pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
202        let new = Client::create(name).await?;
203
204        cmd!(
205            "cp",
206            "-R",
207            self.client_dir().join("client.db").display(),
208            new.client_dir().display()
209        )
210        .run()
211        .await?;
212
213        Ok(new)
214    }
215
216    pub async fn balance(&self) -> Result<u64> {
217        Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
218            .as_u64()
219            .unwrap())
220    }
221
222    pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
223        let deposit = cmd!(self, "deposit-address").out_json().await?;
224        Ok((
225            deposit["address"].as_str().unwrap().to_string(),
226            deposit["operation_id"].as_str().unwrap().to_string(),
227        ))
228    }
229
230    pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
231        cmd!(self, "await-deposit", operation_id).run().await
232    }
233
234    pub fn cmd(&self) -> Command {
235        cmd!(
236            crate::util::get_fedimint_cli_path(),
237            format!("--data-dir={}", self.client_dir().display())
238        )
239    }
240
241    pub fn get_name(&self) -> &str {
242        &self.name
243    }
244
245    /// Returns the current consensus session count
246    pub async fn get_session_count(&self) -> Result<u64> {
247        cmd!(self, "dev", "session-count").out_json().await?["count"]
248            .as_u64()
249            .context("count field wasn't a number")
250    }
251
252    /// Returns once all active state machines complete
253    pub async fn wait_complete(&self) -> Result<()> {
254        cmd!(self, "dev", "wait-complete").run().await
255    }
256
257    /// Returns once the current session completes
258    pub async fn wait_session(&self) -> anyhow::Result<()> {
259        info!("Waiting for a new session");
260        let session_count = self.get_session_count().await?;
261        self.wait_session_outcome(session_count).await?;
262        Ok(())
263    }
264
265    /// Returns once the provided session count completes
266    pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
267        let timeout = {
268            let current_session_count = self.get_session_count().await?;
269            let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
270            let session_duration_seconds = 180;
271            Duration::from_secs(sessions_to_wait * session_duration_seconds)
272        };
273
274        let start = Instant::now();
275        poll_with_timeout("Waiting for a new session", timeout, || async {
276            info!("Awaiting session outcome {session_count}");
277            match cmd!(self, "dev", "api", "await_session_outcome", session_count)
278                .run()
279                .await
280            {
281                Err(e) => Err(ControlFlow::Continue(e)),
282                Ok(()) => Ok(()),
283            }
284        })
285        .await?;
286
287        let session_found_in = start.elapsed();
288        info!("session found in {session_found_in:?}");
289        Ok(())
290    }
291}
292
293impl Federation {
294    pub async fn new(
295        process_mgr: &ProcessManager,
296        bitcoind: Bitcoind,
297        skip_setup: bool,
298        pre_dkg: bool,
299        // Which of the pre-allocated federations to use (most tests just use single `0` one)
300        fed_index: usize,
301        federation_name: String,
302    ) -> Result<Self> {
303        let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
304        let mut members = BTreeMap::new();
305        let mut peer_to_env_vars_map = BTreeMap::new();
306
307        let peers: Vec<_> = num_peers.peer_ids().collect();
308        let params: HashMap<PeerId, ConfigGenParams> =
309            local_config_gen_params(&peers, process_mgr.globals.FM_FEDERATION_BASE_PORT)?;
310
311        let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
312        let mut endpoints: BTreeMap<PeerId, _> = BTreeMap::new();
313        for peer_id in num_peers.peer_ids() {
314            let peer_env_vars = vars::Fedimintd::init(
315                &process_mgr.globals,
316                federation_name.clone(),
317                peer_id,
318                process_mgr
319                    .globals
320                    .fedimintd_overrides
321                    .peer_expect(fed_index, peer_id),
322            )
323            .await?;
324            members.insert(
325                peer_id.to_usize(),
326                Fedimintd::new(
327                    process_mgr,
328                    bitcoind.clone(),
329                    peer_id.to_usize(),
330                    &peer_env_vars,
331                    federation_name.clone(),
332                )
333                .await?,
334            );
335            let admin_client = DynGlobalApi::from_setup_endpoint(
336                SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
337                &process_mgr.globals.FM_FORCE_API_SECRETS.get_active(),
338            )
339            .await?;
340            endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
341            admin_clients.insert(peer_id, admin_client);
342            peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
343        }
344
345        if !skip_setup && !pre_dkg {
346            // we don't guarantee backwards-compatibility for dkg, so we use the
347            // fedimint-cli version that matches fedimintd
348            let (original_fedimint_cli_path, original_fm_mint_client) =
349                crate::util::use_matching_fedimint_cli_for_dkg().await?;
350
351            let fedimint_cli_version = crate::util::FedimintCli::version_or_default().await;
352
353            if fedimint_cli_version >= *VERSION_0_7_0_ALPHA {
354                run_cli_dkg_v2(params, endpoints).await?;
355            } else {
356                run_cli_dkg(params, endpoints).await?;
357            }
358
359            // we're done with dkg, so we can reset the fedimint-cli version
360            crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);
361
362            // move configs to config directory
363            let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
364            let invite_code_filename_original = "invite-code";
365
366            // copy over invite-code file to client directory
367            let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);
368            tokio::fs::copy(
369                format!("{peer_data_dir}/{invite_code_filename_original}"),
370                format!("{client_dir}/{invite_code_filename_original}"),
371            )
372            .await
373            .context("copying invite-code file")?;
374
375            // move each guardian's invite-code file to the client's directory
376            // appending the peer id to the end
377            for (index, peer_env_vars) in &peer_to_env_vars_map {
378                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
379
380                let invite_code_filename_indexed =
381                    format!("{invite_code_filename_original}-{index}");
382                tokio::fs::rename(
383                    format!("{peer_data_dir}/{invite_code_filename_original}"),
384                    format!("{client_dir}/{invite_code_filename_indexed}"),
385                )
386                .await
387                .context("moving invite-code file")?;
388            }
389            debug!("Moved invite-code files to client data directory");
390        }
391
392        let client = JitTryAnyhow::new_try({
393            move || async move {
394                let client = Client::open_or_create(federation_name.as_str())?;
395                let invite_code = Self::invite_code_static()?;
396                if !skip_setup && !pre_dkg {
397                    cmd!(client, "join-federation", invite_code).run().await?;
398                }
399                Ok(client)
400            }
401        });
402
403        Ok(Self {
404            members,
405            vars: peer_to_env_vars_map,
406            bitcoind,
407            client,
408        })
409    }
410
411    pub fn client_config(&self) -> Result<ClientConfig> {
412        let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
413        load_from_file(&cfg_path)
414    }
415
416    pub fn module_client_config<M: ClientModule>(
417        &self,
418    ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
419        self.client_config()?
420            .modules
421            .iter()
422            .find_map(|(module_instance_id, module_cfg)| {
423                if module_cfg.kind == M::kind() {
424                    let decoders = ModuleDecoderRegistry::new(vec![(
425                        *module_instance_id,
426                        M::kind(),
427                        M::decoder(),
428                    )]);
429                    Some(
430                        module_cfg
431                            .config
432                            .clone()
433                            .redecode_raw(&decoders)
434                            .expect("Decoding client cfg failed")
435                            .expect_decoded_ref()
436                            .as_any()
437                            .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
438                            .cloned()
439                            .context("Cast to module config failed"),
440                    )
441                } else {
442                    None
443                }
444            })
445            .transpose()
446    }
447
448    pub fn deposit_fees(&self) -> Result<Amount> {
449        Ok(self
450            .module_client_config::<WalletClientModule>()?
451            .context("No wallet module found")?
452            .fee_consensus
453            .peg_in_abs)
454    }
455
456    /// Read the invite code from the client data dir
457    pub fn invite_code(&self) -> Result<String> {
458        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
459        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
460        Ok(invite_code)
461    }
462
463    pub fn invite_code_static() -> Result<String> {
464        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
465        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
466        Ok(invite_code)
467    }
468    pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
469        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
470        let name = format!("invite-code-{peer_id}");
471        let invite_code = fs::read_to_string(data_dir.join(name))?;
472        Ok(invite_code)
473    }
474
475    /// Built-in, default, internal [`Client`]
476    ///
477    /// We should be moving away from using it for anything.
478    pub async fn internal_client(&self) -> Result<&Client> {
479        self.client
480            .get_try()
481            .await
482            .context("Internal client joining Federation")
483    }
484
485    /// New [`Client`] that already joined `self`
486    pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
487        let client = Client::create(name).await?;
488        client.join_federation(self.invite_code()?).await?;
489        Ok(client)
490    }
491
492    pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
493        if self.members.contains_key(&peer) {
494            bail!("fedimintd-{peer} already running");
495        }
496        self.members.insert(
497            peer,
498            Fedimintd::new(
499                process_mgr,
500                self.bitcoind.clone(),
501                peer,
502                &self.vars[&peer],
503                "default".to_string(),
504            )
505            .await?,
506        );
507        Ok(())
508    }
509
510    pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
511        let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
512            bail!("fedimintd-{peer_id} does not exist");
513        };
514        fedimintd.terminate().await?;
515        Ok(())
516    }
517
518    /// Starts all peers not currently running.
519    pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
520        info!("starting all servers");
521        let fed_size = process_mgr.globals.FM_FED_SIZE;
522        for peer_id in 0..fed_size {
523            if self.members.contains_key(&peer_id) {
524                continue;
525            }
526            self.start_server(process_mgr, peer_id).await?;
527        }
528        self.await_all_peers().await?;
529        Ok(())
530    }
531
532    /// Terminates all running peers.
533    pub async fn terminate_all_servers(&mut self) -> Result<()> {
534        info!("terminating all servers");
535        let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
536        for peer_id in running_peer_ids {
537            self.terminate_server(peer_id).await?;
538        }
539        Ok(())
540    }
541
542    /// Coordinated shutdown of all peers that restart using the provided
543    /// `bin_path`. Returns `Ok()` once all peers are online.
544    ///
545    /// Staggering the restart more closely simulates upgrades in the wild.
546    pub async fn restart_all_staggered_with_bin(
547        &mut self,
548        process_mgr: &ProcessManager,
549        bin_path: &PathBuf,
550    ) -> Result<()> {
551        let fed_size = process_mgr.globals.FM_FED_SIZE;
552
553        // ensure all peers are online
554        self.start_all_servers(process_mgr).await?;
555
556        // staggered shutdown of peers
557        while self.num_members() > 0 {
558            self.terminate_server(self.num_members() - 1).await?;
559            if self.num_members() > 0 {
560                fedimint_core::task::sleep_in_test(
561                    "waiting to shutdown remaining peers",
562                    Duration::from_secs(10),
563                )
564                .await;
565            }
566        }
567
568        // TODO: Audit that the environment access only happens in single-threaded code.
569        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
570
571        // staggered restart
572        for peer_id in 0..fed_size {
573            self.start_server(process_mgr, peer_id).await?;
574            if peer_id < fed_size - 1 {
575                fedimint_core::task::sleep_in_test(
576                    "waiting to restart remaining peers",
577                    Duration::from_secs(10),
578                )
579                .await;
580            }
581        }
582
583        self.await_all_peers().await?;
584
585        let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
586        info!("upgraded fedimintd to version: {}", fedimintd_version);
587        Ok(())
588    }
589
590    pub async fn restart_all_with_bin(
591        &mut self,
592        process_mgr: &ProcessManager,
593        bin_path: &PathBuf,
594    ) -> Result<()> {
595        // get the version we're upgrading to, temporarily updating the fedimintd path
596        let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
597        // TODO: Audit that the environment access only happens in single-threaded code.
598        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
599        // TODO: Audit that the environment access only happens in single-threaded code.
600        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
601
602        self.restart_all_staggered_with_bin(process_mgr, bin_path)
603            .await
604    }
605
606    pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
607        let fed_size = process_mgr.globals.FM_FED_SIZE;
608        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
609        anyhow::ensure!(
610            fed_size > 3 * offline_nodes,
611            "too many offline nodes ({offline_nodes}) to reach consensus"
612        );
613
614        while self.num_members() > fed_size - offline_nodes {
615            self.terminate_server(self.num_members() - 1).await?;
616        }
617
618        if offline_nodes > 0 {
619            info!(fed_size, offline_nodes, "federation is degraded");
620        }
621        Ok(())
622    }
623
624    pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
625        let deposit_fees_msat = self.deposit_fees()?.msats;
626        assert_eq!(
627            deposit_fees_msat % 1000,
628            0,
629            "Deposit fees expected to be whole sats in test suite"
630        );
631        let deposit_fees = deposit_fees_msat / 1000;
632        info!(amount, deposit_fees, "Pegging-in client funds");
633
634        let (address, operation_id) = client.get_deposit_addr().await?;
635
636        self.bitcoind
637            .send_to(address, amount + deposit_fees)
638            .await?;
639        self.bitcoind.mine_blocks(21).await?;
640
641        Ok(operation_id)
642    }
643
644    pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
645        let operation_id = self.pegin_client_no_wait(amount, client).await?;
646
647        client.await_deposit(&operation_id).await?;
648        Ok(())
649    }
650
651    /// Inititates multiple peg-ins to the same federation for the set of
652    /// gateways to save on mining blocks in parallel.
653    pub async fn pegin_gateways(
654        &self,
655        amount: u64,
656        gateways: Vec<&super::gatewayd::Gatewayd>,
657    ) -> Result<()> {
658        let deposit_fees_msat = self.deposit_fees()?.msats;
659        assert_eq!(
660            deposit_fees_msat % 1000,
661            0,
662            "Deposit fees expected to be whole sats in test suite"
663        );
664        let deposit_fees = deposit_fees_msat / 1000;
665        info!(amount, deposit_fees, "Pegging-in gateway funds");
666        let fed_id = self.calculate_federation_id();
667        for gw in gateways.clone() {
668            let pegin_addr = gw.get_pegin_addr(&fed_id).await?;
669            self.bitcoind
670                .send_to(pegin_addr, amount + deposit_fees)
671                .await?;
672        }
673
674        self.bitcoind.mine_blocks(21).await?;
675        let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
676        try_join_all(gateways.into_iter().map(|gw| {
677            poll("gateway pegin", || async {
678                let gw_info = gw.get_info().await.map_err(ControlFlow::Continue)?;
679                let block_height: u64 = gw_info["block_height"]
680                    .as_u64()
681                    .expect("Could not parse block height");
682                if bitcoind_block_height != block_height {
683                    return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
684                        "gateway block height is not synced"
685                    )));
686                }
687
688                let gateway_balance = gw
689                    .ecash_balance(fed_id.clone())
690                    .await
691                    .map_err(ControlFlow::Continue)?;
692                poll_eq!(gateway_balance, amount * 1000)
693            })
694        }))
695        .await?;
696
697        Ok(())
698    }
699
700    /// Initiates multiple peg-outs from the same federation for the set of
701    /// gateways to save on mining blocks in parallel.
702    pub async fn pegout_gateways(
703        &self,
704        amount: u64,
705        gateways: Vec<&super::gatewayd::Gatewayd>,
706    ) -> Result<()> {
707        info!(amount, "Pegging-out gateway funds");
708        let fed_id = self.calculate_federation_id();
709        let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
710        for gw in gateways.clone() {
711            let prev_fed_ecash_balance = gw
712                .get_balances()
713                .await?
714                .ecash_balances
715                .into_iter()
716                .find(|fed| fed.federation_id.to_string() == fed_id)
717                .expect("Gateway has not joined federation")
718                .ecash_balance_msats;
719
720            let pegout_address = self.bitcoind.get_new_address().await?;
721            let value = cmd!(
722                gw,
723                "ecash",
724                "pegout",
725                "--federation-id",
726                fed_id,
727                "--amount",
728                amount,
729                "--address",
730                pegout_address
731            )
732            .out_json()
733            .await?;
734            let response: WithdrawResponse = serde_json::from_value(value)?;
735            peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
736        }
737        self.bitcoind.mine_blocks(21).await?;
738
739        try_join_all(
740            peg_outs
741                .values()
742                .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
743        )
744        .await?;
745
746        for gw in gateways.clone() {
747            let after_fed_ecash_balance = gw
748                .get_balances()
749                .await?
750                .ecash_balances
751                .into_iter()
752                .find(|fed| fed.federation_id.to_string() == fed_id)
753                .expect("Gateway has not joined federation")
754                .ecash_balance_msats;
755
756            let ln_type = gw.ln.ln_type();
757            let prev_balance = peg_outs
758                .get(&ln_type)
759                .expect("peg out does not exist")
760                .0
761                .msats;
762            let fees = peg_outs
763                .get(&ln_type)
764                .expect("peg out does not exist")
765                .1
766                .fees;
767            let total_fee = fees.amount().to_sat() * 1000;
768            assert_eq!(
769                prev_balance - amount - total_fee,
770                after_fed_ecash_balance.msats,
771                "new balance did not equal prev balance minus withdraw_amount minus fees"
772            );
773        }
774
775        Ok(())
776    }
777
778    pub fn calculate_federation_id(&self) -> String {
779        self.client_config()
780            .unwrap()
781            .global
782            .calculate_federation_id()
783            .to_string()
784    }
785
786    pub async fn await_block_sync(&self) -> Result<u64> {
787        let finality_delay = self.get_finality_delay()?;
788        let block_count = self.bitcoind.get_block_count().await?;
789        let expected = block_count.saturating_sub(finality_delay.into());
790        cmd!(
791            self.internal_client().await?,
792            "dev",
793            "wait-block-count",
794            expected
795        )
796        .run()
797        .await?;
798        Ok(expected)
799    }
800
801    fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
802        let client_config = &self.client_config()?;
803        let wallet_cfg = client_config
804            .modules
805            .get(&LEGACY_HARDCODED_INSTANCE_ID_WALLET)
806            .context("wallet module not found")?
807            .clone()
808            .redecode_raw(&ModuleDecoderRegistry::new([(
809                LEGACY_HARDCODED_INSTANCE_ID_WALLET,
810                fedimint_wallet_client::KIND,
811                fedimint_wallet_client::WalletModuleTypes::decoder(),
812            )]))?;
813        let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
814
815        let finality_delay = wallet_cfg.finality_delay;
816        Ok(finality_delay)
817    }
818
819    pub async fn await_gateways_registered(&self) -> Result<()> {
820        let start_time = Instant::now();
821        debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
822
823        poll("gateways registered", || async {
824            let num_gateways = cmd!(
825                self.internal_client()
826                    .await
827                    .map_err(ControlFlow::Continue)?,
828                "list-gateways"
829            )
830            .out_json()
831            .await
832            .map_err(ControlFlow::Continue)?
833            .as_array()
834            .context("invalid output")
835            .map_err(ControlFlow::Break)?
836            .len();
837            poll_eq!(num_gateways, 1)
838        })
839        .await?;
840        debug!(target: LOG_DEVIMINT,
841            elapsed_ms = %start_time.elapsed().as_millis(),
842            "Gateways registered");
843        Ok(())
844    }
845
846    pub async fn await_all_peers(&self) -> Result<()> {
847        let fedimin_cli_version = crate::util::FedimintCli::version_or_default().await;
848        poll("Waiting for all peers to be online", || async {
849            if fedimin_cli_version < *VERSION_0_6_0_ALPHA {
850                cmd!(
851                    self.internal_client()
852                        .await
853                        .map_err(ControlFlow::Continue)?,
854                    "dev",
855                    "api",
856                    "module_{LEGACY_HARDCODED_INSTANCE_ID_WALLET}_block_count"
857                )
858            } else {
859                cmd!(
860                    self.internal_client()
861                        .await
862                        .map_err(ControlFlow::Continue)?,
863                    "dev",
864                    "api",
865                    "--module",
866                    LEGACY_HARDCODED_INSTANCE_ID_WALLET,
867                    "block_count"
868                )
869            }
870            .run()
871            .await
872            .map_err(ControlFlow::Continue)?;
873            Ok(())
874        })
875        .await
876    }
877
878    /// Mines enough blocks to finalize mempool transactions, then waits for
879    /// federation to process finalized blocks.
880    ///
881    /// ex:
882    ///   tx submitted to mempool at height 100
883    ///   finality delay = 10
884    ///   mine finality delay blocks + 1 => new height 111
885    ///   tx included in block 101
886    ///   highest finalized height = 111 - 10 = 101
887    pub async fn finalize_mempool_tx(&self) -> Result<()> {
888        let finality_delay = self.get_finality_delay()?;
889        let blocks_to_mine = finality_delay + 1;
890        self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
891        self.await_block_sync().await?;
892        Ok(())
893    }
894
895    pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
896        self.bitcoind.mine_blocks(blocks).await?;
897        self.await_block_sync().await?;
898        Ok(())
899    }
900
901    pub fn num_members(&self) -> usize {
902        self.members.len()
903    }
904
905    pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
906        self.members
907            .keys()
908            .map(|&peer_id| PeerId::from(peer_id as u16))
909    }
910}
911
912#[derive(Clone)]
913pub struct Fedimintd {
914    _bitcoind: Bitcoind,
915    process: ProcessHandle,
916}
917
918impl Fedimintd {
919    pub async fn new(
920        process_mgr: &ProcessManager,
921        bitcoind: Bitcoind,
922        peer_id: usize,
923        env: &vars::Fedimintd,
924        fed_name: String,
925    ) -> Result<Self> {
926        debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
927        let process = process_mgr
928            .spawn_daemon(
929                &format!("fedimintd-{fed_name}-{peer_id}"),
930                cmd!(FedimintdCmd).envs(env.vars()),
931            )
932            .await?;
933
934        Ok(Self {
935            _bitcoind: bitcoind,
936            process,
937        })
938    }
939
940    pub async fn terminate(self) -> Result<()> {
941        self.process.terminate().await
942    }
943}
944
945pub async fn run_cli_dkg(
946    params: HashMap<PeerId, ConfigGenParams>,
947    endpoints: BTreeMap<PeerId, String>,
948) -> Result<()> {
949    let auth_for = |peer: &PeerId| -> &ApiAuth { &params[peer].api_auth };
950
951    debug!(target: LOG_DEVIMINT, "Running DKG");
952    for endpoint in endpoints.values() {
953        poll("trying-to-connect-to-peers", || async {
954            crate::util::FedimintCli
955                .ws_status(endpoint)
956                .await
957                .context("dkg status")
958                .map_err(ControlFlow::Continue)
959        })
960        .await?;
961    }
962
963    debug!(target: LOG_DEVIMINT, "Connected to all peers");
964
965    for (peer_id, endpoint) in &endpoints {
966        let status = crate::util::FedimintCli.ws_status(endpoint).await?;
967        assert_eq!(
968            status.server,
969            ServerStatusLegacy::AwaitingPassword,
970            "peer_id isn't waiting for password: {peer_id}"
971        );
972    }
973
974    debug!(target: LOG_DEVIMINT, "Setting passwords");
975    for (peer_id, endpoint) in &endpoints {
976        crate::util::FedimintCli
977            .set_password(auth_for(peer_id), endpoint)
978            .await?;
979    }
980    let (leader_id, leader_endpoint) = endpoints.first_key_value().context("missing peer")?;
981    let followers = endpoints
982        .iter()
983        .filter(|(id, _)| *id != leader_id)
984        .collect::<BTreeMap<_, _>>();
985
986    debug!(target: LOG_DEVIMINT, "calling set_config_gen_connections for leader");
987    let leader_name = "leader".to_string();
988    crate::util::FedimintCli
989        .set_config_gen_connections(auth_for(leader_id), leader_endpoint, &leader_name, None)
990        .await?;
991
992    let server_gen_params = ServerModuleConfigGenParamsRegistry::default();
993
994    debug!(target: LOG_DEVIMINT, "calling set_config_gen_params for leader");
995    cli_set_config_gen_params(
996        leader_endpoint,
997        auth_for(leader_id),
998        server_gen_params.clone(),
999    )
1000    .await?;
1001
1002    let followers_names = followers
1003        .keys()
1004        .map(|peer_id| {
1005            (*peer_id, {
1006                // This is to be clear that the name will be unrelated to peer id
1007                let random_string = rand::thread_rng()
1008                    .sample_iter(&rand::distributions::Alphanumeric)
1009                    .take(5)
1010                    .map(char::from)
1011                    .collect::<String>();
1012                format!("random-{random_string}{peer_id}")
1013            })
1014        })
1015        .collect::<BTreeMap<_, _>>();
1016    for (peer_id, endpoint) in &followers {
1017        let name = followers_names
1018            .get(peer_id)
1019            .context("missing follower name")?;
1020        debug!(target: LOG_DEVIMINT, "calling set_config_gen_connections for {peer_id} {name}");
1021
1022        crate::util::FedimintCli
1023            .set_config_gen_connections(auth_for(peer_id), endpoint, name, Some(leader_endpoint))
1024            .await?;
1025
1026        cli_set_config_gen_params(endpoint, auth_for(peer_id), server_gen_params.clone()).await?;
1027    }
1028
1029    debug!(target: LOG_DEVIMINT, "calling get_config_gen_peers for leader");
1030    let peers = crate::util::FedimintCli
1031        .get_config_gen_peers(leader_endpoint)
1032        .await?;
1033
1034    let found_names = peers
1035        .into_iter()
1036        .map(|peer| peer.name)
1037        .collect::<HashSet<_>>();
1038    let all_names = followers_names
1039        .values()
1040        .cloned()
1041        .chain(iter::once(leader_name))
1042        .collect::<HashSet<_>>();
1043    assert_eq!(found_names, all_names);
1044
1045    debug!(target: LOG_DEVIMINT, "Waiting for SharingConfigGenParams");
1046    cli_wait_server_status(leader_endpoint, ServerStatusLegacy::SharingConfigGenParams).await?;
1047
1048    debug!(target: LOG_DEVIMINT, "Getting consensus configs");
1049    let mut configs = vec![];
1050    for endpoint in endpoints.values() {
1051        let config = crate::util::FedimintCli
1052            .consensus_config_gen_params_legacy(endpoint)
1053            .await?;
1054        configs.push(config);
1055    }
1056    // Confirm all consensus configs are the same
1057    let mut consensus: Vec<_> = configs.iter().map(|p| p.consensus.clone()).collect();
1058    consensus.dedup();
1059    assert_eq!(consensus.len(), 1);
1060    // Confirm all peer ids are unique
1061    let ids = configs
1062        .iter()
1063        .map(|p| p.our_current_id)
1064        .collect::<HashSet<_>>();
1065    assert_eq!(ids.len(), endpoints.len());
1066    let dkg_results = endpoints
1067        .iter()
1068        .map(|(peer_id, endpoint)| crate::util::FedimintCli.run_dkg(auth_for(peer_id), endpoint));
1069    debug!(target: LOG_DEVIMINT, "Running DKG");
1070    let (dkg_results, leader_wait_result) = tokio::join!(
1071        join_all(dkg_results),
1072        cli_wait_server_status(leader_endpoint, ServerStatusLegacy::VerifyingConfigs)
1073    );
1074    for result in dkg_results {
1075        result?;
1076    }
1077    leader_wait_result?;
1078
1079    // verify config hashes equal for all peers
1080    debug!(target: LOG_DEVIMINT, "Verifying config hashes");
1081    let mut hashes = HashSet::new();
1082    for (peer_id, endpoint) in &endpoints {
1083        cli_wait_server_status(endpoint, ServerStatusLegacy::VerifyingConfigs).await?;
1084        let hash = crate::util::FedimintCli
1085            .get_verify_config_hash(auth_for(peer_id), endpoint)
1086            .await?;
1087        hashes.insert(hash);
1088    }
1089    assert_eq!(hashes.len(), 1);
1090    for (peer_id, endpoint) in &endpoints {
1091        let result = crate::util::FedimintCli
1092            .start_consensus(auth_for(peer_id), endpoint)
1093            .await;
1094        if let Err(e) = result {
1095            tracing::debug!(target: LOG_DEVIMINT, "Error calling start_consensus: {e:?}, trying to continue...");
1096        }
1097        cli_wait_server_status(endpoint, ServerStatusLegacy::ConsensusRunning).await?;
1098    }
1099    Ok(())
1100}
1101
1102pub async fn run_cli_dkg_v2(
1103    params: HashMap<PeerId, ConfigGenParams>,
1104    endpoints: BTreeMap<PeerId, String>,
1105) -> Result<()> {
1106    let auth_for = |peer: &PeerId| -> &ApiAuth { &params[peer].api_auth };
1107
1108    for (peer, endpoint) in &endpoints {
1109        let status = poll("awaiting-setup-status-awaiting-local-params", || async {
1110            crate::util::FedimintCli
1111                .setup_status(auth_for(peer), endpoint)
1112                .await
1113                .map_err(ControlFlow::Continue)
1114        })
1115        .await
1116        .unwrap();
1117
1118        assert_eq!(status, SetupStatus::AwaitingLocalParams);
1119    }
1120
1121    debug!(target: LOG_DEVIMINT, "Setting local parameters...");
1122
1123    let mut connection_info = BTreeMap::new();
1124
1125    for (peer, endpoint) in &endpoints {
1126        let info = if peer.to_usize() == 0 {
1127            crate::util::FedimintCli
1128                .set_local_params_leader(peer, auth_for(peer), endpoint)
1129                .await?
1130        } else {
1131            crate::util::FedimintCli
1132                .set_local_params_follower(peer, auth_for(peer), endpoint)
1133                .await?
1134        };
1135
1136        connection_info.insert(peer, info);
1137    }
1138
1139    debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");
1140
1141    for (peer, info) in connection_info {
1142        for (p, endpoint) in &endpoints {
1143            if p != peer {
1144                crate::util::FedimintCli
1145                    .add_peer(&info, auth_for(p), endpoint)
1146                    .await?;
1147            }
1148        }
1149    }
1150
1151    debug!(target: LOG_DEVIMINT, "Starting DKG...");
1152
1153    for (peer, endpoint) in &endpoints {
1154        crate::util::FedimintCli
1155            .start_dkg(auth_for(peer), endpoint)
1156            .await?;
1157    }
1158
1159    for (peer, endpoint) in &endpoints {
1160        let status = poll("awaiting-setup-status-consensus-is-running", || async {
1161            crate::util::FedimintCli
1162                .setup_status(auth_for(peer), endpoint)
1163                .await
1164                .map_err(ControlFlow::Continue)
1165        })
1166        .await
1167        .unwrap();
1168
1169        assert_eq!(status, SetupStatus::ConsensusIsRunning);
1170    }
1171
1172    debug!(target: LOG_DEVIMINT, "Consensus is running...");
1173
1174    Ok(())
1175}
1176
1177async fn cli_set_config_gen_params(
1178    endpoint: &str,
1179    auth: &ApiAuth,
1180    mut server_gen_params: ServerModuleConfigGenParamsRegistry,
1181) -> Result<()> {
1182    let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
1183    self::config::attach_default_module_init_params(
1184        &BitcoinRpcConfig::get_defaults_from_env_vars()?,
1185        &mut server_gen_params,
1186        Network::Regtest,
1187        10,
1188        &fedimintd_version,
1189    );
1190    // Since we are not actually calling `fedimintd` binary, parse and handle
1191    // `FM_EXTRA_META_DATA` like it would do.
1192    let extra_meta_data = parse_map(
1193        &std::env::var(FM_EXTRA_DKG_META_ENV)
1194            .ok()
1195            .unwrap_or_default(),
1196    )
1197    .with_context(|| format!("Failed to parse {FM_EXTRA_DKG_META_ENV}"))
1198    .expect("Failed");
1199    let meta: BTreeMap<String, String> =
1200        iter::once(("federation_name".to_string(), "testfed".to_string()))
1201            .chain(extra_meta_data)
1202            .collect();
1203
1204    crate::util::FedimintCli
1205        .set_config_gen_params(auth, endpoint, meta, server_gen_params)
1206        .await?;
1207    Ok(())
1208}
1209
1210async fn cli_wait_server_status(endpoint: &str, expected_status: ServerStatusLegacy) -> Result<()> {
1211    poll(
1212        &format!("waiting-server-status: {expected_status:?}"),
1213        || async {
1214            let server_status = crate::util::FedimintCli
1215                .ws_status(endpoint)
1216                .await
1217                .context("server status")
1218                .map_err(ControlFlow::Continue)?
1219                .server;
1220            if server_status == expected_status {
1221                Ok(())
1222            } else {
1223                Err(ControlFlow::Continue(anyhow!(
1224                    "expected status: {expected_status:?} current status: {server_status:?}"
1225                )))
1226            }
1227        },
1228    )
1229    .await?;
1230    Ok(())
1231}