devimint/
federation.rs

1use std::collections::BTreeMap;
2use std::ops::ControlFlow;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::Duration;
6use std::{env, fs};
7
8use anyhow::{Context, Result, bail};
9use fedimint_api_client::api::DynGlobalApi;
10use fedimint_api_client::download_from_invite_code;
11use fedimint_client_module::module::ClientModule;
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::admin_client::SetupStatus;
14use fedimint_core::config::{ClientConfig, load_from_file};
15use fedimint_core::core::{ModuleInstanceId, ModuleKind};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::ModuleCommon;
18use fedimint_core::module::registry::ModuleDecoderRegistry;
19use fedimint_core::runtime::block_in_place;
20use fedimint_core::task::block_on;
21use fedimint_core::task::jit::JitTryAnyhow;
22use fedimint_core::util::SafeUrl;
23use fedimint_core::{Amount, NumPeers, PeerId};
24use fedimint_gateway_common::WithdrawResponse;
25use fedimint_logging::LOG_DEVIMINT;
26use fedimint_testing_core::config::API_AUTH;
27use fedimint_testing_core::node_type::LightningNodeType;
28use fedimint_wallet_client::WalletClientModule;
29use fedimint_wallet_client::config::WalletClientConfig;
30use fs_lock::FileLock;
31use futures::future::{join_all, try_join_all};
32use tokio::task::{JoinSet, spawn_blocking};
33use tokio::time::Instant;
34use tracing::{debug, info};
35
36use super::external::Bitcoind;
37use super::util::{Command, ProcessHandle, ProcessManager, cmd};
38use super::vars::utf8;
39use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
40use crate::util::{FedimintdCmd, poll, poll_simple, poll_with_timeout};
41use crate::version_constants::VERSION_0_10_0_ALPHA;
42use crate::{poll_almost_equal, poll_eq, vars};
43
// TODO: Are we still using the 3rd port for anything?
/// Number of ports we allocate for every `fedimintd` instance
///
/// The `FEDIMINTD_*_PORT_OFFSET` constants below are offsets into that range.
pub const PORTS_PER_FEDIMINTD: u16 = 4;
/// Offset of the p2p port inside the range from [`PORTS_PER_FEDIMINTD`]
pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
/// Offset of the api port inside the range from [`PORTS_PER_FEDIMINTD`]
pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
/// Offset of the web ui port inside the range from [`PORTS_PER_FEDIMINTD`]
pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
/// Offset of the prometheus (metrics) port inside the range from
/// [`PORTS_PER_FEDIMINTD`]
pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;
55
/// Handle to a set of `fedimintd` peers together with the `bitcoind` they
/// use and a lazily initialized internal [`Client`].
#[derive(Clone)]
pub struct Federation {
    /// Peers currently tracked by this handle, keyed by peer index. Entries
    /// are inserted when a server is started and removed when it is
    /// terminated.
    pub members: BTreeMap<usize, Fedimintd>,
    /// Per-peer environment variables, keyed by peer index
    pub vars: BTreeMap<usize, vars::Fedimintd>,
    /// `bitcoind` handle; cloned into every peer this federation starts
    pub bitcoind: Bitcoind,

