devimint/
federation.rs

1mod config;
2
3use std::collections::{BTreeMap, HashMap, HashSet};
4use std::ops::ControlFlow;
5use std::path::PathBuf;
6use std::str::FromStr;
7use std::time::Duration;
8use std::{env, fs, iter};
9
10use anyhow::{Context, Result, anyhow, bail};
11use bitcoincore_rpc::bitcoin::Network;
12use fedimint_api_client::api::DynGlobalApi;
13use fedimint_api_client::api::net::ConnectorType;
14use fedimint_client_module::module::ClientModule;
15use fedimint_connectors::ConnectorRegistry;
16use fedimint_core::admin_client::{ServerStatusLegacy, SetupStatus};
17use fedimint_core::config::{ClientConfig, ServerModuleConfigGenParamsRegistry, load_from_file};
18use fedimint_core::core::LEGACY_HARDCODED_INSTANCE_ID_WALLET;
19use fedimint_core::envs::BitcoinRpcConfig;
20use fedimint_core::invite_code::InviteCode;
21use fedimint_core::module::registry::ModuleDecoderRegistry;
22use fedimint_core::module::{ApiAuth, ModuleCommon};
23use fedimint_core::runtime::block_in_place;
24use fedimint_core::task::block_on;
25use fedimint_core::task::jit::JitTryAnyhow;
26use fedimint_core::util::SafeUrl;
27use fedimint_core::{Amount, NumPeers, PeerId};
28use fedimint_gateway_common::WithdrawResponse;
29use fedimint_logging::LOG_DEVIMINT;
30use fedimint_server::config::ConfigGenParams;
31use fedimint_testing_core::config::local_config_gen_params;
32use fedimint_testing_core::node_type::LightningNodeType;
33use fedimint_wallet_client::WalletClientModule;
34use fedimint_wallet_client::config::WalletClientConfig;
35use fs_lock::FileLock;
36use futures::future::{join_all, try_join_all};
37use rand::Rng;
38use tokio::task::{JoinSet, spawn_blocking};
39use tokio::time::Instant;
40use tracing::{debug, info};
41
42use super::external::Bitcoind;
43use super::util::{Command, ProcessHandle, ProcessManager, cmd};
44use super::vars::utf8;
45use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
46use crate::util::{FedimintdCmd, poll, poll_simple, poll_with_timeout};
47use crate::version_constants::VERSION_0_10_0_ALPHA;
48use crate::{poll_almost_equal, poll_eq, vars};
49
50// TODO: Are we still using the 3rd port for anything?
51/// Number of ports we allocate for every `fedimintd` instance
52pub const PORTS_PER_FEDIMINTD: u16 = 4;
53/// Which port is for p2p inside the range from [`PORTS_PER_FEDIMINTD`]
54pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
55/// Which port is for api inside the range from [`PORTS_PER_FEDIMINTD`]
56pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
57/// Which port is for the web ui inside the range from [`PORTS_PER_FEDIMINTD`]
58pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
59/// Which port is for prometheus inside the range from [`PORTS_PER_FEDIMINTD`]
60pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;
61
62#[derive(Clone)]
63pub struct Federation {
64    // client is only for internal use, use cli commands instead
65    pub members: BTreeMap<usize, Fedimintd>,
66    pub vars: BTreeMap<usize, vars::Fedimintd>,
67    pub bitcoind: Bitcoind,
68
69    /// Built in [`Client`], already joined
70    client: JitTryAnyhow<Client>,
71    #[allow(dead_code)] // Will need it later, maybe
72    connectors: ConnectorRegistry,
73}
74
75impl Drop for Federation {
76    fn drop(&mut self) {
77        block_in_place(|| {
78            block_on(async {
79                let mut set = JoinSet::new();
80
81                while let Some((_id, fedimintd)) = self.members.pop_first() {
82                    set.spawn(async { drop(fedimintd) });
83                }
84                while (set.join_next().await).is_some() {}
85            });
86        });
87    }
88}
89/// `fedimint-cli` instance (basically path with client state: config + db)
90#[derive(Clone)]
91pub struct Client {
92    name: String,
93}
94
95impl Client {
96    fn clients_dir() -> PathBuf {
97        let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
98            .expect("FM_DATA_DIR_ENV not set")
99            .parse()
100            .expect("FM_DATA_DIR_ENV invalid");
101        data_dir.join("clients")
102    }
103
104    fn client_dir(&self) -> PathBuf {
105        Self::clients_dir().join(&self.name)
106    }
107
108    pub fn client_name_lock(name: &str) -> Result<FileLock> {
109        let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
110        let file_lock = std::fs::OpenOptions::new()
111            .write(true)
112            .create(true)
113            .truncate(true)
114            .open(&lock_path)
115            .with_context(|| format!("Failed to open {}", lock_path.display()))?;
116
117        fs_lock::FileLock::new_exclusive(file_lock)
118            .with_context(|| format!("Failed to lock {}", lock_path.display()))
119    }
120
121    /// Create a [`Client`] that starts with a fresh state.
122    pub async fn create(name: impl ToString) -> Result<Client> {
123        let name = name.to_string();
124        spawn_blocking(move || {
125            let _lock = Self::client_name_lock(&name);
126            for i in 0u64.. {
127                let client = Self {
128                    name: format!("{name}-{i}"),
129                };
130
131                if !client.client_dir().exists() {
132                    std::fs::create_dir_all(client.client_dir())?;
133                    return Ok(client);
134                }
135            }
136            unreachable!()
137        })
138        .await?
139    }
140
141    /// Open or create a [`Client`] that starts with a fresh state.
142    pub fn open_or_create(name: &str) -> Result<Client> {
143        block_in_place(|| {
144            let _lock = Self::client_name_lock(name);
145            let client = Self {
146                name: format!("{name}-0"),
147            };
148            if !client.client_dir().exists() {
149                std::fs::create_dir_all(client.client_dir())?;
150            }
151            Ok(client)
152        })
153    }
154
155    /// Client to join a federation
156    pub async fn join_federation(&self, invite_code: String) -> Result<()> {
157        debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
158        cmd!(self, "join-federation", invite_code).run().await?;
159
160        Ok(())
161    }
162
163    /// Client to join a federation with a restore procedure
164    pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
165        debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
166        cmd!(
167            self,
168            "restore",
169            "--invite-code",
170            invite_code,
171            "--mnemonic",
172            mnemonic
173        )
174        .run()
175        .await?;
176
177        Ok(())
178    }
179
180    /// Client to join a federation
181    pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
182        let restored = Self::open_or_create(name)?;
183
184        let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
185            .as_str()
186            .unwrap()
187            .to_owned();
188
189        debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
190        cmd!(
191            restored,
192            "restore",
193            "--invite-code",
194            invite_code,
195            "--mnemonic",
196            mnemonic
197        )
198        .run()
199        .await?;
200
201        Ok(restored)
202    }
203
204    /// Create a [`Client`] that starts with a state that is a copy of
205    /// of another one.
206    pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
207        let new = Client::create(name).await?;
208
209        cmd!(
210            "cp",
211            "-R",
212            self.client_dir().join("client.db").display(),
213            new.client_dir().display()
214        )
215        .run()
216        .await?;
217
218        Ok(new)
219    }
220
221    pub async fn balance(&self) -> Result<u64> {
222        Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
223            .as_u64()
224            .unwrap())
225    }
226
227    pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
228        let deposit = cmd!(self, "deposit-address").out_json().await?;
229        Ok((
230            deposit["address"].as_str().unwrap().to_string(),
231            deposit["operation_id"].as_str().unwrap().to_string(),
232        ))
233    }
234
235    pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
236        cmd!(self, "await-deposit", operation_id).run().await
237    }
238
239    pub fn cmd(&self) -> Command {
240        cmd!(
241            crate::util::get_fedimint_cli_path(),
242            format!("--data-dir={}", self.client_dir().display())
243        )
244    }
245
246    pub fn get_name(&self) -> &str {
247        &self.name
248    }
249
250    /// Returns the current consensus session count
251    pub async fn get_session_count(&self) -> Result<u64> {
252        cmd!(self, "dev", "session-count").out_json().await?["count"]
253            .as_u64()
254            .context("count field wasn't a number")
255    }
256
257    /// Returns once all active state machines complete
258    pub async fn wait_complete(&self) -> Result<()> {
259        cmd!(self, "dev", "wait-complete").run().await
260    }
261
262    /// Returns once the current session completes
263    pub async fn wait_session(&self) -> anyhow::Result<()> {
264        info!("Waiting for a new session");
265        let session_count = self.get_session_count().await?;
266        self.wait_session_outcome(session_count).await?;
267        Ok(())
268    }
269
270    /// Returns once the provided session count completes
271    pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
272        let timeout = {
273            let current_session_count = self.get_session_count().await?;
274            let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
275            let session_duration_seconds = 180;
276            Duration::from_secs(sessions_to_wait * session_duration_seconds)
277        };
278
279        let start = Instant::now();
280        poll_with_timeout("Waiting for a new session", timeout, || async {
281            info!("Awaiting session outcome {session_count}");
282            match cmd!(self, "dev", "api", "await_session_outcome", session_count)
283                .run()
284                .await
285            {
286                Err(e) => Err(ControlFlow::Continue(e)),
287                Ok(()) => Ok(()),
288            }
289        })
290        .await?;
291
292        let session_found_in = start.elapsed();
293        info!("session found in {session_found_in:?}");
294        Ok(())
295    }
296}
297
298impl Federation {
299    pub async fn new(
300        process_mgr: &ProcessManager,
301        bitcoind: Bitcoind,
302        skip_setup: bool,
303        pre_dkg: bool,
304        // Which of the pre-allocated federations to use (most tests just use single `0` one)
305        fed_index: usize,
306        federation_name: String,
307    ) -> Result<Self> {
308        let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
309        let mut members = BTreeMap::new();
310        let mut peer_to_env_vars_map = BTreeMap::new();
311
312        let peers: Vec<_> = num_peers.peer_ids().collect();
313        let params: HashMap<PeerId, ConfigGenParams> =
314            local_config_gen_params(&peers, process_mgr.globals.FM_FEDERATION_BASE_PORT, true)?;
315
316        let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
317        let mut api_endpoints: BTreeMap<PeerId, _> = BTreeMap::new();
318
319        let connectors = ConnectorRegistry::build_from_testing_env()?.bind().await?;
320        for peer_id in num_peers.peer_ids() {
321            let peer_env_vars = vars::Fedimintd::init(
322                &process_mgr.globals,
323                federation_name.clone(),
324                peer_id,
325                process_mgr
326                    .globals
327                    .fedimintd_overrides
328                    .peer_expect(fed_index, peer_id),
329            )
330            .await?;
331            members.insert(
332                peer_id.to_usize(),
333                Fedimintd::new(
334                    process_mgr,
335                    bitcoind.clone(),
336                    peer_id.to_usize(),
337                    &peer_env_vars,
338                    federation_name.clone(),
339                )
340                .await?,
341            );
342            let admin_client = DynGlobalApi::new_admin_setup(
343                connectors.clone(),
344                SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
345                // TODO: will need it somewhere
346                // &process_mgr.globals.FM_FORCE_API_SECRETS.get_active(),
347            )?;
348            api_endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
349            admin_clients.insert(peer_id, admin_client);
350            peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
351        }
352
353        if !skip_setup && !pre_dkg {
354            // we don't guarantee backwards-compatibility for dkg, so we use the
355            // fedimint-cli version that matches fedimintd
356            let (original_fedimint_cli_path, original_fm_mint_client) =
357                crate::util::use_matching_fedimint_cli_for_dkg().await?;
358
359            run_cli_dkg_v2(params, api_endpoints).await?;
360
361            // we're done with dkg, so we can reset the fedimint-cli version
362            crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);
363
364            // move configs to config directory
365            let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
366            let invite_code_filename_original = "invite-code";
367
368            for peer_env_vars in peer_to_env_vars_map.values() {
369                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
370
371                let invite_code = poll_simple("awaiting-invite-code", || async {
372                    let path = format!("{peer_data_dir}/{invite_code_filename_original}");
373                    tokio::fs::read_to_string(&path)
374                        .await
375                        .with_context(|| format!("Awaiting invite code file: {path}"))
376                })
377                .await
378                .context("Awaiting invite code file")?;
379
380                ConnectorType::default()
381                    .download_from_invite_code(&connectors, &InviteCode::from_str(&invite_code)?)
382                    .await?;
383            }
384
385            // copy over invite-code file to client directory
386            let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);
387
388            tokio::fs::copy(
389                format!("{peer_data_dir}/{invite_code_filename_original}"),
390                format!("{client_dir}/{invite_code_filename_original}"),
391            )
392            .await
393            .context("copying invite-code file")?;
394
395            // move each guardian's invite-code file to the client's directory
396            // appending the peer id to the end
397            for (index, peer_env_vars) in &peer_to_env_vars_map {
398                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
399
400                let invite_code_filename_indexed =
401                    format!("{invite_code_filename_original}-{index}");
402                tokio::fs::rename(
403                    format!("{peer_data_dir}/{invite_code_filename_original}"),
404                    format!("{client_dir}/{invite_code_filename_indexed}"),
405                )
406                .await
407                .context("moving invite-code file")?;
408            }
409
410            debug!("Moved invite-code files to client data directory");
411        }
412
413        let client = JitTryAnyhow::new_try({
414            move || async move {
415                let client = Client::open_or_create(federation_name.as_str())?;
416                let invite_code = Self::invite_code_static()?;
417                if !skip_setup && !pre_dkg {
418                    cmd!(client, "join-federation", invite_code).run().await?;
419                }
420                Ok(client)
421            }
422        });
423
424        Ok(Self {
425            members,
426            vars: peer_to_env_vars_map,
427            bitcoind,
428            client,
429            connectors,
430        })
431    }
432
433    pub fn client_config(&self) -> Result<ClientConfig> {
434        let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
435        load_from_file(&cfg_path)
436    }
437
438    pub fn module_client_config<M: ClientModule>(
439        &self,
440    ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
441        self.client_config()?
442            .modules
443            .iter()
444            .find_map(|(module_instance_id, module_cfg)| {
445                if module_cfg.kind == M::kind() {
446                    let decoders = ModuleDecoderRegistry::new(vec![(
447                        *module_instance_id,
448                        M::kind(),
449                        M::decoder(),
450                    )]);
451                    Some(
452                        module_cfg
453                            .config
454                            .clone()
455                            .redecode_raw(&decoders)
456                            .expect("Decoding client cfg failed")
457                            .expect_decoded_ref()
458                            .as_any()
459                            .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
460                            .cloned()
461                            .context("Cast to module config failed"),
462                    )
463                } else {
464                    None
465                }
466            })
467            .transpose()
468    }
469
470    pub fn deposit_fees(&self) -> Result<Amount> {
471        Ok(self
472            .module_client_config::<WalletClientModule>()?
473            .context("No wallet module found")?
474            .fee_consensus
475            .peg_in_abs)
476    }
477
478    /// Read the invite code from the client data dir
479    pub fn invite_code(&self) -> Result<String> {
480        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
481        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
482        Ok(invite_code)
483    }
484
485    pub fn invite_code_static() -> Result<String> {
486        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
487        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
488        Ok(invite_code)
489    }
490    pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
491        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
492        let name = format!("invite-code-{peer_id}");
493        let invite_code = fs::read_to_string(data_dir.join(name))?;
494        Ok(invite_code)
495    }
496
497    /// Built-in, default, internal [`Client`]
498    ///
499    /// We should be moving away from using it for anything.
500    pub async fn internal_client(&self) -> Result<&Client> {
501        self.client
502            .get_try()
503            .await
504            .context("Internal client joining Federation")
505    }
506
507    /// New [`Client`] that already joined `self`
508    pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
509        let client = Client::create(name).await?;
510        client.join_federation(self.invite_code()?).await?;
511        Ok(client)
512    }
513
514    pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
515        if self.members.contains_key(&peer) {
516            bail!("fedimintd-{peer} already running");
517        }
518        self.members.insert(
519            peer,
520            Fedimintd::new(
521                process_mgr,
522                self.bitcoind.clone(),
523                peer,
524                &self.vars[&peer],
525                "default".to_string(),
526            )
527            .await?,
528        );
529        Ok(())
530    }
531
532    pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
533        let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
534            bail!("fedimintd-{peer_id} does not exist");
535        };
536        fedimintd.terminate().await?;
537        Ok(())
538    }
539
540    pub async fn await_server_terminated(&mut self, peer_id: usize) -> Result<()> {
541        let Some(fedimintd) = self.members.get_mut(&peer_id) else {
542            bail!("fedimintd-{peer_id} does not exist");
543        };
544        fedimintd.await_terminated().await?;
545        self.members.remove(&peer_id);
546        Ok(())
547    }
548
549    /// Starts all peers not currently running.
550    pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
551        info!("starting all servers");
552        let fed_size = process_mgr.globals.FM_FED_SIZE;
553        for peer_id in 0..fed_size {
554            if self.members.contains_key(&peer_id) {
555                continue;
556            }
557            self.start_server(process_mgr, peer_id).await?;
558        }
559        self.await_all_peers().await?;
560        Ok(())
561    }
562
563    /// Terminates all running peers.
564    pub async fn terminate_all_servers(&mut self) -> Result<()> {
565        info!("terminating all servers");
566        let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
567        for peer_id in running_peer_ids {
568            self.terminate_server(peer_id).await?;
569        }
570        Ok(())
571    }
572
573    /// Coordinated shutdown of all peers that restart using the provided
574    /// `bin_path`. Returns `Ok()` once all peers are online.
575    ///
576    /// Staggering the restart more closely simulates upgrades in the wild.
577    pub async fn restart_all_staggered_with_bin(
578        &mut self,
579        process_mgr: &ProcessManager,
580        bin_path: &PathBuf,
581    ) -> Result<()> {
582        let fed_size = process_mgr.globals.FM_FED_SIZE;
583
584        // ensure all peers are online
585        self.start_all_servers(process_mgr).await?;
586
587        // staggered shutdown of peers
588        while self.num_members() > 0 {
589            self.terminate_server(self.num_members() - 1).await?;
590            if self.num_members() > 0 {
591                fedimint_core::task::sleep_in_test(
592                    "waiting to shutdown remaining peers",
593                    Duration::from_secs(10),
594                )
595                .await;
596            }
597        }
598
599        // TODO: Audit that the environment access only happens in single-threaded code.
600        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
601
602        // staggered restart
603        for peer_id in 0..fed_size {
604            self.start_server(process_mgr, peer_id).await?;
605            if peer_id < fed_size - 1 {
606                fedimint_core::task::sleep_in_test(
607                    "waiting to restart remaining peers",
608                    Duration::from_secs(10),
609                )
610                .await;
611            }
612        }
613
614        self.await_all_peers().await?;
615
616        let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
617        info!("upgraded fedimintd to version: {}", fedimintd_version);
618        Ok(())
619    }
620
621    pub async fn restart_all_with_bin(
622        &mut self,
623        process_mgr: &ProcessManager,
624        bin_path: &PathBuf,
625    ) -> Result<()> {
626        // get the version we're upgrading to, temporarily updating the fedimintd path
627        let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
628        // TODO: Audit that the environment access only happens in single-threaded code.
629        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
630        // TODO: Audit that the environment access only happens in single-threaded code.
631        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
632
633        self.restart_all_staggered_with_bin(process_mgr, bin_path)
634            .await
635    }
636
637    pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
638        let fed_size = process_mgr.globals.FM_FED_SIZE;
639        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
640        anyhow::ensure!(
641            fed_size > 3 * offline_nodes,
642            "too many offline nodes ({offline_nodes}) to reach consensus"
643        );
644
645        while self.num_members() > fed_size - offline_nodes {
646            self.terminate_server(self.num_members() - 1).await?;
647        }
648
649        if offline_nodes > 0 {
650            info!(fed_size, offline_nodes, "federation is degraded");
651        }
652        Ok(())
653    }
654
655    pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
656        let deposit_fees_msat = self.deposit_fees()?.msats;
657        assert_eq!(
658            deposit_fees_msat % 1000,
659            0,
660            "Deposit fees expected to be whole sats in test suite"
661        );
662        let deposit_fees = deposit_fees_msat / 1000;
663        info!(amount, deposit_fees, "Pegging-in client funds");
664
665        let (address, operation_id) = client.get_deposit_addr().await?;
666
667        self.bitcoind
668            .send_to(address, amount + deposit_fees)
669            .await?;
670        self.bitcoind.mine_blocks(21).await?;
671
672        Ok(operation_id)
673    }
674
675    pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
676        let operation_id = self.pegin_client_no_wait(amount, client).await?;
677
678        client.await_deposit(&operation_id).await?;
679        Ok(())
680    }
681
682    /// Initiates multiple peg-ins to the same federation for the set of
683    /// gateways to save on mining blocks in parallel.
684    pub async fn pegin_gateways(
685        &self,
686        amount: u64,
687        gateways: Vec<&super::gatewayd::Gatewayd>,
688    ) -> Result<()> {
689        let deposit_fees_msat = self.deposit_fees()?.msats;
690        assert_eq!(
691            deposit_fees_msat % 1000,
692            0,
693            "Deposit fees expected to be whole sats in test suite"
694        );
695        let deposit_fees = deposit_fees_msat / 1000;
696        info!(amount, deposit_fees, "Pegging-in gateway funds");
697        let fed_id = self.calculate_federation_id();
698        for gw in gateways.clone() {
699            let pegin_addr = gw.get_pegin_addr(&fed_id).await?;
700            self.bitcoind
701                .send_to(pegin_addr, amount + deposit_fees)
702                .await?;
703        }
704
705        self.bitcoind.mine_blocks(21).await?;
706        let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
707        try_join_all(gateways.into_iter().map(|gw| {
708            poll("gateway pegin", || async {
709                let gw_info = gw.get_info().await.map_err(ControlFlow::Continue)?;
710
711                let block_height: u64 = if gw.gatewayd_version < *VERSION_0_10_0_ALPHA {
712                    gw_info["block_height"]
713                        .as_u64()
714                        .expect("Could not parse block height")
715                } else {
716                    gw_info["lightning_info"]["connected"]["block_height"]
717                        .as_u64()
718                        .expect("Could not parse block height")
719                };
720
721                if bitcoind_block_height != block_height {
722                    return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
723                        "gateway block height is not synced"
724                    )));
725                }
726
727                let gateway_balance = gw
728                    .ecash_balance(fed_id.clone())
729                    .await
730                    .map_err(ControlFlow::Continue)?;
731                poll_almost_equal!(gateway_balance, amount * 1000)
732            })
733        }))
734        .await?;
735
736        Ok(())
737    }
738
739    /// Initiates multiple peg-outs from the same federation for the set of
740    /// gateways to save on mining blocks in parallel.
741    pub async fn pegout_gateways(
742        &self,
743        amount: u64,
744        gateways: Vec<&super::gatewayd::Gatewayd>,
745    ) -> Result<()> {
746        info!(amount, "Pegging-out gateway funds");
747        let fed_id = self.calculate_federation_id();
748        let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
749        for gw in gateways.clone() {
750            let prev_fed_ecash_balance = gw
751                .get_balances()
752                .await?
753                .ecash_balances
754                .into_iter()
755                .find(|fed| fed.federation_id.to_string() == fed_id)
756                .expect("Gateway has not joined federation")
757                .ecash_balance_msats;
758
759            let pegout_address = self.bitcoind.get_new_address().await?;
760            let value = cmd!(
761                gw,
762                "ecash",
763                "pegout",
764                "--federation-id",
765                fed_id,
766                "--amount",
767                amount,
768                "--address",
769                pegout_address
770            )
771            .out_json()
772            .await?;
773            let response: WithdrawResponse = serde_json::from_value(value)?;
774            peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
775        }
776        self.bitcoind.mine_blocks(21).await?;
777
778        try_join_all(
779            peg_outs
780                .values()
781                .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
782        )
783        .await?;
784
785        for gw in gateways.clone() {
786            let after_fed_ecash_balance = gw
787                .get_balances()
788                .await?
789                .ecash_balances
790                .into_iter()
791                .find(|fed| fed.federation_id.to_string() == fed_id)
792                .expect("Gateway has not joined federation")
793                .ecash_balance_msats;
794
795            let ln_type = gw.ln.ln_type();
796            let prev_balance = peg_outs
797                .get(&ln_type)
798                .expect("peg out does not exist")
799                .0
800                .msats;
801            let fees = peg_outs
802                .get(&ln_type)
803                .expect("peg out does not exist")
804                .1
805                .fees;
806            let total_fee = fees.amount().to_sat() * 1000;
807            crate::util::almost_equal(
808                after_fed_ecash_balance.msats,
809                prev_balance - amount - total_fee,
810                2000,
811            )
812            .map_err(|e| {
813                anyhow::anyhow!(
814                    "new balance did not equal prev balance minus withdraw_amount minus fees: {}",
815                    e
816                )
817            })?;
818        }
819
820        Ok(())
821    }
822
823    pub fn calculate_federation_id(&self) -> String {
824        self.client_config()
825            .unwrap()
826            .global
827            .calculate_federation_id()
828            .to_string()
829    }
830
831    pub async fn await_block_sync(&self) -> Result<u64> {
832        let finality_delay = self.get_finality_delay()?;
833        let block_count = self.bitcoind.get_block_count().await?;
834        let expected = block_count.saturating_sub(finality_delay.into());
835        cmd!(
836            self.internal_client().await?,
837            "dev",
838            "wait-block-count",
839            expected
840        )
841        .run()
842        .await?;
843        Ok(expected)
844    }
845
846    fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
847        let client_config = &self.client_config()?;
848        let wallet_cfg = client_config
849            .modules
850            .get(&LEGACY_HARDCODED_INSTANCE_ID_WALLET)
851            .context("wallet module not found")?
852            .clone()
853            .redecode_raw(&ModuleDecoderRegistry::new([(
854                LEGACY_HARDCODED_INSTANCE_ID_WALLET,
855                fedimint_wallet_client::KIND,
856                fedimint_wallet_client::WalletModuleTypes::decoder(),
857            )]))?;
858        let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
859
860        let finality_delay = wallet_cfg.finality_delay;
861        Ok(finality_delay)
862    }
863
864    pub async fn await_gateways_registered(&self) -> Result<()> {
865        let start_time = Instant::now();
866        debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
867
868        poll("gateways registered", || async {
869            let num_gateways = cmd!(
870                self.internal_client()
871                    .await
872                    .map_err(ControlFlow::Continue)?,
873                "list-gateways"
874            )
875            .out_json()
876            .await
877            .map_err(ControlFlow::Continue)?
878            .as_array()
879            .context("invalid output")
880            .map_err(ControlFlow::Break)?
881            .len();
882            poll_eq!(num_gateways, 1)
883        })
884        .await?;
885        debug!(target: LOG_DEVIMINT,
886            elapsed_ms = %start_time.elapsed().as_millis(),
887            "Gateways registered");
888        Ok(())
889    }
890
891    pub async fn await_all_peers(&self) -> Result<()> {
892        poll("Waiting for all peers to be online", || async {
893            cmd!(
894                self.internal_client()
895                    .await
896                    .map_err(ControlFlow::Continue)?,
897                "dev",
898                "api",
899                "--module",
900                LEGACY_HARDCODED_INSTANCE_ID_WALLET,
901                "block_count"
902            )
903            .run()
904            .await
905            .map_err(ControlFlow::Continue)?;
906            Ok(())
907        })
908        .await
909    }
910
911    pub async fn await_peer(&self, peer_id: usize) -> Result<()> {
912        poll("Waiting for all peers to be online", || async {
913            cmd!(
914                self.internal_client()
915                    .await
916                    .map_err(ControlFlow::Continue)?,
917                "dev",
918                "api",
919                "--peer-id",
920                peer_id,
921                "--module",
922                LEGACY_HARDCODED_INSTANCE_ID_WALLET,
923                "block_count"
924            )
925            .run()
926            .await
927            .map_err(ControlFlow::Continue)?;
928            Ok(())
929        })
930        .await
931    }
932
933    /// Mines enough blocks to finalize mempool transactions, then waits for
934    /// federation to process finalized blocks.
935    ///
936    /// ex:
937    ///   tx submitted to mempool at height 100
938    ///   finality delay = 10
939    ///   mine finality delay blocks + 1 => new height 111
940    ///   tx included in block 101
941    ///   highest finalized height = 111 - 10 = 101
942    pub async fn finalize_mempool_tx(&self) -> Result<()> {
943        let finality_delay = self.get_finality_delay()?;
944        let blocks_to_mine = finality_delay + 1;
945        self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
946        self.await_block_sync().await?;
947        Ok(())
948    }
949
950    pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
951        self.bitcoind.mine_blocks(blocks).await?;
952        self.await_block_sync().await?;
953        Ok(())
954    }
955
956    pub fn num_members(&self) -> usize {
957        self.members.len()
958    }
959
960    pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
961        self.members
962            .keys()
963            .map(|&peer_id| PeerId::from(peer_id as u16))
964    }
965}
966
967#[derive(Clone)]
968pub struct Fedimintd {
969    _bitcoind: Bitcoind,
970    process: ProcessHandle,
971}
972
973impl Fedimintd {
974    pub async fn new(
975        process_mgr: &ProcessManager,
976        bitcoind: Bitcoind,
977        peer_id: usize,
978        env: &vars::Fedimintd,
979        fed_name: String,
980    ) -> Result<Self> {
981        debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
982        let process = process_mgr
983            .spawn_daemon(
984                &format!("fedimintd-{fed_name}-{peer_id}"),
985                cmd!(FedimintdCmd).envs(env.vars()),
986            )
987            .await?;
988
989        Ok(Self {
990            _bitcoind: bitcoind,
991            process,
992        })
993    }
994
995    pub async fn terminate(self) -> Result<()> {
996        self.process.terminate().await
997    }
998
999    pub async fn await_terminated(&self) -> Result<()> {
1000        self.process.await_terminated().await
1001    }
1002}
1003
1004pub async fn run_cli_dkg(
1005    params: HashMap<PeerId, ConfigGenParams>,
1006    endpoints: BTreeMap<PeerId, String>,
1007) -> Result<()> {
1008    let auth_for = |peer: &PeerId| -> &ApiAuth { &params[peer].api_auth };
1009
1010    debug!(target: LOG_DEVIMINT, "Running DKG");
1011    for endpoint in endpoints.values() {
1012        poll("trying-to-connect-to-peers", || async {
1013            crate::util::FedimintCli
1014                .ws_status(endpoint)
1015                .await
1016                .context("dkg status")
1017                .map_err(ControlFlow::Continue)
1018        })
1019        .await?;
1020    }
1021
1022    debug!(target: LOG_DEVIMINT, "Connected to all peers");
1023
1024    for (peer_id, endpoint) in &endpoints {
1025        let status = crate::util::FedimintCli.ws_status(endpoint).await?;
1026        assert_eq!(
1027            status.server,
1028            ServerStatusLegacy::AwaitingPassword,
1029            "peer_id isn't waiting for password: {peer_id}"
1030        );
1031    }
1032
1033    debug!(target: LOG_DEVIMINT, "Setting passwords");
1034    for (peer_id, endpoint) in &endpoints {
1035        crate::util::FedimintCli
1036            .set_password(auth_for(peer_id), endpoint)
1037            .await?;
1038    }
1039    let (leader_id, leader_endpoint) = endpoints.first_key_value().context("missing peer")?;
1040    let followers = endpoints
1041        .iter()
1042        .filter(|(id, _)| *id != leader_id)
1043        .collect::<BTreeMap<_, _>>();
1044
1045    debug!(target: LOG_DEVIMINT, "calling set_config_gen_connections for leader");
1046    let leader_name = "leader".to_string();
1047    crate::util::FedimintCli
1048        .set_config_gen_connections(auth_for(leader_id), leader_endpoint, &leader_name, None)
1049        .await?;
1050
1051    let server_gen_params = ServerModuleConfigGenParamsRegistry::default();
1052
1053    debug!(target: LOG_DEVIMINT, "calling set_config_gen_params for leader");
1054    cli_set_config_gen_params(
1055        leader_endpoint,
1056        auth_for(leader_id),
1057        server_gen_params.clone(),
1058    )
1059    .await?;
1060
1061    let followers_names = followers
1062        .keys()
1063        .map(|peer_id| {
1064            (*peer_id, {
1065                // This is to be clear that the name will be unrelated to peer id
1066                let random_string = rand::thread_rng()
1067                    .sample_iter(&rand::distributions::Alphanumeric)
1068                    .take(5)
1069                    .map(char::from)
1070                    .collect::<String>();
1071                format!("random-{random_string}{peer_id}")
1072            })
1073        })
1074        .collect::<BTreeMap<_, _>>();
1075    for (peer_id, endpoint) in &followers {
1076        let name = followers_names
1077            .get(peer_id)
1078            .context("missing follower name")?;
1079        debug!(target: LOG_DEVIMINT, "calling set_config_gen_connections for {peer_id} {name}");
1080
1081        crate::util::FedimintCli
1082            .set_config_gen_connections(auth_for(peer_id), endpoint, name, Some(leader_endpoint))
1083            .await?;
1084
1085        cli_set_config_gen_params(endpoint, auth_for(peer_id), server_gen_params.clone()).await?;
1086    }
1087
1088    debug!(target: LOG_DEVIMINT, "calling get_config_gen_peers for leader");
1089    let peers = crate::util::FedimintCli
1090        .get_config_gen_peers(leader_endpoint)
1091        .await?;
1092
1093    let found_names = peers
1094        .into_iter()
1095        .map(|peer| peer.name)
1096        .collect::<HashSet<_>>();
1097    let all_names = followers_names
1098        .values()
1099        .cloned()
1100        .chain(iter::once(leader_name))
1101        .collect::<HashSet<_>>();
1102    assert_eq!(found_names, all_names);
1103
1104    debug!(target: LOG_DEVIMINT, "Waiting for SharingConfigGenParams");
1105    cli_wait_server_status(leader_endpoint, ServerStatusLegacy::SharingConfigGenParams).await?;
1106
1107    debug!(target: LOG_DEVIMINT, "Getting consensus configs");
1108    let mut configs = vec![];
1109    for endpoint in endpoints.values() {
1110        let config = crate::util::FedimintCli
1111            .consensus_config_gen_params_legacy(endpoint)
1112            .await?;
1113        configs.push(config);
1114    }
1115    // Confirm all consensus configs are the same
1116    let mut consensus: Vec<_> = configs.iter().map(|p| p.consensus.clone()).collect();
1117    consensus.dedup();
1118    assert_eq!(consensus.len(), 1);
1119    // Confirm all peer ids are unique
1120    let ids = configs
1121        .iter()
1122        .map(|p| p.our_current_id)
1123        .collect::<HashSet<_>>();
1124    assert_eq!(ids.len(), endpoints.len());
1125    let dkg_results = endpoints
1126        .iter()
1127        .map(|(peer_id, endpoint)| crate::util::FedimintCli.run_dkg(auth_for(peer_id), endpoint));
1128    debug!(target: LOG_DEVIMINT, "Running DKG");
1129    let (dkg_results, leader_wait_result) = tokio::join!(
1130        join_all(dkg_results),
1131        cli_wait_server_status(leader_endpoint, ServerStatusLegacy::VerifyingConfigs)
1132    );
1133    for result in dkg_results {
1134        result?;
1135    }
1136    leader_wait_result?;
1137
1138    // verify config hashes equal for all peers
1139    debug!(target: LOG_DEVIMINT, "Verifying config hashes");
1140    let mut hashes = HashSet::new();
1141    for (peer_id, endpoint) in &endpoints {
1142        cli_wait_server_status(endpoint, ServerStatusLegacy::VerifyingConfigs).await?;
1143        let hash = crate::util::FedimintCli
1144            .get_verify_config_hash(auth_for(peer_id), endpoint)
1145            .await?;
1146        hashes.insert(hash);
1147    }
1148    assert_eq!(hashes.len(), 1);
1149    for (peer_id, endpoint) in &endpoints {
1150        let result = crate::util::FedimintCli
1151            .start_consensus(auth_for(peer_id), endpoint)
1152            .await;
1153        if let Err(e) = result {
1154            tracing::debug!(target: LOG_DEVIMINT, "Error calling start_consensus: {e:?}, trying to continue...");
1155        }
1156        cli_wait_server_status(endpoint, ServerStatusLegacy::ConsensusRunning).await?;
1157    }
1158    Ok(())
1159}
1160
1161pub async fn run_cli_dkg_v2(
1162    params: HashMap<PeerId, ConfigGenParams>,
1163    endpoints: BTreeMap<PeerId, String>,
1164) -> Result<()> {
1165    let auth_for = |peer: &PeerId| -> &ApiAuth { &params[peer].api_auth };
1166
1167    // Parallelize setup status checks
1168    let status_futures = endpoints.iter().map(|(peer, endpoint)| {
1169        let peer = *peer;
1170        let endpoint = endpoint.clone();
1171        async move {
1172            let status = poll("awaiting-setup-status-awaiting-local-params", || async {
1173                crate::util::FedimintCli
1174                    .setup_status(auth_for(&peer), &endpoint)
1175                    .await
1176                    .map_err(ControlFlow::Continue)
1177            })
1178            .await
1179            .unwrap();
1180
1181            assert_eq!(status, SetupStatus::AwaitingLocalParams);
1182        }
1183    });
1184    join_all(status_futures).await;
1185
1186    debug!(target: LOG_DEVIMINT, "Setting local parameters...");
1187
1188    // Parallelize setting local parameters
1189    let local_params_futures = endpoints.iter().map(|(peer, endpoint)| {
1190        let peer = *peer;
1191        let endpoint = endpoint.clone();
1192        async move {
1193            let info = if peer.to_usize() == 0 {
1194                crate::util::FedimintCli
1195                    .set_local_params_leader(&peer, auth_for(&peer), &endpoint)
1196                    .await
1197            } else {
1198                crate::util::FedimintCli
1199                    .set_local_params_follower(&peer, auth_for(&peer), &endpoint)
1200                    .await
1201            };
1202            info.map(|i| (peer, i))
1203        }
1204    });
1205    let connection_info: BTreeMap<_, _> = try_join_all(local_params_futures)
1206        .await?
1207        .into_iter()
1208        .collect();
1209
1210    debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");
1211
1212    // Parallelize peer addition - flatten the nested loop into a single parallel
1213    // operation
1214    let add_peer_futures = connection_info.iter().flat_map(|(peer, info)| {
1215        endpoints
1216            .iter()
1217            .filter(move |(p, _)| *p != peer)
1218            .map(move |(p, endpoint)| {
1219                let p = *p;
1220                let endpoint = endpoint.clone();
1221                let info = info.clone();
1222                async move {
1223                    crate::util::FedimintCli
1224                        .add_peer(&info, auth_for(&p), &endpoint)
1225                        .await
1226                }
1227            })
1228    });
1229    try_join_all(add_peer_futures).await?;
1230
1231    debug!(target: LOG_DEVIMINT, "Starting DKG...");
1232
1233    // Parallelize DKG start
1234    let start_dkg_futures = endpoints.iter().map(|(peer, endpoint)| {
1235        let peer = *peer;
1236        let endpoint = endpoint.clone();
1237        async move {
1238            crate::util::FedimintCli
1239                .start_dkg(auth_for(&peer), &endpoint)
1240                .await
1241        }
1242    });
1243    try_join_all(start_dkg_futures).await?;
1244
1245    Ok(())
1246}
1247
1248async fn cli_set_config_gen_params(
1249    endpoint: &str,
1250    auth: &ApiAuth,
1251    mut server_gen_params: ServerModuleConfigGenParamsRegistry,
1252) -> Result<()> {
1253    self::config::attach_default_module_init_params(
1254        &BitcoinRpcConfig::get_defaults_from_env_vars()?,
1255        &mut server_gen_params,
1256        Network::Regtest,
1257        10,
1258    );
1259
1260    let meta = iter::once(("federation_name".to_string(), "testfed".to_string())).collect();
1261
1262    crate::util::FedimintCli
1263        .set_config_gen_params(auth, endpoint, meta, server_gen_params)
1264        .await?;
1265
1266    Ok(())
1267}
1268
1269async fn cli_wait_server_status(endpoint: &str, expected_status: ServerStatusLegacy) -> Result<()> {
1270    poll(
1271        &format!("waiting-server-status: {expected_status:?}"),
1272        || async {
1273            let server_status = crate::util::FedimintCli
1274                .ws_status(endpoint)
1275                .await
1276                .context("server status")
1277                .map_err(ControlFlow::Continue)?
1278                .server;
1279            if server_status == expected_status {
1280                Ok(())
1281            } else {
1282                Err(ControlFlow::Continue(anyhow!(
1283                    "expected status: {expected_status:?} current status: {server_status:?}"
1284                )))
1285            }
1286        },
1287    )
1288    .await?;
1289    Ok(())
1290}