Skip to main content

devimint/
federation.rs

1use std::collections::BTreeMap;
2use std::ops::ControlFlow;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::Duration;
6use std::{env, fs};
7
8use anyhow::{Context, Result, bail};
9use fedimint_api_client::api::DynGlobalApi;
10use fedimint_api_client::download_from_invite_code;
11use fedimint_client_module::module::ClientModule;
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::admin_client::SetupStatus;
14use fedimint_core::config::{ClientConfig, load_from_file};
15use fedimint_core::core::{ModuleInstanceId, ModuleKind};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::ModuleCommon;
18use fedimint_core::module::registry::ModuleDecoderRegistry;
19use fedimint_core::runtime::block_in_place;
20use fedimint_core::task::block_on;
21use fedimint_core::task::jit::JitTryAnyhow;
22use fedimint_core::util::SafeUrl;
23use fedimint_core::{Amount, NumPeers, PeerId};
24use fedimint_gateway_common::WithdrawResponse;
25use fedimint_logging::LOG_DEVIMINT;
26use fedimint_testing_core::config::API_AUTH;
27use fedimint_testing_core::node_type::LightningNodeType;
28use fedimint_wallet_client::WalletClientModule;
29use fedimint_wallet_client::config::WalletClientConfig;
30use fs_lock::FileLock;
31use futures::future::{join_all, try_join_all};
32use tokio::task::{JoinSet, spawn_blocking};
33use tokio::time::Instant;
34use tracing::{debug, info};
35
36use super::external::Bitcoind;
37use super::util::{Command, ProcessHandle, ProcessManager, cmd};
38use super::vars::utf8;
39use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
40use crate::util::{FedimintdCmd, poll, poll_simple, poll_with_timeout};
41use crate::version_constants::{VERSION_0_10_0_ALPHA, VERSION_0_11_0_ALPHA};
42use crate::{poll_almost_equal, poll_eq, vars};
43
// TODO: Are we still using the 3rd port for anything?
/// Number of ports we allocate for every `fedimintd` instance.
///
/// The `FEDIMINTD_*_PORT_OFFSET` constants below pick individual ports
/// inside that per-instance range.
pub const PORTS_PER_FEDIMINTD: u16 = 4;
/// Offset of the p2p port inside the [`PORTS_PER_FEDIMINTD`] range
pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
/// Offset of the api port inside the [`PORTS_PER_FEDIMINTD`] range
pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
/// Offset of the web ui port inside the [`PORTS_PER_FEDIMINTD`] range
pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
/// Offset of the prometheus (metrics) port inside the
/// [`PORTS_PER_FEDIMINTD`] range
pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;
55
/// A federation under test: its `fedimintd` peers, their environment
/// variables, the `bitcoind` they use and a built-in internal [`Client`].
#[derive(Clone)]
pub struct Federation {
    /// `fedimintd` instances currently tracked, keyed by peer id. Entries
    /// are removed when a peer is terminated and re-inserted when it is
    /// started again.
    pub members: BTreeMap<usize, Fedimintd>,
    /// Per-peer environment variables, keyed by peer id
    pub vars: BTreeMap<usize, vars::Fedimintd>,
    pub bitcoind: Bitcoind,