    /// Built in [`Client`], initialized lazily on first use.
    ///
    /// The client is only for internal use, use cli commands instead.
    client: JitTryAnyhow<Client>,
    #[allow(dead_code)] // Will need it later, maybe
    connectors: ConnectorRegistry,
}
68
69impl Drop for Federation {
70    fn drop(&mut self) {
71        block_in_place(|| {
72            block_on(async {
73                let mut set = JoinSet::new();
74
75                while let Some((_id, fedimintd)) = self.members.pop_first() {
76                    set.spawn(async { drop(fedimintd) });
77                }
78                while (set.join_next().await).is_some() {}
79            });
80        });
81    }
82}
/// `fedimint-cli` instance (basically path with client state: config + db)
#[derive(Clone)]
pub struct Client {
    /// Name of this client's directory inside the shared clients directory
    name: String,
}
88
89impl Client {
90    fn clients_dir() -> PathBuf {
91        let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
92            .expect("FM_DATA_DIR_ENV not set")
93            .parse()
94            .expect("FM_DATA_DIR_ENV invalid");
95        data_dir.join("clients")
96    }
97
98    fn client_dir(&self) -> PathBuf {
99        Self::clients_dir().join(&self.name)
100    }
101
102    pub fn client_name_lock(name: &str) -> Result<FileLock> {
103        let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
104        let file_lock = std::fs::OpenOptions::new()
105            .write(true)
106            .create(true)
107            .truncate(true)
108            .open(&lock_path)
109            .with_context(|| format!("Failed to open {}", lock_path.display()))?;
110
111        fs_lock::FileLock::new_exclusive(file_lock)
112            .with_context(|| format!("Failed to lock {}", lock_path.display()))
113    }
114
115    /// Create a [`Client`] that starts with a fresh state.
116    pub async fn create(name: impl ToString) -> Result<Client> {
117        let name = name.to_string();
118        spawn_blocking(move || {
119            let _lock = Self::client_name_lock(&name);
120            for i in 0u64.. {
121                let client = Self {
122                    name: format!("{name}-{i}"),
123                };
124
125                if !client.client_dir().exists() {
126                    std::fs::create_dir_all(client.client_dir())?;
127                    return Ok(client);
128                }
129            }
130            unreachable!()
131        })
132        .await?
133    }
134
135    /// Open or create a [`Client`] that starts with a fresh state.
136    pub fn open_or_create(name: &str) -> Result<Client> {
137        block_in_place(|| {
138            let _lock = Self::client_name_lock(name);
139            let client = Self {
140                name: format!("{name}-0"),
141            };
142            if !client.client_dir().exists() {
143                std::fs::create_dir_all(client.client_dir())?;
144            }
145            Ok(client)
146        })
147    }
148
149    /// Client to join a federation
150    pub async fn join_federation(&self, invite_code: String) -> Result<()> {
151        debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
152        cmd!(self, "join-federation", invite_code).run().await?;
153
154        Ok(())
155    }
156
157    /// Client to join a federation with a restore procedure
158    pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
159        debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
160        cmd!(
161            self,
162            "restore",
163            "--invite-code",
164            invite_code,
165            "--mnemonic",
166            mnemonic
167        )
168        .run()
169        .await?;
170
171        Ok(())
172    }
173
174    /// Client to join a federation
175    pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
176        let restored = Self::open_or_create(name)?;
177
178        let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
179            .as_str()
180            .unwrap()
181            .to_owned();
182
183        debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
184        cmd!(
185            restored,
186            "restore",
187            "--invite-code",
188            invite_code,
189            "--mnemonic",
190            mnemonic
191        )
192        .run()
193        .await?;
194
195        Ok(restored)
196    }
197
198    /// Create a [`Client`] that starts with a state that is a copy of
199    /// of another one.
200    pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
201        let new = Client::create(name).await?;
202
203        cmd!(
204            "cp",
205            "-R",
206            self.client_dir().join("client.db").display(),
207            new.client_dir().display()
208        )
209        .run()
210        .await?;
211
212        Ok(new)
213    }
214
215    pub async fn balance(&self) -> Result<u64> {
216        Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
217            .as_u64()
218            .unwrap())
219    }
220
221    pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
222        let deposit = cmd!(self, "deposit-address").out_json().await?;
223        Ok((
224            deposit["address"].as_str().unwrap().to_string(),
225            deposit["operation_id"].as_str().unwrap().to_string(),
226        ))
227    }
228
229    pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
230        cmd!(self, "await-deposit", operation_id).run().await
231    }
232
233    pub fn cmd(&self) -> Command {
234        cmd!(
235            crate::util::get_fedimint_cli_path(),
236            format!("--data-dir={}", self.client_dir().display())
237        )
238    }
239
240    pub fn get_name(&self) -> &str {
241        &self.name
242    }
243
244    /// Returns the current consensus session count
245    pub async fn get_session_count(&self) -> Result<u64> {
246        cmd!(self, "dev", "session-count").out_json().await?["count"]
247            .as_u64()
248            .context("count field wasn't a number")
249    }
250
251    /// Returns once all active state machines complete
252    pub async fn wait_complete(&self) -> Result<()> {
253        cmd!(self, "dev", "wait-complete").run().await
254    }
255
256    /// Returns once the current session completes
257    pub async fn wait_session(&self) -> anyhow::Result<()> {
258        info!("Waiting for a new session");
259        let session_count = self.get_session_count().await?;
260        self.wait_session_outcome(session_count).await?;
261        Ok(())
262    }
263
264    /// Returns once the provided session count completes
265    pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
266        let timeout = {
267            let current_session_count = self.get_session_count().await?;
268            let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
269            let session_duration_seconds = 180;
270            Duration::from_secs(sessions_to_wait * session_duration_seconds)
271        };
272
273        let start = Instant::now();
274        poll_with_timeout("Waiting for a new session", timeout, || async {
275            info!("Awaiting session outcome {session_count}");
276            match cmd!(self, "dev", "api", "await_session_outcome", session_count)
277                .run()
278                .await
279            {
280                Err(e) => Err(ControlFlow::Continue(e)),
281                Ok(()) => Ok(()),
282            }
283        })
284        .await?;
285
286        let session_found_in = start.elapsed();
287        info!("session found in {session_found_in:?}");
288        Ok(())
289    }
290}
291
292impl Federation {
    /// Starts one `fedimintd` per peer and, unless setup is skipped, runs
    /// DKG and distributes the resulting invite codes.
    ///
    /// The number of peers comes from `process_mgr.globals.FM_FED_SIZE`.
    /// For every peer the environment variables are initialized, a
    /// [`Fedimintd`] is started and an admin setup API client is built from
    /// the peer's `FM_API_URL`.
    ///
    /// When both `skip_setup` and `pre_dkg` are `false`:
    /// - DKG is driven through `run_cli_dkg_v2` with a temporarily switched
    ///   `fedimint-cli`,
    /// - every peer's `invite-code` file is awaited and a download via that
    ///   invite code is performed (the downloaded value is discarded),
    /// - peer 0's `invite-code` file is copied into the client directory,
    /// - every peer's `invite-code` file is moved into the client directory
    ///   as `invite-code-{index}`.
    ///
    /// The internal [`Client`] is not created here; it is initialized lazily
    /// the first time it is requested.
    ///
    /// NOTE(review): if `run_cli_dkg_v2` fails, the function returns before
    /// `use_fedimint_cli` restores the original `fedimint-cli` — confirm
    /// whether that matters to callers.
    pub async fn new(
        process_mgr: &ProcessManager,
        bitcoind: Bitcoind,
        skip_setup: bool,
        pre_dkg: bool,
        // Which of the pre-allocated federations to use (most tests just use single `0` one)
        fed_index: usize,
        federation_name: String,
    ) -> Result<Self> {
        let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
        let mut members = BTreeMap::new();
        let mut peer_to_env_vars_map = BTreeMap::new();

        // NOTE(review): `admin_clients` is only ever inserted into within
        // this function and never read afterwards.
        let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
        let mut api_endpoints: BTreeMap<PeerId, _> = BTreeMap::new();

        let connectors = ConnectorRegistry::build_from_testing_env()?.bind().await?;
        for peer_id in num_peers.peer_ids() {
            let peer_env_vars = vars::Fedimintd::init(
                &process_mgr.globals,
                federation_name.clone(),
                peer_id,
                process_mgr
                    .globals
                    .fedimintd_overrides
                    .peer_expect(fed_index, peer_id),
            )
            .await?;
            members.insert(
                peer_id.to_usize(),
                Fedimintd::new(
                    process_mgr,
                    bitcoind.clone(),
                    peer_id.to_usize(),
                    &peer_env_vars,
                    federation_name.clone(),
                )
                .await?,
            );
            let admin_client = DynGlobalApi::new_admin_setup(
                connectors.clone(),
                SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
                // TODO: will need it somewhere
                // &process_mgr.globals.FM_FORCE_API_SECRETS.get_active(),
            )?;
            api_endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
            admin_clients.insert(peer_id, admin_client);
            peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
        }

        if !skip_setup && !pre_dkg {
            // we don't guarantee backwards-compatibility for dkg, so we use the
            // fedimint-cli version that matches fedimintd
            let (original_fedimint_cli_path, original_fm_mint_client) =
                crate::util::use_matching_fedimint_cli_for_dkg().await?;

            run_cli_dkg_v2(api_endpoints).await?;

            // we're done with dkg, so we can reset the fedimint-cli version
            crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);

            // move configs to config directory
            let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
            let invite_code_filename_original = "invite-code";

            // Wait for every peer to write its invite code and perform a
            // download through it; the downloaded value itself is discarded.
            for peer_env_vars in peer_to_env_vars_map.values() {
                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);

                let invite_code = poll_simple("awaiting-invite-code", || async {
                    let path = format!("{peer_data_dir}/{invite_code_filename_original}");
                    tokio::fs::read_to_string(&path)
                        .await
                        .with_context(|| format!("Awaiting invite code file: {path}"))
                })
                .await
                .context("Awaiting invite code file")?;

                download_from_invite_code(&connectors, &InviteCode::from_str(&invite_code)?)
                    .await?;
            }

            // copy over invite-code file to client directory
            // (peer 0's file is used; indexing panics if there is no peer 0)
            let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);

            tokio::fs::copy(
                format!("{peer_data_dir}/{invite_code_filename_original}"),
                format!("{client_dir}/{invite_code_filename_original}"),
            )
            .await
            .context("copying invite-code file")?;

            // move each guardian's invite-code file to the client's directory
            // appending the peer id to the end
            for (index, peer_env_vars) in &peer_to_env_vars_map {
                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);

                let invite_code_filename_indexed =
                    format!("{invite_code_filename_original}-{index}");
                tokio::fs::rename(
                    format!("{peer_data_dir}/{invite_code_filename_original}"),
                    format!("{client_dir}/{invite_code_filename_indexed}"),
                )
                .await
                .context("moving invite-code file")?;
            }

            debug!("Moved invite-code files to client data directory");
        }

        // Lazily initialized internal client: nothing below runs until the
        // client is first requested.
        let client = JitTryAnyhow::new_try({
            move || async move {
                let client = Client::open_or_create(federation_name.as_str())?;
                // NOTE(review): the invite code is read even when the join
                // below is skipped, so initialization fails if the file is
                // missing — confirm this is intended for `pre_dkg`.
                let invite_code = Self::invite_code_static()?;
                if !skip_setup && !pre_dkg {
                    cmd!(client, "join-federation", invite_code).run().await?;
                }
                Ok(client)
            }
        });

        Ok(Self {
            members,
            vars: peer_to_env_vars_map,
            bitcoind,
            client,
            connectors,
        })
    }
