devimint/
federation.rs

1mod config;
2
3use std::collections::{BTreeMap, HashMap, HashSet};
4use std::ops::ControlFlow;
5use std::path::PathBuf;
6use std::str::FromStr;
7use std::time::Duration;
8use std::{env, fs, iter};
9
10use anyhow::{Context, Result, anyhow, bail};
11use bitcoincore_rpc::bitcoin::Network;
12use fedimint_api_client::api::DynGlobalApi;
13use fedimint_api_client::api::net::Connector;
14use fedimint_client_module::module::ClientModule;
15use fedimint_core::admin_client::{ServerStatusLegacy, SetupStatus};
16use fedimint_core::config::{ClientConfig, ServerModuleConfigGenParamsRegistry, load_from_file};
17use fedimint_core::core::LEGACY_HARDCODED_INSTANCE_ID_WALLET;
18use fedimint_core::envs::BitcoinRpcConfig;
19use fedimint_core::invite_code::InviteCode;
20use fedimint_core::module::registry::ModuleDecoderRegistry;
21use fedimint_core::module::{ApiAuth, ModuleCommon};
22use fedimint_core::runtime::block_in_place;
23use fedimint_core::task::block_on;
24use fedimint_core::task::jit::JitTryAnyhow;
25use fedimint_core::util::SafeUrl;
26use fedimint_core::{Amount, NumPeers, PeerId};
27use fedimint_gateway_common::WithdrawResponse;
28use fedimint_logging::LOG_DEVIMINT;
29use fedimint_server::config::ConfigGenParams;
30use fedimint_testing_core::config::local_config_gen_params;
31use fedimint_testing_core::node_type::LightningNodeType;
32use fedimint_wallet_client::WalletClientModule;
33use fedimint_wallet_client::config::WalletClientConfig;
34use fs_lock::FileLock;
35use futures::future::{join_all, try_join_all};
36use rand::Rng;
37use tokio::task::{JoinSet, spawn_blocking};
38use tokio::time::Instant;
39use tracing::{debug, info};
40
41use super::external::Bitcoind;
42use super::util::{Command, ProcessHandle, ProcessManager, cmd};
43use super::vars::utf8;
44use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
45use crate::util::{FedimintdCmd, poll, poll_simple, poll_with_timeout};
46use crate::version_constants::VERSION_0_7_0_ALPHA;
47use crate::{poll_almost_equal, poll_eq, vars};
48
49// TODO: Are we still using the 3rd port for anything?
50/// Number of ports we allocate for every `fedimintd` instance
///
/// The `FEDIMINTD_*_PORT_OFFSET` constants below pick individual ports
/// inside that range.
51pub const PORTS_PER_FEDIMINTD: u16 = 4;
52/// Offset of the p2p port inside the range of [`PORTS_PER_FEDIMINTD`] ports
53pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
54/// Offset of the api port inside the range of [`PORTS_PER_FEDIMINTD`] ports
55pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
56/// Offset of the web ui port inside the range of [`PORTS_PER_FEDIMINTD`] ports
57pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
58/// Offset of the prometheus port inside the range of [`PORTS_PER_FEDIMINTD`] ports
59pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;
60
/// A group of `fedimintd` peers, the `bitcoind` they were started with and a
/// built-in [`Client`].
61#[derive(Clone)]
62pub struct Federation {
63    // Peers that are currently started, keyed by peer index
64    pub members: BTreeMap<usize, Fedimintd>,
    // Per-peer variables, keyed by peer index; entries are kept while a peer
    // is stopped so that it can be started again
65    pub vars: BTreeMap<usize, vars::Fedimintd>,
66    pub bitcoind: Bitcoind,
67
68    /// Built in [`Client`], already joined
    ///
    /// Only for internal use, use cli commands instead.
69    client: JitTryAnyhow<Client>,
70}
71
72impl Drop for Federation {
73    fn drop(&mut self) {
74        block_in_place(|| {
75            block_on(async {
76                let mut set = JoinSet::new();
77
78                while let Some((_id, fedimintd)) = self.members.pop_first() {
79                    set.spawn(async { drop(fedimintd) });
80                }
81                while (set.join_next().await).is_some() {}
82            });
83        });
84    }
85}
86/// `fedimint-cli` instance (basically path with client state: config + db)
87#[derive(Clone)]
88pub struct Client {
    // Name of this client's directory inside the shared `clients` directory
89    name: String,
90}
91
92impl Client {
93    fn clients_dir() -> PathBuf {
94        let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
95            .expect("FM_DATA_DIR_ENV not set")
96            .parse()
97            .expect("FM_DATA_DIR_ENV invalid");
98        data_dir.join("clients")
99    }
100
101    fn client_dir(&self) -> PathBuf {
102        Self::clients_dir().join(&self.name)
103    }
104
105    pub fn client_name_lock(name: &str) -> Result<FileLock> {
106        let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
107        let file_lock = std::fs::OpenOptions::new()
108            .write(true)
109            .create(true)
110            .truncate(true)
111            .open(&lock_path)
112            .with_context(|| format!("Failed to open {}", lock_path.display()))?;
113
114        fs_lock::FileLock::new_exclusive(file_lock)
115            .with_context(|| format!("Failed to lock {}", lock_path.display()))
116    }
117
118    /// Create a [`Client`] that starts with a fresh state.
119    pub async fn create(name: impl ToString) -> Result<Client> {
120        let name = name.to_string();
121        spawn_blocking(move || {
122            let _lock = Self::client_name_lock(&name);
123            for i in 0u64.. {
124                let client = Self {
125                    name: format!("{name}-{i}"),
126                };
127
128                if !client.client_dir().exists() {
129                    std::fs::create_dir_all(client.client_dir())?;
130                    return Ok(client);
131                }
132            }
133            unreachable!()
134        })
135        .await?
136    }
137
138    /// Open or create a [`Client`] that starts with a fresh state.
139    pub fn open_or_create(name: &str) -> Result<Client> {
140        block_in_place(|| {
141            let _lock = Self::client_name_lock(name);
142            let client = Self {
143                name: format!("{name}-0"),
144            };
145            if !client.client_dir().exists() {
146                std::fs::create_dir_all(client.client_dir())?;
147            }
148            Ok(client)
149        })
150    }
151
152    /// Client to join a federation
153    pub async fn join_federation(&self, invite_code: String) -> Result<()> {
154        debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
155        cmd!(self, "join-federation", invite_code).run().await?;
156
157        Ok(())
158    }
159
160    /// Client to join a federation with a restore procedure
161    pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
162        debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
163        cmd!(
164            self,
165            "restore",
166            "--invite-code",
167            invite_code,
168            "--mnemonic",
169            mnemonic
170        )
171        .run()
172        .await?;
173
174        Ok(())
175    }
176
177    /// Client to join a federation
178    pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
179        let restored = Self::open_or_create(name)?;
180
181        let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
182            .as_str()
183            .unwrap()
184            .to_owned();
185
186        debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
187        cmd!(
188            restored,
189            "restore",
190            "--invite-code",
191            invite_code,
192            "--mnemonic",
193            mnemonic
194        )
195        .run()
196        .await?;
197
198        Ok(restored)
199    }
200
201    /// Create a [`Client`] that starts with a state that is a copy of
202    /// of another one.
203    pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
204        let new = Client::create(name).await?;
205
206        cmd!(
207            "cp",
208            "-R",
209            self.client_dir().join("client.db").display(),
210            new.client_dir().display()
211        )
212        .run()
213        .await?;
214
215        Ok(new)
216    }
217
218    pub async fn balance(&self) -> Result<u64> {
219        Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
220            .as_u64()
221            .unwrap())
222    }
223
224    pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
225        let deposit = cmd!(self, "deposit-address").out_json().await?;
226        Ok((
227            deposit["address"].as_str().unwrap().to_string(),
228            deposit["operation_id"].as_str().unwrap().to_string(),
229        ))
230    }
231
232    pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
233        cmd!(self, "await-deposit", operation_id).run().await
234    }
235
236    pub fn cmd(&self) -> Command {
237        cmd!(
238            crate::util::get_fedimint_cli_path(),
239            format!("--data-dir={}", self.client_dir().display())
240        )
241    }
242
243    pub fn get_name(&self) -> &str {
244        &self.name
245    }
246
247    /// Returns the current consensus session count
248    pub async fn get_session_count(&self) -> Result<u64> {
249        cmd!(self, "dev", "session-count").out_json().await?["count"]
250            .as_u64()
251            .context("count field wasn't a number")
252    }
253
254    /// Returns once all active state machines complete
255    pub async fn wait_complete(&self) -> Result<()> {
256        cmd!(self, "dev", "wait-complete").run().await
257    }
258
259    /// Returns once the current session completes
260    pub async fn wait_session(&self) -> anyhow::Result<()> {
261        info!("Waiting for a new session");
262        let session_count = self.get_session_count().await?;
263        self.wait_session_outcome(session_count).await?;
264        Ok(())
265    }
266
267    /// Returns once the provided session count completes
268    pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
269        let timeout = {
270            let current_session_count = self.get_session_count().await?;
271            let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
272            let session_duration_seconds = 180;
273            Duration::from_secs(sessions_to_wait * session_duration_seconds)
274        };
275
276        let start = Instant::now();
277        poll_with_timeout("Waiting for a new session", timeout, || async {
278            info!("Awaiting session outcome {session_count}");
279            match cmd!(self, "dev", "api", "await_session_outcome", session_count)
280                .run()
281                .await
282            {
283                Err(e) => Err(ControlFlow::Continue(e)),
284                Ok(()) => Ok(()),
285            }
286        })
287        .await?;
288
289        let session_found_in = start.elapsed();
290        info!("session found in {session_found_in:?}");
291        Ok(())
292    }
293}
294
295impl Federation {
    /// Starts one `fedimintd` per peer and prepares the built-in client.
    ///
    /// Unless `skip_setup` or `pre_dkg` is set, this also runs DKG through
    /// `fedimint-cli`, waits for every peer's `invite-code` file, copies peer
    /// 0's invite code to `invite-code` in the client directory and moves each
    /// peer's file there as `invite-code-{index}`.
    ///
    /// # Errors
    ///
    /// Fails if a peer cannot be started, if DKG fails, or if any of the
    /// invite-code files cannot be read, copied or moved.
296    pub async fn new(
297        process_mgr: &ProcessManager,
298        bitcoind: Bitcoind,
299        skip_setup: bool,
300        pre_dkg: bool,
301        // Which of the pre-allocated federations to use (most tests just use single `0` one)
302        fed_index: usize,
303        federation_name: String,
304    ) -> Result<Self> {
305        let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
306        let mut members = BTreeMap::new();
307        let mut peer_to_env_vars_map = BTreeMap::new();
308
309        let peers: Vec<_> = num_peers.peer_ids().collect();
310        let params: HashMap<PeerId, ConfigGenParams> =
311            local_config_gen_params(&peers, process_mgr.globals.FM_FEDERATION_BASE_PORT)?;
312
        // NOTE(review): `admin_clients` is only filled in this function and
        // never read afterwards
313        let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
314        let mut endpoints: BTreeMap<PeerId, _> = BTreeMap::new();
        // Start every peer and remember its variables and API endpoint
315        for peer_id in num_peers.peer_ids() {
316            let peer_env_vars = vars::Fedimintd::init(
317                &process_mgr.globals,
318                federation_name.clone(),
319                peer_id,
320                process_mgr
321                    .globals
322                    .fedimintd_overrides
323                    .peer_expect(fed_index, peer_id),
324            )
325            .await?;
326            members.insert(
327                peer_id.to_usize(),
328                Fedimintd::new(
329                    process_mgr,
330                    bitcoind.clone(),
331                    peer_id.to_usize(),
332                    &peer_env_vars,
333                    federation_name.clone(),
334                )
335                .await?,
336            );
337            let admin_client = DynGlobalApi::from_setup_endpoint(
338                SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
339                &process_mgr.globals.FM_FORCE_API_SECRETS.get_active(),
340                // We shouldn't need dht in devimint, so just disable it
341                false,
342                false,
343            )
344            .await?;
345            endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
346            admin_clients.insert(peer_id, admin_client);
347            peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
348        }
349
350        if !skip_setup && !pre_dkg {
351            // we don't guarantee backwards-compatibility for dkg, so we use the
352            // fedimint-cli version that matches fedimintd
353            let (original_fedimint_cli_path, original_fm_mint_client) =
354                crate::util::use_matching_fedimint_cli_for_dkg().await?;
355
356            let fedimint_cli_version = crate::util::FedimintCli::version_or_default().await;
357
            // NOTE(review): if DKG fails, the `?` below returns before the
            // original fedimint-cli is restored
358            if fedimint_cli_version >= *VERSION_0_7_0_ALPHA {
359                run_cli_dkg_v2(params, endpoints).await?;
360            } else {
361                run_cli_dkg(params, endpoints).await?;
362            }
363
364            // we're done with dkg, so we can reset the fedimint-cli version
365            crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);
366
367            // move configs to config directory
368            let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
369            let invite_code_filename_original = "invite-code";
370
            // Wait until every peer has written its invite code, then download
            // the config through it (the downloaded config itself is discarded)
371            for peer_env_vars in peer_to_env_vars_map.values() {
372                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
373
374                let invite_code = poll_simple("awaiting-invite-code", || async {
375                    let path = format!("{peer_data_dir}/{invite_code_filename_original}");
376                    tokio::fs::read_to_string(&path)
377                        .await
378                        .with_context(|| format!("Awaiting invite code file: {path}"))
379                })
380                .await
381                .context("Awaiting invite code file")?;
382
383                Connector::default()
384                    .download_from_invite_code(&InviteCode::from_str(&invite_code)?, false, false)
385                    .await?;
386            }
387
388            // copy over invite-code file to client directory
389            let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);
390
391            tokio::fs::copy(
392                format!("{peer_data_dir}/{invite_code_filename_original}"),
393                format!("{client_dir}/{invite_code_filename_original}"),
394            )
395            .await
396            .context("copying invite-code file")?;
397
398            // move each guardian's invite-code file to the client's directory
399            // appending the peer id to the end
400            for (index, peer_env_vars) in &peer_to_env_vars_map {
401                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
402
403                let invite_code_filename_indexed =
404                    format!("{invite_code_filename_original}-{index}");
405                tokio::fs::rename(
406                    format!("{peer_data_dir}/{invite_code_filename_original}"),
407                    format!("{client_dir}/{invite_code_filename_indexed}"),
408                )
409                .await
410                .context("moving invite-code file")?;
411            }
412
413            debug!("Moved invite-code files to client data directory");
414        }
415
        // The built-in client is set up by this closure; it reads the invite
        // code in every mode but only joins when setup and DKG were performed.
        // NOTE(review): presumably evaluated lazily on first use — confirm
        // against `JitTryAnyhow`