    /// Built in [`Client`]; it joins the federation when initialized,
    /// unless setup was skipped.
    ///
    /// Only for internal use, use cli commands instead.
    client: JitTryAnyhow<Client>,
    #[allow(dead_code)] // Will need it later, maybe
    connectors: ConnectorRegistry,
}
68
69impl Drop for Federation {
70    fn drop(&mut self) {
71        block_in_place(|| {
72            block_on(async {
73                let mut set = JoinSet::new();
74
75                while let Some((_id, fedimintd)) = self.members.pop_first() {
76                    set.spawn(async { drop(fedimintd) });
77                }
78                while (set.join_next().await).is_some() {}
79            });
80        });
81    }
82}
/// `fedimint-cli` instance (basically path with client state: config + db)
#[derive(Clone)]
pub struct Client {
    /// Name of this client's state directory inside the shared `clients`
    /// directory
    name: String,
}
88
89impl Client {
90    fn clients_dir() -> PathBuf {
91        let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
92            .expect("FM_DATA_DIR_ENV not set")
93            .parse()
94            .expect("FM_DATA_DIR_ENV invalid");
95        data_dir.join("clients")
96    }
97
98    fn client_dir(&self) -> PathBuf {
99        Self::clients_dir().join(&self.name)
100    }
101
102    pub fn client_name_lock(name: &str) -> Result<FileLock> {
103        let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
104        let file_lock = std::fs::OpenOptions::new()
105            .write(true)
106            .create(true)
107            .truncate(true)
108            .open(&lock_path)
109            .with_context(|| format!("Failed to open {}", lock_path.display()))?;
110
111        fs_lock::FileLock::new_exclusive(file_lock)
112            .with_context(|| format!("Failed to lock {}", lock_path.display()))
113    }
114
115    /// Create a [`Client`] that starts with a fresh state.
116    pub async fn create(name: impl ToString) -> Result<Client> {
117        let name = name.to_string();
118        spawn_blocking(move || {
119            let _lock = Self::client_name_lock(&name);
120            for i in 0u64.. {
121                let client = Self {
122                    name: format!("{name}-{i}"),
123                };
124
125                if !client.client_dir().exists() {
126                    std::fs::create_dir_all(client.client_dir())?;
127                    return Ok(client);
128                }
129            }
130            unreachable!()
131        })
132        .await?
133    }
134
135    /// Open or create a [`Client`] that starts with a fresh state.
136    pub fn open_or_create(name: &str) -> Result<Client> {
137        block_in_place(|| {
138            let _lock = Self::client_name_lock(name);
139            let client = Self {
140                name: format!("{name}-0"),
141            };
142            if !client.client_dir().exists() {
143                std::fs::create_dir_all(client.client_dir())?;
144            }
145            Ok(client)
146        })
147    }
148
149    /// Client to join a federation
150    pub async fn join_federation(&self, invite_code: String) -> Result<()> {
151        debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
152        cmd!(self, "join-federation", invite_code).run().await?;
153
154        Ok(())
155    }
156
157    /// Client to join a federation with a restore procedure
158    pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
159        debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
160        cmd!(
161            self,
162            "restore",
163            "--invite-code",
164            invite_code,
165            "--mnemonic",
166            mnemonic
167        )
168        .run()
169        .await?;
170
171        Ok(())
172    }
173
174    /// Client to join a federation
175    pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
176        let restored = Self::open_or_create(name)?;
177
178        let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
179            .as_str()
180            .unwrap()
181            .to_owned();
182
183        debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
184        cmd!(
185            restored,
186            "restore",
187            "--invite-code",
188            invite_code,
189            "--mnemonic",
190            mnemonic
191        )
192        .run()
193        .await?;
194
195        Ok(restored)
196    }
197
198    /// Create a [`Client`] that starts with a state that is a copy of
199    /// of another one.
200    pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
201        let new = Client::create(name).await?;
202
203        cmd!(
204            "cp",
205            "-R",
206            self.client_dir().join("client.db").display(),
207            new.client_dir().display()
208        )
209        .run()
210        .await?;
211
212        Ok(new)
213    }
214
215    pub async fn balance(&self) -> Result<u64> {
216        Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
217            .as_u64()
218            .unwrap())
219    }
220
221    /// Waits for the client balance to reach at least `min_balance_msat`.
222    pub async fn await_balance(&self, min_balance_msat: u64) -> Result<()> {
223        loop {
224            cmd!(self, "dev", "wait", "3").out_json().await?;
225
226            let balance = self.balance().await?;
227            if balance >= min_balance_msat {
228                return Ok(());
229            }
230
231            info!(
232                target: LOG_DEVIMINT,
233                balance,
234                min_balance_msat,
235                "Waiting for client balance to reach minimum"
236            );
237        }
238    }
239
240    pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
241        if crate::util::supports_wallet_v2() {
242            let address = cmd!(self, "module", "walletv2", "receive")
243                .out_json()
244                .await?;
245            // Walletv2 auto-claims deposits, no operation_id needed
246            Ok((address.as_str().unwrap().to_string(), String::new()))
247        } else {
248            let deposit = cmd!(self, "deposit-address").out_json().await?;
249            Ok((
250                deposit["address"].as_str().unwrap().to_string(),
251                deposit["operation_id"].as_str().unwrap().to_string(),
252            ))
253        }
254    }
255
256    pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
257        cmd!(self, "await-deposit", operation_id).run().await
258    }
259
260    pub fn cmd(&self) -> Command {
261        cmd!(
262            crate::util::get_fedimint_cli_path(),
263            format!("--data-dir={}", self.client_dir().display())
264        )
265    }
266
    /// Returns the client's name, which is also the name of its state
    /// directory inside the clients directory.
    pub fn get_name(&self) -> &str {
        &self.name
    }