421
422    pub fn client_config(&self) -> Result<ClientConfig> {
423        let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
424        load_from_file(&cfg_path)
425    }
426
427    /// Get the module instance ID for a given module kind
428    pub fn module_instance_id_by_kind(&self, kind: &ModuleKind) -> Result<ModuleInstanceId> {
429        self.client_config()?
430            .modules
431            .iter()
432            .find_map(|(id, cfg)| if &cfg.kind == kind { Some(*id) } else { None })
433            .with_context(|| format!("Module kind {kind} not found"))
434    }
435
436    pub fn module_client_config<M: ClientModule>(
437        &self,
438    ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
439        self.client_config()?
440            .modules
441            .iter()
442            .find_map(|(module_instance_id, module_cfg)| {
443                if module_cfg.kind == M::kind() {
444                    let decoders = ModuleDecoderRegistry::new(vec![(
445                        *module_instance_id,
446                        M::kind(),
447                        M::decoder(),
448                    )]);
449                    Some(
450                        module_cfg
451                            .config
452                            .clone()
453                            .redecode_raw(&decoders)
454                            .expect("Decoding client cfg failed")
455                            .expect_decoded_ref()
456                            .as_any()
457                            .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
458                            .cloned()
459                            .context("Cast to module config failed"),
460                    )
461                } else {
462                    None
463                }
464            })
465            .transpose()
466    }
467
468    pub fn deposit_fees(&self) -> Result<Amount> {
469        Ok(self
470            .module_client_config::<WalletClientModule>()?
471            .context("No wallet module found")?
472            .fee_consensus
473            .peg_in_abs)
474    }
475
476    /// Read the invite code from the client data dir
477    pub fn invite_code(&self) -> Result<String> {
478        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
479        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
480        Ok(invite_code)
481    }
482
483    pub fn invite_code_static() -> Result<String> {
484        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
485        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
486        Ok(invite_code)
487    }
488    pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
489        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
490        let name = format!("invite-code-{peer_id}");
491        let invite_code = fs::read_to_string(data_dir.join(name))?;
492        Ok(invite_code)
493    }
494
495    /// Built-in, default, internal [`Client`]
496    ///
497    /// We should be moving away from using it for anything.
498    pub async fn internal_client(&self) -> Result<&Client> {
499        self.client
500            .get_try()
501            .await
502            .context("Internal client joining Federation")
503    }
504
505    /// New [`Client`] that already joined `self`
506    pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
507        let client = Client::create(name).await?;
508        client.join_federation(self.invite_code()?).await?;
509        Ok(client)
510    }
511
512    pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
513        if self.members.contains_key(&peer) {
514            bail!("fedimintd-{peer} already running");
515        }
516        self.members.insert(
517            peer,
518            Fedimintd::new(
519                process_mgr,
520                self.bitcoind.clone(),
521                peer,
522                &self.vars[&peer],
523                "default".to_string(),
524            )
525            .await?,
526        );
527        Ok(())
528    }
529
530    pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
531        let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
532            bail!("fedimintd-{peer_id} does not exist");
533        };
534        fedimintd.terminate().await?;
535        Ok(())
536    }
537
538    pub async fn await_server_terminated(&mut self, peer_id: usize) -> Result<()> {
539        let Some(fedimintd) = self.members.get_mut(&peer_id) else {
540            bail!("fedimintd-{peer_id} does not exist");
541        };
542        fedimintd.await_terminated().await?;
543        self.members.remove(&peer_id);
544        Ok(())
545    }
546
547    /// Starts all peers not currently running.
548    pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
549        info!("starting all servers");
550        let fed_size = process_mgr.globals.FM_FED_SIZE;
551        for peer_id in 0..fed_size {
552            if self.members.contains_key(&peer_id) {
553                continue;
554            }
555            self.start_server(process_mgr, peer_id).await?;
556        }
557        self.await_all_peers().await?;
558        Ok(())
559    }
560
561    /// Terminates all running peers.
562    pub async fn terminate_all_servers(&mut self) -> Result<()> {
563        info!("terminating all servers");
564        let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
565        for peer_id in running_peer_ids {
566            self.terminate_server(peer_id).await?;
567        }
568        Ok(())
569    }
570
571    /// Coordinated shutdown of all peers that restart using the provided
572    /// `bin_path`. Returns `Ok()` once all peers are online.
573    ///
574    /// Staggering the restart more closely simulates upgrades in the wild.
575    pub async fn restart_all_staggered_with_bin(
576        &mut self,
577        process_mgr: &ProcessManager,
578        bin_path: &PathBuf,
579    ) -> Result<()> {
580        let fed_size = process_mgr.globals.FM_FED_SIZE;
581
582        // ensure all peers are online
583        self.start_all_servers(process_mgr).await?;
584
585        // staggered shutdown of peers
586        while self.num_members() > 0 {
587            self.terminate_server(self.num_members() - 1).await?;
588            if self.num_members() > 0 {
589                fedimint_core::task::sleep_in_test(
590                    "waiting to shutdown remaining peers",
591                    Duration::from_secs(10),
592                )
593                .await;
594            }
595        }
596
597        // TODO: Audit that the environment access only happens in single-threaded code.
598        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
599
600        // staggered restart
601        for peer_id in 0..fed_size {
602            self.start_server(process_mgr, peer_id).await?;
603            if peer_id < fed_size - 1 {
604                fedimint_core::task::sleep_in_test(
605                    "waiting to restart remaining peers",
606                    Duration::from_secs(10),
607                )
608                .await;
609            }
610        }
611
612        self.await_all_peers().await?;
613
614        let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
615        info!("upgraded fedimintd to version: {}", fedimintd_version);
616        Ok(())
617    }
618
619    pub async fn restart_all_with_bin(
620        &mut self,
621        process_mgr: &ProcessManager,
622        bin_path: &PathBuf,
623    ) -> Result<()> {
624        // get the version we're upgrading to, temporarily updating the fedimintd path
625        let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
626        // TODO: Audit that the environment access only happens in single-threaded code.
627        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
628        // TODO: Audit that the environment access only happens in single-threaded code.
629        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
630
631        self.restart_all_staggered_with_bin(process_mgr, bin_path)
632            .await
633    }
634
635    pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
636        let fed_size = process_mgr.globals.FM_FED_SIZE;
637        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
638        anyhow::ensure!(
639            fed_size > 3 * offline_nodes,
640            "too many offline nodes ({offline_nodes}) to reach consensus"
641        );
642
643        while self.num_members() > fed_size - offline_nodes {
644            self.terminate_server(self.num_members() - 1).await?;
645        }
646
647        if offline_nodes > 0 {
648            info!(fed_size, offline_nodes, "federation is degraded");
649        }
650        Ok(())
651    }
652
653    pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
654        let deposit_fees_msat = self.deposit_fees()?.msats;
655        assert_eq!(
656            deposit_fees_msat % 1000,
657            0,
658            "Deposit fees expected to be whole sats in test suite"
659        );
660        let deposit_fees = deposit_fees_msat / 1000;
661        info!(amount, deposit_fees, "Pegging-in client funds");
662
663        let (address, operation_id) = client.get_deposit_addr().await?;
664
665        self.bitcoind
666            .send_to(address, amount + deposit_fees)
667            .await?;
668        self.bitcoind.mine_blocks(21).await?;
669
670        Ok(operation_id)
671    }
672
673    pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
674        let operation_id = self.pegin_client_no_wait(amount, client).await?;
675
676        client.await_deposit(&operation_id).await?;
677        Ok(())
678    }
679
    /// Initiates peg-ins to this federation for all given gateways, mining
    /// blocks only once for the whole batch, then polls the gateways
    /// concurrently until each one reports the expected balance.
    ///
    /// Each gateway is sent `amount` plus the deposit fee; the fee is the
    /// wallet module's deposit fee converted from msats to whole sats.
    ///
    /// # Panics
    ///
    /// Panics if the deposit fee is not a whole number of sats, or if a
    /// gateway's info output lacks a numeric block height.
    pub async fn pegin_gateways(
        &self,
        amount: u64,
        gateways: Vec<&super::gatewayd::Gatewayd>,
    ) -> Result<()> {
        let deposit_fees_msat = self.deposit_fees()?.msats;
        assert_eq!(
            deposit_fees_msat % 1000,
            0,
            "Deposit fees expected to be whole sats in test suite"
        );
        let deposit_fees = deposit_fees_msat / 1000;
        info!(amount, deposit_fees, "Pegging-in gateway funds");
        let fed_id = self.calculate_federation_id();
        // Fund every gateway's peg-in address before mining anything.
        for gw in gateways.clone() {
            let pegin_addr = gw.get_pegin_addr(&fed_id).await?;
            self.bitcoind
                .send_to(pegin_addr, amount + deposit_fees)
                .await?;
        }

        self.bitcoind.mine_blocks(21).await?;
        // Gateways are compared against the block count minus one.
        let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
        try_join_all(gateways.into_iter().map(|gw| {
            poll("gateway pegin", || async {
                let gw_info = gw.get_info().await.map_err(ControlFlow::Continue)?;

                // The location of the block height in the info output
                // depends on the gateway version.
                let block_height: u64 = if gw.gatewayd_version < *VERSION_0_10_0_ALPHA {
                    gw_info["block_height"]
                        .as_u64()
                        .expect("Could not parse block height")
                } else {
                    gw_info["lightning_info"]["connected"]["block_height"]
                        .as_u64()
                        .expect("Could not parse block height")
                };

                // Keep polling until the gateway has caught up.
                if bitcoind_block_height != block_height {
                    return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
                        "gateway block height is not synced"
                    )));
                }

                let gateway_balance = gw
                    .ecash_balance(fed_id.clone())
                    .await
                    .map_err(ControlFlow::Continue)?;
                // The balance is compared against `amount * 1000`.
                poll_almost_equal!(gateway_balance, amount * 1000)
            })
        }))
        .await?;

        Ok(())
    }