416        let client = JitTryAnyhow::new_try({
417            move || async move {
418                let client = Client::open_or_create(federation_name.as_str())?;
419                let invite_code = Self::invite_code_static()?;
420                if !skip_setup && !pre_dkg {
421                    cmd!(client, "join-federation", invite_code).run().await?;
422                }
423                Ok(client)
424            }
425        });
426
427        Ok(Self {
428            members,
429            vars: peer_to_env_vars_map,
430            bitcoind,
431            client,
432        })
433    }
434
435    pub fn client_config(&self) -> Result<ClientConfig> {
436        let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
437        load_from_file(&cfg_path)
438    }
439
440    pub fn module_client_config<M: ClientModule>(
441        &self,
442    ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
443        self.client_config()?
444            .modules
445            .iter()
446            .find_map(|(module_instance_id, module_cfg)| {
447                if module_cfg.kind == M::kind() {
448                    let decoders = ModuleDecoderRegistry::new(vec![(
449                        *module_instance_id,
450                        M::kind(),
451                        M::decoder(),
452                    )]);
453                    Some(
454                        module_cfg
455                            .config
456                            .clone()
457                            .redecode_raw(&decoders)
458                            .expect("Decoding client cfg failed")
459                            .expect_decoded_ref()
460                            .as_any()
461                            .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
462                            .cloned()
463                            .context("Cast to module config failed"),
464                    )
465                } else {
466                    None
467                }
468            })
469            .transpose()
470    }
471
472    pub fn deposit_fees(&self) -> Result<Amount> {
473        Ok(self
474            .module_client_config::<WalletClientModule>()?
475            .context("No wallet module found")?
476            .fee_consensus
477            .peg_in_abs)
478    }
479
480    /// Read the invite code from the client data dir
481    pub fn invite_code(&self) -> Result<String> {
482        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
483        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
484        Ok(invite_code)
485    }
486
487    pub fn invite_code_static() -> Result<String> {
488        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
489        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
490        Ok(invite_code)
491    }
492    pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
493        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
494        let name = format!("invite-code-{peer_id}");
495        let invite_code = fs::read_to_string(data_dir.join(name))?;
496        Ok(invite_code)
497    }
498
499    /// Built-in, default, internal [`Client`]
500    ///
501    /// We should be moving away from using it for anything.
502    pub async fn internal_client(&self) -> Result<&Client> {
503        self.client
504            .get_try()
505            .await
506            .context("Internal client joining Federation")
507    }
508
509    /// New [`Client`] that already joined `self`
510    pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
511        let client = Client::create(name).await?;
512        client.join_federation(self.invite_code()?).await?;
513        Ok(client)
514    }
515
516    pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
517        if self.members.contains_key(&peer) {
518            bail!("fedimintd-{peer} already running");
519        }
520        self.members.insert(
521            peer,
522            Fedimintd::new(
523                process_mgr,
524                self.bitcoind.clone(),
525                peer,
526                &self.vars[&peer],
527                "default".to_string(),
528            )
529            .await?,
530        );
531        Ok(())
532    }
533
534    pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
535        let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
536            bail!("fedimintd-{peer_id} does not exist");
537        };
538        fedimintd.terminate().await?;
539        Ok(())
540    }
541
542    pub async fn await_server_terminated(&mut self, peer_id: usize) -> Result<()> {
543        let Some(fedimintd) = self.members.get_mut(&peer_id) else {
544            bail!("fedimintd-{peer_id} does not exist");
545        };
546        fedimintd.await_terminated().await?;
547        self.members.remove(&peer_id);
548        Ok(())
549    }
550
551    /// Starts all peers not currently running.
552    pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
553        info!("starting all servers");
554        let fed_size = process_mgr.globals.FM_FED_SIZE;
555        for peer_id in 0..fed_size {
556            if self.members.contains_key(&peer_id) {
557                continue;
558            }
559            self.start_server(process_mgr, peer_id).await?;
560        }
561        self.await_all_peers().await?;
562        Ok(())
563    }
564
565    /// Terminates all running peers.
566    pub async fn terminate_all_servers(&mut self) -> Result<()> {
567        info!("terminating all servers");
568        let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
569        for peer_id in running_peer_ids {
570            self.terminate_server(peer_id).await?;
571        }
572        Ok(())
573    }
574
575    /// Coordinated shutdown of all peers that restart using the provided
576    /// `bin_path`. Returns `Ok()` once all peers are online.
577    ///
578    /// Staggering the restart more closely simulates upgrades in the wild.
579    pub async fn restart_all_staggered_with_bin(
580        &mut self,
581        process_mgr: &ProcessManager,
582        bin_path: &PathBuf,
583    ) -> Result<()> {
584        let fed_size = process_mgr.globals.FM_FED_SIZE;
585
586        // ensure all peers are online
587        self.start_all_servers(process_mgr).await?;
588
589        // staggered shutdown of peers
590        while self.num_members() > 0 {
591            self.terminate_server(self.num_members() - 1).await?;
592            if self.num_members() > 0 {
593                fedimint_core::task::sleep_in_test(
594                    "waiting to shutdown remaining peers",
595                    Duration::from_secs(10),
596                )
597                .await;
598            }
599        }
600
601        // TODO: Audit that the environment access only happens in single-threaded code.
602        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
603
604        // staggered restart
605        for peer_id in 0..fed_size {
606            self.start_server(process_mgr, peer_id).await?;
607            if peer_id < fed_size - 1 {
608                fedimint_core::task::sleep_in_test(
609                    "waiting to restart remaining peers",
610                    Duration::from_secs(10),
611                )
612                .await;
613            }
614        }
615
616        self.await_all_peers().await?;
617
618        let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
619        info!("upgraded fedimintd to version: {}", fedimintd_version);
620        Ok(())
621    }
622
623    pub async fn restart_all_with_bin(
624        &mut self,
625        process_mgr: &ProcessManager,
626        bin_path: &PathBuf,
627    ) -> Result<()> {
628        // get the version we're upgrading to, temporarily updating the fedimintd path
629        let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
630        // TODO: Audit that the environment access only happens in single-threaded code.
631        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
632        // TODO: Audit that the environment access only happens in single-threaded code.
633        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
634
635        self.restart_all_staggered_with_bin(process_mgr, bin_path)
636            .await
637    }
638
639    pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
640        let fed_size = process_mgr.globals.FM_FED_SIZE;
641        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
642        anyhow::ensure!(
643            fed_size > 3 * offline_nodes,
644            "too many offline nodes ({offline_nodes}) to reach consensus"
645        );
646
647        while self.num_members() > fed_size - offline_nodes {
648            self.terminate_server(self.num_members() - 1).await?;
649        }
650
651        if offline_nodes > 0 {
652            info!(fed_size, offline_nodes, "federation is degraded");
653        }
654        Ok(())
655    }
656
657    pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
658        let deposit_fees_msat = self.deposit_fees()?.msats;
659        assert_eq!(
660            deposit_fees_msat % 1000,
661            0,
662            "Deposit fees expected to be whole sats in test suite"
663        );
664        let deposit_fees = deposit_fees_msat / 1000;
665        info!(amount, deposit_fees, "Pegging-in client funds");
666
667        let (address, operation_id) = client.get_deposit_addr().await?;
668
669        self.bitcoind
670            .send_to(address, amount + deposit_fees)
671            .await?;
672        self.bitcoind.mine_blocks(21).await?;
673
674        Ok(operation_id)
675    }
676
677    pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
678        let operation_id = self.pegin_client_no_wait(amount, client).await?;
679
680        client.await_deposit(&operation_id).await?;
681        Ok(())
682    }
683
684    /// Inititates multiple peg-ins to the same federation for the set of
685    /// gateways to save on mining blocks in parallel.
686    pub async fn pegin_gateways(
687        &self,
688        amount: u64,
689        gateways: Vec<&super::gatewayd::Gatewayd>,
690    ) -> Result<()> {
691        let deposit_fees_msat = self.deposit_fees()?.msats;
692        assert_eq!(
693            deposit_fees_msat % 1000,
694            0,
695            "Deposit fees expected to be whole sats in test suite"
696        );
697        let deposit_fees = deposit_fees_msat / 1000;
698        info!(amount, deposit_fees, "Pegging-in gateway funds");
699        let fed_id = self.calculate_federation_id();
700        for gw in gateways.clone() {
701            let pegin_addr = gw.get_pegin_addr(&fed_id).await?;
702            self.bitcoind
703                .send_to(pegin_addr, amount + deposit_fees)
704                .await?;
705        }
706
707        self.bitcoind.mine_blocks(21).await?;
708        let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
709        try_join_all(gateways.into_iter().map(|gw| {
710            poll("gateway pegin", || async {
711                let gw_info = gw.get_info().await.map_err(ControlFlow::Continue)?;
712                let block_height: u64 = gw_info["block_height"]
713                    .as_u64()
714                    .expect("Could not parse block height");
715                if bitcoind_block_height != block_height {
716                    return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
717                        "gateway block height is not synced"
718                    )));
719                }
720
721                let gateway_balance = gw
722                    .ecash_balance(fed_id.clone())
723                    .await
724                    .map_err(ControlFlow::Continue)?;
725                poll_almost_equal!(gateway_balance, amount * 1000)
726            })
727        }))
728        .await?;
729
730        Ok(())
731    }
732
733    /// Initiates multiple peg-outs from the same federation for the set of
734    /// gateways to save on mining blocks in parallel.
735    pub async fn pegout_gateways(
736        &self,
737        amount: u64,
738        gateways: Vec<&super::gatewayd::Gatewayd>,
739    ) -> Result<()> {
740        info!(amount, "Pegging-out gateway funds");
741        let fed_id = self.calculate_federation_id();
742        let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
743        for gw in gateways.clone() {
744            let prev_fed_ecash_balance = gw
745                .get_balances()
746                .await?
747                .ecash_balances
748                .into_iter()
749                .find(|fed| fed.federation_id.to_string() == fed_id)
750                .expect("Gateway has not joined federation")
751                .ecash_balance_msats;
752
753            let pegout_address = self.bitcoind.get_new_address().await?;
754            let value = cmd!(
755                gw,
756                "ecash",
757                "pegout",
758                "--federation-id",
759                fed_id,
760                "--amount",
761                amount,
762                "--address",
763                pegout_address
764            )
765            .out_json()
766            .await?;
767            let response: WithdrawResponse = serde_json::from_value(value)?;
768            peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
769        }
770        self.bitcoind.mine_blocks(21).await?;
771
772        try_join_all(
773            peg_outs
774                .values()
775                .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
776        )
777        .await?;
778
779        for gw in gateways.clone() {
780            let after_fed_ecash_balance = gw
781                .get_balances()
782                .await?
783                .ecash_balances
784                .into_iter()
785                .find(|fed| fed.federation_id.to_string() == fed_id)
786                .expect("Gateway has not joined federation")
787                .ecash_balance_msats;
788
789            let ln_type = gw.ln.ln_type();
790            let prev_balance = peg_outs
791                .get(&ln_type)
792                .expect("peg out does not exist")
793                .0
794                .msats;
795            let fees = peg_outs
796                .get(&ln_type)
797                .expect("peg out does not exist")
798                .1
799                .fees;
800            let total_fee = fees.amount().to_sat() * 1000;
801            crate::util::almost_equal(
802                after_fed_ecash_balance.msats,
803                prev_balance - amount - total_fee,
804                2000,
805            )
806            .map_err(|e| {
807                anyhow::anyhow!(
808                    "new balance did not equal prev balance minus withdraw_amount minus fees: {}",
809                    e
810                )
811            })?;
812        }
813
814        Ok(())
815    }
816
817    pub fn calculate_federation_id(&self) -> String {
818        self.client_config()
819            .unwrap()
820            .global
821            .calculate_federation_id()
822            .to_string()
823    }
824
825    pub async fn await_block_sync(&self) -> Result<u64> {
826        let finality_delay = self.get_finality_delay()?;
827        let block_count = self.bitcoind.get_block_count().await?;
828        let expected = block_count.saturating_sub(finality_delay.into());
829        cmd!(
830            self.internal_client().await?,
831            "dev",
832            "wait-block-count",
833            expected
834        )
835        .run()
836        .await?;
837        Ok(expected)
838    }
839
840    fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
841        let client_config = &self.client_config()?;
842        let wallet_cfg = client_config
843            .modules
844            .get(&LEGACY_HARDCODED_INSTANCE_ID_WALLET)
845            .context("wallet module not found")?
846            .clone()
847            .redecode_raw(&ModuleDecoderRegistry::new([(
848                LEGACY_HARDCODED_INSTANCE_ID_WALLET,
849                fedimint_wallet_client::KIND,
850                fedimint_wallet_client::WalletModuleTypes::decoder(),
851            )]))?;
852        let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
853
854        let finality_delay = wallet_cfg.finality_delay;
855        Ok(finality_delay)
856    }
857
858    pub async fn await_gateways_registered(&self) -> Result<()> {
859        let start_time = Instant::now();
860        debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
861
862        poll("gateways registered", || async {
863            let num_gateways = cmd!(
864                self.internal_client()
865                    .await
866                    .map_err(ControlFlow::Continue)?,
867                "list-gateways"
868            )
869            .out_json()
870            .await
871            .map_err(ControlFlow::Continue)?
872            .as_array()
873            .context("invalid output")
874            .map_err(ControlFlow::Break)?
875            .len();
876            poll_eq!(num_gateways, 1)
877        })
878        .await?;
879        debug!(target: LOG_DEVIMINT,
880            elapsed_ms = %start_time.elapsed().as_millis(),
881            "Gateways registered");
882        Ok(())
883    }
884
885    pub async fn await_all_peers(&self) -> Result<()> {
886        poll("Waiting for all peers to be online", || async {
887            cmd!(
888                self.internal_client()
889                    .await
890                    .map_err(ControlFlow::Continue)?,
891                "dev",
892                "api",
893                "--module",
894                LEGACY_HARDCODED_INSTANCE_ID_WALLET,
895                "block_count"
896            )
897            .run()
898            .await
899            .map_err(ControlFlow::Continue)?;
900            Ok(())
901        })
902        .await
903    }
904
905    pub async fn await_peer(&self, peer_id: usize) -> Result<()> {
906        poll("Waiting for all peers to be online", || async {
907            cmd!(
908                self.internal_client()
909                    .await
910                    .map_err(ControlFlow::Continue)?,
911                "dev",
912                "api",
913                "--peer-id",
914                peer_id,
915                "--module",
916                LEGACY_HARDCODED_INSTANCE_ID_WALLET,
917                "block_count"
918            )
919            .run()
920            .await
921            .map_err(ControlFlow::Continue)?;
922            Ok(())
923        })
924        .await
925    }
926
927    /// Mines enough blocks to finalize mempool transactions, then waits for
928    /// federation to process finalized blocks.
929    ///
930    /// ex:
931    ///   tx submitted to mempool at height 100
932    ///   finality delay = 10
933    ///   mine finality delay blocks + 1 => new height 111
934    ///   tx included in block 101
935    ///   highest finalized height = 111 - 10 = 101
936    pub async fn finalize_mempool_tx(&self) -> Result<()> {
937        let finality_delay = self.get_finality_delay()?;
938        let blocks_to_mine = finality_delay + 1;
939        self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
940        self.await_block_sync().await?;
941        Ok(())
942    }
943
944    pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
945        self.bitcoind.mine_blocks(blocks).await?;
946        self.await_block_sync().await?;
947        Ok(())
948    }
949
950    pub fn num_members(&self) -> usize {
951        self.members.len()
952    }
953
954    pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
955        self.members
956            .keys()
957            .map(|&peer_id| PeerId::from(peer_id as u16))
958    }
959}
960
961#[derive(Clone)]
962pub struct Fedimintd {
963    _bitcoind: Bitcoind,
964    process: ProcessHandle,
965}
966
967impl Fedimintd {
968    pub async fn new(
969        process_mgr: &ProcessManager,
970        bitcoind: Bitcoind,
971        peer_id: usize,
972        env: &vars::Fedimintd,
973        fed_name: String,
974    ) -> Result<Self> {
975        debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
976        let process = process_mgr
977            .spawn_daemon(
978                &format!("fedimintd-{fed_name}-{peer_id}"),
979                cmd!(FedimintdCmd).envs(env.vars()),
980            )
981            .await?;
982
983        Ok(Self {
984            _bitcoind: bitcoind,
985            process,
986        })
987    }
988
989    pub async fn terminate(self) -> Result<()> {
990        self.process.terminate().await
991    }
992
993    pub async fn await_terminated(&self) -> Result<()> {
994        self.process.await_terminated().await
995    }
996}
997
998pub async fn run_cli_dkg(
999    params: HashMap<PeerId, ConfigGenParams>,
1000    endpoints: BTreeMap<PeerId, String>,
1001) -> Result<()> {
1002    let auth_for = |peer: &PeerId| -> &ApiAuth { &params[peer].api_auth };
1003
1004    debug!(target: LOG_DEVIMINT, "Running DKG");
1005    for endpoint in endpoints.values() {
1006        poll("trying-to-connect-to-peers", || async {
1007            crate::util::FedimintCli
1008                .ws_status(endpoint)
1009                .await
1010                .context("dkg status")
1011                .map_err(ControlFlow::Continue)
1012        })
1013        .await?;
1014    }
1015
1016    debug!(target: LOG_DEVIMINT, "Connected to all peers");
1017
1018    for (peer_id, endpoint) in &endpoints {
1019        let status = crate::util::FedimintCli.ws_status(endpoint).await?;
1020        assert_eq!(
1021            status.server,
1022            ServerStatusLegacy::AwaitingPassword,
1023            "peer_id isn't waiting for password: {peer_id}"
1024        );
1025    }
1026
1027    debug!(target: LOG_DEVIMINT, "Setting passwords");
1028    for (peer_id, endpoint) in &endpoints {
1029        crate::util::FedimintCli
1030            .set_password(auth_for(peer_id), endpoint)
1031            .await?;
1032    }
1033    let (leader_id, leader_endpoint) = endpoints.first_key_value().context("missing peer")?;
1034    let followers = endpoints
1035        .iter()
1036        .filter(|(id, _)| *id != leader_id)
1037        .collect::<BTreeMap<_, _>>();
1038
1039    debug!(target: LOG_DEVIMINT, "calling set_config_gen_connections for leader");
1040    let leader_name = "leader".to_string();
1041    crate::util::FedimintCli
1042        .set_config_gen_connections(auth_for(leader_id), leader_endpoint, &leader_name, None)
1043        .await?;
1044
1045    let server_gen_params = ServerModuleConfigGenParamsRegistry::default();
1046
1047    debug!(target: LOG_DEVIMINT, "calling set_config_gen_params for leader");
1048    cli_set_config_gen_params(
1049        leader_endpoint,
1050        auth_for(leader_id),
1051        server_gen_params.clone(),
1052    )
1053    .await?;
1054
1055    let followers_names = followers
1056        .keys()
1057        .map(|peer_id| {
1058            (*peer_id, {
1059                // This is to be clear that the name will be unrelated to peer id
1060                let random_string = rand::thread_rng()
1061                    .sample_iter(&rand::distributions::Alphanumeric)
1062                    .take(5)
1063                    .map(char::from)
1064                    .collect::<String>();
1065                format!("random-{random_string}{peer_id}")
1066            })
1067        })
1068        .collect::<BTreeMap<_, _>>();
1069    for (peer_id, endpoint) in &followers {
1070        let name = followers_names
1071            .get(peer_id)
1072            .context("missing follower name")?;
1073        debug!(target: LOG_DEVIMINT, "calling set_config_gen_connections for {peer_id} {name}");
1074
1075        crate::util::FedimintCli
1076            .set_config_gen_connections(auth_for(peer_id), endpoint, name, Some(leader_endpoint))
1077            .await?;
1078
1079        cli_set_config_gen_params(endpoint, auth_for(peer_id), server_gen_params.clone()).await?;
1080    }
1081
1082    debug!(target: LOG_DEVIMINT, "calling get_config_gen_peers for leader");
1083    let peers = crate::util::FedimintCli
1084        .get_config_gen_peers(leader_endpoint)
1085        .await?;
1086
1087    let found_names = peers
1088        .into_iter()
1089        .map(|peer| peer.name)
1090        .collect::<HashSet<_>>();
1091    let all_names = followers_names
1092        .values()
1093        .cloned()
1094        .chain(iter::once(leader_name))
1095        .collect::<HashSet<_>>();
1096    assert_eq!(found_names, all_names);
1097
1098    debug!(target: LOG_DEVIMINT, "Waiting for SharingConfigGenParams");
1099    cli_wait_server_status(leader_endpoint, ServerStatusLegacy::SharingConfigGenParams).await?;
1100
1101    debug!(target: LOG_DEVIMINT, "Getting consensus configs");
1102    let mut configs = vec![];
1103    for endpoint in endpoints.values() {
1104        let config = crate::util::FedimintCli
1105            .consensus_config_gen_params_legacy(endpoint)
1106            .await?;
1107        configs.push(config);
1108    }
1109    // Confirm all consensus configs are the same
1110    let mut consensus: Vec<_> = configs.iter().map(|p| p.consensus.clone()).collect();
1111    consensus.dedup();
1112    assert_eq!(consensus.len(), 1);
1113    // Confirm all peer ids are unique
1114    let ids = configs
1115        .iter()
1116        .map(|p| p.our_current_id)
1117        .collect::<HashSet<_>>();
1118    assert_eq!(ids.len(), endpoints.len());
1119    let dkg_results = endpoints
1120        .iter()
1121        .map(|(peer_id, endpoint)| crate::util::FedimintCli.run_dkg(auth_for(peer_id), endpoint));
1122    debug!(target: LOG_DEVIMINT, "Running DKG");
1123    let (dkg_results, leader_wait_result) = tokio::join!(
1124        join_all(dkg_results),
1125        cli_wait_server_status(leader_endpoint, ServerStatusLegacy::VerifyingConfigs)
1126    );
1127    for result in dkg_results {
1128        result?;
1129    }
1130    leader_wait_result?;
1131
1132    // verify config hashes equal for all peers
1133    debug!(target: LOG_DEVIMINT, "Verifying config hashes");
1134    let mut hashes = HashSet::new();
1135    for (peer_id, endpoint) in &endpoints {
1136        cli_wait_server_status(endpoint, ServerStatusLegacy::VerifyingConfigs).await?;
1137        let hash = crate::util::FedimintCli
1138            .get_verify_config_hash(auth_for(peer_id), endpoint)
1139            .await?;
1140        hashes.insert(hash);
1141    }
1142    assert_eq!(hashes.len(), 1);
1143    for (peer_id, endpoint) in &endpoints {
1144        let result = crate::util::FedimintCli
1145            .start_consensus(auth_for(peer_id), endpoint)
1146            .await;
1147        if let Err(e) = result {
1148            tracing::debug!(target: LOG_DEVIMINT, "Error calling start_consensus: {e:?}, trying to continue...");
1149        }
1150        cli_wait_server_status(endpoint, ServerStatusLegacy::ConsensusRunning).await?;
1151    }
1152    Ok(())
1153}
1154
1155pub async fn run_cli_dkg_v2(
1156    params: HashMap<PeerId, ConfigGenParams>,
1157    endpoints: BTreeMap<PeerId, String>,
1158) -> Result<()> {
1159    let auth_for = |peer: &PeerId| -> &ApiAuth { &params[peer].api_auth };
1160
1161    for (peer, endpoint) in &endpoints {
1162        let status = poll("awaiting-setup-status-awaiting-local-params", || async {
1163            crate::util::FedimintCli
1164                .setup_status(auth_for(peer), endpoint)
1165                .await
1166                .map_err(ControlFlow::Continue)
1167        })
1168        .await
1169        .unwrap();
1170
1171        assert_eq!(status, SetupStatus::AwaitingLocalParams);
1172    }
1173
1174    debug!(target: LOG_DEVIMINT, "Setting local parameters...");
1175
1176    let mut connection_info = BTreeMap::new();
1177
1178    for (peer, endpoint) in &endpoints {
1179        let info = if peer.to_usize() == 0 {
1180            crate::util::FedimintCli
1181                .set_local_params_leader(peer, auth_for(peer), endpoint)
1182                .await?
1183        } else {
1184            crate::util::FedimintCli
1185                .set_local_params_follower(peer, auth_for(peer), endpoint)
1186                .await?
1187        };
1188
1189        connection_info.insert(peer, info);
1190    }
1191
1192    debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");
1193
1194    for (peer, info) in connection_info {
1195        for (p, endpoint) in &endpoints {
1196            if p != peer {
1197                crate::util::FedimintCli
1198                    .add_peer(&info, auth_for(p), endpoint)
1199                    .await?;
1200            }
1201        }
1202    }
1203
1204    debug!(target: LOG_DEVIMINT, "Starting DKG...");
1205
1206    for (peer, endpoint) in &endpoints {
1207        crate::util::FedimintCli
1208            .start_dkg(auth_for(peer), endpoint)
1209            .await?;
1210    }
1211
1212    Ok(())
1213}
1214
1215async fn cli_set_config_gen_params(
1216    endpoint: &str,
1217    auth: &ApiAuth,
1218    mut server_gen_params: ServerModuleConfigGenParamsRegistry,
1219) -> Result<()> {
1220    self::config::attach_default_module_init_params(
1221        &BitcoinRpcConfig::get_defaults_from_env_vars()?,
1222        &mut server_gen_params,
1223        Network::Regtest,
1224        10,
1225    );
1226
1227    let meta = iter::once(("federation_name".to_string(), "testfed".to_string())).collect();
1228
1229    crate::util::FedimintCli
1230        .set_config_gen_params(auth, endpoint, meta, server_gen_params)
1231        .await?;
1232
1233    Ok(())
1234}
1235
1236async fn cli_wait_server_status(endpoint: &str, expected_status: ServerStatusLegacy) -> Result<()> {
1237    poll(
1238        &format!("waiting-server-status: {expected_status:?}"),
1239        || async {
1240            let server_status = crate::util::FedimintCli
1241                .ws_status(endpoint)
1242                .await
1243                .context("server status")
1244                .map_err(ControlFlow::Continue)?
1245                .server;
1246            if server_status == expected_status {
1247                Ok(())
1248            } else {
1249                Err(ControlFlow::Continue(anyhow!(
1250                    "expected status: {expected_status:?} current status: {server_status:?}"
1251                )))
1252            }
1253        },
1254    )
1255    .await?;
1256    Ok(())
1257}