270
271    /// Returns the current consensus session count
272    pub async fn get_session_count(&self) -> Result<u64> {
273        cmd!(self, "dev", "session-count").out_json().await?["count"]
274            .as_u64()
275            .context("count field wasn't a number")
276    }
277
278    /// Returns once all active state machines complete
279    pub async fn wait_complete(&self) -> Result<()> {
280        cmd!(self, "dev", "wait-complete").run().await
281    }
282
283    /// Returns once the current session completes
284    pub async fn wait_session(&self) -> anyhow::Result<()> {
285        info!("Waiting for a new session");
286        let session_count = self.get_session_count().await?;
287        self.wait_session_outcome(session_count).await?;
288        Ok(())
289    }
290
291    /// Returns once the provided session count completes
292    pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
293        let timeout = {
294            let current_session_count = self.get_session_count().await?;
295            let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
296            let session_duration_seconds = 180;
297            Duration::from_secs(sessions_to_wait * session_duration_seconds)
298        };
299
300        let start = Instant::now();
301        poll_with_timeout("Waiting for a new session", timeout, || async {
302            info!("Awaiting session outcome {session_count}");
303            match cmd!(self, "dev", "api", "await_session_outcome", session_count)
304                .run()
305                .await
306            {
307                Err(e) => Err(ControlFlow::Continue(e)),
308                Ok(()) => Ok(()),
309            }
310        })
311        .await?;
312
313        let session_found_in = start.elapsed();
314        info!("session found in {session_found_in:?}");
315        Ok(())
316    }
317}
318
319impl Federation {
    /// Creates a [`Fedimintd`] for every peer of the federation and, unless
    /// `skip_setup` or `pre_dkg` is set, runs DKG and places the resulting
    /// invite-code files into the client directory.
    ///
    /// The number of peers is taken from `process_mgr.globals.FM_FED_SIZE`.
    /// The built-in client is set up through [`JitTryAnyhow`]; its
    /// initializer opens or creates the client named after
    /// `federation_name` and joins the federation only when setup ran.
    ///
    /// # Errors
    ///
    /// Fails if any peer cannot be initialized or started, if DKG fails, or
    /// if an invite-code file cannot be read, copied or moved.
    pub async fn new(
        process_mgr: &ProcessManager,
        bitcoind: Bitcoind,
        skip_setup: bool,
        pre_dkg: bool,
        // Which of the pre-allocated federations to use (most tests just use single `0` one)
        fed_index: usize,
        federation_name: String,
    ) -> Result<Self> {
        let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
        let mut members = BTreeMap::new();
        let mut peer_to_env_vars_map = BTreeMap::new();

        // NOTE(review): `admin_clients` is only ever inserted into within
        // this function; nothing visible here reads it back.
        let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
        let mut api_endpoints: BTreeMap<PeerId, _> = BTreeMap::new();

        let connectors = ConnectorRegistry::build_from_testing_env()?.bind().await?;
        // Per peer: derive its env vars, create its `Fedimintd` and record
        // its API endpoint for DKG.
        for peer_id in num_peers.peer_ids() {
            let peer_env_vars = vars::Fedimintd::init(
                &process_mgr.globals,
                federation_name.clone(),
                peer_id,
                process_mgr
                    .globals
                    .fedimintd_overrides
                    .peer_expect(fed_index, peer_id),
            )
            .await?;
            members.insert(
                peer_id.to_usize(),
                Fedimintd::new(
                    process_mgr,
                    bitcoind.clone(),
                    peer_id.to_usize(),
                    &peer_env_vars,
                    federation_name.clone(),
                )
                .await?,
            );
            let admin_client = DynGlobalApi::new_admin_setup(
                connectors.clone(),
                SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
                // TODO: will need it somewhere
                // &process_mgr.globals.FM_FORCE_API_SECRETS.get_active(),
            )?;
            api_endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
            admin_clients.insert(peer_id, admin_client);
            peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
        }

        if !skip_setup && !pre_dkg {
            // we don't guarantee backwards-compatibility for dkg, so we use the
            // fedimint-cli version that matches fedimintd
            let (original_fedimint_cli_path, original_fm_mint_client) =
                crate::util::use_matching_fedimint_cli_for_dkg().await?;

            run_cli_dkg_v2(api_endpoints).await?;

            // we're done with dkg, so we can reset the fedimint-cli version
            crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);

            let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
            let invite_code_filename_original = "invite-code";

            // Wait for every peer to write its invite-code file, then run
            // `download_from_invite_code` with it; the downloaded value is
            // discarded.
            for peer_env_vars in peer_to_env_vars_map.values() {
                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);

                let invite_code = poll_simple("awaiting-invite-code", || async {
                    let path = format!("{peer_data_dir}/{invite_code_filename_original}");
                    tokio::fs::read_to_string(&path)
                        .await
                        .with_context(|| format!("Awaiting invite code file: {path}"))
                })
                .await
                .context("Awaiting invite code file")?;

                download_from_invite_code(&connectors, &InviteCode::from_str(&invite_code)?)
                    .await?;
            }

            // copy over peer 0's invite-code file to the client directory
            // (this has to happen before the files are moved away below)
            let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);

            tokio::fs::copy(
                format!("{peer_data_dir}/{invite_code_filename_original}"),
                format!("{client_dir}/{invite_code_filename_original}"),
            )
            .await
            .context("copying invite-code file")?;

            // move each guardian's invite-code file to the client's directory
            // appending the peer id to the end
            for (index, peer_env_vars) in &peer_to_env_vars_map {
                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);

                let invite_code_filename_indexed =
                    format!("{invite_code_filename_original}-{index}");
                tokio::fs::rename(
                    format!("{peer_data_dir}/{invite_code_filename_original}"),
                    format!("{client_dir}/{invite_code_filename_indexed}"),
                )
                .await
                .context("moving invite-code file")?;
            }

            debug!("Moved invite-code files to client data directory");
        }

        // Initializer for the built-in client.
        // NOTE(review): the invite code is read even when the join below is
        // skipped, so initialization fails if the file is missing in that
        // case too - confirm this is intended.
        let client = JitTryAnyhow::new_try({
            move || async move {
                let client = Client::open_or_create(federation_name.as_str())?;
                let invite_code = Self::invite_code_static()?;
                if !skip_setup && !pre_dkg {
                    cmd!(client, "join-federation", invite_code).run().await?;
                }
                Ok(client)
            }
        });

        Ok(Self {
            members,
            vars: peer_to_env_vars_map,
            bitcoind,
            client,
            connectors,
        })
    }
