Skip to main content

devimint/
federation.rs

1use std::collections::BTreeMap;
2use std::ops::ControlFlow;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::Duration;
6use std::{env, fs};
7
8use anyhow::{Context, Result, bail};
9use fedimint_api_client::api::DynGlobalApi;
10use fedimint_api_client::download_from_invite_code;
11use fedimint_client_module::module::ClientModule;
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::admin_client::SetupStatus;
14use fedimint_core::config::{ClientConfig, load_from_file};
15use fedimint_core::core::{ModuleInstanceId, ModuleKind};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::ModuleCommon;
18use fedimint_core::module::registry::ModuleDecoderRegistry;
19use fedimint_core::runtime::block_in_place;
20use fedimint_core::task::block_on;
21use fedimint_core::task::jit::JitTryAnyhow;
22use fedimint_core::util::SafeUrl;
23use fedimint_core::{Amount, NumPeers, PeerId};
24use fedimint_gateway_common::WithdrawResponse;
25use fedimint_logging::LOG_DEVIMINT;
26use fedimint_testing_core::config::API_AUTH;
27use fedimint_testing_core::node_type::LightningNodeType;
28use fedimint_wallet_client::WalletClientModule;
29use fedimint_wallet_client::config::WalletClientConfig;
30use fs_lock::FileLock;
31use futures::future::{join_all, try_join_all};
32use tokio::task::{JoinSet, spawn_blocking};
33use tokio::time::Instant;
34use tracing::{debug, info};
35
36use super::external::Bitcoind;
37use super::util::{Command, ProcessHandle, ProcessManager, cmd};
38use super::vars::utf8;
39use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
40use crate::util::{FedimintdCmd, poll, poll_simple, poll_with_timeout};
41use crate::version_constants::{VERSION_0_10_0_ALPHA, VERSION_0_11_0_ALPHA};
42use crate::{poll_almost_equal, poll_eq, vars};
43
44// TODO: Are we still using the 3rd port for anything?
45/// Number of ports we allocate for every `fedimintd` instance
46pub const PORTS_PER_FEDIMINTD: u16 = 4;
47/// Which port is for p2p inside the range from [`PORTS_PER_FEDIMINTD`]
48pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
49/// Which port is for api inside the range from [`PORTS_PER_FEDIMINTD`]
50pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
51/// Which port is for the web ui inside the range from [`PORTS_PER_FEDIMINTD`]
52pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
53/// Which port is for prometheus inside the range from [`PORTS_PER_FEDIMINTD`]
54pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;
55
56#[derive(Clone)]
57pub struct Federation {
58    // client is only for internal use, use cli commands instead
59    pub members: BTreeMap<usize, Fedimintd>,
60    pub vars: BTreeMap<usize, vars::Fedimintd>,
61    pub bitcoind: Bitcoind,
62
63    /// Built in [`Client`], already joined
64    client: JitTryAnyhow<Client>,
65    #[allow(dead_code)] // Will need it later, maybe
66    connectors: ConnectorRegistry,
67}
68
69impl Drop for Federation {
70    fn drop(&mut self) {
71        block_in_place(|| {
72            block_on(async {
73                let mut set = JoinSet::new();
74
75                while let Some((_id, fedimintd)) = self.members.pop_first() {
76                    set.spawn(async { drop(fedimintd) });
77                }
78                while (set.join_next().await).is_some() {}
79            });
80        });
81    }
82}
83/// `fedimint-cli` instance (basically path with client state: config + db)
84#[derive(Clone)]
85pub struct Client {
86    name: String,
87}
88
89impl Client {
90    fn clients_dir() -> PathBuf {
91        let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
92            .expect("FM_DATA_DIR_ENV not set")
93            .parse()
94            .expect("FM_DATA_DIR_ENV invalid");
95        data_dir.join("clients")
96    }
97
98    fn client_dir(&self) -> PathBuf {
99        Self::clients_dir().join(&self.name)
100    }
101
102    pub fn client_name_lock(name: &str) -> Result<FileLock> {
103        let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
104        let file_lock = std::fs::OpenOptions::new()
105            .write(true)
106            .create(true)
107            .truncate(true)
108            .open(&lock_path)
109            .with_context(|| format!("Failed to open {}", lock_path.display()))?;
110
111        fs_lock::FileLock::new_exclusive(file_lock)
112            .with_context(|| format!("Failed to lock {}", lock_path.display()))
113    }
114
115    /// Create a [`Client`] that starts with a fresh state.
116    pub async fn create(name: impl ToString) -> Result<Client> {
117        let name = name.to_string();
118        spawn_blocking(move || {
119            let _lock = Self::client_name_lock(&name);
120            for i in 0u64.. {
121                let client = Self {
122                    name: format!("{name}-{i}"),
123                };
124
125                if !client.client_dir().exists() {
126                    std::fs::create_dir_all(client.client_dir())?;
127                    return Ok(client);
128                }
129            }
130            unreachable!()
131        })
132        .await?
133    }
134
135    /// Open or create a [`Client`] that starts with a fresh state.
136    pub fn open_or_create(name: &str) -> Result<Client> {
137        block_in_place(|| {
138            let _lock = Self::client_name_lock(name);
139            let client = Self {
140                name: format!("{name}-0"),
141            };
142            if !client.client_dir().exists() {
143                std::fs::create_dir_all(client.client_dir())?;
144            }
145            Ok(client)
146        })
147    }
148
149    /// Client to join a federation
150    pub async fn join_federation(&self, invite_code: String) -> Result<()> {
151        debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
152        cmd!(self, "join-federation", invite_code).run().await?;
153
154        Ok(())
155    }
156
157    /// Client to join a federation with a restore procedure
158    pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
159        debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
160        cmd!(
161            self,
162            "restore",
163            "--invite-code",
164            invite_code,
165            "--mnemonic",
166            mnemonic
167        )
168        .run()
169        .await?;
170
171        Ok(())
172    }
173
174    /// Client to join a federation
175    pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
176        let restored = Self::open_or_create(name)?;
177
178        let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
179            .as_str()
180            .unwrap()
181            .to_owned();
182
183        debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
184        cmd!(
185            restored,
186            "restore",
187            "--invite-code",
188            invite_code,
189            "--mnemonic",
190            mnemonic
191        )
192        .run()
193        .await?;
194
195        Ok(restored)
196    }
197
198    /// Create a [`Client`] that starts with a state that is a copy of
199    /// of another one.
200    pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
201        let new = Client::create(name).await?;
202
203        cmd!(
204            "cp",
205            "-R",
206            self.client_dir().join("client.db").display(),
207            new.client_dir().display()
208        )
209        .run()
210        .await?;
211
212        Ok(new)
213    }
214
215    pub async fn balance(&self) -> Result<u64> {
216        Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
217            .as_u64()
218            .unwrap())
219    }
220
221    /// Waits for the client balance to reach at least `min_balance_msat`.
222    pub async fn await_balance(&self, min_balance_msat: u64) -> Result<()> {
223        loop {
224            cmd!(self, "dev", "wait", "3").out_json().await?;
225
226            let balance = self.balance().await?;
227            if balance >= min_balance_msat {
228                return Ok(());
229            }
230
231            info!(
232                target: LOG_DEVIMINT,
233                balance,
234                min_balance_msat,
235                "Waiting for client balance to reach minimum"
236            );
237        }
238    }
239
240    pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
241        if crate::util::supports_wallet_v2() {
242            let address = cmd!(self, "module", "walletv2", "receive")
243                .out_json()
244                .await?;
245            // Walletv2 auto-claims deposits, no operation_id needed
246            Ok((address.as_str().unwrap().to_string(), String::new()))
247        } else {
248            let deposit = cmd!(self, "deposit-address").out_json().await?;
249            Ok((
250                deposit["address"].as_str().unwrap().to_string(),
251                deposit["operation_id"].as_str().unwrap().to_string(),
252            ))
253        }
254    }
255
256    pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
257        cmd!(self, "await-deposit", operation_id).run().await
258    }
259
260    pub fn cmd(&self) -> Command {
261        cmd!(
262            crate::util::get_fedimint_cli_path(),
263            format!("--data-dir={}", self.client_dir().display())
264        )
265    }
266
267    pub fn get_name(&self) -> &str {
268        &self.name
269    }
270
271    /// Returns the current consensus session count
272    pub async fn get_session_count(&self) -> Result<u64> {
273        cmd!(self, "dev", "session-count").out_json().await?["count"]
274            .as_u64()
275            .context("count field wasn't a number")
276    }
277
278    /// Returns once all active state machines complete
279    pub async fn wait_complete(&self) -> Result<()> {
280        cmd!(self, "dev", "wait-complete").run().await
281    }
282
283    /// Returns once the current session completes
284    pub async fn wait_session(&self) -> anyhow::Result<()> {
285        info!("Waiting for a new session");
286        let session_count = self.get_session_count().await?;
287        self.wait_session_outcome(session_count).await?;
288        Ok(())
289    }
290
291    /// Returns once the provided session count completes
292    pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
293        let timeout = {
294            let current_session_count = self.get_session_count().await?;
295            let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
296            let session_duration_seconds = 180;
297            Duration::from_secs(sessions_to_wait * session_duration_seconds)
298        };
299
300        let start = Instant::now();
301        poll_with_timeout("Waiting for a new session", timeout, || async {
302            info!("Awaiting session outcome {session_count}");
303            match cmd!(self, "dev", "api", "await_session_outcome", session_count)
304                .run()
305                .await
306            {
307                Err(e) => Err(ControlFlow::Continue(e)),
308                Ok(()) => Ok(()),
309            }
310        })
311        .await?;
312
313        let session_found_in = start.elapsed();
314        info!("session found in {session_found_in:?}");
315        Ok(())
316    }
317}
318
319impl Federation {
320    pub async fn new(
321        process_mgr: &ProcessManager,
322        bitcoind: Bitcoind,
323        skip_setup: bool,
324        pre_dkg: bool,
325        // Which of the pre-allocated federations to use (most tests just use single `0` one)
326        fed_index: usize,
327        federation_name: String,
328    ) -> Result<Self> {
329        let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
330        let mut members = BTreeMap::new();
331        let mut peer_to_env_vars_map = BTreeMap::new();
332
333        let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
334        let mut api_endpoints: BTreeMap<PeerId, _> = BTreeMap::new();
335
336        let connectors = ConnectorRegistry::build_from_testing_env()?.bind().await?;
337        for peer_id in num_peers.peer_ids() {
338            let peer_env_vars = vars::Fedimintd::init(
339                &process_mgr.globals,
340                federation_name.clone(),
341                peer_id,
342                process_mgr
343                    .globals
344                    .fedimintd_overrides
345                    .peer_expect(fed_index, peer_id),
346            )
347            .await?;
348            members.insert(
349                peer_id.to_usize(),
350                Fedimintd::new(
351                    process_mgr,
352                    bitcoind.clone(),
353                    peer_id.to_usize(),
354                    &peer_env_vars,
355                    federation_name.clone(),
356                )
357                .await?,
358            );
359            let admin_client = DynGlobalApi::new_admin_setup(
360                connectors.clone(),
361                SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
362                // TODO: will need it somewhere
363                // &process_mgr.globals.FM_FORCE_API_SECRETS.get_active(),
364            )?;
365            api_endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
366            admin_clients.insert(peer_id, admin_client);
367            peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
368        }
369
370        if !skip_setup && !pre_dkg {
371            // we don't guarantee backwards-compatibility for dkg, so we use the
372            // fedimint-cli version that matches fedimintd
373            let (original_fedimint_cli_path, original_fm_mint_client) =
374                crate::util::use_matching_fedimint_cli_for_dkg().await?;
375
376            run_cli_dkg_v2(api_endpoints).await?;
377
378            // we're done with dkg, so we can reset the fedimint-cli version
379            crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);
380
381            // move configs to config directory
382            let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
383            let invite_code_filename_original = "invite-code";
384
385            for peer_env_vars in peer_to_env_vars_map.values() {
386                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
387
388                let invite_code = poll_simple("awaiting-invite-code", || async {
389                    let path = format!("{peer_data_dir}/{invite_code_filename_original}");
390                    tokio::fs::read_to_string(&path)
391                        .await
392                        .with_context(|| format!("Awaiting invite code file: {path}"))
393                })
394                .await
395                .context("Awaiting invite code file")?;
396
397                download_from_invite_code(&connectors, &InviteCode::from_str(&invite_code)?)
398                    .await?;
399            }
400
401            // copy over invite-code file to client directory
402            let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);
403
404            tokio::fs::copy(
405                format!("{peer_data_dir}/{invite_code_filename_original}"),
406                format!("{client_dir}/{invite_code_filename_original}"),
407            )
408            .await
409            .context("copying invite-code file")?;
410
411            // move each guardian's invite-code file to the client's directory
412            // appending the peer id to the end
413            for (index, peer_env_vars) in &peer_to_env_vars_map {
414                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
415
416                let invite_code_filename_indexed =
417                    format!("{invite_code_filename_original}-{index}");
418                tokio::fs::rename(
419                    format!("{peer_data_dir}/{invite_code_filename_original}"),
420                    format!("{client_dir}/{invite_code_filename_indexed}"),
421                )
422                .await
423                .context("moving invite-code file")?;
424            }
425
426            debug!("Moved invite-code files to client data directory");
427        }
428
429        let client = JitTryAnyhow::new_try({
430            move || async move {
431                let client = Client::open_or_create(federation_name.as_str())?;
432                let invite_code = Self::invite_code_static()?;
433                if !skip_setup && !pre_dkg {
434                    cmd!(client, "join-federation", invite_code).run().await?;
435                }
436                Ok(client)
437            }
438        });
439
440        Ok(Self {
441            members,
442            vars: peer_to_env_vars_map,
443            bitcoind,
444            client,
445            connectors,
446        })
447    }
448
449    pub fn client_config(&self) -> Result<ClientConfig> {
450        let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
451        load_from_file(&cfg_path)
452    }
453
454    /// Get the module instance ID for a given module kind
455    pub fn module_instance_id_by_kind(&self, kind: &ModuleKind) -> Result<ModuleInstanceId> {
456        self.client_config()?
457            .modules
458            .iter()
459            .find_map(|(id, cfg)| if &cfg.kind == kind { Some(*id) } else { None })
460            .with_context(|| format!("Module kind {kind} not found"))
461    }
462
463    pub fn module_client_config<M: ClientModule>(
464        &self,
465    ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
466        self.client_config()?
467            .modules
468            .iter()
469            .find_map(|(module_instance_id, module_cfg)| {
470                if module_cfg.kind == M::kind() {
471                    let decoders = ModuleDecoderRegistry::new(vec![(
472                        *module_instance_id,
473                        M::kind(),
474                        M::decoder(),
475                    )]);
476                    Some(
477                        module_cfg
478                            .config
479                            .clone()
480                            .redecode_raw(&decoders)
481                            .expect("Decoding client cfg failed")
482                            .expect_decoded_ref()
483                            .as_any()
484                            .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
485                            .cloned()
486                            .context("Cast to module config failed"),
487                    )
488                } else {
489                    None
490                }
491            })
492            .transpose()
493    }
494
495    pub fn deposit_fees(&self) -> Result<Amount> {
496        if crate::util::supports_wallet_v2() {
497            Ok(self
498                .module_client_config::<fedimint_walletv2_client::WalletClientModule>()?
499                .context("No walletv2 module found")?
500                .fee_consensus
501                .base)
502        } else {
503            Ok(self
504                .module_client_config::<WalletClientModule>()?
505                .context("No wallet module found")?
506                .fee_consensus
507                .peg_in_abs)
508        }
509    }
510
511    /// Read the invite code from the client data dir
512    pub fn invite_code(&self) -> Result<String> {
513        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
514        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
515        Ok(invite_code)
516    }
517
518    pub fn invite_code_static() -> Result<String> {
519        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
520        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
521        Ok(invite_code)
522    }
523    pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
524        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
525        let name = format!("invite-code-{peer_id}");
526        let invite_code = fs::read_to_string(data_dir.join(name))?;
527        Ok(invite_code)
528    }
529
530    /// Built-in, default, internal [`Client`]
531    ///
532    /// We should be moving away from using it for anything.
533    pub async fn internal_client(&self) -> Result<&Client> {
534        self.client
535            .get_try()
536            .await
537            .context("Internal client joining Federation")
538    }
539
540    /// New [`Client`] that already joined `self`
541    pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
542        let client = Client::create(name).await?;
543        client.join_federation(self.invite_code()?).await?;
544        Ok(client)
545    }
546
547    pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
548        if self.members.contains_key(&peer) {
549            bail!("fedimintd-{peer} already running");
550        }
551        self.members.insert(
552            peer,
553            Fedimintd::new(
554                process_mgr,
555                self.bitcoind.clone(),
556                peer,
557                &self.vars[&peer],
558                "default".to_string(),
559            )
560            .await?,
561        );
562        Ok(())
563    }
564
565    pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
566        let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
567            bail!("fedimintd-{peer_id} does not exist");
568        };
569        fedimintd.terminate().await?;
570        Ok(())
571    }
572
573    pub async fn await_server_terminated(&mut self, peer_id: usize) -> Result<()> {
574        let Some(fedimintd) = self.members.get_mut(&peer_id) else {
575            bail!("fedimintd-{peer_id} does not exist");
576        };
577        fedimintd.await_terminated().await?;
578        self.members.remove(&peer_id);
579        Ok(())
580    }
581
582    /// Starts all peers not currently running.
583    pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
584        info!("starting all servers");
585        let fed_size = process_mgr.globals.FM_FED_SIZE;
586        for peer_id in 0..fed_size {
587            if self.members.contains_key(&peer_id) {
588                continue;
589            }
590            self.start_server(process_mgr, peer_id).await?;
591        }
592        self.await_all_peers().await?;
593        Ok(())
594    }
595
596    /// Terminates all running peers.
597    pub async fn terminate_all_servers(&mut self) -> Result<()> {
598        info!("terminating all servers");
599        let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
600        for peer_id in running_peer_ids {
601            self.terminate_server(peer_id).await?;
602        }
603        Ok(())
604    }
605
606    /// Coordinated shutdown of all peers that restart using the provided
607    /// `bin_path`. Returns `Ok()` once all peers are online.
608    ///
609    /// Staggering the restart more closely simulates upgrades in the wild.
610    pub async fn restart_all_staggered_with_bin(
611        &mut self,
612        process_mgr: &ProcessManager,
613        bin_path: &PathBuf,
614    ) -> Result<()> {
615        let fed_size = process_mgr.globals.FM_FED_SIZE;
616
617        // ensure all peers are online
618        self.start_all_servers(process_mgr).await?;
619
620        // staggered shutdown of peers
621        while self.num_members() > 0 {
622            self.terminate_server(self.num_members() - 1).await?;
623            if self.num_members() > 0 {
624                fedimint_core::task::sleep_in_test(
625                    "waiting to shutdown remaining peers",
626                    Duration::from_secs(10),
627                )
628                .await;
629            }
630        }
631
632        // TODO: Audit that the environment access only happens in single-threaded code.
633        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
634
635        // staggered restart
636        for peer_id in 0..fed_size {
637            self.start_server(process_mgr, peer_id).await?;
638            if peer_id < fed_size - 1 {
639                fedimint_core::task::sleep_in_test(
640                    "waiting to restart remaining peers",
641                    Duration::from_secs(10),
642                )
643                .await;
644            }
645        }
646
647        self.await_all_peers().await?;
648
649        let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
650        info!("upgraded fedimintd to version: {}", fedimintd_version);
651        Ok(())
652    }
653
654    pub async fn restart_all_with_bin(
655        &mut self,
656        process_mgr: &ProcessManager,
657        bin_path: &PathBuf,
658    ) -> Result<()> {
659        // get the version we're upgrading to, temporarily updating the fedimintd path
660        let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
661        // TODO: Audit that the environment access only happens in single-threaded code.
662        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
663        // TODO: Audit that the environment access only happens in single-threaded code.
664        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
665
666        self.restart_all_staggered_with_bin(process_mgr, bin_path)
667            .await
668    }
669
670    pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
671        let fed_size = process_mgr.globals.FM_FED_SIZE;
672        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
673        anyhow::ensure!(
674            fed_size > 3 * offline_nodes,
675            "too many offline nodes ({offline_nodes}) to reach consensus"
676        );
677
678        while self.num_members() > fed_size - offline_nodes {
679            self.terminate_server(self.num_members() - 1).await?;
680        }
681
682        if offline_nodes > 0 {
683            info!(fed_size, offline_nodes, "federation is degraded");
684        }
685        Ok(())
686    }
687
688    pub async fn send_to_address(&self, address: String, amount: u64) -> Result<()> {
689        self.bitcoind.send_to(address, amount).await?;
690
691        self.bitcoind.mine_blocks(21).await?;
692
693        Ok(())
694    }
695
696    pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
697        let deposit_fees_msat = self.deposit_fees()?.msats;
698        assert_eq!(
699            deposit_fees_msat % 1000,
700            0,
701            "Deposit fees expected to be whole sats in test suite"
702        );
703        let deposit_fees = deposit_fees_msat / 1000;
704        info!(amount, deposit_fees, "Pegging-in client funds");
705
706        let (address, operation_id) = client.get_deposit_addr().await?;
707
708        self.bitcoind
709            .send_to(address, amount + deposit_fees)
710            .await?;
711        self.bitcoind.mine_blocks(21).await?;
712
713        Ok(operation_id)
714    }
715
716    pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
717        // For walletv2, we need to capture the initial balance and wait for it to
718        // increase since there is no state machine - deposits are auto-claimed
719        let initial_balance = if crate::util::supports_wallet_v2() {
720            Some(client.balance().await?)
721        } else {
722            None
723        };
724
725        let operation_id = self.pegin_client_no_wait(amount, client).await?;
726
727        if let Some(initial) = initial_balance {
728            // Walletv2: wait for balance to increase. We expect slightly less than
729            // `amount` due to mint module fees when creating ecash notes.
730            let expected_balance = initial + (amount * 1000 * 9 / 10);
731            client.await_balance(expected_balance).await?;
732        } else {
733            client.await_deposit(&operation_id).await?;
734        }
735        Ok(())
736    }
737
738    /// Initiates multiple peg-ins to the same federation for the set of
739    /// gateways to save on mining blocks in parallel.
740    pub async fn pegin_gateways(
741        &self,
742        amount: u64,
743        gateways: Vec<&super::gatewayd::Gatewayd>,
744    ) -> Result<()> {
745        let deposit_fees_msat = self.deposit_fees()?.msats;
746        assert_eq!(
747            deposit_fees_msat % 1000,
748            0,
749            "Deposit fees expected to be whole sats in test suite"
750        );
751        let deposit_fees = deposit_fees_msat / 1000;
752        info!(amount, deposit_fees, "Pegging-in gateway funds");
753        let fed_id = self.calculate_federation_id();
754
755        // For walletv2, capture initial balances since deposits are auto-claimed
756        // and we need to check balance change rather than absolute balance
757        // (same approach as pegin_client).
758        let uses_walletv2 = crate::util::supports_wallet_v2();
759        let mut initial_balances = Vec::new();
760        if uses_walletv2 {
761            for gw in &gateways {
762                let balance = gw
763                    .client()
764                    .ecash_balance(fed_id.clone())
765                    .await
766                    .expect("failed to fetch initial gateway balance");
767                initial_balances.push(balance);
768            }
769        }
770
771        for gw in gateways.clone() {
772            let pegin_addr = gw.client().get_pegin_addr(&fed_id).await?;
773            self.bitcoind
774                .send_to(pegin_addr, amount + deposit_fees)
775                .await?;
776        }
777
778        self.bitcoind.mine_blocks(21).await?;
779        let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
780        try_join_all(gateways.into_iter().enumerate().map(|(i, gw)| {
781            let initial_balance = if uses_walletv2 {
782                initial_balances[i]
783            } else {
784                0
785            };
786            let fed_id = fed_id.clone();
787            poll("gateway pegin", move || {
788                let fed_id = fed_id.clone();
789                async move {
790                    let gw_info = gw
791                        .client()
792                        .get_info()
793                        .await
794                        .map_err(ControlFlow::Continue)?;
795
796                    let block_height: u64 = if gw.gatewayd_version < *VERSION_0_10_0_ALPHA {
797                        gw_info["block_height"]
798                            .as_u64()
799                            .expect("Could not parse block height")
800                    } else {
801                        gw_info["lightning_info"]["connected"]["block_height"]
802                            .as_u64()
803                            .expect("Could not parse block height")
804                    };
805
806                    if bitcoind_block_height != block_height {
807                        return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
808                            "gateway block height is not synced"
809                        )));
810                    }
811
812                    let gateway_balance = gw
813                        .client()
814                        .ecash_balance(fed_id)
815                        .await
816                        .map_err(ControlFlow::Continue)?;
817
818                    if uses_walletv2 {
819                        // Walletv2: check balance increased by approximately
820                        // the expected amount. Use 90% threshold to account
821                        // for mintv2 fees (same as pegin_client).
822                        let expected = initial_balance + (amount * 1000 * 9 / 10);
823                        if gateway_balance >= expected {
824                            Ok(())
825                        } else {
826                            Err(ControlFlow::Continue(anyhow::anyhow!(
827                                "Gateway balance {gateway_balance} has not reached expected {expected} (initial: {initial_balance})"
828                            )))
829                        }
830                    } else {
831                        poll_almost_equal!(gateway_balance, amount * 1000)
832                    }
833                }
834            })
835        }))
836        .await?;
837
838        Ok(())
839    }
840
841    /// Initiates multiple peg-outs from the same federation for the set of
842    /// gateways to save on mining blocks in parallel.
843    pub async fn pegout_gateways(
844        &self,
845        amount: u64,
846        gateways: Vec<&super::gatewayd::Gatewayd>,
847    ) -> Result<()> {
848        info!(amount, "Pegging-out gateway funds");
849        let fed_id = self.calculate_federation_id();
850        let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
851        for gw in gateways.clone() {
852            let prev_fed_ecash_balance = gw
853                .client()
854                .get_balances()
855                .await?
856                .ecash_balances
857                .into_iter()
858                .find(|fed| fed.federation_id.to_string() == fed_id)
859                .expect("Gateway has not joined federation")
860                .ecash_balance_msats;
861
862            let pegout_address = self.bitcoind.get_new_address().await?;
863            let response = gw
864                .client()
865                .pegout(fed_id.clone(), amount, pegout_address)
866                .await?;
867            peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
868        }
869        self.bitcoind.mine_blocks(21).await?;
870
871        try_join_all(
872            peg_outs
873                .values()
874                .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
875        )
876        .await?;
877
878        for gw in gateways.clone() {
879            let after_fed_ecash_balance = gw
880                .client()
881                .get_balances()
882                .await?
883                .ecash_balances
884                .into_iter()
885                .find(|fed| fed.federation_id.to_string() == fed_id)
886                .expect("Gateway has not joined federation")
887                .ecash_balance_msats;
888
889            let ln_type = gw.ln.ln_type();
890            let prev_balance = peg_outs
891                .get(&ln_type)
892                .expect("peg out does not exist")
893                .0
894                .msats;
895            let fees = peg_outs
896                .get(&ln_type)
897                .expect("peg out does not exist")
898                .1
899                .fees;
900            let total_fee = fees.amount().to_sat() * 1000;
901            // Walletv2 charges a module fee on top of the on-chain fee:
902            // 100 sats base + 1% of amount (amount is in msats)
903            let tolerance = if crate::util::supports_wallet_v2() {
904                let amount_sats = amount / 1000;
905                let module_fee_sats = 100 + amount_sats / 100;
906                module_fee_sats * 1000 + 2000
907            } else if crate::util::supports_mint_v2() {
908                4000
909            } else {
910                2000
911            };
912            crate::util::almost_equal(
913                after_fed_ecash_balance.msats,
914                prev_balance - amount - total_fee,
915                tolerance,
916            )
917            .map_err(|e| {
918                anyhow::anyhow!(
919                    "new balance did not equal prev balance minus withdraw_amount minus fees: {e}"
920                )
921            })?;
922        }
923
924        Ok(())
925    }
926
927    pub fn calculate_federation_id(&self) -> String {
928        self.client_config()
929            .unwrap()
930            .global
931            .calculate_federation_id()
932            .to_string()
933    }
934
935    pub async fn await_block_sync(&self) -> Result<u64> {
936        let finality_delay = self.get_finality_delay()?;
937        let block_count = self.bitcoind.get_block_count().await?;
938        let expected = block_count.saturating_sub(finality_delay.into());
939
940        if crate::util::supports_wallet_v2() {
941            // Walletv2 doesn't have `dev wait-block-count`, poll using CLI instead
942            let client = self.internal_client().await?;
943            loop {
944                let value = cmd!(client, "module", "walletv2", "info", "block-count")
945                    .out_json()
946                    .await?;
947                let current: u64 = serde_json::from_value(value)?;
948                if current >= expected {
949                    break;
950                }
951                fedimint_core::task::sleep_in_test(
952                    format!("Waiting for consensus block count to reach {expected}"),
953                    std::time::Duration::from_secs(1),
954                )
955                .await;
956            }
957        } else {
958            cmd!(
959                self.internal_client().await?,
960                "dev",
961                "wait-block-count",
962                expected
963            )
964            .run()
965            .await?;
966        }
967
968        Ok(expected)
969    }
970
971    fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
972        // Walletv2 uses a constant finality delay
973        if crate::util::supports_wallet_v2() {
974            return Ok(fedimint_walletv2_server::CONFIRMATION_FINALITY_DELAY as u32);
975        }
976
977        let wallet_instance_id = self.module_instance_id_by_kind(&fedimint_wallet_client::KIND)?;
978        let client_config = &self.client_config()?;
979        let wallet_cfg = client_config
980            .modules
981            .get(&wallet_instance_id)
982            .context("wallet module not found")?
983            .clone()
984            .redecode_raw(&ModuleDecoderRegistry::new([(
985                wallet_instance_id,
986                fedimint_wallet_client::KIND,
987                fedimint_wallet_client::WalletModuleTypes::decoder(),
988            )]))?;
989        let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
990
991        let finality_delay = wallet_cfg.finality_delay;
992        Ok(finality_delay)
993    }
994
995    pub async fn await_gateways_registered(&self) -> Result<()> {
996        let start_time = Instant::now();
997        debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
998
999        poll("gateways registered", || async {
1000            let num_gateways = cmd!(
1001                self.internal_client()
1002                    .await
1003                    .map_err(ControlFlow::Continue)?,
1004                "list-gateways"
1005            )
1006            .out_json()
1007            .await
1008            .map_err(ControlFlow::Continue)?
1009            .as_array()
1010            .context("invalid output")
1011            .map_err(ControlFlow::Break)?
1012            .len();
1013
1014            // After version v0.10.0, the LND gateway will register twice. Once for the HTTP
1015            // server, and once for the iroh endpoint.
1016            let expected_gateways =
1017                if crate::util::Gatewayd::version_or_default().await < *VERSION_0_10_0_ALPHA {
1018                    1
1019                } else {
1020                    2
1021                };
1022
1023            poll_eq!(num_gateways, expected_gateways)
1024        })
1025        .await?;
1026        debug!(target: LOG_DEVIMINT,
1027            elapsed_ms = %start_time.elapsed().as_millis(),
1028            "Gateways registered");
1029        Ok(())
1030    }
1031
1032    pub async fn await_all_peers(&self) -> Result<()> {
1033        let (module_name, endpoint) = if crate::util::supports_wallet_v2() {
1034            ("walletv2", "consensus_block_count")
1035        } else {
1036            ("wallet", "block_count")
1037        };
1038        poll("Waiting for all peers to be online", || async {
1039            cmd!(
1040                self.internal_client()
1041                    .await
1042                    .map_err(ControlFlow::Continue)?,
1043                "dev",
1044                "api",
1045                "--module",
1046                module_name,
1047                endpoint
1048            )
1049            .run()
1050            .await
1051            .map_err(ControlFlow::Continue)?;
1052            Ok(())
1053        })
1054        .await
1055    }
1056
1057    pub async fn await_peer(&self, peer_id: usize) -> Result<()> {
1058        poll("Waiting for all peers to be online", || async {
1059            cmd!(
1060                self.internal_client()
1061                    .await
1062                    .map_err(ControlFlow::Continue)?,
1063                "dev",
1064                "api",
1065                "--peer-id",
1066                peer_id,
1067                "--module",
1068                "wallet",
1069                "block_count"
1070            )
1071            .run()
1072            .await
1073            .map_err(ControlFlow::Continue)?;
1074            Ok(())
1075        })
1076        .await
1077    }
1078
1079    /// Mines enough blocks to finalize mempool transactions, then waits for
1080    /// federation to process finalized blocks.
1081    ///
1082    /// ex:
1083    ///   tx submitted to mempool at height 100
1084    ///   finality delay = 10
1085    ///   mine finality delay blocks + 1 => new height 111
1086    ///   tx included in block 101
1087    ///   highest finalized height = 111 - 10 = 101
1088    pub async fn finalize_mempool_tx(&self) -> Result<()> {
1089        let finality_delay = self.get_finality_delay()?;
1090        let blocks_to_mine = finality_delay + 1;
1091        self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
1092        self.await_block_sync().await?;
1093        Ok(())
1094    }
1095
1096    pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
1097        self.bitcoind.mine_blocks(blocks).await?;
1098        self.await_block_sync().await?;
1099        Ok(())
1100    }
1101
1102    pub fn num_members(&self) -> usize {
1103        self.members.len()
1104    }
1105
1106    pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
1107        self.members
1108            .keys()
1109            .map(|&peer_id| PeerId::from(peer_id as u16))
1110    }
1111}
1112
1113#[derive(Clone)]
1114pub struct Fedimintd {
1115    _bitcoind: Bitcoind,
1116    process: ProcessHandle,
1117}
1118
1119impl Fedimintd {
1120    pub async fn new(
1121        process_mgr: &ProcessManager,
1122        bitcoind: Bitcoind,
1123        peer_id: usize,
1124        env: &vars::Fedimintd,
1125        fed_name: String,
1126    ) -> Result<Self> {
1127        debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
1128        let process = process_mgr
1129            .spawn_daemon(
1130                &format!("fedimintd-{fed_name}-{peer_id}"),
1131                cmd!(FedimintdCmd).envs(env.vars()),
1132            )
1133            .await?;
1134
1135        Ok(Self {
1136            _bitcoind: bitcoind,
1137            process,
1138        })
1139    }
1140
1141    pub async fn terminate(self) -> Result<()> {
1142        self.process.terminate().await
1143    }
1144
1145    pub async fn await_terminated(&self) -> Result<()> {
1146        self.process.await_terminated().await
1147    }
1148}
1149
1150pub async fn run_cli_dkg_v2(endpoints: BTreeMap<PeerId, String>) -> Result<()> {
1151    // Parallelize setup status checks
1152    let status_futures = endpoints.values().map(|endpoint| {
1153        let endpoint = endpoint.clone();
1154        async move {
1155            let status = poll("awaiting-setup-status-awaiting-local-params", || async {
1156                crate::util::FedimintCli
1157                    .setup_status(&API_AUTH, &endpoint)
1158                    .await
1159                    .map_err(ControlFlow::Continue)
1160            })
1161            .await
1162            .unwrap();
1163
1164            assert_eq!(status, SetupStatus::AwaitingLocalParams);
1165        }
1166    });
1167    join_all(status_futures).await;
1168
1169    debug!(target: LOG_DEVIMINT, "Setting local parameters...");
1170
1171    // Parallelize setting local parameters
1172    // --federation-size is only supported by fedimint-cli >= 0.11.0-alpha
1173    let federation_size =
1174        if crate::util::FedimintCli::version_or_default().await >= *VERSION_0_11_0_ALPHA {
1175            Some(endpoints.len())
1176        } else {
1177            None
1178        };
1179    let local_params_futures = endpoints.iter().map(|(peer, endpoint)| {
1180        let peer = *peer;
1181        let endpoint = endpoint.clone();
1182        async move {
1183            let info = if peer.to_usize() == 0 {
1184                crate::util::FedimintCli
1185                    .set_local_params_leader(&peer, &API_AUTH, &endpoint, federation_size)
1186                    .await
1187            } else {
1188                crate::util::FedimintCli
1189                    .set_local_params_follower(&peer, &API_AUTH, &endpoint)
1190                    .await
1191            };
1192            info.map(|i| (peer, i))
1193        }
1194    });
1195    let connection_info: BTreeMap<_, _> = try_join_all(local_params_futures)
1196        .await?
1197        .into_iter()
1198        .collect();
1199
1200    debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");
1201
1202    // Parallelize peer addition - flatten the nested loop into a single parallel
1203    // operation
1204    let add_peer_futures = connection_info.iter().flat_map(|(peer, info)| {
1205        endpoints
1206            .iter()
1207            .filter(move |(p, _)| *p != peer)
1208            .map(move |(_, endpoint)| {
1209                let endpoint = endpoint.clone();
1210                let info = info.clone();
1211                async move {
1212                    crate::util::FedimintCli
1213                        .add_peer(&info, &API_AUTH, &endpoint)
1214                        .await
1215                }
1216            })
1217    });
1218    try_join_all(add_peer_futures).await?;
1219
1220    debug!(target: LOG_DEVIMINT, "Starting DKG...");
1221
1222    // Parallelize DKG start
1223    let start_dkg_futures = endpoints.values().map(|endpoint| {
1224        let endpoint = endpoint.clone();
1225        async move {
1226            crate::util::FedimintCli
1227                .start_dkg(&API_AUTH, &endpoint)
1228                .await
1229        }
1230    });
1231    try_join_all(start_dkg_futures).await?;
1232
1233    Ok(())
1234}