Skip to main content

devimint/
federation.rs

1use std::collections::BTreeMap;
2use std::ops::ControlFlow;
3use std::path::{Path, PathBuf};
4use std::str::FromStr;
5use std::time::Duration;
6use std::{env, fs};
7
8use anyhow::{Context, Result, bail};
9use fedimint_api_client::api::DynGlobalApi;
10use fedimint_api_client::download_from_invite_code;
11use fedimint_client_module::module::ClientModule;
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::admin_client::SetupStatus;
14use fedimint_core::config::{ClientConfig, load_from_file};
15use fedimint_core::core::{ModuleInstanceId, ModuleKind};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::ModuleCommon;
18use fedimint_core::module::registry::ModuleDecoderRegistry;
19use fedimint_core::runtime::block_in_place;
20use fedimint_core::task::block_on;
21use fedimint_core::task::jit::JitTryAnyhow;
22use fedimint_core::util::SafeUrl;
23use fedimint_core::{Amount, NumPeers, PeerId};
24use fedimint_gateway_common::WithdrawResponse;
25use fedimint_logging::LOG_DEVIMINT;
26use fedimint_server::config::io::{
27    CONSENSUS_CONFIG, ENCRYPTED_EXT, JSON_EXT, LOCAL_CONFIG, PRIVATE_CONFIG, SALT_FILE,
28};
29use fedimint_testing_core::config::API_AUTH;
30use fedimint_testing_core::node_type::LightningNodeType;
31use fedimint_wallet_client::WalletClientModule;
32use fedimint_wallet_client::config::WalletClientConfig;
33use fs_lock::FileLock;
34use futures::future::{join_all, try_join_all};
35use tokio::task::{JoinSet, spawn_blocking};
36use tokio::time::Instant;
37use tracing::{debug, info};
38
39use super::external::Bitcoind;
40use super::util::{Command, ProcessHandle, ProcessManager, cmd};
41use super::vars::utf8;
42use crate::envs::{FM_CLIENT_DIR_ENV, FM_DATA_DIR_ENV};
43use crate::util::{FedimintdCmd, poll, poll_simple, poll_with_timeout};
44use crate::version_constants::{VERSION_0_10_0_ALPHA, VERSION_0_11_0_ALPHA};
45use crate::{poll_almost_equal, poll_eq, vars};
46
/// Number of ports we allocate for every `fedimintd` instance
///
/// Port layout inside one instance's range (offset from the range start):
/// `0` p2p, `1` api, `2` web ui, `3` prometheus metrics. Every offset must be
/// smaller than this value; that is checked at compile time below.
pub const PORTS_PER_FEDIMINTD: u16 = 4;
/// Which port is for p2p inside the range from [`PORTS_PER_FEDIMINTD`]
pub const FEDIMINTD_P2P_PORT_OFFSET: u16 = 0;
/// Which port is for api inside the range from [`PORTS_PER_FEDIMINTD`]
pub const FEDIMINTD_API_PORT_OFFSET: u16 = 1;
/// Which port is for the web ui inside the range from [`PORTS_PER_FEDIMINTD`]
pub const FEDIMINTD_UI_PORT_OFFSET: u16 = 2;
/// Which port is for prometheus inside the range from [`PORTS_PER_FEDIMINTD`]
pub const FEDIMINTD_METRICS_PORT_OFFSET: u16 = 3;

// Fail the build if any offset falls outside the per-instance port range, so
// adding a new port without bumping `PORTS_PER_FEDIMINTD` can't silently make
// two instances' ranges overlap.
const _: () = {
    assert!(FEDIMINTD_P2P_PORT_OFFSET < PORTS_PER_FEDIMINTD);
    assert!(FEDIMINTD_API_PORT_OFFSET < PORTS_PER_FEDIMINTD);
    assert!(FEDIMINTD_UI_PORT_OFFSET < PORTS_PER_FEDIMINTD);
    assert!(FEDIMINTD_METRICS_PORT_OFFSET < PORTS_PER_FEDIMINTD);
};
58
/// A federation spawned by devimint: its guardian processes, their per-peer
/// env vars, the shared `bitcoind` handle and a built-in client.
///
/// NOTE: fields are dropped in declaration order after `Drop::drop` has run,
/// so keep the field order stable.
#[derive(Clone)]
pub struct Federation {
    /// Currently running guardian processes, keyed by peer index.
    pub members: BTreeMap<usize, Fedimintd>,
    /// Per-peer env vars (data dir, API URL, ...), keyed by peer index.
    pub vars: BTreeMap<usize, vars::Fedimintd>,
    /// `bitcoind` handle; handed to every spawned `Fedimintd` and used for peg-ins.
    pub bitcoind: Bitcoind,