448
449    pub fn client_config(&self) -> Result<ClientConfig> {
450        let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
451        load_from_file(&cfg_path)
452    }
453
454    /// Get the module instance ID for a given module kind
455    pub fn module_instance_id_by_kind(&self, kind: &ModuleKind) -> Result<ModuleInstanceId> {
456        self.client_config()?
457            .modules
458            .iter()
459            .find_map(|(id, cfg)| if &cfg.kind == kind { Some(*id) } else { None })
460            .with_context(|| format!("Module kind {kind} not found"))
461    }
462
463    pub fn module_client_config<M: ClientModule>(
464        &self,
465    ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
466        self.client_config()?
467            .modules
468            .iter()
469            .find_map(|(module_instance_id, module_cfg)| {
470                if module_cfg.kind == M::kind() {
471                    let decoders = ModuleDecoderRegistry::new(vec![(
472                        *module_instance_id,
473                        M::kind(),
474                        M::decoder(),
475                    )]);
476                    Some(
477                        module_cfg
478                            .config
479                            .clone()
480                            .redecode_raw(&decoders)
481                            .expect("Decoding client cfg failed")
482                            .expect_decoded_ref()
483                            .as_any()
484                            .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
485                            .cloned()
486                            .context("Cast to module config failed"),
487                    )
488                } else {
489                    None
490                }
491            })
492            .transpose()
493    }
494
495    pub fn deposit_fees(&self) -> Result<Amount> {
496        if crate::util::supports_wallet_v2() {
497            Ok(self
498                .module_client_config::<fedimint_walletv2_client::WalletClientModule>()?
499                .context("No walletv2 module found")?
500                .fee_consensus
501                .base)
502        } else {
503            Ok(self
504                .module_client_config::<WalletClientModule>()?
505                .context("No wallet module found")?
506                .fee_consensus
507                .peg_in_abs)
508        }
509    }
510
511    /// Read the invite code from the client data dir
512    pub fn invite_code(&self) -> Result<String> {
513        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
514        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
515        Ok(invite_code)
516    }
517
518    pub fn invite_code_static() -> Result<String> {
519        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
520        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
521        Ok(invite_code)
522    }
523    pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
524        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
525        let name = format!("invite-code-{peer_id}");
526        let invite_code = fs::read_to_string(data_dir.join(name))?;
527        Ok(invite_code)
528    }
529
530    /// Built-in, default, internal [`Client`]
531    ///
532    /// We should be moving away from using it for anything.
533    pub async fn internal_client(&self) -> Result<&Client> {
534        self.client
535            .get_try()
536            .await
537            .context("Internal client joining Federation")
538    }
539
540    /// New [`Client`] that already joined `self`
541    pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
542        let client = Client::create(name).await?;
543        client.join_federation(self.invite_code()?).await?;
544        Ok(client)
545    }
546
547    pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
548        if self.members.contains_key(&peer) {
549            bail!("fedimintd-{peer} already running");
550        }
551        self.members.insert(
552            peer,
553            Fedimintd::new(
554                process_mgr,
555                self.bitcoind.clone(),
556                peer,
557                &self.vars[&peer],
558                "default".to_string(),
559            )
560            .await?,
561        );
562        Ok(())
563    }
564
565    pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
566        let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
567            bail!("fedimintd-{peer_id} does not exist");
568        };
569        fedimintd.terminate().await?;
570        Ok(())
571    }
572
573    pub async fn await_server_terminated(&mut self, peer_id: usize) -> Result<()> {
574        let Some(fedimintd) = self.members.get_mut(&peer_id) else {
575            bail!("fedimintd-{peer_id} does not exist");
576        };
577        fedimintd.await_terminated().await?;
578        self.members.remove(&peer_id);
579        Ok(())
580    }
581
582    /// Starts all peers not currently running.
583    pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
584        info!("starting all servers");
585        let fed_size = process_mgr.globals.FM_FED_SIZE;
586        for peer_id in 0..fed_size {
587            if self.members.contains_key(&peer_id) {
588                continue;
589            }
590            self.start_server(process_mgr, peer_id).await?;
591        }
592        self.await_all_peers().await?;
593        Ok(())
594    }
595
596    /// Terminates all running peers.
597    pub async fn terminate_all_servers(&mut self) -> Result<()> {
598        info!("terminating all servers");
599        let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
600        for peer_id in running_peer_ids {
601            self.terminate_server(peer_id).await?;
602        }
603        Ok(())
604    }
605
606    /// Coordinated shutdown of all peers that restart using the provided
607    /// `bin_path`. Returns `Ok()` once all peers are online.
608    ///
609    /// Staggering the restart more closely simulates upgrades in the wild.
610    pub async fn restart_all_staggered_with_bin(
611        &mut self,
612        process_mgr: &ProcessManager,
613        bin_path: &PathBuf,
614    ) -> Result<()> {
615        let fed_size = process_mgr.globals.FM_FED_SIZE;
616
617        // ensure all peers are online
618        self.start_all_servers(process_mgr).await?;
619
620        // staggered shutdown of peers
621        while self.num_members() > 0 {
622            self.terminate_server(self.num_members() - 1).await?;
623            if self.num_members() > 0 {
624                fedimint_core::task::sleep_in_test(
625                    "waiting to shutdown remaining peers",
626                    Duration::from_secs(10),
627                )
628                .await;
629            }
630        }
631
632        // TODO: Audit that the environment access only happens in single-threaded code.
633        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
634
635        // staggered restart
636        for peer_id in 0..fed_size {
637            self.start_server(process_mgr, peer_id).await?;
638            if peer_id < fed_size - 1 {
639                fedimint_core::task::sleep_in_test(
640                    "waiting to restart remaining peers",
641                    Duration::from_secs(10),
642                )
643                .await;
644            }
645        }
646
647        self.await_all_peers().await?;
648
649        let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
650        info!("upgraded fedimintd to version: {}", fedimintd_version);
651        Ok(())
652    }
653
654    pub async fn restart_all_with_bin(
655        &mut self,
656        process_mgr: &ProcessManager,
657        bin_path: &PathBuf,
658    ) -> Result<()> {
659        // get the version we're upgrading to, temporarily updating the fedimintd path
660        let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
661        // TODO: Audit that the environment access only happens in single-threaded code.
662        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
663        // TODO: Audit that the environment access only happens in single-threaded code.
664        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
665
666        self.restart_all_staggered_with_bin(process_mgr, bin_path)
667            .await
668    }
669
670    pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
671        let fed_size = process_mgr.globals.FM_FED_SIZE;
672        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
673        anyhow::ensure!(
674            fed_size > 3 * offline_nodes,
675            "too many offline nodes ({offline_nodes}) to reach consensus"
676        );
677
678        while self.num_members() > fed_size - offline_nodes {
679            self.terminate_server(self.num_members() - 1).await?;
680        }
681
682        if offline_nodes > 0 {
683            info!(fed_size, offline_nodes, "federation is degraded");
684        }
685        Ok(())
686    }
687
688    pub async fn send_to_address(&self, address: String, amount: u64) -> Result<()> {
689        self.bitcoind.send_to(address, amount).await?;
690
691        self.bitcoind.mine_blocks(21).await?;
692
693        Ok(())
694    }
695
696    pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
697        let deposit_fees_msat = self.deposit_fees()?.msats;
698        assert_eq!(
699            deposit_fees_msat % 1000,
700            0,
701            "Deposit fees expected to be whole sats in test suite"
702        );
703        let deposit_fees = deposit_fees_msat / 1000;
704        info!(amount, deposit_fees, "Pegging-in client funds");
705
706        let (address, operation_id) = client.get_deposit_addr().await?;
707
708        self.bitcoind
709            .send_to(address, amount + deposit_fees)
710            .await?;
711        self.bitcoind.mine_blocks(21).await?;
712
713        Ok(operation_id)
714    }
715
716    pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
717        // For walletv2, we need to capture the initial balance and wait for it to
718        // increase since there is no state machine - deposits are auto-claimed
719        let initial_balance = if crate::util::supports_wallet_v2() {
720            Some(client.balance().await?)
721        } else {
722            None
723        };
724
725        let operation_id = self.pegin_client_no_wait(amount, client).await?;
726
727        if let Some(initial) = initial_balance {
728            // Walletv2: wait for balance to increase. We expect slightly less than
729            // `amount` due to mint module fees when creating ecash notes.
730            let expected_balance = initial + (amount * 1000 * 9 / 10);
731            client.await_balance(expected_balance).await?;
732        } else {
733            client.await_deposit(&operation_id).await?;
734        }
735        Ok(())
736    }
737
738    /// Initiates multiple peg-ins to the same federation for the set of
739    /// gateways to save on mining blocks in parallel.
740    pub async fn pegin_gateways(
741        &self,
742        amount: u64,
743        gateways: Vec<&super::gatewayd::Gatewayd>,
744    ) -> Result<()> {
745        let deposit_fees_msat = self.deposit_fees()?.msats;
746        assert_eq!(
747            deposit_fees_msat % 1000,
748            0,
749            "Deposit fees expected to be whole sats in test suite"
750        );
751        let deposit_fees = deposit_fees_msat / 1000;
752        info!(amount, deposit_fees, "Pegging-in gateway funds");
753        let fed_id = self.calculate_federation_id();
754        for gw in gateways.clone() {
755            let pegin_addr = gw.client().get_pegin_addr(&fed_id).await?;
756            self.bitcoind
757                .send_to(pegin_addr, amount + deposit_fees)
758                .await?;
759        }
760
761        self.bitcoind.mine_blocks(21).await?;
762        let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
763        try_join_all(gateways.into_iter().map(|gw| {
764            poll("gateway pegin", || async {
765                let gw_info = gw
766                    .client()
767                    .get_info()
768                    .await
769                    .map_err(ControlFlow::Continue)?;
770
771                let block_height: u64 = if gw.gatewayd_version < *VERSION_0_10_0_ALPHA {
772                    gw_info["block_height"]
773                        .as_u64()
774                        .expect("Could not parse block height")
775                } else {
776                    gw_info["lightning_info"]["connected"]["block_height"]
777                        .as_u64()
778                        .expect("Could not parse block height")
779                };
780
781                if bitcoind_block_height != block_height {
782                    return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
783                        "gateway block height is not synced"
784                    )));
785                }
786
787                let gateway_balance = gw
788                    .client()
789                    .ecash_balance(fed_id.clone())
790                    .await
791                    .map_err(ControlFlow::Continue)?;
792                poll_almost_equal!(gateway_balance, amount * 1000)
793            })
794        }))
795        .await?;
796
797        Ok(())
798    }
799
800    /// Initiates multiple peg-outs from the same federation for the set of
801    /// gateways to save on mining blocks in parallel.
802    pub async fn pegout_gateways(
803        &self,
804        amount: u64,
805        gateways: Vec<&super::gatewayd::Gatewayd>,
806    ) -> Result<()> {
807        info!(amount, "Pegging-out gateway funds");
808        let fed_id = self.calculate_federation_id();
809        let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
810        for gw in gateways.clone() {
811            let prev_fed_ecash_balance = gw
812                .client()
813                .get_balances()
814                .await?
815                .ecash_balances
816                .into_iter()
817                .find(|fed| fed.federation_id.to_string() == fed_id)
818                .expect("Gateway has not joined federation")
819                .ecash_balance_msats;
820
821            let pegout_address = self.bitcoind.get_new_address().await?;
822            let response = gw
823                .client()
824                .pegout(fed_id.clone(), amount, pegout_address)
825                .await?;
826            peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
827        }
828        self.bitcoind.mine_blocks(21).await?;
829
830        try_join_all(
831            peg_outs
832                .values()
833                .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
834        )
835        .await?;
836
837        for gw in gateways.clone() {
838            let after_fed_ecash_balance = gw
839                .client()
840                .get_balances()
841                .await?
842                .ecash_balances
843                .into_iter()
844                .find(|fed| fed.federation_id.to_string() == fed_id)
845                .expect("Gateway has not joined federation")
846                .ecash_balance_msats;
847
848            let ln_type = gw.ln.ln_type();
849            let prev_balance = peg_outs
850                .get(&ln_type)
851                .expect("peg out does not exist")
852                .0
853                .msats;
854            let fees = peg_outs
855                .get(&ln_type)
856                .expect("peg out does not exist")
857                .1
858                .fees;
859            let total_fee = fees.amount().to_sat() * 1000;
860            // Walletv2 charges a module fee on top of the on-chain fee:
861            // 100 sats base + 1% of amount (amount is in msats)
862            let tolerance = if crate::util::supports_wallet_v2() {
863                let amount_sats = amount / 1000;
864                let module_fee_sats = 100 + amount_sats / 100;
865                module_fee_sats * 1000 + 2000
866            } else if crate::util::supports_mint_v2() {
867                4000
868            } else {
869                2000
870            };
871            crate::util::almost_equal(
872                after_fed_ecash_balance.msats,
873                prev_balance - amount - total_fee,
874                tolerance,
875            )
876            .map_err(|e| {
877                anyhow::anyhow!(
878                    "new balance did not equal prev balance minus withdraw_amount minus fees: {}",
879                    e
880                )
881            })?;
882        }
883
884        Ok(())
885    }
886
887    pub fn calculate_federation_id(&self) -> String {
888        self.client_config()
889            .unwrap()
890            .global
891            .calculate_federation_id()
892            .to_string()
893    }
894
895    pub async fn await_block_sync(&self) -> Result<u64> {
896        let finality_delay = self.get_finality_delay()?;
897        let block_count = self.bitcoind.get_block_count().await?;
898        let expected = block_count.saturating_sub(finality_delay.into());
899
900        if crate::util::supports_wallet_v2() {
901            // Walletv2 doesn't have `dev wait-block-count`, poll using CLI instead
902            let client = self.internal_client().await?;
903            loop {
904                let value = cmd!(client, "module", "walletv2", "info", "block-count")
905                    .out_json()
906                    .await?;
907                let current: u64 = serde_json::from_value(value)?;
908                if current >= expected {
909                    break;
910                }
911                fedimint_core::task::sleep_in_test(
912                    format!("Waiting for consensus block count to reach {expected}"),
913                    std::time::Duration::from_secs(1),
914                )
915                .await;
916            }
917        } else {
918            cmd!(
919                self.internal_client().await?,
920                "dev",
921                "wait-block-count",
922                expected
923            )
924            .run()
925            .await?;
926        }
927
928        Ok(expected)
929    }
930
931    fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
932        // Walletv2 uses a constant finality delay
933        if crate::util::supports_wallet_v2() {
934            return Ok(fedimint_walletv2_server::CONFIRMATION_FINALITY_DELAY as u32);
935        }
936
937        let wallet_instance_id = self.module_instance_id_by_kind(&fedimint_wallet_client::KIND)?;
938        let client_config = &self.client_config()?;
939        let wallet_cfg = client_config
940            .modules
941            .get(&wallet_instance_id)
942            .context("wallet module not found")?
943            .clone()
944            .redecode_raw(&ModuleDecoderRegistry::new([(
945                wallet_instance_id,
946                fedimint_wallet_client::KIND,
947                fedimint_wallet_client::WalletModuleTypes::decoder(),
948            )]))?;
949        let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
950
951        let finality_delay = wallet_cfg.finality_delay;
952        Ok(finality_delay)
953    }
954
955    pub async fn await_gateways_registered(&self) -> Result<()> {
956        let start_time = Instant::now();
957        debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
958
959        poll("gateways registered", || async {
960            let num_gateways = cmd!(
961                self.internal_client()
962                    .await
963                    .map_err(ControlFlow::Continue)?,
964                "list-gateways"
965            )
966            .out_json()
967            .await
968            .map_err(ControlFlow::Continue)?
969            .as_array()
970            .context("invalid output")
971            .map_err(ControlFlow::Break)?
972            .len();
973
974            // After version v0.10.0, the LND gateway will register twice. Once for the HTTP
975            // server, and once for the iroh endpoint.
976            let expected_gateways =
977                if crate::util::Gatewayd::version_or_default().await < *VERSION_0_10_0_ALPHA {
978                    1
979                } else {
980                    2
981                };
982
983            poll_eq!(num_gateways, expected_gateways)
984        })
985        .await?;
986        debug!(target: LOG_DEVIMINT,
987            elapsed_ms = %start_time.elapsed().as_millis(),
988            "Gateways registered");
989        Ok(())
990    }
991
992    pub async fn await_all_peers(&self) -> Result<()> {
993        let (module_name, endpoint) = if crate::util::supports_wallet_v2() {
994            ("walletv2", "consensus_block_count")
995        } else {
996            ("wallet", "block_count")
997        };
998        poll("Waiting for all peers to be online", || async {
999            cmd!(
1000                self.internal_client()
1001                    .await
1002                    .map_err(ControlFlow::Continue)?,
1003                "dev",
1004                "api",
1005                "--module",
1006                module_name,
1007                endpoint
1008            )
1009            .run()
1010            .await
1011            .map_err(ControlFlow::Continue)?;
1012            Ok(())
1013        })
1014        .await
1015    }
1016
1017    pub async fn await_peer(&self, peer_id: usize) -> Result<()> {
1018        poll("Waiting for all peers to be online", || async {
1019            cmd!(
1020                self.internal_client()
1021                    .await
1022                    .map_err(ControlFlow::Continue)?,
1023                "dev",
1024                "api",
1025                "--peer-id",
1026                peer_id,
1027                "--module",
1028                "wallet",
1029                "block_count"
1030            )
1031            .run()
1032            .await
1033            .map_err(ControlFlow::Continue)?;
1034            Ok(())
1035        })
1036        .await
1037    }
1038
1039    /// Mines enough blocks to finalize mempool transactions, then waits for
1040    /// federation to process finalized blocks.
1041    ///
1042    /// ex:
1043    ///   tx submitted to mempool at height 100
1044    ///   finality delay = 10
1045    ///   mine finality delay blocks + 1 => new height 111
1046    ///   tx included in block 101
1047    ///   highest finalized height = 111 - 10 = 101
1048    pub async fn finalize_mempool_tx(&self) -> Result<()> {
1049        let finality_delay = self.get_finality_delay()?;
1050        let blocks_to_mine = finality_delay + 1;
1051        self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
1052        self.await_block_sync().await?;
1053        Ok(())
1054    }
1055
1056    pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
1057        self.bitcoind.mine_blocks(blocks).await?;
1058        self.await_block_sync().await?;
1059        Ok(())
1060    }
1061
1062    pub fn num_members(&self) -> usize {
1063        self.members.len()
1064    }
1065
1066    pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
1067        self.members
1068            .keys()
1069            .map(|&peer_id| PeerId::from(peer_id as u16))
1070    }
1071}
1072
1073#[derive(Clone)]
1074pub struct Fedimintd {
1075    _bitcoind: Bitcoind,
1076    process: ProcessHandle,
1077}
1078
1079impl Fedimintd {
1080    pub async fn new(
1081        process_mgr: &ProcessManager,
1082        bitcoind: Bitcoind,
1083        peer_id: usize,
1084        env: &vars::Fedimintd,
1085        fed_name: String,
1086    ) -> Result<Self> {
1087        debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
1088        let process = process_mgr
1089            .spawn_daemon(
1090                &format!("fedimintd-{fed_name}-{peer_id}"),
1091                cmd!(FedimintdCmd).envs(env.vars()),
1092            )
1093            .await?;
1094
1095        Ok(Self {
1096            _bitcoind: bitcoind,
1097            process,
1098        })
1099    }
1100
1101    pub async fn terminate(self) -> Result<()> {
1102        self.process.terminate().await
1103    }
1104
1105    pub async fn await_terminated(&self) -> Result<()> {
1106        self.process.await_terminated().await
1107    }
1108}
1109
1110pub async fn run_cli_dkg_v2(endpoints: BTreeMap<PeerId, String>) -> Result<()> {
1111    // Parallelize setup status checks
1112    let status_futures = endpoints.values().map(|endpoint| {
1113        let endpoint = endpoint.clone();
1114        async move {
1115            let status = poll("awaiting-setup-status-awaiting-local-params", || async {
1116                crate::util::FedimintCli
1117                    .setup_status(&API_AUTH, &endpoint)
1118                    .await
1119                    .map_err(ControlFlow::Continue)
1120            })
1121            .await
1122            .unwrap();
1123
1124            assert_eq!(status, SetupStatus::AwaitingLocalParams);
1125        }
1126    });
1127    join_all(status_futures).await;
1128
1129    debug!(target: LOG_DEVIMINT, "Setting local parameters...");
1130
1131    // Parallelize setting local parameters
1132    // --federation-size is only supported by fedimint-cli >= 0.11.0-alpha
1133    let federation_size =
1134        if crate::util::FedimintCli::version_or_default().await >= *VERSION_0_11_0_ALPHA {
1135            Some(endpoints.len())
1136        } else {
1137            None
1138        };
1139    let local_params_futures = endpoints.iter().map(|(peer, endpoint)| {
1140        let peer = *peer;
1141        let endpoint = endpoint.clone();
1142        async move {
1143            let info = if peer.to_usize() == 0 {
1144                crate::util::FedimintCli
1145                    .set_local_params_leader(&peer, &API_AUTH, &endpoint, federation_size)
1146                    .await
1147            } else {
1148                crate::util::FedimintCli
1149                    .set_local_params_follower(&peer, &API_AUTH, &endpoint)
1150                    .await
1151            };
1152            info.map(|i| (peer, i))
1153        }
1154    });
1155    let connection_info: BTreeMap<_, _> = try_join_all(local_params_futures)
1156        .await?
1157        .into_iter()
1158        .collect();
1159
1160    debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");
1161
1162    // Parallelize peer addition - flatten the nested loop into a single parallel
1163    // operation
1164    let add_peer_futures = connection_info.iter().flat_map(|(peer, info)| {
1165        endpoints
1166            .iter()
1167            .filter(move |(p, _)| *p != peer)
1168            .map(move |(_, endpoint)| {
1169                let endpoint = endpoint.clone();
1170                let info = info.clone();
1171                async move {
1172                    crate::util::FedimintCli
1173                        .add_peer(&info, &API_AUTH, &endpoint)
1174                        .await
1175                }
1176            })
1177    });
1178    try_join_all(add_peer_futures).await?;
1179
1180    debug!(target: LOG_DEVIMINT, "Starting DKG...");
1181
1182    // Parallelize DKG start
1183    let start_dkg_futures = endpoints.values().map(|endpoint| {
1184        let endpoint = endpoint.clone();
1185        async move {
1186            crate::util::FedimintCli
1187                .start_dkg(&API_AUTH, &endpoint)
1188                .await
1189        }
1190    });
1191    try_join_all(start_dkg_futures).await?;
1192
1193    Ok(())
1194}