736
    /// Initiates peg-outs from this federation for all given gateways,
    /// mining blocks only once for the whole batch, then verifies every
    /// gateway's ecash balance afterwards.
    ///
    /// For each gateway the ecash balance before the peg-out is recorded,
    /// `ecash pegout` is run towards a fresh `bitcoind` address, and after
    /// mining the new balance must be within 2000 of
    /// `previous - amount - fees`.
    ///
    /// # Panics
    ///
    /// Panics if a gateway has no balance entry for this federation.
    pub async fn pegout_gateways(
        &self,
        amount: u64,
        gateways: Vec<&super::gatewayd::Gatewayd>,
    ) -> Result<()> {
        info!(amount, "Pegging-out gateway funds");
        let fed_id = self.calculate_federation_id();
        // NOTE(review): keyed by lightning node type, so two gateways of the
        // same type would overwrite each other's entry — confirm callers
        // never pass such a set.
        let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
        for gw in gateways.clone() {
            // Balance before the peg-out, used for the check further down.
            let prev_fed_ecash_balance = gw
                .get_balances()
                .await?
                .ecash_balances
                .into_iter()
                .find(|fed| fed.federation_id.to_string() == fed_id)
                .expect("Gateway has not joined federation")
                .ecash_balance_msats;

            let pegout_address = self.bitcoind.get_new_address().await?;
            let value = cmd!(
                gw,
                "ecash",
                "pegout",
                "--federation-id",
                fed_id,
                "--amount",
                amount,
                "--address",
                pegout_address
            )
            .out_json()
            .await?;
            let response: WithdrawResponse = serde_json::from_value(value)?;
            peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
        }
        self.bitcoind.mine_blocks(21).await?;

        // Wait until bitcoind knows every peg-out transaction.
        try_join_all(
            peg_outs
                .values()
                .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
        )
        .await?;

        for gw in gateways.clone() {
            let after_fed_ecash_balance = gw
                .get_balances()
                .await?
                .ecash_balances
                .into_iter()
                .find(|fed| fed.federation_id.to_string() == fed_id)
                .expect("Gateway has not joined federation")
                .ecash_balance_msats;

            let ln_type = gw.ln.ln_type();
            let prev_balance = peg_outs
                .get(&ln_type)
                .expect("peg out does not exist")
                .0
                .msats;
            let fees = peg_outs
                .get(&ln_type)
                .expect("peg out does not exist")
                .1
                .fees;
            // Fee converted from sats to msats.
            let total_fee = fees.amount().to_sat() * 1000;
            // NOTE(review): `amount` is subtracted from an msat balance
            // without conversion — confirm the unit callers pass.
            crate::util::almost_equal(
                after_fed_ecash_balance.msats,
                prev_balance - amount - total_fee,
                2000,
            )
            .map_err(|e| {
                anyhow::anyhow!(
                    "new balance did not equal prev balance minus withdraw_amount minus fees: {}",
                    e
                )
            })?;
        }

        Ok(())
    }