    /// Built in [`Client`]; joins the federation when first initialized unless
    /// setup was skipped (`skip_setup`/`pre_dkg` in `Federation::new`).
    ///
    /// Only for internal use; use cli commands instead.
    client: JitTryAnyhow<Client>,
    #[allow(dead_code)] // Will need it later, maybe
    connectors: ConnectorRegistry,
}
71
72impl Drop for Federation {
73    fn drop(&mut self) {
74        block_in_place(|| {
75            block_on(async {
76                let mut set = JoinSet::new();
77
78                while let Some((_id, fedimintd)) = self.members.pop_first() {
79                    set.spawn(async { drop(fedimintd) });
80                }
81                while (set.join_next().await).is_some() {}
82            });
83        });
84    }
85}
/// `fedimint-cli` instance (basically path with client state: config + db)
///
/// The state lives in a per-client directory under `$FM_DATA_DIR/clients`.
#[derive(Clone, Debug)]
pub struct Client {
    /// Name of this client's directory under the clients dir (`{name}-{i}`).
    name: String,
}
91
92impl Client {
93    fn clients_dir() -> PathBuf {
94        let data_dir: PathBuf = env::var(FM_DATA_DIR_ENV)
95            .expect("FM_DATA_DIR_ENV not set")
96            .parse()
97            .expect("FM_DATA_DIR_ENV invalid");
98        data_dir.join("clients")
99    }
100
101    fn client_dir(&self) -> PathBuf {
102        Self::clients_dir().join(&self.name)
103    }
104
105    pub fn client_name_lock(name: &str) -> Result<FileLock> {
106        let lock_path = Self::clients_dir().join(format!(".{name}.lock"));
107        let file_lock = std::fs::OpenOptions::new()
108            .write(true)
109            .create(true)
110            .truncate(true)
111            .open(&lock_path)
112            .with_context(|| format!("Failed to open {}", lock_path.display()))?;
113
114        fs_lock::FileLock::new_exclusive(file_lock)
115            .with_context(|| format!("Failed to lock {}", lock_path.display()))
116    }
117
118    /// Create a [`Client`] that starts with a fresh state.
119    pub async fn create(name: impl ToString) -> Result<Client> {
120        let name = name.to_string();
121        spawn_blocking(move || {
122            let _lock = Self::client_name_lock(&name);
123            for i in 0u64.. {
124                let client = Self {
125                    name: format!("{name}-{i}"),
126                };
127
128                if !client.client_dir().exists() {
129                    std::fs::create_dir_all(client.client_dir())?;
130                    return Ok(client);
131                }
132            }
133            unreachable!()
134        })
135        .await?
136    }
137
138    /// Open or create a [`Client`] that starts with a fresh state.
139    pub fn open_or_create(name: &str) -> Result<Client> {
140        block_in_place(|| {
141            let _lock = Self::client_name_lock(name);
142            let client = Self {
143                name: format!("{name}-0"),
144            };
145            if !client.client_dir().exists() {
146                std::fs::create_dir_all(client.client_dir())?;
147            }
148            Ok(client)
149        })
150    }
151
152    /// Client to join a federation
153    pub async fn join_federation(&self, invite_code: String) -> Result<()> {
154        debug!(target: LOG_DEVIMINT, "Joining federation with the main client");
155        cmd!(self, "join-federation", invite_code).run().await?;
156
157        Ok(())
158    }
159
160    /// Client to join a federation with a restore procedure
161    pub async fn restore_federation(&self, invite_code: String, mnemonic: String) -> Result<()> {
162        debug!(target: LOG_DEVIMINT, "Joining federation with restore procedure");
163        cmd!(
164            self,
165            "restore",
166            "--invite-code",
167            invite_code,
168            "--mnemonic",
169            mnemonic
170        )
171        .run()
172        .await?;
173
174        Ok(())
175    }
176
177    /// Client to join a federation
178    pub async fn new_restored(&self, name: &str, invite_code: String) -> Result<Self> {
179        let restored = Self::open_or_create(name)?;
180
181        let mnemonic = cmd!(self, "print-secret").out_json().await?["secret"]
182            .as_str()
183            .unwrap()
184            .to_owned();
185
186        debug!(target: LOG_DEVIMINT, name, "Restoring from mnemonic");
187        cmd!(
188            restored,
189            "restore",
190            "--invite-code",
191            invite_code,
192            "--mnemonic",
193            mnemonic
194        )
195        .run()
196        .await?;
197
198        Ok(restored)
199    }
200
201    /// Create a [`Client`] that starts with a state that is a copy of
202    /// of another one.
203    pub async fn new_forked(&self, name: impl ToString) -> Result<Client> {
204        let new = Client::create(name).await?;
205
206        cmd!(
207            "cp",
208            "-R",
209            self.client_dir().join("client.db").display(),
210            new.client_dir().display()
211        )
212        .run()
213        .await?;
214
215        Ok(new)
216    }
217
218    pub async fn balance(&self) -> Result<u64> {
219        Ok(cmd!(self, "info").out_json().await?["total_amount_msat"]
220            .as_u64()
221            .unwrap())
222    }
223
224    /// Waits for the client balance to reach at least `min_balance_msat`.
225    pub async fn await_balance(&self, min_balance_msat: u64) -> Result<()> {
226        loop {
227            cmd!(self, "dev", "wait", "3").out_json().await?;
228
229            let balance = self.balance().await?;
230            if balance >= min_balance_msat {
231                return Ok(());
232            }
233
234            info!(
235                target: LOG_DEVIMINT,
236                balance,
237                min_balance_msat,
238                "Waiting for client balance to reach minimum"
239            );
240        }
241    }
242
243    pub async fn get_deposit_addr(&self) -> Result<(String, String)> {
244        if crate::util::supports_wallet_v2() {
245            let address = cmd!(self, "module", "walletv2", "receive")
246                .out_json()
247                .await?;
248            // Walletv2 auto-claims deposits, no operation_id needed
249            Ok((address.as_str().unwrap().to_string(), String::new()))
250        } else {
251            let deposit = cmd!(self, "deposit-address").out_json().await?;
252            Ok((
253                deposit["address"].as_str().unwrap().to_string(),
254                deposit["operation_id"].as_str().unwrap().to_string(),
255            ))
256        }
257    }
258
259    pub async fn await_deposit(&self, operation_id: &str) -> Result<()> {
260        cmd!(self, "await-deposit", operation_id).run().await
261    }
262
263    pub fn cmd(&self) -> Command {
264        cmd!(
265            crate::util::get_fedimint_cli_path(),
266            format!("--data-dir={}", self.client_dir().display())
267        )
268    }
269
270    pub fn get_name(&self) -> &str {
271        &self.name
272    }
273
274    /// Returns the current consensus session count
275    pub async fn get_session_count(&self) -> Result<u64> {
276        cmd!(self, "dev", "session-count").out_json().await?["count"]
277            .as_u64()
278            .context("count field wasn't a number")
279    }
280
281    /// Returns once all active state machines complete
282    pub async fn wait_complete(&self) -> Result<()> {
283        cmd!(self, "dev", "wait-complete").run().await
284    }
285
286    /// Returns once the current session completes
287    pub async fn wait_session(&self) -> anyhow::Result<()> {
288        info!("Waiting for a new session");
289        let session_count = self.get_session_count().await?;
290        self.wait_session_outcome(session_count).await?;
291        Ok(())
292    }
293
294    /// Returns once the provided session count completes
295    pub async fn wait_session_outcome(&self, session_count: u64) -> anyhow::Result<()> {
296        let timeout = {
297            let current_session_count = self.get_session_count().await?;
298            let sessions_to_wait = session_count.saturating_sub(current_session_count) + 1;
299            let session_duration_seconds = 180;
300            Duration::from_secs(sessions_to_wait * session_duration_seconds)
301        };
302
303        let start = Instant::now();
304        poll_with_timeout("Waiting for a new session", timeout, || async {
305            info!("Awaiting session outcome {session_count}");
306            match cmd!(self, "dev", "api", "await_session_outcome", session_count)
307                .run()
308                .await
309            {
310                Err(e) => Err(ControlFlow::Continue(e)),
311                Ok(()) => Ok(()),
312            }
313        })
314        .await?;
315
316        let session_found_in = start.elapsed();
317        info!("session found in {session_found_in:?}");
318        Ok(())
319    }
320}
321
322impl Federation {
    /// Spawn all `fedimintd` guardians of one federation and, unless disabled,
    /// run DKG and distribute the resulting invite codes.
    ///
    /// For each peer in `process_mgr.globals.FM_FED_SIZE` this initializes the
    /// per-peer env vars (`vars::Fedimintd::init`), starts a [`Fedimintd`]
    /// process and builds an admin-setup API client for its `FM_API_URL`.
    ///
    /// Unless `skip_setup` or `pre_dkg` is set, it then:
    /// - runs DKG against all API endpoints using the `fedimint-cli` version
    ///   that matches `fedimintd`, switching back to the original CLI afterwards;
    /// - waits for every peer's `invite-code` file and calls
    ///   `download_from_invite_code` with it (the result is discarded);
    /// - copies peer 0's invite code to `<FM_CLIENT_DIR>/invite-code` and moves
    ///   every peer's file to `<FM_CLIENT_DIR>/invite-code-{peer}`;
    /// - if `pre_restore` is set, replaces guardian 0 with a freshly wiped
    ///   instance for manual-restore testing.
    ///
    /// The built-in [`Client`] is wrapped in a `JitTryAnyhow` (presumably
    /// initialized on first access — confirm). It opens/creates the client named
    /// after `federation_name` and joins only when the setup above ran.
    ///
    /// # Errors
    ///
    /// Any failure while spawning processes, running DKG or shuffling the
    /// invite-code files is propagated.
    pub async fn new(
        process_mgr: &ProcessManager,
        bitcoind: Bitcoind,
        skip_setup: bool,
        pre_dkg: bool,
        pre_restore: bool,
        // Which of the pre-allocated federations to use (most tests just use single `0` one)
        fed_index: usize,
        federation_name: String,
    ) -> Result<Self> {
        let num_peers = NumPeers::from(process_mgr.globals.FM_FED_SIZE);
        let mut members = BTreeMap::new();
        let mut peer_to_env_vars_map = BTreeMap::new();

        // NOTE(review): admin clients are inserted below but never read in this
        // function.
        let mut admin_clients: BTreeMap<PeerId, DynGlobalApi> = BTreeMap::new();
        let mut api_endpoints: BTreeMap<PeerId, _> = BTreeMap::new();

        let connectors = ConnectorRegistry::build_from_testing_env()?.bind().await?;
        for peer_id in num_peers.peer_ids() {
            let peer_env_vars = vars::Fedimintd::init(
                &process_mgr.globals,
                federation_name.clone(),
                peer_id,
                process_mgr
                    .globals
                    .fedimintd_overrides
                    .peer_expect(fed_index, peer_id),
            )
            .await?;
            members.insert(
                peer_id.to_usize(),
                Fedimintd::new(
                    process_mgr,
                    bitcoind.clone(),
                    peer_id.to_usize(),
                    &peer_env_vars,
                    federation_name.clone(),
                )
                .await?,
            );
            let admin_client = DynGlobalApi::new_admin_setup(
                connectors.clone(),
                SafeUrl::parse(&peer_env_vars.FM_API_URL)?,
                // TODO: will need it somewhere
                // &process_mgr.globals.FM_FORCE_API_SECRETS.get_active(),
            )?;
            api_endpoints.insert(peer_id, peer_env_vars.FM_API_URL.clone());
            admin_clients.insert(peer_id, admin_client);
            peer_to_env_vars_map.insert(peer_id.to_usize(), peer_env_vars);
        }

        if !skip_setup && !pre_dkg {
            // we don't guarantee backwards-compatibility for dkg, so we use the
            // fedimint-cli version that matches fedimintd
            let (original_fedimint_cli_path, original_fm_mint_client) =
                crate::util::use_matching_fedimint_cli_for_dkg().await?;

            // NOTE(review): if DKG fails, the `?` returns before the original
            // fedimint-cli version is restored below.
            run_cli_dkg_v2(api_endpoints).await?;

            // we're done with dkg, so we can reset the fedimint-cli version
            crate::util::use_fedimint_cli(original_fedimint_cli_path, original_fm_mint_client);

            // move configs to config directory
            // (first: wait until every guardian has written its invite code and
            // check it via `download_from_invite_code`)
            let client_dir = utf8(&process_mgr.globals.FM_CLIENT_DIR);
            let invite_code_filename_original = "invite-code";

            for peer_env_vars in peer_to_env_vars_map.values() {
                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);

                let invite_code = poll_simple("awaiting-invite-code", || async {
                    let path = format!("{peer_data_dir}/{invite_code_filename_original}");
                    tokio::fs::read_to_string(&path)
                        .await
                        .with_context(|| format!("Awaiting invite code file: {path}"))
                })
                .await
                .context("Awaiting invite code file")?;

                download_from_invite_code(&connectors, &InviteCode::from_str(&invite_code)?)
                    .await?;
            }

            // copy over invite-code file to client directory
            // (peer 0's code becomes the default `invite-code`)
            let peer_data_dir = utf8(&peer_to_env_vars_map[&0].FM_DATA_DIR);

            tokio::fs::copy(
                format!("{peer_data_dir}/{invite_code_filename_original}"),
                format!("{client_dir}/{invite_code_filename_original}"),
            )
            .await
            .context("copying invite-code file")?;

            // move each guardian's invite-code file to the client's directory
            // appending the peer id to the end
            for (index, peer_env_vars) in &peer_to_env_vars_map {
                let peer_data_dir = utf8(&peer_env_vars.FM_DATA_DIR);

                let invite_code_filename_indexed =
                    format!("{invite_code_filename_original}-{index}");
                tokio::fs::rename(
                    format!("{peer_data_dir}/{invite_code_filename_original}"),
                    format!("{client_dir}/{invite_code_filename_indexed}"),
                )
                .await
                .context("moving invite-code file")?;
            }

            debug!("Moved invite-code files to client data directory");

            if pre_restore {
                Self::restart_guardian_for_manual_restore(
                    process_mgr,
                    &mut members,
                    &peer_to_env_vars_map,
                    &bitcoind,
                )
                .await?;
            }
        }

        // NOTE(review): `invite_code_static()` is read even when setup was
        // skipped, so initializing the built-in client fails if no invite-code
        // file exists in the client dir at that point.
        let client = JitTryAnyhow::new_try({
            move || async move {
                let client = Client::open_or_create(federation_name.as_str())?;
                let invite_code = Self::invite_code_static()?;
                if !skip_setup && !pre_dkg {
                    cmd!(client, "join-federation", invite_code).run().await?;
                }
                Ok(client)
            }
        });

        Ok(Self {
            members,
            vars: peer_to_env_vars_map,
            bitcoind,
            client,
            connectors,
        })
    }
