Skip to main content

devimint/
federation.rs

1use std::collections::BTreeMap;
2use std::ops::ControlFlow;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::Duration;
6use std::{env, fs};
7
8use anyhow::{Context, Result, bail};
9use fedimint_api_client::api::DynGlobalApi;
10use fedimint_api_client::download_from_invite_code;
11use fedimint_client_module::module::ClientModule;
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::admin_client::SetupStatus;
14use fedimint_core::config::{ClientConfig, load_from_file};
15use fedimint_core::core::{ModuleInstanceId, ModuleKind};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::ModuleCommon;
18use fedimint_core::module::registry::ModuleDecoderRegistry;
19use fedimint_core::runtime::block_in_place;
20use fedimint_core::task::block_on;
21use fedimint_core::task::jit::JitTryAnyhow;
22use fedimint_core::util::SafeUrl;
23use fedimint_core::{Amount, NumPeers, PeerId};
24use fedimint_gateway_common::WithdrawResponse;
25use fedimint_logging::LOG_DEVIMINT;
26use fedimint_testing_core::config::API_AUTH;
27use fedimint_testing_core::node_type::LightningNodeType;
28use fedimint_wallet_client::WalletClientModule;
29use fedimint_wallet_client::config::WalletClientConfig;
30use fs_lock::FileLock;
31use futures::future::{join_all, try_join_all};
32use tokio::task::{JoinSet, spawn_blocking};
33use tokio::time::Instant;
34use tracing::{debug, info};
35
36use super::external::Bitcoind;
37use super::util::{Command, ProcessHandle, ProcessManager, cmd};
38use super::vars::utf8;
39use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
40use crate::util::{FedimintdCmd, poll, poll_simple, poll_with_timeout};
41use crate::version_constants::{VERSION_0_10_0_ALPHA, VERSION_0_11_0_ALPHA};
42use crate::{poll_almost_equal, poll_eq, vars};
43
44// TODO: Are we still using the 3rd port for anything?
45/// Number of ports we allocate for every `fedimintd` instance
46pub const PORTS_PER_FEDIMINTD: u16 = 4;
47/// Which port is for p2p inside the range from [`PORTS_PER_FEDIMINTD`]
48pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
49/// Which port is for api inside the range from [`PORTS_PER_FEDIMINTD`]
50pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
51/// Which port is for the web ui inside the range from [`PORTS_PER_FEDIMINTD`]
52pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
53/// Which port is for prometheus inside the range from [`PORTS_PER_FEDIMINTD`]
54pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;
55
56#[derive(Clone)]
57pub struct Federation {
58    // client is only for internal use, use cli commands instead
59    pub members: BTreeMap<usize, Fedimintd>,
60    pub vars: BTreeMap<usize, vars::Fedimintd>,
61    pub bitcoind: Bitcoind,
62
63    /// Built in [`Client`], already joined
64    client: JitTryAnyhow<Client>,
65    #[allow(dead_code)] // Will need it later, maybe
66    connectors: ConnectorRegistry,
67}
68
69impl Drop for Federation {
70    fn drop(&mut self) {
71        block_in_place(|| {
72            block_on(async {
73                let mut set = JoinSet::new();
74
75                while let Some((_id, fedimintd)) = self.members.pop_first() {
76                    set.spawn(async { drop(fedimintd) });
77                }
78                while (set.join_next().await).is_some() {}
79            });
80        });
81    }
82}
83/// `fedimint-cli` instance (basically path with client state: config + db)
84#[derive(Clone)]
85pub struct Client {
86    name: String,
87}
88
89impl Client {
90    fn clients_dir() -> PathBuf {
91        let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
92            .expect("FM_DATA_DIR_ENV not set")
93            .parse()
94            .expect("FM_DATA_DIR_ENV invalid");
95        data_dir.join("clients")
96    }
97
98    fn client_dir(&self) -> PathBuf {
99        Self::clients_dir().join(&self.name)
100    }
101
102    pub fn client_name_lock(name: &str) -> Result<FileLock> {
103        let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
104        let file_lock = std::fs::OpenOptions::new()
105            .write(true)
106            .create(true)
107            .truncate(true)
108            .open(&lock_path)
109            .with_context(|| format!("Failed to open {}", lock_path.display()))?;
110
111        fs_lock::FileLock::new_exclusive(file_lock)
112            .with_context(|| format!("Failed to lock {}", lock_path.display()))
113    }
114
115    /// Create a [`Client`] that starts with a fresh state.
116    pub async fn create(name: impl ToString) -> Result<Client> {
117        let name = name.to_string();
118        spawn_blocking(move || {
119            let _lock = Self::client_name_lock(&name);
120            for i in 0u64.. {
121                let client = Self {
122                    name: format!("{name}-{i}"),
123                };
124
125                if !client.client_dir().exists() {
126                    std::fs::create_dir_all(client.client_dir())?;
127                    return Ok(client);
128                }
129            }
130            unreachable!()
131        })
132        .await?
133    }
134
135    /// Open or create a [`Client`] that starts with a fresh state.
136    pub fn open_or_create(name: &str) -> Result<Client> {
137        block_in_place(|| {
138            let _lock = Self::client_name_lock(name);
139            let client = Self {
140                name: format!("{name}-0"),
141            };
142            if !client.client_dir().exists() {
143                std::fs::create_dir_all(client.client_dir())?;
144            }
145            Ok(client)
146        })
147    }
148
149    /// Client to join a federation
150    pub async fn join_federation(&self, invite_code: String) -> Result<()> {
151        debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
152        cmd!(self, "join-federation", invite_code).run().await?;
153
154        Ok(())
155    }
156
157    /// Client to join a federation with a restore procedure
158    pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
159        debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
160        cmd!(
161            self,
162            "restore",
163            "--invite-code",
164            invite_code,
165            "--mnemonic",
166            mnemonic
167        )
168        .run()
169        .await?;
170
171        Ok(())
172    }
173
174    /// Client to join a federation
175    pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
176        let restored = Self::open_or_create(name)?;
177
178        let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
179            .as_str()
180            .unwrap()
181            .to_owned();
182
183        debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
184        cmd!(
185            restored,
186            "restore",
187            "--invite-code",
188            invite_code,
189            "--mnemonic",
190            mnemonic
191        )
192        .run()
193        .await?;
194
195        Ok(restored)
196    }
197
198    /// Create a [`Client`] that starts with a state that is a copy of
199    /// of another one.
200    pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
201        let new = Client::create(name).await?;
202
203        cmd!(
204            "cp",
205            "-R",
206            self.client_dir().join("client.db").display(),
207            new.client_dir().display()
208        )
209        .run()
210        .await?;
211
212        Ok(new)
213    }
214
215    pub async fn balance(&self) -> Result<u64> {
216        Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
217            .as_u64()
218            .unwrap())
219    }
220
221    /// Waits for the client balance to reach at least `min_balance_msat`.
222    pub async fn await_balance(&self, min_balance_msat: u64) -> Result<()> {
223        loop {
224            cmd!(self, "dev", "wait", "3").out_json().await?;
225
226            let balance = self.balance().await?;
227            if balance >= min_balance_msat {
228                return Ok(());
229            }
230
231            info!(
232                target: LOG_DEVIMINT,
233                balance,
234                min_balance_msat,
235                "Waiting for client balance to reach minimum"
236            );
237        }
238    }
239
240    pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
241        if crate::util::supports_wallet_v2() {
242            let address = cmd!(self, "module", "walletv2", "receive")
243                .out_json()
244                .await?;
245            // Walletv2 auto-claims deposits, no operation_id needed
246            Ok((address.as_str().unwrap().to_string(), String::new()))
247        } else {
248            let deposit = cmd!(self, "deposit-address").out_json().await?;
249            Ok((
250                deposit["address"].as_str().unwrap().to_string(),
251                deposit["operation_id"].as_str().unwrap().to_string(),
252            ))
253        }
254    }
255
256    pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
257        cmd!(self, "await-deposit", operation_id).run().await
258    }
259
260    pub fn cmd(&self) -> Command {
261        cmd!(
262            crate::util::get_fedimint_cli_path(),
263            format!("--data-dir={}", self.client_dir().display())
264        )
265    }
266
267    pub fn get_name(&self) -> &str {
268        &self.name
269    }
270
271    /// Returns the current consensus session count
272    pub async fn get_session_count(&self) -> Result<u64> {
273        cmd!(self, "dev", "session-count").out_json().await?["count"]
274            .as_u64()
275            .context("count field wasn't a number")
276    }
277
278    /// Returns once all active state machines complete
279    pub async fn wait_complete(&self) -> Result<()> {
280        cmd!(self, "dev", "wait-complete").run().await
281    }
282
283    /// Returns once the current session completes
284    pub async fn wait_session(&self) -> anyhow::Result<()> {
285        info!("Waiting for a new session");
286        let session_count = self.get_session_count().await?;
287        self.wait_session_outcome(session_count).await?;
288        Ok(())
289    }
290
291    /// Returns once the provided session count completes
292    pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
293        let timeout = {
294            let current_session_count = self.get_session_count().await?;
295            let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
296            let session_duration_seconds = 180;
297            Duration::from_secs(sessions_to_wait * session_duration_seconds)
298        };
299
300        let start = Instant::now();
301        poll_with_timeout("Waiting for a new session", timeout, || async {
302            info!("Awaiting session outcome {session_count}");
303            match cmd!(self, "dev", "api", "await_session_outcome", session_count)
304                .run()
305                .await
306            {
307                Err(e) => Err(ControlFlow::Continue(e)),
308                Ok(()) => Ok(()),
309            }
310        })
311        .await?;
312
313        let session_found_in = start.elapsed();
314        info!("session found in {session_found_in:?}");
315        Ok(())
316    }
317}
318
319impl Federation {
320    pub async fn new(
321        process_mgr: &ProcessManager,
322        bitcoind: Bitcoind,
323        skip_setup: bool,
324        pre_dkg: bool,
325        // Which of the pre-allocated federations to use (most tests just use single `0` one)
326        fed_index: usize,
327        federation_name: String,
328    ) -> Result<Self> {
329        let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
330        let mut members = BTreeMap::new();
331        let mut peer_to_env_vars_map = BTreeMap::new();
332
333        let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
334        let mut api_endpoints: BTreeMap<PeerId, _> = BTreeMap::new();
335
336        let connectors = ConnectorRegistry::build_from_testing_env()?.bind().await?;
337        for peer_id in num_peers.peer_ids() {
338            let peer_env_vars = vars::Fedimintd::init(
339                &process_mgr.globals,
340                federation_name.clone(),
341                peer_id,
342                process_mgr
343                    .globals
344                    .fedimintd_overrides
345                    .peer_expect(fed_index, peer_id),
346            )
347            .await?;
348            members.insert(
349                peer_id.to_usize(),
350                Fedimintd::new(
351                    process_mgr,
352                    bitcoind.clone(),
353                    peer_id.to_usize(),
354                    &peer_env_vars,
355                    federation_name.clone(),
356                )
357                .await?,
358            );
359            let admin_client = DynGlobalApi::new_admin_setup(
360                connectors.clone(),
361                SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
362                // TODO: will need it somewhere
363                // &process_mgr.globals.FM_FORCE_API_SECRETS.get_active(),
364            )?;
365            api_endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
366            admin_clients.insert(peer_id, admin_client);
367            peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
368        }
369
370        if !skip_setup && !pre_dkg {
371            // we don't guarantee backwards-compatibility for dkg, so we use the
372            // fedimint-cli version that matches fedimintd
373            let (original_fedimint_cli_path, original_fm_mint_client) =
374                crate::util::use_matching_fedimint_cli_for_dkg().await?;
375
376            run_cli_dkg_v2(api_endpoints).await?;
377
378            // we're done with dkg, so we can reset the fedimint-cli version
379            crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);
380
381            // move configs to config directory
382            let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
383            let invite_code_filename_original = "invite-code";
384
385            for peer_env_vars in peer_to_env_vars_map.values() {
386                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
387
388                let invite_code = poll_simple("awaiting-invite-code", || async {
389                    let path = format!("{peer_data_dir}/{invite_code_filename_original}");
390                    tokio::fs::read_to_string(&path)
391                        .await
392                        .with_context(|| format!("Awaiting invite code file: {path}"))
393                })
394                .await
395                .context("Awaiting invite code file")?;
396
397                download_from_invite_code(&connectors, &InviteCode::from_str(&invite_code)?)
398                    .await?;
399            }
400
401            // copy over invite-code file to client directory
402            let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);
403
404            tokio::fs::copy(
405                format!("{peer_data_dir}/{invite_code_filename_original}"),
406                format!("{client_dir}/{invite_code_filename_original}"),
407            )
408            .await
409            .context("copying invite-code file")?;
410
411            // move each guardian's invite-code file to the client's directory
412            // appending the peer id to the end
413            for (index, peer_env_vars) in &peer_to_env_vars_map {
414                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);
415
416                let invite_code_filename_indexed =
417                    format!("{invite_code_filename_original}-{index}");
418                tokio::fs::rename(
419                    format!("{peer_data_dir}/{invite_code_filename_original}"),
420                    format!("{client_dir}/{invite_code_filename_indexed}"),
421                )
422                .await
423                .context("moving invite-code file")?;
424            }
425
426            debug!("Moved invite-code files to client data directory");
427        }
428
429        let client = JitTryAnyhow::new_try({
430            move || async move {
431                let client = Client::open_or_create(federation_name.as_str())?;
432                let invite_code = Self::invite_code_static()?;
433                if !skip_setup && !pre_dkg {
434                    cmd!(client, "join-federation", invite_code).run().await?;
435                }
436                Ok(client)
437            }
438        });
439
440        Ok(Self {
441            members,
442            vars: peer_to_env_vars_map,
443            bitcoind,
444            client,
445            connectors,
446        })
447    }
448
449    pub fn client_config(&self) -> Result<ClientConfig> {
450        let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
451        load_from_file(&cfg_path)
452    }
453
454    /// Get the module instance ID for a given module kind
455    pub fn module_instance_id_by_kind(&self, kind: &ModuleKind) -> Result<ModuleInstanceId> {
456        self.client_config()?
457            .modules
458            .iter()
459            .find_map(|(id, cfg)| if &cfg.kind == kind { Some(*id) } else { None })
460            .with_context(|| format!("Module kind {kind} not found"))
461    }
462
463    pub fn module_client_config<M: ClientModule>(
464        &self,
465    ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
466        self.client_config()?
467            .modules
468            .iter()
469            .find_map(|(module_instance_id, module_cfg)| {
470                if module_cfg.kind == M::kind() {
471                    let decoders = ModuleDecoderRegistry::new(vec![(
472                        *module_instance_id,
473                        M::kind(),
474                        M::decoder(),
475                    )]);
476                    Some(
477                        module_cfg
478                            .config
479                            .clone()
480                            .redecode_raw(&decoders)
481                            .expect("Decoding client cfg failed")
482                            .expect_decoded_ref()
483                            .as_any()
484                            .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
485                            .cloned()
486                            .context("Cast to module config failed"),
487                    )
488                } else {
489                    None
490                }
491            })
492            .transpose()
493    }
494
495    pub fn deposit_fees(&self) -> Result<Amount> {
496        if crate::util::supports_wallet_v2() {
497            Ok(self
498                .module_client_config::<fedimint_walletv2_client::WalletClientModule>()?
499                .context("No walletv2 module found")?
500                .fee_consensus
501                .base)
502        } else {
503            Ok(self
504                .module_client_config::<WalletClientModule>()?
505                .context("No wallet module found")?
506                .fee_consensus
507                .peg_in_abs)
508        }
509    }
510
511    /// Read the invite code from the client data dir
512    pub fn invite_code(&self) -> Result<String> {
513        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
514        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
515        Ok(invite_code)
516    }
517
518    pub fn invite_code_static() -> Result<String> {
519        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
520        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
521        Ok(invite_code)
522    }
523    pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
524        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
525        let name = format!("invite-code-{peer_id}");
526        let invite_code = fs::read_to_string(data_dir.join(name))?;
527        Ok(invite_code)
528    }
529
530    /// Built-in, default, internal [`Client`]
531    ///
532    /// We should be moving away from using it for anything.
533    pub async fn internal_client(&self) -> Result<&Client> {
534        self.client
535            .get_try()
536            .await
537            .context("Internal client joining Federation")
538    }
539
540    /// New [`Client`] that already joined `self`
541    pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
542        let client = Client::create(name).await?;
543        client.join_federation(self.invite_code()?).await?;
544        Ok(client)
545    }
546
547    pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
548        if self.members.contains_key(&peer) {
549            bail!("fedimintd-{peer} already running");
550        }
551        self.members.insert(
552            peer,
553            Fedimintd::new(
554                process_mgr,
555                self.bitcoind.clone(),
556                peer,
557                &self.vars[&peer],
558                "default".to_string(),
559            )
560            .await?,
561        );
562        Ok(())
563    }
564
565    pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
566        let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
567            bail!("fedimintd-{peer_id} does not exist");
568        };
569        fedimintd.terminate().await?;
570        Ok(())
571    }
572
573    pub async fn await_server_terminated(&mut self, peer_id: usize) -> Result<()> {
574        let Some(fedimintd) = self.members.get_mut(&peer_id) else {
575            bail!("fedimintd-{peer_id} does not exist");
576        };
577        fedimintd.await_terminated().await?;
578        self.members.remove(&peer_id);
579        Ok(())
580    }
581
582    /// Starts all peers not currently running.
583    pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
584        info!("starting all servers");
585        let fed_size = process_mgr.globals.FM_FED_SIZE;
586        for peer_id in 0..fed_size {
587            if self.members.contains_key(&peer_id) {
588                continue;
589            }
590            self.start_server(process_mgr, peer_id).await?;
591        }
592        self.await_all_peers().await?;
593        Ok(())
594    }
595
596    /// Terminates all running peers.
597    pub async fn terminate_all_servers(&mut self) -> Result<()> {
598        info!("terminating all servers");
599        let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
600        for peer_id in running_peer_ids {
601            self.terminate_server(peer_id).await?;
602        }
603        Ok(())
604    }
605
606    /// Coordinated shutdown of all peers that restart using the provided
607    /// `bin_path`. Returns `Ok()` once all peers are online.
608    ///
609    /// Staggering the restart more closely simulates upgrades in the wild.
610    pub async fn restart_all_staggered_with_bin(
611        &mut self,
612        process_mgr: &ProcessManager,
613        bin_path: &PathBuf,
614    ) -> Result<()> {
615        let fed_size = process_mgr.globals.FM_FED_SIZE;
616
617        // ensure all peers are online
618        self.start_all_servers(process_mgr).await?;
619
620        // staggered shutdown of peers
621        while self.num_members() > 0 {
622            self.terminate_server(self.num_members() - 1).await?;
623            if self.num_members() > 0 {
624                fedimint_core::task::sleep_in_test(
625                    "waiting to shutdown remaining peers",
626                    Duration::from_secs(10),
627                )
628                .await;
629            }
630        }
631
632        // TODO: Audit that the environment access only happens in single-threaded code.
633        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
634
635        // staggered restart
636        for peer_id in 0..fed_size {
637            self.start_server(process_mgr, peer_id).await?;
638            if peer_id < fed_size - 1 {
639                fedimint_core::task::sleep_in_test(
640                    "waiting to restart remaining peers",
641                    Duration::from_secs(10),
642                )
643                .await;
644            }
645        }
646
647        self.await_all_peers().await?;
648
649        let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
650        info!("upgraded fedimintd to version: {}", fedimintd_version);
651        Ok(())
652    }
653
654    pub async fn restart_all_with_bin(
655        &mut self,
656        process_mgr: &ProcessManager,
657        bin_path: &PathBuf,
658    ) -> Result<()> {
659        // get the version we're upgrading to, temporarily updating the fedimintd path
660        let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
661        // TODO: Audit that the environment access only happens in single-threaded code.
662        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
663        // TODO: Audit that the environment access only happens in single-threaded code.
664        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
665
666        self.restart_all_staggered_with_bin(process_mgr, bin_path)
667            .await
668    }
669
670    pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
671        let fed_size = process_mgr.globals.FM_FED_SIZE;
672        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
673        anyhow::ensure!(
674            fed_size > 3 * offline_nodes,
675            "too many offline nodes ({offline_nodes}) to reach consensus"
676        );
677
678        while self.num_members() > fed_size - offline_nodes {
679            self.terminate_server(self.num_members() - 1).await?;
680        }
681
682        if offline_nodes > 0 {
683            info!(fed_size, offline_nodes, "federation is degraded");
684        }
685        Ok(())
686    }
687
688    pub async fn send_to_address(&self, address: String, amount: u64) -> Result<()> {
689        self.bitcoind.send_to(address, amount).await?;
690
691        self.bitcoind.mine_blocks(21).await?;
692
693        Ok(())
694    }
695
696    pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
697        let deposit_fees_msat = self.deposit_fees()?.msats;
698        assert_eq!(
699            deposit_fees_msat % 1000,
700            0,
701            "Deposit fees expected to be whole sats in test suite"
702        );
703        let deposit_fees = deposit_fees_msat / 1000;
704        info!(amount, deposit_fees, "Pegging-in client funds");
705
706        let (address, operation_id) = client.get_deposit_addr().await?;
707
708        self.bitcoind
709            .send_to(address, amount + deposit_fees)
710            .await?;
711        self.bitcoind.mine_blocks(21).await?;
712
713        Ok(operation_id)
714    }
715
716    pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
717        // For walletv2, we need to capture the initial balance and wait for it to
718        // increase since there is no state machine - deposits are auto-claimed
719        let initial_balance = if crate::util::supports_wallet_v2() {
720            Some(client.balance().await?)
721        } else {
722            None
723        };
724
725        let operation_id = self.pegin_client_no_wait(amount, client).await?;
726
727        if let Some(initial) = initial_balance {
728            // Walletv2: wait for balance to increase. We expect slightly less than
729            // `amount` due to mint module fees when creating ecash notes.
730            let expected_balance = initial + (amount * 1000 * 9 / 10);
731            client.await_balance(expected_balance).await?;
732        } else {
733            client.await_deposit(&operation_id).await?;
734        }
735        Ok(())
736    }
737
738    /// Initiates multiple peg-ins to the same federation for the set of
739    /// gateways to save on mining blocks in parallel.
740    pub async fn pegin_gateways(
741        &self,
742        amount: u64,
743        gateways: Vec<&super::gatewayd::Gatewayd>,
744    ) -> Result<()> {
745        let deposit_fees_msat = self.deposit_fees()?.msats;
746        assert_eq!(
747            deposit_fees_msat % 1000,
748            0,
749            "Deposit fees expected to be whole sats in test suite"
750        );
751        let deposit_fees = deposit_fees_msat / 1000;
752        info!(amount, deposit_fees, "Pegging-in gateway funds");
753        let fed_id = self.calculate_federation_id();
754        for gw in gateways.clone() {
755            let pegin_addr = gw.get_pegin_addr(&fed_id).await?;
756            self.bitcoind
757                .send_to(pegin_addr, amount + deposit_fees)
758                .await?;
759        }
760
761        self.bitcoind.mine_blocks(21).await?;
762        let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
763        try_join_all(gateways.into_iter().map(|gw| {
764            poll("gateway pegin", || async {
765                let gw_info = gw.get_info().await.map_err(ControlFlow::Continue)?;
766
767                let block_height: u64 = if gw.gatewayd_version < *VERSION_0_10_0_ALPHA {
768                    gw_info["block_height"]
769                        .as_u64()
770                        .expect("Could not parse block height")
771                } else {
772                    gw_info["lightning_info"]["connected"]["block_height"]
773                        .as_u64()
774                        .expect("Could not parse block height")
775                };
776
777                if bitcoind_block_height != block_height {
778                    return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
779                        "gateway block height is not synced"
780                    )));
781                }
782
783                let gateway_balance = gw
784                    .ecash_balance(fed_id.clone())
785                    .await
786                    .map_err(ControlFlow::Continue)?;
787                poll_almost_equal!(gateway_balance, amount * 1000)
788            })
789        }))
790        .await?;
791
792        Ok(())
793    }
794
795    /// Initiates multiple peg-outs from the same federation for the set of
796    /// gateways to save on mining blocks in parallel.
797    pub async fn pegout_gateways(
798        &self,
799        amount: u64,
800        gateways: Vec<&super::gatewayd::Gatewayd>,
801    ) -> Result<()> {
802        info!(amount, "Pegging-out gateway funds");
803        let fed_id = self.calculate_federation_id();
804        let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
805        for gw in gateways.clone() {
806            let prev_fed_ecash_balance = gw
807                .get_balances()
808                .await?
809                .ecash_balances
810                .into_iter()
811                .find(|fed| fed.federation_id.to_string() == fed_id)
812                .expect("Gateway has not joined federation")
813                .ecash_balance_msats;
814
815            let pegout_address = self.bitcoind.get_new_address().await?;
816            let value = cmd!(
817                gw,
818                "ecash",
819                "pegout",
820                "--federation-id",
821                fed_id,
822                "--amount",
823                amount,
824                "--address",
825                pegout_address
826            )
827            .out_json()
828            .await?;
829            let response: WithdrawResponse = serde_json::from_value(value)?;
830            peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
831        }
832        self.bitcoind.mine_blocks(21).await?;
833
834        try_join_all(
835            peg_outs
836                .values()
837                .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
838        )
839        .await?;
840
841        for gw in gateways.clone() {
842            let after_fed_ecash_balance = gw
843                .get_balances()
844                .await?
845                .ecash_balances
846                .into_iter()
847                .find(|fed| fed.federation_id.to_string() == fed_id)
848                .expect("Gateway has not joined federation")
849                .ecash_balance_msats;
850
851            let ln_type = gw.ln.ln_type();
852            let prev_balance = peg_outs
853                .get(&ln_type)
854                .expect("peg out does not exist")
855                .0
856                .msats;
857            let fees = peg_outs
858                .get(&ln_type)
859                .expect("peg out does not exist")
860                .1
861                .fees;
862            let total_fee = fees.amount().to_sat() * 1000;
863            // Walletv2 charges a module fee on top of the on-chain fee:
864            // 100 sats base + 1% of amount (amount is in msats)
865            let tolerance = if crate::util::supports_wallet_v2() {
866                let amount_sats = amount / 1000;
867                let module_fee_sats = 100 + amount_sats / 100;
868                module_fee_sats * 1000 + 2000
869            } else {
870                2000
871            };
872            crate::util::almost_equal(
873                after_fed_ecash_balance.msats,
874                prev_balance - amount - total_fee,
875                tolerance,
876            )
877            .map_err(|e| {
878                anyhow::anyhow!(
879                    "new balance did not equal prev balance minus withdraw_amount minus fees: {}",
880                    e
881                )
882            })?;
883        }
884
885        Ok(())
886    }
887
888    pub fn calculate_federation_id(&self) -> String {
889        self.client_config()
890            .unwrap()
891            .global
892            .calculate_federation_id()
893            .to_string()
894    }
895
896    pub async fn await_block_sync(&self) -> Result<u64> {
897        let finality_delay = self.get_finality_delay()?;
898        let block_count = self.bitcoind.get_block_count().await?;
899        let expected = block_count.saturating_sub(finality_delay.into());
900
901        if crate::util::supports_wallet_v2() {
902            // Walletv2 doesn't have `dev wait-block-count`, poll using CLI instead
903            let client = self.internal_client().await?;
904            loop {
905                let value = cmd!(client, "module", "walletv2", "info", "block-count")
906                    .out_json()
907                    .await?;
908                let current: u64 = serde_json::from_value(value)?;
909                if current >= expected {
910                    break;
911                }
912                fedimint_core::task::sleep_in_test(
913                    format!("Waiting for consensus block count to reach {expected}"),
914                    std::time::Duration::from_secs(1),
915                )
916                .await;
917            }
918        } else {
919            cmd!(
920                self.internal_client().await?,
921                "dev",
922                "wait-block-count",
923                expected
924            )
925            .run()
926            .await?;
927        }
928
929        Ok(expected)
930    }
931
932    fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
933        // Walletv2 uses a constant finality delay
934        if crate::util::supports_wallet_v2() {
935            return Ok(fedimint_walletv2_server::CONFIRMATION_FINALITY_DELAY as u32);
936        }
937
938        let wallet_instance_id = self.module_instance_id_by_kind(&fedimint_wallet_client::KIND)?;
939        let client_config = &self.client_config()?;
940        let wallet_cfg = client_config
941            .modules
942            .get(&wallet_instance_id)
943            .context("wallet module not found")?
944            .clone()
945            .redecode_raw(&ModuleDecoderRegistry::new([(
946                wallet_instance_id,
947                fedimint_wallet_client::KIND,
948                fedimint_wallet_client::WalletModuleTypes::decoder(),
949            )]))?;
950        let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
951
952        let finality_delay = wallet_cfg.finality_delay;
953        Ok(finality_delay)
954    }
955
956    pub async fn await_gateways_registered(&self) -> Result<()> {
957        let start_time = Instant::now();
958        debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
959
960        poll("gateways registered", || async {
961            let num_gateways = cmd!(
962                self.internal_client()
963                    .await
964                    .map_err(ControlFlow::Continue)?,
965                "list-gateways"
966            )
967            .out_json()
968            .await
969            .map_err(ControlFlow::Continue)?
970            .as_array()
971            .context("invalid output")
972            .map_err(ControlFlow::Break)?
973            .len();
974
975            // After version v0.10.0, the LND gateway will register twice. Once for the HTTP
976            // server, and once for the iroh endpoint.
977            let expected_gateways =
978                if crate::util::Gatewayd::version_or_default().await < *VERSION_0_10_0_ALPHA {
979                    1
980                } else {
981                    2
982                };
983
984            poll_eq!(num_gateways, expected_gateways)
985        })
986        .await?;
987        debug!(target: LOG_DEVIMINT,
988            elapsed_ms = %start_time.elapsed().as_millis(),
989            "Gateways registered");
990        Ok(())
991    }
992
993    pub async fn await_all_peers(&self) -> Result<()> {
994        let (module_name, endpoint) = if crate::util::supports_wallet_v2() {
995            ("walletv2", "consensus_block_count")
996        } else {
997            ("wallet", "block_count")
998        };
999        poll("Waiting for all peers to be online", || async {
1000            cmd!(
1001                self.internal_client()
1002                    .await
1003                    .map_err(ControlFlow::Continue)?,
1004                "dev",
1005                "api",
1006                "--module",
1007                module_name,
1008                endpoint
1009            )
1010            .run()
1011            .await
1012            .map_err(ControlFlow::Continue)?;
1013            Ok(())
1014        })
1015        .await
1016    }
1017
1018    pub async fn await_peer(&self, peer_id: usize) -> Result<()> {
1019        poll("Waiting for all peers to be online", || async {
1020            cmd!(
1021                self.internal_client()
1022                    .await
1023                    .map_err(ControlFlow::Continue)?,
1024                "dev",
1025                "api",
1026                "--peer-id",
1027                peer_id,
1028                "--module",
1029                "wallet",
1030                "block_count"
1031            )
1032            .run()
1033            .await
1034            .map_err(ControlFlow::Continue)?;
1035            Ok(())
1036        })
1037        .await
1038    }
1039
1040    /// Mines enough blocks to finalize mempool transactions, then waits for
1041    /// federation to process finalized blocks.
1042    ///
1043    /// ex:
1044    ///   tx submitted to mempool at height 100
1045    ///   finality delay = 10
1046    ///   mine finality delay blocks + 1 => new height 111
1047    ///   tx included in block 101
1048    ///   highest finalized height = 111 - 10 = 101
1049    pub async fn finalize_mempool_tx(&self) -> Result<()> {
1050        let finality_delay = self.get_finality_delay()?;
1051        let blocks_to_mine = finality_delay + 1;
1052        self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
1053        self.await_block_sync().await?;
1054        Ok(())
1055    }
1056
1057    pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
1058        self.bitcoind.mine_blocks(blocks).await?;
1059        self.await_block_sync().await?;
1060        Ok(())
1061    }
1062
1063    pub fn num_members(&self) -> usize {
1064        self.members.len()
1065    }
1066
1067    pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
1068        self.members
1069            .keys()
1070            .map(|&peer_id| PeerId::from(peer_id as u16))
1071    }
1072}
1073
1074#[derive(Clone)]
1075pub struct Fedimintd {
1076    _bitcoind: Bitcoind,
1077    process: ProcessHandle,
1078}
1079
1080impl Fedimintd {
1081    pub async fn new(
1082        process_mgr: &ProcessManager,
1083        bitcoind: Bitcoind,
1084        peer_id: usize,
1085        env: &vars::Fedimintd,
1086        fed_name: String,
1087    ) -> Result<Self> {
1088        debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
1089        let process = process_mgr
1090            .spawn_daemon(
1091                &format!("fedimintd-{fed_name}-{peer_id}"),
1092                cmd!(FedimintdCmd).envs(env.vars()),
1093            )
1094            .await?;
1095
1096        Ok(Self {
1097            _bitcoind: bitcoind,
1098            process,
1099        })
1100    }
1101
1102    pub async fn terminate(self) -> Result<()> {
1103        self.process.terminate().await
1104    }
1105
1106    pub async fn await_terminated(&self) -> Result<()> {
1107        self.process.await_terminated().await
1108    }
1109}
1110
1111pub async fn run_cli_dkg_v2(endpoints: BTreeMap<PeerId, String>) -> Result<()> {
1112    // Parallelize setup status checks
1113    let status_futures = endpoints.values().map(|endpoint| {
1114        let endpoint = endpoint.clone();
1115        async move {
1116            let status = poll("awaiting-setup-status-awaiting-local-params", || async {
1117                crate::util::FedimintCli
1118                    .setup_status(&API_AUTH, &endpoint)
1119                    .await
1120                    .map_err(ControlFlow::Continue)
1121            })
1122            .await
1123            .unwrap();
1124
1125            assert_eq!(status, SetupStatus::AwaitingLocalParams);
1126        }
1127    });
1128    join_all(status_futures).await;
1129
1130    debug!(target: LOG_DEVIMINT, "Setting local parameters...");
1131
1132    // Parallelize setting local parameters
1133    // --federation-size is only supported by fedimint-cli >= 0.11.0-alpha
1134    let federation_size =
1135        if crate::util::FedimintCli::version_or_default().await >= *VERSION_0_11_0_ALPHA {
1136            Some(endpoints.len())
1137        } else {
1138            None
1139        };
1140    let local_params_futures = endpoints.iter().map(|(peer, endpoint)| {
1141        let peer = *peer;
1142        let endpoint = endpoint.clone();
1143        async move {
1144            let info = if peer.to_usize() == 0 {
1145                crate::util::FedimintCli
1146                    .set_local_params_leader(&peer, &API_AUTH, &endpoint, federation_size)
1147                    .await
1148            } else {
1149                crate::util::FedimintCli
1150                    .set_local_params_follower(&peer, &API_AUTH, &endpoint)
1151                    .await
1152            };
1153            info.map(|i| (peer, i))
1154        }
1155    });
1156    let connection_info: BTreeMap<_, _> = try_join_all(local_params_futures)
1157        .await?
1158        .into_iter()
1159        .collect();
1160
1161    debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");
1162
1163    // Parallelize peer addition - flatten the nested loop into a single parallel
1164    // operation
1165    let add_peer_futures = connection_info.iter().flat_map(|(peer, info)| {
1166        endpoints
1167            .iter()
1168            .filter(move |(p, _)| *p != peer)
1169            .map(move |(_, endpoint)| {
1170                let endpoint = endpoint.clone();
1171                let info = info.clone();
1172                async move {
1173                    crate::util::FedimintCli
1174                        .add_peer(&info, &API_AUTH, &endpoint)
1175                        .await
1176                }
1177            })
1178    });
1179    try_join_all(add_peer_futures).await?;
1180
1181    debug!(target: LOG_DEVIMINT, "Starting DKG...");
1182
1183    // Parallelize DKG start
1184    let start_dkg_futures = endpoints.values().map(|endpoint| {
1185        let endpoint = endpoint.clone();
1186        async move {
1187            crate::util::FedimintCli
1188                .start_dkg(&API_AUTH, &endpoint)
1189                .await
1190        }
1191    });
1192    try_join_all(start_dkg_futures).await?;
1193
1194    Ok(())
1195}