820
821    pub fn calculate_federation_id(&self) -> String {
822        self.client_config()
823            .unwrap()
824            .global
825            .calculate_federation_id()
826            .to_string()
827    }
828
829    pub async fn await_block_sync(&self) -> Result<u64> {
830        let finality_delay = self.get_finality_delay()?;
831        let block_count = self.bitcoind.get_block_count().await?;
832        let expected = block_count.saturating_sub(finality_delay.into());
833        cmd!(
834            self.internal_client().await?,
835            "dev",
836            "wait-block-count",
837            expected
838        )
839        .run()
840        .await?;
841        Ok(expected)
842    }
843
844    fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
845        let wallet_instance_id = self.module_instance_id_by_kind(&fedimint_wallet_client::KIND)?;
846        let client_config = &self.client_config()?;
847        let wallet_cfg = client_config
848            .modules
849            .get(&wallet_instance_id)
850            .context("wallet module not found")?
851            .clone()
852            .redecode_raw(&ModuleDecoderRegistry::new([(
853                wallet_instance_id,
854                fedimint_wallet_client::KIND,
855                fedimint_wallet_client::WalletModuleTypes::decoder(),
856            )]))?;
857        let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
858
859        let finality_delay = wallet_cfg.finality_delay;
860        Ok(finality_delay)
861    }
862
863    pub async fn await_gateways_registered(&self) -> Result<()> {
864        let start_time = Instant::now();
865        debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
866
867        poll("gateways registered", || async {
868            let num_gateways = cmd!(
869                self.internal_client()
870                    .await
871                    .map_err(ControlFlow::Continue)?,
872                "list-gateways"
873            )
874            .out_json()
875            .await
876            .map_err(ControlFlow::Continue)?
877            .as_array()
878            .context("invalid output")
879            .map_err(ControlFlow::Break)?
880            .len();
881
882            // After version v0.10.0, the LND gateway will register twice. Once for the HTTP
883            // server, and once for the iroh endpoint.
884            let expected_gateways =
885                if crate::util::Gatewayd::version_or_default().await < *VERSION_0_10_0_ALPHA {
886                    1
887                } else {
888                    2
889                };
890
891            poll_eq!(num_gateways, expected_gateways)
892        })
893        .await?;
894        debug!(target: LOG_DEVIMINT,
895            elapsed_ms = %start_time.elapsed().as_millis(),
896            "Gateways registered");
897        Ok(())
898    }
899
900    pub async fn await_all_peers(&self) -> Result<()> {
901        poll("Waiting for all peers to be online", || async {
902            cmd!(
903                self.internal_client()
904                    .await
905                    .map_err(ControlFlow::Continue)?,
906                "dev",
907                "api",
908                "--module",
909                "wallet",
910                "block_count"
911            )
912            .run()
913            .await
914            .map_err(ControlFlow::Continue)?;
915            Ok(())
916        })
917        .await
918    }
919
920    pub async fn await_peer(&self, peer_id: usize) -> Result<()> {
921        poll("Waiting for all peers to be online", || async {
922            cmd!(
923                self.internal_client()
924                    .await
925                    .map_err(ControlFlow::Continue)?,
926                "dev",
927                "api",
928                "--peer-id",
929                peer_id,
930                "--module",
931                "wallet",
932                "block_count"
933            )
934            .run()
935            .await
936            .map_err(ControlFlow::Continue)?;
937            Ok(())
938        })
939        .await
940    }
941
942    /// Mines enough blocks to finalize mempool transactions, then waits for
943    /// federation to process finalized blocks.
944    ///
945    /// ex:
946    ///   tx submitted to mempool at height 100
947    ///   finality delay = 10
948    ///   mine finality delay blocks + 1 => new height 111
949    ///   tx included in block 101
950    ///   highest finalized height = 111 - 10 = 101
951    pub async fn finalize_mempool_tx(&self) -> Result<()> {
952        let finality_delay = self.get_finality_delay()?;
953        let blocks_to_mine = finality_delay + 1;
954        self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
955        self.await_block_sync().await?;
956        Ok(())
957    }
958
959    pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
960        self.bitcoind.mine_blocks(blocks).await?;
961        self.await_block_sync().await?;
962        Ok(())
963    }
964
965    pub fn num_members(&self) -> usize {
966        self.members.len()
967    }
968
969    pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
970        self.members
971            .keys()
972            .map(|&peer_id| PeerId::from(peer_id as u16))
973    }
974}
975
976#[derive(Clone)]
977pub struct Fedimintd {
978    _bitcoind: Bitcoind,
979    process: ProcessHandle,
980}
981
982impl Fedimintd {
983    pub async fn new(
984        process_mgr: &ProcessManager,
985        bitcoind: Bitcoind,
986        peer_id: usize,
987        env: &vars::Fedimintd,
988        fed_name: String,
989    ) -> Result<Self> {
990        debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
991        let process = process_mgr
992            .spawn_daemon(
993                &format!("fedimintd-{fed_name}-{peer_id}"),
994                cmd!(FedimintdCmd).envs(env.vars()),
995            )
996            .await?;
997
998        Ok(Self {
999            _bitcoind: bitcoind,
1000            process,
1001        })
1002    }
1003
1004    pub async fn terminate(self) -> Result<()> {
1005        self.process.terminate().await
1006    }
1007
1008    pub async fn await_terminated(&self) -> Result<()> {
1009        self.process.await_terminated().await
1010    }
1011}
1012
1013pub async fn run_cli_dkg_v2(endpoints: BTreeMap<PeerId, String>) -> Result<()> {
1014    // Parallelize setup status checks
1015    let status_futures = endpoints.values().map(|endpoint| {
1016        let endpoint = endpoint.clone();
1017        async move {
1018            let status = poll("awaiting-setup-status-awaiting-local-params", || async {
1019                crate::util::FedimintCli
1020                    .setup_status(&API_AUTH, &endpoint)
1021                    .await
1022                    .map_err(ControlFlow::Continue)
1023            })
1024            .await
1025            .unwrap();
1026
1027            assert_eq!(status, SetupStatus::AwaitingLocalParams);
1028        }
1029    });
1030    join_all(status_futures).await;
1031
1032    debug!(target: LOG_DEVIMINT, "Setting local parameters...");
1033
1034    // Parallelize setting local parameters
1035    let local_params_futures = endpoints.iter().map(|(peer, endpoint)| {
1036        let peer = *peer;
1037        let endpoint = endpoint.clone();
1038        async move {
1039            let info = if peer.to_usize() == 0 {
1040                crate::util::FedimintCli
1041                    .set_local_params_leader(&peer, &API_AUTH, &endpoint)
1042                    .await
1043            } else {
1044                crate::util::FedimintCli
1045                    .set_local_params_follower(&peer, &API_AUTH, &endpoint)
1046                    .await
1047            };
1048            info.map(|i| (peer, i))
1049        }
1050    });
1051    let connection_info: BTreeMap<_, _> = try_join_all(local_params_futures)
1052        .await?
1053        .into_iter()
1054        .collect();
1055
1056    debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");
1057
1058    // Parallelize peer addition - flatten the nested loop into a single parallel
1059    // operation
1060    let add_peer_futures = connection_info.iter().flat_map(|(peer, info)| {
1061        endpoints
1062            .iter()
1063            .filter(move |(p, _)| *p != peer)
1064            .map(move |(_, endpoint)| {
1065                let endpoint = endpoint.clone();
1066                let info = info.clone();
1067                async move {
1068                    crate::util::FedimintCli
1069                        .add_peer(&info, &API_AUTH, &endpoint)
1070                        .await
1071                }
1072            })
1073    });
1074    try_join_all(add_peer_futures).await?;
1075
1076    debug!(target: LOG_DEVIMINT, "Starting DKG...");
1077
1078    // Parallelize DKG start
1079    let start_dkg_futures = endpoints.values().map(|endpoint| {
1080        let endpoint = endpoint.clone();
1081        async move {
1082            crate::util::FedimintCli
1083                .start_dkg(&API_AUTH, &endpoint)
1084                .await
1085        }
1086    });
1087    try_join_all(start_dkg_futures).await?;
1088
1089    Ok(())
1090}