462
463    async fn restart_guardian_for_manual_restore(
464        process_mgr: &ProcessManager,
465        members: &mut BTreeMap<usize, Fedimintd>,
466        peer_to_env_vars_map: &BTreeMap<usize, vars::Fedimintd>,
467        bitcoind: &Bitcoind,
468    ) -> Result<()> {
469        const RESTORE_PEER: usize = 0;
470
471        let peer_env_vars = &peer_to_env_vars_map[&RESTORE_PEER];
472        let backup_dir = process_mgr.globals.FM_TEST_DIR.join("fedimintd-backups");
473        tokio::fs::create_dir_all(&backup_dir)
474            .await
475            .context("Creating fedimintd backup directory")?;
476        let backup_path = backup_dir.join("fedimint-0-guardian-backup.tar");
477
478        Self::write_guardian_backup_tar(&peer_env_vars.FM_DATA_DIR, &backup_path).await?;
479        info!(
480            target: LOG_DEVIMINT,
481            path = %backup_path.display(),
482            "Wrote guardian backup for manual restore"
483        );
484
485        let fedimintd = members
486            .remove(&RESTORE_PEER)
487            .context("Missing fedimint-0 process")?;
488        fedimintd.terminate().await?;
489        tokio::fs::remove_dir_all(&peer_env_vars.FM_DATA_DIR)
490            .await
491            .with_context(|| format!("Removing {}", peer_env_vars.FM_DATA_DIR.display()))?;
492        tokio::fs::create_dir_all(&peer_env_vars.FM_DATA_DIR)
493            .await
494            .with_context(|| format!("Creating {}", peer_env_vars.FM_DATA_DIR.display()))?;
495
496        let fedimintd = Fedimintd::new(
497            process_mgr,
498            bitcoind.clone(),
499            RESTORE_PEER,
500            peer_env_vars,
501            "default".to_string(),
502        )
503        .await?;
504        members.insert(RESTORE_PEER, fedimintd);
505
506        info!(
507            target: LOG_DEVIMINT,
508            ui = %peer_env_vars.FM_BIND_UI,
509            backup = %backup_path.display(),
510            "fedimint-0 restarted in setup mode for manual restore"
511        );
512
513        Ok(())
514    }
515
516    async fn write_guardian_backup_tar(data_dir: &Path, backup_path: &Path) -> Result<()> {
517        let data_dir = data_dir.to_path_buf();
518        let backup_path = backup_path.to_path_buf();
519        spawn_blocking(move || {
520            let file = fs::File::options()
521                .write(true)
522                .create_new(true)
523                .open(&backup_path)
524                .with_context(|| format!("Creating {}", backup_path.display()))?;
525            let mut archive = tar::Builder::new(file);
526            for path in [
527                PathBuf::from(LOCAL_CONFIG).with_extension(JSON_EXT),
528                PathBuf::from(CONSENSUS_CONFIG).with_extension(JSON_EXT),
529                PathBuf::from(PRIVATE_CONFIG).with_extension(ENCRYPTED_EXT),
530                PathBuf::from(SALT_FILE),
531            ] {
532                archive
533                    .append_path_with_name(data_dir.join(&path), &path)
534                    .with_context(|| format!("Adding {} to backup", path.display()))?;
535            }
536            archive.finish().context("Finishing guardian backup tar")?;
537            Ok::<_, anyhow::Error>(())
538        })
539        .await?
540    }
541
542    pub fn client_config(&self) -> Result<ClientConfig> {
543        let cfg_path = self.vars[&0].FM_DATA_DIR.join("client.json");
544        load_from_file(&cfg_path)
545    }
546
547    /// Get the module instance ID for a given module kind
548    pub fn module_instance_id_by_kind(&self, kind: &ModuleKind) -> Result<ModuleInstanceId> {
549        self.client_config()?
550            .modules
551            .iter()
552            .find_map(|(id, cfg)| if &cfg.kind == kind { Some(*id) } else { None })
553            .with_context(|| format!("Module kind {kind} not found"))
554    }
555
556    pub fn module_client_config<M: ClientModule>(
557        &self,
558    ) -> Result<Option<<M::Common as ModuleCommon>::ClientConfig>> {
559        self.client_config()?
560            .modules
561            .iter()
562            .find_map(|(module_instance_id, module_cfg)| {
563                if module_cfg.kind == M::kind() {
564                    let decoders = ModuleDecoderRegistry::new(vec![(
565                        *module_instance_id,
566                        M::kind(),
567                        M::decoder(),
568                    )]);
569                    Some(
570                        module_cfg
571                            .config
572                            .clone()
573                            .redecode_raw(&decoders)
574                            .expect("Decoding client cfg failed")
575                            .expect_decoded_ref()
576                            .as_any()
577                            .downcast_ref::<<M::Common as ModuleCommon>::ClientConfig>()
578                            .cloned()
579                            .context("Cast to module config failed"),
580                    )
581                } else {
582                    None
583                }
584            })
585            .transpose()
586    }
587
588    pub fn deposit_fees(&self) -> Result<Amount> {
589        if crate::util::supports_wallet_v2() {
590            Ok(self
591                .module_client_config::<fedimint_walletv2_client::WalletClientModule>()?
592                .context("No walletv2 module found")?
593                .fee_consensus
594                .base)
595        } else {
596            Ok(self
597                .module_client_config::<WalletClientModule>()?
598                .context("No wallet module found")?
599                .fee_consensus
600                .peg_in_abs)
601        }
602    }
603
604    /// Read the invite code from the client data dir
605    pub fn invite_code(&self) -> Result<String> {
606        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
607        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
608        Ok(invite_code)
609    }
610
611    pub fn invite_code_static() -> Result<String> {
612        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
613        let invite_code = fs::read_to_string(data_dir.join("invite-code"))?;
614        Ok(invite_code)
615    }
616    pub fn invite_code_for(peer_id: PeerId) -> Result<String> {
617        let data_dir: PathBuf = env::var(FM_CLIENT_DIR_ENV)?.parse()?;
618        let name = format!("invite-code-{peer_id}");
619        let invite_code = fs::read_to_string(data_dir.join(name))?;
620        Ok(invite_code)
621    }
622
623    /// Built-in, default, internal [`Client`]
624    ///
625    /// We should be moving away from using it for anything.
626    pub async fn internal_client(&self) -> Result<&Client> {
627        self.client
628            .get_try()
629            .await
630            .context("Internal client joining Federation")
631    }
632
633    /// New [`Client`] that already joined `self`
634    pub async fn new_joined_client(&self, name: impl ToString) -> Result<Client> {
635        let client = Client::create(name).await?;
636        client.join_federation(self.invite_code()?).await?;
637        Ok(client)
638    }
639
640    pub async fn start_server(&mut self, process_mgr: &ProcessManager, peer: usize) -> Result<()> {
641        if self.members.contains_key(&peer) {
642            bail!("fedimintd-{peer} already running");
643        }
644        self.members.insert(
645            peer,
646            Fedimintd::new(
647                process_mgr,
648                self.bitcoind.clone(),
649                peer,
650                &self.vars[&peer],
651                "default".to_string(),
652            )
653            .await?,
654        );
655        Ok(())
656    }
657
658    pub async fn terminate_server(&mut self, peer_id: usize) -> Result<()> {
659        let Some((_, fedimintd)) = self.members.remove_entry(&peer_id) else {
660            bail!("fedimintd-{peer_id} does not exist");
661        };
662        fedimintd.terminate().await?;
663        Ok(())
664    }
665
666    pub async fn await_server_terminated(&mut self, peer_id: usize) -> Result<()> {
667        let Some(fedimintd) = self.members.get_mut(&peer_id) else {
668            bail!("fedimintd-{peer_id} does not exist");
669        };
670        fedimintd.await_terminated().await?;
671        self.members.remove(&peer_id);
672        Ok(())
673    }
674
675    /// Starts all peers not currently running.
676    pub async fn start_all_servers(&mut self, process_mgr: &ProcessManager) -> Result<()> {
677        info!("starting all servers");
678        let fed_size = process_mgr.globals.FM_FED_SIZE;
679        for peer_id in 0..fed_size {
680            if self.members.contains_key(&peer_id) {
681                continue;
682            }
683            self.start_server(process_mgr, peer_id).await?;
684        }
685        self.await_all_peers().await?;
686        Ok(())
687    }
688
689    /// Terminates all running peers.
690    pub async fn terminate_all_servers(&mut self) -> Result<()> {
691        info!("terminating all servers");
692        let running_peer_ids: Vec<_> = self.members.keys().copied().collect();
693        for peer_id in running_peer_ids {
694            self.terminate_server(peer_id).await?;
695        }
696        Ok(())
697    }
698
699    /// Coordinated shutdown of all peers that restart using the provided
700    /// `bin_path`. Returns `Ok()` once all peers are online.
701    ///
702    /// Staggering the restart more closely simulates upgrades in the wild.
703    pub async fn restart_all_staggered_with_bin(
704        &mut self,
705        process_mgr: &ProcessManager,
706        bin_path: &PathBuf,
707    ) -> Result<()> {
708        let fed_size = process_mgr.globals.FM_FED_SIZE;
709
710        // ensure all peers are online
711        self.start_all_servers(process_mgr).await?;
712
713        // staggered shutdown of peers
714        while self.num_members() > 0 {
715            self.terminate_server(self.num_members() - 1).await?;
716            if self.num_members() > 0 {
717                fedimint_core::task::sleep_in_test(
718                    "waiting to shutdown remaining peers",
719                    Duration::from_secs(10),
720                )
721                .await;
722            }
723        }
724
725        // TODO: Audit that the environment access only happens in single-threaded code.
726        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
727
728        // staggered restart
729        for peer_id in 0..fed_size {
730            self.start_server(process_mgr, peer_id).await?;
731            if peer_id < fed_size - 1 {
732                fedimint_core::task::sleep_in_test(
733                    "waiting to restart remaining peers",
734                    Duration::from_secs(10),
735                )
736                .await;
737            }
738        }
739
740        self.await_all_peers().await?;
741
742        let fedimintd_version = crate::util::FedimintdCmd::version_or_default().await;
743        info!("upgraded fedimintd to version: {}", fedimintd_version);
744        Ok(())
745    }
746
747    pub async fn restart_all_with_bin(
748        &mut self,
749        process_mgr: &ProcessManager,
750        bin_path: &PathBuf,
751    ) -> Result<()> {
752        // get the version we're upgrading to, temporarily updating the fedimintd path
753        let current_fedimintd_path = std::env::var("FM_FEDIMINTD_BASE_EXECUTABLE")?;
754        // TODO: Audit that the environment access only happens in single-threaded code.
755        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", bin_path) };
756        // TODO: Audit that the environment access only happens in single-threaded code.
757        unsafe { std::env::set_var("FM_FEDIMINTD_BASE_EXECUTABLE", current_fedimintd_path) };
758
759        self.restart_all_staggered_with_bin(process_mgr, bin_path)
760            .await
761    }
762
763    pub async fn degrade_federation(&mut self, process_mgr: &ProcessManager) -> Result<()> {
764        let fed_size = process_mgr.globals.FM_FED_SIZE;
765        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
766        anyhow::ensure!(
767            fed_size > 3 * offline_nodes,
768            "too many offline nodes ({offline_nodes}) to reach consensus"
769        );
770
771        while self.num_members() > fed_size - offline_nodes {
772            self.terminate_server(self.num_members() - 1).await?;
773        }
774
775        if offline_nodes > 0 {
776            info!(fed_size, offline_nodes, "federation is degraded");
777        }
778        Ok(())
779    }
780
781    pub async fn send_to_address(&self, address: String, amount: u64) -> Result<()> {
782        self.bitcoind.send_to(address, amount).await?;
783
784        self.bitcoind.mine_blocks(21).await?;
785
786        Ok(())
787    }
788
789    pub async fn pegin_client_no_wait(&self, amount: u64, client: &Client) -> Result<String> {
790        let deposit_fees_msat = self.deposit_fees()?.msats;
791        assert_eq!(
792            deposit_fees_msat % 1000,
793            0,
794            "Deposit fees expected to be whole sats in test suite"
795        );
796        let deposit_fees = deposit_fees_msat / 1000;
797        info!(amount, deposit_fees, "Pegging-in client funds");
798
799        let (address, operation_id) = client.get_deposit_addr().await?;
800
801        self.bitcoind
802            .send_to(address, amount + deposit_fees)
803            .await?;
804        self.bitcoind.mine_blocks(21).await?;
805
806        Ok(operation_id)
807    }
808
809    pub async fn pegin_client(&self, amount: u64, client: &Client) -> Result<()> {
810        // For walletv2, we need to capture the initial balance and wait for it to
811        // increase since there is no state machine - deposits are auto-claimed
812        let initial_balance = if crate::util::supports_wallet_v2() {
813            Some(client.balance().await?)
814        } else {
815            None
816        };
817
818        let operation_id = self.pegin_client_no_wait(amount, client).await?;
819
820        if let Some(initial) = initial_balance {
821            // Walletv2: wait for balance to increase. We expect slightly less than
822            // `amount` due to mint module fees when creating ecash notes.
823            let expected_balance = initial + (amount * 1000 * 9 / 10);
824            client.await_balance(expected_balance).await?;
825        } else {
826            client.await_deposit(&operation_id).await?;
827        }
828        Ok(())
829    }
830
831    /// Initiates multiple peg-ins to the same federation for the set of
832    /// gateways to save on mining blocks in parallel.
833    pub async fn pegin_gateways(
834        &self,
835        amount: u64,
836        gateways: Vec<&super::gatewayd::Gatewayd>,
837    ) -> Result<()> {
838        let deposit_fees_msat = self.deposit_fees()?.msats;
839        assert_eq!(
840            deposit_fees_msat % 1000,
841            0,
842            "Deposit fees expected to be whole sats in test suite"
843        );
844        let deposit_fees = deposit_fees_msat / 1000;
845        info!(amount, deposit_fees, "Pegging-in gateway funds");
846        let fed_id = self.calculate_federation_id();
847
848        // For walletv2, capture initial balances since deposits are auto-claimed
849        // and we need to check balance change rather than absolute balance
850        // (same approach as pegin_client).
851        let uses_walletv2 = crate::util::supports_wallet_v2();
852        let mut initial_balances = Vec::new();
853        if uses_walletv2 {
854            for gw in &gateways {
855                let balance = gw
856                    .client()
857                    .ecash_balance(fed_id.clone())
858                    .await
859                    .expect("failed to fetch initial gateway balance");
860                initial_balances.push(balance);
861            }
862        }
863
864        for gw in gateways.clone() {
865            let pegin_addr = gw.client().get_pegin_addr(&fed_id).await?;
866            self.bitcoind
867                .send_to(pegin_addr, amount + deposit_fees)
868                .await?;
869        }
870
871        self.bitcoind.mine_blocks(21).await?;
872        let bitcoind_block_height: u64 = self.bitcoind.get_block_count().await? - 1;
873        try_join_all(gateways.into_iter().enumerate().map(|(i, gw)| {
874            let initial_balance = if uses_walletv2 {
875                initial_balances[i]
876            } else {
877                0
878            };
879            let fed_id = fed_id.clone();
880            poll("gateway pegin", move || {
881                let fed_id = fed_id.clone();
882                async move {
883                    let gw_info = gw
884                        .client()
885                        .get_info()
886                        .await
887                        .map_err(ControlFlow::Continue)?;
888
889                    let block_height: u64 = if gw.gatewayd_version < *VERSION_0_10_0_ALPHA {
890                        gw_info["block_height"]
891                            .as_u64()
892                            .expect("Could not parse block height")
893                    } else {
894                        gw_info["lightning_info"]["connected"]["block_height"]
895                            .as_u64()
896                            .expect("Could not parse block height")
897                    };
898
899                    if bitcoind_block_height != block_height {
900                        return Err(std::ops::ControlFlow::Continue(anyhow::anyhow!(
901                            "gateway block height is not synced"
902                        )));
903                    }
904
905                    let gateway_balance = gw
906                        .client()
907                        .ecash_balance(fed_id)
908                        .await
909                        .map_err(ControlFlow::Continue)?;
910
911                    if uses_walletv2 {
912                        // Walletv2: check balance increased by approximately
913                        // the expected amount. Use 90% threshold to account
914                        // for mintv2 fees (same as pegin_client).
915                        let expected = initial_balance + (amount * 1000 * 9 / 10);
916                        if gateway_balance >= expected {
917                            Ok(())
918                        } else {
919                            Err(ControlFlow::Continue(anyhow::anyhow!(
920                                "Gateway balance {gateway_balance} has not reached expected {expected} (initial: {initial_balance})"
921                            )))
922                        }
923                    } else {
924                        poll_almost_equal!(gateway_balance, amount * 1000)
925                    }
926                }
927            })
928        }))
929        .await?;
930
931        Ok(())
932    }
933
934    /// Initiates multiple peg-outs from the same federation for the set of
935    /// gateways to save on mining blocks in parallel.
936    pub async fn pegout_gateways(
937        &self,
938        amount: u64,
939        gateways: Vec<&super::gatewayd::Gatewayd>,
940    ) -> Result<()> {
941        info!(amount, "Pegging-out gateway funds");
942        let fed_id = self.calculate_federation_id();
943        let mut peg_outs: BTreeMap<LightningNodeType, (Amount, WithdrawResponse)> = BTreeMap::new();
944        for gw in gateways.clone() {
945            let prev_fed_ecash_balance = gw
946                .client()
947                .get_balances()
948                .await?
949                .ecash_balances
950                .into_iter()
951                .find(|fed| fed.federation_id.to_string() == fed_id)
952                .expect("Gateway has not joined federation")
953                .ecash_balance_msats;
954
955            let pegout_address = self.bitcoind.get_new_address().await?;
956            let response = gw
957                .client()
958                .pegout(fed_id.clone(), amount, pegout_address)
959                .await?;
960            peg_outs.insert(gw.ln.ln_type(), (prev_fed_ecash_balance, response));
961        }
962        self.bitcoind.mine_blocks(21).await?;
963
964        try_join_all(
965            peg_outs
966                .values()
967                .map(|(_, pegout)| self.bitcoind.poll_get_transaction(pegout.txid)),
968        )
969        .await?;
970
971        for gw in gateways.clone() {
972            let after_fed_ecash_balance = gw
973                .client()
974                .get_balances()
975                .await?
976                .ecash_balances
977                .into_iter()
978                .find(|fed| fed.federation_id.to_string() == fed_id)
979                .expect("Gateway has not joined federation")
980                .ecash_balance_msats;
981
982            let ln_type = gw.ln.ln_type();
983            let prev_balance = peg_outs
984                .get(&ln_type)
985                .expect("peg out does not exist")
986                .0
987                .msats;
988            let fees = peg_outs
989                .get(&ln_type)
990                .expect("peg out does not exist")
991                .1
992                .fees;
993            let total_fee = fees.amount().to_sat() * 1000;
994            // Walletv2 charges a module fee on top of the on-chain fee:
995            // 100 sats base + 1% of amount (amount is in msats)
996            let tolerance = if crate::util::supports_wallet_v2() {
997                let amount_sats = amount / 1000;
998                let module_fee_sats = 100 + amount_sats / 100;
999                module_fee_sats * 1000 + 2000
1000            } else if crate::util::supports_mint_v2() {
1001                4000
1002            } else {
1003                2000
1004            };
1005            crate::util::almost_equal(
1006                after_fed_ecash_balance.msats,
1007                prev_balance - amount - total_fee,
1008                tolerance,
1009            )
1010            .map_err(|e| {
1011                anyhow::anyhow!(
1012                    "new balance did not equal prev balance minus withdraw_amount minus fees: {e}"
1013                )
1014            })?;
1015        }
1016
1017        Ok(())
1018    }
1019
1020    pub fn calculate_federation_id(&self) -> String {
1021        self.client_config()
1022            .unwrap()
1023            .global
1024            .calculate_federation_id()
1025            .to_string()
1026    }
1027
1028    pub async fn await_block_sync(&self) -> Result<u64> {
1029        let finality_delay = self.get_finality_delay()?;
1030        let block_count = self.bitcoind.get_block_count().await?;
1031        let expected = block_count.saturating_sub(finality_delay.into());
1032
1033        if crate::util::supports_wallet_v2() {
1034            // Walletv2 doesn't have `dev wait-block-count`, poll using CLI instead
1035            let client = self.internal_client().await?;
1036            loop {
1037                let value = cmd!(client, "module", "walletv2", "info", "block-count")
1038                    .out_json()
1039                    .await?;
1040                let current: u64 = serde_json::from_value(value)?;
1041                if current >= expected {
1042                    break;
1043                }
1044                fedimint_core::task::sleep_in_test(
1045                    format!("Waiting for consensus block count to reach {expected}"),
1046                    std::time::Duration::from_secs(1),
1047                )
1048                .await;
1049            }
1050        } else {
1051            cmd!(
1052                self.internal_client().await?,
1053                "dev",
1054                "wait-block-count",
1055                expected
1056            )
1057            .run()
1058            .await?;
1059        }
1060
1061        Ok(expected)
1062    }
1063
1064    fn get_finality_delay(&self) -> Result<u32, anyhow::Error> {
1065        // Walletv2 uses a constant finality delay
1066        if crate::util::supports_wallet_v2() {
1067            return Ok(fedimint_walletv2_server::CONFIRMATION_FINALITY_DELAY as u32);
1068        }
1069
1070        let wallet_instance_id = self.module_instance_id_by_kind(&fedimint_wallet_client::KIND)?;
1071        let client_config = &self.client_config()?;
1072        let wallet_cfg = client_config
1073            .modules
1074            .get(&wallet_instance_id)
1075            .context("wallet module not found")?
1076            .clone()
1077            .redecode_raw(&ModuleDecoderRegistry::new([(
1078                wallet_instance_id,
1079                fedimint_wallet_client::KIND,
1080                fedimint_wallet_client::WalletModuleTypes::decoder(),
1081            )]))?;
1082        let wallet_cfg: &WalletClientConfig = wallet_cfg.cast()?;
1083
1084        let finality_delay = wallet_cfg.finality_delay;
1085        Ok(finality_delay)
1086    }
1087
1088    pub async fn await_gateways_registered(&self) -> Result<()> {
1089        let start_time = Instant::now();
1090        debug!(target: LOG_DEVIMINT, "Awaiting LN gateways registration");
1091
1092        poll("gateways registered", || async {
1093            let num_gateways = cmd!(
1094                self.internal_client()
1095                    .await
1096                    .map_err(ControlFlow::Continue)?,
1097                "list-gateways"
1098            )
1099            .out_json()
1100            .await
1101            .map_err(ControlFlow::Continue)?
1102            .as_array()
1103            .context("invalid output")
1104            .map_err(ControlFlow::Break)?
1105            .len();
1106
1107            // After version v0.10.0, the LND gateway will register twice. Once for the HTTP
1108            // server, and once for the iroh endpoint.
1109            let expected_gateways =
1110                if crate::util::Gatewayd::version_or_default().await < *VERSION_0_10_0_ALPHA {
1111                    1
1112                } else {
1113                    2
1114                };
1115
1116            poll_eq!(num_gateways, expected_gateways)
1117        })
1118        .await?;
1119        debug!(target: LOG_DEVIMINT,
1120            elapsed_ms = %start_time.elapsed().as_millis(),
1121            "Gateways registered");
1122        Ok(())
1123    }
1124
1125    pub async fn await_all_peers(&self) -> Result<()> {
1126        let (module_name, endpoint) = if crate::util::supports_wallet_v2() {
1127            ("walletv2", "consensus_block_count")
1128        } else {
1129            ("wallet", "block_count")
1130        };
1131        poll("Waiting for all peers to be online", || async {
1132            cmd!(
1133                self.internal_client()
1134                    .await
1135                    .map_err(ControlFlow::Continue)?,
1136                "dev",
1137                "api",
1138                "--module",
1139                module_name,
1140                endpoint
1141            )
1142            .run()
1143            .await
1144            .map_err(ControlFlow::Continue)?;
1145            Ok(())
1146        })
1147        .await
1148    }
1149
1150    pub async fn await_peer(&self, peer_id: usize) -> Result<()> {
1151        poll("Waiting for all peers to be online", || async {
1152            cmd!(
1153                self.internal_client()
1154                    .await
1155                    .map_err(ControlFlow::Continue)?,
1156                "dev",
1157                "api",
1158                "--peer-id",
1159                peer_id,
1160                "--module",
1161                "wallet",
1162                "block_count"
1163            )
1164            .run()
1165            .await
1166            .map_err(ControlFlow::Continue)?;
1167            Ok(())
1168        })
1169        .await
1170    }
1171
1172    /// Mines enough blocks to finalize mempool transactions, then waits for
1173    /// federation to process finalized blocks.
1174    ///
1175    /// ex:
1176    ///   tx submitted to mempool at height 100
1177    ///   finality delay = 10
1178    ///   mine finality delay blocks + 1 => new height 111
1179    ///   tx included in block 101
1180    ///   highest finalized height = 111 - 10 = 101
1181    pub async fn finalize_mempool_tx(&self) -> Result<()> {
1182        let finality_delay = self.get_finality_delay()?;
1183        let blocks_to_mine = finality_delay + 1;
1184        self.bitcoind.mine_blocks(blocks_to_mine.into()).await?;
1185        self.await_block_sync().await?;
1186        Ok(())
1187    }
1188
1189    pub async fn mine_then_wait_blocks_sync(&self, blocks: u64) -> Result<()> {
1190        self.bitcoind.mine_blocks(blocks).await?;
1191        self.await_block_sync().await?;
1192        Ok(())
1193    }
1194
1195    pub fn num_members(&self) -> usize {
1196        self.members.len()
1197    }
1198
1199    pub fn member_ids(&self) -> impl Iterator<Item = PeerId> + '_ {
1200        self.members
1201            .keys()
1202            .map(|&peer_id| PeerId::from(peer_id as u16))
1203    }
1204}
1205
/// Handle to a `fedimintd` daemon spawned through the devimint
/// `ProcessManager` (see `Fedimintd::new`).
#[derive(Clone)]
pub struct Fedimintd {
    // Stored but never read in the visible code (hence the `_` prefix).
    // NOTE(review): presumably kept so the bitcoind handle lives as long as
    // the daemon handle — confirm.
    _bitcoind: Bitcoind,
    /// Handle of the spawned daemon process; used to terminate it or await
    /// its termination.
    process: ProcessHandle,
}
1211
1212impl Fedimintd {
1213    pub async fn new(
1214        process_mgr: &ProcessManager,
1215        bitcoind: Bitcoind,
1216        peer_id: usize,
1217        env: &vars::Fedimintd,
1218        fed_name: String,
1219    ) -> Result<Self> {
1220        debug!(target: LOG_DEVIMINT, "Starting fedimintd-{fed_name}-{peer_id}");
1221        let process = process_mgr
1222            .spawn_daemon(
1223                &format!("fedimintd-{fed_name}-{peer_id}"),
1224                cmd!(FedimintdCmd).envs(env.vars()),
1225            )
1226            .await?;
1227
1228        Ok(Self {
1229            _bitcoind: bitcoind,
1230            process,
1231        })
1232    }
1233
1234    pub async fn terminate(self) -> Result<()> {
1235        self.process.terminate().await
1236    }
1237
1238    pub async fn await_terminated(&self) -> Result<()> {
1239        self.process.await_terminated().await
1240    }
1241}
1242
1243pub async fn run_cli_dkg_v2(endpoints: BTreeMap<PeerId, String>) -> Result<()> {
1244    // Parallelize setup status checks
1245    let status_futures = endpoints.values().map(|endpoint| {
1246        let endpoint = endpoint.clone();
1247        async move {
1248            let status = poll("awaiting-setup-status-awaiting-local-params", || async {
1249                crate::util::FedimintCli
1250                    .setup_status(&API_AUTH, &endpoint)
1251                    .await
1252                    .map_err(ControlFlow::Continue)
1253            })
1254            .await
1255            .unwrap();
1256
1257            assert_eq!(status, SetupStatus::AwaitingLocalParams);
1258        }
1259    });
1260    join_all(status_futures).await;
1261
1262    debug!(target: LOG_DEVIMINT, "Setting local parameters...");
1263
1264    // Parallelize setting local parameters
1265    // --federation-size is only supported by fedimint-cli >= 0.11.0-alpha
1266    let federation_size =
1267        if crate::util::FedimintCli::version_or_default().await >= *VERSION_0_11_0_ALPHA {
1268            Some(endpoints.len())
1269        } else {
1270            None
1271        };
1272    let local_params_futures = endpoints.iter().map(|(peer, endpoint)| {
1273        let peer = *peer;
1274        let endpoint = endpoint.clone();
1275        async move {
1276            let info = if peer.to_usize() == 0 {
1277                crate::util::FedimintCli
1278                    .set_local_params_leader(&peer, &API_AUTH, &endpoint, federation_size)
1279                    .await
1280            } else {
1281                crate::util::FedimintCli
1282                    .set_local_params_follower(&peer, &API_AUTH, &endpoint)
1283                    .await
1284            };
1285            info.map(|i| (peer, i))
1286        }
1287    });
1288    let connection_info: BTreeMap<_, _> = try_join_all(local_params_futures)
1289        .await?
1290        .into_iter()
1291        .collect();
1292
1293    debug!(target: LOG_DEVIMINT, "Exchanging peer connection info...");
1294
1295    // Parallelize peer addition - flatten the nested loop into a single parallel
1296    // operation
1297    let add_peer_futures = connection_info.iter().flat_map(|(peer, info)| {
1298        endpoints
1299            .iter()
1300            .filter(move |(p, _)| *p != peer)
1301            .map(move |(_, endpoint)| {
1302                let endpoint = endpoint.clone();
1303                let info = info.clone();
1304                async move {
1305                    crate::util::FedimintCli
1306                        .add_peer(&info, &API_AUTH, &endpoint)
1307                        .await
1308                }
1309            })
1310    });
1311    try_join_all(add_peer_futures).await?;
1312
1313    debug!(target: LOG_DEVIMINT, "Starting DKG...");
1314
1315    // Parallelize DKG start
1316    let start_dkg_futures = endpoints.values().map(|endpoint| {
1317        let endpoint = endpoint.clone();
1318        async move {
1319            crate::util::FedimintCli
1320                .start_dkg(&API_AUTH, &endpoint)
1321                .await
1322        }
1323    });
1324    try_join_all(start_dkg_futures).await?;
1325
1326    Ok(())
1327}